This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a594bb9f6a Use task actions to fetch used segments in MSQ (#15284)
4a594bb9f6a is described below

commit 4a594bb9f6ab89eabda5db654a9d215633340c66
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Fri Dec 1 15:29:33 2023 +0530

    Use task actions to fetch used segments in MSQ (#15284)
    
    * Use task actions to fetch used segments in MSQ
    
    * Fix tests
    
    * Fixing tests.
    
    * Revert "Fix tests"
    
    This reverts commit 95ab6494
    
    * Removing conditional check in tests.
    
    * Pulling in latest changes.
    
    ---------
    
    Co-authored-by: cryptoe <[email protected]>
---
 .../java/org/apache/druid/msq/exec/ControllerImpl.java   | 15 +++++++++++++--
 .../apache/druid/msq/test/CalciteArraysQueryMSQTest.java |  2 +-
 .../druid/msq/test/CalciteSelectJoinQueryMSQTest.java    |  2 +-
 .../apache/druid/msq/test/CalciteSelectQueryMSQTest.java |  2 +-
 .../apache/druid/msq/test/CalciteUnionQueryMSQTest.java  |  2 +-
 .../test/java/org/apache/druid/msq/test/MSQTestBase.java |  2 +-
 .../apache/druid/msq/test/MSQTestControllerContext.java  | 15 ---------------
 .../apache/druid/msq/test/MSQTestTaskActionClient.java   | 16 +++++++++++++++-
 8 files changed, 33 insertions(+), 23 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 1da68a423fa..ae9c1122498 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -69,6 +69,7 @@ import org.apache.druid.indexing.common.TaskReport;
 import org.apache.druid.indexing.common.actions.LockListAction;
 import org.apache.druid.indexing.common.actions.LockReleaseAction;
 import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
+import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
 import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
 import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
@@ -1200,8 +1201,18 @@ public class ControllerImpl implements Controller
       }
 
       // Fetch all published, used segments (all non-realtime segments) from the metadata store.
-      final Collection<DataSegment> publishedUsedSegments =
-          FutureUtils.getUnchecked(context.coordinatorClient().fetchUsedSegments(dataSource, intervals), true);
+      // If the task is operating with a REPLACE lock,
+      // any segment created after the lock was acquired for its interval will not be considered.
+      final Collection<DataSegment> publishedUsedSegments;
+      try {
+        publishedUsedSegments = context.taskActionClient().submit(new RetrieveSegmentsToReplaceAction(
+            dataSource,
+            intervals
+        ));
+      }
+      catch (IOException e) {
+        throw new MSQException(e, UnknownFault.forException(e));
+      }
 
       int realtimeCount = 0;
 
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysQueryMSQTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysQueryMSQTest.java
index abefe6a378d..92f8d6f4e79 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysQueryMSQTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysQueryMSQTest.java
@@ -80,7 +80,7 @@ public class CalciteArraysQueryMSQTest extends CalciteArraysQueryTest
     final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
         queryJsonMapper,
         injector,
-        new MSQTestTaskActionClient(queryJsonMapper),
+        new MSQTestTaskActionClient(queryJsonMapper, injector),
         workerMemoryParameters,
         ImmutableList.of()
     );
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java
index 114583d31a1..644d0d05451 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectJoinQueryMSQTest.java
@@ -128,7 +128,7 @@ public class CalciteSelectJoinQueryMSQTest
       final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
           queryJsonMapper,
           injector,
-          new MSQTestTaskActionClient(queryJsonMapper),
+          new MSQTestTaskActionClient(queryJsonMapper, injector),
           workerMemoryParameters,
           ImmutableList.of()
       );
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java
index 974eed48734..504999ae45b 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteSelectQueryMSQTest.java
@@ -84,7 +84,7 @@ public class CalciteSelectQueryMSQTest extends CalciteQueryTest
     final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
         queryJsonMapper,
         injector,
