This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a3ad942030 feat: Add counters, remove warehouse from 
DataServerQueryHandlers (#19196)
0a3ad942030 is described below

commit 0a3ad942030090b8fe5680ee8bda6d4fa103c7e6
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Mar 23 12:30:33 2026 -0700

    feat: Add counters, remove warehouse from DataServerQueryHandlers (#19196)
    
    Two changes to MSQ DataServerQueryHandlers:
    
    1) Add "queries" and "totalQueries" counters, reflecting queries made
       to realtime servers to get realtime data.
    
    2) Remove dependency on QueryToolChestWarehouse and QueryToolChest.
       When mapping functions from these are needed, the query logic passes
       them in rather than deriving them from the toolchest.
    
    Web console changes are included for (1). Realtime query counters are
    shown with an ↙ icon next to VSF counters.
---
 .../testing/embedded/msq/EmbeddedMSQApis.java      | 52 +++++++++++++++
 .../embedded/msq/EmbeddedMSQRealtimeQueryTest.java |  7 +-
 .../apache/druid/msq/counters/ChannelCounters.java | 78 +++++++++++++++++++++-
 .../druid/msq/dart/guice/DartWorkerModule.java     |  7 +-
 .../dart/worker/DartDataServerQueryHandler.java    | 35 +++++-----
 .../worker/DartDataServerQueryHandlerFactory.java  |  7 +-
 .../druid/msq/exec/DataServerQueryHandler.java     |  8 +--
 .../indexing/IndexerDataServerQueryHandler.java    | 41 ++++++------
 .../IndexerDataServerQueryHandlerFactory.java      |  7 +-
 .../druid/msq/indexing/IndexerWorkerContext.java   |  5 +-
 .../msq/input/table/SegmentsInputSliceReader.java  |  1 +
 .../groupby/GroupByPreShuffleFrameProcessor.java   | 21 +++++-
 .../groupby/GroupByPreShuffleStageProcessor.java   |  6 ++
 .../msq/querykit/scan/ScanQueryFrameProcessor.java |  6 ++
 .../druid/msq/exec/MSQCompactionTaskRunTest.java   |  2 +
 .../druid/msq/exec/MSQLoadedSegmentTests.java      | 10 +--
 .../IndexerDataServerQueryHandlerTest.java         | 17 ++---
 .../sql/resources/SqlStatementResourceTest.java    |  4 +-
 .../druid/msq/test/CalciteMSQTestsHelper.java      |  2 +
 web-console/src/druid-models/stages/stages.spec.ts | 50 ++++++++++++++
 web-console/src/druid-models/stages/stages.ts      | 10 ++-
 .../execution-stages-pane.tsx                      | 26 ++++++++
 22 files changed, 314 insertions(+), 88 deletions(-)

diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
index 115d1c22f16..e58ad81b465 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
@@ -26,6 +26,9 @@ import org.apache.druid.error.DruidException;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.report.TaskReport;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.counters.CounterSnapshots;
+import org.apache.druid.msq.counters.QueryCounterSnapshot;
 import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
 import org.apache.druid.msq.indexing.report.MSQTaskReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
@@ -42,7 +45,10 @@ import org.apache.druid.testing.embedded.EmbeddedOverlord;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
@@ -236,6 +242,52 @@ public class EmbeddedMSQApis
     return cluster.callApi().onTargetBroker(targetBroker, b -> 
b.cancelSqlQuery(sqlQueryId));
   }
 
+  /**
+   * Returns the sum of completed queries across all input channel snapshots 
from all stages and workers.
+   */
+  public long getQueriesSum(final MSQTaskReportPayload payload)
+  {
+    return getAllInputChannelCounters(payload)
+        .stream()
+        .filter(snapshot -> snapshot.getQueries() != null)
+        .mapToLong(snapshot -> Arrays.stream(snapshot.getQueries()).sum())
+        .sum();
+  }
+
+  /**
+   * Returns the sum of files read across all input channel snapshots from all 
stages and workers.
+   */
+  public long getFilesSum(final MSQTaskReportPayload payload)
+  {
+    return getAllInputChannelCounters(payload)
+        .stream()
+        .filter(snapshot -> snapshot.getFiles() != null)
+        .mapToLong(snapshot -> Arrays.stream(snapshot.getFiles()).sum())
+        .sum();
+  }
+
+  /**
+   * Returns all {@link ChannelCounters.Snapshot} from input channels across 
all stages and workers.
+   */
+  private List<ChannelCounters.Snapshot> getAllInputChannelCounters(final 
MSQTaskReportPayload payload)
+  {
+    final Map<Integer, Map<Integer, CounterSnapshots>> countersMap = 
payload.getCounters().copyMap();
+    final List<ChannelCounters.Snapshot> snapshots = new ArrayList<>();
+
+    for (final Map.Entry<Integer, Map<Integer, CounterSnapshots>> stageEntry : 
countersMap.entrySet()) {
+      for (final Map.Entry<Integer, CounterSnapshots> workerEntry : 
stageEntry.getValue().entrySet()) {
+        for (final Map.Entry<String, QueryCounterSnapshot> counterEntry : 
workerEntry.getValue().getMap().entrySet()) {
+          if (counterEntry.getKey().startsWith("input")
+              && counterEntry.getValue() instanceof ChannelCounters.Snapshot) {
+            snapshots.add((ChannelCounters.Snapshot) counterEntry.getValue());
+          }
+        }
+      }
+    }
+
+    return snapshots;
+  }
+
   private static GetQueryReportResponse parseReportResponse(String 
responseJson, ObjectMapper jsonMapper)
   {
     try {
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
index 9031ac12705..a651ccf03ad 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQRealtimeQueryTest.java
@@ -38,6 +38,7 @@ import org.apache.druid.testing.embedded.EmbeddedOverlord;
 import org.apache.druid.testing.embedded.EmbeddedRouter;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.MatcherAssert;
+import org.hamcrest.Matchers;
 import org.junit.internal.matchers.ThrowableMessageMatcher;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -187,6 +188,10 @@ public class EmbeddedMSQRealtimeQueryTest extends 
BaseRealtimeQueryTest
         Collections.singletonList(new Object[]{totalRows}),
         payload.getResults().getResults()
     );
+
+    // Verify that realtime queries were issued and no files were read (all 
data is realtime).
+    MatcherAssert.assertThat(msqApis.getQueriesSum(payload), 
Matchers.greaterThan(0L));
+    Assertions.assertEquals(0, msqApis.getFilesSum(payload));
   }
 
   @Test
