kfaraz commented on code in PR #16144:
URL: https://github.com/apache/druid/pull/16144#discussion_r1561973915


##########
server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java:
##########
@@ -74,16 +74,18 @@ public String getSequencePrevId()
     return sequencePrevId;
   }
 
+  @Nullable
   @JsonProperty
-  public String getParentId()
+  public String getUpgradedFromSegmentId()
   {
-    return parentId;
+    return upgradedFromSegmentId;
   }
 
+  @Nullable
   @JsonProperty
-  public String getTaskGroup()
+  public String getTaskAllocatorId()

Review Comment:
   Please add a javadoc to explain these fields.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java:
##########
@@ -132,19 +133,20 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
         = TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), segments);
 
     final CriticalAction.Action<SegmentPublishResult> publishAction;
+    final String pendingSegmentGroupId = ((PendingSegmentAllocatingTask) task).getPendingSegmentGroupId();

Review Comment:
   Shouldn't there be a check before this that the task is actually an instance 
of `PendingSegmentAllocatingTask`? Currently, no non-allocating task uses this 
action, but it is better to be on the safe side.
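
A minimal sketch of such a guard, assuming stand-in `Task` and `PendingSegmentAllocatingTask` interfaces rather than the real Druid types. The helper name, the exception type, and the message format are hypothetical; the PR may prefer a Druid-specific exception.

```java
// Stand-in types for illustration only; the real ones live in the Druid indexing-service module.
interface Task
{
  String getId();
}

interface PendingSegmentAllocatingTask
{
  String getPendingSegmentGroupId();
}

// Example task that both is a Task and allocates pending segments.
final class AppendingTask implements Task, PendingSegmentAllocatingTask
{
  private final String id;
  private final String groupId;

  AppendingTask(String id, String groupId)
  {
    this.id = id;
    this.groupId = groupId;
  }

  @Override
  public String getId()
  {
    return id;
  }

  @Override
  public String getPendingSegmentGroupId()
  {
    return groupId;
  }
}

final class AllocatingTaskCheckDemo
{
  // Hypothetical helper: fail with a descriptive message instead of a bare ClassCastException.
  static String getPendingSegmentGroupId(Task task)
  {
    if (!(task instanceof PendingSegmentAllocatingTask)) {
      throw new IllegalArgumentException(
          String.format("Task[%s] does not allocate pending segments", task.getId())
      );
    }
    return ((PendingSegmentAllocatingTask) task).getPendingSegmentGroupId();
  }

  public static void main(String[] args)
  {
    System.out.println(getPendingSegmentGroupId(new AppendingTask("append_1", "group_1")));
    try {
      // A task that does not implement the allocating interface is rejected.
      getPendingSegmentGroupId(() -> "plain_1");
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With the check in place, a future non-allocating task that reuses this action would fail with a message naming the task rather than with a cast error.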



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/PendingSegmentAllocatingTask.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task;
+
+public interface PendingSegmentAllocatingTask

Review Comment:
   Needs a javadoc.
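
For illustration, a sketch of what the javadoc could look like. The method name `getPendingSegmentGroupId()` is taken from the cast in `SegmentTransactionalAppendAction` above; the wording of the descriptions is a guess and should be replaced by the actual contract. `ExampleAppendTask` is a hypothetical implementation added only so the snippet compiles on its own.

```java
/**
 * Marks a task that allocates pending segments while ingesting data.
 * (Assumed wording, for illustration only.)
 */
interface PendingSegmentAllocatingTask
{
  /**
   * @return id of the group under which this task allocates its pending segments
   *         (assumed meaning, inferred from the method name)
   */
  String getPendingSegmentGroupId();
}

// Hypothetical implementation, present only to make the sketch self-contained.
final class ExampleAppendTask implements PendingSegmentAllocatingTask
{
  private final String groupId;

  ExampleAppendTask(String groupId)
  {
    this.groupId = groupId;
  }

  @Override
  public String getPendingSegmentGroupId()
  {
    return groupId;
  }
}
```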



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -141,22 +144,31 @@ private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox t
     final SupervisorManager supervisorManager = toolbox.getSupervisorManager();

Review Comment:
   @AmatyaAvadhanula, this is yet to be addressed.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java:
##########
@@ -137,7 +137,7 @@
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
+public class IndexTask extends AbstractBatchIndexTask implements ChatHandler, PendingSegmentAllocatingTask

Review Comment:
   Does this need to implement the new interface? The `index` task is the 
non-parallel form of native batch ingestion. I don't think it needs to 
support concurrent append/replace.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -134,34 +137,42 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
   }
 
   /**
-   * Tries to upgrade any pending segments that overlap with the committed segments.
+   * Registers upgraded pending segments on the active supervisor, if any
    */
-  private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox)
+  private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionToolbox toolbox)
   {
     final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
     final Optional<String> activeSupervisorIdWithAppendLock =
         supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());
+
     if (!activeSupervisorIdWithAppendLock.isPresent()) {
       return;
     }
 
-    final Set<String> activeRealtimeSequencePrefixes
-        = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
-    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
-        toolbox.getIndexerMetadataStorageCoordinator()
-               .upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
-    log.info(
-        "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
-        upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
-    );
+    List<PendingSegmentRecord> pendingSegments
+        = toolbox.getIndexerMetadataStorageCoordinator().getAllPendingSegments(task.getDataSource());
+    Map<String, SegmentIdWithShardSpec> pendingSegmentIdMap = new HashMap<>();

Review Comment:
   Rename to `idToPendingSegment`



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java:
##########
@@ -119,7 +119,8 @@
 import java.util.concurrent.TimeoutException;
 
 @Deprecated
-public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements ChatHandler
+public class AppenderatorDriverRealtimeIndexTask extends AbstractTask

Review Comment:
   I don't think this needs to implement the new interface as it is a 
deprecated class. We need not support new features on this class.



##########
server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java:
##########
@@ -82,13 +82,13 @@ public PartialShardSpec getPartialShardSpec()
     return partialShardSpec;
   }
 
-  public String getParentId()
+  public String getUpgradedFromSegmentId()
   {
-    return parentId;
+    return upgradedFromSegmentId;
   }
 
-  public String getTaskGroup()
+  public String getTaskAllocatorId()

Review Comment:
   Please add a javadoc or link to the javadocs (yet to be added) in 
`PendingSegmentRecord`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -141,22 +144,31 @@ private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox t
     final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
     final Optional<String> activeSupervisorIdWithAppendLock =
         supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());
+
     if (!activeSupervisorIdWithAppendLock.isPresent()) {
       return;
     }
 
-    final Set<String> activeRealtimeSequencePrefixes
-        = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
-    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
-        toolbox.getIndexerMetadataStorageCoordinator()
-               .upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
-    log.info(
-        "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
-        upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
-    );
+    List<PendingSegment> pendingSegments
+        = toolbox.getIndexerMetadataStorageCoordinator().getAllPendingSegments(task.getDataSource());
+    Map<String, SegmentIdWithShardSpec> pendingSegmentIdMap = new HashMap<>();
+    pendingSegments.forEach(pendingSegment -> pendingSegmentIdMap.put(
+        pendingSegment.getId().asSegmentId().toString(),
+        pendingSegment.getId()
+    ));
+    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments = new HashMap<>();

Review Comment:
   +1, rename to `segmentToParent`. We can omit the prefix `pending` because 
this method only deals with pending segments, so it can be taken for granted.
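
To show how the two suggested names (`idToPendingSegment` and `segmentToParent`) read together, here is a rough sketch. `PendingSegmentStub` is a hypothetical stand-in that reduces a pending segment to two strings, and the way `segmentToParent` is populated from `upgradedFromSegmentId` is an assumption, since that part of the method is not shown in the hunk.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a pending segment record, reduced to two string fields.
final class PendingSegmentStub
{
  final String id;
  final String upgradedFromSegmentId; // null if this segment is not an upgrade

  PendingSegmentStub(String id, String upgradedFromSegmentId)
  {
    this.id = id;
    this.upgradedFromSegmentId = upgradedFromSegmentId;
  }
}

final class RenamedMapsDemo
{
  // Assumed logic: map each upgraded segment to the known segment it was upgraded from.
  static Map<String, String> mapSegmentToParent(List<PendingSegmentStub> pendingSegments)
  {
    final Map<String, PendingSegmentStub> idToPendingSegment = new HashMap<>();
    pendingSegments.forEach(segment -> idToPendingSegment.put(segment.id, segment));

    final Map<String, String> segmentToParent = new HashMap<>();
    for (PendingSegmentStub segment : pendingSegments) {
      final String parentId = segment.upgradedFromSegmentId;
      // Skip segments that are not upgrades or whose parent is not among the pending segments.
      if (parentId != null && idToPendingSegment.containsKey(parentId)) {
        segmentToParent.put(segment.id, idToPendingSegment.get(parentId).id);
      }
    }
    return segmentToParent;
  }
}
```

Both names follow the `keyToValue` pattern, so each lookup states what goes in and what comes out.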



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -103,6 +103,11 @@ public class TaskLockbox
   // this set should be accessed under the giant lock.
   private final Set<String> activeTasks = new HashSet<>();
 
+  // Stores map of pending task group of tasks to the set of their ids.

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

