abhishekrb19 commented on code in PR #16144:
URL: https://github.com/apache/druid/pull/16144#discussion_r1529607147


##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -460,6 +461,26 @@ private void alterEntryTableAddTypeAndGroupId(final String tableName)
     }
   }
 
+  private void alterPendingSegmentsTableAddParentIdAndTaskGroup(final String tableName)
+  {
+    List<String> statements = new ArrayList<>();
+    if (tableHasColumn(tableName, "parent_id")) {
+      log.info("Table[%s] already has column[parent_id].", tableName);
+    } else {
+      log.info("Adding column[parent_id] to table[%s].", tableName);
+      statements.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN parent_id VARCHAR(255)", tableName));
+    }
+    if (tableHasColumn(tableName, "task_group")) {
+      log.info("Table[%s] already has column[task_group].", tableName);
+    } else {
+      log.info("Adding column[task_group] to table[%s].", tableName);
+      statements.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN task_group VARCHAR(255)", tableName));

Review Comment:
   Similar to `validateSegmentsTable()`, do we also need validation that the 
pending segments table is upgraded to the desired schema?
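   A minimal sketch of what such a check could look like, assuming the table's current column names have already been fetched (e.g. through repeated `tableHasColumn()` calls). `PendingSegmentsTableValidator` and its methods are hypothetical names, not part of this PR:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper (not Druid's API): checks the pending segments table the way
// validateSegmentsTable() checks the segments table, given its current column names.
class PendingSegmentsTableValidator
{
  // Columns added to the pending segments table by this PR.
  static final List<String> REQUIRED_COLUMNS = Arrays.asList("parent_id", "task_group");

  // Returns the required columns that the table does not have yet.
  static List<String> findMissingColumns(Set<String> existingColumns)
  {
    return REQUIRED_COLUMNS.stream()
                           .filter(column -> !existingColumns.contains(column))
                           .collect(Collectors.toList());
  }

  // Fails fast if the table has not been upgraded to the expected schema.
  static void validatePendingSegmentsTable(String tableName, Set<String> existingColumns)
  {
    List<String> missing = findMissingColumns(existingColumns);
    if (!missing.isEmpty()) {
      throw new IllegalStateException(
          String.format("Table[%s] is missing columns%s. Has it been upgraded?", tableName, missing)
      );
    }
  }
}
```

   Keeping the check separate from the ALTER step means it could run even when table creation is disabled, similar to how the segments table is validated.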



##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -237,7 +238,7 @@ Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatePendingSegments(
    *                                identifier may have a version lower than this one, but will not have one higher.
    * @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence.
    *                                Should be set to false if replica tasks would index events in same order
-   *
+   * @param taskGroup

Review Comment:
   Dangling param without description



##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -324,7 +326,8 @@ SegmentPublishResult commitSegmentsAndMetadata(
    */
   SegmentPublishResult commitAppendSegments(
       Set<DataSegment> appendSegments,
-      Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock
+      Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
+      String taskGroup

Review Comment:
   nit: update javadoc to include the context for `taskGroup`.
   Same for `commitAppendSegmentsAndMetadata`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java:
##########
@@ -279,6 +279,13 @@ public String getGroupId()
     return groupId;
   }
 
+  @Nullable

Review Comment:
   I don't think `groupId` can be null: `this.groupId = groupId == null ? id : groupId;`
   ```suggestion
   
   ```
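   To illustrate, a stripped-down sketch of the constructor logic quoted above (`SketchTask` is a hypothetical class, not `AbstractTask` itself): any method that simply returns `groupId` can only be null if the task ID is null.

```java
// Hypothetical, stripped-down stand-in for AbstractTask showing only the
// id/groupId handling quoted above.
class SketchTask
{
  private final String id;
  private final String groupId;

  SketchTask(String id, String groupId)
  {
    this.id = id;
    // A null groupId falls back to the task id, so the field is never null for a non-null id.
    this.groupId = groupId == null ? id : groupId;
  }

  String getId()
  {
    return id;
  }

  // No @Nullable needed: this returns a non-null value whenever id is non-null.
  String getGroupId()
  {
    return groupId;
  }
}
```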



##########
server/src/main/java/org/apache/druid/metadata/PendingSegment.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.sql.ResultSet;
+
+public class PendingSegment
+{
+  private final SegmentIdWithShardSpec id;
+  private final String sequenceName;
+  private final String sequencePrevId;
+  private final String parentId;
+  private final String taskGroup;
+
+  @JsonCreator
+  public PendingSegment(
+      @JsonProperty("id") SegmentIdWithShardSpec id,
+      @JsonProperty("sequenceName") String sequenceName,
+      @JsonProperty("sequencePrevId") String sequencePrevId,
+      @JsonProperty("parentId") @Nullable String parentId,
+      @JsonProperty("taskGroup") @Nullable String taskGroup
+  )
+  {
+    this.id = id;
+    this.sequenceName = sequenceName;
+    this.sequencePrevId = sequencePrevId;
+    this.parentId = parentId;
+    this.taskGroup = taskGroup;
+  }
+
+  @JsonProperty
+  public SegmentIdWithShardSpec getId()
+  {
+    return id;
+  }
+
+  @JsonProperty
+  public String getSequenceName()
+  {
+    return sequenceName;
+  }
+
+  @JsonProperty
+  public String getSequencePrevId()
+  {
+    return sequencePrevId;
+  }
+
+  @JsonProperty
+  public String getParentId()

Review Comment:
   nit: annotate `getParentId()` and `getTaskGroupId()` with `@Nullable`



##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -475,4 +476,8 @@ SegmentPublishResult commitMetadataOnly(
    * @return number of deleted entries from the metadata store
    */
   int deleteUpgradeSegmentsForTask(String taskId);
+
+  int deletePendingSegmentsForTaskGroup(String taskGroup);

Review Comment:
   Can you please add a brief javadoc for these new interface methods?
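   For example, a javadoc along these lines. It is shown on a hypothetical in-memory implementation (`InMemoryPendingSegmentStore` is not a Druid class) so that the described contract can be checked:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for the pending segments table; a metadata
// store implementation would issue a SQL DELETE instead.
class InMemoryPendingSegmentStore
{
  private static final class Row
  {
    final String segmentId;
    final String taskGroup;

    Row(String segmentId, String taskGroup)
    {
      this.segmentId = segmentId;
      this.taskGroup = taskGroup;
    }
  }

  private final List<Row> rows = new ArrayList<>();

  void insert(String segmentId, String taskGroup)
  {
    rows.add(new Row(segmentId, taskGroup));
  }

  /**
   * Deletes all pending segments that were allocated for the given task group.
   *
   * @param taskGroup group ID shared by all the tasks (e.g. replicas) that allocated the pending segments
   * @return number of pending segments deleted from the store
   */
  int deletePendingSegmentsForTaskGroup(String taskGroup)
  {
    int sizeBefore = rows.size();
    rows.removeIf(row -> taskGroup.equals(row.taskGroup));
    return sizeBefore - rows.size();
  }

  int size()
  {
    return rows.size();
  }
}
```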



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java:
##########
@@ -156,6 +156,13 @@ public Set<ResourceAction> getInputSourceResources()
     return ImmutableSet.of();
   }
 
+  @JsonIgnore

Review Comment:
   Is `@JsonIgnore` required given that id isn't a member of this class?



##########
server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java:
##########
@@ -288,6 +288,7 @@ tableName, getPayloadType(), getQuoteString(), getCollation()
             )
         )
     );
+    alterPendingSegmentsTableAddParentIdAndTaskGroup(tableName);

Review Comment:
   It'd be better to move this call out into `createPendingSegmentsTable()`, right after the call to this method.
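   A rough sketch of the suggested layout, assuming the no-arg `createPendingSegmentsTable()` delegates to the table-name overload and is gated on a create-tables flag (both assumptions here). `SketchMetadataConnector` is hypothetical, records statements instead of executing them, and omits the `tableHasColumn()` checks:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical connector that records the SQL it would run instead of executing it;
// it only illustrates the call ordering suggested above.
class SketchMetadataConnector
{
  final List<String> executedStatements = new ArrayList<>();
  private final boolean createTables;
  private final String pendingSegmentsTable;

  SketchMetadataConnector(boolean createTables, String pendingSegmentsTable)
  {
    this.createTables = createTables;
    this.pendingSegmentsTable = pendingSegmentsTable;
  }

  // Only creates the table; no schema upgrade happens in here.
  void createPendingSegmentsTable(String tableName)
  {
    executedStatements.add("CREATE TABLE " + tableName);
  }

  // Column-existence checks omitted for brevity.
  void alterPendingSegmentsTableAddParentIdAndTaskGroup(String tableName)
  {
    executedStatements.add("ALTER TABLE " + tableName + " ADD COLUMN parent_id VARCHAR(255)");
    executedStatements.add("ALTER TABLE " + tableName + " ADD COLUMN task_group VARCHAR(255)");
  }

  // The no-arg entry point creates the table first and then upgrades it.
  void createPendingSegmentsTable()
  {
    if (createTables) {
      createPendingSegmentsTable(pendingSegmentsTable);
      alterPendingSegmentsTableAddParentIdAndTaskGroup(pendingSegmentsTable);
    }
  }
}
```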



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java:
##########
@@ -101,6 +101,12 @@ public interface Task
    */
   String getGroupId();
 
+  /**
+   * The group used to associate a pending segment with a task
+   * @return task group for pending segment allocations

Review Comment:
   ```suggestion
      * @return the group ID used to associate a pending segment with a task
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java:
##########
@@ -101,6 +101,12 @@ public interface Task
    */
   String getGroupId();
 
+  /**
+   * The group used to associate a pending segment with a task
+   * @return task group for pending segment allocations
+   */
+  String getPendingSegmentGroup();

Review Comment:
   To keep it consistent with the other functions in this class:
   ```suggestion
     String getPendingSegmentGroupId();
   ```



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java:
##########
@@ -125,6 +125,13 @@ public Set<ResourceAction> getInputSourceResources()
     return ImmutableSet.of();
   }
 
+  @JsonIgnore

Review Comment:
   ```suggestion
   
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java:
##########
@@ -135,14 +135,16 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
     if (startMetadata == null) {
       publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
           segments,

Review Comment:
   Do we need this branch at all? Can't we simply call `commitAppendSegmentsAndMetadata()` with the start and end metadata and remove `commitAppendSegments()`, given that it doesn't appear to be used anywhere else?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -103,6 +103,11 @@ public class TaskLockbox
   // this set should be accessed under the giant lock.
   private final Set<String> activeTasks = new HashSet<>();
 
+  // Stores map of pending task group of tasks to the set of their ids.
+  // Useful for task replicas. Clean up pending segments only when the set is empty.
+  // this map should be accessed under the giant lock.

Review Comment:
   ```suggestion
     // This map should be accessed under the giant lock.
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -141,22 +144,31 @@ private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox t
     final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
     final Optional<String> activeSupervisorIdWithAppendLock =
         supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());
+
     if (!activeSupervisorIdWithAppendLock.isPresent()) {
       return;
     }
 
-    final Set<String> activeRealtimeSequencePrefixes
-        = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
-    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
-        toolbox.getIndexerMetadataStorageCoordinator()
-               .upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
-    log.info(
-        "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
-        upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
-    );
+    List<PendingSegment> pendingSegments
+        = toolbox.getIndexerMetadataStorageCoordinator().getAllPendingSegments(task.getDataSource());
+    Map<String, SegmentIdWithShardSpec> pendingSegmentIdMap = new HashMap<>();
+    pendingSegments.forEach(pendingSegment -> pendingSegmentIdMap.put(
+        pendingSegment.getId().asSegmentId().toString(),
+        pendingSegment.getId()
+    ));
+    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments = new HashMap<>();
+    pendingSegments.forEach(pendingSegment -> {
+      if (pendingSegment.getParentId() != null
+          && !pendingSegment.getParentId().equals(pendingSegment.getId().asSegmentId().toString())) {
+        upgradedPendingSegments.put(
+            pendingSegment.getId(),
+            pendingSegmentIdMap.get(pendingSegment.getParentId())
+        );
+      }
+    });
 
     upgradedPendingSegments.forEach(
-        (oldId, newId) -> toolbox.getSupervisorManager()
+        (newId, oldId) -> toolbox.getSupervisorManager()

Review Comment:
   ```suggestion
           (newId, oldId) -> supervisorManager
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -141,22 +144,31 @@ private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox t
     final SupervisorManager supervisorManager = toolbox.getSupervisorManager();

Review Comment:
   General comment: this task action in particular is a bit more involved to follow. I found the PR description quite useful for understanding the high-level flow. I would suggest including a version of that as a javadoc in this class itself.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -141,22 +144,31 @@ private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox t
     final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
     final Optional<String> activeSupervisorIdWithAppendLock =
         supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());
+
     if (!activeSupervisorIdWithAppendLock.isPresent()) {
       return;
     }
 
-    final Set<String> activeRealtimeSequencePrefixes
-        = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
-    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
-        toolbox.getIndexerMetadataStorageCoordinator()
-               .upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
-    log.info(
-        "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
-        upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
-    );
+    List<PendingSegment> pendingSegments
+        = toolbox.getIndexerMetadataStorageCoordinator().getAllPendingSegments(task.getDataSource());
+    Map<String, SegmentIdWithShardSpec> pendingSegmentIdMap = new HashMap<>();
+    pendingSegments.forEach(pendingSegment -> pendingSegmentIdMap.put(
+        pendingSegment.getId().asSegmentId().toString(),
+        pendingSegment.getId()
+    ));
+    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments = new HashMap<>();

Review Comment:
   Given the map's type, it's not clear which of the key and value is the pre-upgrade ID and which is the post-upgrade ID. Should we rename this for clarity?
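   For instance, naming the map after its direction makes the `(newId, oldId)` lambda self-explanatory. A minimal sketch under simplifying assumptions: IDs are plain strings, a hypothetical `PendingSegmentRow` stands in for `PendingSegment`, and the parent's `SegmentIdWithShardSpec` lookup is skipped. The name `upgradedIdToParentId` is only a suggestion:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simplification: IDs are plain strings instead of SegmentIdWithShardSpec,
// and PendingSegmentRow stands in for PendingSegment.
class UpgradedPendingSegmentIds
{
  static final class PendingSegmentRow
  {
    final String id;
    final String parentId; // may be null

    PendingSegmentRow(String id, String parentId)
    {
      this.id = id;
      this.parentId = parentId;
    }
  }

  // Key: ID of the upgraded pending segment; value: ID of the pending segment it was upgraded from.
  static Map<String, String> buildUpgradedIdToParentId(List<PendingSegmentRow> pendingSegments)
  {
    Map<String, String> upgradedIdToParentId = new HashMap<>();
    for (PendingSegmentRow pendingSegment : pendingSegments) {
      // Same condition as the diff: a null parent, or a parent equal to the segment's own ID, is skipped.
      if (pendingSegment.parentId != null && !pendingSegment.parentId.equals(pendingSegment.id)) {
        upgradedIdToParentId.put(pendingSegment.id, pendingSegment.parentId);
      }
    }
    return upgradedIdToParentId;
  }
}
```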



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -103,6 +103,11 @@ public class TaskLockbox
   // this set should be accessed under the giant lock.
   private final Set<String> activeTasks = new HashSet<>();
 
+  // Stores map of pending task group of tasks to the set of their ids.

Review Comment:
   nit: this multi-line comment can straight up be a javadoc
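   For example, as a javadoc on the field. The sketch below wraps it in a hypothetical `PendingSegmentGroupTracker` (not a Druid class; the add/remove logic is an assumption based on the comment's description) to show the "clean up only when the set is empty" rule for replicas:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the TaskLockbox bookkeeping; the giant lock is omitted here.
class PendingSegmentGroupTracker
{
  /**
   * Map from pending segment group ID to the IDs of the active tasks in that group.
   * Task replicas share a group, so pending segments are cleaned up only when the
   * set for a group becomes empty. This map should be accessed under the giant lock.
   */
  private final Map<String, Set<String>> activePendingTaskGroupToTaskIds = new HashMap<>();

  void addTask(String pendingSegmentGroupId, String taskId)
  {
    activePendingTaskGroupToTaskIds.computeIfAbsent(pendingSegmentGroupId, k -> new HashSet<>()).add(taskId);
  }

  /**
   * Removes the task from its group.
   *
   * @return true if this was the last active task in the group, i.e. the group's
   *         pending segments may now be deleted
   */
  boolean removeTask(String pendingSegmentGroupId, String taskId)
  {
    Set<String> taskIds = activePendingTaskGroupToTaskIds.get(pendingSegmentGroupId);
    if (taskIds == null || !taskIds.remove(taskId)) {
      return false; // unknown group or task: nothing to clean up
    }
    if (taskIds.isEmpty()) {
      activePendingTaskGroupToTaskIds.remove(pendingSegmentGroupId);
      return true;
    }
    return false;
  }
}
```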



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -213,6 +218,13 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
           activeTasks.remove(task.getId());
         }
       }
+      activePendingTaskGroupToTaskIds.clear();
+      for (Task task : storedActiveTasks) {
+        if (activeTasks.contains(task.getId()) && task.getPendingSegmentGroup() != null) {

Review Comment:
   Curious - can `task.getPendingSegmentGroup()` ever be null? Or is this more 
of a defensive check?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -387,7 +399,7 @@ public LockResult tryLock(final Task task, final LockRequest request)
       if (request instanceof LockRequestForNewSegment) {
         final LockRequestForNewSegment lockRequestForNewSegment = (LockRequestForNewSegment) request;
         if (lockRequestForNewSegment.getGranularity() == LockGranularity.SEGMENT) {
-          newSegmentId = allocateSegmentId(lockRequestForNewSegment, request.getVersion());
+          newSegmentId = allocateSegmentId(lockRequestForNewSegment, request.getVersion(), null);

Review Comment:
   Why don't we pass in the task group ID for segment locks?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