@@ -370,7 +375,7 @@ public class EmbeddedMSQRealtimeQueryTest extends 
BaseRealtimeQueryTest
 
   @Test
   @Timeout(60)
-  public void test_selectJoinwithLookup_dart()
+  public void test_selectJoinWithLookup_dart()
   {
     final String sql = StringUtils.format(
         "SELECT \n"
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java
index d7ae0a47924..975cd50aaf0 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/ChannelCounters.java
@@ -27,6 +27,7 @@ import com.google.errorprone.annotations.concurrent.GuardedBy;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 import it.unimi.dsi.fastutil.longs.LongList;
 import org.apache.druid.frame.Frame;
+import org.apache.druid.msq.exec.DataServerQueryHandler;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import org.apache.druid.segment.loading.AcquireSegmentResult;
 
@@ -74,6 +75,12 @@ public class ChannelCounters implements QueryCounter
   @GuardedBy("this")
   private final LongList loadWait = new LongArrayList();
 
+  @GuardedBy("this")
+  private final LongList queries = new LongArrayList();
+
+  @GuardedBy("this")
+  private final LongList totalQueries = new LongArrayList();
+
   public void incrementRowCount()
   {
     incrementRowCount(NO_PARTITION);
@@ -132,6 +139,29 @@ public class ChannelCounters implements QueryCounter
     }
   }
 
+  /**
+   * Increment the number of queries made via {@link DataServerQueryHandler}.
+   */
+  public void incrementQueries()
+  {
+    synchronized (this) {
+      ensureCapacityForPartition(NO_PARTITION);
+      queries.set(NO_PARTITION, queries.getLong(NO_PARTITION) + 1);
+    }
+  }
+
+  /**
+   * Total expected number of queries to be made via {@link 
DataServerQueryHandler}.
+   */
+  public ChannelCounters addTotalQueries(final long n)
+  {
+    synchronized (this) {
+      ensureCapacityForPartition(NO_PARTITION);
+      totalQueries.set(NO_PARTITION, totalQueries.getLong(NO_PARTITION) + n);
+      return this;
+    }
+  }
+
   private void add(
       final int partitionNumber,
       final long nRows,
@@ -188,6 +218,14 @@ public class ChannelCounters implements QueryCounter
     while (partitionNumber >= totalFiles.size()) {
       totalFiles.add(0);
     }
+
+    while (partitionNumber >= queries.size()) {
+      queries.add(0);
+    }
+
+    while (partitionNumber >= totalQueries.size()) {
+      totalQueries.add(0);
+    }
   }
 
   @GuardedBy("this")
@@ -223,6 +261,8 @@ public class ChannelCounters implements QueryCounter
     final long[] loadTimeArray;
     final long[] loadWaitArray;
     final long[] loadFilesArray;
+    final long[] queriesArray;
+    final long[] totalQueriesArray;
 
     synchronized (this) {
       rowsArray = listToArray(rows);
@@ -234,6 +274,8 @@ public class ChannelCounters implements QueryCounter
       loadTimeArray = listToArray(loadTime);
       loadWaitArray = listToArray(loadWait);
       loadFilesArray = listToArray(loadFiles);
+      queriesArray = listToArray(queries);
+      totalQueriesArray = listToArray(totalQueries);
     }
 
     if (rowsArray == null
@@ -245,6 +287,8 @@ public class ChannelCounters implements QueryCounter
         && loadTimeArray == null
         && loadWaitArray == null
         && loadFilesArray == null
+        && queriesArray == null
+        && totalQueriesArray == null
     ) {
       return null;
     } else {
@@ -271,7 +315,9 @@ public class ChannelCounters implements QueryCounter
           loadBytesArray,
           loadTimeArray,
           loadWaitArray,
-          loadFilesArray
+          loadFilesArray,
+          queriesArray,
+          totalQueriesArray
       );
     }
   }
@@ -307,6 +353,8 @@ public class ChannelCounters implements QueryCounter
     private final long[] loadTime;
     private final long[] loadWait;
     private final long[] loadFiles;
+    private final long[] queries;
+    private final long[] totalQueries;
 
     @JsonCreator
     public Snapshot(
@@ -318,7 +366,9 @@ public class ChannelCounters implements QueryCounter
         @Nullable @JsonProperty("loadBytes") final long[] loadBytes,
         @Nullable @JsonProperty("loadTime") final long[] loadTime,
         @Nullable @JsonProperty("loadWait") final long[] loadWait,
-        @Nullable @JsonProperty("loadFiles") final long[] loadFiles
+        @Nullable @JsonProperty("loadFiles") final long[] loadFiles,
+        @Nullable @JsonProperty("queries") final long[] queries,
+        @Nullable @JsonProperty("totalQueries") final long[] totalQueries
     )
     {
       this.rows = rows;
@@ -330,6 +380,8 @@ public class ChannelCounters implements QueryCounter
       this.loadTime = loadTime;
       this.loadWait = loadWait;
       this.loadFiles = loadFiles;
+      this.queries = queries;
+      this.totalQueries = totalQueries;
     }
 
     @JsonProperty
@@ -395,6 +447,20 @@ public class ChannelCounters implements QueryCounter
       return loadFiles;
     }
 
+    @JsonProperty
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    public long[] getQueries()
+    {
+      return queries;
+    }
+
+    @JsonProperty
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    public long[] getTotalQueries()
+    {
+      return totalQueries;
+    }
+
     @Override
     public boolean equals(Object o)
     {
@@ -413,7 +479,9 @@ public class ChannelCounters implements QueryCounter
              && Arrays.equals(loadBytes, snapshot.loadBytes)
              && Arrays.equals(loadTime, snapshot.loadTime)
              && Arrays.equals(loadWait, snapshot.loadWait)
-             && Arrays.equals(loadFiles, snapshot.loadFiles);
+             && Arrays.equals(loadFiles, snapshot.loadFiles)
+             && Arrays.equals(queries, snapshot.queries)
+             && Arrays.equals(totalQueries, snapshot.totalQueries);
     }
 
     @Override
