This is an automated email from the ASF dual-hosted git repository.
gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 520978e94e2 feat: Extension point for MSQ InputSpecs. (#19479)
520978e94e2 is described below
commit 520978e94e24ce6cd8bde2a4c158c68a61e9cd5d
Author: Gian Merlino <[email protected]>
AuthorDate: Thu May 21 12:29:41 2026 -0700
feat: Extension point for MSQ InputSpecs. (#19479)
This patch adds extension points for InputSpecSlicer and InputSliceReader,
and uses them to implement TableInputSpec. This eliminates and generalizes
the "newTableInputSpecSlicer" method on the ControllerContext, which was
previously needed because the slicing logic differs for tasks and Dart.
---
.../msq/dart/controller/DartControllerContext.java | 21 +++++--
.../DartControllerContextFactoryImpl.java | 9 +++
.../DartTableInputSpecSlicerProvider.java | 55 ++++++++++++++++
.../druid/msq/dart/guice/DartControllerModule.java | 6 ++
.../druid/msq/dart/guice/DartWorkerModule.java | 7 +++
.../DartSegmentsInputSliceReaderProvider.java | 46 ++++++++++++++
.../druid/msq/dart/worker/DartWorkerContext.java | 17 +++--
.../dart/worker/DartWorkerContextFactoryImpl.java | 12 +++-
.../apache/druid/msq/exec/ControllerContext.java | 11 ++--
.../org/apache/druid/msq/exec/ControllerImpl.java | 43 +++++++++----
.../org/apache/druid/msq/exec/RunWorkOrder.java | 28 +++++----
.../org/apache/druid/msq/exec/WorkerContext.java | 12 ++++
.../org/apache/druid/msq/guice/MSQBinders.java | 34 ++++++++++
.../apache/druid/msq/guice/MSQIndexingModule.java | 13 ++++
.../msq/indexing/IndexerControllerContext.java | 20 +++---
.../IndexerSegmentsInputSliceReaderProvider.java | 47 ++++++++++++++
.../IndexerTableInputSpecSlicerProvider.java | 73 ++++++++++++++++++++++
.../druid/msq/indexing/IndexerWorkerContext.java | 21 ++++++-
.../druid/msq/input/InputSliceReaderProvider.java | 40 ++++++++++++
.../druid/msq/input/InputSpecSlicerProvider.java | 46 ++++++++++++++
.../dart/controller/DartControllerContextTest.java | 13 +++-
.../druid/msq/exec/MSQCompactionTaskRunTest.java | 2 +
.../apache/druid/msq/exec/TestMSQSqlModule.java | 9 +++
.../druid/msq/sql/MSQTaskQueryMakerTest.java | 7 ++-
.../msq/test/AbstractDartComponentSupplier.java | 3 -
.../org/apache/druid/msq/test/MSQTestBase.java | 5 +-
.../druid/msq/test/MSQTestControllerContext.java | 14 ++---
.../test/TestDartControllerContextFactoryImpl.java | 16 ++++-
28 files changed, 560 insertions(+), 70 deletions(-)
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
index 52936b8e0f8..ec748a74b7d 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
@@ -40,7 +40,7 @@ import org.apache.druid.msq.exec.WorkerManager;
import org.apache.druid.msq.indexing.IndexerControllerContext;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
-import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
@@ -88,8 +88,9 @@ public class DartControllerContext implements
ControllerContext
private final DartWorkerClient workerClient;
private final TimelineServerView serverView;
private final MemoryIntrospector memoryIntrospector;
- private final QueryContext context;
+ private final List<InputSpecSlicerProvider> inputSpecSlicerProviders;
private final ServiceEmitter emitter;
+ private final QueryContext context;
public DartControllerContext(
final Injector injector,
@@ -98,6 +99,7 @@ public class DartControllerContext implements
ControllerContext
final DartWorkerClient workerClient,
final MemoryIntrospector memoryIntrospector,
final TimelineServerView serverView,
+ final List<InputSpecSlicerProvider> inputSpecSlicerProviders,
final ServiceEmitter emitter,
final QueryContext context
)
@@ -108,8 +110,9 @@ public class DartControllerContext implements
ControllerContext
this.workerClient = workerClient;
this.serverView = serverView;
this.memoryIntrospector = memoryIntrospector;
- this.context = context;
+ this.inputSpecSlicerProviders = inputSpecSlicerProviders;
this.emitter = emitter;
+ this.context = context;
}
@Override
@@ -190,9 +193,9 @@ public class DartControllerContext implements
ControllerContext
}
@Override
- public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager)
+ public List<InputSpecSlicerProvider> inputSpecSlicerProviders()
{
- return DartTableInputSpecSlicer.createFromWorkerIds(workerManager.getWorkerIds(), serverView, context);
+ return inputSpecSlicerProviders;
}
@Override
@@ -260,4 +263,12 @@ public class DartControllerContext implements
ControllerContext
{
return context.isDebug();
}
+
+ /**
+ * Getter for {@link DartTableInputSpecSlicerProvider} to retrieve the server view.
+ */
+ TimelineServerView serverView()
+ {
+ return serverView;
+ }
}
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java
index 87582d977aa..260d6156c8c 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java
@@ -28,14 +28,19 @@ import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.msq.dart.Dart;
import org.apache.druid.msq.dart.worker.DartWorkerClientImpl;
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.MemoryIntrospector;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.server.DruidNode;
+import java.util.List;
+import java.util.Set;
+
public class DartControllerContextFactoryImpl implements
DartControllerContextFactory
{
protected final Injector injector;
@@ -45,6 +50,7 @@ public class DartControllerContextFactoryImpl implements
DartControllerContextFa
protected final ServiceClientFactory serviceClientFactory;
protected final TimelineServerView serverView;
protected final MemoryIntrospector memoryIntrospector;
+ protected final List<InputSpecSlicerProvider> inputSpecSlicerProviders;
protected final ServiceEmitter emitter;
@Inject
@@ -56,6 +62,7 @@ public class DartControllerContextFactoryImpl implements
DartControllerContextFa
@EscalatedGlobal final ServiceClientFactory serviceClientFactory,
final MemoryIntrospector memoryIntrospector,
final TimelineServerView serverView,
+ @Dart final Set<InputSpecSlicerProvider> inputSpecSlicerProviders,
final ServiceEmitter emitter
)
{
@@ -66,6 +73,7 @@ public class DartControllerContextFactoryImpl implements
DartControllerContextFa
this.serviceClientFactory = serviceClientFactory;
this.serverView = serverView;
this.memoryIntrospector = memoryIntrospector;
+ this.inputSpecSlicerProviders = List.copyOf(inputSpecSlicerProviders);
this.emitter = emitter;
}
@@ -80,6 +88,7 @@ public class DartControllerContextFactoryImpl implements
DartControllerContextFa
new DartWorkerClientImpl(queryId, serviceClientFactory, smileMapper,
selfNode.getHostAndPortToUse()),
memoryIntrospector,
serverView,
+ inputSpecSlicerProviders,
emitter,
context
);
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerProvider.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerProvider.java
new file mode 100644
index 00000000000..13dcb49e519
--- /dev/null
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerProvider.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.controller;
+
+import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
+import org.apache.druid.msq.input.table.TableInputSpec;
+import org.apache.druid.query.QueryContext;
+
+import java.util.List;
+
+/**
+ * Controller-side provider for {@link TableInputSpec} in Dart.
+ */
+public class DartTableInputSpecSlicerProvider implements InputSpecSlicerProvider
+{
+ @Override
+ public Class<? extends InputSpec> specClass()
+ {
+ return TableInputSpec.class;
+ }
+
+ @Override
+ public InputSpecSlicer createSlicer(
+ ControllerContext controllerContext,
+ QueryContext queryContext,
+ List<String> workerIds
+ )
+ {
+ return DartTableInputSpecSlicer.createFromWorkerIds(
+ workerIds,
+ ((DartControllerContext) controllerContext).serverView(),
+ queryContext
+ );
+ }
+}
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
index 8a9b1cf3607..0937888b0a9 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
@@ -47,11 +47,13 @@ import
org.apache.druid.msq.dart.controller.DartControllerContextFactoryImpl;
import org.apache.druid.msq.dart.controller.DartControllerRegistry;
import org.apache.druid.msq.dart.controller.DartMessageRelayFactoryImpl;
import org.apache.druid.msq.dart.controller.DartMessageRelays;
+import org.apache.druid.msq.dart.controller.DartTableInputSpecSlicerProvider;
import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
import org.apache.druid.msq.dart.controller.sql.DartSqlClientFactory;
import org.apache.druid.msq.dart.controller.sql.DartSqlClientFactoryImpl;
import org.apache.druid.msq.dart.controller.sql.DartSqlClients;
import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
+import org.apache.druid.msq.guice.MSQBinders;
import org.apache.druid.msq.rpc.ResourcePermissionMapper;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.QueryConfigProvider;
@@ -111,6 +113,10 @@ public class DartControllerModule implements DruidModule
.addBinding()
.to(DartSqlEngine.class)
.in(LazySingleton.class);
+ MSQBinders.inputSpecSlicerProviderBinder(binder, Dart.class)
+ .addBinding()
+ .to(DartTableInputSpecSlicerProvider.class)
+ .in(LazySingleton.class);
}
@Provides
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
index e98d0ec2195..5ee9aa1f6ee 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
@@ -53,11 +53,13 @@ import
org.apache.druid.msq.dart.controller.http.DartQueryInfo;
import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
import org.apache.druid.msq.dart.worker.DartDataServerQueryHandlerFactory;
+import org.apache.druid.msq.dart.worker.DartSegmentsInputSliceReaderProvider;
import org.apache.druid.msq.dart.worker.DartWorkerContextFactory;
import org.apache.druid.msq.dart.worker.DartWorkerContextFactoryImpl;
import org.apache.druid.msq.dart.worker.DartWorkerRunner;
import org.apache.druid.msq.dart.worker.http.DartWorkerResource;
import org.apache.druid.msq.exec.MemoryIntrospector;
+import org.apache.druid.msq.guice.MSQBinders;
import org.apache.druid.msq.rpc.ResourcePermissionMapper;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.rpc.ServiceClientFactory;
@@ -104,6 +106,11 @@ public class DartWorkerModule implements DruidModule
binder.bind(ResourcePermissionMapper.class)
.annotatedWith(Dart.class)
.to(DartResourcePermissionMapper.class);
+
+ MSQBinders.inputSliceReaderProviderBinder(binder, Dart.class)
+ .addBinding()
+ .to(DartSegmentsInputSliceReaderProvider.class)
+ .in(LazySingleton.class);
}
@Provides
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartSegmentsInputSliceReaderProvider.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartSegmentsInputSliceReaderProvider.java
new file mode 100644
index 00000000000..7421e87f0a3
--- /dev/null
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartSegmentsInputSliceReaderProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.worker;
+
+import org.apache.druid.msq.exec.FrameContext;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSliceReader;
+import org.apache.druid.msq.input.InputSliceReaderProvider;
+import org.apache.druid.msq.input.table.SegmentsInputSlice;
+import org.apache.druid.msq.input.table.SegmentsInputSliceReader;
+import org.apache.druid.query.QueryContext;
+
+/**
+ * Worker-side provider for {@link SegmentsInputSlice} in Dart.
+ */
+public class DartSegmentsInputSliceReaderProvider implements InputSliceReaderProvider
+{
+ @Override
+ public Class<? extends InputSlice> sliceClass()
+ {
+ return SegmentsInputSlice.class;
+ }
+
+ @Override
+ public InputSliceReader createReader(FrameContext frameContext, QueryContext queryContext)
+ {
+ return new SegmentsInputSliceReader(frameContext, false);
+ }
+}
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
index 768801261d1..39888b75717 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
@@ -43,12 +43,12 @@ import org.apache.druid.msq.exec.WorkerClient;
import org.apache.druid.msq.exec.WorkerContext;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.exec.WorkerStorageParameters;
+import org.apache.druid.msq.input.InputSliceReaderProvider;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
-import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.server.DruidNode;
@@ -57,6 +57,7 @@ import org.apache.druid.utils.CloseableUtils;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import java.io.File;
+import java.util.List;
/**
* Dart implementation of {@link WorkerContext}.
@@ -79,7 +80,6 @@ public class DartWorkerContext implements WorkerContext
private final Injector injector;
private final DartWorkerClient workerClient;
private final SegmentWrangler segmentWrangler;
- private final GroupingEngine groupingEngine;
private final SegmentManager segmentManager;
private final CoordinatorClient coordinatorClient;
private final MemoryIntrospector memoryIntrospector;
@@ -96,6 +96,7 @@ public class DartWorkerContext implements WorkerContext
@MonotonicNonNull
private volatile ResourceHolder<ProcessingBuffersSet> processingBuffersSet;
private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory;
+ private final List<InputSliceReaderProvider> inputSliceReaderProviders;
DartWorkerContext(
final String queryId,
@@ -107,7 +108,6 @@ public class DartWorkerContext implements WorkerContext
final DartWorkerClient workerClient,
final DruidProcessingConfig processingConfig,
final SegmentWrangler segmentWrangler,
- final GroupingEngine groupingEngine,
final SegmentManager segmentManager,
final CoordinatorClient coordinatorClient,
final MemoryIntrospector memoryIntrospector,
@@ -116,7 +116,8 @@ public class DartWorkerContext implements WorkerContext
final File tempDir,
final QueryContext queryContext,
final DataServerQueryHandlerFactory dataServerQueryHandlerFactory,
- final ServiceEmitter emitter
+ final ServiceEmitter emitter,
+ final List<InputSliceReaderProvider> inputSliceReaderProviders
)
{
this.queryId = queryId;
@@ -129,7 +130,6 @@ public class DartWorkerContext implements WorkerContext
this.injector = injector;
this.workerClient = workerClient;
this.segmentWrangler = segmentWrangler;
- this.groupingEngine = groupingEngine;
this.segmentManager = segmentManager;
this.coordinatorClient = coordinatorClient;
this.memoryIntrospector = memoryIntrospector;
@@ -138,6 +138,7 @@ public class DartWorkerContext implements WorkerContext
this.tempDir = tempDir;
this.queryContext = Preconditions.checkNotNull(queryContext,
"queryContext");
this.emitter = emitter;
+ this.inputSliceReaderProviders = inputSliceReaderProviders;
// Compute thread count once in constructor
final int baseThreadCount = processingConfig.getNumThreads();
@@ -175,6 +176,12 @@ public class DartWorkerContext implements WorkerContext
return injector;
}
+ @Override
+ public List<InputSliceReaderProvider> inputSliceReaderProviders()
+ {
+ return inputSliceReaderProviders;
+ }
+
@Override
public void registerWorker(Worker worker, Closer closer)
{
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
index 07117c80f19..dc74cea3513 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
@@ -34,6 +34,7 @@ import
org.apache.druid.msq.dart.controller.messages.ControllerMessage;
import org.apache.druid.msq.exec.MemoryIntrospector;
import org.apache.druid.msq.exec.ProcessingBuffersProvider;
import org.apache.druid.msq.exec.WorkerContext;
+import org.apache.druid.msq.input.InputSliceReaderProvider;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.groupby.GroupingEngine;
@@ -44,6 +45,8 @@ import org.apache.druid.server.DruidNode;
import org.apache.druid.server.SegmentManager;
import java.io.File;
+import java.util.List;
+import java.util.Set;
/**
* Production implementation of {@link DartWorkerContextFactory}.
@@ -66,6 +69,7 @@ public class DartWorkerContextFactoryImpl implements
DartWorkerContextFactory
private final Outbox<ControllerMessage> outbox;
private final DartDataServerQueryHandlerFactory
dataServerQueryHandlerFactory;
private final ServiceEmitter emitter;
+ private final List<InputSliceReaderProvider> inputSliceReaderProviders;
@Inject
public DartWorkerContextFactoryImpl(
@@ -84,7 +88,8 @@ public class DartWorkerContextFactoryImpl implements
DartWorkerContextFactory
@Dart ProcessingBuffersProvider processingBuffersProvider,
Outbox<ControllerMessage> outbox,
DartDataServerQueryHandlerFactory dataServerQueryHandlerFactory,
- ServiceEmitter emitter
+ ServiceEmitter emitter,
+ @Dart Set<InputSliceReaderProvider> inputSliceReaderProviders
)
{
this.selfNode = selfNode;
@@ -103,6 +108,7 @@ public class DartWorkerContextFactoryImpl implements
DartWorkerContextFactory
this.outbox = outbox;
this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
this.emitter = emitter;
+ this.inputSliceReaderProviders = List.copyOf(inputSliceReaderProviders);
}
@Override
@@ -123,7 +129,6 @@ public class DartWorkerContextFactoryImpl implements
DartWorkerContextFactory
createWorkerClient(queryId),
processingConfig,
segmentWrangler,
- groupingEngine,
segmentManager,
coordinatorClient,
memoryIntrospector,
@@ -132,7 +137,8 @@ public class DartWorkerContextFactoryImpl implements
DartWorkerContextFactory
tempDir,
queryContext,
dataServerQueryHandlerFactory,
- emitter
+ emitter,
+ inputSliceReaderProviders
);
}
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
index e12c5c3b336..013df33fad1 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
@@ -26,14 +26,14 @@ import
org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.msq.indexing.MSQSpec;
-import org.apache.druid.msq.input.InputSpecSlicer;
-import org.apache.druid.msq.input.table.SegmentsInputSlice;
-import org.apache.druid.msq.input.table.TableInputSpec;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
import org.apache.druid.msq.kernel.ShuffleSpec;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.server.DruidNode;
import java.io.File;
+import java.util.List;
/**
* Context used by multi-stage query controllers. Useful because it allows
test fixtures to provide their own
@@ -82,9 +82,10 @@ public interface ControllerContext
DruidNode selfNode();
/**
- * Provides an {@link InputSpecSlicer} that slices {@link TableInputSpec} into {@link SegmentsInputSlice}.
+ * Extension point for {@link InputSpec} beyond the builtin ones provided by
+ * {@link ControllerImpl#makeInputSpecSlicerFactory}.
*/
- InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager);
+ List<InputSpecSlicerProvider> inputSpecSlicerProviders();
/**
* Provide access to segment actions in the Overlord. Only called for
ingestion queries, i.e., where
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 83568629be1..db9b3cbc9fb 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -139,6 +139,7 @@ import
org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.input.InputSpecSlicerFactory;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
import org.apache.druid.msq.input.MapInputSpecSlicer;
import org.apache.druid.msq.input.external.ExternalInputSpec;
import org.apache.druid.msq.input.external.ExternalInputSpecSlicer;
@@ -148,7 +149,6 @@ import org.apache.druid.msq.input.lookup.LookupInputSpec;
import org.apache.druid.msq.input.lookup.LookupInputSpecSlicer;
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.input.stage.StageInputSpecSlicer;
-import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageId;
@@ -208,6 +208,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -423,8 +424,11 @@ public class ControllerImpl implements Controller
closer.register(workerSketchFetcher::close);
// Execution-related: run the multi-stage QueryDefinition.
- final InputSpecSlicerFactory inputSpecSlicerFactory =
- makeInputSpecSlicerFactory(context.newTableInputSpecSlicer(workerManager));
+ final InputSpecSlicerFactory inputSpecSlicerFactory = makeInputSpecSlicerFactory(
+ context,
+ workerManager.getWorkerIds(),
+ getQueryContext()
+ );
final Pair<ControllerQueryKernel, ListenableFuture<?>> queryRunResult =
new RunQueryUntilDone(
@@ -2149,17 +2153,30 @@ public class ControllerImpl implements Controller
);
}
- private static InputSpecSlicerFactory makeInputSpecSlicerFactory(final InputSpecSlicer tableInputSpecSlicer)
+ private static InputSpecSlicerFactory makeInputSpecSlicerFactory(
+ final ControllerContext controllerContext,
+ final List<String> workerIds,
+ final QueryContext queryContext
+ )
{
- return (stagePartitionsMap, stageOutputChannelModeMap) -> new MapInputSpecSlicer(
- ImmutableMap.<Class<? extends InputSpec>, InputSpecSlicer>builder()
- .put(StageInputSpec.class, new StageInputSpecSlicer(stagePartitionsMap, stageOutputChannelModeMap))
- .put(ExternalInputSpec.class, new ExternalInputSpecSlicer())
- .put(InlineInputSpec.class, new InlineInputSpecSlicer())
- .put(LookupInputSpec.class, new LookupInputSpecSlicer())
- .put(TableInputSpec.class, tableInputSpecSlicer)
- .build()
- );
+ return (stagePartitionsMap, stageOutputChannelModeMap) -> {
+ Map<Class<? extends InputSpec>, InputSpecSlicer> slicers = new LinkedHashMap<>();
+
+ slicers.put(StageInputSpec.class, new StageInputSpecSlicer(stagePartitionsMap, stageOutputChannelModeMap));
+ slicers.put(ExternalInputSpec.class, new ExternalInputSpecSlicer());
+ slicers.put(InlineInputSpec.class, new InlineInputSpecSlicer());
+ slicers.put(LookupInputSpec.class, new LookupInputSpecSlicer());
+
+ // Context-supplied providers override the default ones, so they get added last.
+ for (final InputSpecSlicerProvider slicerProvider : controllerContext.inputSpecSlicerProviders()) {
+ slicers.put(
+ slicerProvider.specClass(),
+ slicerProvider.createSlicer(controllerContext, queryContext, workerIds)
+ );
+ }
+
+ return new MapInputSpecSlicer(slicers);
+ };
}
private static Map<Integer, Interval> copyOfStageRuntimesEndingAtCurrentTime(
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
index ba613768781..20593f26621 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
@@ -21,7 +21,6 @@ package org.apache.druid.msq.exec;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -48,6 +47,7 @@ import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSliceReader;
+import org.apache.druid.msq.input.InputSliceReaderProvider;
import org.apache.druid.msq.input.MapInputSliceReader;
import org.apache.druid.msq.input.NilInputSlice;
import org.apache.druid.msq.input.NilInputSliceReader;
@@ -71,6 +71,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
@@ -399,16 +400,21 @@ public class RunWorkOrder
private InputSliceReader makeInputSliceReader()
{
final boolean reindex =
MultiStageQueryContext.isReindex(workOrder.getWorkerContext());
- return new MapInputSliceReader(
- ImmutableMap.<Class<? extends InputSlice>, InputSliceReader>builder()
- .put(NilInputSlice.class, NilInputSliceReader.INSTANCE)
- .put(StageInputSlice.class, StageInputSliceReader.INSTANCE)
- .put(ExternalInputSlice.class, new ExternalInputSliceReader(frameContext.tempDir("external")))
- .put(InlineInputSlice.class, new InlineInputSliceReader(frameContext.segmentWrangler()))
- .put(LookupInputSlice.class, new LookupInputSliceReader(frameContext.segmentWrangler()))
- .put(SegmentsInputSlice.class, new SegmentsInputSliceReader(frameContext, reindex))
- .build()
- );
+ LinkedHashMap<Class<? extends InputSlice>, InputSliceReader> readers = new LinkedHashMap<>();
+ readers.put(NilInputSlice.class, NilInputSliceReader.INSTANCE);
+ readers.put(StageInputSlice.class, StageInputSliceReader.INSTANCE);
+ readers.put(ExternalInputSlice.class, new ExternalInputSliceReader(frameContext.tempDir("external")));
+ readers.put(InlineInputSlice.class, new InlineInputSliceReader(frameContext.segmentWrangler()));
+ readers.put(LookupInputSlice.class, new LookupInputSliceReader(frameContext.segmentWrangler()));
+ readers.put(SegmentsInputSlice.class, new SegmentsInputSliceReader(frameContext, reindex));
+
+ // Context-supplied providers override the default ones, so they get added last.
+ for (final InputSliceReaderProvider readerProvider : workerContext.inputSliceReaderProviders()) {
+ final InputSliceReader reader = readerProvider.createReader(frameContext, workOrder.getWorkerContext());
+ readers.put(readerProvider.sliceClass(), reader);
+ }
+
+ return new MapInputSliceReader(readers);
}
private OutputChannelFactory makeStageOutputChannelFactory()
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
index 9e696a16cce..b225112236d 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
@@ -24,6 +24,8 @@ import com.google.inject.Injector;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.msq.indexing.MSQWorkerTask;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSliceReaderProvider;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.policy.PolicyEnforcer;
@@ -31,6 +33,7 @@ import org.apache.druid.server.DruidNode;
import java.io.Closeable;
import java.io.File;
+import java.util.List;
/**
* Context used by multi-stage query workers.
@@ -121,6 +124,15 @@ public interface WorkerContext extends Closeable
*/
boolean isDebug();
+ /**
+ * Extension point: readers for {@link InputSlice} types that add to, or
+ * override, those built by {@link RunWorkOrder#makeInputSliceReader()}.
+ */
+ default List<InputSliceReaderProvider> inputSliceReaderProviders()
+ {
+ return List.of();
+ }
+
@Override
void close();
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQBinders.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQBinders.java
index 0a6b0827324..e3cd3e4028e 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQBinders.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQBinders.java
@@ -20,11 +20,20 @@
package org.apache.druid.msq.guice;
import com.google.inject.Binder;
+import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
+import com.google.inject.multibindings.Multibinder;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.msq.dart.Dart;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSliceReaderProvider;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.query.Query;
+import java.lang.annotation.Annotation;
+
/**
* Utility class for MSQ-related Guice bindings.
*/
@@ -50,4 +59,29 @@ public class MSQBinders
new TypeLiteral<>() {}
);
}
+
+ /**
+ * Bind an {@link InputSpecSlicerProvider} for use on a controller. The
annotation should be
+ * {@link IndexingService} for providers used by tasks, or {@link Dart} for
providers used by Dart.
+ */
+ public static Multibinder<InputSpecSlicerProvider>
inputSpecSlicerProviderBinder(
+ Binder binder,
+ Class<? extends Annotation> annotation
+ )
+ {
+ return Multibinder.newSetBinder(binder,
Key.get(InputSpecSlicerProvider.class, annotation));
+ }
+
+ /**
+ * Bind an {@link InputSliceReaderProvider} for use on a worker, to handle a
particular {@link InputSlice}.
+ * The annotation should be {@link IndexingService} for providers used by
tasks, or {@link Dart} for providers
+ * used by Dart.
+ */
+ public static Multibinder<InputSliceReaderProvider>
inputSliceReaderProviderBinder(
+ Binder binder,
+ Class<? extends Annotation> annotation
+ )
+ {
+ return Multibinder.newSetBinder(binder,
Key.get(InputSliceReaderProvider.class, annotation));
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
index 629754f2a9a..8781fa5bb0f 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
@@ -27,6 +27,7 @@ import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Provides;
+import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.initialization.DruidModule;
@@ -40,6 +41,8 @@ import org.apache.druid.msq.counters.StorageCounters;
import org.apache.druid.msq.counters.SuperSorterProgressTrackerCounter;
import org.apache.druid.msq.counters.WarningCounters;
import org.apache.druid.msq.indexing.IndexerControllerContextFactory;
+import org.apache.druid.msq.indexing.IndexerSegmentsInputSliceReaderProvider;
+import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicerProvider;
import org.apache.druid.msq.indexing.MSQCompactionRunner;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.MSQWorkerTask;
@@ -250,6 +253,16 @@ public class MSQIndexingModule implements DruidModule
.addBinding(WindowOperatorQuery.class)
.to(WindowOperatorQueryKit.class);
binder.bind(WindowOperatorQueryKit.class).in(LazySingleton.class);
+
+ MSQBinders.inputSpecSlicerProviderBinder(binder, IndexingService.class)
+ .addBinding()
+ .to(IndexerTableInputSpecSlicerProvider.class)
+ .in(LazySingleton.class);
+
+ MSQBinders.inputSliceReaderProviderBinder(binder, IndexingService.class)
+ .addBinding()
+ .to(IndexerSegmentsInputSliceReaderProvider.class)
+ .in(LazySingleton.class);
}
@Provides
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 18aebe0bcd2..5ea69d8c6a0 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
+import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.frame.FrameType;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.TaskLockType;
@@ -48,7 +50,7 @@ import
org.apache.druid.msq.indexing.destination.MSQDestination;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.MSQWarnings;
import org.apache.druid.msq.indexing.error.UnknownFault;
-import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.msq.util.MultiStageQueryContext;
@@ -66,7 +68,9 @@ import org.apache.druid.storage.StorageConnectorProvider;
import java.io.File;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -90,6 +94,7 @@ public class IndexerControllerContext implements
ControllerContext
private final ServiceClientFactory clientFactory;
private final OverlordClient overlordClient;
private final MemoryIntrospector memoryIntrospector;
+ private final List<InputSpecSlicerProvider> inputSpecSlicerProviders;
public IndexerControllerContext(
final MSQControllerTask task,
@@ -110,6 +115,9 @@ public class IndexerControllerContext implements
ControllerContext
this.memoryIntrospector = injector.getInstance(MemoryIntrospector.class);
final StorageConnectorProvider storageConnectorProvider =
injector.getInstance(Key.get(StorageConnectorProvider.class,
MultiStageQuery.class));
final StorageConnector storageConnector =
storageConnectorProvider.createStorageConnector(toolbox.getIndexingTmpDir());
+ final Set<InputSpecSlicerProvider> inputSpecSlicerProviders =
+ injector.getInstance(Key.get(new TypeLiteral<>() {},
IndexingService.class));
+ this.inputSpecSlicerProviders = List.copyOf(inputSpecSlicerProviders);
this.injector = injector.createChildInjector(
binder -> binder.bind(Key.get(StorageConnector.class,
MultiStageQuery.class))
.toInstance(storageConnector));
@@ -174,15 +182,9 @@ public class IndexerControllerContext implements
ControllerContext
}
@Override
- public InputSpecSlicer newTableInputSpecSlicer(final WorkerManager
workerManager)
+ public List<InputSpecSlicerProvider> inputSpecSlicerProviders()
{
- final SegmentSource includeSegmentSource =
- MultiStageQueryContext.getSegmentSources(taskQuerySpecContext,
DEFAULT_SEGMENT_SOURCE);
- return new IndexerTableInputSpecSlicer(
- toolbox.getCoordinatorClient(),
- toolbox.getTaskActionClient(),
- includeSegmentSource
- );
+ return inputSpecSlicerProviders;
}
@Override
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerSegmentsInputSliceReaderProvider.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerSegmentsInputSliceReaderProvider.java
new file mode 100644
index 00000000000..d27a7852cac
--- /dev/null
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerSegmentsInputSliceReaderProvider.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import org.apache.druid.msq.exec.FrameContext;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSliceReader;
+import org.apache.druid.msq.input.InputSliceReaderProvider;
+import org.apache.druid.msq.input.table.SegmentsInputSlice;
+import org.apache.druid.msq.input.table.SegmentsInputSliceReader;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.QueryContext;
+
+/**
+ * Worker-side provider for {@link SegmentsInputSlice} in tasks.
+ */
+public class IndexerSegmentsInputSliceReaderProvider implements
InputSliceReaderProvider
+{
+ @Override
+ public Class<? extends InputSlice> sliceClass()
+ {
+ return SegmentsInputSlice.class;
+ }
+
+ @Override
+ public InputSliceReader createReader(FrameContext frameContext, QueryContext
queryContext)
+ {
+ return new SegmentsInputSliceReader(frameContext,
MultiStageQueryContext.isReindex(queryContext));
+ }
+}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicerProvider.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicerProvider.java
new file mode 100644
index 00000000000..052d5c7691c
--- /dev/null
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicerProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.msq.exec.SegmentSource;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
+import org.apache.druid.msq.input.table.TableInputSpec;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.rpc.StandardRetryPolicy;
+
+import java.util.List;
+
+/**
+ * Controller-side provider for {@link TableInputSpec} in tasks.
+ */
+public class IndexerTableInputSpecSlicerProvider implements
InputSpecSlicerProvider
+{
+ private final CoordinatorClient coordinatorClient;
+
+ @Inject
+ public IndexerTableInputSpecSlicerProvider(CoordinatorClient
coordinatorClient)
+ {
+ // Use the "aboutAnHour" retry policy, same as the one used in the
TaskToolboxFactory. This prevents
+ // long-running tasks from failing if there are Coordinator/Overlord
problems. Calls will still fail
+ // eventually if problems persist.
+ this.coordinatorClient =
coordinatorClient.withRetryPolicy(StandardRetryPolicy.aboutAnHour());
+ }
+
+ @Override
+ public Class<? extends InputSpec> specClass()
+ {
+ return TableInputSpec.class;
+ }
+
+ @Override
+ public InputSpecSlicer createSlicer(
+ ControllerContext controllerContext,
+ QueryContext queryContext,
+ List<String> workerIds
+ )
+ {
+ final SegmentSource includeSegmentSource =
+ MultiStageQueryContext.getSegmentSources(queryContext,
IndexerControllerContext.DEFAULT_SEGMENT_SOURCE);
+ return new IndexerTableInputSpecSlicer(
+ coordinatorClient,
+ controllerContext.taskActionClient(),
+ includeSegmentSource
+ );
+ }
+}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index 2589d0c0dab..82d462fd096 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -22,7 +22,9 @@ package org.apache.druid.msq.indexing;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Injector;
import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Smile;
@@ -46,6 +48,7 @@ import org.apache.druid.msq.guice.MultiStageQuery;
import org.apache.druid.msq.indexing.client.IndexerControllerClient;
import org.apache.druid.msq.indexing.client.IndexerWorkerClient;
import org.apache.druid.msq.indexing.client.WorkerChatHandler;
+import org.apache.druid.msq.input.InputSliceReaderProvider;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
@@ -66,6 +69,8 @@ import org.apache.druid.storage.StorageConnectorProvider;
import javax.annotation.Nullable;
import java.io.File;
+import java.util.List;
+import java.util.Set;
public class IndexerWorkerContext implements WorkerContext
{
@@ -90,6 +95,7 @@ public class IndexerWorkerContext implements WorkerContext
private final ServiceClientFactory clientFactory;
private final MemoryIntrospector memoryIntrospector;
private final ProcessingBuffersProvider processingBuffersProvider;
+ private final List<InputSliceReaderProvider> inputSliceReaderProviders;
private final int maxConcurrentStages;
private final boolean liveReportCounters;
private final boolean includeAllCounters;
@@ -112,7 +118,8 @@ public class IndexerWorkerContext implements WorkerContext
final ServiceClientFactory clientFactory,
final MemoryIntrospector memoryIntrospector,
final ProcessingBuffersProvider processingBuffersProvider,
- final IndexerDataServerQueryHandlerFactory dataServerQueryHandlerFactory
+ final IndexerDataServerQueryHandlerFactory dataServerQueryHandlerFactory,
+ final List<InputSliceReaderProvider> inputSliceReaderProviders
)
{
this.task = task;
@@ -127,6 +134,7 @@ public class IndexerWorkerContext implements WorkerContext
this.memoryIntrospector = memoryIntrospector;
this.processingBuffersProvider = processingBuffersProvider;
this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
+ this.inputSliceReaderProviders = inputSliceReaderProviders;
final QueryContext queryContext = QueryContext.of(task.getContext());
this.maxConcurrentStages =
MultiStageQueryContext.getMaxConcurrentStagesWithDefault(
@@ -171,6 +179,8 @@ public class IndexerWorkerContext implements WorkerContext
injector.getInstance(OverlordClient.class).withRetryPolicy(StandardRetryPolicy.unlimited());
final ProcessingBuffersProvider processingBuffersProvider =
injector.getInstance(ProcessingBuffersProvider.class);
final ObjectMapper smileMapper =
injector.getInstance(Key.get(ObjectMapper.class, Smile.class));
+ final Set<InputSliceReaderProvider> inputSliceReaderProviders =
+ injector.getInstance(Key.get(new TypeLiteral<>() {},
IndexingService.class));
return new IndexerWorkerContext(
task,
@@ -189,7 +199,8 @@ public class IndexerWorkerContext implements WorkerContext
toolbox.getCoordinatorClient(),
serviceClientFactory,
smileMapper
- )
+ ),
+ List.copyOf(inputSliceReaderProviders)
);
}
@@ -228,6 +239,12 @@ public class IndexerWorkerContext implements WorkerContext
return injector;
}
+ @Override
+ public List<InputSliceReaderProvider> inputSliceReaderProviders()
+ {
+ return inputSliceReaderProviders;
+ }
+
@Override
public void emitMetric(MSQMetricEventBuilder metricBuilder)
{
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSliceReaderProvider.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSliceReaderProvider.java
new file mode 100644
index 00000000000..8dd71efe593
--- /dev/null
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSliceReaderProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.input;
+
+import org.apache.druid.msq.exec.FrameContext;
+import org.apache.druid.query.QueryContext;
+
+/**
+ * Worker-side extension point for {@link InputSlice}: provides an {@link
InputSliceReader} for a particular
+ * {@link InputSlice} class.
+ */
+public interface InputSliceReaderProvider
+{
+ /**
+ * The {@link InputSlice} class handled by this provider.
+ */
+ Class<? extends InputSlice> sliceClass();
+
+ /**
+ * Returns an {@link InputSliceReader} that accepts {@link #sliceClass()}.
+ */
+ InputSliceReader createReader(FrameContext frameContext, QueryContext
queryContext);
+}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicerProvider.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicerProvider.java
new file mode 100644
index 00000000000..3d0f0e77428
--- /dev/null
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicerProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.input;
+
+import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.query.QueryContext;
+
+import java.util.List;
+
+/**
+ * Controller-side extension point for {@link InputSpec}: provides an {@link
InputSpecSlicer} for a particular
+ * {@link InputSpec} class.
+ */
+public interface InputSpecSlicerProvider
+{
+ /**
+ * The {@link InputSpec} class handled by this provider.
+ */
+ Class<? extends InputSpec> specClass();
+
+ /**
+ * Returns an {@link InputSpecSlicer} that accepts {@link #specClass()}.
+ */
+ InputSpecSlicer createSlicer(
+ ControllerContext controllerContext,
+ QueryContext queryContext,
+ List<String> workerIds
+ );
+}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java
index 3888368c1b8..1ab1b55024c 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java
@@ -106,8 +106,17 @@ public class DartControllerContextTest
@Test
public void test_queryKernelConfig()
{
- final DartControllerContext controllerContext =
- new DartControllerContext(null, null, SELF_NODE, null,
memoryIntrospector, serverView, null, queryContext);
+ final DartControllerContext controllerContext = new DartControllerContext(
+ null,
+ null,
+ SELF_NODE,
+ null,
+ memoryIntrospector,
+ serverView,
+ List.of(),
+ null,
+ queryContext
+ );
final ControllerQueryKernelConfig queryKernelConfig =
controllerContext.queryKernelConfig(querySpec);
Assertions.assertFalse(queryKernelConfig.isFaultTolerant());
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
index 787b980307f..895de0319da 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
@@ -30,6 +30,7 @@ import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.util.Modules;
import com.google.inject.util.Providers;
+import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.error.DruidException;
@@ -237,6 +238,7 @@ public class MSQCompactionTaskRunTest extends
CompactionTaskRunBase
new SegmentWranglerModule(),
new LookylooModule(),
new MSQIndexingModule(),
+ binder ->
binder.bind(CoordinatorClient.class).toInstance(coordinatorClient),
binder ->
binder.bind(PolicyEnforcer.class).toInstance(NoopPolicyEnforcer.instance()),
binder -> binder.bind(WireTransferableContext.class).toInstance(new
WireTransferableContext(null, null, true)),
binder -> binder.bind(DataSegmentPusher.class)
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java
index 7b7f4cb93b1..fc7b499eb1a 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java
@@ -21,8 +21,11 @@ package org.apache.druid.msq.exec;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Provides;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.TestDruidModule;
import org.apache.druid.msq.guice.MultiStageQuery;
@@ -39,6 +42,12 @@ import org.apache.druid.sql.avatica.MSQDruidMeta;
public class TestMSQSqlModule extends TestDruidModule
{
+ @Override
+ public void configure(Binder binder)
+ {
+
binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class).in(LazySingleton.class);
+ }
+
@Provides
@MultiStageQuery
@LazySingleton
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
index 41956d55695..f7947676ed7 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
@@ -31,6 +31,8 @@ import com.google.inject.testing.fieldbinder.BoundFieldModule;
import com.google.inject.util.Modules;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.guice.ConfigModule;
import org.apache.druid.guice.DruidGuiceExtensions;
@@ -217,7 +219,10 @@ public class MSQTaskQueryMakerTest
new SegmentWranglerModule(),
new LookylooModule(),
new MSQIndexingModule(),
- binder ->
binder.bind(WireTransferableContext.class).toInstance(FrameTestUtil.WT_CONTEXT_LEGACY)
+ binder -> {
+
binder.bind(WireTransferableContext.class).toInstance(FrameTestUtil.WT_CONTEXT_LEGACY);
+ binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class);
+ }
);
Injector injector = Guice.createInjector(defaultModule,
BoundFieldModule.of(this));
DruidSecondaryModule.setupJackson(injector, objectMapper,
Collections.emptyMap(), true);
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java
index dd0370008ad..43558e8d517 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java
@@ -22,8 +22,6 @@ package org.apache.druid.msq.test;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Binder;
import com.google.inject.Provides;
-import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.guice.LazySingleton;
@@ -126,7 +124,6 @@ public abstract class AbstractDartComponentSupplier extends
AbstractMSQComponent
@Override
public void configure(Binder binder)
{
- binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class);
}
}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 5a2ecbcf243..10c3e8cbec3 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -38,6 +38,8 @@ import com.google.inject.util.Modules;
import com.google.inject.util.Providers;
import org.apache.calcite.avatica.remote.TypedValue;
import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.LongDimensionSchema;
@@ -576,7 +578,8 @@ public class MSQTestBase extends BaseCalciteQueryTest
new SegmentWranglerModule(),
new HllSketchModule(),
binder -> binder.bind(Bouncer.class).toInstance(new Bouncer(1)),
- binder ->
binder.bind(PolicyEnforcer.class).toInstance(NoopPolicyEnforcer.instance())
+ binder ->
binder.bind(PolicyEnforcer.class).toInstance(NoopPolicyEnforcer.instance()),
+ binder ->
binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class).in(LazySingleton.class)
);
// adding node role injection to the modules, since CliPeon would also do
that through run method
injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build(),
ImmutableSet.of(NodeRole.PEON))
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 6f09c43ff02..8cc6ed620d2 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -51,7 +51,6 @@ import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.ControllerMemoryParameters;
import org.apache.druid.msq.exec.MSQMetricEventBuilder;
-import org.apache.druid.msq.exec.SegmentSource;
import org.apache.druid.msq.exec.Worker;
import org.apache.druid.msq.exec.WorkerClient;
import org.apache.druid.msq.exec.WorkerFailureListener;
@@ -61,12 +60,12 @@ import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.exec.WorkerRunRef;
import org.apache.druid.msq.exec.WorkerStorageParameters;
import org.apache.druid.msq.indexing.IndexerControllerContext;
-import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer;
+import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicerProvider;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQWorkerTask;
import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
import
org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig;
-import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
@@ -169,6 +168,7 @@ public class MSQTestControllerContext implements
ControllerContext, DartControll
serviceEmitter,
Mockito.mock(CoordinatorClient.class)
);
+
Mockito.when(coordinatorClient.withRetryPolicy(ArgumentMatchers.any())).thenReturn(coordinatorClient);
Mockito.when(coordinatorClient.fetchServerViewSegments(
ArgumentMatchers.anyString(),
ArgumentMatchers.any()
@@ -416,13 +416,9 @@ public class MSQTestControllerContext implements
ControllerContext, DartControll
}
@Override
- public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager)
+ public List<InputSpecSlicerProvider> inputSpecSlicerProviders()
{
- return new IndexerTableInputSpecSlicer(
- coordinatorClient,
- taskActionClient,
- MultiStageQueryContext.getSegmentSources(queryContext,
SegmentSource.NONE)
- );
+ return List.of(new IndexerTableInputSpecSlicerProvider(coordinatorClient));
}
@Override
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
index c3eaf6e9c9e..d7f48f2dc41 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
@@ -46,6 +46,7 @@ import org.apache.druid.msq.exec.Worker;
import org.apache.druid.msq.exec.WorkerImpl;
import org.apache.druid.msq.exec.WorkerRunRef;
import org.apache.druid.msq.exec.WorkerStorageParameters;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.query.QueryContext;
@@ -53,6 +54,7 @@ import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.server.DruidNode;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executors;
public class TestDartControllerContextFactoryImpl extends
DartControllerContextFactoryImpl
@@ -74,11 +76,22 @@ public class TestDartControllerContextFactoryImpl extends
DartControllerContextF
@EscalatedGlobal final ServiceClientFactory serviceClientFactory,
final MemoryIntrospector memoryIntrospector,
final TimelineServerView serverView,
+ @Dart final Set<InputSpecSlicerProvider> inputSpecSlicerProviders,
final ServiceEmitter emitter,
@Dart Map<String, WorkerRunRef> workerMap
)
{
- super(injector, jsonMapper, smileMapper, selfNode, serviceClientFactory,
memoryIntrospector, serverView, emitter);
+ super(
+ injector,
+ jsonMapper,
+ smileMapper,
+ selfNode,
+ serviceClientFactory,
+ memoryIntrospector,
+ serverView,
+ inputSpecSlicerProviders,
+ emitter
+ );
this.workerMap = workerMap;
}
@@ -92,6 +105,7 @@ public class TestDartControllerContextFactoryImpl extends
DartControllerContextF
new DartTestWorkerClient(),
memoryIntrospector,
serverView,
+ inputSpecSlicerProviders,
emitter,
context
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]