kfaraz commented on code in PR #16144:
URL: https://github.com/apache/druid/pull/16144#discussion_r1567653351


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -134,34 +151,55 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
   }
 
   /**
-   * Tries to upgrade any pending segments that overlap with the committed segments.
+   * Registers upgraded pending segments on the active supervisor, if any
    */
-  private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox)
+  private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionToolbox toolbox)
   {
     final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
     final Optional<String> activeSupervisorIdWithAppendLock =
         supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());
+
     if (!activeSupervisorIdWithAppendLock.isPresent()) {
       return;
     }
 
-    final Set<String> activeRealtimeSequencePrefixes
-        = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
-    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
-        toolbox.getIndexerMetadataStorageCoordinator()
-               .upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
-    log.info(
-        "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
-        upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
-    );
+    final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+        .getTaskLockbox()
+        .getAllReplaceLocksForDatasource(task.getDataSource())
+        .stream()
+        .filter(lock -> task.getId().equals(lock.getSupervisorTaskId()))
+        .collect(Collectors.toSet());
+
 
-    upgradedPendingSegments.forEach(
-        (oldId, newId) -> toolbox.getSupervisorManager()
-                                 .registerNewVersionOfPendingSegmentOnSupervisor(
-                                     activeSupervisorIdWithAppendLock.get(),
-                                     oldId,
-                                     newId
-                                 )
+    Set<PendingSegmentRecord> pendingSegments = new HashSet<>();
+    for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
+      pendingSegments.addAll(
+          toolbox.getIndexerMetadataStorageCoordinator()
+                 .getPendingSegments(task.getDataSource(), replaceLock.getInterval())
+      );
+    }
+    Map<String, SegmentIdWithShardSpec> idToPendingSegment = new HashMap<>();
+    pendingSegments.forEach(pendingSegment -> idToPendingSegment.put(
+        pendingSegment.getId().asSegmentId().toString(),
+        pendingSegment.getId()
+    ));
+    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> segmentToParent = new HashMap<>();
+    pendingSegments.forEach(pendingSegment -> {
+      if (pendingSegment.getUpgradedFromSegmentId() != null
+          && !pendingSegment.getUpgradedFromSegmentId().equals(pendingSegment.getId().asSegmentId().toString())) {

Review Comment:
   Can the `upgradedFromSegmentId` ever be equal to the `id` itself?
   A normal/root pending segment (i.e. one created by allocation rather than by an upgrade) would have `upgraded_from_segment_id` as `null`, right? In that case, the null check alone should be enough.
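
   To illustrate the question, here is a minimal sketch, assuming root pending segments always carry a `null` parent id. `Record` is a hypothetical stand-in for `PendingSegmentRecord` reduced to two string fields, and `mapUpgradedToParent` is an illustrative helper, not a method from the PR.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class UpgradedPendingSegmentSketch
{
  /** Hypothetical stand-in for PendingSegmentRecord; not the real Druid class. */
  static final class Record
  {
    final String id;
    // Assumption: null for a root pending segment created by allocation.
    final String upgradedFromSegmentId;

    Record(String id, String upgradedFromSegmentId)
    {
      this.id = id;
      this.upgradedFromSegmentId = upgradedFromSegmentId;
    }
  }

  /** Maps each upgraded id to its parent id, skipping roots via the null check alone. */
  static Map<String, String> mapUpgradedToParent(List<Record> records)
  {
    final Map<String, String> upgradedToParent = new HashMap<>();
    for (Record record : records) {
      if (record.upgradedFromSegmentId != null) {
        upgradedToParent.put(record.id, record.upgradedFromSegmentId);
      }
    }
    return upgradedToParent;
  }

  public static void main(String[] args)
  {
    final List<Record> records = Arrays.asList(
        new Record("root_id", null),
        new Record("upgraded_id", "root_id")
    );
    System.out.println(mapUpgradedToParent(records));
  }
}
```

   If the assumption does not hold (for example, if some code path writes the segment's own id into `upgraded_from_segment_id`), the extra `equals` check would still be needed.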



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java:
##########
@@ -45,7 +46,7 @@
 import java.util.Set;
 
 @JsonTypeName(MSQWorkerTask.TYPE)
-public class MSQWorkerTask extends AbstractTask
+public class MSQWorkerTask extends AbstractTask implements PendingSegmentAllocatingTask

Review Comment:
   This class need not implement `PendingSegmentAllocatingTask` as it never actually does any allocation. The allocation is always done by the controller task.
   
   This can be addressed in a follow-up PR.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -134,34 +151,55 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
   }
 
   /**
-   * Tries to upgrade any pending segments that overlap with the committed segments.
+   * Registers upgraded pending segments on the active supervisor, if any
    */
-  private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox)
+  private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionToolbox toolbox)
   {
     final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
     final Optional<String> activeSupervisorIdWithAppendLock =
         supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());
+
     if (!activeSupervisorIdWithAppendLock.isPresent()) {
       return;
     }
 
-    final Set<String> activeRealtimeSequencePrefixes
-        = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
-    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
-        toolbox.getIndexerMetadataStorageCoordinator()
-               .upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
-    log.info(
-        "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
-        upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
-    );
+    final Set<ReplaceTaskLock> replaceLocksForTask = toolbox

Review Comment:
   Some comments here would be helpful.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -340,7 +330,7 @@ public boolean registerNewVersionOfPendingSegmentOnSupervisor(
       return true;
     }
     catch (Exception e) {
-      log.error(e, "PendingSegment[%s] mapping update request to version[%s] on Supervisor[%s] failed",
+      log.error(e, "PendingSegmentRecord[%s] mapping update request to version[%s] on Supervisor[%s] failed",

Review Comment:
   This change is not needed.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java:
##########
@@ -39,7 +39,7 @@
 
 /**
  */
-public class NoopTask extends AbstractTask
+public class NoopTask extends AbstractTask implements PendingSegmentAllocatingTask

Review Comment:
   Does `NoopTask` need to implement the new interface for the purpose of tests?



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java:
##########
@@ -134,34 +151,55 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
   }
 
   /**
-   * Tries to upgrade any pending segments that overlap with the committed segments.
+   * Registers upgraded pending segments on the active supervisor, if any
    */
-  private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox)
+  private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionToolbox toolbox)
   {
     final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
     final Optional<String> activeSupervisorIdWithAppendLock =
         supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());
+
     if (!activeSupervisorIdWithAppendLock.isPresent()) {
       return;
     }
 
-    final Set<String> activeRealtimeSequencePrefixes
-        = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
-    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
-        toolbox.getIndexerMetadataStorageCoordinator()
-               .upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
-    log.info(
-        "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
-        upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
-    );
+    final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+        .getTaskLockbox()
+        .getAllReplaceLocksForDatasource(task.getDataSource())
+        .stream()
+        .filter(lock -> task.getId().equals(lock.getSupervisorTaskId()))
+        .collect(Collectors.toSet());
+
 
-    upgradedPendingSegments.forEach(
-        (oldId, newId) -> toolbox.getSupervisorManager()
-                                 .registerNewVersionOfPendingSegmentOnSupervisor(
-                                     activeSupervisorIdWithAppendLock.get(),
-                                     oldId,
-                                     newId
-                                 )
+    Set<PendingSegmentRecord> pendingSegments = new HashSet<>();
+    for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
+      pendingSegments.addAll(
+          toolbox.getIndexerMetadataStorageCoordinator()
+                 .getPendingSegments(task.getDataSource(), replaceLock.getInterval())
+      );
+    }
+    Map<String, SegmentIdWithShardSpec> idToPendingSegment = new HashMap<>();
+    pendingSegments.forEach(pendingSegment -> idToPendingSegment.put(
+        pendingSegment.getId().asSegmentId().toString(),
+        pendingSegment.getId()
+    ));
+    Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> segmentToParent = new HashMap<>();
+    pendingSegments.forEach(pendingSegment -> {
+      if (pendingSegment.getUpgradedFromSegmentId() != null

Review Comment:
   Should we only look at pending segments that were upgraded by this task rather than all upgraded pending segments?
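
   One possible shape for such a filter, as a rough sketch only. It assumes that an upgraded pending segment carries the version of the REPLACE lock under which it was created; that is an assumption made for illustration, not something the diff confirms. `Record`, its `version` field, and `upgradedByTask` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class UpgradedByTaskFilterSketch
{
  /** Hypothetical stand-in for PendingSegmentRecord; not the real Druid class. */
  static final class Record
  {
    final String id;
    // Assumption: equals the version of the REPLACE lock that triggered the upgrade.
    final String version;
    // Null for a root pending segment created by allocation.
    final String upgradedFromSegmentId;

    Record(String id, String version, String upgradedFromSegmentId)
    {
      this.id = id;
      this.version = version;
      this.upgradedFromSegmentId = upgradedFromSegmentId;
    }
  }

  /** Keeps only upgraded records whose version matches one of this task's replace locks. */
  static List<Record> upgradedByTask(List<Record> pendingSegments, Set<String> replaceLockVersions)
  {
    final List<Record> upgraded = new ArrayList<>();
    for (Record record : pendingSegments) {
      if (record.upgradedFromSegmentId != null && replaceLockVersions.contains(record.version)) {
        upgraded.add(record);
      }
    }
    return upgraded;
  }

  public static void main(String[] args)
  {
    final List<Record> pendingSegments = Arrays.asList(
        new Record("root", "v0", null),
        new Record("upgraded_by_this_task", "v2", "root"),
        new Record("upgraded_by_other_task", "v1", "root")
    );
    final Set<String> thisTaskLockVersions = new HashSet<>(Arrays.asList("v2"));
    for (Record record : upgradedByTask(pendingSegments, thisTaskLockVersions)) {
      System.out.println(record.id);
    }
  }
}
```

   With a filter like this, only the mappings created by the current REPLACE task would be registered on the supervisor, instead of every upgraded pending segment in the locked intervals.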



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1073,7 +1057,9 @@ private SegmentIdWithShardSpec allocatePendingSegment(
     );
 
     // always insert empty previous sequence id
-    insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1);
+    insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1,
+                                      taskAllocatorId

Review Comment:
   Formatting is off.



##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -476,4 +478,19 @@ SegmentPublishResult commitMetadataOnly(
    * @return number of deleted entries from the metadata store
    */
   int deleteUpgradeSegmentsForTask(String taskId);
+
+  /**
+   * Delete pending segment for a give task group after all the tasks belonging to it have completed.
+   * @param taskAllocatorId task id / task group / replica group for an appending task
+   * @return number of pending segments deleted from the metadata store
+   */
+  int deletePendingSegmentsForTaskGroup(String taskAllocatorId);

Review Comment:
   Method needs to be renamed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