@@ -428,6 +496,8 @@ public class ChannelCounters implements QueryCounter
       result = 31 * result + Arrays.hashCode(loadTime);
       result = 31 * result + Arrays.hashCode(loadWait);
       result = 31 * result + Arrays.hashCode(loadFiles);
+      result = 31 * result + Arrays.hashCode(queries);
+      result = 31 * result + Arrays.hashCode(totalQueries);
       return result;
     }
 
@@ -444,6 +514,8 @@ public class ChannelCounters implements QueryCounter
              ", loadTime=" + Arrays.toString(loadTime) +
              ", loadWait=" + Arrays.toString(loadWait) +
              ", loadFiles=" + Arrays.toString(loadFiles) +
+             ", queries=" + Arrays.toString(queries) +
+             ", totalQueries=" + Arrays.toString(totalQueries) +
              '}';
     }
   }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
index 1139a13f4a6..e98d0ec2195 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
@@ -60,7 +60,6 @@ import 
org.apache.druid.msq.dart.worker.http.DartWorkerResource;
 import org.apache.druid.msq.exec.MemoryIntrospector;
 import org.apache.druid.msq.rpc.ResourcePermissionMapper;
 import org.apache.druid.query.DruidProcessingConfig;
-import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.security.AuthorizerMapper;
@@ -159,14 +158,12 @@ public class DartWorkerModule implements DruidModule
     @Provides
     public DartDataServerQueryHandlerFactory 
createDataServerQueryHandlerFactory(
         @EscalatedGlobal ServiceClientFactory serviceClientFactory,
-        @Smile ObjectMapper smileMapper,
-        QueryToolChestWarehouse queryToolChestWarehouse
+        @Smile ObjectMapper smileMapper
     )
     {
       return new DartDataServerQueryHandlerFactory(
           serviceClientFactory,
-          smileMapper,
-          queryToolChestWarehouse
+          smileMapper
       );
     }
   }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
index 5907846abd9..e81f6ee92dc 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
@@ -35,14 +35,12 @@ import 
org.apache.druid.msq.exec.DataServerQueryHandlerUtils;
 import org.apache.druid.msq.exec.DataServerQueryResult;
 import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
 import org.apache.druid.msq.input.table.RichSegmentDescriptor;
-import org.apache.druid.query.Queries;
 import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryToolChest;
-import org.apache.druid.query.QueryToolChestWarehouse;
+import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.aggregation.MetricManipulatorFns;
 import org.apache.druid.query.context.DefaultResponseContext;
 import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.rpc.ServiceLocation;
 import org.apache.druid.rpc.StandardRetryPolicy;
@@ -62,7 +60,6 @@ public class DartDataServerQueryHandler implements 
DataServerQueryHandler
   private final ChannelCounters channelCounters;
   private final ServiceClientFactory serviceClientFactory;
   private final ObjectMapper objectMapper;
-  private final QueryToolChestWarehouse warehouse;
   private final DataServerRequestDescriptor requestDescriptor;
 
   public DartDataServerQueryHandler(
@@ -71,7 +68,6 @@ public class DartDataServerQueryHandler implements 
DataServerQueryHandler
       ChannelCounters channelCounters,
       ServiceClientFactory serviceClientFactory,
       ObjectMapper objectMapper,
-      QueryToolChestWarehouse warehouse,
       DataServerRequestDescriptor requestDescriptor
   )
   {
@@ -80,7 +76,6 @@ public class DartDataServerQueryHandler implements 
DataServerQueryHandler
     this.channelCounters = channelCounters;
     this.serviceClientFactory = serviceClientFactory;
     this.objectMapper = objectMapper;
-    this.warehouse = warehouse;
     this.requestDescriptor = requestDescriptor;
   }
 
@@ -95,33 +90,37 @@ public class DartDataServerQueryHandler implements 
DataServerQueryHandler
   @Override
   public <RowType, QueryType> ListenableFuture<DataServerQueryResult<RowType>> 