-        new MSQTestTaskActionClient(queryJsonMapper),
+        new MSQTestTaskActionClient(queryJsonMapper, injector),
         workerMemoryParameters,
         ImmutableList.of()
     );
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteUnionQueryMSQTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteUnionQueryMSQTest.java
index 6ec17687c45..01379e2a93f 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteUnionQueryMSQTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteUnionQueryMSQTest.java
@@ -94,7 +94,7 @@ public class CalciteUnionQueryMSQTest extends CalciteUnionQueryTest
     final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
         queryJsonMapper,
         injector,
-        new MSQTestTaskActionClient(queryJsonMapper),
+        new MSQTestTaskActionClient(queryJsonMapper, injector),
         workerMemoryParameters,
         ImmutableList.of()
     );
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index ba78e213ca4..36301a5fe0f 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -503,7 +503,7 @@ public class MSQTestBase extends BaseCalciteQueryTest
 
     doReturn(mock(Request.class)).when(brokerClient).makeRequest(any(), anyString());
 
-    testTaskActionClient = Mockito.spy(new MSQTestTaskActionClient(objectMapper));
+    testTaskActionClient = Mockito.spy(new MSQTestTaskActionClient(objectMapper, injector));
     indexingServiceClient = new MSQTestOverlordServiceClient(
         objectMapper,
         injector,
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 85592fd0c53..5ab8932de3e 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -50,7 +50,6 @@ import org.apache.druid.msq.indexing.MSQWorkerTask;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.server.DruidNode;
-import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
@@ -104,20 +103,6 @@ public class MSQTestControllerContext implements ControllerContext
     this.injector = injector;
     this.taskActionClient = taskActionClient;
     coordinatorClient = Mockito.mock(CoordinatorClient.class);
-    Mockito.when(coordinatorClient.fetchUsedSegments(
-                     ArgumentMatchers.anyString(),
-                     ArgumentMatchers.anyList()
-                 )
-    ).thenAnswer(invocation ->
-                     Futures.immediateFuture(
-                         injector.getInstance(SpecificSegmentsQuerySegmentWalker.class)
-                                 .getSegments()
-                                 .stream()
-                                 .filter(dataSegment -> dataSegment.getDataSource()
-                                                                   .equals(invocation.getArguments()[0]))
-                                 .collect(Collectors.toList())
-                     )
-    );
 
     Mockito.when(coordinatorClient.fetchServerViewSegments(
                     ArgumentMatchers.anyString(),
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
index 31b3272b74f..5192aafccdc 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java
@@ -23,9 +23,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.inject.Injector;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TimeChunkLock;
 import org.apache.druid.indexing.common.actions.LockListAction;
+import org.apache.druid.indexing.common.actions.RetrieveSegmentsToReplaceAction;
 import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
 import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
@@ -39,6 +41,7 @@ import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.granularity.PeriodGranularity;
 import org.apache.druid.msq.indexing.error.InsertLockPreemptedFaultTest;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.joda.time.Interval;
@@ -62,12 +65,15 @@ public class MSQTestTaskActionClient implements TaskActionClient
       "foo2", ImmutableList.of(Intervals.of("2000-01-01/P1D"))
   );
   private final Set<DataSegment> publishedSegments = new HashSet<>();
+  private final Injector injector;
 
   public MSQTestTaskActionClient(
-      ObjectMapper mapper
+      ObjectMapper mapper,
+      Injector injector
   )
   {
     this.mapper = mapper;
+    this.injector = injector;
   }
 
   @Override
@@ -122,6 +128,14 @@ public class MSQTestTaskActionClient implements TaskActionClient
                                                                  .build()
                                      ).collect(Collectors.toSet());
       }
+    } else if (taskAction instanceof RetrieveSegmentsToReplaceAction) {
+      String dataSource = ((RetrieveSegmentsToReplaceAction) taskAction).getDataSource();
+      return (RetType) injector.getInstance(SpecificSegmentsQuerySegmentWalker.class)
+                               .getSegments()
+                               .stream()
+                               .filter(dataSegment -> dataSegment.getDataSource()
+                                                                 .equals(dataSource))
+                               .collect(Collectors.toSet());
     } else if (taskAction instanceof SegmentTransactionalInsertAction) {
       final Set<DataSegment> segments = ((SegmentTransactionalInsertAction) taskAction).getSegments();
       publishedSegments.addAll(segments);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to