This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0a3ad942030 feat: Add counters, remove warehouse from
DataServerQueryHandlers (#19196)
0a3ad942030 is described below
commit 0a3ad942030090b8fe5680ee8bda6d4fa103c7e6
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Mar 23 12:30:33 2026 -0700
feat: Add counters, remove warehouse from DataServerQueryHandlers (#19196)
Two changes to MSQ DataServerQueryHandlers:
1) Add "queries" and "totalQueries" counters, reflecting queries made
to realtime servers to get realtime data.
2) Remove dependency on QueryToolChestWarehouse and QueryToolChest.
When mapping functions from these are needed, the query logic passes
them in rather than deriving them from the toolchest.
Web console changes are included for (1). Realtime query counters are
shown in an ↙ icon next to VSF counters.
---
.../testing/embedded/msq/EmbeddedMSQApis.java | 52 +++++++++++++++
.../embedded/msq/EmbeddedMSQRealtimeQueryTest.java | 7 +-
.../apache/druid/msq/counters/ChannelCounters.java | 78 +++++++++++++++++++++-
.../druid/msq/dart/guice/DartWorkerModule.java | 7 +-
.../dart/worker/DartDataServerQueryHandler.java | 35 +++++-----
.../worker/DartDataServerQueryHandlerFactory.java | 7 +-
.../druid/msq/exec/DataServerQueryHandler.java | 8 +--
.../indexing/IndexerDataServerQueryHandler.java | 41 ++++++------
.../IndexerDataServerQueryHandlerFactory.java | 7 +-
.../druid/msq/indexing/IndexerWorkerContext.java | 5 +-
.../msq/input/table/SegmentsInputSliceReader.java | 1 +
.../groupby/GroupByPreShuffleFrameProcessor.java | 21 +++++-
.../groupby/GroupByPreShuffleStageProcessor.java | 6 ++
.../msq/querykit/scan/ScanQueryFrameProcessor.java | 6 ++
.../druid/msq/exec/MSQCompactionTaskRunTest.java | 2 +
.../druid/msq/exec/MSQLoadedSegmentTests.java | 10 +--
.../IndexerDataServerQueryHandlerTest.java | 17 ++---
.../sql/resources/SqlStatementResourceTest.java | 4 +-
.../druid/msq/test/CalciteMSQTestsHelper.java | 2 +
web-console/src/druid-models/stages/stages.spec.ts | 50 ++++++++++++++
web-console/src/druid-models/stages/stages.ts | 10 ++-
.../execution-stages-pane.tsx | 26 ++++++++
22 files changed, 314 insertions(+), 88 deletions(-)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
index 115d1c22f16..e58ad81b465 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
@@ -26,6 +26,9 @@ import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.counters.CounterSnapshots;
+import org.apache.druid.msq.counters.QueryCounterSnapshot;
import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
@@ -42,7 +45,10 @@ import org.apache.druid.testing.embedded.EmbeddedOverlord;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -236,6 +242,52 @@ public class EmbeddedMSQApis
return cluster.callApi().onTargetBroker(targetBroker, b ->
b.cancelSqlQuery(sqlQueryId));
}
+ /**
+ * Returns the sum of completed queries across all input channel snapshots
from all stages and workers.
+ */
+ public long getQueriesSum(final MSQTaskReportPayload payload)
+ {
+ return getAllInputChannelCounters(payload)
+ .stream()
+ .filter(snapshot -> snapshot.getQueries() != null)
+ .mapToLong(snapshot -> Arrays.stream(snapshot.getQueries()).sum())
+ .sum();
+ }
+
+ /**
+ * Returns the sum of files read across all input channel snapshots from all
stages and workers.
+ */
+ public long getFilesSum(final MSQTaskReportPayload payload)
+ {
+ return getAllInputChannelCounters(payload)
+ .stream()
+ .filter(snapshot -> snapshot.getFiles() != null)
+ .mapToLong(snapshot -> Arrays.stream(snapshot.getFiles()).sum())
+ .sum();
+ }
+
+ /**
+ * Returns all {@link ChannelCounters.Snapshot} from input channels across
all stages and workers.
+ */
+ private List<ChannelCounters.Snapshot> getAllInputChannelCounters(final
MSQTaskReportPayload payload)
+ {
+ final Map<Integer, Map<Integer, CounterSnapshots>> countersMap =
payload.getCounters().copyMap();
+ final List<ChannelCounters.Snapshot> snapshots = new ArrayList<>();
+
+ for (final Map.Entry<Integer, Map<Integer, CounterSnapshots>> stageEntry :
countersMap.entrySet()) {
+ for (final Map.Entry<Integer, CounterSnapshots> workerEntry :
stageEntry.getValue().entrySet()) {
+ for (final Map.Entry<String, QueryCounterSnapshot> counterEntry :
workerEntry.getValue().getMap().entrySet()) {
+ if (counterEntry.getKey().startsWith("input")
+ && counterEntry.getValue() instanceof ChannelCounters.Snapshot) {
+ snapshots.add((ChannelCounters.Snapshot) counterEntry.getValue());
+ }
+ }
+ }
+ }
+
+ return snapshots;
+ }
+
private static GetQueryReportResponse parseReportResponse(String
responseJson, ObjectMapper jsonMapper)
{
try {
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
index 9031ac12705..a651ccf03ad 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.testing.embedded.EmbeddedOverlord;
import org.apache.druid.testing.embedded.EmbeddedRouter;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -187,6 +188,10 @@ public class EmbeddedMSQRealtimeQueryTest extends
BaseRealtimeQueryTest
Collections.singletonList(new Object[]{totalRows}),
payload.getResults().getResults()
);
+
+ // Verify that realtime queries were issued and no files were read (all
data is realtime).
+ MatcherAssert.assertThat(msqApis.getQueriesSum(payload),
Matchers.greaterThan(0L));
+ Assertions.assertEquals(0, msqApis.getFilesSum(payload));
}
@Test
@@ -370,7 +375,7 @@ public class EmbeddedMSQRealtimeQueryTest extends
BaseRealtimeQueryTest
@Test
@Timeout(60)
- public void test_selectJoinwithLookup_dart()
+ public void test_selectJoinWithLookup_dart()
{
final String sql = StringUtils.format(
"SELECT \n"
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java
index d7ae0a47924..975cd50aaf0 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java
@@ -27,6 +27,7 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.longs.LongList;
import org.apache.druid.frame.Frame;
+import org.apache.druid.msq.exec.DataServerQueryHandler;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.segment.loading.AcquireSegmentResult;
@@ -74,6 +75,12 @@ public class ChannelCounters implements QueryCounter
@GuardedBy("this")
private final LongList loadWait = new LongArrayList();
+ @GuardedBy("this")
+ private final LongList queries = new LongArrayList();
+
+ @GuardedBy("this")
+ private final LongList totalQueries = new LongArrayList();
+
public void incrementRowCount()
{
incrementRowCount(NO_PARTITION);
@@ -132,6 +139,29 @@ public class ChannelCounters implements QueryCounter
}
}
+ /**
+ * Increment the number of queries made via {@link DataServerQueryHandler}.
+ */
+ public void incrementQueries()
+ {
+ synchronized (this) {
+ ensureCapacityForPartition(NO_PARTITION);
+ queries.set(NO_PARTITION, queries.getLong(NO_PARTITION) + 1);
+ }
+ }
+
+ /**
+ * Total expected number of queries to be made via {@link
DataServerQueryHandler}.
+ */
+ public ChannelCounters addTotalQueries(final long n)
+ {
+ synchronized (this) {
+ ensureCapacityForPartition(NO_PARTITION);
+ totalQueries.set(NO_PARTITION, totalQueries.getLong(NO_PARTITION) + n);
+ return this;
+ }
+ }
+
private void add(
final int partitionNumber,
final long nRows,
@@ -188,6 +218,14 @@ public class ChannelCounters implements QueryCounter
while (partitionNumber >= totalFiles.size()) {
totalFiles.add(0);
}
+
+ while (partitionNumber >= queries.size()) {
+ queries.add(0);
+ }
+
+ while (partitionNumber >= totalQueries.size()) {
+ totalQueries.add(0);
+ }
}
@GuardedBy("this")
@@ -223,6 +261,8 @@ public class ChannelCounters implements QueryCounter
final long[] loadTimeArray;
final long[] loadWaitArray;
final long[] loadFilesArray;
+ final long[] queriesArray;
+ final long[] totalQueriesArray;
synchronized (this) {
rowsArray = listToArray(rows);
@@ -234,6 +274,8 @@ public class ChannelCounters implements QueryCounter
loadTimeArray = listToArray(loadTime);
loadWaitArray = listToArray(loadWait);
loadFilesArray = listToArray(loadFiles);
+ queriesArray = listToArray(queries);
+ totalQueriesArray = listToArray(totalQueries);
}
if (rowsArray == null
@@ -245,6 +287,8 @@ public class ChannelCounters implements QueryCounter
&& loadTimeArray == null
&& loadWaitArray == null
&& loadFilesArray == null
+ && queriesArray == null
+ && totalQueriesArray == null
) {
return null;
} else {
@@ -271,7 +315,9 @@ public class ChannelCounters implements QueryCounter
loadBytesArray,
loadTimeArray,
loadWaitArray,
- loadFilesArray
+ loadFilesArray,
+ queriesArray,
+ totalQueriesArray
);
}
}
@@ -307,6 +353,8 @@ public class ChannelCounters implements QueryCounter
private final long[] loadTime;
private final long[] loadWait;
private final long[] loadFiles;
+ private final long[] queries;
+ private final long[] totalQueries;
@JsonCreator
public Snapshot(
@@ -318,7 +366,9 @@ public class ChannelCounters implements QueryCounter
@Nullable @JsonProperty("loadBytes") final long[] loadBytes,
@Nullable @JsonProperty("loadTime") final long[] loadTime,
@Nullable @JsonProperty("loadWait") final long[] loadWait,
- @Nullable @JsonProperty("loadFiles") final long[] loadFiles
+ @Nullable @JsonProperty("loadFiles") final long[] loadFiles,
+ @Nullable @JsonProperty("queries") final long[] queries,
+ @Nullable @JsonProperty("totalQueries") final long[] totalQueries
)
{
this.rows = rows;
@@ -330,6 +380,8 @@ public class ChannelCounters implements QueryCounter
this.loadTime = loadTime;
this.loadWait = loadWait;
this.loadFiles = loadFiles;
+ this.queries = queries;
+ this.totalQueries = totalQueries;
}
@JsonProperty
@@ -395,6 +447,20 @@ public class ChannelCounters implements QueryCounter
return loadFiles;
}
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public long[] getQueries()
+ {
+ return queries;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public long[] getTotalQueries()
+ {
+ return totalQueries;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -413,7 +479,9 @@ public class ChannelCounters implements QueryCounter
&& Arrays.equals(loadBytes, snapshot.loadBytes)
&& Arrays.equals(loadTime, snapshot.loadTime)
&& Arrays.equals(loadWait, snapshot.loadWait)
- && Arrays.equals(loadFiles, snapshot.loadFiles);
+ && Arrays.equals(loadFiles, snapshot.loadFiles)
+ && Arrays.equals(queries, snapshot.queries)
+ && Arrays.equals(totalQueries, snapshot.totalQueries);
}
@Override
@@ -428,6 +496,8 @@ public class ChannelCounters implements QueryCounter
result = 31 * result + Arrays.hashCode(loadTime);
result = 31 * result + Arrays.hashCode(loadWait);
result = 31 * result + Arrays.hashCode(loadFiles);
+ result = 31 * result + Arrays.hashCode(queries);
+ result = 31 * result + Arrays.hashCode(totalQueries);
return result;
}
@@ -444,6 +514,8 @@ public class ChannelCounters implements QueryCounter
", loadTime=" + Arrays.toString(loadTime) +
", loadWait=" + Arrays.toString(loadWait) +
", loadFiles=" + Arrays.toString(loadFiles) +
+ ", queries=" + Arrays.toString(queries) +
+ ", totalQueries=" + Arrays.toString(totalQueries) +
'}';
}
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
index 1139a13f4a6..e98d0ec2195 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
@@ -60,7 +60,6 @@ import
org.apache.druid.msq.dart.worker.http.DartWorkerResource;
import org.apache.druid.msq.exec.MemoryIntrospector;
import org.apache.druid.msq.rpc.ResourcePermissionMapper;
import org.apache.druid.query.DruidProcessingConfig;
-import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.security.AuthorizerMapper;
@@ -159,14 +158,12 @@ public class DartWorkerModule implements DruidModule
@Provides
public DartDataServerQueryHandlerFactory
createDataServerQueryHandlerFactory(
@EscalatedGlobal ServiceClientFactory serviceClientFactory,
- @Smile ObjectMapper smileMapper,
- QueryToolChestWarehouse queryToolChestWarehouse
+ @Smile ObjectMapper smileMapper
)
{
return new DartDataServerQueryHandlerFactory(
serviceClientFactory,
- smileMapper,
- queryToolChestWarehouse
+ smileMapper
);
}
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
index 5907846abd9..e81f6ee92dc 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
@@ -35,14 +35,12 @@ import
org.apache.druid.msq.exec.DataServerQueryHandlerUtils;
import org.apache.druid.msq.exec.DataServerQueryResult;
import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
-import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryToolChest;
-import org.apache.druid.query.QueryToolChestWarehouse;
+import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.context.DefaultResponseContext;
import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocation;
import org.apache.druid.rpc.StandardRetryPolicy;
@@ -62,7 +60,6 @@ public class DartDataServerQueryHandler implements
DataServerQueryHandler
private final ChannelCounters channelCounters;
private final ServiceClientFactory serviceClientFactory;
private final ObjectMapper objectMapper;
- private final QueryToolChestWarehouse warehouse;
private final DataServerRequestDescriptor requestDescriptor;
public DartDataServerQueryHandler(
@@ -71,7 +68,6 @@ public class DartDataServerQueryHandler implements
DataServerQueryHandler
ChannelCounters channelCounters,
ServiceClientFactory serviceClientFactory,
ObjectMapper objectMapper,
- QueryToolChestWarehouse warehouse,
DataServerRequestDescriptor requestDescriptor
)
{
@@ -80,7 +76,6 @@ public class DartDataServerQueryHandler implements
DataServerQueryHandler
this.channelCounters = channelCounters;
this.serviceClientFactory = serviceClientFactory;
this.objectMapper = objectMapper;
- this.warehouse = warehouse;
this.requestDescriptor = requestDescriptor;
}
@@ -95,33 +90,37 @@ public class DartDataServerQueryHandler implements
DataServerQueryHandler
@Override
public <RowType, QueryType> ListenableFuture<DataServerQueryResult<RowType>>
fetchRowsFromDataServer(
Query<QueryType> query,
+ JavaType queryResultType,
Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
Closer closer
)
{
+ if (query.getDataSource() instanceof QueryDataSource) {
+ // Subqueries being included would cause "withQuerySegmentSpec" to not
work properly.
+ throw DruidException.defensive("Cannot run query with subquery[%s]",
query);
+ }
+
final Query<QueryType> preparedQuery =
- Queries.withSpecificSegments(
- DataServerQueryHandlerUtils.prepareQuery(query, inputNumber,
dataSourceName),
- requestDescriptor.getSegments()
- .stream()
- .map(RichSegmentDescriptor::toPlainDescriptor)
- .collect(Collectors.toList())
+ DataServerQueryHandlerUtils.prepareQuery(query, inputNumber,
dataSourceName).withQuerySegmentSpec(
+ new MultipleSpecificSegmentSpec(
+ requestDescriptor.getSegments()
+ .stream()
+ .map(RichSegmentDescriptor::toPlainDescriptor)
+ .collect(Collectors.toList())
+ )
);
final ServiceLocation serviceLocation =
ServiceLocation.fromDruidServerMetadata(requestDescriptor.getServerMetadata());
final DataServerClient dataServerClient =
makeDataServerClient(serviceLocation);
- final QueryToolChest<QueryType, Query<QueryType>> toolChest =
warehouse.getToolChest(query);
- final Function<QueryType, QueryType> preComputeManipulatorFn =
- toolChest.makePreComputeManipulatorFn(query,
MetricManipulatorFns.deserializing());
- final JavaType queryResultType = toolChest.getBaseResultType();
final ResponseContext responseContext = new DefaultResponseContext();
return FutureUtils.transform(
dataServerClient.run(preparedQuery, responseContext, queryResultType,
closer),
resultSequence -> {
+ channelCounters.incrementQueries();
final Yielder<RowType> yielder =
DataServerQueryHandlerUtils.createYielder(
- resultSequence.map(preComputeManipulatorFn),
+ resultSequence,
mappingFunction,
channelCounters
);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java
index 56334071a77..fddc4b82fd6 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
-import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.rpc.ServiceClientFactory;
/**
@@ -33,17 +32,14 @@ public class DartDataServerQueryHandlerFactory implements
DataServerQueryHandler
{
private final ServiceClientFactory serviceClientFactory;
private final ObjectMapper objectMapper;
- private final QueryToolChestWarehouse warehouse;
public DartDataServerQueryHandlerFactory(
ServiceClientFactory serviceClientFactory,
- ObjectMapper objectMapper,
- QueryToolChestWarehouse warehouse
+ ObjectMapper objectMapper
)
{
this.serviceClientFactory = serviceClientFactory;
this.objectMapper = objectMapper;
- this.warehouse = warehouse;
}
@Override
@@ -60,7 +56,6 @@ public class DartDataServerQueryHandlerFactory implements
DataServerQueryHandler
channelCounters,
serviceClientFactory,
objectMapper,
- warehouse,
requestDescriptor
);
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
index f9c8c14ad14..91788903a49 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
@@ -19,13 +19,12 @@
package org.apache.druid.msq.exec;
+import com.fasterxml.jackson.databind.JavaType;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryToolChest;
-import org.apache.druid.query.aggregation.MetricManipulationFn;
import java.util.function.Function;
@@ -44,10 +43,10 @@ public interface DataServerQueryHandler
* The query datasource is updated to refer to the specific segments from
* {@link DataServerRequestDescriptor#getSegments()}.
*
- * Also applies {@link QueryToolChest#makePreComputeManipulatorFn(Query,
MetricManipulationFn)} and reports channel
- * metrics on the returned results.
+ * Reports channel metrics on the returned results.
*
* @param query query to run
+ * @param queryResultType Jackson type for deserializing individual query
results
* @param mappingFunction function to apply to results
* @param closer will register query canceler with this closer
* @param <QueryType> result return type for the query from the data
server
@@ -55,6 +54,7 @@ public interface DataServerQueryHandler
*/
<RowType, QueryType> ListenableFuture<DataServerQueryResult<RowType>>
fetchRowsFromDataServer(
Query<QueryType> query,
+ JavaType queryResultType,
Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
Closer closer
);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
index 2bee9ed89ba..6cdb44cb1d1 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
@@ -44,15 +44,13 @@ import org.apache.druid.msq.exec.SegmentSource;
import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
import org.apache.druid.msq.input.table.DataServerSelector;
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
-import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryInterruptedException;
-import org.apache.druid.query.QueryToolChest;
-import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.context.DefaultResponseContext;
import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.rpc.RpcException;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocation;
@@ -71,7 +69,7 @@ import java.util.stream.Collectors;
/**
* Task implementation of {@link DataServerQueryHandler}. Implements retry
logic as described in
- * {@link #fetchRowsFromDataServer(Query, Function, Closer)}.
+ * {@link #fetchRowsFromDataServer(Query, JavaType, Function, Closer)}.
*/
public class IndexerDataServerQueryHandler implements DataServerQueryHandler
{
@@ -83,7 +81,6 @@ public class IndexerDataServerQueryHandler implements
DataServerQueryHandler
private final ServiceClientFactory serviceClientFactory;
private final CoordinatorClient coordinatorClient;
private final ObjectMapper objectMapper;
- private final QueryToolChestWarehouse warehouse;
private final DataServerRequestDescriptor dataServerRequestDescriptor;
private final ServiceRetryPolicy retryPolicy;
@@ -94,7 +91,6 @@ public class IndexerDataServerQueryHandler implements
DataServerQueryHandler
ServiceClientFactory serviceClientFactory,
CoordinatorClient coordinatorClient,
ObjectMapper objectMapper,
- QueryToolChestWarehouse warehouse,
DataServerRequestDescriptor dataServerRequestDescriptor
)
{
@@ -105,7 +101,6 @@ public class IndexerDataServerQueryHandler implements
DataServerQueryHandler
serviceClientFactory,
coordinatorClient,
objectMapper,
- warehouse,
dataServerRequestDescriptor,
IndexerDataServerRetryPolicy.standard()
);
@@ -119,7 +114,6 @@ public class IndexerDataServerQueryHandler implements
DataServerQueryHandler
ServiceClientFactory serviceClientFactory,
CoordinatorClient coordinatorClient,
ObjectMapper objectMapper,
- QueryToolChestWarehouse warehouse,
DataServerRequestDescriptor dataServerRequestDescriptor,
ServiceRetryPolicy retryPolicy
)
@@ -130,7 +124,6 @@ public class IndexerDataServerQueryHandler implements
DataServerQueryHandler
this.serviceClientFactory = serviceClientFactory;
this.coordinatorClient = coordinatorClient;
this.objectMapper = objectMapper;
- this.warehouse = warehouse;
this.dataServerRequestDescriptor = dataServerRequestDescriptor;
this.retryPolicy = retryPolicy;
}
@@ -166,6 +159,7 @@ public class IndexerDataServerQueryHandler implements
DataServerQueryHandler
@Override
public <RowType, QueryType> ListenableFuture<DataServerQueryResult<RowType>>
fetchRowsFromDataServer(
Query<QueryType> query,
+ JavaType queryResultType,
Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
Closer closer
)
@@ -190,8 +184,10 @@ public class IndexerDataServerQueryHandler implements
DataServerQueryHandler
responseContext,
closer,
preparedQuery,
+ queryResultType,
mappingFunction
);
+ channelCounters.incrementQueries();
// Add results
if (yielder != null && !yielder.isDone()) {
@@ -244,29 +240,32 @@ public class IndexerDataServerQueryHandler implements
DataServerQueryHandler
final ResponseContext responseContext,
final Closer closer,
final Query<QueryType> query,
+ final JavaType queryResultType,
final Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction
)
{
final ServiceLocation serviceLocation =
ServiceLocation.fromDruidServerMetadata(requestDescriptor.getServerMetadata());
final DataServerClient dataServerClient =
makeDataServerClient(serviceLocation);
- final QueryToolChest<QueryType, Query<QueryType>> toolChest =
warehouse.getToolChest(query);
- final Function<QueryType, QueryType> preComputeManipulatorFn =
- toolChest.makePreComputeManipulatorFn(query,
MetricManipulatorFns.deserializing());
- final JavaType queryResultType = toolChest.getBaseResultType();
final List<SegmentDescriptor> segmentDescriptors =
requestDescriptor.getSegments()
.stream()
.map(IndexerDataServerQueryHandler::toSegmentDescriptorWithFullInterval)
.collect(Collectors.toList());
+ if (query.getDataSource() instanceof QueryDataSource) {
+ // Subqueries being included would cause "withQuerySegmentSpec" to not
work properly.
+ throw DruidException.defensive("Cannot run query with subquery[%s]",
query);
+ }
+
try {
final ListenableFuture<Sequence<QueryType>> queryFuture =
dataServerClient.run(
- Queries.withSpecificSegments(
- query,
- requestDescriptor.getSegments()
- .stream()
- .map(RichSegmentDescriptor::toPlainDescriptor)
- .collect(Collectors.toList())
+ query.withQuerySegmentSpec(
+ new MultipleSpecificSegmentSpec(
+ requestDescriptor.getSegments()
+ .stream()
+
.map(RichSegmentDescriptor::toPlainDescriptor)
+ .collect(Collectors.toList())
+ )
),
responseContext,
queryResultType,
@@ -275,7 +274,7 @@ public class IndexerDataServerQueryHandler implements
DataServerQueryHandler
return closer.register(
DataServerQueryHandlerUtils.createYielder(
- queryFuture.get().map(preComputeManipulatorFn),
+ queryFuture.get(),
mappingFunction,
channelCounters
)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerFactory.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerFactory.java
index cf93c398ba2..6fde3f341d9 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerFactory.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerFactory.java
@@ -24,7 +24,6 @@ import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
-import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.rpc.ServiceClientFactory;
/**
@@ -35,19 +34,16 @@ public class IndexerDataServerQueryHandlerFactory
implements DataServerQueryHand
private final CoordinatorClient coordinatorClient;
private final ServiceClientFactory serviceClientFactory;
private final ObjectMapper objectMapper;
- private final QueryToolChestWarehouse warehouse;
public IndexerDataServerQueryHandlerFactory(
CoordinatorClient coordinatorClient,
ServiceClientFactory serviceClientFactory,
- ObjectMapper objectMapper,
- QueryToolChestWarehouse warehouse
+ ObjectMapper objectMapper
)
{
this.coordinatorClient = coordinatorClient;
this.serviceClientFactory = serviceClientFactory;
this.objectMapper = objectMapper;
- this.warehouse = warehouse;
}
@Override
@@ -65,7 +61,6 @@ public class IndexerDataServerQueryHandlerFactory implements
DataServerQueryHand
serviceClientFactory,
coordinatorClient,
objectMapper,
- warehouse,
requestDescriptor
);
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index cf76c3f9041..1e445f5e2c9 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -49,7 +49,6 @@ import org.apache.druid.msq.indexing.client.WorkerChatHandler;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
-import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocator;
@@ -170,7 +169,6 @@ public class IndexerWorkerContext implements WorkerContext
injector.getInstance(OverlordClient.class).withRetryPolicy(StandardRetryPolicy.unlimited());
final ProcessingBuffersProvider processingBuffersProvider =
injector.getInstance(ProcessingBuffersProvider.class);
final ObjectMapper smileMapper =
injector.getInstance(Key.get(ObjectMapper.class, Smile.class));
- final QueryToolChestWarehouse warehouse =
injector.getInstance(QueryToolChestWarehouse.class);
return new IndexerWorkerContext(
task,
@@ -188,8 +186,7 @@ public class IndexerWorkerContext implements WorkerContext
new IndexerDataServerQueryHandlerFactory(
toolbox.getCoordinatorClient(),
serviceClientFactory,
- smileMapper,
- warehouse
+ smileMapper
)
);
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
index 7c46509e09e..ae494e66168 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
@@ -93,6 +93,7 @@ public class SegmentsInputSliceReader implements
InputSliceReader
}
if (segmentsInputSlice.getQueryableServers() != null) {
+
inputCounters.addTotalQueries(segmentsInputSlice.getQueryableServers().size());
for (final DataServerRequestDescriptor queryableServer :
segmentsInputSlice.getQueryableServers()) {
queryableServers.add(
dataServerQueryHandlerFactory.createDataServerQueryHandler(
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
index 4435ae6f8e6..ea342efdb6f 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.channel.ReadableFrameChannel;
import org.apache.druid.frame.channel.WritableFrameChannel;
@@ -48,6 +49,8 @@ import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.querykit.BaseLeafFrameProcessor;
import org.apache.druid.msq.querykit.ReadableInput;
import org.apache.druid.msq.querykit.SegmentReferenceHolder;
+import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
@@ -61,6 +64,7 @@ import org.apache.druid.segment.SegmentMapFunction;
import org.apache.druid.segment.TimeBoundaryInspector;
import org.apache.druid.segment.column.RowSignature;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
@@ -77,6 +81,8 @@ public class GroupByPreShuffleFrameProcessor extends
BaseLeafFrameProcessor
private static final Logger log = new
Logger(GroupByPreShuffleFrameProcessor.class);
private final GroupByQuery query;
private final GroupingEngine groupingEngine;
+ @Nullable
+ private final QueryToolChest<ResultRow, GroupByQuery> toolChest;
private final NonBlockingPool<ByteBuffer> bufferPool;
private final ColumnSelectorFactory frameWriterColumnSelectorFactory;
private final Closer closer = Closer.create();
@@ -91,6 +97,7 @@ public class GroupByPreShuffleFrameProcessor extends
BaseLeafFrameProcessor
public GroupByPreShuffleFrameProcessor(
final GroupByQuery query,
final GroupingEngine groupingEngine,
+ @Nullable final QueryToolChest<ResultRow, GroupByQuery> toolChest,
final NonBlockingPool<ByteBuffer> bufferPool,
final ReadableInput baseInput,
final SegmentMapFunction segmentMapFn,
@@ -106,6 +113,7 @@ public class GroupByPreShuffleFrameProcessor extends
BaseLeafFrameProcessor
);
this.query = query;
this.groupingEngine = groupingEngine;
+ this.toolChest = toolChest;
this.bufferPool = bufferPool;
this.frameWriterColumnSelectorFactory =
RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory(
query,
@@ -117,13 +125,22 @@ public class GroupByPreShuffleFrameProcessor extends
BaseLeafFrameProcessor
@Override
protected ReturnOrAwait<SegmentsInputSlice>
runWithDataServerQuery(DataServerQueryHandler dataServerQueryHandler) throws
IOException
{
+ if (toolChest == null) {
+ // toolChest is always set in production, but may not always be set in
tests.
+ throw DruidException.defensive("toolChest is required for data server
queries");
+ }
+
if (resultYielder == null || resultYielder.isDone()) {
if (currentResultsYielder == null) {
if (dataServerQueryResultFuture == null) {
+ final GroupByQuery preparedQuery =
groupingEngine.prepareGroupByQuery(query);
+ final Function<ResultRow, ResultRow> preComputeManipulatorFn =
+ toolChest.makePreComputeManipulatorFn(preparedQuery,
MetricManipulatorFns.deserializing());
dataServerQueryResultFuture =
dataServerQueryHandler.fetchRowsFromDataServer(
- groupingEngine.prepareGroupByQuery(query),
- Function.identity(),
+ preparedQuery,
+ toolChest.getBaseResultType(),
+ sequence -> sequence.map(preComputeManipulatorFn),
closer
);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleStageProcessor.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleStageProcessor.java
index 6865076c90b..b7cb8ece55c 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleStageProcessor.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleStageProcessor.java
@@ -33,6 +33,7 @@ import org.apache.druid.msq.input.LoadableSegment;
import org.apache.druid.msq.input.PhysicalInputSlice;
import org.apache.druid.msq.querykit.BaseLeafStageProcessor;
import org.apache.druid.msq.querykit.ReadableInput;
+import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.segment.SegmentMapFunction;
@@ -51,6 +52,10 @@ public class GroupByPreShuffleStageProcessor extends
BaseLeafStageProcessor
@Nullable
private GroupingEngine groupingEngine;
+ @JacksonInject
+ @Nullable
+ private QueryToolChestWarehouse warehouse;
+
@JsonCreator
public GroupByPreShuffleStageProcessor(@JsonProperty("query") GroupByQuery
query)
{
@@ -76,6 +81,7 @@ public class GroupByPreShuffleStageProcessor extends
BaseLeafStageProcessor
return new GroupByPreShuffleFrameProcessor(
query,
groupingEngine,
+ warehouse != null ? warehouse.getToolChest(query) : null,
frameContext.processingBuffers().getBufferPool(),
baseInput,
segmentMapFn,
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index 22fe6d2dfcd..82e596b3e29 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -19,7 +19,9 @@
package org.apache.druid.msq.querykit.scan;
+import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@@ -98,6 +100,9 @@ import java.util.stream.Collectors;
*/
public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
{
+ public static final JavaType SCAN_RESULT_VALUE_TYPE =
+ TypeFactory.defaultInstance().constructType(ScanResultValue.class);
+
private static final Logger log = new Logger(ScanQueryFrameProcessor.class);
private static final int NO_LIMIT = -1;
@@ -229,6 +234,7 @@ public class ScanQueryFrameProcessor extends
BaseLeafFrameProcessor
dataServerQueryResultFuture =
dataServerQueryHandler.fetchRowsFromDataServer(
preparedQuery,
+ ScanQueryFrameProcessor.SCAN_RESULT_VALUE_TYPE,
ScanQueryFrameProcessor::mappingFunction,
closer
);
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
index 3f182cef253..5ecb5519235 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
@@ -69,6 +69,7 @@ import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.test.MSQTestControllerContext;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
@@ -228,6 +229,7 @@ public class MSQCompactionTaskRunTest extends
CompactionTaskRunBase
TestGroupByBuffers.createDefault()
).getGroupingEngine();
((InjectableValues.Std)
objectMapper.getInjectableValues()).addValue(GroupingEngine.class,
groupingEngine);
+ ((InjectableValues.Std)
objectMapper.getInjectableValues()).addValue(QueryToolChestWarehouse.class,
null);
Module modules = Modules.combine(
new DruidGuiceExtensions(),
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
index b1f69af51c8..e442591fd3f 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
@@ -120,7 +120,7 @@ public class MSQLoadedSegmentTests extends MSQTestBase
"foo"
)
)).when(dataServerQueryHandler)
- .fetchRowsFromDataServer(any(), any(), any());
+ .fetchRowsFromDataServer(any(), any(), any(), any());
testSelectQuery()
.setSql("select cnt, dim1 from foo")
@@ -186,7 +186,7 @@ public class MSQLoadedSegmentTests extends MSQTestBase
}
)
.when(dataServerQueryHandler)
- .fetchRowsFromDataServer(any(), any(), any());
+ .fetchRowsFromDataServer(any(), any(), any(), any());
testSelectQuery()
.setSql("select cnt, dim1 from foo order by dim1")
@@ -247,7 +247,7 @@ public class MSQLoadedSegmentTests extends MSQTestBase
)
)
.when(dataServerQueryHandler)
- .fetchRowsFromDataServer(any(), any(), any());
+ .fetchRowsFromDataServer(any(), any(), any(), any());
testSelectQuery()
.setSql("select cnt,count(*) as cnt1 from foo group by cnt")
@@ -309,7 +309,7 @@ public class MSQLoadedSegmentTests extends MSQTestBase
)
)
.when(dataServerQueryHandler)
- .fetchRowsFromDataServer(any(), any(), any());
+ .fetchRowsFromDataServer(any(), any(), any(), any());
testSelectQuery()
.setSql("select cnt,count(*) as cnt1 from foo where (TIMESTAMP
'2003-01-01 00:00:00' <= \"__time\" AND \"__time\" < TIMESTAMP '2005-01-01
00:00:00') group by cnt")
@@ -358,7 +358,7 @@ public class MSQLoadedSegmentTests extends MSQTestBase
new ISE("Segment could not be found on data server, but segment was
not handed off.")
)
.when(dataServerQueryHandler)
- .fetchRowsFromDataServer(any(), any(), any());
+ .fetchRowsFromDataServer(any(), any(), any(), any());
testSelectQuery()
.setSql("select cnt,count(*) as cnt1 from foo where (TIMESTAMP
'2003-01-01 00:00:00' <= \"__time\" AND \"__time\" < TIMESTAMP '2005-01-01
00:00:00') group by cnt")
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
index ecf30145e29..884714fe5ce 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
@@ -40,16 +40,11 @@ import org.apache.druid.msq.querykit.InputNumberDataSource;
import org.apache.druid.msq.querykit.scan.ScanQueryFrameProcessor;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.FilteredDataSource;
-import org.apache.druid.query.MapQueryToolChestWarehouse;
-import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryInterruptedException;
-import org.apache.druid.query.QueryToolChest;
-import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.scan.ScanQuery;
-import org.apache.druid.query.scan.ScanQueryQueryToolChest;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.rpc.RpcException;
@@ -133,11 +128,6 @@ public class IndexerDataServerQueryHandlerTest
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.context(ImmutableMap.of(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, 1,
MultiStageQueryContext.CTX_INCLUDE_SEGMENT_SOURCE,
SegmentSource.REALTIME.toString()))
.build();
- QueryToolChestWarehouse queryToolChestWarehouse = new
MapQueryToolChestWarehouse(
- ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
- .put(ScanQuery.class, new ScanQueryQueryToolChest(null))
- .build()
- );
target = spy(
new IndexerDataServerQueryHandler(
1,
@@ -146,7 +136,6 @@ public class IndexerDataServerQueryHandlerTest
mock(ServiceClientFactory.class),
coordinatorClient,
TestHelper.makeJsonMapper(),
- queryToolChestWarehouse,
new DataServerRequestDescriptor(DRUID_SERVER_1,
ImmutableList.of(SEGMENT_1, SEGMENT_2)),
IndexerDataServerRetryPolicy.noRetries()
)
@@ -181,6 +170,7 @@ public class IndexerDataServerQueryHandlerTest
DataServerQueryResult<Object[]> dataServerQueryResult =
target.fetchRowsFromDataServer(
query,
+ ScanQueryFrameProcessor.SCAN_RESULT_VALUE_TYPE,
ScanQueryFrameProcessor::mappingFunction,
Closer.create()
).get();
@@ -247,6 +237,7 @@ public class IndexerDataServerQueryHandlerTest
DataServerQueryResult<Object[]> dataServerQueryResult =
target.fetchRowsFromDataServer(
query,
+ ScanQueryFrameProcessor.SCAN_RESULT_VALUE_TYPE,
ScanQueryFrameProcessor::mappingFunction,
Closer.create()
).get();
@@ -292,6 +283,7 @@ public class IndexerDataServerQueryHandlerTest
DataServerQueryResult<Object[]> dataServerQueryResult =
target.fetchRowsFromDataServer(
query,
+ ScanQueryFrameProcessor.SCAN_RESULT_VALUE_TYPE,
ScanQueryFrameProcessor::mappingFunction,
Closer.create()
).get();
@@ -317,6 +309,7 @@ public class IndexerDataServerQueryHandlerTest
Assert.assertThrows(DruidException.class, () ->
target.fetchRowsFromDataServer(
queryWithRetry,
+ ScanQueryFrameProcessor.SCAN_RESULT_VALUE_TYPE,
ScanQueryFrameProcessor::mappingFunction,
Closer.create()
)
@@ -341,6 +334,7 @@ public class IndexerDataServerQueryHandlerTest
DataServerQueryResult<Object[]> dataServerQueryResult =
target.fetchRowsFromDataServer(
query,
+ ScanQueryFrameProcessor.SCAN_RESULT_VALUE_TYPE,
ScanQueryFrameProcessor::mappingFunction,
Closer.create()
).get();
@@ -364,6 +358,7 @@ public class IndexerDataServerQueryHandlerTest
Assert.assertThrows(DruidException.class, () ->
target.fetchRowsFromDataServer(
query,
+ ScanQueryFrameProcessor.SCAN_RESULT_VALUE_TYPE,
ScanQueryFrameProcessor::mappingFunction,
Closer.create()
)
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
index 41a0cf288b8..e31ca579b7c 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
@@ -276,7 +276,9 @@ public class SqlStatementResourceTest extends MSQTestBase
new long[]{},
new long[]{},
new long[]{},
- new long[]{}
+ new long[]{},
+ null,
+ null
)
)
)
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
index baeeda7b0e4..586b17b0563 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
@@ -19,6 +19,7 @@
package org.apache.druid.msq.test;
+import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -173,6 +174,7 @@ public class CalciteMSQTestsHelper
@Override
public <RowType, QueryType>
ListenableFuture<DataServerQueryResult<RowType>> fetchRowsFromDataServer(
Query<QueryType> query,
+ JavaType queryResultType,
Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
Closer closer
)
diff --git a/web-console/src/druid-models/stages/stages.spec.ts
b/web-console/src/druid-models/stages/stages.spec.ts
index d0054d7fe0d..bc67b811124 100644
--- a/web-console/src/druid-models/stages/stages.spec.ts
+++ b/web-console/src/druid-models/stages/stages.spec.ts
@@ -495,8 +495,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 39742,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
]
@@ -516,8 +518,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 888,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -530,8 +534,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 995,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -544,8 +550,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 1419,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -558,8 +566,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 905,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -572,8 +582,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 590,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -586,8 +598,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 652,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -600,8 +614,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 322,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -614,8 +630,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 247,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -628,8 +646,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 236,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -642,8 +662,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 309,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -656,8 +678,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 256,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -670,8 +694,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 260,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -684,8 +710,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 440,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -698,8 +726,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 876,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -712,8 +742,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 1394,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -726,8 +758,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 892,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -740,8 +774,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 3595,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -754,8 +790,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 6522,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -768,8 +806,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 4525,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -782,8 +822,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 4326,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -796,8 +838,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 4149,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -810,8 +854,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 2561,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -824,8 +870,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 1914,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
{
@@ -838,8 +886,10 @@ describe('Stages', () => {
"loadFiles": 0,
"loadTime": 0,
"loadWait": 0,
+ "queries": 0,
"rows": 1452,
"totalFiles": 0,
+ "totalQueries": 0,
},
},
]
diff --git a/web-console/src/druid-models/stages/stages.ts
b/web-console/src/druid-models/stages/stages.ts
index 46c8387b362..8ad539a109f 100644
--- a/web-console/src/druid-models/stages/stages.ts
+++ b/web-console/src/druid-models/stages/stages.ts
@@ -180,6 +180,8 @@ export interface ChannelCounter {
loadTime?: number[];
loadWait?: number[];
loadFiles?: number[];
+ queries?: number[];
+ totalQueries?: number[];
}
export type ChannelFields =
@@ -191,7 +193,9 @@ export type ChannelFields =
| 'loadBytes'
| 'loadTime'
| 'loadWait'
- | 'loadFiles';
+ | 'loadFiles'
+ | 'queries'
+ | 'totalQueries';
export interface SortProgressCounter {
type: 'sortProgress';
@@ -325,6 +329,8 @@ function zeroChannelFields(): Record<ChannelFields, number>
{
loadTime: 0,
loadWait: 0,
loadFiles: 0,
+ queries: 0,
+ totalQueries: 0,
};
}
@@ -627,6 +633,8 @@ export class Stages {
loadTime: sum(c.loadTime || []),
loadWait: sum(c.loadWait || []),
loadFiles: sum(c.loadFiles || []),
+ queries: sum(c.queries || []),
+ totalQueries: sum(c.totalQueries || []),
}
: zeroChannelFields();
}
diff --git
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
index 197b0aba8a2..2a91a805a36 100644
---
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
+++
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
@@ -382,6 +382,18 @@ export const ExecutionStagesPane = React.memo(function
ExecutionStagesPane(
/>
</>
)}
+ {Boolean(c.queries || c.totalQueries) && (
+ <>
+ {' '}
+ {' '}
+ <Icon
+ icon={IconNames.ARROW_BOTTOM_RIGHT}
+ data-tooltip={`Realtime queries (${formatInteger(
+ c.queries || 0,
+ )} / ${formatInteger(c.totalQueries || 0)})`}
+ />
+ </>
+ )}
</>
);
},
@@ -554,6 +566,8 @@ export const ExecutionStagesPane = React.memo(function
ExecutionStagesPane(
const loadBytes = stages.getTotalCounterForStage(stage, inputCounter,
'loadBytes');
const loadTime = stages.getTotalCounterForStage(stage, inputCounter,
'loadTime');
const loadWait = stages.getTotalCounterForStage(stage, inputCounter,
'loadWait');
+ const queries = stages.getTotalCounterForStage(stage, inputCounter,
'queries');
+ const totalQueries = stages.getTotalCounterForStage(stage, inputCounter,
'totalQueries');
const inputLabel = `${formatInputLabel(stage, inputNumber)}
(input${inputNumber})`;
return (
<div
@@ -597,6 +611,18 @@ export const ExecutionStagesPane = React.memo(function
ExecutionStagesPane(
/>
</>
)}
+ {Boolean(queries || totalQueries) && (
+ <>
+ {' '}
+ {' '}
+ <Icon
+ icon={IconNames.ARROW_BOTTOM_RIGHT}
+ data-tooltip={`Realtime queries (${formatInteger(queries || 0)}
/ ${formatInteger(
+ totalQueries || 0,
+ )})`}
+ />
+ </>
+ )}
</div>
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]