fetchRowsFromDataServer(
       Query<QueryType> query,
+      JavaType queryResultType,
       Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
       Closer closer
   )
   {
+    if (query.getDataSource() instanceof QueryDataSource) {
+      // Subqueries being included would cause "withQuerySegmentSpec" to not 
work properly.
+      throw DruidException.defensive("Cannot run query with subquery[%s]", 
query);
+    }
+
     final Query<QueryType> preparedQuery =
-        Queries.withSpecificSegments(
-            DataServerQueryHandlerUtils.prepareQuery(query, inputNumber, 
dataSourceName),
-            requestDescriptor.getSegments()
-                             .stream()
-                             .map(RichSegmentDescriptor::toPlainDescriptor)
-                             .collect(Collectors.toList())
+        DataServerQueryHandlerUtils.prepareQuery(query, inputNumber, 
dataSourceName).withQuerySegmentSpec(
+            new MultipleSpecificSegmentSpec(
+                requestDescriptor.getSegments()
+                                 .stream()
+                                 .map(RichSegmentDescriptor::toPlainDescriptor)
+                                 .collect(Collectors.toList())
+            )
         );
 
     final ServiceLocation serviceLocation =
         
ServiceLocation.fromDruidServerMetadata(requestDescriptor.getServerMetadata());
     final DataServerClient dataServerClient = 
makeDataServerClient(serviceLocation);
-    final QueryToolChest<QueryType, Query<QueryType>> toolChest = 
warehouse.getToolChest(query);
-    final Function<QueryType, QueryType> preComputeManipulatorFn =
-        toolChest.makePreComputeManipulatorFn(query, 
MetricManipulatorFns.deserializing());
-    final JavaType queryResultType = toolChest.getBaseResultType();
     final ResponseContext responseContext = new DefaultResponseContext();
 
     return FutureUtils.transform(
         dataServerClient.run(preparedQuery, responseContext, queryResultType, 
closer),
         resultSequence -> {
+          channelCounters.incrementQueries();
           final Yielder<RowType> yielder = 
DataServerQueryHandlerUtils.createYielder(
-              resultSequence.map(preComputeManipulatorFn),
+              resultSequence,
               mappingFunction,
               channelCounters
           );
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java
index 56334071a77..fddc4b82fd6 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.msq.counters.ChannelCounters;
 import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
 import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
-import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.rpc.ServiceClientFactory;
 
 /**
@@ -33,17 +32,14 @@ public class DartDataServerQueryHandlerFactory implements 
DataServerQueryHandler
 {
   private final ServiceClientFactory serviceClientFactory;
   private final ObjectMapper objectMapper;
-  private final QueryToolChestWarehouse warehouse;
 
   public DartDataServerQueryHandlerFactory(
       ServiceClientFactory serviceClientFactory,
-      ObjectMapper objectMapper,
-      QueryToolChestWarehouse warehouse
+      ObjectMapper objectMapper
   )
   {
     this.serviceClientFactory = serviceClientFactory;
     this.objectMapper = objectMapper;
-    this.warehouse = warehouse;
   }
 
   @Override
@@ -60,7 +56,6 @@ public class DartDataServerQueryHandlerFactory implements 
DataServerQueryHandler
         channelCounters,
         serviceClientFactory,
         objectMapper,
-        warehouse,
         requestDescriptor
     );
   }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
index f9c8c14ad14..91788903a49 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
@@ -19,13 +19,12 @@
 
 package org.apache.druid.msq.exec;
 
+import com.fasterxml.jackson.databind.JavaType;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
 import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryToolChest;
-import org.apache.druid.query.aggregation.MetricManipulationFn;
 
 import java.util.function.Function;
 
@@ -44,10 +43,10 @@ public interface DataServerQueryHandler
    * The query datasource is updated to refer to the specific segments from
    * {@link DataServerRequestDescriptor#getSegments()}.
    *
-   * Also applies {@link QueryToolChest#makePreComputeManipulatorFn(Query, 
MetricManipulationFn)} and reports channel
-   * metrics on the returned results.
+   * Reports channel metrics on the returned results.
    *
    * @param query           query to run
+   * @param queryResultType Jackson type for deserializing individual query 
results
    * @param mappingFunction function to apply to results
    * @param closer          will register query canceler with this closer
    * @param <QueryType>     result return type for the query from the data 
server
@@ -55,6 +54,7 @@ public interface DataServerQueryHandler
    */
   <RowType, QueryType> ListenableFuture<DataServerQueryResult<RowType>> 
fetchRowsFromDataServer(
       Query<QueryType> query,
+      JavaType queryResultType,
       Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
       Closer closer
   );
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
index 2bee9ed89ba..6cdb44cb1d1 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
@@ -44,15 +44,13 @@ import org.apache.druid.msq.exec.SegmentSource;
 import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
 import org.apache.druid.msq.input.table.DataServerSelector;
 import org.apache.druid.msq.input.table.RichSegmentDescriptor;
-import org.apache.druid.query.Queries;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.QueryInterruptedException;
-import org.apache.druid.query.QueryToolChest;
-import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.aggregation.MetricManipulatorFns;
 import org.apache.druid.query.context.DefaultResponseContext;
 import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
 import org.apache.druid.rpc.RpcException;
 import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.rpc.ServiceLocation;
@@ -71,7 +69,7 @@ import java.util.stream.Collectors;
 
 /**
  * Task implementation of {@link DataServerQueryHandler}. Implements retry 
logic as described in
- * {@link #fetchRowsFromDataServer(Query, Function, Closer)}.
+ * {@link #fetchRowsFromDataServer(Query, JavaType, Function, Closer)}.
  */
 public class IndexerDataServerQueryHandler implements DataServerQueryHandler
 {
@@ -83,7 +81,6 @@ public class IndexerDataServerQueryHandler implements 
DataServerQueryHandler
   private final ServiceClientFactory serviceClientFactory;
   private final CoordinatorClient coordinatorClient;
   private final ObjectMapper objectMapper;
-  private final QueryToolChestWarehouse warehouse;
   private final DataServerRequestDescriptor dataServerRequestDescriptor;
   private final ServiceRetryPolicy retryPolicy;
 
@@ -94,7 +91,6 @@ public class IndexerDataServerQueryHandler implements 
DataServerQueryHandler
       ServiceClientFactory serviceClientFactory,
       CoordinatorClient coordinatorClient,
       ObjectMapper objectMapper,
-      QueryToolChestWarehouse warehouse,
       DataServerRequestDescriptor dataServerRequestDescriptor
   )
   {
@@ -105,7 +101,6 @@ public class IndexerDataServerQueryHandler implements 
DataServerQueryHandler
         serviceClientFactory,
         coordinatorClient,
         objectMapper,
-        warehouse,
         dataServerRequestDescriptor,
         IndexerDataServerRetryPolicy.standard()
     );
@@ -119,7 +114,6 @@ public class IndexerDataServerQueryHandler implements 
DataServerQueryHandler
       ServiceClientFactory serviceClientFactory,
       CoordinatorClient coordinatorClient,
       ObjectMapper objectMapper,
-      QueryToolChestWarehouse warehouse,
       DataServerRequestDescriptor dataServerRequestDescriptor,
       ServiceRetryPolicy retryPolicy
   )
@@ -130,7 +124,6 @@ public class IndexerDataServerQueryHandler implements 
DataServerQueryHandler
     this.serviceClientFactory = serviceClientFactory;
     this.coordinatorClient = coordinatorClient;
     this.objectMapper = objectMapper;
-    this.warehouse = warehouse;
     this.dataServerRequestDescriptor = dataServerRequestDescriptor;
     this.retryPolicy = retryPolicy;
   }
@@ -166,6 +159,7 @@ public class IndexerDataServerQueryHandler implements 
DataServerQueryHandler
   @Override
   public <RowType, QueryType> ListenableFuture<DataServerQueryResult<RowType>> 
fetchRowsFromDataServer(
       Query<QueryType> query,
+      JavaType queryResultType,
       Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
       Closer closer
   )
@@ -190,8 +184,10 @@ public class IndexerDataServerQueryHandler implements 
DataServerQueryHandler
             responseContext,
             closer,
             preparedQuery,
+            queryResultType,
             mappingFunction
         );
+        channelCounters.incrementQueries();
 
         // Add results
         if (yielder != null && !yielder.isDone()) {
@@ -244,29 +240,32 @@ public class IndexerDataServerQueryHandler implements 
DataServerQueryHandler
       final ResponseContext responseContext,
       final Closer closer,
       final Query<QueryType> query,
+      final JavaType queryResultType,
       final Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction
   )
   {
     final ServiceLocation serviceLocation = 
ServiceLocation.fromDruidServerMetadata(requestDescriptor.getServerMetadata());
     final DataServerClient dataServerClient = 
makeDataServerClient(serviceLocation);
-    final QueryToolChest<QueryType, Query<QueryType>> toolChest = 
warehouse.getToolChest(query);
-    final Function<QueryType, QueryType> preComputeManipulatorFn =
-        toolChest.makePreComputeManipulatorFn(query, 
MetricManipulatorFns.deserializing());
-    final JavaType queryResultType = toolChest.getBaseResultType();
     final List<SegmentDescriptor> segmentDescriptors =
         requestDescriptor.getSegments()
                          .stream()
                          
.map(IndexerDataServerQueryHandler::toSegmentDescriptorWithFullInterval)
                          .collect(Collectors.toList());
 
+    if (query.getDataSource() instanceof QueryDataSource) {
+      // Subqueries being included would cause "withQuerySegmentSpec" to not 
work properly.
+      throw DruidException.defensive("Cannot run query with subquery[%s]", 
query);
+    }
+
     try {
       final ListenableFuture<Sequence<QueryType>> queryFuture = 
dataServerClient.run(
-          Queries.withSpecificSegments(
-              query,
-              requestDescriptor.getSegments()
-                               .stream()
-                               .map(RichSegmentDescriptor::toPlainDescriptor)
-                               .collect(Collectors.toList())
+          query.withQuerySegmentSpec(
+              new MultipleSpecificSegmentSpec(
+                  requestDescriptor.getSegments()
+                                   .stream()
+                                   
.map(RichSegmentDescriptor::toPlainDescriptor)
+                                   .collect(Collectors.toList())
+              )
           ),
           responseContext,
           queryResultType,
@@ -275,7 +274,7 @@ public class IndexerDataServerQueryHandler implements 
DataServerQueryHandler
 
       return closer.register(
           DataServerQueryHandlerUtils.createYielder(
-              queryFuture.get().map(preComputeManipulatorFn),
+              queryFuture.get(),
               mappingFunction,
               channelCounters
           )
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerFactory.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerFactory.java
index cf93c398ba2..6fde3f341d9 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerFactory.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerFactory.java
@@ -24,7 +24,6 @@ import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.msq.counters.ChannelCounters;
 import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
 import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
-import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.rpc.ServiceClientFactory;
 
 /**
@@ -35,19 +34,16 @@ public class IndexerDataServerQueryHandlerFactory 
implements DataServerQueryHand
   private final CoordinatorClient coordinatorClient;
   private final ServiceClientFactory serviceClientFactory;
   private final ObjectMapper objectMapper;
-  private final QueryToolChestWarehouse warehouse;
 
   public IndexerDataServerQueryHandlerFactory(
       CoordinatorClient coordinatorClient,
       ServiceClientFactory serviceClientFactory,
-      ObjectMapper objectMapper,
-      QueryToolChestWarehouse warehouse
+      ObjectMapper objectMapper
   )
   {
     this.coordinatorClient = coordinatorClient;
     this.serviceClientFactory = serviceClientFactory;
     this.objectMapper = objectMapper;
-    this.warehouse = warehouse;
   }
 
   @Override
@@ -65,7 +61,6 @@ public class IndexerDataServerQueryHandlerFactory implements 
DataServerQueryHand
         serviceClientFactory,
         coordinatorClient,
         objectMapper,
-        warehouse,
         requestDescriptor
     );
   }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index cf76c3f9041..1e445f5e2c9 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -49,7 +49,6 @@ import org.apache.druid.msq.indexing.client.WorkerChatHandler;
 import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.QueryContext;
-import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.query.policy.PolicyEnforcer;
 import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.rpc.ServiceLocator;
@@ -170,7 +169,6 @@ public class IndexerWorkerContext implements WorkerContext
         
injector.getInstance(OverlordClient.class).withRetryPolicy(StandardRetryPolicy.unlimited());
     final ProcessingBuffersProvider processingBuffersProvider = 
injector.getInstance(ProcessingBuffersProvider.class);
     final ObjectMapper smileMapper = 
injector.getInstance(Key.get(ObjectMapper.class, Smile.class));
-    final QueryToolChestWarehouse warehouse = 
injector.getInstance(QueryToolChestWarehouse.class);
 
     return new IndexerWorkerContext(
         task,
@@ -188,8 +186,7 @@ public class IndexerWorkerContext implements WorkerContext
         new IndexerDataServerQueryHandlerFactory(
             toolbox.getCoordinatorClient(),
             serviceClientFactory,
-            smileMapper,
-            warehouse
+            smileMapper
         )
     );
   }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
index 7c46509e09e..ae494e66168 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentsInputSliceReader.java
@@ -93,6 +93,7 @@ public class SegmentsInputSliceReader implements 
InputSliceReader
     }
 
     if (segmentsInputSlice.getQueryableServers() != null) {
+      
inputCounters.addTotalQueries(segmentsInputSlice.getQueryableServers().size());
       for (final DataServerRequestDescriptor queryableServer : 
segmentsInputSlice.getQueryableServers()) {
         queryableServers.add(
             dataServerQueryHandlerFactory.createDataServerQueryHandler(
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
index 4435ae6f8e6..ea342efdb6f 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
@@ -24,6 +24,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.collections.NonBlockingPool;
 import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.channel.ReadableFrameChannel;
 import org.apache.druid.frame.channel.WritableFrameChannel;
@@ -48,6 +49,8 @@ import org.apache.druid.msq.input.table.SegmentsInputSlice;
 import org.apache.druid.msq.querykit.BaseLeafFrameProcessor;
 import org.apache.druid.msq.querykit.ReadableInput;
 import org.apache.druid.msq.querykit.SegmentReferenceHolder;
+import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.aggregation.MetricManipulatorFns;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.GroupingEngine;
 import org.apache.druid.query.groupby.ResultRow;
@@ -61,6 +64,7 @@ import org.apache.druid.segment.SegmentMapFunction;
 import org.apache.druid.segment.TimeBoundaryInspector;
 import org.apache.druid.segment.column.RowSignature;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
@@ -77,6 +81,8 @@ public class GroupByPreShuffleFrameProcessor extends 
BaseLeafFrameProcessor
   private static final Logger log = new 
Logger(GroupByPreShuffleFrameProcessor.class);
   private final GroupByQuery query;
   private final GroupingEngine groupingEngine;
+  @Nullable
+  private final QueryToolChest<ResultRow, GroupByQuery> toolChest;
   private final NonBlockingPool<ByteBuffer> bufferPool;
   private final ColumnSelectorFactory frameWriterColumnSelectorFactory;
   private final Closer closer = Closer.create();
@@ -91,6 +97,7 @@ public class GroupByPreShuffleFrameProcessor extends 
BaseLeafFrameProcessor
   public GroupByPreShuffleFrameProcessor(
       final GroupByQuery query,
       final GroupingEngine groupingEngine,
+      @Nullable final QueryToolChest<ResultRow, GroupByQuery> toolChest,
       final NonBlockingPool<ByteBuffer> bufferPool,
       final ReadableInput baseInput,
       final SegmentMapFunction segmentMapFn,
@@ -106,6 +113,7 @@ public class GroupByPreShuffleFrameProcessor extends 
BaseLeafFrameProcessor
     );
     this.query = query;
     this.groupingEngine = groupingEngine;
+    this.toolChest = toolChest;
     this.bufferPool = bufferPool;
     this.frameWriterColumnSelectorFactory = 
RowBasedGrouperHelper.createResultRowBasedColumnSelectorFactory(
         query,
@@ -117,13 +125,22 @@ public class GroupByPreShuffleFrameProcessor extends 
BaseLeafFrameProcessor
   @Override
   protected ReturnOrAwait<SegmentsInputSlice> 
runWithDataServerQuery(DataServerQueryHandler dataServerQueryHandler) throws 
IOException
   {
+    if (toolChest == null) {
+      // toolChest is always set in production, but may not always be set in 
tests.
+      throw DruidException.defensive("toolChest is required for data server 
queries");
+    }
+
     if (resultYielder == null || resultYielder.isDone()) {
       if (currentResultsYielder == null) {
         if (dataServerQueryResultFuture == null) {
+          final GroupByQuery preparedQuery = 
groupingEngine.prepareGroupByQuery(query);
+          final Function<ResultRow, ResultRow> preComputeManipulatorFn =
+              toolChest.makePreComputeManipulatorFn(preparedQuery, 
MetricManipulatorFns.deserializing());
           dataServerQueryResultFuture =
               dataServerQueryHandler.fetchRowsFromDataServer(
-                  groupingEngine.prepareGroupByQuery(query),
-                  Function.identity(),
+                  preparedQuery,
+                  toolChest.getBaseResultType(),
+                  sequence -> sequence.map(preComputeManipulatorFn),
                   closer
               );
 
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleStageProcessor.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleStageProcessor.java
index 6865076c90b..b7cb8ece55c 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleStageProcessor.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleStageProcessor.java
@@ -33,6 +33,7 @@ import org.apache.druid.msq.input.LoadableSegment;
 import org.apache.druid.msq.input.PhysicalInputSlice;
 import org.apache.druid.msq.querykit.BaseLeafStageProcessor;
 import org.apache.druid.msq.querykit.ReadableInput;
+import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.GroupingEngine;
 import org.apache.druid.segment.SegmentMapFunction;
@@ -51,6 +52,10 @@ public class GroupByPreShuffleStageProcessor extends 
BaseLeafStageProcessor
   @Nullable
   private GroupingEngine groupingEngine;
 
+  @JacksonInject
+  @Nullable
+  private QueryToolChestWarehouse warehouse;
+
   @JsonCreator
   public GroupByPreShuffleStageProcessor(@JsonProperty("query") GroupByQuery 
query)
   {
@@ -76,6 +81,7 @@ public class GroupByPreShuffleStageProcessor extends 
BaseLeafStageProcessor
     return new GroupByPreShuffleFrameProcessor(
         query,
         groupingEngine,
+        warehouse != null ? warehouse.getToolChest(query) : null,
         frameContext.processingBuffers().getBufferPool(),
         baseInput,
         segmentMapFn,
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index 22fe6d2dfcd..82e596b3e29 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -19,7 +19,9 @@
 
 package org.apache.druid.msq.querykit.scan;
 
+import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.TypeFactory;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -98,6 +100,9 @@ import java.util.stream.Collectors;
  */
 public class ScanQueryFrameProcessor extends BaseLeafFrameProcessor
 {
+  public static final JavaType SCAN_RESULT_VALUE_TYPE =
+      TypeFactory.defaultInstance().constructType(ScanResultValue.class);
+
   private static final Logger log = new Logger(ScanQueryFrameProcessor.class);
   private static final int NO_LIMIT = -1;
 
@@ -229,6 +234,7 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
         dataServerQueryResultFuture =
             dataServerQueryHandler.fetchRowsFromDataServer(
                 preparedQuery,
+                ScanQueryFrameProcessor.SCAN_RESULT_VALUE_TYPE,
                 ScanQueryFrameProcessor::mappingFunction,
                 closer
             );
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
index 3f182cef253..5ecb5519235 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
@@ -69,6 +69,7 @@ import org.apache.druid.msq.test.MSQTestBase;
 import org.apache.druid.msq.test.MSQTestControllerContext;
 import org.apache.druid.query.ForwardingQueryProcessingPool;
 import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
@@ -228,6 +229,7 @@ public class MSQCompactionTaskRunTest extends 
CompactionTaskRunBase
         TestGroupByBuffers.createDefault()
     ).getGroupingEngine();
     ((InjectableValues.Std) 
objectMapper.getInjectableValues()).addValue(GroupingEngine.class, 
groupingEngine);
+    ((InjectableValues.Std) 
objectMapper.getInjectableValues()).addValue(QueryToolChestWarehouse.class, 
null);
 
     Module modules = Modules.combine(
         new DruidGuiceExtensions(),
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
index b1f69af51c8..e442591fd3f 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
@@ -120,7 +120,7 @@ public class MSQLoadedSegmentTests extends MSQTestBase
                 "foo"
             )
         )).when(dataServerQueryHandler)
-          .fetchRowsFromDataServer(any(), any(), any());
+          .fetchRowsFromDataServer(any(), any(), any(), any());
 
     testSelectQuery()
         .setSql("select cnt, dim1 from foo")
@@ -186,7 +186,7 @@ public class MSQLoadedSegmentTests extends MSQTestBase
         }
     )
         .when(dataServerQueryHandler)
-        .fetchRowsFromDataServer(any(), any(), any());
+        .fetchRowsFromDataServer(any(), any(), any(), any());
 
     testSelectQuery()
         .setSql("select cnt, dim1 from foo order by dim1")
@@ -247,7 +247,7 @@ public class MSQLoadedSegmentTests extends MSQTestBase
         )
     )
         .when(dataServerQueryHandler)
