This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new b72de1af05d MSQ: Dependency injection into StageProcessors. (#19080)
b72de1af05d is described below
commit b72de1af05d3a1e371151fcbc89f68ca5ea97af1
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Mar 4 08:53:06 2026 -0800
MSQ: Dependency injection into StageProcessors. (#19080)
This patch updates tests to round-trip work orders using an
ObjectMapper, which ensures that any JacksonInject fields present on
StageProcessor are populated. In production, this happens naturally
because work orders are generally sent over HTTP.
To demonstrate that the approach works, GroupingEngine is removed
from FrameContext. Instead, it is injected directly into the relevant
StageProcessors.
---
.../druid/msq/dart/worker/DartFrameContext.java | 6 ------
.../java/org/apache/druid/msq/exec/FrameContext.java | 3 ---
.../druid/msq/indexing/IndexerFrameContext.java | 7 -------
.../groupby/GroupByPostShuffleStageProcessor.java | 9 +++++++--
.../groupby/GroupByPreShuffleStageProcessor.java | 9 ++++++++-
.../druid/msq/exec/MSQCompactionTaskRunTest.java | 2 ++
.../apache/druid/msq/sql/MSQTaskQueryMakerTest.java | 1 +
.../druid/msq/test/MSQTestControllerContext.java | 2 +-
.../apache/druid/msq/test/MSQTestWorkerClient.java | 19 +++++++++++++++++--
.../apache/druid/msq/test/MSQTestWorkerContext.java | 9 +--------
.../test/TestDartControllerContextFactoryImpl.java | 2 +-
11 files changed, 38 insertions(+), 31 deletions(-)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java
index 3ac9d6bb402..d17a94754ab 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java
@@ -100,12 +100,6 @@ public class DartFrameContext implements FrameContext
return segmentWrangler;
}
- @Override
- public GroupingEngine groupingEngine()
- {
- return groupingEngine;
- }
-
@Override
public RowIngestionMeters rowIngestionMeters()
{
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/FrameContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/FrameContext.java
index 4826872b35d..f093cb0b8aa 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/FrameContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/FrameContext.java
@@ -22,7 +22,6 @@ package org.apache.druid.msq.exec;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.msq.kernel.WorkOrder;
-import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.apache.druid.query.rowsandcols.serde.WireTransferableContext;
@@ -49,8 +48,6 @@ public interface FrameContext extends Closeable
SegmentWrangler segmentWrangler();
- GroupingEngine groupingEngine();
-
RowIngestionMeters rowIngestionMeters();
/**
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
index 4af15e79aee..cfae065176a 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
@@ -30,7 +30,6 @@ import org.apache.druid.msq.exec.ProcessingBuffers;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.exec.WorkerStorageParameters;
import org.apache.druid.msq.kernel.StageId;
-import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.query.rowsandcols.serde.WireTransferableContext;
import org.apache.druid.segment.IndexIO;
@@ -94,12 +93,6 @@ public class IndexerFrameContext implements FrameContext
return context.injector().getInstance(SegmentWrangler.class);
}
- @Override
- public GroupingEngine groupingEngine()
- {
- return context.injector().getInstance(GroupingEngine.class);
- }
-
@Override
public RowIngestionMeters rowIngestionMeters()
{
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleStageProcessor.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleStageProcessor.java
index 06e320a9165..b41c1efb49c 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleStageProcessor.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPostShuffleStageProcessor.java
@@ -19,6 +19,7 @@
package org.apache.druid.msq.querykit.groupby;
+import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -44,6 +45,7 @@ import org.apache.druid.msq.querykit.ReadableInput;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupingEngine;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
@@ -52,6 +54,10 @@ public class GroupByPostShuffleStageProcessor extends
BasicStageProcessor
{
private final GroupByQuery query;
+ @JacksonInject
+ @Nullable
+ private GroupingEngine groupingEngine;
+
@JsonCreator
public GroupByPostShuffleStageProcessor(
@JsonProperty("query") GroupByQuery query
@@ -74,7 +80,6 @@ public class GroupByPostShuffleStageProcessor extends
BasicStageProcessor
// Expecting a single input slice from some prior stage.
final List<InputSlice> inputSlices = context.workOrder().getInputs();
final StageInputSlice slice = (StageInputSlice)
Iterables.getOnlyElement(inputSlices);
- final GroupingEngine engine = context.frameContext().groupingEngine();
final Int2ObjectSortedMap<OutputChannel> outputChannels = new
Int2ObjectAVLTreeMap<>();
for (final ReadablePartition partition : slice.getPartitions()) {
@@ -99,7 +104,7 @@ public class GroupByPostShuffleStageProcessor extends
BasicStageProcessor
return new GroupByPostShuffleFrameProcessor(
query,
- engine,
+ groupingEngine,
readableInput.getChannel(),
outputChannel.getWritableChannel(),
context.workOrder().getStageDefinition().createFrameWriterFactory(
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleStageProcessor.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleStageProcessor.java
index 09953af1d47..6865076c90b 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleStageProcessor.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleStageProcessor.java
@@ -19,6 +19,7 @@
package org.apache.druid.msq.querykit.groupby;
+import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
@@ -33,9 +34,11 @@ import org.apache.druid.msq.input.PhysicalInputSlice;
import org.apache.druid.msq.querykit.BaseLeafStageProcessor;
import org.apache.druid.msq.querykit.ReadableInput;
import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.segment.SegmentMapFunction;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
@@ -44,6 +47,10 @@ public class GroupByPreShuffleStageProcessor extends
BaseLeafStageProcessor
{
private final GroupByQuery query;
+ @JacksonInject
+ @Nullable
+ private GroupingEngine groupingEngine;
+
@JsonCreator
public GroupByPreShuffleStageProcessor(@JsonProperty("query") GroupByQuery
query)
{
@@ -68,7 +75,7 @@ public class GroupByPreShuffleStageProcessor extends
BaseLeafStageProcessor
{
return new GroupByPreShuffleFrameProcessor(
query,
- frameContext.groupingEngine(),
+ groupingEngine,
frameContext.processingBuffers().getBufferPool(),
baseInput,
segmentMapFn,
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
index 0193dd46331..51235417799 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.msq.exec;
+import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
@@ -217,6 +218,7 @@ public class MSQCompactionTaskRunTest extends
CompactionTaskRunBase
new GroupByQueryConfig(),
TestGroupByBuffers.createDefault()
).getGroupingEngine();
+ ((InjectableValues.Std)
objectMapper.getInjectableValues()).addValue(GroupingEngine.class,
groupingEngine);
Module modules = Modules.combine(
new DruidGuiceExtensions(),
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
index 8b28e77508a..c604590851e 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
@@ -222,6 +222,7 @@ public class MSQTaskQueryMakerTest
);
Injector injector = Guice.createInjector(defaultModule,
BoundFieldModule.of(this));
DruidSecondaryModule.setupJackson(injector, objectMapper,
Collections.emptyMap(), true);
+ new
MSQIndexingModule().getJacksonModules().forEach(objectMapper::registerModule);
// Populate loadedSegmentMetadata from walker segments so
CoordinatorClient.fetchSegment() can find them
List<ImmutableSegmentLoadInfo> loadedSegmentMetadata = new ArrayList<>();
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 8aec86483c7..758fc644adb 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -467,7 +467,7 @@ public class MSQTestControllerContext implements
ControllerContext, DartControll
@Override
public WorkerClient newWorkerClient()
{
- return new MSQTestWorkerClient(inMemoryWorkers);
+ return new MSQTestWorkerClient(inMemoryWorkers, mapper);
}
@Override
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java
index 10b7db40c60..35ce6475b88 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerClient.java
@@ -19,6 +19,8 @@
package org.apache.druid.msq.test;
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.common.guava.FutureUtils;
@@ -27,6 +29,7 @@ import org.apache.druid.frame.key.ClusterByPartitions;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
+import org.apache.druid.msq.exec.StageProcessor;
import org.apache.druid.msq.exec.Worker;
import org.apache.druid.msq.exec.WorkerClient;
import org.apache.druid.msq.exec.WorkerRunRef;
@@ -45,17 +48,19 @@ public class MSQTestWorkerClient implements WorkerClient
private static final long WORKER_WAIT_TIMEOUT_MS =
TimeUnit.SECONDS.toMillis(10);
protected final Map<String, WorkerRunRef> inMemoryWorkers;
+ private final ObjectMapper objectMapper;
private final AtomicBoolean closed = new AtomicBoolean();
- public MSQTestWorkerClient(Map<String, WorkerRunRef> inMemoryWorkers)
+ public MSQTestWorkerClient(Map<String, WorkerRunRef> inMemoryWorkers,
ObjectMapper objectMapper)
{
this.inMemoryWorkers = inMemoryWorkers;
+ this.objectMapper = objectMapper;
}
@Override
public ListenableFuture<Void> postWorkOrder(String workerTaskId, WorkOrder
workOrder)
{
- getWorkerFor(workerTaskId).postWorkOrder(workOrder);
+
getWorkerFor(workerTaskId).postWorkOrder(roundTripSerdeWorkOrder(workOrder));
return Futures.immediateFuture(null);
}
@@ -190,4 +195,14 @@ public class MSQTestWorkerClient implements WorkerClient
inMemoryWorkers.forEach((k, v) -> v.cancel());
}
}
+
+ /**
+ * Round-trips the work order through {@link #objectMapper} (serialize, then deserialize). This
ensures that any {@link JacksonInject} fields
+ * present on {@link StageProcessor} are populated. In production, this
happens naturally because work orders are
+ * generally sent over HTTP.
+ */
+ private WorkOrder roundTripSerdeWorkOrder(final WorkOrder workOrder)
+ {
+ return objectMapper.convertValue(workOrder, WorkOrder.class);
+ }
}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
index f1d9d0f72bc..c57df8bcfe5 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
@@ -43,7 +43,6 @@ import org.apache.druid.msq.exec.WorkerRunRef;
import org.apache.druid.msq.exec.WorkerStorageParameters;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.util.MultiStageQueryContext;
-import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.query.rowsandcols.serde.WireTransferableContext;
import org.apache.druid.segment.IndexIO;
@@ -158,7 +157,7 @@ public class MSQTestWorkerContext implements WorkerContext
@Override
public WorkerClient makeWorkerClient()
{
- return new MSQTestWorkerClient(inMemoryWorkers);
+ return new MSQTestWorkerClient(inMemoryWorkers, mapper);
}
@Override
@@ -231,12 +230,6 @@ public class MSQTestWorkerContext implements WorkerContext
return injector.getInstance(SegmentWrangler.class);
}
- @Override
- public GroupingEngine groupingEngine()
- {
- return injector.getInstance(GroupingEngine.class);
- }
-
@Override
public RowIngestionMeters rowIngestionMeters()
{
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
index 94fd7ee2d50..c965c297685 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
@@ -116,7 +116,7 @@ public class TestDartControllerContextFactoryImpl extends
DartControllerContextF
public DartTestWorkerClient()
{
- super(workerMap);
+ super(workerMap, jsonMapper);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]