This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new b72de1af05d MSQ: Dependency injection into StageProcessors. (#19080)
b72de1af05d is described below

commit b72de1af05d3a1e371151fcbc89f68ca5ea97af1
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Mar 4 08:53:06 2026 -0800

    MSQ: Dependency injection into StageProcessors. (#19080)
    
    This patch updates tests to round-trip work orders using an
    ObjectMapper, which ensures that any JacksonInject fields present on
    StageProcessor are populated. In production, this happens naturally
    because work orders are generally sent over HTTP.
    
    To demonstrate that the approach works, GroupingEngine is removed
    from FrameContext. Instead it is injected directly into the relevant
    StageProcessors.
---
 .../druid/msq/dart/worker/DartFrameContext.java       |  6 ------
 .../java/org/apache/druid/msq/exec/FrameContext.java  |  3 ---
 .../druid/msq/indexing/IndexerFrameContext.java       |  7 -------
 .../groupby/GroupByPostShuffleStageProcessor.java     |  9 +++++++--
 .../groupby/GroupByPreShuffleStageProcessor.java      |  9 ++++++++-
 .../druid/msq/exec/MSQCompactionTaskRunTest.java      |  2 ++
 .../apache/druid/msq/sql/MSQTaskQueryMakerTest.java   |  1 +
 .../druid/msq/test/MSQTestControllerContext.java      |  2 +-
 .../apache/druid/msq/test/MSQTestWorkerClient.java    | 19 +++++++++++++++++--
 .../apache/druid/msq/test/MSQTestWorkerContext.java   |  9 +--------
 .../test/TestDartControllerContextFactoryImpl.java    |  2 +-
 11 files changed, 38 insertions(+), 31 deletions(-)

diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java
index 3ac9d6bb402..d17a94754ab 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java
@@ -100,12 +100,6 @@ public class DartFrameContext implements FrameContext
     return segmentWrangler;
   }
 
-  @Override
-  public GroupingEngine groupingEngine()
-  {
-    return groupingEngine;
-  }
-
   @Override
   public RowIngestionMeters rowIngestionMeters()
   {
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/FrameContext.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/FrameContext.java
index 4826872b35d..f093cb0b8aa 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/FrameContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/FrameContext.java
@@ -22,7 +22,6 @@ package org.apache.druid.msq.exec;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.msq.kernel.WorkOrder;
-import org.apache.druid.query.groupby.GroupingEngine;
 import org.apache.druid.query.policy.PolicyEnforcer;
 import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
 import org.apache.druid.query.rowsandcols.serde.WireTransferableContext;
@@ -49,8 +48,6 @@ public interface FrameContext extends Closeable
 
   SegmentWrangler segmentWrangler();
 
-  GroupingEngine groupingEngine();
-
   RowIngestionMeters rowIngestionMeters();
 
   /**
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
index 4af15e79aee..cfae065176a 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
@@ -30,7 +30,6 @@ import org.apache.druid.msq.exec.ProcessingBuffers;
 import org.apache.druid.msq.exec.WorkerMemoryParameters;
 import org.apache.druid.msq.exec.WorkerStorageParameters;
 import org.apache.druid.msq.kernel.StageId;
-import org.apache.druid.query.groupby.GroupingEngine;
 import org.apache.druid.query.policy.PolicyEnforcer;
 import org.apache.druid.query.rowsandcols.serde.WireTransferableContext;
 import org.apache.druid.segment.IndexIO;
@@ -94,12 +93,6 @@ public class IndexerFrameContext implements FrameContext
     return context.injector().getInstance(SegmentWrangler.class);
   }
 
-  @Override
-  public GroupingEngine groupingEngine()
-  {
-    return context.injector().getInstance(GroupingEngine.class);
-  }
-
   @Override
   public RowIngestionMeters rowIngestionMeters()
   {
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleStageProcessor.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleStageProcessor.java
index 06e320a9165..b41c1efb49c 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleStageProcessor.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleStageProcessor.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.msq.querykit.groupby;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -44,6 +45,7 @@ import org.apache.druid.msq.querykit.ReadableInput;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.GroupingEngine;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 
@@ -52,6 +54,10 @@ public class GroupByPostShuffleStageProcessor extends 
BasicStageProcessor
 {
   private final GroupByQuery query;
 
+  @JacksonInject
+  @Nullable
+  private GroupingEngine groupingEngine;
+
   @JsonCreator
   public GroupByPostShuffleStageProcessor(
       @JsonProperty("query") GroupByQuery query
@@ -74,7 +80,6 @@ public class GroupByPostShuffleStageProcessor extends 
BasicStageProcessor
     // Expecting a single input slice from some prior stage.
     final List<InputSlice> inputSlices = context.workOrder().getInputs();
     final StageInputSlice slice = (StageInputSlice) 
Iterables.getOnlyElement(inputSlices);
-    final GroupingEngine engine = context.frameContext().groupingEngine();
     final Int2ObjectSortedMap<OutputChannel> outputChannels = new 
Int2ObjectAVLTreeMap<>();
 
     for (final ReadablePartition partition : slice.getPartitions()) {
@@ -99,7 +104,7 @@ public class GroupByPostShuffleStageProcessor extends 
BasicStageProcessor
 
           return new GroupByPostShuffleFrameProcessor(
               query,
-              engine,
+              groupingEngine,
               readableInput.getChannel(),
               outputChannel.getWritableChannel(),
               
context.workOrder().getStageDefinition().createFrameWriterFactory(
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleStageProcessor.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleStageProcessor.java
index 09953af1d47..6865076c90b 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleStageProcessor.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleStageProcessor.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.msq.querykit.groupby;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -33,9 +34,11 @@ import org.apache.druid.msq.input.PhysicalInputSlice;
 import org.apache.druid.msq.querykit.BaseLeafStageProcessor;
 import org.apache.druid.msq.querykit.ReadableInput;
 import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupingEngine;
 import org.apache.druid.segment.SegmentMapFunction;
 import org.joda.time.Interval;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -44,6 +47,10 @@ public class GroupByPreShuffleStageProcessor extends 
BaseLeafStageProcessor
 {
   private final GroupByQuery query;
 
+  @JacksonInject
+  @Nullable
+  private GroupingEngine groupingEngine;
+
   @JsonCreator
   public GroupByPreShuffleStageProcessor(@JsonProperty("query") GroupByQuery 
query)
   {
@@ -68,7 +75,7 @@ public class GroupByPreShuffleStageProcessor extends 
BaseLeafStageProcessor
   {
     return new GroupByPreShuffleFrameProcessor(
         query,
-        frameContext.groupingEngine(),
+        groupingEngine,
         frameContext.processingBuffers().getBufferPool(),
         baseInput,
         segmentMapFn,
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
index 0193dd46331..51235417799 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.msq.exec;
 
+import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
@@ -217,6 +218,7 @@ public class MSQCompactionTaskRunTest extends 
CompactionTaskRunBase
         new GroupByQueryConfig(),
         TestGroupByBuffers.createDefault()
     ).getGroupingEngine();
+    ((InjectableValues.Std) 
objectMapper.getInjectableValues()).addValue(GroupingEngine.class, 
groupingEngine);
 
     Module modules = Modules.combine(
         new DruidGuiceExtensions(),
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
index 8b28e77508a..c604590851e 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
@@ -222,6 +222,7 @@ public class MSQTaskQueryMakerTest
     );
     Injector injector = Guice.createInjector(defaultModule, 
BoundFieldModule.of(this));
     DruidSecondaryModule.setupJackson(injector, objectMapper, 
Collections.emptyMap(), true);
+    new 
MSQIndexingModule().getJacksonModules().forEach(objectMapper::registerModule);
 
     // Populate loadedSegmentMetadata from walker segments so 
CoordinatorClient.fetchSegment() can find them
     List<ImmutableSegmentLoadInfo> loadedSegmentMetadata = new ArrayList<>();
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 8aec86483c7..758fc644adb 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -467,7 +467,7 @@ public class MSQTestControllerContext implements 
ControllerContext, DartControll
   @Override
   public WorkerClient newWorkerClient()
   {
-    return new MSQTestWorkerClient(inMemoryWorkers);
+    return new MSQTestWorkerClient(inMemoryWorkers, mapper);
   }
 
   @Override
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java
index 10b7db40c60..35ce6475b88 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.msq.test;
 
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.common.guava.FutureUtils;
@@ -27,6 +29,7 @@ import org.apache.druid.frame.key.ClusterByPartitions;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.msq.counters.CounterSnapshotsTree;
+import org.apache.druid.msq.exec.StageProcessor;
 import org.apache.druid.msq.exec.Worker;
 import org.apache.druid.msq.exec.WorkerClient;
 import org.apache.druid.msq.exec.WorkerRunRef;
@@ -45,17 +48,19 @@ public class MSQTestWorkerClient implements WorkerClient
   private static final long WORKER_WAIT_TIMEOUT_MS = 
TimeUnit.SECONDS.toMillis(10);
 
   protected final Map<String, WorkerRunRef> inMemoryWorkers;
+  private final ObjectMapper objectMapper;
   private final AtomicBoolean closed = new AtomicBoolean();
 
-  public MSQTestWorkerClient(Map<String, WorkerRunRef> inMemoryWorkers)
+  public MSQTestWorkerClient(Map<String, WorkerRunRef> inMemoryWorkers, 
ObjectMapper objectMapper)
   {
     this.inMemoryWorkers = inMemoryWorkers;
+    this.objectMapper = objectMapper;
   }
 
   @Override
   public ListenableFuture<Void> postWorkOrder(String workerTaskId, WorkOrder 
workOrder)
   {
-    getWorkerFor(workerTaskId).postWorkOrder(workOrder);
+    
getWorkerFor(workerTaskId).postWorkOrder(roundTripSerdeWorkOrder(workOrder));
     return Futures.immediateFuture(null);
   }
 
@@ -190,4 +195,14 @@ public class MSQTestWorkerClient implements WorkerClient
       inMemoryWorkers.forEach((k, v) -> v.cancel());
     }
   }
+
+  /**
+   * Using {@link #objectMapper}, convert work order to work order. This 
ensures that any {@link JacksonInject} fields
+   * present on {@link StageProcessor} are populated. In production, this 
happens naturally because work orders are
+   * generally sent over HTTP.
+   */
+  private WorkOrder roundTripSerdeWorkOrder(final WorkOrder workOrder)
+  {
+    return objectMapper.convertValue(workOrder, WorkOrder.class);
+  }
 }
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
index f1d9d0f72bc..c57df8bcfe5 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
@@ -43,7 +43,6 @@ import org.apache.druid.msq.exec.WorkerRunRef;
 import org.apache.druid.msq.exec.WorkerStorageParameters;
 import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.util.MultiStageQueryContext;
-import org.apache.druid.query.groupby.GroupingEngine;
 import org.apache.druid.query.policy.PolicyEnforcer;
 import org.apache.druid.query.rowsandcols.serde.WireTransferableContext;
 import org.apache.druid.segment.IndexIO;
@@ -158,7 +157,7 @@ public class MSQTestWorkerContext implements WorkerContext
   @Override
   public WorkerClient makeWorkerClient()
   {
-    return new MSQTestWorkerClient(inMemoryWorkers);
+    return new MSQTestWorkerClient(inMemoryWorkers, mapper);
   }
 
   @Override
@@ -231,12 +230,6 @@ public class MSQTestWorkerContext implements WorkerContext
       return injector.getInstance(SegmentWrangler.class);
     }
 
-    @Override
-    public GroupingEngine groupingEngine()
-    {
-      return injector.getInstance(GroupingEngine.class);
-    }
-
     @Override
     public RowIngestionMeters rowIngestionMeters()
     {
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
index 94fd7ee2d50..c965c297685 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
@@ -116,7 +116,7 @@ public class TestDartControllerContextFactoryImpl extends 
DartControllerContextF
 
     public DartTestWorkerClient()
     {
-      super(workerMap);
+      super(workerMap, jsonMapper);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to