This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e8e548f7c7a Allow Dart to query realtime tasks (#18076)
e8e548f7c7a is described below
commit e8e548f7c7a38e2d2406dc65a0e4532509eabb30
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Fri Jun 6 23:38:36 2025 +0530
Allow Dart to query realtime tasks (#18076)
* Wire up injectables
* Refactor to pass query context to slicer
* Modify Dart input slicer
* Add test
* Clean up
* Address review comments
* Fix equals
* Update docs
* Have default segment sources for MSQ task and Dart
---
docs/querying/query-context.md | 2 +-
.../msq/dart/controller/DartControllerContext.java | 10 +-
.../controller/DartControllerContextFactory.java | 3 +-
.../DartControllerContextFactoryImpl.java | 8 +-
.../msq/dart/controller/DartSegmentAssignment.java | 74 ++++++++++
.../dart/controller/DartTableInputSpecSlicer.java | 120 +++++++++++-----
.../msq/dart/controller/sql/DartQueryMaker.java | 12 +-
.../druid/msq/dart/worker/DartFrameContext.java | 9 +-
.../msq/dart/worker/DartQueryableSegment.java | 33 +++--
.../druid/msq/dart/worker/DartWorkerContext.java | 12 +-
.../msq/dart/worker/DartWorkerFactoryImpl.java | 19 ++-
.../org/apache/druid/msq/exec/ControllerImpl.java | 4 +-
.../druid/msq/exec/DataServerQueryHandler.java | 18 +--
.../org/apache/druid/msq/exec/SegmentSource.java | 11 +-
.../org/apache/druid/msq/exec/WorkerContext.java | 3 +
.../msq/indexing/IndexerControllerContext.java | 3 +-
.../druid/msq/sql/DartQueryKitSpecFactory.java | 8 +-
.../druid/msq/util/MSQTaskQueryMakerUtils.java | 3 +-
.../druid/msq/util/MultiStageQueryContext.java | 5 +-
.../dart/controller/DartControllerContextTest.java | 2 +-
.../controller/DartTableInputSpecSlicerTest.java | 157 ++++++++++++++++++++-
.../druid/msq/test/DartComponentSupplier.java | 3 +
.../druid/msq/test/MSQTestControllerContext.java | 5 +-
.../test/TestDartControllerContextFactoryImpl.java | 6 +-
.../druid/client/selector/ServerSelector.java | 10 ++
25 files changed, 428 insertions(+), 112 deletions(-)
diff --git a/docs/querying/query-context.md b/docs/querying/query-context.md
index 0a277949fdd..35fb7fe3c4e 100644
--- a/docs/querying/query-context.md
+++ b/docs/querying/query-context.md
@@ -66,7 +66,7 @@ See [SQL query context](sql-query-context.md) for other query
context parameters
|`debug`| `false` | Flag indicating whether to enable debugging outputs for
the query. When set to false, no additional logs will be produced (logs
produced will be entirely dependent on your logging level). When set to true,
the following addition logs will be produced:<br />- Log the stack trace of the
exception (if any) produced by the query |
|`setProcessingThreadNames`|`true`| Whether processing thread names will be
set to `queryType_dataSource_intervals` while processing a query. This aids in
interpreting thread dumps, and is on by default. Query overhead can be reduced
slightly by setting this to `false`. This has a tiny effect in most scenarios,
but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
|`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge
two Project operators when inlining expressions causes complexity to increase.
Implemented as a workaround to exception `There are not enough rules to produce
a node with desired properties: convention=DRUID, sort=[]` thrown after
rejecting the merge of two projects.|
-|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should
be queried by brokers. Clone servers are created by the `cloneServers`
Coordinator dynamic configuration. Possible values are `excludeClones`,
`includeClones` and `preferClones`. `excludeClones` means that clone
Historicals are not queried by the broker. `preferClones` indicates that when
given a choice between the clone Historical and the original Historical which
is being cloned, the broker chooses the clones [...]
+|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should
be queried by brokers. Clone servers are created by the `cloneServers`
Coordinator dynamic configuration. Possible values are `excludeClones`,
`includeClones` and `preferClones`. `excludeClones` means that clone
Historicals are not queried by the broker. `preferClones` indicates that when
given a choice between the clone Historical and the original Historical which
is being cloned, the broker chooses the clones [...]
## Parameters by query type
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
index a76f15d6b60..39464406cef 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
@@ -34,6 +34,7 @@ import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.ControllerMemoryParameters;
import org.apache.druid.msq.exec.MemoryIntrospector;
+import org.apache.druid.msq.exec.SegmentSource;
import org.apache.druid.msq.exec.WorkerFailureListener;
import org.apache.druid.msq.exec.WorkerManager;
import org.apache.druid.msq.indexing.IndexerControllerContext;
@@ -77,12 +78,15 @@ public class DartControllerContext implements
ControllerContext
*/
public static final int DEFAULT_MAX_NON_LEAF_WORKER_COUNT = 1;
+ public static final SegmentSource DEFAULT_SEGMENT_SOURCE =
SegmentSource.REALTIME;
+
private final Injector injector;
private final ObjectMapper jsonMapper;
private final DruidNode selfNode;
private final DartWorkerClient workerClient;
private final TimelineServerView serverView;
private final MemoryIntrospector memoryIntrospector;
+ private final QueryContext context;
private final ServiceMetricEvent.Builder metricBuilder;
private final ServiceEmitter emitter;
@@ -93,7 +97,8 @@ public class DartControllerContext implements
ControllerContext
final DartWorkerClient workerClient,
final MemoryIntrospector memoryIntrospector,
final TimelineServerView serverView,
- final ServiceEmitter emitter
+ final ServiceEmitter emitter,
+ final QueryContext context
)
{
this.injector = injector;
@@ -102,6 +107,7 @@ public class DartControllerContext implements
ControllerContext
this.workerClient = workerClient;
this.serverView = serverView;
this.memoryIntrospector = memoryIntrospector;
+ this.context = context;
this.metricBuilder = new ServiceMetricEvent.Builder();
this.emitter = emitter;
}
@@ -180,7 +186,7 @@ public class DartControllerContext implements
ControllerContext
@Override
public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager)
{
- return
DartTableInputSpecSlicer.createFromWorkerIds(workerManager.getWorkerIds(),
serverView);
+ return
DartTableInputSpecSlicer.createFromWorkerIds(workerManager.getWorkerIds(),
serverView, context);
}
@Override
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactory.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactory.java
index f58eb4bfa68..796c52bb54b 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactory.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactory.java
@@ -21,11 +21,12 @@ package org.apache.druid.msq.dart.controller;
import org.apache.druid.msq.dart.controller.sql.DartQueryMaker;
import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.query.QueryContext;
/**
* Class for creating {@link ControllerContext} in {@link DartQueryMaker}.
*/
public interface DartControllerContextFactory
{
- ControllerContext newContext(String queryId);
+ ControllerContext newContext(QueryContext queryContext);
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java
index 113714aa9b9..87582d977aa 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java
@@ -31,6 +31,8 @@ import
org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.msq.dart.worker.DartWorkerClientImpl;
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.MemoryIntrospector;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.server.DruidNode;
@@ -68,8 +70,9 @@ public class DartControllerContextFactoryImpl implements
DartControllerContextFa
}
@Override
- public ControllerContext newContext(final String queryId)
+ public ControllerContext newContext(final QueryContext context)
{
+ final String queryId = context.getString(QueryContexts.CTX_DART_QUERY_ID);
return new DartControllerContext(
injector,
jsonMapper,
@@ -77,7 +80,8 @@ public class DartControllerContextFactoryImpl implements
DartControllerContextFa
new DartWorkerClientImpl(queryId, serviceClientFactory, smileMapper,
selfNode.getHostAndPortToUse()),
memoryIntrospector,
serverView,
- emitter
+ emitter,
+ context
);
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartSegmentAssignment.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartSegmentAssignment.java
new file mode 100644
index 00000000000..a9a2f7b3a96
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartSegmentAssignment.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.controller;
+
+import org.apache.druid.msq.dart.worker.DartQueryableSegment;
+import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents the set of segments assigned to a particular dart worker, used
by {@link DartTableInputSpecSlicer}.
+ */
+public class DartSegmentAssignment
+{
+ private final List<DartQueryableSegment> dartQueryableSegments;
+ private final List<DataServerRequestDescriptor> dataServerRequestDescriptor;
+
+ public DartSegmentAssignment(
+ List<DartQueryableSegment> dartQueryableSegments,
+ List<DataServerRequestDescriptor> dataServerRequestDescriptor
+ )
+ {
+ this.dartQueryableSegments = dartQueryableSegments;
+ this.dataServerRequestDescriptor = dataServerRequestDescriptor;
+ }
+
+ public static DartSegmentAssignment empty()
+ {
+ return new DartSegmentAssignment(new ArrayList<>(), new ArrayList<>());
+ }
+
+ public void addSegments(DartQueryableSegment segment)
+ {
+ dartQueryableSegments.add(segment);
+ }
+
+ public void addRequest(DataServerRequestDescriptor requestDescriptor)
+ {
+ dataServerRequestDescriptor.add(requestDescriptor);
+ }
+
+ public List<DartQueryableSegment> getDartQueryableSegments()
+ {
+ return dartQueryableSegments;
+ }
+
+ public List<DataServerRequestDescriptor> getDataServerRequestDescriptor()
+ {
+ return dataServerRequestDescriptor;
+ }
+
+ public boolean isEmpty()
+ {
+ return dataServerRequestDescriptor.isEmpty() &&
dartQueryableSegments.isEmpty();
+ }
+}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java
index 00813d0f2fb..4b7ea2ed8ee 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicer.java
@@ -20,7 +20,6 @@
package org.apache.druid.msq.dart.controller;
import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.client.QueryableDruidServer;
@@ -36,22 +35,29 @@ import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
import org.apache.druid.msq.input.NilInputSlice;
+import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
+import org.apache.druid.msq.input.table.DataServerSelector;
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.input.table.TableInputSpec;
+import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.CloneQueryMode;
+import org.apache.druid.query.QueryContext;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.filter.DimFilterUtils;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineLookup;
+import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.function.ToIntFunction;
+import java.util.stream.Collectors;
/**
* Slices {@link TableInputSpec} into {@link SegmentsInputSlice} for
persistent servers using
@@ -71,15 +77,32 @@ public class DartTableInputSpecSlicer implements
InputSpecSlicer
*/
private final TimelineServerView serverView;
- DartTableInputSpecSlicer(final Object2IntMap<String> workerIdToNumber, final
TimelineServerView serverView)
+ /**
+ * Determines the kind of tasks that should be queried.
+ */
+ private final SegmentSource segmentSource;
+
+ /**
+ * Determines if cloning historicals should be queried.
+ */
+ private final CloneQueryMode cloneQueryMode;
+
+ DartTableInputSpecSlicer(
+ final Object2IntMap<String> workerIdToNumber,
+ final TimelineServerView serverView,
+ final QueryContext queryContext
+ )
{
this.workerIdToNumber = workerIdToNumber;
this.serverView = serverView;
+ this.segmentSource =
MultiStageQueryContext.getSegmentSources(queryContext,
DartControllerContext.DEFAULT_SEGMENT_SOURCE);
+ this.cloneQueryMode = queryContext.getCloneQueryMode();
}
public static DartTableInputSpecSlicer createFromWorkerIds(
final List<String> workerIds,
- final TimelineServerView serverView
+ final TimelineServerView serverView,
+ final QueryContext queryContext
)
{
final Object2IntMap<String> reverseWorkers = new Object2IntOpenHashMap<>();
@@ -89,7 +112,7 @@ public class DartTableInputSpecSlicer implements
InputSpecSlicer
reverseWorkers.put(WorkerId.fromString(workerIds.get(i)).getHostAndPort(), i);
}
- return new DartTableInputSpecSlicer(reverseWorkers, serverView);
+ return new DartTableInputSpecSlicer(reverseWorkers, serverView,
queryContext);
}
@Override
@@ -116,14 +139,20 @@ public class DartTableInputSpecSlicer implements
InputSpecSlicer
serverSelector -> findWorkerForServerSelector(serverSelector,
maxNumSlices)
);
- final List<List<DartQueryableSegment>> assignments = new
ArrayList<>(maxNumSlices);
+ final List<DartSegmentAssignment> assignments = new
ArrayList<>(maxNumSlices);
while (assignments.size() < maxNumSlices) {
- assignments.add(null);
+ assignments.add(DartSegmentAssignment.empty());
}
int nextRoundRobinWorker = 0;
+ final Map<DruidServerMetadata, List<DartQueryableSegment>>
serverRequestMap = new HashMap<>();
for (final DartQueryableSegment segment : prunedSegments) {
final int worker;
+ DruidServerMetadata realtimeServer = segment.getRealtimeServer();
+ if (realtimeServer != null) {
+ serverRequestMap.computeIfAbsent(realtimeServer, s -> new
ArrayList<>()).add(segment);
+ continue;
+ }
if (segment.getWorkerNumber() == UNKNOWN) {
// Segment is not available on any worker. Assign to some worker,
round-robin. Today, that server will throw
// an error about the segment not being findable, but perhaps one day,
it will be able to load the segment
@@ -134,11 +163,18 @@ public class DartTableInputSpecSlicer implements
InputSpecSlicer
worker = segment.getWorkerNumber();
}
- if (assignments.get(worker) == null) {
- assignments.set(worker, new ArrayList<>());
- }
+ assignments.get(worker).addSegments(segment);
+ }
- assignments.get(worker).add(segment);
+ for (Map.Entry<DruidServerMetadata, List<DartQueryableSegment>> entry :
serverRequestMap.entrySet()) {
+ final int worker;
+ DruidServerMetadata server = entry.getKey();
+ worker = nextRoundRobinWorker;
+ nextRoundRobinWorker = (nextRoundRobinWorker + 1) % maxNumSlices;
+ List<RichSegmentDescriptor> descriptors =
serverRequestMap.get(server).stream()
+
.map(DartTableInputSpecSlicer::toRichSegmentDescriptor)
+
.collect(Collectors.toList());
+ assignments.get(worker).addRequest(new
DataServerRequestDescriptor(server, descriptors));
}
return makeSegmentSlices(tableInputSpec.getDataSource(), assignments);
@@ -164,7 +200,7 @@ public class DartTableInputSpecSlicer implements
InputSpecSlicer
int findWorkerForServerSelector(final ServerSelector serverSelector, final
int maxNumSlices)
{
// Currently, Dart does not support clone query modes, all servers can be
queried.
- final QueryableDruidServer server = serverSelector.pick(null,
CloneQueryMode.INCLUDECLONES);
+ final QueryableDruidServer server = serverSelector.pick(null,
cloneQueryMode);
if (server == null) {
return UNKNOWN;
@@ -186,7 +222,7 @@ public class DartTableInputSpecSlicer implements
InputSpecSlicer
* Pull the list of {@link DataSegment} that we should query, along with a
clipping interval for each one, and
* a worker to get it from.
*/
- static Set<DartQueryableSegment> findQueryableDataSegments(
+ private Set<DartQueryableSegment> findQueryableDataSegments(
final TableInputSpec tableInputSpec,
final TimelineLookup<?, ServerSelector> timeline,
final ToIntFunction<ServerSelector> toWorkersFunction
@@ -202,9 +238,7 @@ public class DartTableInputSpecSlicer implements
InputSpecSlicer
.filter(chunk ->
shouldIncludeSegment(chunk.getObject()))
.transform(chunk -> {
final ServerSelector serverSelector =
chunk.getObject();
- final DataSegment dataSegment =
serverSelector.getSegment();
- final int worker =
toWorkersFunction.applyAsInt(serverSelector);
- return new
DartQueryableSegment(dataSegment, holder.getInterval(), worker);
+ return
toDartQueryableSegment(serverSelector, holder.getInterval(), toWorkersFunction);
})
.filter(segment ->
!segment.getSegment().isTombstone())
);
@@ -218,6 +252,27 @@ public class DartTableInputSpecSlicer implements
InputSpecSlicer
);
}
+ private DartQueryableSegment toDartQueryableSegment(
+ ServerSelector serverSelector,
+ Interval interval,
+ ToIntFunction<ServerSelector> toWorkersFunction
+ )
+ {
+ final DataSegment dataSegment = serverSelector.getSegment();
+ if (serverSelector.isRealtimeSegment()) {
+ final Set<DruidServerMetadata> servers =
+ serverSelector.getAllServers(cloneQueryMode)
+ .stream()
+ .filter(druidServerMetadata ->
segmentSource.getUsedServerTypes()
+
.contains(druidServerMetadata.getType()))
+ .collect(Collectors.toSet());
+ return new DartQueryableSegment(dataSegment, interval, -1,
DataServerSelector.RANDOM.getSelectServerFunction().apply(servers));
+ } else {
+ final int worker = toWorkersFunction.applyAsInt(serverSelector);
+ return new DartQueryableSegment(dataSegment, interval, worker, null);
+ }
+ }
+
/**
* Create a list of {@link SegmentsInputSlice} and {@link NilInputSlice}
assignments.
*
@@ -228,27 +283,26 @@ public class DartTableInputSpecSlicer implements
InputSpecSlicer
*
* @throws IllegalStateException if any provided segments do not match the
provided datasource
*/
- static List<InputSlice> makeSegmentSlices(
+ private List<InputSlice> makeSegmentSlices(
final String dataSource,
- final List<List<DartQueryableSegment>> assignments
+ final List<DartSegmentAssignment> assignments
)
{
final List<InputSlice> retVal = new ArrayList<>(assignments.size());
- for (final List<DartQueryableSegment> assignment : assignments) {
+ for (final DartSegmentAssignment assignment : assignments) {
if (assignment == null || assignment.isEmpty()) {
retVal.add(NilInputSlice.INSTANCE);
} else {
final List<RichSegmentDescriptor> descriptors = new ArrayList<>();
- for (final DartQueryableSegment segment : assignment) {
+ for (DartQueryableSegment segment :
assignment.getDartQueryableSegments()) {
if (!dataSource.equals(segment.getSegment().getDataSource())) {
throw new ISE("Expected dataSource[%s] but got[%s]", dataSource,
segment.getSegment().getDataSource());
}
-
descriptors.add(toRichSegmentDescriptor(segment));
}
-
- retVal.add(new SegmentsInputSlice(dataSource, descriptors,
ImmutableList.of()));
+ final List<DataServerRequestDescriptor> queryableDruidServers =
assignment.getDataServerRequestDescriptor();
+ retVal.add(new SegmentsInputSlice(dataSource, descriptors,
queryableDruidServers));
}
}
@@ -269,27 +323,17 @@ public class DartTableInputSpecSlicer implements
InputSpecSlicer
}
/**
- * Whether to include a segment from the timeline. Segments are included if
they are not tombstones, and are also not
- * purely realtime segments.
+ * Whether to include a segment from the timeline. Segments are included if
they are not tombstones, and for realtime
+ * segments, are only included based on the segmentSource.
*/
- static boolean shouldIncludeSegment(final ServerSelector serverSelector)
+ private boolean shouldIncludeSegment(final ServerSelector serverSelector)
{
if (serverSelector.getSegment().isTombstone()) {
return false;
}
-
- int numRealtimeServers = 0;
- int numOtherServers = 0;
-
- // Currently, Dart does not support clone query modes, all servers can be
queried.
- for (final DruidServerMetadata server :
serverSelector.getAllServers(CloneQueryMode.INCLUDECLONES)) {
- if
(SegmentSource.REALTIME.getUsedServerTypes().contains(server.getType())) {
- numRealtimeServers++;
- } else {
- numOtherServers++;
- }
+ if (serverSelector.isRealtimeSegment()) {
+ return SegmentSource.shouldQueryRealtimeServers(segmentSource);
}
-
- return numOtherServers > 0 || (numOtherServers + numRealtimeServers == 0);
+ return true;
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
index a40c46f52ff..080b93034fb 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
@@ -172,33 +172,29 @@ public class DartQueryMaker implements QueryMaker
private ControllerImpl makeLegacyController(LegacyMSQSpec querySpec,
QueryContext context, ResultsContext resultsContext)
{
final String dartQueryId =
context.getString(QueryContexts.CTX_DART_QUERY_ID);
- final ControllerContext controllerContext =
controllerContextFactory.newContext(dartQueryId);
+ final ControllerContext controllerContext =
controllerContextFactory.newContext(context);
- final ControllerImpl controller = new ControllerImpl(
+ return new ControllerImpl(
dartQueryId,
querySpec,
resultsContext,
controllerContext,
queryKitSpecFactory
-
);
- return controller;
}
private ControllerImpl makeQueryDefController(QueryDefMSQSpec querySpec,
QueryContext context, ResultsContext resultsContext)
{
final String dartQueryId =
context.getString(QueryContexts.CTX_DART_QUERY_ID);
- final ControllerContext controllerContext =
controllerContextFactory.newContext(dartQueryId);
-
+ final ControllerContext controllerContext =
controllerContextFactory.newContext(context);
- final ControllerImpl controller = new ControllerImpl(
+ return new ControllerImpl(
dartQueryId,
querySpec,
resultsContext,
controllerContext,
queryKitSpecFactory
);
- return controller;
}
public QueryResponse<Object[]> runLegacyMSQSpec(LegacyMSQSpec querySpec,
QueryContext context, ResultsContext resultsContext)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java
index 130a639396e..7f2f928e6f0 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartFrameContext.java
@@ -57,6 +57,7 @@ public class DartFrameContext implements FrameContext
private final ResourceHolder<ProcessingBuffers> processingBuffers;
private final WorkerMemoryParameters memoryParameters;
private final WorkerStorageParameters storageParameters;
+ private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory;
public DartFrameContext(
final StageId stageId,
@@ -66,7 +67,8 @@ public class DartFrameContext implements FrameContext
final DataSegmentProvider dataSegmentProvider,
@Nullable ResourceHolder<ProcessingBuffers> processingBuffers,
final WorkerMemoryParameters memoryParameters,
- final WorkerStorageParameters storageParameters
+ final WorkerStorageParameters storageParameters,
+ final DataServerQueryHandlerFactory dataServerQueryHandlerFactory
)
{
this.stageId = stageId;
@@ -77,6 +79,7 @@ public class DartFrameContext implements FrameContext
this.processingBuffers = processingBuffers;
this.memoryParameters = memoryParameters;
this.storageParameters = storageParameters;
+ this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
}
@Override
@@ -170,9 +173,7 @@ public class DartFrameContext implements FrameContext
@Override
public DataServerQueryHandlerFactory dataServerQueryHandlerFactory()
{
- // We don't query data servers. This factory won't actually be used,
because Dart doesn't allow segmentSource to be
- // overridden; it always uses SegmentSource.NONE. (If it is called, some
wires got crossed somewhere.)
- return null;
+ return dataServerQueryHandlerFactory;
}
@Override
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartQueryableSegment.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartQueryableSegment.java
index 574601517b4..20cccc55b98 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartQueryableSegment.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartQueryableSegment.java
@@ -20,9 +20,11 @@
package org.apache.druid.msq.dart.worker;
import com.google.common.base.Preconditions;
+import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
+import javax.annotation.Nullable;
import java.util.Objects;
/**
@@ -33,12 +35,20 @@ public class DartQueryableSegment
private final DataSegment segment;
private final Interval interval;
private final int workerNumber;
+ @Nullable
+ private final DruidServerMetadata realtimeServer;
- public DartQueryableSegment(final DataSegment segment, final Interval
interval, final int workerNumber)
+ public DartQueryableSegment(
+ final DataSegment segment,
+ final Interval interval,
+ final int workerNumber,
+ @Nullable DruidServerMetadata realtimeServer
+ )
{
this.segment = Preconditions.checkNotNull(segment, "segment");
this.interval = Preconditions.checkNotNull(interval, "interval");
this.workerNumber = workerNumber;
+ this.realtimeServer = realtimeServer;
}
public DataSegment getSegment()
@@ -56,6 +66,12 @@ public class DartQueryableSegment
return workerNumber;
}
+ @Nullable
+ public DruidServerMetadata getRealtimeServer()
+ {
+ return realtimeServer;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -68,22 +84,15 @@ public class DartQueryableSegment
DartQueryableSegment that = (DartQueryableSegment) o;
return workerNumber == that.workerNumber
&& Objects.equals(segment, that.segment)
- && Objects.equals(interval, that.interval);
+ && Objects.equals(interval, that.interval)
+ && Objects.equals(realtimeServer, that.realtimeServer);
}
@Override
public int hashCode()
{
- return Objects.hash(segment, interval, workerNumber);
+ return Objects.hash(segment, interval, workerNumber, realtimeServer);
}
- @Override
- public String toString()
- {
- return "QueryableDataSegment{" +
- "segment=" + segment +
- ", interval=" + interval +
- ", workerNumber=" + workerNumber +
- '}';
- }
+
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
index 530a4ec3e2e..947b85c9efb 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
@@ -80,6 +80,7 @@ public class DartWorkerContext implements WorkerContext
*/
@MonotonicNonNull
private volatile ResourceHolder<ProcessingBuffersSet> processingBuffersSet;
+ private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory;
DartWorkerContext(
final String queryId,
@@ -97,11 +98,13 @@ public class DartWorkerContext implements WorkerContext
final ProcessingBuffersProvider processingBuffersProvider,
final Outbox<ControllerMessage> outbox,
final File tempDir,
- final QueryContext queryContext
+ final QueryContext queryContext,
+ final DataServerQueryHandlerFactory dataServerQueryHandlerFactory
)
{
this.queryId = queryId;
this.controllerHost = controllerHost;
+ this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
this.workerId = WorkerId.fromDruidNode(selfNode, queryId);
this.selfNode = selfNode;
this.jsonMapper = jsonMapper;
@@ -223,7 +226,8 @@ public class DartWorkerContext implements WorkerContext
dataSegmentProvider,
processingBuffersSet.get().acquireForStage(workOrder.getStageDefinition()),
memoryParameters,
- storageParameters
+ storageParameters,
+ dataServerQueryHandlerFactory
);
}
@@ -236,9 +240,7 @@ public class DartWorkerContext implements WorkerContext
@Override
public DataServerQueryHandlerFactory dataServerQueryHandlerFactory()
{
- // We don't query data servers. Return null so this factory is ignored
when the main worker code tries
- // to close it.
- return null;
+ return dataServerQueryHandlerFactory;
}
@Override
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerFactoryImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerFactoryImpl.java
index 06b9226bc37..ad6328234e1 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerFactoryImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerFactoryImpl.java
@@ -22,6 +22,7 @@ package org.apache.druid.msq.dart.worker;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.google.inject.Injector;
+import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Self;
@@ -29,6 +30,7 @@ import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.messages.server.Outbox;
import org.apache.druid.msq.dart.Dart;
import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
+import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.exec.MemoryIntrospector;
import org.apache.druid.msq.exec.ProcessingBuffersProvider;
import org.apache.druid.msq.exec.Worker;
@@ -37,6 +39,7 @@ import org.apache.druid.msq.exec.WorkerImpl;
import org.apache.druid.msq.querykit.DataSegmentProvider;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContext;
+import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.rpc.ServiceClientFactory;
@@ -63,6 +66,8 @@ public class DartWorkerFactoryImpl implements
DartWorkerFactory
private final MemoryIntrospector memoryIntrospector;
private final ProcessingBuffersProvider processingBuffersProvider;
private final Outbox<ControllerMessage> outbox;
+ private final CoordinatorClient coordinatorClient;
+ private final QueryToolChestWarehouse warehouse;
@Inject
public DartWorkerFactoryImpl(
@@ -78,7 +83,9 @@ public class DartWorkerFactoryImpl implements
DartWorkerFactory
@Dart DataSegmentProvider dataSegmentProvider,
MemoryIntrospector memoryIntrospector,
@Dart ProcessingBuffersProvider processingBuffersProvider,
- Outbox<ControllerMessage> outbox
+ Outbox<ControllerMessage> outbox,
+ CoordinatorClient coordinatorClient,
+ QueryToolChestWarehouse warehouse
)
{
this.selfNode = selfNode;
@@ -94,6 +101,8 @@ public class DartWorkerFactoryImpl implements
DartWorkerFactory
this.memoryIntrospector = memoryIntrospector;
this.processingBuffersProvider = processingBuffersProvider;
this.outbox = outbox;
+ this.coordinatorClient = coordinatorClient;
+ this.warehouse = warehouse;
}
@Override
@@ -115,7 +124,13 @@ public class DartWorkerFactoryImpl implements
DartWorkerFactory
processingBuffersProvider,
outbox,
tempDir,
- queryContext
+ queryContext,
+ new DataServerQueryHandlerFactory(
+ coordinatorClient,
+ serviceClientFactory,
+ jsonMapper,
+ warehouse
+ )
);
return new WorkerImpl(null, workerContext);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index edf4419b096..3732dbcc010 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -381,7 +381,7 @@ public class ControllerImpl implements Controller
// Planning-related: convert the native query from MSQSpec into a
multi-stage QueryDefinition.
this.queryStartTime = DateTimes.nowUtc();
context.registerController(this, closer);
- queryDef = initializeQueryDefAndState(closer);
+ queryDef = initializeQueryDefAndState();
this.netClient = closer.register(new
ExceptionWrappingWorkerClient(context.newWorkerClient()));
this.workerSketchFetcher = new WorkerSketchFetcher(
@@ -657,7 +657,7 @@ public class ControllerImpl implements Controller
}
}
- private QueryDefinition initializeQueryDefAndState(final Closer closer)
+ private QueryDefinition initializeQueryDefAndState()
{
this.selfDruidNode = context.selfNode();
this.queryKernelConfig = context.queryKernelConfig(queryId, querySpec);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
index 920bceb952b..04458d70170 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
@@ -39,7 +39,6 @@ import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
import org.apache.druid.msq.input.table.DataServerSelector;
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
-import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryInterruptedException;
@@ -183,9 +182,7 @@ public class DataServerQueryHandler
}
pendingRequests = createNextPendingRequests(
- missingRichSegmentDescriptors,
- MultiStageQueryContext.getSegmentSources(query.context()),
- DataServerSelector.RANDOM
+ missingRichSegmentDescriptors
);
if (!pendingRequests.isEmpty()) {
@@ -270,9 +267,7 @@ public class DataServerQueryHandler
}
private List<DataServerRequestDescriptor> createNextPendingRequests(
- final Set<RichSegmentDescriptor> richSegmentDescriptors,
- final SegmentSource includeSegmentSource,
- final DataServerSelector dataServerSelector
+ final Set<RichSegmentDescriptor> richSegmentDescriptors
)
{
final Map<DruidServerMetadata, Set<RichSegmentDescriptor>>
serverVsSegmentsMap = new HashMap<>();
@@ -300,16 +295,17 @@ public class DataServerQueryHandler
if
(segmentLoadInfo.getSegment().toDescriptor().equals(segmentDescriptorWithFullInterval))
{
Set<DruidServerMetadata> servers = segmentLoadInfo.getServers()
.stream()
-
.filter(druidServerMetadata -> includeSegmentSource.getUsedServerTypes()
-
.contains(druidServerMetadata.getType()))
+
.filter(druidServerMetadata -> SegmentSource.REALTIME.getUsedServerTypes()
+
.contains(druidServerMetadata.getType()))
.collect(Collectors.toSet());
if (servers.isEmpty()) {
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
- .build("Could not find a server matching
includeSegmentSource[%s] for segment[%s]. Only found servers [%s]",
includeSegmentSource, richSegmentDescriptor, servers);
+ .build("Could not find a server matching
includeSegmentSource[%s] for segment[%s]. Only found servers [%s]",
+ SegmentSource.REALTIME,
richSegmentDescriptor, servers);
}
- DruidServerMetadata druidServerMetadata =
dataServerSelector.getSelectServerFunction().apply(servers);
+ DruidServerMetadata druidServerMetadata =
DataServerSelector.RANDOM.getSelectServerFunction().apply(servers);
serverVsSegmentsMap.computeIfAbsent(druidServerMetadata, ignored ->
new HashSet<>());
SegmentDescriptor descriptor =
segmentLoadInfo.getSegment().toDescriptor();
serverVsSegmentsMap.get(druidServerMetadata)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentSource.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentSource.java
index 22f3a5df973..54ac532cc24 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentSource.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SegmentSource.java
@@ -25,22 +25,23 @@ import org.apache.druid.server.coordination.ServerType;
import java.util.Set;
/**
- * Decides the types of data servers contacted by MSQ tasks to fetch results.
+ * Decides the types of data servers contacted by MSQ queries to fetch results.
*/
public enum SegmentSource
{
/**
- * Include only segments from deep storage.
+ * Do not include segments from any additional data servers.
*/
NONE(ImmutableSet.of()),
+
/**
- * Include segments from realtime tasks as well as segments from deep
storage.
+ * Include segments from realtime tasks as well.
*/
REALTIME(ImmutableSet.of(ServerType.REALTIME, ServerType.INDEXER_EXECUTOR));
/**
- * The type of dataservers (if any) to include. This does not include
segments queried from deep storage, which are
- * always included in queries.
+ * The types of data servers (if any) to include. This does not cover segments queried from deep storage (for MSQ
+ * tasks) or segments loaded on historicals (for MSQ Dart), which are always included.
*/
private final Set<ServerType> usedServerTypes;
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
index 50cbe781dc7..c7aa6c1c1a0 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
@@ -101,6 +101,9 @@ public interface WorkerContext
*/
DruidNode selfNode();
+ /**
+ * Returns the factory for {@link DataServerQueryHandler} from the context.
Used to query realtime tasks.
+ */
DataServerQueryHandlerFactory dataServerQueryHandlerFactory();
/**
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 1e55bf15f20..667335ad867 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -72,6 +72,7 @@ import java.util.concurrent.TimeUnit;
public class IndexerControllerContext implements ControllerContext
{
public static final int DEFAULT_MAX_CONCURRENT_STAGES = 1;
+ public static final SegmentSource DEFAULT_SEGMENT_SOURCE =
SegmentSource.NONE;
private static final Logger log = new Logger(IndexerControllerContext.class);
@@ -172,7 +173,7 @@ public class IndexerControllerContext implements
ControllerContext
public InputSpecSlicer newTableInputSpecSlicer(final WorkerManager
workerManager)
{
final SegmentSource includeSegmentSource =
- MultiStageQueryContext.getSegmentSources(taskQuerySpecContext);
+ MultiStageQueryContext.getSegmentSources(taskQuerySpecContext,
DEFAULT_SEGMENT_SOURCE);
return new IndexerTableInputSpecSlicer(
toolbox.getCoordinatorClient(),
toolbox.getTaskActionClient(),
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java
index 2e3823b3af4..d76264621d5 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/DartQueryKitSpecFactory.java
@@ -52,7 +52,7 @@ public class DartQueryKitSpecFactory implements
QueryKitSpecFactory
return new QueryKitSpec(
queryKit,
queryId,
- getNumHistoricals(),
+ getNumWorkers(),
queryContext.getInt(
DartControllerContext.CTX_MAX_NON_LEAF_WORKER_COUNT,
DartControllerContext.DEFAULT_MAX_NON_LEAF_WORKER_COUNT
@@ -64,7 +64,7 @@ public class DartQueryKitSpecFactory implements
QueryKitSpecFactory
);
}
- private int getNumHistoricals()
+ private int getNumWorkers()
{
int cnt = 0;
for (DruidServerMetadata s : serverView.getDruidServerMetadatas()) {
@@ -72,6 +72,8 @@ public class DartQueryKitSpecFactory implements
QueryKitSpecFactory
cnt++;
}
}
- return cnt;
+
+ // Even if all segments are realtime, launch at least one worker.
+ return Math.max(1, cnt);
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
index 838c438cbf4..2e9e67a7534 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MSQTaskQueryMakerUtils.java
@@ -24,6 +24,7 @@ import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.exec.SegmentSource;
+import org.apache.druid.msq.indexing.IndexerControllerContext;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.destination.MSQDestination;
@@ -100,7 +101,7 @@ public class MSQTaskQueryMakerUtils
*/
public static void validateRealtimeReindex(QueryContext context,
MSQDestination destination, Query<?> query)
{
- final SegmentSource segmentSources =
MultiStageQueryContext.getSegmentSources(context);
+ final SegmentSource segmentSources =
MultiStageQueryContext.getSegmentSources(context,
IndexerControllerContext.DEFAULT_SEGMENT_SOURCE);
if (MSQControllerTask.isReplaceInputDataSourceTask(query, destination) &&
SegmentSource.REALTIME.equals(segmentSources)) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.INVALID_INPUT)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 93d79a7b3f1..3ced82b1756 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -122,7 +122,6 @@ public class MultiStageQueryContext
private static final boolean DEFAULT_FINALIZE_AGGREGATIONS = true;
public static final String CTX_INCLUDE_SEGMENT_SOURCE =
"includeSegmentSource";
- public static final SegmentSource DEFAULT_INCLUDE_SEGMENT_SOURCE =
SegmentSource.NONE;
public static final String CTX_MAX_CONCURRENT_STAGES = "maxConcurrentStages";
public static final String CTX_DURABLE_SHUFFLE_STORAGE =
"durableShuffleStorage";
@@ -320,12 +319,12 @@ public class MultiStageQueryContext
);
}
- public static SegmentSource getSegmentSources(final QueryContext
queryContext)
+ public static SegmentSource getSegmentSources(final QueryContext
queryContext, final SegmentSource defaultSource)
{
return queryContext.getEnum(
CTX_INCLUDE_SEGMENT_SOURCE,
SegmentSource.class,
- DEFAULT_INCLUDE_SEGMENT_SOURCE
+ defaultSource
);
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java
index da23ead30f8..be90b20bae9 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java
@@ -101,7 +101,7 @@ public class DartControllerContextTest
public void test_queryKernelConfig()
{
final DartControllerContext controllerContext =
- new DartControllerContext(null, null, SELF_NODE, null,
memoryIntrospector, serverView, null);
+ new DartControllerContext(null, null, SELF_NODE, null,
memoryIntrospector, serverView, null, null);
final ControllerQueryKernelConfig queryKernelConfig =
controllerContext.queryKernelConfig(QUERY_ID, querySpec);
Assertions.assertFalse(queryKernelConfig.isFaultTolerant());
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
index a65c1252c6e..528b2facfa2 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
@@ -34,11 +34,15 @@ import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.msq.dart.worker.WorkerId;
+import org.apache.druid.msq.exec.SegmentSource;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.NilInputSlice;
+import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.input.table.TableInputSpec;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.QueryContext;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.filter.EqualityFilter;
import org.apache.druid.segment.column.ColumnType;
@@ -157,7 +161,7 @@ public class DartTableInputSpecSlicerTest extends
InitializedNullHandlingTest
);
/**
- * Segment that should be ignored (for now) because it's realtime-only.
+ * Segment that's realtime-only.
*/
private static final DataSegment SEGMENT5 = new DataSegment(
DATASOURCE,
@@ -172,6 +176,22 @@ public class DartTableInputSpecSlicerTest extends
InitializedNullHandlingTest
BYTES_PER_SEGMENT
);
+ /**
+ * Segment that's realtime and located at the same host as segment 5
+ */
+ private static final DataSegment SEGMENT6 = new DataSegment(
+ DATASOURCE,
+ Intervals.of("2004/2005"),
+ "1",
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ new NumberedShardSpec(0, 1),
+ null,
+ null,
+ BYTES_PER_SEGMENT
+ );
+
/**
* Mapping of segment to servers (indexes in {@link #SERVERS}).
*/
@@ -182,6 +202,7 @@ public class DartTableInputSpecSlicerTest extends
InitializedNullHandlingTest
.put(SEGMENT3, IntLists.emptyList())
.put(SEGMENT4, IntList.of(1))
.put(SEGMENT5, IntList.of(2))
+ .put(SEGMENT6, IntList.of(2))
.build();
private AutoCloseable mockCloser;
@@ -206,7 +227,7 @@ public class DartTableInputSpecSlicerTest extends
InitializedNullHandlingTest
void setUp()
{
mockCloser = MockitoAnnotations.openMocks(this);
- slicer = DartTableInputSpecSlicer.createFromWorkerIds(WORKER_IDS,
serverView);
+ slicer = DartTableInputSpecSlicer.createFromWorkerIds(WORKER_IDS,
serverView, QueryContext.empty());
// Add all segments to the timeline, round-robin across the two servers.
timeline = new VersionedIntervalTimeline<>(Ordering.natural());
@@ -296,7 +317,25 @@ public class DartTableInputSpecSlicerTest extends
InitializedNullHandlingTest
SEGMENT3.getShardSpec().getPartitionNum()
)
),
- ImmutableList.of()
+ ImmutableList.of(
+ new DataServerRequestDescriptor(
+ SERVERS.get(2),
+ ImmutableList.of(
+ new RichSegmentDescriptor(
+ SEGMENT5.getInterval(),
+ SEGMENT5.getInterval(),
+ SEGMENT5.getVersion(),
+ SEGMENT5.getShardSpec().getPartitionNum()
+ ),
+ new RichSegmentDescriptor(
+ SEGMENT6.getInterval(),
+ SEGMENT6.getInterval(),
+ SEGMENT6.getVersion(),
+ SEGMENT6.getShardSpec().getPartitionNum()
+ )
+ )
+ )
+ )
)
),
inputSlices
@@ -340,7 +379,25 @@ public class DartTableInputSpecSlicerTest extends
InitializedNullHandlingTest
SEGMENT2.getShardSpec().getPartitionNum()
)
),
- ImmutableList.of()
+ ImmutableList.of(
+ new DataServerRequestDescriptor(
+ SERVERS.get(2),
+ ImmutableList.of(
+ new RichSegmentDescriptor(
+ SEGMENT5.getInterval(),
+ SEGMENT5.getInterval(),
+ SEGMENT5.getVersion(),
+ SEGMENT5.getShardSpec().getPartitionNum()
+ ),
+ new RichSegmentDescriptor(
+ SEGMENT6.getInterval(),
+ SEGMENT6.getInterval(),
+ SEGMENT6.getVersion(),
+ SEGMENT6.getShardSpec().getPartitionNum()
+ )
+ )
+ )
+ )
)
),
inputSlices
@@ -384,7 +441,25 @@ public class DartTableInputSpecSlicerTest extends
InitializedNullHandlingTest
SEGMENT2.getShardSpec().getPartitionNum()
)
),
- ImmutableList.of()
+ ImmutableList.of(
+ new DataServerRequestDescriptor(
+ SERVERS.get(2),
+ ImmutableList.of(
+ new RichSegmentDescriptor(
+ SEGMENT5.getInterval(),
+ SEGMENT5.getInterval(),
+ SEGMENT5.getVersion(),
+ SEGMENT5.getShardSpec().getPartitionNum()
+ ),
+ new RichSegmentDescriptor(
+ SEGMENT6.getInterval(),
+ SEGMENT6.getInterval(),
+ SEGMENT6.getVersion(),
+ SEGMENT6.getShardSpec().getPartitionNum()
+ )
+ )
+ )
+ )
),
NilInputSlice.INSTANCE
),
@@ -437,7 +512,29 @@ public class DartTableInputSpecSlicerTest extends
InitializedNullHandlingTest
),
ImmutableList.of()
),
- NilInputSlice.INSTANCE
+ new SegmentsInputSlice(
+ DATASOURCE,
+ ImmutableList.of(),
+ ImmutableList.of(
+ new DataServerRequestDescriptor(
+ SERVERS.get(2),
+ ImmutableList.of(
+ new RichSegmentDescriptor(
+ SEGMENT5.getInterval(),
+ SEGMENT5.getInterval(),
+ SEGMENT5.getVersion(),
+ SEGMENT5.getShardSpec().getPartitionNum()
+ ),
+ new RichSegmentDescriptor(
+ SEGMENT6.getInterval(),
+ SEGMENT6.getInterval(),
+ SEGMENT6.getVersion(),
+ SEGMENT6.getShardSpec().getPartitionNum()
+ )
+ )
+ )
+ )
+ )
),
inputSlices
);
@@ -487,4 +584,52 @@ public class DartTableInputSpecSlicerTest extends
InitializedNullHandlingTest
inputSlices
);
}
+
+ @Test
+ void test_withoutRealtime_twoSlices()
+ {
+ final QueryContext queryContext =
QueryContext.of(Map.of(MultiStageQueryContext.CTX_INCLUDE_SEGMENT_SOURCE,
SegmentSource.NONE.toString()));
+ slicer = DartTableInputSpecSlicer.createFromWorkerIds(WORKER_IDS,
serverView, queryContext);
+
+ // When 2 slices are requested, we assign segments to the servers that
have those segments.
+
+ final TableInputSpec inputSpec = new TableInputSpec(DATASOURCE, null,
null, null);
+ final List<InputSlice> inputSlices = slicer.sliceStatic(inputSpec, 2);
+ // With includeSegmentSource=NONE, the realtime-only segments 5 and 6 are not assigned to any slice.
+ Assertions.assertEquals(
+ ImmutableList.of(
+ new SegmentsInputSlice(
+ DATASOURCE,
+ ImmutableList.of(
+ new RichSegmentDescriptor(
+ SEGMENT1.getInterval(),
+ SEGMENT1.getInterval(),
+ SEGMENT1.getVersion(),
+ SEGMENT1.getShardSpec().getPartitionNum()
+ ),
+ new RichSegmentDescriptor(
+ SEGMENT3.getInterval(),
+ SEGMENT3.getInterval(),
+ SEGMENT3.getVersion(),
+ SEGMENT3.getShardSpec().getPartitionNum()
+ )
+ ),
+ ImmutableList.of()
+ ),
+ new SegmentsInputSlice(
+ DATASOURCE,
+ ImmutableList.of(
+ new RichSegmentDescriptor(
+ SEGMENT2.getInterval(),
+ SEGMENT2.getInterval(),
+ SEGMENT2.getVersion(),
+ SEGMENT2.getShardSpec().getPartitionNum()
+ )
+ ),
+ ImmutableList.of()
+ )
+ ),
+ inputSlices
+ );
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
index cac156e071e..b2dd1a974f8 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
@@ -21,6 +21,8 @@ package org.apache.druid.msq.test;
import com.google.inject.Binder;
import com.google.inject.Provides;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.guice.LazySingleton;
@@ -121,6 +123,7 @@ public class DartComponentSupplier extends
AbstractMSQComponentSupplierDelegate
@Override
public void configure(Binder binder)
{
+ binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class);
}
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index a2600999d90..626ce3c1f43 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -48,6 +48,7 @@ import
org.apache.druid.msq.dart.controller.DartControllerContextFactory;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.ControllerMemoryParameters;
+import org.apache.druid.msq.exec.SegmentSource;
import org.apache.druid.msq.exec.Worker;
import org.apache.druid.msq.exec.WorkerClient;
import org.apache.druid.msq.exec.WorkerFailureListener;
@@ -324,7 +325,7 @@ public class MSQTestControllerContext implements
ControllerContext, DartControll
return new IndexerTableInputSpecSlicer(
coordinatorClient,
taskActionClient,
- MultiStageQueryContext.getSegmentSources(queryContext)
+ MultiStageQueryContext.getSegmentSources(queryContext,
SegmentSource.NONE)
);
}
@@ -371,7 +372,7 @@ public class MSQTestControllerContext implements
ControllerContext, DartControll
}
@Override
- public ControllerContext newContext(String queryId)
+ public ControllerContext newContext(QueryContext context)
{
return this;
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
index a002b277973..95b1a7fc97d 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
@@ -43,6 +43,7 @@ import org.apache.druid.msq.exec.WorkerImpl;
import org.apache.druid.msq.exec.WorkerStorageParameters;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.WorkOrder;
+import org.apache.druid.query.QueryContext;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.server.DruidNode;
@@ -76,7 +77,7 @@ public class TestDartControllerContextFactoryImpl extends
DartControllerContextF
}
@Override
- public ControllerContext newContext(String queryId)
+ public ControllerContext newContext(QueryContext context)
{
return new DartControllerContext(
injector,
@@ -85,7 +86,8 @@ public class TestDartControllerContextFactoryImpl extends
DartControllerContextF
new DartTestWorkerClient(),
memoryIntrospector,
serverView,
- emitter
+ emitter,
+ context
)
{
@Override
diff --git
a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
index 1d42ff3dc9e..4a26c0ed6fa 100644
--- a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
+++ b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
@@ -229,4 +229,14 @@ public class ServerSelector implements
Overshadowable<ServerSelector>
{
return segment.get().hasData();
}
+
+ /**
+ * Checks if the segment is currently served by a realtime server, and is
not served by a historical.
+ */
+ public boolean isRealtimeSegment()
+ {
+ synchronized (this) {
+ return (!realtimeServers.isEmpty()) && historicalServers.isEmpty();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]