-        .fetchRowsFromDataServer(any(), any(), any());
+        .fetchRowsFromDataServer(any(), any(), any(), any());
 
     testSelectQuery()
         .setSql("select cnt,count(*) as cnt1 from foo group by cnt")
@@ -309,7 +309,7 @@ public class MSQLoadedSegmentTests extends MSQTestBase
         )
     )
         .when(dataServerQueryHandler)
-        .fetchRowsFromDataServer(any(), any(), any());
+        .fetchRowsFromDataServer(any(), any(), any(), any());
 
     testSelectQuery()
         .setSql("select cnt,count(*) as cnt1 from foo where (TIMESTAMP 
'2003-01-01 00:00:00' <= \"__time\" AND \"__time\" < TIMESTAMP '2005-01-01 
00:00:00') group by cnt")
@@ -358,7 +358,7 @@ public class MSQLoadedSegmentTests extends MSQTestBase
         new ISE("Segment could not be found on data server, but segment was 
not handed off.")
     )
         .when(dataServerQueryHandler)
-        .fetchRowsFromDataServer(any(), any(), any());
+        .fetchRowsFromDataServer(any(), any(), any(), any());
 
     testSelectQuery()
         .setSql("select cnt,count(*) as cnt1 from foo where (TIMESTAMP 
'2003-01-01 00:00:00' <= \"__time\" AND \"__time\" < TIMESTAMP '2005-01-01 
00:00:00') group by cnt")
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
index ecf30145e29..884714fe5ce 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
@@ -40,16 +40,11 @@ import org.apache.druid.msq.querykit.InputNumberDataSource;
 import org.apache.druid.msq.querykit.scan.ScanQueryFrameProcessor;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.FilteredDataSource;
-import org.apache.druid.query.MapQueryToolChestWarehouse;
-import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryInterruptedException;
-import org.apache.druid.query.QueryToolChest;
-import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.query.scan.ScanQuery;
-import org.apache.druid.query.scan.ScanQueryQueryToolChest;
 import org.apache.druid.query.scan.ScanResultValue;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.rpc.RpcException;
@@ -133,11 +128,6 @@ public class IndexerDataServerQueryHandlerTest
         .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
         
.context(ImmutableMap.of(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, 1, 
MultiStageQueryContext.CTX_INCLUDE_SEGMENT_SOURCE, 
SegmentSource.REALTIME.toString()))
         .build();
-    QueryToolChestWarehouse queryToolChestWarehouse = new 
MapQueryToolChestWarehouse(
-        ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
-                    .put(ScanQuery.class, new ScanQueryQueryToolChest(null))
-                    .build()
-    );
     target = spy(
         new IndexerDataServerQueryHandler(
             1,
@@ -146,7 +136,6 @@ public class IndexerDataServerQueryHandlerTest
             mock(ServiceClientFactory.class),
             coordinatorClient,
             TestHelper.makeJsonMapper(),
-            queryToolChestWarehouse,
             new DataServerRequestDescriptor(DRUID_SERVER_1, 
ImmutableList.of(SEGMENT_1, SEGMENT_2)),
             IndexerDataServerRetryPolicy.noRetries()
         )
@@ -181,6 +170,7 @@ public class IndexerDataServerQueryHandlerTest
 
     DataServerQueryResult<Object[]> dataServerQueryResult = 
target.fetchRowsFromDataServer(
         query,
+        ScanQueryFrameProcessor.SCAN_RESULT_VALUE_TYPE,
         ScanQueryFrameProcessor::mappingFunction,
         Closer.create()
     ).get();
@@ -247,6 +237,7 @@ public class IndexerDataServerQueryHandlerTest
 
     DataServerQueryResult<Object[]> dataServerQueryResult = 
target.fetchRowsFromDataServer(
         query,
+        ScanQueryFrameProcessor.SCAN_RESULT_VALUE_TYPE,
         ScanQueryFrameProcessor::mappingFunction,
         Closer.create()
     ).get();
@@ -292,6 +283,7 @@ public class IndexerDataServerQueryHandlerTest
 
     DataServerQueryResult<Object[]> dataServerQueryResult = 
target.fetchRowsFromDataServer(
         query,
+        ScanQueryFrameProcessor.SCAN_RESULT_VALUE_TYPE,
         ScanQueryFrameProcessor::mappingFunction,
         Closer.create()
     ).get();
@@ -317,6 +309,7 @@ public class IndexerDataServerQueryHandlerTest
     Assert.assertThrows(DruidException.class, () ->
         target.fetchRowsFromDataServer(
             queryWithRetry,
+            ScanQueryFrameProcessor.SCAN_RESULT_VALUE_TYPE,
             ScanQueryFrameProcessor::mappingFunction,
             Closer.create()
         )
@@ -341,6 +334,7 @@ public class IndexerDataServerQueryHandlerTest
 
     DataServerQueryResult<Object[]> dataServerQueryResult = 
target.fetchRowsFromDataServer(
         query,
+        ScanQueryFrameProcessor.SCAN_RESULT_VALUE_TYPE,
         ScanQueryFrameProcessor::mappingFunction,
         Closer.create()
     ).get();
@@ -364,6 +358,7 @@ public class IndexerDataServerQueryHandlerTest
     Assert.assertThrows(DruidException.class, () ->
         target.fetchRowsFromDataServer(
             query,
+            ScanQueryFrameProcessor.SCAN_RESULT_VALUE_TYPE,
             ScanQueryFrameProcessor::mappingFunction,
             Closer.create()
         )
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
index 41a0cf288b8..e31ca579b7c 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
@@ -276,7 +276,9 @@ public class SqlStatementResourceTest extends MSQTestBase
                           new long[]{},
                           new long[]{},
                           new long[]{},
-                          new long[]{}
+                          new long[]{},
+                          null,
+                          null
                       )
                   )
                   )
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
index baeeda7b0e4..586b17b0563 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.msq.test;
 
+import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -173,6 +174,7 @@ public class CalciteMSQTestsHelper
       @Override
       public <RowType, QueryType> 
ListenableFuture<DataServerQueryResult<RowType>> fetchRowsFromDataServer(
           Query<QueryType> query,
+          JavaType queryResultType,
           Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
           Closer closer
       )
diff --git a/web-console/src/druid-models/stages/stages.spec.ts 
b/web-console/src/druid-models/stages/stages.spec.ts
index d0054d7fe0d..bc67b811124 100644
--- a/web-console/src/druid-models/stages/stages.spec.ts
+++ b/web-console/src/druid-models/stages/stages.spec.ts
@@ -495,8 +495,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 39742,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
         ]
@@ -516,8 +518,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 888,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -530,8 +534,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 995,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -544,8 +550,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 1419,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -558,8 +566,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 905,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -572,8 +582,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 590,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -586,8 +598,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 652,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -600,8 +614,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 322,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -614,8 +630,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 247,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -628,8 +646,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 236,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -642,8 +662,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 309,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -656,8 +678,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 256,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -670,8 +694,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 260,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -684,8 +710,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 440,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -698,8 +726,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 876,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -712,8 +742,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 1394,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -726,8 +758,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 892,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -740,8 +774,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 3595,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -754,8 +790,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 6522,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -768,8 +806,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 4525,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -782,8 +822,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 4326,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -796,8 +838,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 4149,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -810,8 +854,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 2561,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -824,8 +870,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 1914,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
           {
@@ -838,8 +886,10 @@ describe('Stages', () => {
               "loadFiles": 0,
               "loadTime": 0,
               "loadWait": 0,
+              "queries": 0,
               "rows": 1452,
               "totalFiles": 0,
+              "totalQueries": 0,
             },
           },
         ]
diff --git a/web-console/src/druid-models/stages/stages.ts 
b/web-console/src/druid-models/stages/stages.ts
index 46c8387b362..8ad539a109f 100644
--- a/web-console/src/druid-models/stages/stages.ts
+++ b/web-console/src/druid-models/stages/stages.ts
@@ -180,6 +180,8 @@ export interface ChannelCounter {
   loadTime?: number[];
   loadWait?: number[];
   loadFiles?: number[];
+  queries?: number[];
+  totalQueries?: number[];
 }
 
 export type ChannelFields =
@@ -191,7 +193,9 @@ export type ChannelFields =
   | 'loadBytes'
   | 'loadTime'
   | 'loadWait'
-  | 'loadFiles';
+  | 'loadFiles'
+  | 'queries'
+  | 'totalQueries';
 
 export interface SortProgressCounter {
   type: 'sortProgress';
@@ -325,6 +329,8 @@ function zeroChannelFields(): Record<ChannelFields, number> 
{
     loadTime: 0,
     loadWait: 0,
     loadFiles: 0,
+    queries: 0,
+    totalQueries: 0,
   };
 }
 
@@ -627,6 +633,8 @@ export class Stages {
               loadTime: sum(c.loadTime || []),
               loadWait: sum(c.loadWait || []),
               loadFiles: sum(c.loadFiles || []),
+              queries: sum(c.queries || []),
+              totalQueries: sum(c.totalQueries || []),
             }
           : zeroChannelFields();
       }
