This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d87a84dd90f MSQ: Controller result batching, consolidated lifecycle. 
(#19043)
d87a84dd90f is described below

commit d87a84dd90f92308bfef1ed66f2fecb7b0eba7c7
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Feb 23 11:15:09 2026 -0800

    MSQ: Controller result batching, consolidated lifecycle. (#19043)
    
    Main changes:
    
    1) QueryListener is changed to deal in RowsAndColumns rather than
       individual rows.
    
    2) Logic for converting Frames to SQL-formatted rows is moved into
       TaskReportQueryListener, out of QueryResultsReader.
    
    3) DartQueryMaker's streaming mode (fullReport = false) is adjusted
       to use SequenceQueryListener + FrameChannelSequence.
    
    4) Controller register/deregister and execution lifecycle related logic
       is consolidated and moved into ControllerHolder#runAsync, to simplify
       DartQueryMaker.
---
 .../msq/dart/controller/ControllerHolder.java      | 188 +++++--
 .../msq/dart/controller/ControllerThreadPool.java  |  52 ++
 .../msq/dart/controller/sql/DartQueryMaker.java    | 541 ++++++---------------
 .../msq/dart/controller/sql/DartSqlEngine.java     |  37 +-
 .../druid/msq/dart/guice/DartControllerModule.java |  18 +
 .../druid/msq/exec/CaptureReportQueryListener.java |  24 +-
 .../org/apache/druid/msq/exec/ControllerImpl.java  |   3 -
 .../msq/exec/ControllerQueryResultsReader.java     |  64 +--
 .../org/apache/druid/msq/exec/QueryListener.java   |  26 +-
 .../druid/msq/exec/SequenceQueryListener.java      | 122 +++++
 .../druid/msq/indexing/MSQControllerTask.java      |   5 +-
 .../msq/indexing/TaskReportQueryListener.java      |  99 +++-
 .../druid/msq/util/SqlStatementResourceHelper.java |  18 +-
 .../dart/controller/http/DartSqlResourceTest.java  |  63 ++-
 .../msq/indexing/TaskReportQueryListenerTest.java  | 112 ++++-
 .../msq/test/MSQTestOverlordServiceClient.java     |  49 +-
 .../apache/druid/msq/test/NoopQueryListener.java   |  11 +-
 .../frame/channel/BlockingQueueFrameChannel.java   |   2 +
 .../druid/frame/channel/FrameChannelSequence.java  |  24 +-
 19 files changed, 812 insertions(+), 646 deletions(-)

diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
index a0b9f7c34f9..b6595a89ba4 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
@@ -20,19 +20,30 @@
 package org.apache.druid.msq.dart.controller;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.dart.worker.WorkerId;
 import org.apache.druid.msq.exec.CaptureReportQueryListener;
 import org.apache.druid.msq.exec.Controller;
 import org.apache.druid.msq.exec.ControllerContext;
 import org.apache.druid.msq.exec.QueryListener;
+import org.apache.druid.msq.indexing.error.CanceledFault;
 import org.apache.druid.msq.indexing.error.CancellationReason;
 import org.apache.druid.msq.indexing.error.MSQErrorReport;
 import org.apache.druid.msq.indexing.error.WorkerFailedFault;
+import org.apache.druid.msq.indexing.report.MSQStatusReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.BaseQuery;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.sql.http.StandardQueryState;
 import org.joda.time.DateTime;
 
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -40,45 +51,7 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class ControllerHolder
 {
-  public enum State
-  {
-    /**
-     * Query has been accepted, but not yet {@link 
Controller#run(QueryListener)}.
-     */
-    ACCEPTED(StandardQueryState.ACCEPTED),
-
-    /**
-     * Query has had {@link Controller#run(QueryListener)} called.
-     */
-    RUNNING(StandardQueryState.RUNNING),
-
-    /**
-     * Query has been canceled.
-     */
-    CANCELED(StandardQueryState.CANCELED),
-
-    /**
-     * Query has exited successfully.
-     */
-    SUCCESS(StandardQueryState.SUCCESS),
-
-    /**
-     * Query has failed.
-     */
-    FAILED(StandardQueryState.FAILED);
-
-    private final String statusString;
-
-    State(String statusString)
-    {
-      this.statusString = statusString;
-    }
-
-    public String getStatusString()
-    {
-      return statusString;
-    }
-  }
+  private static final Logger log = new Logger(ControllerHolder.class);
 
   private final Controller controller;
   private final String sqlQueryId;
@@ -188,21 +161,70 @@ public class ControllerHolder
   }
 
   /**
-   * Calls {@link Controller#run(QueryListener)}, and returns true, if this 
holder was previously in state
-   * {@link State#ACCEPTED}. Otherwise returns false.
+   * Runs {@link Controller#run(QueryListener)} in the provided executor. 
Registers the controller with the provided
+   * registry while it is running.
    *
-   * @return whether {@link Controller#run(QueryListener)} was called.
+   * @return future that resolves when the controller is done or canceled.
    */
-  public boolean run(final QueryListener listener) throws Exception
-  {
-    if (state.compareAndSet(State.ACCEPTED, State.RUNNING)) {
-      final CaptureReportQueryListener reportListener = new 
CaptureReportQueryListener(listener);
-      controller.run(reportListener);
-      updateStateOnQueryComplete(reportListener.getReport());
-      return true;
-    } else {
-      return false;
-    }
+  public ListenableFuture<?> runAsync(
+      final QueryListener listener,
+      final DartControllerRegistry controllerRegistry,
+      final ControllerThreadPool threadPool
+  )
+  {
+    // Register controller before submitting anything to controllerExeuctor, 
so it shows up in
+    // "active controllers" lists.
+    controllerRegistry.register(this);
+
+    final ListenableFuture<?> future = 
threadPool.getExecutorService().submit(() -> {
+      final String threadName = Thread.currentThread().getName();
+      Thread.currentThread().setName(makeThreadName());
+
+      try {
+        final CaptureReportQueryListener reportListener = new 
CaptureReportQueryListener(listener);
+
+        try {
+          if (state.compareAndSet(State.ACCEPTED, State.RUNNING)) {
+            controller.run(reportListener);
+            updateStateOnQueryComplete(reportListener.getReport());
+          } else {
+            // Canceled before running.
+            reportListener.onQueryComplete(makeCanceledReport());
+          }
+        }
+        catch (Throwable e) {
+          log.warn(
+              e,
+              "Controller[%s] failed, queryId[%s], sqlQueryId[%s]",
+              controller.queryId(),
+              controller.getQueryContext().getString(BaseQuery.QUERY_ID),
+              sqlQueryId
+          );
+        }
+        finally {
+          // Build report and then call "deregister".
+          final MSQTaskReport taskReport;
+
+          if (reportListener.hasReport()) {
+            taskReport = new MSQTaskReport(controller.queryId(), 
reportListener.getReport());
+          } else {
+            taskReport = null;
+          }
+
+          final TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
+          reportMap.put(MSQTaskReport.REPORT_KEY, taskReport);
+          controllerRegistry.deregister(this, reportMap);
+        }
+      }
+      finally {
+        Thread.currentThread().setName(threadName);
+      }
+    });
+
+    // Must not cancel the above future, otherwise "deregister" may never get 
called. If a controller is canceled
+    // before it runs, the runnable above stays in the queue until it gets a 
thread, then it exits without running
+    // the controller.
+    return Futures.nonCancellationPropagating(future);
   }
 
   private void updateStateOnQueryComplete(final MSQTaskReportPayload report)
@@ -217,4 +239,66 @@ public class ControllerHolder
         break;
     }
   }
