kfaraz commented on code in PR #15699:
URL: https://github.com/apache/druid/pull/15699#discussion_r1461629309


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1233,10 +1233,15 @@ private DataSegmentTimelineView makeDataSegmentTimelineView()
       // any segment created after the lock was acquired for its interval will not be considered.
       final Collection<DataSegment> publishedUsedSegments;
       try {
-        publishedUsedSegments = context.taskActionClient().submit(new RetrieveSegmentsToReplaceAction(
-            dataSource,
-            intervals
-        ));
+        // Additional check as the task action does not accept empty intervals

Review Comment:
   I don't think this comment is really needed or accurate. The real reason we
   are not firing a task action when the intervals are empty is that we know we
   would get back an empty result. Why perform an unnecessary round trip?
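
   A minimal, self-contained sketch of that short-circuit, to show the intent.
   Everything here is a hypothetical stand-in rather than the real Druid code:
   `submitRetrieveAction` plays the role of the task action round trip,
   `ROUND_TRIPS` only counts calls, and segments and intervals are plain strings.

   ```java
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.Collections;
   import java.util.List;
   import java.util.concurrent.atomic.AtomicInteger;

   public class EmptyIntervalShortCircuit
   {
     // Hypothetical counter: how many times we "went over the wire".
     static final AtomicInteger ROUND_TRIPS = new AtomicInteger();

     // Hypothetical stand-in for submitting a task action to a remote service.
     static Collection<String> submitRetrieveAction(String dataSource, List<String> intervals)
     {
       ROUND_TRIPS.incrementAndGet();
       List<String> segments = new ArrayList<>();
       for (String interval : intervals) {
         segments.add(dataSource + "_" + interval);
       }
       return segments;
     }

     static Collection<String> retrievePublishedUsedSegments(String dataSource, List<String> intervals)
     {
       // No intervals means the result is known to be empty, so skip the round trip.
       if (intervals.isEmpty()) {
         return Collections.emptySet();
       }
       return submitRetrieveAction(dataSource, intervals);
     }

     public static void main(String[] args)
     {
       System.out.println(retrievePublishedUsedSegments("wiki", Collections.emptyList()));
       System.out.println(ROUND_TRIPS.get());
     }
   }
   ```

   If the code comment is kept at all, it could state this reason instead of
   referring to what the task action accepts.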



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java:
##########
@@ -113,6 +136,75 @@ public TypeReference<Collection<DataSegment>> getReturnTypeReference()
 
   @Override
   public Collection<DataSegment> perform(Task task, TaskActionToolbox toolbox)
+  {
+    // When fetching segments for a datasource other than the one this task is writing to,
+    // just return all segments with the needed visibility.
+    // This is because we can't ensure that the set of returned segments is consistent throughout the task's lifecycle
+    if (!task.getDataSource().equals(dataSource)) {
+      return retrieveUsedSegments(toolbox);
+    }
+
+    final String supervisorId;
+    if (task instanceof AbstractBatchSubtask) {
+      supervisorId = ((AbstractBatchSubtask) task).getSupervisorTaskId();
+    } else {
+      supervisorId = task.getId();
+    }
+
+    final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
+        .getTaskLockbox()
+        .getAllReplaceLocksForDatasource(task.getDataSource())
+        .stream()
+        .filter(lock -> supervisorId.equals(lock.getSupervisorTaskId()))
+        .collect(Collectors.toSet());
+
+    // If there are no replace locks for the task, simply fetch all visible segments for the interval
+    if (replaceLocksForTask.isEmpty()) {
+      return retrieveUsedSegments(toolbox);
+    }
+
+    Map<Interval, Map<String, Set<DataSegment>>> intervalToCreatedToSegments = new HashMap<>();

Review Comment:
   I recall putting a comment here but can't find it anywhere.
   I would prefer it if we moved the code here on down into a new method
   `retrieveUsedSegmentsForReplace(toolbox, replaceLocks)`.
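
   A rough sketch of the shape I have in mind, not the actual implementation.
   `Segment` and `ReplaceLock` are hypothetical stand-ins for `DataSegment` and
   `ReplaceTaskLock`, intervals are matched by plain string equality, and the
   filter (drop segments created after the lock was acquired for their
   interval) is only an assumption based on the MSQ code comment.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class RetrieveForReplaceSketch
   {
     // Hypothetical stand-in for DataSegment: an interval and a creation timestamp.
     static final class Segment
     {
       final String interval;
       final String created;

       Segment(String interval, String created)
       {
         this.interval = interval;
         this.created = created;
       }
     }

     // Hypothetical stand-in for ReplaceTaskLock: an interval and the lock version.
     static final class ReplaceLock
     {
       final String interval;
       final String version;

       ReplaceLock(String interval, String version)
       {
         this.interval = interval;
         this.version = version;
       }
     }

     // perform() keeps only the early returns and delegates the rest.
     static List<Segment> perform(List<Segment> allUsed, List<ReplaceLock> replaceLocks)
     {
       if (replaceLocks.isEmpty()) {
         return allUsed;
       }
       return retrieveUsedSegmentsForReplace(allUsed, replaceLocks);
     }

     // The extracted method: everything that depends on the replace locks lives here.
     static List<Segment> retrieveUsedSegmentsForReplace(List<Segment> allUsed, List<ReplaceLock> replaceLocks)
     {
       List<Segment> visible = new ArrayList<>();
       for (Segment segment : allUsed) {
         boolean createdAfterLock = false;
         for (ReplaceLock lock : replaceLocks) {
           // ISO-8601 timestamps in the same zone compare correctly as strings.
           if (lock.interval.equals(segment.interval) && segment.created.compareTo(lock.version) > 0) {
             createdAfterLock = true;
             break;
           }
         }
         if (!createdAfterLock) {
           visible.add(segment);
         }
       }
       return visible;
     }
   }
   ```

   With this split, `perform` reads as a short list of early returns and the
   lock-specific logic can be tested on its own.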



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1233,10 +1233,15 @@ private DataSegmentTimelineView makeDataSegmentTimelineView()
       // any segment created after the lock was acquired for its interval will not be considered.
       final Collection<DataSegment> publishedUsedSegments;
       try {
-        publishedUsedSegments = context.taskActionClient().submit(new RetrieveSegmentsToReplaceAction(
-            dataSource,
-            intervals
-        ));
+        // Additional check as the task action does not accept empty intervals
+        if (intervals.isEmpty()) {
+          publishedUsedSegments = Collections.emptySet();
+        } else {
+          publishedUsedSegments = context.taskActionClient().submit(new RetrieveUsedSegmentsAction(
+              dataSource,
+              intervals
+          ));

Review Comment:
   style:
   ```suggestion
             publishedUsedSegments = context.taskActionClient().submit(
                 new RetrieveUsedSegmentsAction(dataSource, intervals)
             );
   ```



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java:
##########
@@ -128,14 +126,6 @@ public <RetType> RetType submit(TaskAction<RetType> taskAction)
                                                                  .build()
                                      ).collect(Collectors.toSet());
       }
-    } else if (taskAction instanceof RetrieveSegmentsToReplaceAction) {

Review Comment:
   Okay, please make sure it is handled correctly in the new code as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