diff --git 
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
 
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
index 197b0aba8a2..2a91a805a36 100644
--- 
a/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
+++ 
b/web-console/src/views/workbench-view/execution-stages-pane/execution-stages-pane.tsx
@@ -382,6 +382,18 @@ export const ExecutionStagesPane = React.memo(function 
ExecutionStagesPane(
                         />
                       </>
                     )}
+                    {Boolean(c.queries || c.totalQueries) && (
+                      <>
+                        {' '}
+                        &nbsp;{' '}
+                        <Icon
+                          icon={IconNames.ARROW_BOTTOM_RIGHT}
+                          data-tooltip={`Realtime queries (${formatInteger(
+                            c.queries || 0,
+                          )} / ${formatInteger(c.totalQueries || 0)})`}
+                        />
+                      </>
+                    )}
                   </>
                 );
               },
@@ -554,6 +566,8 @@ export const ExecutionStagesPane = React.memo(function 
ExecutionStagesPane(
     const loadBytes = stages.getTotalCounterForStage(stage, inputCounter, 
'loadBytes');
     const loadTime = stages.getTotalCounterForStage(stage, inputCounter, 
'loadTime');
     const loadWait = stages.getTotalCounterForStage(stage, inputCounter, 
'loadWait');
+    const queries = stages.getTotalCounterForStage(stage, inputCounter, 
'queries');
+    const totalQueries = stages.getTotalCounterForStage(stage, inputCounter, 
'totalQueries');
     const inputLabel = `${formatInputLabel(stage, inputNumber)} 
(input${inputNumber})`;
     return (
       <div
@@ -597,6 +611,18 @@ export const ExecutionStagesPane = React.memo(function 
ExecutionStagesPane(
             />
           </>
         )}
+        {Boolean(queries || totalQueries) && (
+          <>
+            {' '}
+            &nbsp;{' '}
+            <Icon
+              icon={IconNames.ARROW_BOTTOM_RIGHT}
+              data-tooltip={`Realtime queries (${formatInteger(queries || 0)} 
/ ${formatInteger(
+                totalQueries || 0,
+              )})`}
+            />
+          </>
+        )}
       </div>
     );
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to