+
+  /**
+   * Generate a name for the thread that {@link #runAsync} uses.
+   */
+  private String makeThreadName()
+  {
+    return StringUtils.format(
+        "%s[%s]-sqlQueryId[%s]",
+        Thread.currentThread().getName(),
+        controller.queryId(),
+        sqlQueryId
+    );
+  }
+
+  private MSQTaskReportPayload makeCanceledReport()
+  {
+    final MSQErrorReport errorReport =
+        MSQErrorReport.fromFault(controller.queryId(), null, null, 
CanceledFault.userRequest());
+    final MSQStatusReport statusReport =
+        new MSQStatusReport(TaskState.FAILED, errorReport, null, null, 0, 
Map.of(), 0, 0, null, null);
+    return new MSQTaskReportPayload(statusReport, null, null, null);
+  }
+
+  public enum State
+  {
+    /**
+     * Query has been accepted, but not yet {@link 
Controller#run(QueryListener)}.
+     */
+    ACCEPTED(StandardQueryState.ACCEPTED),
+
+    /**
+     * Query has had {@link Controller#run(QueryListener)} called.
+     */
+    RUNNING(StandardQueryState.RUNNING),
+
+    /**
+     * Query has been canceled.
+     */
+    CANCELED(StandardQueryState.CANCELED),
+
+    /**
+     * Query has exited successfully.
+     */
+    SUCCESS(StandardQueryState.SUCCESS),
+
+    /**
+     * Query has failed.
+     */
+    FAILED(StandardQueryState.FAILED);
+
+    private final String statusString;
+
+    State(String statusString)
+    {
+      this.statusString = statusString;
+    }
+
+    public String getStatusString()
+    {
+      return statusString;
+    }
+  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerThreadPool.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerThreadPool.java
new file mode 100644
index 00000000000..8e369d6fa81
--- /dev/null
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerThreadPool.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.controller;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.msq.dart.guice.DartControllerConfig;
+import org.apache.druid.msq.exec.Controller;
+
+/**
+ * Thread pool for running {@link Controller}. Number of threads is equal to
+ * {@link DartControllerConfig#getConcurrentQueries()}, which limits the 
number of concurrent controllers.
+ */
+@ManageLifecycle
+public class ControllerThreadPool
+{
+  private final ListeningExecutorService executorService;
+
+  public ControllerThreadPool(final ListeningExecutorService executorService)
+  {
+    this.executorService = executorService;
+  }
+
+  public ListeningExecutorService getExecutorService()
+  {
+    return executorService;
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    executorService.shutdown();
+  }
+}
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
index 5270c9f63d8..d9e3ad30001 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
@@ -19,43 +19,35 @@
 
 package org.apache.druid.msq.dart.controller.sql;
 
-import com.google.common.base.Throwables;
-import com.google.common.collect.Iterators;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
 import org.apache.druid.indexer.report.TaskReport;
 import org.apache.druid.io.LimitedOutputStream;
 import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.Either;
-import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.guava.BaseSequence;
 import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.common.guava.SequenceWrapper;
+import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.msq.dart.controller.ControllerHolder;
+import org.apache.druid.msq.dart.controller.ControllerThreadPool;
 import org.apache.druid.msq.dart.controller.DartControllerContextFactory;
 import org.apache.druid.msq.dart.controller.DartControllerRegistry;
 import org.apache.druid.msq.dart.guice.DartControllerConfig;
-import org.apache.druid.msq.exec.Controller;
 import org.apache.druid.msq.exec.ControllerContext;
 import org.apache.druid.msq.exec.ControllerImpl;
 import org.apache.druid.msq.exec.QueryKitSpecFactory;
-import org.apache.druid.msq.exec.QueryListener;
 import org.apache.druid.msq.exec.ResultsContext;
+import org.apache.druid.msq.exec.SequenceQueryListener;
 import org.apache.druid.msq.indexing.LegacyMSQSpec;
 import org.apache.druid.msq.indexing.QueryDefMSQSpec;
 import org.apache.druid.msq.indexing.TaskReportQueryListener;
 import org.apache.druid.msq.indexing.destination.MSQDestination;
-import org.apache.druid.msq.indexing.error.CanceledFault;
 import org.apache.druid.msq.indexing.error.CancellationReason;
-import org.apache.druid.msq.indexing.error.MSQErrorReport;
-import org.apache.druid.msq.indexing.report.MSQResultsReport;
-import org.apache.druid.msq.indexing.report.MSQStatusReport;
-import org.apache.druid.msq.indexing.report.MSQTaskReport;
-import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
 import org.apache.druid.msq.querykit.MultiQueryKit;
 import org.apache.druid.msq.sql.MSQTaskQueryMaker;
+import org.apache.druid.msq.util.SqlStatementResourceHelper;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.segment.column.ColumnType;
@@ -68,38 +60,24 @@ import org.apache.druid.sql.calcite.rel.DruidQuery;
 import org.apache.druid.sql.calcite.run.QueryMaker;
 import org.apache.druid.sql.calcite.run.SqlResults;
 
-import javax.annotation.Nullable;
 import java.io.ByteArrayOutputStream;
-import java.time.Duration;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.Optional;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
  * SQL {@link QueryMaker}. Executes queries in two ways, depending on whether 
the user asked for a full report.
  *
- * When including a full report, the controller runs in the SQL planning 
thread (typically an HTTP thread) using
- * the method {@link #runWithReport(ControllerHolder)}. The entire response is 
buffered in memory, up to
+ * When including a full report, the entire response is buffered in memory, up 
to
  * {@link DartControllerConfig#getMaxQueryReportSize()}.
  *
- * When not including a full report, the controller runs in {@link 
#controllerExecutor} and results are streamed
- * back to the user through {@link ResultIterator}. There is no limit to the 
size of the returned results.
+ * When not including a full report, results are streamed back to the user 
through a {@link SequenceQueryListener}.
+ * There is no limit to the size of the returned results.
  */
 public class DartQueryMaker implements QueryMaker
 {
-  private static final Logger log = new Logger(DartQueryMaker.class);
-
   final List<Entry<Integer, String>> fieldMapping;
   private final DartControllerContextFactory controllerContextFactory;
   private final PlannerContext plannerContext;
@@ -115,10 +93,9 @@ public class DartQueryMaker implements QueryMaker
   private final DartControllerConfig controllerConfig;
 
   /**
-   * Executor for {@link #runWithIterator(ControllerHolder)}. Number of thread 
is equal to
-   * {@link DartControllerConfig#getConcurrentQueries()}, which limits the 
number of concurrent controllers.
+   * Executor for running controllers.
    */
-  private final ExecutorService controllerExecutor;
+  private final ControllerThreadPool controllerThreadPool;
   private final ServerConfig serverConfig;
 
   final QueryKitSpecFactory queryKitSpecFactory;
@@ -130,7 +107,7 @@ public class DartQueryMaker implements QueryMaker
       PlannerContext plannerContext,
       DartControllerRegistry controllerRegistry,
       DartControllerConfig controllerConfig,
-      ExecutorService controllerExecutor,
+      ControllerThreadPool controllerThreadPool,
       QueryKitSpecFactory queryKitSpecFactory,
       MultiQueryKit queryKit,
       ServerConfig serverConfig
@@ -141,7 +118,7 @@ public class DartQueryMaker implements QueryMaker
     this.plannerContext = plannerContext;
     this.controllerRegistry = controllerRegistry;
     this.controllerConfig = controllerConfig;
-    this.controllerExecutor = controllerExecutor;
+    this.controllerThreadPool = controllerThreadPool;
     this.queryKitSpecFactory = queryKitSpecFactory;
     this.queryKit = queryKit;
     this.serverConfig = serverConfig;
@@ -166,8 +143,11 @@ public class DartQueryMaker implements QueryMaker
     return runLegacyMSQSpec(querySpec, druidQuery.getQuery().context(), 
resultsContext);
   }
 
-  public static ResultsContext makeResultsContext(DruidQuery druidQuery, 
List<Entry<Integer, String>> fieldMapping,
-      PlannerContext plannerContext)
+  public static ResultsContext makeResultsContext(
+      DruidQuery druidQuery,
+      List<Entry<Integer, String>> fieldMapping,
+      PlannerContext plannerContext
+  )
   {
     final List<Pair<SqlTypeName, ColumnType>> types = 
MSQTaskQueryMaker.getTypes(druidQuery, fieldMapping, plannerContext);
     final ResultsContext resultsContext = new ResultsContext(
@@ -177,6 +157,32 @@ public class DartQueryMaker implements QueryMaker
     return resultsContext;
   }
 
+  /**
+   * Creates a controller using {@link LegacyMSQSpec} and calls {@link 
#runController}.
+   */
+  public QueryResponse<Object[]> runLegacyMSQSpec(
+      LegacyMSQSpec querySpec,
+      QueryContext context,
+      ResultsContext resultsContext
+  )
+  {
+    final ControllerImpl controller = makeLegacyController(querySpec, context, 
resultsContext);
+    return runController(controller, context.getFullReport(), 
querySpec.getColumnMappings(), resultsContext);
+  }
+
+  /**
+   * Creates a controller using {@link QueryDefMSQSpec} and calls {@link 
#runController}.
+   */
+  public QueryResponse<Object[]> runQueryDefMSQSpec(
+      QueryDefMSQSpec querySpec,
+      QueryContext context,
+      ResultsContext resultsContext
+  )
+  {
+    final ControllerImpl controller = makeQueryDefController(querySpec, 
context, resultsContext);
+    return runController(controller, context.getFullReport(), 
querySpec.getColumnMappings(), resultsContext);
+  }
+
   private ControllerImpl makeLegacyController(LegacyMSQSpec querySpec, 
QueryContext context, ResultsContext resultsContext)
   {
     final ControllerContext controllerContext = 
controllerContextFactory.newContext(context);
@@ -201,46 +207,6 @@ public class DartQueryMaker implements QueryMaker
     );
   }
 
-  public QueryResponse<Object[]> runLegacyMSQSpec(LegacyMSQSpec querySpec, 
QueryContext context, ResultsContext resultsContext)
-  {
-    final ControllerImpl controller = makeLegacyController(querySpec, context, 
resultsContext);
-    return runController(controller, context.getFullReport());
-  }
-
-  public QueryResponse<Object[]> runQueryDefMSQSpec(QueryDefMSQSpec querySpec, 
QueryContext context, ResultsContext resultsContext)
-  {
-    final ControllerImpl controller = makeQueryDefController(querySpec, 
context, resultsContext);
-    return runController(controller, context.getFullReport());
-  }
-
-  private QueryResponse<Object[]> runController(final ControllerImpl 
controller, final boolean fullReport)
-  {
-    final ControllerHolder controllerHolder = new ControllerHolder(
-        controller,
-        plannerContext.getSqlQueryId(),
-        plannerContext.getSql(),
-        plannerContext.getAuthenticationResult(),
-        DateTimes.nowUtc()
-    );
-
-    // Register controller before submitting anything to controllerExeuctor, 
so it shows up in
-    // "active controllers" lists.
-    controllerRegistry.register(controllerHolder);
-
-    try {
-      // runWithReport, runWithoutReport are responsible for calling 
controllerRegistry.deregister(controllerHolder)
-      // when their work is done.
-      final Sequence<Object[]> results =
-          fullReport ? runWithReport(controllerHolder) : 
runWithIterator(controllerHolder);
-      return QueryResponse.withEmptyContext(results);
-    }
-    catch (Throwable e) {
-      // Error while calling runWithReport or runWithoutReport. Deregister 
controller immediately.
-      controllerRegistry.deregister(controllerHolder, null);
-      throw e;
-    }
-  }
-
   /**
    * Adds the timeout parameter to the query context, considering the default 
and maximum values from
    * {@link ServerConfig}.
@@ -254,353 +220,124 @@ public class DartQueryMaker implements QueryMaker
   }
 
   /**
-   * Run a query and return the full report, buffered in memory up to
-   * {@link DartControllerConfig#getMaxQueryReportSize()}.
+   * Runs a controller in {@link #controllerThreadPool} and returns a {@link 
QueryResponse} object.
    *
-   * Arranges for {@link DartControllerRegistry#deregister} to be called upon 
completion (either success or failure).
+   * @param controller     controller to run
+   * @param fullReport     if true, buffer the results into a report and 
return it in a single row.
+   *                       if false, stream the results back
+   * @param columnMappings SQL column mappings
+   * @param resultsContext SQL results context
    */
-  private Sequence<Object[]> runWithReport(final ControllerHolder 
controllerHolder)
+  private QueryResponse<Object[]> runController(
+      final ControllerImpl controller,
+      final boolean fullReport,
+      final ColumnMappings columnMappings,
+      final ResultsContext resultsContext
+  )
   {
-    final Future<TaskReport.ReportMap> reportFuture;
-
-    // Run in controllerExecutor. Control doesn't really *need* to be moved to 
another thread, but we have to
-    // use the controllerExecutor anyway, to ensure we respect the 
concurrentQueries configuration.
-    reportFuture = controllerExecutor.submit(() -> {
-      TaskReport.ReportMap retVal = null;
-      final String threadName = Thread.currentThread().getName();
-
-      try {
-        Thread.currentThread().setName(nameThread(plannerContext));
-
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        final TaskReportQueryListener queryListener = new 
TaskReportQueryListener(
-            () -> new LimitedOutputStream(
-                baos,
-                controllerConfig.getMaxQueryReportSize(),
-                limit -> StringUtils.format(
-                    "maxQueryReportSize[%,d] exceeded. "
-                    + "Try limiting the result set for your query, or run it 
with %s[false]",
-                    limit,
-                    QueryContexts.CTX_FULL_REPORT
-                )
-            ),
-            plannerContext.getJsonMapper(),
-            controllerHolder.getController().queryId(),
-            Collections.emptyMap(),
-            MSQDestination.UNLIMITED
-        );
-
-        if (controllerHolder.run(queryListener)) {
-          retVal = plannerContext.getJsonMapper()
-                                 .readValue(baos.toByteArray(), 
TaskReport.ReportMap.class);
-        } else {
-          // Controller was canceled before it ran.
-          throw MSQErrorReport
-              .fromFault(
-                  controllerHolder.getController().queryId(),
-                  null,
-                  null,
-                  CanceledFault.userRequest()
-              )
-              .toDruidException();
-        }
-      }
-      finally {
-        controllerRegistry.deregister(controllerHolder, retVal);
-        Thread.currentThread().setName(threadName);
-      }
-
-      return retVal;
-    });
-
-    // Return a sequence that reads one row (the report) from reportFuture.
-    return new BaseSequence<>(
-        new BaseSequence.IteratorMaker<>()
-        {
-          @Override
-          public Iterator<Object[]> make()
-          {
-            try {
-              return Iterators.singletonIterator(new 
Object[]{reportFuture.get()});
-            }
-            catch (InterruptedException e) {
-              throw new RuntimeException(e);
-            }
-            catch (ExecutionException e) {
-              // Unwrap ExecutionExceptions, so errors such as DruidException 
are serialized properly.
-              Throwables.throwIfUnchecked(e.getCause());
-              throw new RuntimeException(e.getCause());
-            }
-          }
-
-          @Override
-          public void cleanup(Iterator<Object[]> iterFromMake)
-          {
-            // Nothing to do.
-          }
-        }
+    final ControllerHolder controllerHolder = new ControllerHolder(
+        controller,
+        plannerContext.getSqlQueryId(),
+        plannerContext.getSql(),
+        plannerContext.getAuthenticationResult(),
+        DateTimes.nowUtc()
     );
+
+    // runWithReport, runWithoutReport are responsible for calling 
controllerRegistry.deregister(controllerHolder)
+    // when their work is done.
+    final Sequence<Object[]> results =
+        fullReport
+        ? runWithReport(controllerHolder, columnMappings, resultsContext)
+        : runWithSequence(controllerHolder, columnMappings, resultsContext);
+    return QueryResponse.withEmptyContext(results);
   }
 
   /**
-   * Run a query and return the results only, streamed back using {@link 
ResultIteratorMaker}.
+   * Run a query and return the full report, buffered in memory up to
+   * {@link DartControllerConfig#getMaxQueryReportSize()}.
    *
    * Arranges for {@link DartControllerRegistry#deregister} to be called upon 
completion (either success or failure).
    */
-  private Sequence<Object[]> runWithIterator(final ControllerHolder 
controllerHolder)
-  {
-    return new BaseSequence<>(new ResultIteratorMaker(controllerHolder));
-  }
-
-  /**
-   * Generate a name for a thread in {@link #controllerExecutor}.
-   */
-  private String nameThread(final PlannerContext plannerContext)
+  private Sequence<Object[]> runWithReport(
+      final ControllerHolder controllerHolder,
+      final ColumnMappings columnMappings,
+      final ResultsContext resultsContext
+  )
   {
-    return StringUtils.format(
-        "%s-sqlQueryId[%s]-queryId[%s]",
-        Thread.currentThread().getName(),
-        plannerContext.getSqlQueryId(),
-        plannerContext.queryContext().get(QueryContexts.CTX_DART_QUERY_ID)
+    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    final TaskReportQueryListener listener = new TaskReportQueryListener(
+        () -> new LimitedOutputStream(
+            baos,
+            controllerConfig.getMaxQueryReportSize(),
+            limit -> StringUtils.format(
+                "maxQueryReportSize[%,d] exceeded. "
+                + "Try limiting the result set for your query, or run it with 
%s[false]",
+                limit,
+                QueryContexts.CTX_FULL_REPORT
+            )
+        ),
+        plannerContext.getJsonMapper(),
+        controllerHolder.getController().queryId(),
+        Collections.emptyMap(),
+        MSQDestination.UNLIMITED,
+        columnMappings,
+        resultsContext
     );
-  }
-
-  /**
-   * Helper for {@link #runWithIterator(ControllerHolder)}.
-   */
-  class ResultIteratorMaker implements BaseSequence.IteratorMaker<Object[], 
ResultIterator>
-  {
-    private final ControllerHolder controllerHolder;
-    private final ResultIterator resultIterator;
-    private boolean made;
 
-    public ResultIteratorMaker(ControllerHolder holder)
-    {
-      this.controllerHolder = holder;
-      this.resultIterator = new 
ResultIterator(controllerHolder.getController().getQueryContext().getTimeoutDuration());
-      submitController();
-    }
-
-    /**
-     * Submits the controller to the executor in the constructor, and remove 
it from the registry when the
-     * future resolves.
-     */
-    private void submitController()
-    {
-      controllerExecutor.submit(() -> {
-        final Controller controller = controllerHolder.getController();
-        final String threadName = Thread.currentThread().getName();
-
-        try {
-          Thread.currentThread().setName(nameThread(plannerContext));
-
-          if (!controllerHolder.run(resultIterator)) {
-            // Controller was canceled before it ran. Push a cancellation 
error to the resultIterator, so the sequence
-            // returned by "runWithoutReport" can resolve.
-            resultIterator.pushError(
-                MSQErrorReport.fromFault(
-                    controllerHolder.getController().queryId(),
-                    null,
-                    null,
-                    CanceledFault.userRequest()
-                ).toDruidException()
-            );
-          }
-        }
-        catch (Exception e) {
-          log.warn(
-              e,
-              "Controller failed for sqlQueryId[%s], controllerHost[%s]",
-              plannerContext.getSqlQueryId(),
-              controller.queryId()
-          );
-        }
-        catch (Throwable e) {
-          log.error(
-              e,
-              "Controller failed for sqlQueryId[%s], controllerHost[%s]",
-              plannerContext.getSqlQueryId(),
-              controller.queryId()
-          );
-          throw e;
-        }
-        finally {
-          final MSQTaskReport taskReport;
-
-          if (resultIterator.report != null) {
-            taskReport = new MSQTaskReport(
-                controllerHolder.getController().queryId(),
-                resultIterator.report
-            );
-          } else {
-            taskReport = null;
-          }
+    try {
+      // Submit controller and wait for it to finish.
+      controllerHolder.runAsync(listener, controllerRegistry, 
controllerThreadPool).get();
 
-          final TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
-          reportMap.put(MSQTaskReport.REPORT_KEY, taskReport);
-          controllerRegistry.deregister(controllerHolder, reportMap);
-          Thread.currentThread().setName(threadName);
-        }
-      });
+      // Return a sequence with just one row (the report).
+      final TaskReport.ReportMap reportMap =
+          plannerContext.getJsonMapper().readValue(baos.toByteArray(), 
TaskReport.ReportMap.class);
+      return Sequences.simple(List.<Object[]>of(new Object[]{reportMap}));
     }
-
-    @Override
-    public ResultIterator make()
-    {
-      if (made) {
-        throw new ISE("Cannot call make() more than once");
-      }
-
-      made = true;
-      return resultIterator;
+    catch (InterruptedException e) {
+      controllerHolder.cancel(CancellationReason.UNKNOWN);
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
     }
-
-    @Override
-    public void cleanup(final ResultIterator iterFromMake)
-    {
-      if (!iterFromMake.complete) {
-        controllerHolder.cancel(CancellationReason.UNKNOWN);
-      }
+    catch (Exception e) {
+      throw new RuntimeException(e);
     }
   }
 
   /**
-   * Helper for {@link ResultIteratorMaker}, which is in turn a helper for 
{@link #runWithIterator(ControllerHolder)}.
+   * Run a query and return the results only, streamed back using {@link 
SequenceQueryListener}.
+   *
+   * Arranges for {@link DartControllerRegistry#deregister} to be called upon 
completion (either success or failure).
    */
-  static class ResultIterator implements Iterator<Object[]>, QueryListener
+  private Sequence<Object[]> runWithSequence(
+      final ControllerHolder controllerHolder,
+      final ColumnMappings columnMappings,
+      final ResultsContext resultsContext
+  )
   {
-    /**
-     * Number of rows to buffer from {@link #onResultRow(Object[])}.
-     */
-    private static final int BUFFER_SIZE = 128;
-
-    /**
-     * Empty optional signifies results are complete.
-     */
-    private final BlockingQueue<Either<Throwable, Object[]>> rowBuffer = new 
ArrayBlockingQueue<>(BUFFER_SIZE);
-
-    /**
-     * Only accessed by {@link Iterator} methods, so no need to be thread-safe.
-     */
-    @Nullable
-    private Either<Throwable, Object[]> current;
-
-    private volatile boolean complete;
-    private volatile MSQTaskReportPayload report;
-
-    @Nullable
-    private final Duration timeout;
-
-    public ResultIterator(@Nullable Duration timeout)
-    {
-      this.timeout = timeout;
-    }
-
-    @Override
-    public boolean hasNext()
-    {
-      return populateAndReturnCurrent().isPresent();
-    }
-
-    @Override
-    public Object[] next()
-    {
-      final Object[] retVal = 
populateAndReturnCurrent().orElseThrow(NoSuchElementException::new);
-      current = null;
-      return retVal;
-    }
-
-    private Optional<Object[]> populateAndReturnCurrent()
-    {
-      if (current == null) {
-        try {
-          if (timeout != null) {
-            current = rowBuffer.poll(timeout.toMillis(), 
TimeUnit.MILLISECONDS);
-            if (current == null) {
-              throw DruidException.defensive("Result reader timed out [%s]", 
timeout);
+    final SequenceQueryListener listener = new SequenceQueryListener();
+    final ListenableFuture<?> runFuture =
+        controllerHolder.runAsync(listener, controllerRegistry, 
controllerThreadPool);
+
+    return Sequences.wrap(
+        listener.getSequence().flatMap(
+            rac -> SqlStatementResourceHelper.getResultSequence(
+                rac.as(Frame.class),
+                listener.getFrameReader(),
+                columnMappings,
+                resultsContext,
+                plannerContext.getJsonMapper()
+            )
+        ),
+        new SequenceWrapper()
+        {
+          @Override
+          public void after(final boolean isDone, final Throwable thrown)
+          {
+            if (!isDone || thrown != null) {
+              runFuture.cancel(true); // Cancel on early stop or failure
             }
-          } else {
-            current = rowBuffer.take();
           }
         }
-        catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
-        }
-      }
-
-      if (current.isValue()) {
-        return Optional.ofNullable(current.valueOrThrow());
-      } else {
-        // Don't use valueOrThrow to throw errors; here we *don't* want the 
wrapping in RuntimeException
-        // that Either.valueOrThrow does. We want the original DruidException 
to be propagated to the user, if
-        // there is one.
-        final Throwable e = current.error();
-        Throwables.throwIfUnchecked(e);
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public boolean readResults()
-    {
-      return !complete;
-    }
-
-    @Override
-    public void onResultsStart(
-        final List<MSQResultsReport.ColumnAndType> signature,
-        @Nullable final List<SqlTypeName> sqlTypeNames
-    )
-    {
-      // Nothing to do.
-    }
-
-    @Override
-    public boolean onResultRow(Object[] row)
-    {
-      try {
-        rowBuffer.put(Either.value(row));
-        return !complete;
-      }
-      catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public void onResultsComplete()
-    {
-      // Nothing to do.
-    }
-
-    @Override
-    public void onQueryComplete(MSQTaskReportPayload report)
-    {
-      try {
-        this.report = report;
-        this.complete = true;
-
-        final MSQStatusReport statusReport = report.getStatus();
-
-        if (statusReport.getStatus().isSuccess()) {
-          rowBuffer.put(Either.value(null));
-        } else {
-          pushError(statusReport.getErrorReport().toDruidException());
-        }
-      }
-      catch (InterruptedException e) {
-        // Can't fix this by pushing an error, because the rowBuffer isn't 
accepting new entries.
-        // Give up, allow controllerHolder.run() to fail.
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
-      }
-    }
-
-    public void pushError(final Throwable e) throws InterruptedException
-    {
-      rowBuffer.put(Either.error(e));
-    }
+    );
   }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
index 954c44d2305..2410ea8486d 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
@@ -32,10 +32,10 @@ import org.apache.druid.error.DruidException;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.dart.Dart;
 import org.apache.druid.msq.dart.controller.ControllerHolder;
+import org.apache.druid.msq.dart.controller.ControllerThreadPool;
 import org.apache.druid.msq.dart.controller.DartControllerContextFactory;
 import org.apache.druid.msq.dart.controller.DartControllerRegistry;
 import org.apache.druid.msq.dart.controller.QueryInfoAndReport;
@@ -72,7 +72,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 @LazySingleton
@@ -84,7 +83,7 @@ public class DartSqlEngine implements SqlEngine
   private final DartControllerContextFactory controllerContextFactory;
   private final DartControllerRegistry controllerRegistry;
   private final DartControllerConfig controllerConfig;
-  private final ExecutorService controllerExecutor;
+  private final ControllerThreadPool controllerThreadPool;
   private final ServerConfig serverConfig;
   private final QueryKitSpecFactory queryKitSpecFactory;
   private final MultiQueryKit queryKit;
@@ -97,6 +96,7 @@ public class DartSqlEngine implements SqlEngine
       DartControllerContextFactory controllerContextFactory,
       DartControllerRegistry controllerRegistry,
       DartControllerConfig controllerConfig,
+      @Dart ControllerThreadPool controllerThreadPool,
       DartQueryKitSpecFactory queryKitSpecFactory,
       MultiQueryKit queryKit,
       ServerConfig serverConfig,
@@ -104,38 +104,11 @@ public class DartSqlEngine implements SqlEngine
       SqlToolbox toolbox,
       DartSqlClients sqlClients
   )
-  {
-    this(
-        controllerContextFactory,
-        controllerRegistry,
-        controllerConfig,
-        Execs.multiThreaded(controllerConfig.getConcurrentQueries(), 
"dart-controller-%s"),
-        queryKitSpecFactory,
-        queryKit,
-        serverConfig,
-        dartQueryConfig,
-        toolbox,
-        sqlClients
-    );
-  }
-
-  public DartSqlEngine(
-      DartControllerContextFactory controllerContextFactory,
-      DartControllerRegistry controllerRegistry,
-      DartControllerConfig controllerConfig,
-      ExecutorService controllerExecutor,
-      QueryKitSpecFactory queryKitSpecFactory,
-      MultiQueryKit queryKit,
-      ServerConfig serverConfig,
-      DefaultQueryConfig dartQueryConfig,
-      SqlToolbox toolbox,
-      DartSqlClients sqlClients
-  )
   {
     this.controllerContextFactory = controllerContextFactory;
     this.controllerRegistry = controllerRegistry;
     this.controllerConfig = controllerConfig;
-    this.controllerExecutor = controllerExecutor;
+    this.controllerThreadPool = controllerThreadPool;
     this.queryKitSpecFactory = queryKitSpecFactory;
     this.queryKit = queryKit;
     this.serverConfig = serverConfig;
@@ -224,7 +197,7 @@ public class DartSqlEngine implements SqlEngine
         plannerContext,
         controllerRegistry,
         controllerConfig,
-        controllerExecutor,
+        controllerThreadPool,
         queryKitSpecFactory,
         queryKit,
         serverConfig
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
index 3ee647e65ba..a4175ecad0f 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.dart.guice;
 
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Binder;
 import com.google.inject.Inject;
 import com.google.inject.Module;
@@ -34,9 +35,11 @@ import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.annotations.LoadScope;
 import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.msq.dart.Dart;
 import org.apache.druid.msq.dart.DartResourcePermissionMapper;
 import org.apache.druid.msq.dart.controller.ControllerMessageListener;
+import org.apache.druid.msq.dart.controller.ControllerThreadPool;
 import org.apache.druid.msq.dart.controller.DartControllerContextFactory;
 import org.apache.druid.msq.dart.controller.DartControllerContextFactoryImpl;
 import org.apache.druid.msq.dart.controller.DartControllerRegistry;
@@ -122,6 +125,21 @@ public class DartControllerModule implements DruidModule
     }
   }
 
+  @Provides
+  @Dart
+  @ManageLifecycle
+  public ControllerThreadPool makeControllerThreadPool(DartControllerConfig 
dartControllerConfig)
+  {
+    return new ControllerThreadPool(
+        MoreExecutors.listeningDecorator(
+            Execs.multiThreaded(
+                dartControllerConfig.getConcurrentQueries(),
+                "dart-controller-%s"
+            )
+        )
+    );
+  }
+
   @Override
   public List<? extends com.fasterxml.jackson.databind.Module> 
getJacksonModules()
   {
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/CaptureReportQueryListener.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/CaptureReportQueryListener.java
index 597ffc82692..7632ca2ecf2 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/CaptureReportQueryListener.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/CaptureReportQueryListener.java
@@ -19,13 +19,12 @@
 
 package org.apache.druid.msq.exec;
 
-import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.druid.error.DruidException;
-import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.frame.read.FrameReader;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
 
 import javax.annotation.Nullable;
-import java.util.List;
 
 /**
  * A {@link QueryListener} wrapper that captures the report from {@link 
#onQueryComplete(MSQTaskReportPayload)}.
@@ -42,6 +41,14 @@ public class CaptureReportQueryListener implements 
QueryListener
     this.delegate = delegate;
   }
 
+  /**
+   * Whether this listener has captured a report. Will be true if the query 
has completed, false otherwise.
+   */
+  public boolean hasReport()
+  {
+    return report != null;
+  }
+
   /**
    * Retrieves the report. Can only be called once the query is complete.
    */
@@ -61,18 +68,15 @@ public class CaptureReportQueryListener implements 
QueryListener
   }
 
   @Override
-  public void onResultsStart(
-      final List<MSQResultsReport.ColumnAndType> signature,
-      @Nullable final List<SqlTypeName> sqlTypeNames
-  )
+  public void onResultsStart(final FrameReader frameReader)
   {
-    delegate.onResultsStart(signature, sqlTypeNames);
+    delegate.onResultsStart(frameReader);
   }
 
   @Override
-  public boolean onResultRow(final Object[] row)
+  public boolean onResultBatch(RowsAndColumns rac)
   {
-    return delegate.onResultRow(row);
+    return delegate.onResultBatch(rac);
   }
 
   @Override
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 879f8177868..78d021dca6c 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -2839,9 +2839,6 @@ public class ControllerImpl implements Controller
         final ControllerQueryResultsReader resultsReader = new 
ControllerQueryResultsReader(
             resultsChannel,
             queryDef.getFinalStageDefinition().getFrameReader(),
-            querySpec.getColumnMappings(),
-            resultsContext,
-            context.jsonMapper(),
             queryListener
         );
 
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerQueryResultsReader.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerQueryResultsReader.java
index 6ef4b743c21..e8766f049fd 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerQueryResultsReader.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerQueryResultsReader.java
@@ -19,25 +19,14 @@
 
 package org.apache.druid.msq.exec;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
 import it.unimi.dsi.fastutil.ints.IntSet;
-import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.channel.ReadableFrameChannel;
 import org.apache.druid.frame.channel.WritableFrameChannel;
 import org.apache.druid.frame.processor.FrameProcessor;
 import org.apache.druid.frame.processor.FrameProcessors;
 import org.apache.druid.frame.processor.ReturnOrAwait;
 import org.apache.druid.frame.read.FrameReader;
-import org.apache.druid.java.util.common.guava.Yielder;
-import org.apache.druid.java.util.common.guava.Yielders;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.msq.indexing.report.MSQResultsReport;
-import org.apache.druid.msq.util.SqlStatementResourceHelper;
-import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.sql.calcite.planner.ColumnMapping;
-import org.apache.druid.sql.calcite.planner.ColumnMappings;
-import org.apache.druid.utils.CloseableUtils;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -52,9 +41,6 @@ public class ControllerQueryResultsReader implements 
FrameProcessor<Void>
 
   private final ReadableFrameChannel in;
   private final FrameReader frameReader;
-  private final ColumnMappings columnMappings;
-  private final ResultsContext resultsContext;
-  private final ObjectMapper jsonMapper;
   private final QueryListener queryListener;
 
   private boolean wroteResultsStart;
@@ -62,17 +48,11 @@ public class ControllerQueryResultsReader implements 
FrameProcessor<Void>
   ControllerQueryResultsReader(
       final ReadableFrameChannel in,
       final FrameReader frameReader,
-      final ColumnMappings columnMappings,
-      final ResultsContext resultsContext,
-      final ObjectMapper jsonMapper,
       final QueryListener queryListener
   )
   {
     this.in = in;
     this.frameReader = frameReader;
-    this.columnMappings = columnMappings;
-    this.resultsContext = resultsContext;
-    this.jsonMapper = jsonMapper;
     this.queryListener = queryListener;
   }
 
@@ -96,23 +76,7 @@ public class ControllerQueryResultsReader implements 
FrameProcessor<Void>
     }
 
     if (!wroteResultsStart) {
-      final RowSignature querySignature = frameReader.signature();
-      final ImmutableList.Builder<MSQResultsReport.ColumnAndType> 
mappedSignature = ImmutableList.builder();
-
-      for (final ColumnMapping mapping : columnMappings.getMappings()) {
-        mappedSignature.add(
-            new MSQResultsReport.ColumnAndType(
-                mapping.getOutputColumn(),
-                
querySignature.getColumnType(mapping.getQueryColumn()).orElse(null)
-            )
-        );
-      }
-
-      queryListener.onResultsStart(
-          mappedSignature.build(),
-          resultsContext.getSqlTypeNames()
-      );
-
+      queryListener.onResultsStart(frameReader);
       wroteResultsStart = true;
     }
 
@@ -121,31 +85,7 @@ public class ControllerQueryResultsReader implements 
FrameProcessor<Void>
       queryListener.onResultsComplete();
       return ReturnOrAwait.returnObject(null);
     } else {
-      final Frame frame = in.readFrame();
-      Yielder<Object[]> rowYielder = Yielders.each(
-          SqlStatementResourceHelper.getResultSequence(
-              frame,
-              frameReader,
-              columnMappings,
-              resultsContext,
-              jsonMapper
-          )
-      );
-
-      try {
-        while (!rowYielder.isDone()) {
-          if (queryListener.onResultRow(rowYielder.get())) {
-            rowYielder = rowYielder.next(null);
-          } else {
-            // Caller wanted to stop reading.
-            return ReturnOrAwait.returnObject(null);
-          }
-        }
-      }
-      finally {
-        CloseableUtils.closeAndSuppressExceptions(rowYielder, e -> log.warn(e, 
"Failed to close frame yielder"));
-      }
-
+      queryListener.onResultBatch(in.read());
       return ReturnOrAwait.awaitAll(inputChannels().size());
     }
   }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryListener.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryListener.java
index 997fe4c8682..1987c4cc666 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryListener.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/QueryListener.java
@@ -19,12 +19,10 @@
 
 package org.apache.druid.msq.exec;
 
-import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.frame.read.FrameReader;
 import org.apache.druid.msq.indexing.report.MSQResultsReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
-
-import javax.annotation.Nullable;
-import java.util.List;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
 
 /**
  * Object passed to {@link Controller#run(QueryListener)} to enable retrieval 
of results, status, counters, etc.
@@ -39,27 +37,21 @@ public interface QueryListener
   /**
    * Called when results start coming in.
    *
-   * @param signature    signature of results
-   * @param sqlTypeNames SQL type names of results; same length as the 
signature
+   * @param frameReader reader for frames that will be passed to {@link 
#onResultBatch(RowsAndColumns)}
    */
-  void onResultsStart(
-      List<MSQResultsReport.ColumnAndType> signature,
-      @Nullable List<SqlTypeName> sqlTypeNames
-  );
+  void onResultsStart(FrameReader frameReader);
 
   /**
-   * Called for each result row. Follows a call to {@link 
#onResultsStart(List, List)}.
-   *
-   * @param row result row
+   * Called for each result batch. Follows a call to {@link #onResultsStart}.
    *
    * @return whether the controller should keep reading results
    */
-  boolean onResultRow(Object[] row);
+  boolean onResultBatch(RowsAndColumns rac);
 
   /**
-   * Called after the last result has been delivered by {@link 
#onResultRow(Object[])}. Only called if results are
-   * actually complete. If results are truncated due to {@link #readResults()} 
or {@link #onResultRow(Object[])}
-   * returning false, this method is not called.
+   * Called after the last result has been delivered by {@link 
#onResultBatch(RowsAndColumns)}. Only called if results
+   * are actually complete. If results are truncated due to {@link 
#readResults()} or
+   * {@link #onResultBatch(RowsAndColumns)} returning false, this method is 
not called.
    */
   void onResultsComplete();
 
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SequenceQueryListener.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SequenceQueryListener.java
new file mode 100644
index 00000000000..6837a9f6c4b
--- /dev/null
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/SequenceQueryListener.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.frame.channel.BlockingQueueFrameChannel;
+import org.apache.druid.frame.channel.FrameChannelSequence;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+
+import java.io.IOException;
+
+/**
+ * Listener that provides a {@link Sequence} of {@link RowsAndColumns}, via 
{@link #getSequence()}.
+ */
+public class SequenceQueryListener implements QueryListener
+{
+  private static final int DEFAULT_CAPACITY = 2;
+
+  private final BlockingQueueFrameChannel channel;
+  private volatile FrameReader frameReader;
+
+  public SequenceQueryListener()
+  {
+    this(DEFAULT_CAPACITY);
+  }
+
+  public SequenceQueryListener(final int bufferSize)
+  {
+    this.channel = new BlockingQueueFrameChannel(bufferSize);
+  }
+
+  /**
+   * Returns a {@link Sequence} of {@link RowsAndColumns} read from the 
channel. The sequence performs blocking
+   * reads; it should be consumed from a thread that is allowed to block.
+   */
+  public Sequence<RowsAndColumns> getSequence()
+  {
+    return new FrameChannelSequence(channel.readable());
+  }
+
+  /**
+   * Returns the {@link FrameReader} set by {@link 
#onResultsStart(FrameReader)}.
+   */
+  public FrameReader getFrameReader()
+  {
+    return frameReader;
+  }
+
+  @Override
+  public boolean readResults()
+  {
+    return true;
+  }
+
+  @Override
+  public void onResultsStart(final FrameReader frameReader)
+  {
+    this.frameReader = frameReader;
+  }
+
+  @Override
+  public boolean onResultBatch(final RowsAndColumns rac)
+  {
+    try {
+      final WritableFrameChannel writable = channel.writable();
+
+      // Blocking write.
+      FutureUtils.getUnchecked(writable.writabilityFuture(), false);
+      writable.write(rac);
+      return true;
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void onResultsComplete()
+  {
+    // Nothing to do. We'll close the writable channel in onQueryComplete.
+  }
+
+  @Override
+  public void onQueryComplete(final MSQTaskReportPayload report)
+  {
+    try {
+      final WritableFrameChannel writable = channel.writable();
+
+      if (!writable.isClosed()) {
+        if (!report.getStatus().getStatus().isSuccess()) {
+          
writable.fail(report.getStatus().getErrorReport().toDruidException());
+        }
+
+        writable.close();
+      }
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
index ab051344921..c43d855ad50 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
@@ -269,12 +269,15 @@ public class MSQControllerTask extends AbstractTask 
implements ClientTaskQuery,
         injector.getInstance(MSQTaskQueryKitSpecFactory.class)
     );
 
+    final ResultsContext resultsContext = new 
ResultsContext(getSqlTypeNames(), getSqlResultsContext());
     final TaskReportQueryListener queryListener = new TaskReportQueryListener(
         () -> 
toolbox.getTaskReportFileWriter().openReportOutputStream(getId()),
         toolbox.getJsonMapper(),
         getId(),
         getContext(),
-        querySpec.getDestination().getRowsInTaskReport()
+        querySpec.getDestination().getRowsInTaskReport(),
+        querySpec.getColumnMappings(),
+        resultsContext
     );
 
     controller.run(queryListener);
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/TaskReportQueryListener.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/TaskReportQueryListener.java
index 4c21ca005ee..4cb9f957d39 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/TaskReportQueryListener.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/TaskReportQueryListener.java
@@ -22,21 +22,31 @@ package org.apache.druid.msq.indexing;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializerProvider;
-import org.apache.calcite.sql.type.SqlTypeName;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.read.FrameReader;
 import org.apache.druid.indexer.report.TaskContextReport;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.msq.exec.OutputChannelMode;
 import org.apache.druid.msq.exec.QueryListener;
+import org.apache.druid.msq.exec.ResultsContext;
 import org.apache.druid.msq.indexing.destination.MSQDestination;
 import org.apache.druid.msq.indexing.report.MSQResultsReport;
 import org.apache.druid.msq.indexing.report.MSQStatusReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.msq.util.SqlStatementResourceHelper;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.planner.ColumnMapping;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -67,18 +77,25 @@ public class TaskReportQueryListener implements 
QueryListener
   private final SerializerProvider serializers;
   private final String taskId;
   private final Map<String, Object> taskContext;
+  @Nullable
+  private final ColumnMappings columnMappings;
+  @Nullable
+  private final ResultsContext resultsContext;
 
   private JsonGenerator jg;
   private long numResults;
   private MSQStatusReport statusReport;
   private boolean resultsCurrentlyOpen;
+  private FrameReader frameReader; // Set after onResultsStart
 
   public TaskReportQueryListener(
       final OutputStreamSupplier reportSink,
       final ObjectMapper jsonMapper,
       final String taskId,
       final Map<String, Object> taskContext,
-      final long rowsInTaskReport
+      final long rowsInTaskReport,
+      @Nullable final ColumnMappings columnMappings,
+      @Nullable final ResultsContext resultsContext
   )
   {
     this.reportSink = reportSink;
@@ -87,6 +104,36 @@ public class TaskReportQueryListener implements 
QueryListener
     this.taskId = taskId;
     this.taskContext = taskContext;
     this.rowsInTaskReport = rowsInTaskReport;
+    this.columnMappings = columnMappings;
+    this.resultsContext = resultsContext;
+  }
+
+  /**
+   * Maps {@link FrameReader#signature()} using {@link ColumnMappings}, then 
returns the result in the
+   * form expected for {@link MSQResultsReport#getSignature()}.
+   */
+  public static List<MSQResultsReport.ColumnAndType> computeResultSignature(
+      final FrameReader frameReader,
+      @Nullable final ColumnMappings columnMappings
+  )
+  {
+    if (columnMappings == null) {
+      return computeResultSignature(frameReader, 
ColumnMappings.identity(frameReader.signature()));
+    }
+
+    final RowSignature querySignature = frameReader.signature();
+    final ImmutableList.Builder<MSQResultsReport.ColumnAndType> 
mappedSignature = ImmutableList.builder();
+
+    for (final ColumnMapping mapping : columnMappings.getMappings()) {
+      mappedSignature.add(
+          new MSQResultsReport.ColumnAndType(
+              mapping.getOutputColumn(),
+              
querySignature.getColumnType(mapping.getQueryColumn()).orElse(null)
+          )
+      );
+    }
+
+    return mappedSignature.build();
   }
 
   @Override
@@ -96,16 +143,18 @@ public class TaskReportQueryListener implements 
QueryListener
   }
 
   @Override
-  public void onResultsStart(List<MSQResultsReport.ColumnAndType> signature, 
@Nullable List<SqlTypeName> sqlTypeNames)
+  public void onResultsStart(final FrameReader frameReader)
   {
+    this.frameReader = frameReader;
+
     try {
       openGenerator();
       resultsCurrentlyOpen = true;
 
       jg.writeObjectFieldStart(FIELD_RESULTS);
-      writeObjectField(FIELD_RESULTS_SIGNATURE, signature);
-      if (sqlTypeNames != null) {
-        writeObjectField(FIELD_RESULTS_SQL_TYPE_NAMES, sqlTypeNames);
+      writeObjectField(FIELD_RESULTS_SIGNATURE, 
computeResultSignature(frameReader, columnMappings));
+      if (resultsContext != null && resultsContext.getSqlTypeNames() != null) {
+        writeObjectField(FIELD_RESULTS_SQL_TYPE_NAMES, 
resultsContext.getSqlTypeNames());
       }
       jg.writeArrayFieldStart(FIELD_RESULTS_RESULTS);
     }
@@ -115,16 +164,40 @@ public class TaskReportQueryListener implements 
QueryListener
   }
 
   @Override
-  public boolean onResultRow(Object[] row)
+  public boolean onResultBatch(RowsAndColumns rac)
   {
-    try {
-      JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, row);
-      numResults++;
-      return rowsInTaskReport == MSQDestination.UNLIMITED || numResults < 
rowsInTaskReport;
+    final Frame frame = rac.as(Frame.class);
+    if (frame == null) {
+      throw DruidException.defensive(
+          "Expected Frame, got RAC[%s]. Can only handle Frames in task 
reports.",
+          rac.getClass().getName()
+      );
     }
-    catch (IOException e) {
-      throw new RuntimeException(e);
+
+    final Iterator<Object[]> resultIterator = 
SqlStatementResourceHelper.getResultIterator(
+        frame,
+        frameReader,
+        columnMappings,
+        resultsContext,
+        jsonMapper
+    );
+
+    while (resultIterator.hasNext()) {
+      final Object[] row = resultIterator.next();
+      try {
+        JacksonUtils.writeObjectUsingSerializerProvider(jg, serializers, row);
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      numResults++;
+
+      if (rowsInTaskReport != MSQDestination.UNLIMITED && numResults >= 
rowsInTaskReport) {
+        return false;
+      }
     }
+
+    return true;
   }
 
   @Override
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
index f70dd81caee..f8662643c95 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/SqlStatementResourceHelper.java
@@ -304,7 +304,7 @@ public class SqlStatementResourceHelper
     ));
   }
 
-  public static Sequence<Object[]> getResultSequence(
+  public static Iterator<Object[]> getResultIterator(
       final Frame resultsFrame,
       final FrameReader resultFrameReader,
       final ColumnMappings resultColumnMappings,
@@ -321,7 +321,7 @@ public class SqlStatementResourceHelper
                             .map(mapping -> 
columnSelectorFactory.makeColumnValueSelector(mapping.getQueryColumn()))
                             .collect(Collectors.toList());
 
-    final Iterable<Object[]> retVal = () -> new Iterator<>()
+    return new Iterator<>()
     {
       @Override
       public boolean hasNext()
@@ -356,7 +356,19 @@ public class SqlStatementResourceHelper
         return row;
       }
     };
-    return Sequences.simple(retVal);
+  }
+
+  public static Sequence<Object[]> getResultSequence(
+      final Frame resultsFrame,
+      final FrameReader resultFrameReader,
+      final ColumnMappings resultColumnMappings,
+      final ResultsContext resultsContext,
+      final ObjectMapper jsonMapper
+  )
+  {
+    return Sequences.simple(
+        () -> getResultIterator(resultsFrame, resultFrameReader, 
resultColumnMappings, resultsContext, jsonMapper)
+    );
   }
 
   @Nullable
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
index 4cf48396142..0b77461d178 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.report.TaskReport;
 import org.apache.druid.indexing.common.TaskLockType;
@@ -36,6 +37,7 @@ import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.msq.dart.controller.ControllerHolder;
+import org.apache.druid.msq.dart.controller.ControllerThreadPool;
 import org.apache.druid.msq.dart.controller.DartControllerRegistry;
 import org.apache.druid.msq.dart.controller.sql.DartQueryMaker;
 import org.apache.druid.msq.dart.controller.sql.DartSqlClient;
@@ -149,7 +151,7 @@ public class DartSqlResourceTest extends MSQTestBase
 
   private SqlResource sqlResource;
   private DartControllerRegistry controllerRegistry;
-  private ExecutorService controllerExecutor;
+  private ControllerThreadPool controllerThreadPool;
   private AutoCloseable mockCloser;
   private final StubServiceEmitter serviceEmitter = new StubServiceEmitter();
 
@@ -255,9 +257,13 @@ public class DartSqlResourceTest extends MSQTestBase
           }
         },
         objectMapper.convertValue(ImmutableMap.of(), 
DartControllerConfig.class),
-        controllerExecutor = Execs.multiThreaded(
-            MAX_CONTROLLERS,
-            StringUtils.encodeForFormat(getClass().getSimpleName() + 
"-controller-exec")
+        controllerThreadPool = new ControllerThreadPool(
+            MoreExecutors.listeningDecorator(
+                Execs.multiThreaded(
+                    MAX_CONTROLLERS,
+                    StringUtils.encodeForFormat(getClass().getSimpleName() + 
"-controller-exec")
+                )
+            )
         ),
         new DartQueryKitSpecFactory(new 
TestTimelineServerView(Collections.emptyList())),
         injector.getInstance(MultiQueryKit.class),
@@ -291,9 +297,9 @@ public class DartSqlResourceTest extends MSQTestBase
     mockCloser.close();
 
     // shutdown(), not shutdownNow(), to ensure controllers stop timely on 
their own.
-    controllerExecutor.shutdown();
+    controllerThreadPool.getExecutorService().shutdown();
 
-    if (!controllerExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
+    if (!controllerThreadPool.getExecutorService().awaitTermination(1, 
TimeUnit.MINUTES)) {
       throw new IAE("controllerExecutor.awaitTermination() timed out");
     }
 
@@ -744,7 +750,7 @@ public class DartSqlResourceTest extends MSQTestBase
            .thenReturn(makeAuthenticationResult(REGULAR_USER_NAME));
 
     // Block up the controllerExecutor so the controller runs long enough to 
cancel it.
-    final Future<?> sleepFuture = controllerExecutor.submit(() -> {
+    final Future<?> sleepFuture = 
controllerThreadPool.getExecutorService().submit(() -> {
       try {
         Thread.sleep(3_600_000);
       }
@@ -798,19 +804,38 @@ public class DartSqlResourceTest extends MSQTestBase
 
     // Wait for the SQL POST to come back.
     Assertions.assertNull(doPostFuture.get());
-    
Assertions.assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
asyncResponse.getStatus());
 
-    // Ensure MSQ fault (CanceledFault) is properly translated to a 
DruidException and then properly serialized.
-    final Map<String, Object> e = objectMapper.readValue(
-        asyncResponse.baos.toByteArray(),
-        JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
-    );
-    Assertions.assertEquals("Canceled", e.get("errorCode"));
-    Assertions.assertEquals("CANCELED", e.get("category"));
-    Assertions.assertEquals(
-        
MSQFaultUtils.generateMessageWithErrorCode(CanceledFault.userRequest()),
-        e.get("errorMessage")
-    );
+    if (fullReport) {
+      // Buffered report path -- should get a cancellation error in the report.
+      Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
asyncResponse.getStatus());
+
+      final List<List<TaskReport.ReportMap>> reportMaps = 
objectMapper.readValue(
+          asyncResponse.baos.toByteArray(),
+          new TypeReference<>() {}
+      );
+
+      final MSQTaskReport report = (MSQTaskReport) 
Iterables.getOnlyElement(Iterables.getOnlyElement(reportMaps)).get(MSQTaskReport.REPORT_KEY);
+      final MSQStatusReport statusReport = report.getPayload().getStatus();
+
+      Assertions.assertEquals(TaskState.FAILED, statusReport.getStatus());
+      Assertions.assertEquals(CanceledFault.userRequest(), 
statusReport.getErrorReport().getFault());
+
+    } else {
+      // Streaming path -- should get a serialized DruidException.
+      
Assertions.assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
asyncResponse.getStatus());
+
+      // Ensure MSQ fault (CanceledFault) is properly translated to a 
DruidException and then properly serialized.
+      final Map<String, Object> e = objectMapper.readValue(
+          asyncResponse.baos.toByteArray(),
+          JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+      );
+      Assertions.assertEquals("Canceled", e.get("errorCode"));
+      Assertions.assertEquals("CANCELED", e.get("category"));
+      Assertions.assertEquals(
+          
MSQFaultUtils.generateMessageWithErrorCode(CanceledFault.userRequest()),
+          e.get("errorMessage")
+      );
+    }
   }
 
   @Test
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/TaskReportQueryListenerTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/TaskReportQueryListenerTest.java
index e50da1c75ed..99e443e5144 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/TaskReportQueryListenerTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/TaskReportQueryListenerTest.java
@@ -26,22 +26,31 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.testutil.FrameSequenceBuilder;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.report.TaskContextReport;
 import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.msq.counters.CounterSnapshotsTree;
 import org.apache.druid.msq.exec.Limits;
+import org.apache.druid.msq.exec.ResultsContext;
 import org.apache.druid.msq.guice.MSQIndexingModule;
 import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
 import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
-import org.apache.druid.msq.indexing.report.MSQResultsReport;
 import org.apache.druid.msq.indexing.report.MSQStagesReport;
 import org.apache.druid.msq.indexing.report.MSQStatusReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
 import org.apache.druid.msq.indexing.report.MSQTaskReportTest;
+import org.apache.druid.segment.RowAdapter;
+import org.apache.druid.segment.RowBasedCursorFactory;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -52,6 +61,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
+import java.util.function.ToLongFunction;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -59,30 +70,60 @@ public class TaskReportQueryListenerTest
 {
   private static final String TASK_ID = "mytask";
   private static final Map<String, Object> TASK_CONTEXT = 
ImmutableMap.of("foo", "bar");
-  private static final List<MSQResultsReport.ColumnAndType> SIGNATURE = 
ImmutableList.of(
-      new MSQResultsReport.ColumnAndType("x", ColumnType.STRING)
-  );
+  private static final RowSignature SIGNATURE = RowSignature.builder()
+                                                            .add("x", 
ColumnType.STRING)
+                                                            .build();
   private static final List<SqlTypeName> SQL_TYPE_NAMES = 
ImmutableList.of(SqlTypeName.VARCHAR);
   private static final ObjectMapper JSON_MAPPER =
       TestHelper.makeJsonMapper().registerModules(new 
MSQIndexingModule().getJacksonModules());
 
   private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
+  /**
+   * Creates a Frame containing the given rows using the test signature.
+   */
+  private Frame createFrame(final List<Map<String, Object>> rows)
+  {
+    final RowBasedCursorFactory<Map<String, Object>> cursorFactory = new 
RowBasedCursorFactory<>(
+        Sequences.simple(rows),
+        new MapRowAdapter(SIGNATURE),
+        SIGNATURE
+    );
+
+    return FrameSequenceBuilder.fromCursorFactory(cursorFactory)
+                               .frameType(FrameType.latestRowBased())
+                               .maxRowsPerFrame(Integer.MAX_VALUE)
+                               .frames()
+                               .toList()
+                               .get(0);
+  }
+
   @Test
   public void test_taskReportDestination() throws IOException
   {
+    final FrameReader frameReader = FrameReader.create(SIGNATURE);
+    final ResultsContext resultsContext = new ResultsContext(SQL_TYPE_NAMES, 
null);
+
     final TaskReportQueryListener listener = new TaskReportQueryListener(
         Suppliers.ofInstance(baos)::get,
         JSON_MAPPER,
         TASK_ID,
         TASK_CONTEXT,
-        TaskReportMSQDestination.instance().getRowsInTaskReport()
+        TaskReportMSQDestination.instance().getRowsInTaskReport(),
+        ColumnMappings.identity(SIGNATURE),
+        resultsContext
     );
 
     Assert.assertTrue(listener.readResults());
-    listener.onResultsStart(SIGNATURE, SQL_TYPE_NAMES);
-    Assert.assertTrue(listener.onResultRow(new Object[]{"foo"}));
-    Assert.assertTrue(listener.onResultRow(new Object[]{"bar"}));
+    listener.onResultsStart(frameReader);
+
+    // Create a frame with two rows
+    final Frame frame = createFrame(ImmutableList.of(
+        ImmutableMap.of("x", "foo"),
+        ImmutableMap.of("x", "bar")
+    ));
+    Assert.assertTrue(listener.onResultBatch(frame.asRAC()));
+
     listener.onResultsComplete();
     listener.onQueryComplete(
         new MSQTaskReportPayload(
@@ -139,20 +180,40 @@ public class TaskReportQueryListenerTest
   @Test
   public void test_durableDestination() throws IOException
   {
+    final FrameReader frameReader = FrameReader.create(SIGNATURE);
+    final ResultsContext resultsContext = new ResultsContext(SQL_TYPE_NAMES, 
null);
+
     final TaskReportQueryListener listener = new TaskReportQueryListener(
         Suppliers.ofInstance(baos)::get,
         JSON_MAPPER,
         TASK_ID,
         TASK_CONTEXT,
-        DurableStorageMSQDestination.instance().getRowsInTaskReport()
+        DurableStorageMSQDestination.instance().getRowsInTaskReport(),
+        ColumnMappings.identity(SIGNATURE),
+        resultsContext
     );
 
     Assert.assertTrue(listener.readResults());
-    listener.onResultsStart(SIGNATURE, SQL_TYPE_NAMES);
-    for (int i = 0; i < Limits.MAX_SELECT_RESULT_ROWS - 1; i++) {
-      Assert.assertTrue("row #" + i, listener.onResultRow(new 
Object[]{"foo"}));
+    listener.onResultsStart(frameReader);
+
+    // Create frames with rows up to MAX_SELECT_RESULT_ROWS
+    final int batchSize = 100;
+    int rowsAdded = 0;
+    boolean keepGoing = true;
+
+    while (keepGoing && rowsAdded < Limits.MAX_SELECT_RESULT_ROWS) {
+      final int rowsToAdd = (int) Math.min(batchSize, 
Limits.MAX_SELECT_RESULT_ROWS - rowsAdded);
+      final List<Map<String, Object>> rows = IntStream.range(0, rowsToAdd)
+                                                       .mapToObj(i -> 
ImmutableMap.<String, Object>of("x", "foo"))
+                                                       
.collect(Collectors.toList());
+      final Frame frame = createFrame(rows);
+      keepGoing = listener.onResultBatch(frame.asRAC());
+      rowsAdded += rowsToAdd;
     }
-    Assert.assertFalse(listener.onResultRow(new Object[]{"foo"}));
+
+    // Should have stopped accepting results after MAX_SELECT_RESULT_ROWS
+    Assert.assertFalse(keepGoing);
+
     listener.onQueryComplete(
         new MSQTaskReportPayload(
             new MSQStatusReport(
@@ -203,4 +264,29 @@ public class TaskReportQueryListenerTest
     Assert.assertTrue(report.getPayload().getResults().isResultsTruncated());
     Assert.assertEquals(TaskState.SUCCESS, 
report.getPayload().getStatus().getStatus());
   }
+
+  /**
+   * Simple RowAdapter for Map-based rows.
+   */
+  private static class MapRowAdapter implements RowAdapter<Map<String, Object>>
+  {
+    private final RowSignature signature;
+
+    MapRowAdapter(final RowSignature signature)
+    {
+      this.signature = signature;
+    }
+
+    @Override
+    public ToLongFunction<Map<String, Object>> timestampFunction()
+    {
+      return row -> 0L;
+    }
+
+    @Override
+    public Function<Map<String, Object>, Object> columnFunction(final String 
columnName)
+    {
+      return row -> row.get(columnName);
+    }
+  }
 }
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
index 8bfcc49f917..658139289db 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestOverlordServiceClient.java
@@ -31,6 +31,8 @@ import org.apache.druid.client.ImmutableSegmentLoadInfo;
 import org.apache.druid.client.indexing.TaskPayloadResponse;
 import org.apache.druid.client.indexing.TaskStatusResponse;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.read.FrameReader;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexer.report.TaskReport;
@@ -45,6 +47,7 @@ import org.apache.druid.msq.exec.QueryListener;
 import org.apache.druid.msq.exec.ResultsContext;
 import org.apache.druid.msq.exec.WorkerMemoryParameters;
 import org.apache.druid.msq.indexing.MSQControllerTask;
+import org.apache.druid.msq.indexing.TaskReportQueryListener;
 import org.apache.druid.msq.indexing.destination.MSQDestination;
 import org.apache.druid.msq.indexing.error.CancellationReason;
 import org.apache.druid.msq.indexing.report.MSQResultsReport;
@@ -52,12 +55,16 @@ import org.apache.druid.msq.indexing.report.MSQStatusReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
 import org.apache.druid.msq.sql.MSQTaskQueryKitSpecFactory;
+import org.apache.druid.msq.util.SqlStatementResourceHelper;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import org.apache.druid.rpc.indexing.NoopOverlordClient;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -172,10 +179,14 @@ public class MSQTestOverlordServiceClient extends 
NoopOverlordClient
 
       testTaskDetails.addController(controller);
 
+      final ResultsContext resultsContext = new 
ResultsContext(cTask.getSqlTypeNames(), cTask.getSqlResultsContext());
       queryListener =
           new TestQueryListener(
               cTask.getId(),
-              cTask.getQuerySpec().getDestination()
+              cTask.getQuerySpec().getDestination(),
+              objectMapper,
+              cTask.getQuerySpec().getColumnMappings(),
+              resultsContext
           );
 
       try {
@@ -296,17 +307,30 @@ public class MSQTestOverlordServiceClient extends 
NoopOverlordClient
   {
     private final String taskId;
     private final MSQDestination destination;
+    private final ObjectMapper jsonMapper;
     private final List<Object[]> results = new ArrayList<>();
+    private final ColumnMappings columnMappings;
+    private final ResultsContext resultsContext;
 
+    private FrameReader frameReader;
     private List<MSQResultsReport.ColumnAndType> signature;
     private List<SqlTypeName> sqlTypeNames;
     private boolean resultsTruncated = true;
     private TaskReport.ReportMap reportMap;
 
-    public TestQueryListener(final String taskId, final MSQDestination 
destination)
+    public TestQueryListener(
+        final String taskId,
+        final MSQDestination destination,
+        final ObjectMapper jsonMapper,
+        final ColumnMappings columnMappings,
+        final ResultsContext resultsContext
+    )
     {
       this.taskId = taskId;
       this.destination = destination;
+      this.jsonMapper = jsonMapper;
+      this.columnMappings = columnMappings;
+      this.resultsContext = resultsContext;
     }
 
     @Override
@@ -316,18 +340,29 @@ public class MSQTestOverlordServiceClient extends 
NoopOverlordClient
     }
 
     @Override
-    public void onResultsStart(List<MSQResultsReport.ColumnAndType> signature, 
@Nullable List<SqlTypeName> sqlTypeNames)
+    public void onResultsStart(final FrameReader frameReader)
     {
-      this.signature = signature;
-      this.sqlTypeNames = sqlTypeNames;
+      this.frameReader = frameReader;
+      this.signature = 
TaskReportQueryListener.computeResultSignature(frameReader, columnMappings);
+      this.sqlTypeNames = resultsContext != null ? 
resultsContext.getSqlTypeNames() : null;
     }
 
     @Override
-    public boolean onResultRow(Object[] row)
+    public boolean onResultBatch(RowsAndColumns rac)
     {
       if (destination.getRowsInTaskReport() == MSQDestination.UNLIMITED
           || results.size() < destination.getRowsInTaskReport()) {
-        results.add(row);
+        final Iterator<Object[]> resultIterator = 
SqlStatementResourceHelper.getResultIterator(
+            rac.as(Frame.class),
+            frameReader,
+            columnMappings,
+            resultsContext,
+            jsonMapper
+        );
+        while (resultIterator.hasNext()) {
+          final Object[] row = resultIterator.next();
+          results.add(row);
+        }
         return true;
       } else {
         return false;
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/NoopQueryListener.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/NoopQueryListener.java
index fe38819a451..39d88d78c8d 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/NoopQueryListener.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/NoopQueryListener.java
@@ -19,13 +19,10 @@
 
 package org.apache.druid.msq.test;
 
-import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.frame.read.FrameReader;
 import org.apache.druid.msq.exec.QueryListener;
-import org.apache.druid.msq.indexing.report.MSQResultsReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
-
-import javax.annotation.Nullable;
-import java.util.List;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
 
 public class NoopQueryListener implements QueryListener
 {
@@ -36,13 +33,13 @@ public class NoopQueryListener implements QueryListener
   }
 
   @Override
-  public void onResultsStart(List<MSQResultsReport.ColumnAndType> signature, 
@Nullable List<SqlTypeName> sqlTypeNames)
+  public void onResultsStart(final FrameReader frameReader)
   {
     // Do nothing.
   }
 
   @Override
-  public boolean onResultRow(Object[] row)
+  public boolean onResultBatch(RowsAndColumns rac)
   {
     return true;
   }
diff --git 
a/processing/src/main/java/org/apache/druid/frame/channel/BlockingQueueFrameChannel.java
 
b/processing/src/main/java/org/apache/druid/frame/channel/BlockingQueueFrameChannel.java
index 7a41a781147..e434b62c209 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/channel/BlockingQueueFrameChannel.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/channel/BlockingQueueFrameChannel.java
@@ -173,6 +173,8 @@ public class BlockingQueueFrameChannel
           // If this happens, it's a bug, potentially due to incorrectly using 
this class with multiple writers.
           throw new ISE("Could not write error to channel");
         }
+
+        notifyReader();
       }
     }
 
diff --git 
a/processing/src/main/java/org/apache/druid/frame/channel/FrameChannelSequence.java
 
b/processing/src/main/java/org/apache/druid/frame/channel/FrameChannelSequence.java
index 83075b0e48e..44de5ee263b 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/channel/FrameChannelSequence.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/channel/FrameChannelSequence.java
@@ -20,19 +20,21 @@
 package org.apache.druid.frame.channel;
 
 import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.guava.BaseSequence;
+import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.utils.Throwables;
 
 import java.io.Closeable;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 /**
- * Adapter that converts a {@link ReadableFrameChannel} into a {@link 
org.apache.druid.java.util.common.guava.Sequence}
- * of {@link RowsAndColumns}.
+ * Adapter that converts a {@link ReadableFrameChannel} into a {@link 
Sequence} of {@link RowsAndColumns}.
  *
- * This class does blocking reads on the channel, rather than nonblocking 
reads. Therefore, it is preferable to use
- * {@link ReadableFrameChannel} directly whenever nonblocking reads are 
desired.
+ * This class does blocking reads on the channel. Therefore, it is preferable 
to use {@link ReadableFrameChannel}
+ * directly whenever nonblocking reads are desired.
  */
 public class FrameChannelSequence extends BaseSequence<RowsAndColumns, 
FrameChannelSequence.FrameChannelIterator>
 {
@@ -81,7 +83,19 @@ public class FrameChannelSequence extends 
BaseSequence<RowsAndColumns, FrameChan
         throw new NoSuchElementException();
       }
 
-      return channel.read();
+      try {
+        return channel.read();
+      }
+      catch (Exception e) {
+        // Unwrap DruidException. Bit of a hack to have this here, but it's 
necessary to properly bubble up
+        // DruidExceptions that might be wrapped by "channel.read()" due to 
being thrown in another thread.
+        final DruidException druidException = Throwables.getCauseOfType(e, 
DruidException.class);
+        if (druidException != null) {
+          throw druidException;
+        } else {
+          throw e;
+        }
+      }
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to