This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 93735f5df89 Dart: Serve reports for running and recently-finished 
queries. (#18886)
93735f5df89 is described below

commit 93735f5df890f90e22e613549fe4d65533975765
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Jan 8 11:04:15 2026 -0800

    Dart: Serve reports for running and recently-finished queries. (#18886)
    
    This patch adds query reports, and a mapping from SQL query ID to
    Dart query ID, to DartControllerRegistry. It is made available through
    a new getQueryReport method on SqlEngine, and through a new
    "/druid/v2/sql/queries/<sqlQueryId>/report" endpoint on SqlResource.
---
 .../embedded/msq/EmbeddedDartReportApiTest.java    | 340 +++++++++++++++++++++
 .../testing/embedded/msq/EmbeddedMSQApis.java      |  95 ++++++
 .../druid/msq/counters/CounterSnapshotsTree.java   |  21 ++
 .../msq/counters/NilQueryCounterSnapshot.java      |   9 +
 .../dart/controller/ControllerMessageListener.java |   4 +-
 .../dart/controller/DartControllerRegistry.java    | 201 +++++++++++-
 .../msq/dart/controller/QueryInfoAndReport.java    |  93 ++++++
 .../controller/messages/ControllerMessage.java     |   3 +-
 .../msq/dart/controller/messages/PostCounters.java | 107 +++++++
 .../msq/dart/controller/sql/DartQueryMaker.java    |  57 ++--
 .../msq/dart/controller/sql/DartSqlClient.java     |  12 +
 .../controller/sql/DartSqlClientFactoryImpl.java   |   3 +-
 .../msq/dart/controller/sql/DartSqlClientImpl.java |  24 ++
 .../msq/dart/controller/sql/DartSqlClients.java    |   6 +-
 .../msq/dart/controller/sql/DartSqlEngine.java     | 109 +++++--
 .../druid/msq/dart/guice/DartControllerConfig.java |  24 ++
 .../msq/dart/worker/DartControllerClient.java      |  12 +-
 .../druid/msq/dart/worker/DartWorkerContext.java   |  16 +-
 .../msq/indexing/IndexerControllerContext.java     |   8 +
 .../druid/msq/indexing/IndexerWorkerContext.java   |   8 +
 .../druid/msq/indexing/MSQControllerTask.java      |   4 +-
 .../msq/indexing/TaskReportQueryListener.java      |   6 +-
 .../indexing/client/IndexerControllerClient.java   |  17 +-
 .../druid/msq/util/MultiStageQueryContext.java     |  15 +
 .../msq/counters/NilQueryCounterSnapshotTest.java  |  67 ++++
 .../controller/DartControllerRegistryTest.java     | 314 +++++++++++++++++++
 .../dart/controller/http/DartSqlResourceTest.java  |  20 +-
 .../controller/messages/ControllerMessageTest.java |  29 ++
 .../msq/indexing/TaskReportQueryListenerTest.java  |   8 +-
 .../apache/druid/client/broker/BrokerClient.java   |  19 ++
 .../druid/client/broker/BrokerClientImpl.java      |  35 +++
 .../testing/embedded/EmbeddedClusterApis.java      |  13 +
 .../testing/embedded/EmbeddedServiceClient.java    |  50 +++
 .../apache/druid/sql/calcite/run/SqlEngine.java    |  42 ++-
 .../druid/sql/http/GetQueryReportResponse.java     |  82 +++++
 .../org/apache/druid/sql/http/SqlResource.java     |  50 ++-
 36 files changed, 1824 insertions(+), 99 deletions(-)

diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDartReportApiTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDartReportApiTest.java
new file mode 100644
index 00000000000..891cba4e3b7
--- /dev/null
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDartReportApiTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.msq;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.guice.SleepModule;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
+import org.apache.druid.msq.dart.controller.sql.DartSqlClients;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.http.ClientSqlQuery;
+import org.apache.druid.server.metrics.LatchableEmitter;
+import org.apache.druid.sql.http.GetQueryReportResponse;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Embedded test for the Dart report API at {@code 
/druid/v2/sql/queries/{id}/reports}.
+ * Uses batch ingestion to avoid dependency on Kafka/Docker.
+ */
+public class EmbeddedDartReportApiTest extends EmbeddedClusterTestBase
+{
+  private static final int MAX_RETAINED_REPORT_COUNT = 10;
+
+  private final EmbeddedBroker broker1 = new EmbeddedBroker();
+  private final EmbeddedBroker broker2 = new EmbeddedBroker();
+  private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedHistorical historical = new EmbeddedHistorical();
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+
+  private EmbeddedMSQApis msqApis;
+  private String ingestedDataSource;
+
+  private void configureBroker(EmbeddedBroker broker, int port)
+  {
+    broker.addProperty("druid.msq.dart.controller.heapFraction", "0.5")
+          .addProperty("druid.msq.dart.controller.maxRetainedReportCount", 
String.valueOf(MAX_RETAINED_REPORT_COUNT))
+          .addProperty("druid.query.default.context.maxConcurrentStages", "1")
+          .addProperty("druid.plaintextPort", String.valueOf(port));
+  }
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    coordinator.addProperty("druid.manager.segments.useIncrementalCache", 
"always");
+    overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+
+    // Enable Dart with report retention on both brokers, with different ports
+    configureBroker(broker1, 7082);
+    configureBroker(broker2, 7083);
+
+    historical.addProperty("druid.msq.dart.worker.heapFraction", "0.5")
+              .addProperty("druid.msq.dart.worker.concurrentQueries", "1");
+
+    indexer.setServerMemory(400_000_000)
+           .addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
+           .addProperty("druid.processing.numThreads", "2")
+           .addProperty("druid.worker.capacity", "4");
+
+    return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+                               .addCommonProperty("druid.msq.dart.enabled", 
"true")
+                               .useLatchableEmitter()
+                               .addServer(coordinator)
+                               .addServer(overlord)
+                               .addServer(broker1)
+                               .addServer(broker2)
+                               .addServer(indexer)
+                               .addServer(historical)
+                               .addExtension(SleepModule.class);
+  }
+
+  @BeforeAll
+  protected void setupData()
+  {
+    msqApis = new EmbeddedMSQApis(cluster, overlord);
+
+    // Ingest test data once, using batch ingestion.
+    ingestedDataSource = EmbeddedClusterApis.createTestDatasourceName();
+    final String taskId = IdUtils.getRandomId();
+    final IndexTask task = 
MoreResources.Task.BASIC_INDEX.get().dataSource(ingestedDataSource).withId(taskId);
+    cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
+    cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+
+    // Wait for segments to be available on both brokers
+    cluster.callApi().waitForAllSegmentsToBeAvailable(ingestedDataSource, 
coordinator, broker1);
+    cluster.callApi().waitForAllSegmentsToBeAvailable(ingestedDataSource, 
coordinator, broker2);
+  }
+
+  @Test
+  @Timeout(60)
+  public void test_getQueryReport_forCompletedDartQuery()
+  {
+    final String sqlQueryId = UUID.randomUUID().toString();
+    final String sql = StringUtils.format("SELECT COUNT(*) FROM \"%s\"", 
ingestedDataSource);
+
+    // Run a Dart query with a specific SQL query ID
+    final String result = cluster.callApi().onAnyBroker(
+        b -> b.submitSqlQuery(
+            new ClientSqlQuery(
+                sql,
+                "CSV",
+                false,
+                false,
+                false,
+                ImmutableMap.of(
+                    QueryContexts.ENGINE, "msq-dart",
+                    QueryContexts.CTX_SQL_QUERY_ID, sqlQueryId
+                ),
+                null
+            )
+        )
+    ).trim();
+
+    // Verify the query returned results (should be 10 rows based on 
CSV_10_DAYS data)
+    Assertions.assertEquals("10", result);
+
+    // Now fetch the report using the SQL query ID
+    final GetQueryReportResponse reportResponse = 
msqApis.getDartQueryReport(sqlQueryId, broker1);
+
+    // Verify the report response
+    Assertions.assertNotNull(reportResponse, "Report response should not be 
null");
+    Assertions.assertNotNull(reportResponse.getQueryInfo(), "Query info should 
not be null");
+    Assertions.assertNotNull(reportResponse.getReportMap(), "Report should not 
be null");
+
+    // Verify the query info
+    final DartQueryInfo queryInfo = (DartQueryInfo) 
reportResponse.getQueryInfo();
+    Assertions.assertEquals(sql, queryInfo.getSql());
+    Assertions.assertEquals(sqlQueryId, queryInfo.getSqlQueryId());
+    Assertions.assertNotNull(queryInfo.getDartQueryId());
+
+    // Verify the report is an MSQTaskReport
+    Assertions.assertInstanceOf(TaskReport.ReportMap.class, 
reportResponse.getReportMap());
+    Assertions.assertInstanceOf(MSQTaskReport.class, 
reportResponse.getReportMap().get(MSQTaskReport.REPORT_KEY));
+  }
+
+  @Test
+  @Timeout(60)
+  public void test_getQueryReport_notFound()
+  {
+    // Try to get a report for a non-existent query
+    final GetQueryReportResponse reportResponse = 
msqApis.getDartQueryReport("nonexistent-query-id", broker1);
+
+    // Verify the response is null (not found)
+    Assertions.assertNull(reportResponse, "Report response should be null for 
non-existent query");
+  }
+
+  @Test
+  @Timeout(60)
+  public void test_getQueryReport_fromBothBrokers()
+  {
+    final String sqlQueryId = UUID.randomUUID().toString();
+    final String sql = StringUtils.format("SELECT COUNT(*) FROM \"%s\"", 
ingestedDataSource);
+
+    // Run a Dart query on any broker
+    final String result = cluster.callApi().onAnyBroker(
+        b -> b.submitSqlQuery(
+            new ClientSqlQuery(
+                sql,
+                "CSV",
+                false,
+                false,
+                false,
+                ImmutableMap.of(
+                    QueryContexts.ENGINE, "msq-dart",
+                    QueryContexts.CTX_SQL_QUERY_ID, sqlQueryId
+                ),
+                null
+            )
+        )
+    ).trim();
+
+    // Verify the query returned results
+    Assertions.assertEquals("10", result);
+
+    // Verify both brokers have discovered each other
+    final var sqlClients1 = 
broker1.bindings().getInstance(DartSqlClients.class);
+    final var sqlClients2 = 
broker2.bindings().getInstance(DartSqlClients.class);
+    Assertions.assertEquals(1, sqlClients1.getAllClients().size(), "Broker1 
should have 1 client (broker2)");
+    Assertions.assertEquals(1, sqlClients2.getAllClients().size(), "Broker2 
should have 1 client (broker1)");
+
+    // Fetch the report from both brokers, to verify cross-broker lookup is 
working
+    final GetQueryReportResponse reportFromBroker1 = 
msqApis.getDartQueryReport(sqlQueryId, broker1);
+    final GetQueryReportResponse reportFromBroker2 = 
msqApis.getDartQueryReport(sqlQueryId, broker2);
+
+    // Verify the report content
+    for (GetQueryReportResponse report : Arrays.asList(reportFromBroker1, 
reportFromBroker2)) {
+      Assertions.assertNotNull(report);
+      final DartQueryInfo queryInfo = (DartQueryInfo) report.getQueryInfo();
+      Assertions.assertEquals(sqlQueryId, queryInfo.getSqlQueryId());
+      Assertions.assertEquals(sql, queryInfo.getSql());
+      Assertions.assertNotNull(queryInfo.getDartQueryId());
+      Assertions.assertInstanceOf(TaskReport.ReportMap.class, 
report.getReportMap());
+      Assertions.assertInstanceOf(MSQTaskReport.class, 
report.getReportMap().get(MSQTaskReport.REPORT_KEY));
+    }
+  }
+
+  @Test
+  @Timeout(60)
+  public void test_getQueryReport_forRunningAndCanceledQuery()
+  {
+    final String sqlQueryId = UUID.randomUUID().toString();
+
+    // Use SLEEP to make the query run for a while.
+    // Need to use a non-constant expression to make this happen at runtime 
rather than planning time.
+    final String sql =
+        StringUtils.format("SELECT SLEEP(TIMESTAMP_TO_MILLIS(__time) * 0 + 60) 
FROM \"%s\"", ingestedDataSource);
+
+    // Step 1: Issue the query asynchronously.
+    final ListenableFuture<String> queryFuture =
+        msqApis.submitDartSqlAsync(sql, Map.of(QueryContexts.CTX_SQL_QUERY_ID, 
sqlQueryId), broker1);
+
+    // Step 2: Get the report.
+    final GetQueryReportResponse runningReport = waitForReport(sqlQueryId);
+
+    Assertions.assertNotNull(runningReport, "Report should be available for 
running query");
+    Assertions.assertNotNull(runningReport.getQueryInfo(), "Query info should 
not be null");
+    Assertions.assertNotNull(runningReport.getReportMap(), "Report should not 
be null");
+
+    // Verify the query info
+    final DartQueryInfo runningQueryInfo = (DartQueryInfo) 
runningReport.getQueryInfo();
+    Assertions.assertEquals(sql, runningQueryInfo.getSql());
+    Assertions.assertEquals(sqlQueryId, runningQueryInfo.getSqlQueryId());
+
+    // Verify the report is an MSQTaskReport with RUNNING status
+    final MSQTaskReport runningMsqReport =
+        (MSQTaskReport) 
runningReport.getReportMap().get(MSQTaskReport.REPORT_KEY);
+    Assertions.assertNotNull(runningMsqReport, "MSQ report should not be 
null");
+
+    final MSQTaskReportPayload runningPayload = runningMsqReport.getPayload();
+    Assertions.assertNotNull(runningPayload, "Payload should not be null");
+    Assertions.assertNotNull(runningPayload.getStatus(), "Status should not be 
null");
+    Assertions.assertEquals(
+        TaskState.RUNNING,
+        runningPayload.getStatus().getStatus(),
+        "Query should be in RUNNING state"
+    );
+
+    // Step 3: Cancel the query
+    final boolean canceled = msqApis.cancelDartQuery(sqlQueryId, broker1);
+    Assertions.assertTrue(canceled, "Query cancellation should be accepted");
+
+    // Step 4: Wait for the sqlQuery/time metric to be emitted (signals query 
completion)
+    final LatchableEmitter emitter = broker1.latchableEmitter();
+    emitter.waitForEvent(
+        event -> event.hasMetricName("sqlQuery/time")
+                      .hasDimension("id", sqlQueryId)
+    );
+
+    // Step 5: Fetch the report again - should now be in FAILED state
+    final GetQueryReportResponse canceledReport = 
msqApis.getDartQueryReport(sqlQueryId, broker1);
+
+    Assertions.assertNotNull(canceledReport, "Report should be available for 
canceled query");
+    Assertions.assertNotNull(canceledReport.getReportMap(), "Report map should 
not be null");
+
+    final MSQTaskReport canceledMsqReport =
+        (MSQTaskReport) 
canceledReport.getReportMap().get(MSQTaskReport.REPORT_KEY);
+    Assertions.assertNotNull(canceledMsqReport, "MSQ report should not be null 
for canceled query");
+
+    final MSQTaskReportPayload canceledPayload = 
canceledMsqReport.getPayload();
+    Assertions.assertNotNull(canceledPayload, "Payload should not be null for 
canceled query");
+    Assertions.assertNotNull(canceledPayload.getStatus(), "Status should not 
be null for canceled query");
+    Assertions.assertEquals(
+        TaskState.FAILED,
+        canceledPayload.getStatus().getStatus(),
+        "canceled query should be in FAILED state"
+    );
+
+    // The query future should complete with an error due to cancellation
+    try {
+      queryFuture.get();
+      Assertions.fail("Query should have failed due to cancellation");
+    }
+    catch (Exception ignored) {
+      // Expected - query was canceled
+    }
+  }
+
+  /**
+   * Polls the report API until a report is available.
+   */
+  private GetQueryReportResponse waitForReport(String sqlQueryId)
+  {
+    final long timeout = 30_000;
+    final long deadline = System.currentTimeMillis() + timeout;
+    while (System.currentTimeMillis() < deadline) {
+      final GetQueryReportResponse report = 
msqApis.getDartQueryReport(sqlQueryId, broker1);
+      if (report != null) {
+        return report;
+      }
+      try {
+        Thread.sleep(100);
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    throw new ISE("Timed out after[%,d] ms waiting for query to be in RUNNING 
state", timeout);
+  }
+}
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
index d656618db3d..115d1c22f16 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
@@ -19,6 +19,9 @@
 
 package org.apache.druid.testing.embedded.msq;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.report.TaskReport;
@@ -29,11 +32,17 @@ import 
org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.http.ClientSqlQuery;
 import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.rpc.HttpResponseException;
+import org.apache.druid.sql.http.GetQueryReportResponse;
 import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
 import org.apache.druid.testing.embedded.EmbeddedClusterApis;
 import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
 import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 
+import javax.annotation.Nullable;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
@@ -150,4 +159,90 @@ public class EmbeddedMSQApis
 
     return taskReportPayload;
   }
+
+  /**
+   * Gets the query report for a Dart query by its SQL query ID, fetching from 
a specific broker.
+   * Returns null if the query is not found.
+   *
+   * @param sqlQueryId   the SQL query ID
+   * @param targetBroker the broker to fetch the report from
+   */
+  @Nullable
+  public GetQueryReportResponse getDartQueryReport(String sqlQueryId, 
EmbeddedBroker targetBroker)
+  {
+    try {
+      final String responseJson = cluster.callApi().onTargetBroker(
+          targetBroker,
+          b -> b.getQueryReport(sqlQueryId, false /* allow cross-broker 
forwarding */)
+      );
+      return parseReportResponse(responseJson, 
targetBroker.bindings().jsonMapper());
+    }
+    catch (RuntimeException e) {
+      // Check if this is a 404 Not Found response
+      final Throwable cause = e.getCause();
+      if (cause instanceof HttpResponseException) {
+        if (((HttpResponseException) 
cause).getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
+          return null;
+        }
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Submits a Dart SQL query asynchronously to a specific broker.
+   *
+   * @param sql          the SQL query
+   * @param context      additional context parameters
+   * @param targetBroker the broker to submit the query to
+   *
+   * @return a future that resolves when the query completes
+   */
+  public ListenableFuture<String> submitDartSqlAsync(
+      String sql,
+      Map<String, Object> context,
+      EmbeddedBroker targetBroker
+  )
+  {
+    final Map<String, Object> fullContext = new HashMap<>(context);
+    fullContext.put(QueryContexts.ENGINE, DartSqlEngine.NAME);
+
+    return cluster.callApi().onTargetBrokerAsync(
+        targetBroker,
+        b -> b.submitSqlQuery(
+            new ClientSqlQuery(
+                sql,
+                ResultFormat.CSV.name(),
+                false,
+                false,
+                false,
+                fullContext,
+                null
+            )
+        )
+    );
+  }
+
+  /**
+   * Cancels a Dart SQL query by its SQL query ID.
+   *
+   * @param sqlQueryId   the SQL query ID to cancel
+   * @param targetBroker the broker where the query is running
+   *
+   * @return true if the cancellation was accepted
+   */
+  public boolean cancelDartQuery(String sqlQueryId, EmbeddedBroker 
targetBroker)
+  {
+    return cluster.callApi().onTargetBroker(targetBroker, b -> 
b.cancelSqlQuery(sqlQueryId));
+  }
+
+  private static GetQueryReportResponse parseReportResponse(String 
responseJson, ObjectMapper jsonMapper)
+  {
+    try {
+      return jsonMapper.readValue(responseJson, GetQueryReportResponse.class);
+    }
+    catch (JsonProcessingException e) {
+      throw DruidException.defensive(e, "Failed to parse query report 
response[%s]", responseJson);
+    }
+  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterSnapshotsTree.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterSnapshotsTree.java
index dce2fe7ac3a..706c9a6871d 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterSnapshotsTree.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterSnapshotsTree.java
@@ -109,6 +109,27 @@ public class CounterSnapshotsTree
     }
   }
 
+  @Override
+  public boolean equals(final Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final CounterSnapshotsTree that = (CounterSnapshotsTree) o;
+    return copyMap().equals(that.copyMap());
+  }
+
+  @Override
+  public int hashCode()
+  {
+    synchronized (snapshotsMap) {
+      return snapshotsMap.hashCode();
+    }
+  }
+
   @Override
   public String toString()
   {
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/NilQueryCounterSnapshot.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/NilQueryCounterSnapshot.java
index eca3f6d6228..03be3fb1a0b 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/NilQueryCounterSnapshot.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/NilQueryCounterSnapshot.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.msq.counters;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 /**
@@ -29,11 +30,19 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 @JsonTypeName("nil")
 public class NilQueryCounterSnapshot implements QueryCounterSnapshot
 {
+  private static final NilQueryCounterSnapshot INSTANCE = new 
NilQueryCounterSnapshot();
+
   private NilQueryCounterSnapshot()
   {
     // Singleton
   }
 
+  @JsonCreator
+  public static NilQueryCounterSnapshot instance()
+  {
+    return INSTANCE;
+  }
+
   @Override
   public int hashCode()
   {
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerMessageListener.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerMessageListener.java
index 5cedd13baf0..e9d6fbb3010 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerMessageListener.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerMessageListener.java
@@ -44,7 +44,7 @@ public class ControllerMessageListener implements 
MessageListener<ControllerMess
   @Override
   public void messageReceived(ControllerMessage message)
   {
-    final ControllerHolder holder = 
controllerRegistry.get(message.getQueryId());
+    final ControllerHolder holder = 
controllerRegistry.getController(message.getQueryId());
     if (holder != null) {
       message.handle(holder.getController());
     }
@@ -59,7 +59,7 @@ public class ControllerMessageListener implements 
MessageListener<ControllerMess
   @Override
   public void serverRemoved(DruidNode node)
   {
-    for (final ControllerHolder holder : controllerRegistry.getAllHolders()) {
+    for (final ControllerHolder holder : 
controllerRegistry.getAllControllers()) {
       final Controller controller = holder.getController();
       final WorkerId workerId = WorkerId.fromDruidNode(node, 
controller.queryId());
       holder.workerOffline(workerId);
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java
index 847dbf75980..b80a6b6daf5 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java
@@ -19,45 +19,166 @@
 
 package org.apache.druid.msq.dart.controller;
 
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import com.google.inject.Inject;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
+import org.apache.druid.msq.dart.guice.DartControllerConfig;
 import org.apache.druid.msq.exec.Controller;
+import org.joda.time.Period;
 
 import javax.annotation.Nullable;
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
- * Registry for actively-running {@link Controller}.
+ * Registry for actively-running {@link Controller} and recently-completed 
{@link TaskReport}.
  */
+@ManageLifecycle
 public class DartControllerRegistry
 {
+  /**
+   * Minimum frequency for checking if {@link #cleanupExpiredReports()} needs 
to be run.
+   */
+  private static final long MIN_CLEANUP_CHECK_MILLIS = 10_000;
+
+  private final DartControllerConfig config;
+
+  /**
+   * Map of Dart query ID -> controller for currently-running queries.
+   */
   private final ConcurrentHashMap<String, ControllerHolder> controllerMap = 
new ConcurrentHashMap<>();
 
+  /**
+   * Map of Dart query ID -> timestamped report for completed queries.
+   */
+  @GuardedBy("completeReports")
+  private final LinkedHashMap<String, QueryInfoAndReport> completeReports = 
new LinkedHashMap<>();
+
+  /**
+   * Map of SQL query ID -> Dart query ID. Used by {@link 
#getQueryInfoAndReportBySqlQueryId(String)}. Contains an
+   * entry for every query in either {@link #controllerMap} or {@link 
#completeReports}.
+   *
+   * It is possible for the same SQL query ID to map to multiple Dart query 
IDs, because SQL query IDs can be set
+   * by the user, and uniqueness is not a required. If this occurs case, we go 
with the first one encountered
+   * and ignore the others.
+   */
+  private final ConcurrentHashMap<String, String> sqlQueryIdToDartQueryId = 
new ConcurrentHashMap<>();
+
+  /**
+   * Executor for cleaning up reports older than {@link 
DartControllerConfig#getMaxRetainedReportDuration()}.
+   */
+  private ScheduledExecutorService cleanupExec;
+
+  @Inject
+  public DartControllerRegistry(final DartControllerConfig config)
+  {
+    this.config = config;
+  }
+
+  @LifecycleStart
+  public void start()
+  {
+    // Schedule periodic cleanup of expired reports.
+    if (!config.getMaxRetainedReportDuration().equals(Period.ZERO)) {
+      final String threadNameFormat = 
StringUtils.format("%s-ReportCleanupExec-%%s", getClass().getSimpleName());
+      final long cleanupPeriodMs = Math.max(
+          MIN_CLEANUP_CHECK_MILLIS,
+          
config.getMaxRetainedReportDuration().toStandardDuration().getMillis() / 10
+      );
+      cleanupExec = Execs.scheduledSingleThreaded(threadNameFormat);
+      cleanupExec.scheduleAtFixedRate(
+          this::cleanupExpiredReports,
+          cleanupPeriodMs,
+          cleanupPeriodMs,
+          TimeUnit.MILLISECONDS
+      );
+    }
+  }
+
+  @LifecycleStop
+  public void stop()
+  {
+    if (cleanupExec != null) {
+      cleanupExec.shutdown();
+    }
+  }
+
   /**
    * Add a controller. Throws {@link DruidException} if a controller with the 
same {@link Controller#queryId()} is
    * already registered.
    */
   public void register(ControllerHolder holder)
   {
-    if (controllerMap.putIfAbsent(holder.getController().queryId(), holder) != 
null) {
-      throw DruidException.defensive("Controller[%s] already registered", 
holder.getController().queryId());
+    final String dartQueryId = holder.getController().queryId();
+    if (controllerMap.putIfAbsent(dartQueryId, holder) != null) {
+      throw DruidException.defensive("Controller[%s] already registered", 
dartQueryId);
     }
+    sqlQueryIdToDartQueryId.putIfAbsent(holder.getSqlQueryId(), dartQueryId);
   }
 
   /**
-   * Remove a controller from the registry.
+   * Remove a controller from the registry. Optionally registers a report that 
will be available for some
+   * time afterwards, based on {@link 
DartControllerConfig#getMaxRetainedReportCount()} and
+   * {@link DartControllerConfig#getMaxRetainedReportDuration()}.
    */
-  public void deregister(ControllerHolder holder)
+  public void deregister(ControllerHolder holder, @Nullable 
TaskReport.ReportMap completeReport)
   {
+    final String dartQueryId = holder.getController().queryId();
+
     // Remove only if the current mapping for the queryId is this specific 
controller.
-    controllerMap.remove(holder.getController().queryId(), holder);
+    final boolean didRemove = controllerMap.remove(dartQueryId, holder);
+
+    // Add completeReport to completeReports, if present, and if we actually 
did deregister this specific controller.
+    if (didRemove && completeReport != null && 
config.getMaxRetainedReportCount() > 0) {
+      synchronized (completeReports) {
+        // Remove reports if size is greater than maxRetainedReportCount - 1.
+        int reportsToRemove = completeReports.size() - 
config.getMaxRetainedReportCount() + 1;
+        if (reportsToRemove > 0) {
+          for (Iterator<Map.Entry<String, QueryInfoAndReport>> it = 
completeReports.entrySet().iterator();
+               it.hasNext() && reportsToRemove > 0;
+               reportsToRemove--) {
+            final QueryInfoAndReport evictedReport = it.next().getValue();
+            it.remove();
+            sqlQueryIdToDartQueryId.remove(
+                evictedReport.getQueryInfo().getSqlQueryId(),
+                evictedReport.getQueryInfo().getDartQueryId()
+            );
+          }
+        }
+
+        completeReports.put(
+            dartQueryId,
+            new QueryInfoAndReport(
+                DartQueryInfo.fromControllerHolder(holder),
+                completeReport,
+                DateTimes.nowUtc()
+            )
+        );
+      }
+    } else if (didRemove) {
+      // Report not retained, but controller was removed; clean up the SQL 
query ID mapping.
+      sqlQueryIdToDartQueryId.remove(holder.getSqlQueryId(), dartQueryId);
+    }
   }
 
   /**
-   * Return a specific controller holder, or null if it doesn't exist.
+   * Return a specific controller holder by Dart query ID, or null if it 
doesn't exist.
    */
   @Nullable
-  public ControllerHolder get(final String queryId)
+  public ControllerHolder getController(final String queryId)
   {
     return controllerMap.get(queryId);
   }
@@ -65,8 +186,70 @@ public class DartControllerRegistry
   /**
    * Returns all actively-running {@link Controller}.
    */
-  public Collection<ControllerHolder> getAllHolders()
+  public Collection<ControllerHolder> getAllControllers()
   {
     return controllerMap.values();
   }
+
+  /**
+   * Gets execution details and report for a query.
+   */
+  @Nullable
+  public QueryInfoAndReport getQueryInfoAndReport(final String queryId)
+  {
+    final ControllerHolder runningController = getController(queryId);
+
+    if (runningController != null) {
+      final TaskReport.ReportMap liveReportMap = 
runningController.getController().liveReports();
+      if (liveReportMap != null) {
+        return new QueryInfoAndReport(
+            DartQueryInfo.fromControllerHolder(runningController),
+            liveReportMap,
+            DateTimes.nowUtc()
+        );
+      } else {
+        return null;
+      }
+    } else {
+      synchronized (completeReports) {
+        return completeReports.get(queryId);
+      }
+    }
+  }
+
+  /**
+   * Gets execution details and report for a query by SQL query ID.
+   */
+  @Nullable
+  public QueryInfoAndReport getQueryInfoAndReportBySqlQueryId(final String 
sqlQueryId)
+  {
+    final String dartQueryId = sqlQueryIdToDartQueryId.get(sqlQueryId);
+    if (dartQueryId == null) {
+      return null;
+    }
+    return getQueryInfoAndReport(dartQueryId);
+  }
+
+  /**
+   * Removes reports that have exceeded {@link 
DartControllerConfig#getMaxRetainedReportDuration()}.
+   */
+  private void cleanupExpiredReports()
+  {
+    final long thresholdTimestamp = 
DateTimes.nowUtc().minus(config.getMaxRetainedReportDuration()).getMillis();
+
+    synchronized (completeReports) {
+      final Iterator<Map.Entry<String, QueryInfoAndReport>> it = 
completeReports.entrySet().iterator();
+      while (it.hasNext()) {
+        final QueryInfoAndReport report = it.next().getValue();
+        if (report.getTimestamp().getMillis() < thresholdTimestamp) {
+          it.remove();
+          
sqlQueryIdToDartQueryId.remove(report.getQueryInfo().getSqlQueryId(), 
report.getQueryInfo().getDartQueryId());
+        } else {
+          // Entries are added in order of increasing timestamp, so we can 
stop looking once we
+          // find a non-expired entry.
+          break;
+        }
+      }
+    }
+  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/QueryInfoAndReport.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/QueryInfoAndReport.java
new file mode 100644
index 00000000000..de163ec6b0a
--- /dev/null
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/QueryInfoAndReport.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.controller;
+
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Object returned by {@link 
DartControllerRegistry#getQueryInfoAndReport(String)}.
+ */
+public class QueryInfoAndReport
+{
+  private final DartQueryInfo queryInfo;
+  @Nullable
+  private final TaskReport.ReportMap reportMap;
+  private final DateTime timestamp;
+
+  public QueryInfoAndReport(DartQueryInfo queryInfo, @Nullable 
TaskReport.ReportMap reportMap, DateTime timestamp)
+  {
+    this.queryInfo = queryInfo;
+    this.reportMap = reportMap;
+    this.timestamp = timestamp;
+  }
+
+  public DartQueryInfo getQueryInfo()
+  {
+    return queryInfo;
+  }
+
+  @Nullable
+  public TaskReport.ReportMap getReportMap()
+  {
+    return reportMap;
+  }
+
+  /**
+   * Timestamp that the query details or report was last updated. Generally 
"now" for currently-running queries,
+   * and the query finish time for completed queries.
+   */
+  public DateTime getTimestamp()
+  {
+    return timestamp;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    QueryInfoAndReport that = (QueryInfoAndReport) o;
+    return Objects.equals(queryInfo, that.queryInfo)
+           && Objects.equals(reportMap, that.reportMap)
+           && Objects.equals(timestamp, that.timestamp);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(queryInfo, reportMap, timestamp);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "QueryInfoAndReport{" +
+           "queryInfo=" + queryInfo +
+           ", report=" + reportMap +
+           ", timestamp=" + timestamp +
+           '}';
+  }
+}
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/messages/ControllerMessage.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/messages/ControllerMessage.java
index 454e23bbc9c..63fa5d95f05 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/messages/ControllerMessage.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/messages/ControllerMessage.java
@@ -33,7 +33,8 @@ import org.apache.druid.msq.exec.Controller;
     @JsonSubTypes.Type(value = DoneReadingInput.class, name = 
"doneReadingInput"),
     @JsonSubTypes.Type(value = ResultsComplete.class, name = 
"resultsComplete"),
     @JsonSubTypes.Type(value = WorkerError.class, name = "workerError"),
-    @JsonSubTypes.Type(value = WorkerWarning.class, name = "workerWarning")
+    @JsonSubTypes.Type(value = WorkerWarning.class, name = "workerWarning"),
+    @JsonSubTypes.Type(value = PostCounters.class, name = "postCounters")
 })
 public interface ControllerMessage
 {
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/messages/PostCounters.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/messages/PostCounters.java
new file mode 100644
index 00000000000..64436fd39d2
--- /dev/null
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/messages/PostCounters.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.controller.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.msq.counters.CounterSnapshotsTree;
+import org.apache.druid.msq.exec.Controller;
+import org.apache.druid.msq.exec.ControllerClient;
+
+import java.util.Objects;
+
+/**
+ * Message for {@link ControllerClient#postCounters}.
+ */
+public class PostCounters implements ControllerMessage
+{
+  private final String queryId;
+  private final String workerId;
+  private final CounterSnapshotsTree counters;
+
+  @JsonCreator
+  public PostCounters(
+      @JsonProperty("queryId") final String queryId,
+      @JsonProperty("workerId") final String workerId,
+      @JsonProperty("counters") final CounterSnapshotsTree counters
+  )
+  {
+    this.queryId = Preconditions.checkNotNull(queryId, "queryId");
+    this.workerId = Preconditions.checkNotNull(workerId, "workerId");
+    this.counters = Preconditions.checkNotNull(counters, "counters");
+  }
+
+  @Override
+  @JsonProperty
+  public String getQueryId()
+  {
+    return queryId;
+  }
+
+  @JsonProperty
+  public String getWorkerId()
+  {
+    return workerId;
+  }
+
+  @JsonProperty("counters")
+  public CounterSnapshotsTree getCounters()
+  {
+    return counters;
+  }
+
+  @Override
+  public void handle(final Controller controller)
+  {
+    controller.updateCounters(workerId, counters);
+  }
+
+  @Override
+  public boolean equals(final Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final PostCounters that = (PostCounters) o;
+    return Objects.equals(queryId, that.queryId)
+           && Objects.equals(workerId, that.workerId)
+           && Objects.equals(counters, that.counters);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(queryId, workerId, counters);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "PostCounters{" +
+           "queryId='" + queryId + '\'' +
+           ", workerId='" + workerId + '\'' +
+           ", counters=" + counters +
+           '}';
+  }
+}
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
index c309ef0d419..5270c9f63d8 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
@@ -23,6 +23,7 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Iterators;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.indexer.report.TaskReport;
 import org.apache.druid.io.LimitedOutputStream;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Either;
@@ -31,7 +32,6 @@ import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.BaseSequence;
 import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.dart.controller.ControllerHolder;
 import org.apache.druid.msq.dart.controller.DartControllerContextFactory;
@@ -46,12 +46,13 @@ import org.apache.druid.msq.exec.ResultsContext;
 import org.apache.druid.msq.indexing.LegacyMSQSpec;
 import org.apache.druid.msq.indexing.QueryDefMSQSpec;
 import org.apache.druid.msq.indexing.TaskReportQueryListener;
-import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
+import org.apache.druid.msq.indexing.destination.MSQDestination;
 import org.apache.druid.msq.indexing.error.CanceledFault;
 import org.apache.druid.msq.indexing.error.CancellationReason;
 import org.apache.druid.msq.indexing.error.MSQErrorReport;
 import org.apache.druid.msq.indexing.report.MSQResultsReport;
 import org.apache.druid.msq.indexing.report.MSQStatusReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
 import org.apache.druid.msq.querykit.MultiQueryKit;
 import org.apache.druid.msq.sql.MSQTaskQueryMaker;
@@ -114,7 +115,7 @@ public class DartQueryMaker implements QueryMaker
   private final DartControllerConfig controllerConfig;
 
   /**
-   * Executor for {@link #runWithoutReport(ControllerHolder)}. Number of 
thread is equal to
+   * Executor for {@link #runWithIterator(ControllerHolder)}. Number of thread 
is equal to
    * {@link DartControllerConfig#getConcurrentQueries()}, which limits the 
number of concurrent controllers.
    */
   private final ExecutorService controllerExecutor;
@@ -230,12 +231,12 @@ public class DartQueryMaker implements QueryMaker
       // runWithReport, runWithoutReport are responsible for calling 
controllerRegistry.deregister(controllerHolder)
       // when their work is done.
       final Sequence<Object[]> results =
-          fullReport ? runWithReport(controllerHolder) : 
runWithoutReport(controllerHolder);
+          fullReport ? runWithReport(controllerHolder) : 
runWithIterator(controllerHolder);
       return QueryResponse.withEmptyContext(results);
     }
     catch (Throwable e) {
       // Error while calling runWithReport or runWithoutReport. Deregister 
controller immediately.
-      controllerRegistry.deregister(controllerHolder);
+      controllerRegistry.deregister(controllerHolder, null);
       throw e;
     }
   }
@@ -256,16 +257,16 @@ public class DartQueryMaker implements QueryMaker
    * Run a query and return the full report, buffered in memory up to
    * {@link DartControllerConfig#getMaxQueryReportSize()}.
    *
-   * Arranges for {@link DartControllerRegistry#deregister(ControllerHolder)} 
to be called upon completion (either
-   * success or failure).
+   * Arranges for {@link DartControllerRegistry#deregister} to be called upon 
completion (either success or failure).
    */
   private Sequence<Object[]> runWithReport(final ControllerHolder 
controllerHolder)
   {
-    final Future<Map<String, Object>> reportFuture;
+    final Future<TaskReport.ReportMap> reportFuture;
 
     // Run in controllerExecutor. Control doesn't really *need* to be moved to 
another thread, but we have to
     // use the controllerExecutor anyway, to ensure we respect the 
concurrentQueries configuration.
     reportFuture = controllerExecutor.submit(() -> {
+      TaskReport.ReportMap retVal = null;
       final String threadName = Thread.currentThread().getName();
 
       try {
@@ -273,7 +274,6 @@ public class DartQueryMaker implements QueryMaker
 
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
         final TaskReportQueryListener queryListener = new 
TaskReportQueryListener(
-            TaskReportMSQDestination.instance(),
             () -> new LimitedOutputStream(
                 baos,
                 controllerConfig.getMaxQueryReportSize(),
@@ -286,12 +286,13 @@ public class DartQueryMaker implements QueryMaker
             ),
             plannerContext.getJsonMapper(),
             controllerHolder.getController().queryId(),
-            Collections.emptyMap()
+            Collections.emptyMap(),
+            MSQDestination.UNLIMITED
         );
 
         if (controllerHolder.run(queryListener)) {
-          return plannerContext.getJsonMapper()
-                               .readValue(baos.toByteArray(), 
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);
+          retVal = plannerContext.getJsonMapper()
+                                 .readValue(baos.toByteArray(), 
TaskReport.ReportMap.class);
         } else {
           // Controller was canceled before it ran.
           throw MSQErrorReport
@@ -305,9 +306,11 @@ public class DartQueryMaker implements QueryMaker
         }
       }
       finally {
-        controllerRegistry.deregister(controllerHolder);
+        controllerRegistry.deregister(controllerHolder, retVal);
         Thread.currentThread().setName(threadName);
       }
+
+      return retVal;
     });
 
     // Return a sequence that reads one row (the report) from reportFuture.
@@ -342,10 +345,9 @@ public class DartQueryMaker implements QueryMaker
   /**
    * Run a query and return the results only, streamed back using {@link 
ResultIteratorMaker}.
    *
-   * Arranges for {@link DartControllerRegistry#deregister(ControllerHolder)} 
to be called upon completion (either
-   * success or failure).
+   * Arranges for {@link DartControllerRegistry#deregister} to be called upon 
completion (either success or failure).
    */
-  private Sequence<Object[]> runWithoutReport(final ControllerHolder 
controllerHolder)
+  private Sequence<Object[]> runWithIterator(final ControllerHolder 
controllerHolder)
   {
     return new BaseSequence<>(new ResultIteratorMaker(controllerHolder));
   }
@@ -364,7 +366,7 @@ public class DartQueryMaker implements QueryMaker
   }
 
   /**
-   * Helper for {@link #runWithoutReport(ControllerHolder)}.
+   * Helper for {@link #runWithIterator(ControllerHolder)}.
    */
   class ResultIteratorMaker implements BaseSequence.IteratorMaker<Object[], 
ResultIterator>
   {
@@ -423,7 +425,20 @@ public class DartQueryMaker implements QueryMaker
           throw e;
         }
         finally {
-          controllerRegistry.deregister(controllerHolder);
+          final MSQTaskReport taskReport;
+
+          if (resultIterator.report != null) {
+            taskReport = new MSQTaskReport(
+                controllerHolder.getController().queryId(),
+                resultIterator.report
+            );
+          } else {
+            taskReport = null;
+          }
+
+          final TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
+          reportMap.put(MSQTaskReport.REPORT_KEY, taskReport);
+          controllerRegistry.deregister(controllerHolder, reportMap);
           Thread.currentThread().setName(threadName);
         }
       });
@@ -450,7 +465,7 @@ public class DartQueryMaker implements QueryMaker
   }
 
   /**
-   * Helper for {@link ResultIteratorMaker}, which is in turn a helper for 
{@link #runWithoutReport(ControllerHolder)}.
+   * Helper for {@link ResultIteratorMaker}, which is in turn a helper for 
{@link #runWithIterator(ControllerHolder)}.
    */
   static class ResultIterator implements Iterator<Object[]>, QueryListener
   {
@@ -471,6 +486,7 @@ public class DartQueryMaker implements QueryMaker
     private Either<Throwable, Object[]> current;
 
     private volatile boolean complete;
+    private volatile MSQTaskReportPayload report;
 
     @Nullable
     private final Duration timeout;
@@ -563,7 +579,8 @@ public class DartQueryMaker implements QueryMaker
     public void onQueryComplete(MSQTaskReportPayload report)
     {
       try {
-        complete = true;
+        this.report = report;
+        this.complete = true;
 
         final MSQStatusReport statusReport = report.getStatus();
 
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java
index c8da492237a..e94aac0cb4a 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.dart.controller.sql;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.sql.http.GetQueriesResponse;
+import org.apache.druid.sql.http.GetQueryReportResponse;
 import org.apache.druid.sql.http.SqlResource;
 
 import javax.servlet.http.HttpServletRequest;
@@ -39,4 +40,15 @@ public interface DartSqlClient
    * @see SqlResource#doGetRunningQueries(String, HttpServletRequest) the 
server side
    */
   ListenableFuture<GetQueriesResponse> getRunningQueries(boolean selfOnly);
+
+  /**
+   * Get query report for a particular SQL query ID on this server.
+   *
+   * @param sqlQueryId SQL query ID
+   * @param selfOnly   true if only queries from this server should be 
examined; false if queries from all servers
+   *                   should be examined
+   *
+   * @see SqlResource#doGetQueryReport(String, String, HttpServletRequest) 
server side
+   */
+  ListenableFuture<GetQueryReportResponse> getQueryReport(String sqlQueryId, 
boolean selfOnly);
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientFactoryImpl.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientFactoryImpl.java
index 3562f5c2ff8..5046bf801eb 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientFactoryImpl.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientFactoryImpl.java
@@ -55,7 +55,8 @@ public class DartSqlClientFactoryImpl implements 
DartSqlClientFactory
   {
     final ServiceClient client = clientFactory.makeClient(
         StringUtils.format("%s[dart-sql]", node.getHostAndPortToUse()),
-        new 
FixedServiceLocator(ServiceLocation.fromDruidNode(node).withBasePath(SqlResource.PATH)),
+        new FixedServiceLocator(ServiceLocation.fromDruidNode(node)
+                                               
.withBasePath(StringUtils.maybeRemoveTrailingSlash(SqlResource.PATH))),
         StandardRetryPolicy.noRetries()
     );
 
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java
index 7b054220521..d2c0e8f1c26 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java
@@ -22,11 +22,13 @@ package org.apache.druid.msq.dart.controller.sql;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import 
org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
 import org.apache.druid.rpc.RequestBuilder;
 import org.apache.druid.rpc.ServiceClient;
 import org.apache.druid.sql.http.GetQueriesResponse;
+import org.apache.druid.sql.http.GetQueryReportResponse;
 import org.apache.http.client.utils.URIBuilder;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 
@@ -67,4 +69,26 @@ public class DartSqlClientImpl implements DartSqlClient
       throw new RuntimeException(e);
     }
   }
+
+  @Override
+  public ListenableFuture<GetQueryReportResponse> getQueryReport(String 
sqlQueryId, boolean selfOnly)
+  {
+    try {
+      URIBuilder builder = new 
URIBuilder(StringUtils.format("/queries/%s/reports", 
StringUtils.urlEncode(sqlQueryId)));
+      if (selfOnly) {
+        builder.addParameter("selfOnly", null);
+      }
+
+      return FutureUtils.transform(
+          client.asyncRequest(
+              new RequestBuilder(HttpMethod.GET, builder.toString()),
+              new BytesFullResponseHandler()
+          ),
+          holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(), 
GetQueryReportResponse.class)
+      );
+    }
+    catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClients.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClients.java
index f28d895cf60..02b2b652661 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClients.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClients.java
@@ -33,15 +33,15 @@ import 
org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.sql.http.SqlResource;
 
-import javax.servlet.http.HttpServletRequest;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 /**
- * Keeps {@link DartSqlClient} for all servers except ourselves. Currently the 
purpose of this is to power
- * the "get all queries" API at {@link SqlResource#doGetRunningQueries(String, 
String, HttpServletRequest)}.
+ * Keeps {@link DartSqlClient} for all servers except ourselves. Currently, 
the purpose of this is to power
+ * the "get all queries" API at {@link SqlResource#doGetRunningQueries} and 
the "get query report" API
+ * at {@link SqlResource#doGetQueryReport}.
  */
 @ManageLifecycle
 public class DartSqlClients implements DruidNodeDiscovery.Listener
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
index f61ed243fb3..0dbc8307e32 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
@@ -30,6 +30,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -37,6 +38,7 @@ import org.apache.druid.msq.dart.Dart;
 import org.apache.druid.msq.dart.controller.ControllerHolder;
 import org.apache.druid.msq.dart.controller.DartControllerContextFactory;
 import org.apache.druid.msq.dart.controller.DartControllerRegistry;
+import org.apache.druid.msq.dart.controller.QueryInfoAndReport;
 import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
 import org.apache.druid.msq.dart.guice.DartControllerConfig;
 import org.apache.druid.msq.exec.QueryKitSpecFactory;
@@ -61,8 +63,9 @@ import org.apache.druid.sql.calcite.run.SqlEngine;
 import org.apache.druid.sql.calcite.run.SqlEngines;
 import org.apache.druid.sql.destination.IngestDestination;
 import org.apache.druid.sql.http.GetQueriesResponse;
-import org.apache.druid.sql.http.QueryInfo;
+import org.apache.druid.sql.http.GetQueryReportResponse;
 
+import javax.annotation.Nullable;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -270,16 +273,17 @@ public class DartSqlEngine implements SqlEngine
   }
 
   @Override
-  public List<QueryInfo> getRunningQueries(
+  public GetQueriesResponse getRunningQueries(
       boolean selfOnly,
       AuthenticationResult authenticationResult,
-      AuthorizationResult authorizationResult
+      AuthorizationResult stateReadAuthorization
   )
   {
-    final List<DartQueryInfo> queries = controllerRegistry.getAllHolders()
-                                                          .stream()
-                                                          
.map(DartQueryInfo::fromControllerHolder)
-                                                          
.collect(Collectors.toList());
+    final List<DartQueryInfo> queries =
+        controllerRegistry.getAllControllers()
+                          .stream()
+                          .map(DartQueryInfo::fromControllerHolder)
+                          .collect(Collectors.toList());
 
     // Add queries from all other servers, if "selfOnly" is false.
     if (!selfOnly) {
@@ -302,27 +306,82 @@ public class DartSqlEngine implements SqlEngine
     // Sort queries by start time, breaking ties by query ID, so the list 
comes back in a consistent and nice order.
     
queries.sort(Comparator.comparing(DartQueryInfo::getStartTime).thenComparing(DartQueryInfo::getDartQueryId));
 
-    if (authorizationResult.allowAccessWithNoRestriction()) {
+    if (stateReadAuthorization.allowAccessWithNoRestriction()) {
       // User can READ STATE, so they can see all running queries, as well as 
authentication details.
-      return List.copyOf(queries);
+      return new GetQueriesResponse(List.copyOf(queries));
     } else {
       // User cannot READ STATE, so they can see only their own queries, 
without authentication details.
-      return queries.stream()
-                    .filter(
-                        query ->
-                            authenticationResult.getAuthenticatedBy() != null
-                            && authenticationResult.getIdentity() != null
-                            && Objects.equals(
-                                authenticationResult.getAuthenticatedBy(),
-                                query.getAuthenticator()
-                            )
-                            && Objects.equals(
-                                authenticationResult.getIdentity(),
-                                query.getIdentity()
-                            ))
-                    .map(DartQueryInfo::withoutAuthenticationResult)
-                    .collect(Collectors.toList());
+      return new GetQueriesResponse(
+          queries.stream()
+                 .filter(query -> isOwnQuery(authenticationResult, query))
+                 .map(DartQueryInfo::withoutAuthenticationResult)
+                 .collect(Collectors.toList())
+      );
+    }
+  }
+
+  @Override
+  @Nullable
+  public GetQueryReportResponse getQueryReport(
+      final String sqlQueryId,
+      final boolean selfOnly,
+      final AuthenticationResult authenticationResult,
+      final AuthorizationResult stateReadAuthorization
+  )
+  {
+    QueryInfoAndReport infoAndReport = 
controllerRegistry.getQueryInfoAndReportBySqlQueryId(sqlQueryId);
+
+    if (infoAndReport == null && !selfOnly) {
+      final List<GetQueryReportResponse> otherReports = 
FutureUtils.getUnchecked(
+          Futures.successfulAsList(
+              Iterables.transform(sqlClients.getAllClients(), client -> 
client.getQueryReport(sqlQueryId, true))
+          ),
+          true
+      );
+
+      for (final GetQueryReportResponse otherReport : otherReports) {
+        // Check for non-null report with non-null content (a 404 response 
returns GetReportResponse with null fields)
+        if (otherReport != null && otherReport.getQueryInfo() != null) {
+          infoAndReport = new QueryInfoAndReport(
+              (DartQueryInfo) otherReport.getQueryInfo(),
+              otherReport.getReportMap(),
+              DateTimes.utc(0)
+          );
+          break;
+        }
+      }
+    }
+
+    if (infoAndReport == null) {
+      return null;
     }
+
+    if (stateReadAuthorization.allowAccessWithNoRestriction()) {
+      // User can READ STATE, so they can see any report.
+      return new GetQueryReportResponse(infoAndReport.getQueryInfo(), 
infoAndReport.getReportMap());
+    } else {
+      // User cannot READ STATE, so they can see only their own queries, 
without authentication details.
+      final DartQueryInfo queryInfo = infoAndReport.getQueryInfo();
+      if (isOwnQuery(authenticationResult, queryInfo)) {
+        return new 
GetQueryReportResponse(queryInfo.withoutAuthenticationResult(), 
infoAndReport.getReportMap());
+      } else {
+        return null;
+      }
+    }
+  }
+
+  /**
+   * Returns whether the given query belongs to the authenticated user.
+   */
+  private static boolean isOwnQuery(
+      final AuthenticationResult authenticationResult,
+      final DartQueryInfo queryInfo
+  )
+  {
+    return authenticationResult.getAuthenticatedBy() != null
+           && authenticationResult.getIdentity() != null
+           && Objects.equals(authenticationResult.getAuthenticatedBy(), 
queryInfo.getAuthenticator())
+           && Objects.equals(authenticationResult.getIdentity(), 
queryInfo.getIdentity());
   }
 
   @Override
@@ -330,7 +389,7 @@ public class DartSqlEngine implements SqlEngine
   {
     final Object dartQueryId = 
plannerContext.queryContext().get(QueryContexts.CTX_DART_QUERY_ID);
     if (dartQueryId instanceof String) {
-      final ControllerHolder holder = controllerRegistry.get((String) 
dartQueryId);
+      final ControllerHolder holder = 
controllerRegistry.getController((String) dartQueryId);
       if (holder != null) {
         holder.cancel(CancellationReason.USER_REQUEST);
       }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerConfig.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerConfig.java
index f97f895314c..203b565cba5 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerConfig.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerConfig.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.dart.guice;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.msq.exec.MemoryIntrospector;
+import org.joda.time.Period;
 
 /**
  * Runtime configuration for controllers (which run on Brokers).
@@ -42,6 +43,19 @@ public class DartControllerConfig
   @JsonProperty("heapFraction")
   private double heapFraction = DEFAULT_HEAP_FRACTION;
 
+  /**
+   * Maximum number of retained reports to store in-memory for completed 
queries.
+   * Set to zero to avoid storing any reports at all.
+   */
+  @JsonProperty("maxRetainedReportCount")
+  private int maxRetainedReportCount = 0;
+
+  /**
+   * Maximum age of retained reports. Set to zero to disable age-based 
expiration.
+   */
+  @JsonProperty("maxRetainedReportDuration")
+  private Period maxRetainedReportDuration = Period.ZERO;
+
   public int getConcurrentQueries()
   {
     return concurrentQueries;
@@ -56,4 +70,14 @@ public class DartControllerConfig
   {
     return heapFraction;
   }
+
+  public int getMaxRetainedReportCount()
+  {
+    return maxRetainedReportCount;
+  }
+
+  public Period getMaxRetainedReportDuration()
+  {
+    return maxRetainedReportDuration;
+  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartControllerClient.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartControllerClient.java
index 23d83d00549..97608df6ac7 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartControllerClient.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartControllerClient.java
@@ -27,6 +27,7 @@ import org.apache.druid.msq.counters.CounterSnapshotsTree;
 import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
 import org.apache.druid.msq.dart.controller.messages.DoneReadingInput;
 import org.apache.druid.msq.dart.controller.messages.PartialKeyStatistics;
+import org.apache.druid.msq.dart.controller.messages.PostCounters;
 import org.apache.druid.msq.dart.controller.messages.ResultsComplete;
 import org.apache.druid.msq.dart.controller.messages.WorkerError;
 import org.apache.druid.msq.dart.controller.messages.WorkerWarning;
@@ -47,6 +48,7 @@ public class DartControllerClient implements ControllerClient
   private final Outbox<ControllerMessage> outbox;
   private final String queryId;
   private final String controllerHost;
+  private final boolean liveReportCounters;
 
   /**
    * Currently-outstanding futures. These are tracked so they can be canceled 
in {@link #close()}.
@@ -56,12 +58,14 @@ public class DartControllerClient implements 
ControllerClient
   public DartControllerClient(
       final Outbox<ControllerMessage> outbox,
       final String queryId,
-      final String controllerHost
+      final String controllerHost,
+      final boolean liveReportCounters
   )
   {
     this.outbox = outbox;
     this.queryId = queryId;
     this.controllerHost = controllerHost;
+    this.liveReportCounters = liveReportCounters;
   }
 
   @Override
@@ -102,9 +106,11 @@ public class DartControllerClient implements 
ControllerClient
   }
 
   @Override
-  public void postCounters(String workerId, CounterSnapshotsTree snapshotsTree)
+  public void postCounters(final String workerId, final CounterSnapshotsTree 
snapshotsTree)
   {
-    // Do nothing. Live counters are not sent to the controller in this mode.
+    if (liveReportCounters) {
+      sendMessage(new PostCounters(queryId, workerId, snapshotsTree));
+    }
   }
 
   @Override
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
index 64f3884810a..62eff338119 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.messages.server.Outbox;
 import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
+import org.apache.druid.msq.dart.controller.messages.PostCounters;
 import org.apache.druid.msq.exec.ControllerClient;
 import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
 import org.apache.druid.msq.exec.FrameContext;
@@ -62,6 +63,12 @@ import java.io.File;
  */
 public class DartWorkerContext implements WorkerContext
 {
+  /**
+   * Default for {@link MultiStageQueryContext#CTX_LIVE_REPORT_COUNTERS}. Off 
by default since older Dart controllers
+   * don't understand the {@link PostCounters} message, and because it adds 
some overhead.
+   */
+  public static final boolean DEFAULT_LIVE_REPORT_COUNTERS = false;
+
   private final String queryId;
   private final String controllerHost;
   private final WorkerId workerId;
@@ -70,7 +77,6 @@ public class DartWorkerContext implements WorkerContext
   private final PolicyEnforcer policyEnforcer;
   private final Injector injector;
   private final DartWorkerClient workerClient;
-  private final DruidProcessingConfig processingConfig;
   private final SegmentWrangler segmentWrangler;
   private final GroupingEngine groupingEngine;
   private final DataSegmentProvider dataSegmentProvider;
@@ -119,7 +125,6 @@ public class DartWorkerContext implements WorkerContext
     this.policyEnforcer = policyEnforcer;
     this.injector = injector;
     this.workerClient = workerClient;
-    this.processingConfig = processingConfig;
     this.segmentWrangler = segmentWrangler;
     this.groupingEngine = groupingEngine;
     this.dataSegmentProvider = dataSegmentProvider;
@@ -193,7 +198,12 @@ public class DartWorkerContext implements WorkerContext
   @Override
   public ControllerClient makeControllerClient()
   {
-    return new DartControllerClient(outbox, queryId, controllerHost);
+    return new DartControllerClient(
+        outbox,
+        queryId,
+        controllerHost,
+        MultiStageQueryContext.getLiveReportCounters(queryContext, 
DEFAULT_LIVE_REPORT_COUNTERS)
+    );
   }
 
   @Override
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 2d7f958aca4..13a31edb3de 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -334,6 +334,14 @@ public class IndexerControllerContext implements 
ControllerContext
       builder.put(BaseQuery.SQL_QUERY_ID, 
queryContext.get(QueryContexts.CTX_SQL_QUERY_ID));
     }
 
+    if 
(queryContext.containsKey(MultiStageQueryContext.CTX_LIVE_REPORT_COUNTERS)) {
+      // No default for this one, because we want the default to be assigned 
on the worker.
+      builder.put(
+          MultiStageQueryContext.CTX_LIVE_REPORT_COUNTERS,
+          
queryContext.getBoolean(MultiStageQueryContext.CTX_LIVE_REPORT_COUNTERS)
+      );
+    }
+
     MSQDestination destination = querySpec.getDestination();
     if (destination.toSelectDestination() != null) {
       builder.put(
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index beec439ea19..68e46827b08 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -67,6 +67,11 @@ import java.io.File;
 
 public class IndexerWorkerContext implements WorkerContext
 {
+  /**
+   * Default for {@link MultiStageQueryContext#CTX_LIVE_REPORT_COUNTERS}. On 
by default for tasks.
+   */
+  public static final boolean DEFAULT_LIVE_REPORT_COUNTERS = true;
+
   private static final Logger log = new Logger(IndexerWorkerContext.class);
 
   private final MSQWorkerTask task;
@@ -81,6 +86,7 @@ public class IndexerWorkerContext implements WorkerContext
   private final MemoryIntrospector memoryIntrospector;
   private final ProcessingBuffersProvider processingBuffersProvider;
   private final int maxConcurrentStages;
+  private final boolean liveReportCounters;
   private final boolean includeAllCounters;
   private final int threadCount;
 
@@ -117,6 +123,7 @@ public class IndexerWorkerContext implements WorkerContext
         queryContext,
         IndexerControllerContext.DEFAULT_MAX_CONCURRENT_STAGES
     );
+    this.liveReportCounters = 
MultiStageQueryContext.getLiveReportCounters(queryContext, 
DEFAULT_LIVE_REPORT_COUNTERS);
     this.includeAllCounters = 
MultiStageQueryContext.getIncludeAllCounters(queryContext);
     
     // Compute thread count once in constructor
@@ -246,6 +253,7 @@ public class IndexerWorkerContext implements WorkerContext
             new SpecificTaskRetryPolicy(task.getControllerTaskId(), 
StandardRetryPolicy.unlimited())
         ),
         jsonMapper(),
+        liveReportCounters,
         controllerLocator
     );
   }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
index c381713587b..02939322d30 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
@@ -285,11 +285,11 @@ public class MSQControllerTask extends AbstractTask 
implements ClientTaskQuery,
     );
 
     final TaskReportQueryListener queryListener = new TaskReportQueryListener(
-        querySpec.getDestination(),
         () -> 
toolbox.getTaskReportFileWriter().openReportOutputStream(getId()),
         toolbox.getJsonMapper(),
         getId(),
-        getContext()
+        getContext(),
+        querySpec.getDestination().getRowsInTaskReport()
     );
 
     controller.run(queryListener);
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/TaskReportQueryListener.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/TaskReportQueryListener.java
index be73a3cbfdd..4c21ca005ee 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/TaskReportQueryListener.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/TaskReportQueryListener.java
@@ -74,19 +74,19 @@ public class TaskReportQueryListener implements 
QueryListener
   private boolean resultsCurrentlyOpen;
 
   public TaskReportQueryListener(
-      final MSQDestination destination,
       final OutputStreamSupplier reportSink,
       final ObjectMapper jsonMapper,
       final String taskId,
-      final Map<String, Object> taskContext
+      final Map<String, Object> taskContext,
+      final long rowsInTaskReport
   )
   {
-    this.rowsInTaskReport = destination.getRowsInTaskReport();
     this.reportSink = reportSink;
     this.jsonMapper = jsonMapper;
     this.serializers = jsonMapper.getSerializerProviderInstance();
     this.taskId = taskId;
     this.taskContext = taskContext;
+    this.rowsInTaskReport = rowsInTaskReport;
   }
 
   @Override
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerControllerClient.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerControllerClient.java
index 1a420d69b6c..a0f5e015fdb 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerControllerClient.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerControllerClient.java
@@ -46,16 +46,19 @@ public class IndexerControllerClient implements 
ControllerClient
 {
   private final ServiceClient serviceClient;
   private final ObjectMapper jsonMapper;
+  private final boolean liveReportCounters;
   private final Closeable baggage;
 
   public IndexerControllerClient(
       final ServiceClient serviceClient,
       final ObjectMapper jsonMapper,
+      final boolean liveReportCounters,
       final Closeable baggage
   )
   {
     this.serviceClient = serviceClient;
     this.jsonMapper = jsonMapper;
+    this.liveReportCounters = liveReportCounters;
     this.baggage = baggage;
   }
 
@@ -99,12 +102,14 @@ public class IndexerControllerClient implements 
ControllerClient
   @Override
   public void postCounters(String workerId, CounterSnapshotsTree 
snapshotsTree) throws IOException
   {
-    final String path = StringUtils.format("/counters/%s", 
StringUtils.urlEncode(workerId));
-    doRequest(
-        new RequestBuilder(HttpMethod.POST, path)
-            .jsonContent(jsonMapper, snapshotsTree),
-        IgnoreHttpResponseHandler.INSTANCE
-    );
+    if (liveReportCounters) {
+      final String path = StringUtils.format("/counters/%s", 
StringUtils.urlEncode(workerId));
+      doRequest(
+          new RequestBuilder(HttpMethod.POST, path)
+              .jsonContent(jsonMapper, snapshotsTree),
+          IgnoreHttpResponseHandler.INSTANCE
+      );
+    }
   }
 
   @Override
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 11e2784d29d..8f93ce02529 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -208,6 +208,13 @@ public class MultiStageQueryContext
   public static final String CTX_INCLUDE_ALL_COUNTERS = "includeAllCounters";
   public static final boolean DEFAULT_INCLUDE_ALL_COUNTERS = true;
 
+  /**
+   * Whether workers should send live counter updates to the controller via 
the message relay. When enabled, workers
+   * periodically send counter snapshots to the controller, allowing the 
controller to have more up-to-date progress
+   * information.
+   */
+  public static final String CTX_LIVE_REPORT_COUNTERS = "liveReportCounters";
+
   public static final String CTX_FORCE_TIME_SORT = 
DimensionsSpec.PARAMETER_FORCE_TIME_SORT;
   private static final boolean DEFAULT_FORCE_TIME_SORT = 
DimensionsSpec.DEFAULT_FORCE_TIME_SORT;
 
@@ -516,6 +523,14 @@ public class MultiStageQueryContext
     return queryContext.getBoolean(CTX_INCLUDE_ALL_COUNTERS, 
DEFAULT_INCLUDE_ALL_COUNTERS);
   }
 
+  /**
+   * See {@link #CTX_LIVE_REPORT_COUNTERS}.
+   */
+  public static boolean getLiveReportCounters(final QueryContext queryContext, 
final boolean defaultValue)
+  {
+    return queryContext.getBoolean(CTX_LIVE_REPORT_COUNTERS, defaultValue);
+  }
+
   public static boolean isForceSegmentSortByTime(final QueryContext 
queryContext)
   {
     return queryContext.getBoolean(CTX_FORCE_TIME_SORT, 
DEFAULT_FORCE_TIME_SORT);
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/counters/NilQueryCounterSnapshotTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/counters/NilQueryCounterSnapshotTest.java
new file mode 100644
index 00000000000..6d8d33ace0b
--- /dev/null
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/counters/NilQueryCounterSnapshotTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.counters;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.segment.TestHelper;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class NilQueryCounterSnapshotTest
+{
+  private ObjectMapper objectMapper;
+
+  @BeforeEach
+  public void setUp()
+  {
+    objectMapper = TestHelper.JSON_MAPPER.copy();
+    objectMapper.registerModules(new MSQIndexingModule().getJacksonModules());
+  }
+
+  @Test
+  public void testSerde() throws Exception
+  {
+    final NilQueryCounterSnapshot snapshot = 
NilQueryCounterSnapshot.instance();
+    final String json = objectMapper.writeValueAsString(snapshot);
+    final QueryCounterSnapshot deserialized = objectMapper.readValue(json, 
QueryCounterSnapshot.class);
+    Assertions.assertEquals(snapshot, deserialized);
+    Assertions.assertSame(NilQueryCounterSnapshot.instance(), deserialized);
+  }
+
+  @Test
+  public void testDeserializeFromUnknownType() throws Exception
+  {
+    final String json = "{\"type\": \"nonexistent\"}";
+    final QueryCounterSnapshot deserialized = objectMapper.readValue(json, 
QueryCounterSnapshot.class);
+    Assertions.assertEquals(NilQueryCounterSnapshot.instance(), deserialized);
+    Assertions.assertSame(NilQueryCounterSnapshot.instance(), deserialized);
+  }
+
+  @Test
+  public void testEqualsAndHashCode()
+  {
+    EqualsVerifier.forClass(NilQueryCounterSnapshot.class)
+                  .usingGetClass()
+                  .verify();
+  }
+}
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java
new file mode 100644
index 00000000000..3a19d796641
--- /dev/null
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.controller;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.msq.dart.guice.DartControllerConfig;
+import org.apache.druid.msq.exec.Controller;
+import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.joda.time.Period;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+
+public class DartControllerRegistryTest
+{
+  private static final String AUTHENTICATOR_NAME = "authn";
+
+  private AutoCloseable mockCloser;
+
+  @Mock
+  private MSQTaskReportPayload reportPayload;
+
+  @BeforeEach
+  public void setUp()
+  {
+    mockCloser = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception
+  {
+    mockCloser.close();
+  }
+
+  @Test
+  public void test_register_addsToRegistry()
+  {
+    final DartControllerRegistry registry = new 
DartControllerRegistry(makeConfig(0, Period.ZERO));
+    final ControllerHolder holder = makeControllerHolder("dart1", "sql1", 
"user1");
+
+    registry.register(holder);
+
+    Assertions.assertEquals(1, registry.getAllControllers().size());
+    Assertions.assertSame(holder, registry.getController("dart1"));
+  }
+
+  @Test
+  public void test_register_duplicateThrows()
+  {
+    final DartControllerRegistry registry = new 
DartControllerRegistry(makeConfig(0, Period.ZERO));
+    final ControllerHolder holder1 = makeControllerHolder("dart1", "sql1", 
"user1");
+    final ControllerHolder holder2 = makeControllerHolder("dart1", "sql2", 
"user2");
+
+    registry.register(holder1);
+
+    Assertions.assertThrows(DruidException.class, () -> 
registry.register(holder2));
+  }
+
+  @Test
+  public void test_deregister_noReport_removesFromRegistry()
+  {
+    final DartControllerRegistry registry = new 
DartControllerRegistry(makeConfig(10, Period.hours(1)));
+    final ControllerHolder holder = makeControllerHolder("dart1", "sql1", 
"user1");
+
+    registry.register(holder);
+    registry.deregister(holder, null);
+
+    Assertions.assertEquals(0, registry.getAllControllers().size());
+    Assertions.assertNull(registry.getController("dart1"));
+    Assertions.assertNull(registry.getQueryInfoAndReport("dart1"));
+    Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("sql1"));
+  }
+
+  @Test
+  public void test_deregister_withReport_retainsReport()
+  {
+    final DartControllerRegistry registry = new 
DartControllerRegistry(makeConfig(10, Period.hours(1)));
+    final ControllerHolder holder = makeControllerHolder("dart1", "sql1", 
"user1");
+    final TaskReport report = new MSQTaskReport("dart1", reportPayload);
+
+    final TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
+    reportMap.put(MSQTaskReport.REPORT_KEY, report);
+    registry.register(holder);
+    registry.deregister(holder, reportMap);
+
+    // Controller is removed
+    Assertions.assertEquals(0, registry.getAllControllers().size());
+    Assertions.assertNull(registry.getController("dart1"));
+
+    // But report is retained
+    final QueryInfoAndReport infoAndReport = 
registry.getQueryInfoAndReport("dart1");
+    Assertions.assertNotNull(infoAndReport);
+    Assertions.assertEquals("dart1", 
infoAndReport.getQueryInfo().getDartQueryId());
+    Assertions.assertSame(report, 
infoAndReport.getReportMap().get(MSQTaskReport.REPORT_KEY));
+
+    // And can be looked up by SQL query ID
+    final QueryInfoAndReport infoAndReportBySql = 
registry.getQueryInfoAndReportBySqlQueryId("sql1");
+    Assertions.assertNotNull(infoAndReportBySql);
+    Assertions.assertEquals("dart1", 
infoAndReportBySql.getQueryInfo().getDartQueryId());
+    Assertions.assertEquals("sql1", 
infoAndReportBySql.getQueryInfo().getSqlQueryId());
+  }
+
+  @Test
+  public void 
test_deregister_withReport_zeroRetainedCount_doesNotRetainReport()
+  {
+    final DartControllerRegistry registry = new 
DartControllerRegistry(makeConfig(0, Period.hours(1)));
+    final ControllerHolder holder = makeControllerHolder("dart1", "sql1", 
"user1");
+    final TaskReport report = new MSQTaskReport("dart1", reportPayload);
+
+    final TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
+    reportMap.put(MSQTaskReport.REPORT_KEY, report);
+    registry.register(holder);
+    registry.deregister(holder, reportMap);
+
+    Assertions.assertNull(registry.getQueryInfoAndReport("dart1"));
+    Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("sql1"));
+  }
+
+  @Test
+  public void test_getQueryInfoAndReport_runningQuery()
+  {
+    final DartControllerRegistry registry = new 
DartControllerRegistry(makeConfig(10, Period.hours(1)));
+    final Controller controller = Mockito.mock(Controller.class);
+    final ControllerContext controllerContext = 
Mockito.mock(ControllerContext.class);
+    Mockito.when(controller.queryId()).thenReturn("dart1");
+    
Mockito.when(controller.getControllerContext()).thenReturn(controllerContext);
+    
Mockito.when(controllerContext.selfNode()).thenReturn(Mockito.mock(DruidNode.class));
+
+    // Set up live reports
+    final TaskReport.ReportMap liveReportMap = new TaskReport.ReportMap();
+    liveReportMap.put(MSQTaskReport.REPORT_KEY, new MSQTaskReport("dart1", 
reportPayload));
+    Mockito.when(controller.liveReports()).thenReturn(liveReportMap);
+
+    final ControllerHolder holder = new ControllerHolder(
+        controller,
+        "sql1",
+        "SELECT 1",
+        makeAuthenticationResult("user1"),
+        DateTimes.of("2000")
+    );
+
+    registry.register(holder);
+
+    final QueryInfoAndReport infoAndReport = 
registry.getQueryInfoAndReport("dart1");
+    Assertions.assertNotNull(infoAndReport);
+    Assertions.assertEquals("dart1", 
infoAndReport.getQueryInfo().getDartQueryId());
+
+    // Also works by SQL query ID
+    final QueryInfoAndReport infoAndReportBySql = 
registry.getQueryInfoAndReportBySqlQueryId("sql1");
+    Assertions.assertNotNull(infoAndReportBySql);
+    Assertions.assertEquals("dart1", 
infoAndReportBySql.getQueryInfo().getDartQueryId());
+
+    registry.deregister(holder, null);
+  }
+
+  @Test
+  public void test_getQueryInfoAndReport_runningQuery_noLiveReports()
+  {
+    final DartControllerRegistry registry = new 
DartControllerRegistry(makeConfig(10, Period.hours(1)));
+    final Controller controller = Mockito.mock(Controller.class);
+    final ControllerContext controllerContext = 
Mockito.mock(ControllerContext.class);
+    Mockito.when(controller.queryId()).thenReturn("dart1");
+    
Mockito.when(controller.getControllerContext()).thenReturn(controllerContext);
+    
Mockito.when(controllerContext.selfNode()).thenReturn(Mockito.mock(DruidNode.class));
+    Mockito.when(controller.liveReports()).thenReturn(null);
+
+    final ControllerHolder holder = new ControllerHolder(
+        controller,
+        "sql1",
+        "SELECT 1",
+        makeAuthenticationResult("user1"),
+        DateTimes.of("2000")
+    );
+
+    registry.register(holder);
+
+    // Returns null when no live reports are available
+    Assertions.assertNull(registry.getQueryInfoAndReport("dart1"));
+
+    // But the sqlQueryId mapping should still work after deregister with 
report
+    registry.deregister(holder, null);
+    Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("sql1"));
+  }
+
+  @Test
+  public void test_reportEviction_byCount()
+  {
+    final DartControllerRegistry registry = new 
DartControllerRegistry(makeConfig(2, Period.hours(1)));
+
+    // Register and deregister 3 queries with reports
+    for (int i = 1; i <= 3; i++) {
+      final ControllerHolder holder = makeControllerHolder("dart" + i, "sql" + 
i, "user1");
+      final TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
+      reportMap.put(MSQTaskReport.REPORT_KEY, new MSQTaskReport("dart" + i, 
reportPayload));
+      registry.register(holder);
+      registry.deregister(holder, reportMap);
+    }
+
+    // Only the last 2 reports should be retained
+    Assertions.assertNull(registry.getQueryInfoAndReport("dart1"));
+    Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("sql1"));
+
+    Assertions.assertNotNull(registry.getQueryInfoAndReport("dart2"));
+    
Assertions.assertNotNull(registry.getQueryInfoAndReportBySqlQueryId("sql2"));
+
+    Assertions.assertNotNull(registry.getQueryInfoAndReport("dart3"));
+    
Assertions.assertNotNull(registry.getQueryInfoAndReportBySqlQueryId("sql3"));
+  }
+
+  @Test
+  public void test_getQueryInfoAndReportBySqlQueryId_notFound()
+  {
+    final DartControllerRegistry registry = new 
DartControllerRegistry(makeConfig(10, Period.hours(1)));
+
+    
Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("nonexistent"));
+  }
+
+  @Test
+  public void test_getAllControllers()
+  {
+    final DartControllerRegistry registry = new 
DartControllerRegistry(makeConfig(10, Period.hours(1)));
+    final ControllerHolder holder1 = makeControllerHolder("dart1", "sql1", 
"user1");
+    final ControllerHolder holder2 = makeControllerHolder("dart2", "sql2", 
"user2");
+
+    registry.register(holder1);
+    registry.register(holder2);
+
+    Assertions.assertEquals(2, registry.getAllControllers().size());
+    Assertions.assertTrue(registry.getAllControllers().contains(holder1));
+    Assertions.assertTrue(registry.getAllControllers().contains(holder2));
+
+    registry.deregister(holder1, null);
+    registry.deregister(holder2, null);
+  }
+
+  private ControllerHolder makeControllerHolder(
+      final String dartQueryId,
+      final String sqlQueryId,
+      final String identity
+  )
+  {
+    final Controller controller = Mockito.mock(Controller.class);
+    final ControllerContext controllerContext = 
Mockito.mock(ControllerContext.class);
+    Mockito.when(controller.queryId()).thenReturn(dartQueryId);
+    
Mockito.when(controller.getControllerContext()).thenReturn(controllerContext);
+    
Mockito.when(controllerContext.selfNode()).thenReturn(Mockito.mock(DruidNode.class));
+
+    return new ControllerHolder(
+        controller,
+        sqlQueryId,
+        "SELECT 1",
+        makeAuthenticationResult(identity),
+        DateTimes.of("2000")
+    );
+  }
+
+  private static AuthenticationResult makeAuthenticationResult(final String 
identity)
+  {
+    return new AuthenticationResult(identity, null, AUTHENTICATOR_NAME, 
Collections.emptyMap());
+  }
+
+  private static DartControllerConfig makeConfig(
+      final int maxRetainedReportCount,
+      final Period maxRetainedReportDuration
+  )
+  {
+    return new DartControllerConfig()
+    {
+      @Override
+      @JsonProperty("maxRetainedReportCount")
+      public int getMaxRetainedReportCount()
+      {
+        return maxRetainedReportCount;
+      }
+
+      @Override
+      @JsonProperty("maxRetainedReportDuration")
+      public Period getMaxRetainedReportDuration()
+      {
+        return maxRetainedReportDuration;
+      }
+    };
+  }
+}
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
index 64335b3a76c..92f7a735052 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
@@ -244,7 +244,7 @@ public class DartSqlResourceTest extends MSQTestBase
             return 
super.queryKernelConfig(querySpec).toBuilder().workerIds(ImmutableList.of("some")).build();
           }
         },
-        controllerRegistry = new DartControllerRegistry()
+        controllerRegistry = new DartControllerRegistry(new 
DartControllerConfig())
         {
           @Override
           public void register(ControllerHolder holder)
@@ -298,7 +298,7 @@ public class DartSqlResourceTest extends MSQTestBase
 
     // Ensure that controllerRegistry has nothing in it at the conclusion of 
each test. Verifies that controllers
     // are fully cleaned up.
-    Assertions.assertEquals(0, controllerRegistry.getAllHolders().size(), 
"controllerRegistry.getAllHolders().size()");
+    Assertions.assertEquals(0, controllerRegistry.getAllControllers().size(), 
"controllerRegistry.getAllHolders().size()");
   }
 
   @Test
@@ -325,7 +325,7 @@ public class DartSqlResourceTest extends MSQTestBase
         sqlResource.doGetRunningQueries("", httpServletRequest).getEntity()
     );
 
-    controllerRegistry.deregister(holder);
+    controllerRegistry.deregister(holder, null);
   }
 
   /**
@@ -342,15 +342,15 @@ public class DartSqlResourceTest extends MSQTestBase
     final ControllerHolder holder2 = 
setUpMockRunningQuery(DIFFERENT_REGULAR_USER_NAME);
 
     // Regular users can see only their own queries, without authentication 
details.
-    Assertions.assertEquals(2, controllerRegistry.getAllHolders().size());
+    Assertions.assertEquals(2, controllerRegistry.getAllControllers().size());
     Assertions.assertEquals(
         new GetQueriesResponse(
             
Collections.singletonList(DartQueryInfo.fromControllerHolder(holder).withoutAuthenticationResult())),
         sqlResource.doGetRunningQueries("", httpServletRequest).getEntity()
     );
 
-    controllerRegistry.deregister(holder);
-    controllerRegistry.deregister(holder2);
+    controllerRegistry.deregister(holder, null);
+    controllerRegistry.deregister(holder2, null);
   }
 
   /**
@@ -390,7 +390,7 @@ public class DartSqlResourceTest extends MSQTestBase
         sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity()
     );
 
-    controllerRegistry.deregister(localHolder);
+    controllerRegistry.deregister(localHolder, null);
   }
 
   /**
@@ -417,7 +417,7 @@ public class DartSqlResourceTest extends MSQTestBase
         sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity()
     );
 
-    controllerRegistry.deregister(localHolder);
+    controllerRegistry.deregister(localHolder, null);
   }
 
   /**
@@ -454,7 +454,7 @@ public class DartSqlResourceTest extends MSQTestBase
         sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity()
     );
 
-    controllerRegistry.deregister(localHolder);
+    controllerRegistry.deregister(localHolder, null);
   }
 
   /**
@@ -490,7 +490,7 @@ public class DartSqlResourceTest extends MSQTestBase
         sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity()
     );
 
-    controllerRegistry.deregister(holder);
+    controllerRegistry.deregister(holder, null);
   }
 
   @Test
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/messages/ControllerMessageTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/messages/ControllerMessageTest.java
index 427faf4aee6..d323dead298 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/messages/ControllerMessageTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/messages/ControllerMessageTest.java
@@ -22,6 +22,9 @@ package org.apache.druid.msq.dart.controller.messages;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.msq.counters.CounterSnapshots;
+import org.apache.druid.msq.counters.CounterSnapshotsTree;
+import org.apache.druid.msq.counters.NilQueryCounterSnapshot;
 import org.apache.druid.msq.guice.MSQIndexingModule;
 import org.apache.druid.msq.indexing.error.MSQErrorReport;
 import org.apache.druid.msq.indexing.error.UnknownFault;
@@ -34,6 +37,7 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Map;
 
 public class ControllerMessageTest
 {
@@ -69,6 +73,13 @@ public class ControllerMessageTest
             Collections.singletonList(MSQErrorReport.fromFault("task", null, 
null, UnknownFault.forMessage("oops")))
         )
     );
+    assertSerde(
+        new PostCounters(
+            STAGE_ID.getQueryId(),
+            "worker-1",
+            new CounterSnapshotsTree()
+        )
+    );
   }
 
   @Test
@@ -79,6 +90,24 @@ public class ControllerMessageTest
     EqualsVerifier.forClass(ResultsComplete.class).usingGetClass().verify();
     EqualsVerifier.forClass(WorkerError.class).usingGetClass().verify();
     EqualsVerifier.forClass(WorkerWarning.class).usingGetClass().verify();
+    EqualsVerifier.forClass(PostCounters.class)
+                  .usingGetClass()
+                  .withPrefabValues(
+                      CounterSnapshotsTree.class,
+                      CounterSnapshotsTree.fromMap(
+                          Map.of(
+                              1,
+                              Map.of(1, new CounterSnapshots(Map.of("foo", 
NilQueryCounterSnapshot.instance())))
+                          )
+                      ),
+                      CounterSnapshotsTree.fromMap(
+                          Map.of(
+                              1,
+                              Map.of(1, new CounterSnapshots(Map.of("bar", 
NilQueryCounterSnapshot.instance())))
+                          )
+                      )
+                  )
+                  .verify();
   }
 
   private void assertSerde(final ControllerMessage message) throws IOException
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/TaskReportQueryListenerTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/TaskReportQueryListenerTest.java
index 252419f7227..e50da1c75ed 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/TaskReportQueryListenerTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/TaskReportQueryListenerTest.java
@@ -72,11 +72,11 @@ public class TaskReportQueryListenerTest
   public void test_taskReportDestination() throws IOException
   {
     final TaskReportQueryListener listener = new TaskReportQueryListener(
-        TaskReportMSQDestination.instance(),
         Suppliers.ofInstance(baos)::get,
         JSON_MAPPER,
         TASK_ID,
-        TASK_CONTEXT
+        TASK_CONTEXT,
+        TaskReportMSQDestination.instance().getRowsInTaskReport()
     );
 
     Assert.assertTrue(listener.readResults());
@@ -140,11 +140,11 @@ public class TaskReportQueryListenerTest
   public void test_durableDestination() throws IOException
   {
     final TaskReportQueryListener listener = new TaskReportQueryListener(
-        DurableStorageMSQDestination.instance(),
         Suppliers.ofInstance(baos)::get,
         JSON_MAPPER,
         TASK_ID,
-        TASK_CONTEXT
+        TASK_CONTEXT,
+        DurableStorageMSQDestination.instance().getRowsInTaskReport()
     );
 
     Assert.assertTrue(listener.readResults());
diff --git 
a/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java 
b/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java
index 377866c48ee..60d03fe8d84 100644
--- a/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java
+++ b/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java
@@ -65,4 +65,23 @@ public interface BrokerClient
    * Updates the broker with the given {@link CoordinatorDynamicConfig}.
    */
   ListenableFuture<Boolean> 
updateCoordinatorDynamicConfig(CoordinatorDynamicConfig config);
+
+  /**
+   * Gets the report for a SQL query by its SQL query ID.
+   *
+   * @param sqlQueryId the SQL query ID
+   * @param selfOnly   if true, only check the local broker; if false, check 
all brokers
+   *
+   * @return JSON response from the report API
+   */
+  ListenableFuture<String> getQueryReport(String sqlQueryId, boolean selfOnly);
+
+  /**
+   * Cancels a SQL query by its SQL query ID.
+   *
+   * @param sqlQueryId the SQL query ID to cancel
+   *
+   * @return true if the cancellation was accepted
+   */
+  ListenableFuture<Boolean> cancelSqlQuery(String sqlQueryId);
 }
diff --git 
a/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java 
b/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java
index 52f7c3184d5..2d318bb3f32 100644
--- a/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java
+++ b/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java
@@ -128,5 +128,40 @@ public class BrokerClientImpl implements BrokerClient
         }
     );
   }
+
+  @Override
+  public ListenableFuture<String> getQueryReport(String sqlQueryId, boolean 
selfOnly)
+  {
+    final String path = StringUtils.format(
+        "/druid/v2/sql/queries/%s/reports%s",
+        StringUtils.urlEncode(sqlQueryId),
+        selfOnly ? "?selfOnly" : ""
+    );
+
+    return FutureUtils.transform(
+        client.asyncRequest(
+            new RequestBuilder(HttpMethod.GET, path),
+            new StringFullResponseHandler(StandardCharsets.UTF_8)
+        ),
+        FullResponseHolder::getContent
+    );
+  }
+
+  @Override
+  public ListenableFuture<Boolean> cancelSqlQuery(String sqlQueryId)
+  {
+    final String path = StringUtils.format(
+        "/druid/v2/sql/%s",
+        StringUtils.urlEncode(sqlQueryId)
+    );
+
+    return FutureUtils.transform(
+        client.asyncRequest(
+            new RequestBuilder(HttpMethod.DELETE, path),
+            new BytesFullResponseHandler()
+        ),
+        holder -> holder.getStatus().equals(HttpResponseStatus.ACCEPTED)
+    );
+  }
 }
 
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
index a628ad26bbe..1ed70fdbdca 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
@@ -126,6 +126,19 @@ public class EmbeddedClusterApis implements 
EmbeddedResource
     return client.onAnyBroker(brokerApi);
   }
 
+  public <T> T onTargetBroker(EmbeddedBroker targetBroker, 
Function<BrokerClient, ListenableFuture<T>> brokerApi)
+  {
+    return client.onTargetBroker(targetBroker, brokerApi);
+  }
+
+  public <T> ListenableFuture<T> onTargetBrokerAsync(
+      EmbeddedBroker targetBroker,
+      Function<BrokerClient, ListenableFuture<T>> brokerApi
+  )
+  {
+    return client.onTargetBrokerAsync(targetBroker, brokerApi);
+  }
+
   /**
    * Submits the given SQL query to any of the brokers (using {@code 
BrokerClient})
    * of the cluster.
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
index beacab52920..90e2eb90048 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
@@ -25,20 +25,26 @@ import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.client.broker.Broker;
 import org.apache.druid.client.broker.BrokerClient;
+import org.apache.druid.client.broker.BrokerClientImpl;
 import org.apache.druid.client.coordinator.Coordinator;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.client.indexing.IndexingService;
 import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
 import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.rpc.FixedServiceLocator;
 import org.apache.druid.rpc.RequestBuilder;
 import org.apache.druid.rpc.ServiceClient;
 import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.rpc.ServiceClientFactoryImpl;
+import org.apache.druid.rpc.ServiceLocation;
 import org.apache.druid.rpc.ServiceLocator;
+import org.apache.druid.rpc.StandardRetryPolicy;
 import org.apache.druid.rpc.guice.ServiceClientModule;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.server.security.Escalator;
@@ -173,6 +179,50 @@ public class EmbeddedServiceClient
     return makeRequest(request, resultType, brokerServiceClient, 
getMapper(EmbeddedBroker.class));
   }
 
+  /**
+   * Executes an API call on a specific broker. Unlike {@link 
#onAnyBroker(Function)},
+   * this method allows targeting a specific broker instance.
+   *
+   * @param targetBroker the specific broker to target
+   * @param brokerApi    function that receives a {@link BrokerClient} and 
returns a future
+   */
+  public <T> T onTargetBroker(EmbeddedBroker targetBroker, 
Function<BrokerClient, ListenableFuture<T>> brokerApi)
+  {
+    return 
getResult(brokerApi.apply(createBrokerClientForBroker(targetBroker)));
+  }
+
+  /**
+   * Executes an API call on a specific broker asynchronously, returning the 
future directly.
+   *
+   * @param targetBroker the specific broker to target
+   * @param brokerApi    function that receives a {@link BrokerClient} and 
returns a future
+   */
+  public <T> ListenableFuture<T> onTargetBrokerAsync(
+      EmbeddedBroker targetBroker,
+      Function<BrokerClient, ListenableFuture<T>> brokerApi
+  )
+  {
+    return brokerApi.apply(createBrokerClientForBroker(targetBroker));
+  }
+
+  /**
+   * Creates a {@link BrokerClient} that targets a specific broker using a 
{@link FixedServiceLocator}.
+   */
+  private BrokerClient createBrokerClientForBroker(EmbeddedBroker targetBroker)
+  {
+    final ServiceLocation brokerLocation = 
ServiceLocation.fromDruidNode(targetBroker.bindings().selfNode());
+    final ServiceClientFactory clientFactory =
+        targetBroker.bindings().getInstance(ServiceClientFactory.class, 
EscalatedGlobal.class);
+    return new BrokerClientImpl(
+        clientFactory.makeClient(
+            NodeRole.BROKER.getJsonName(),
+            new FixedServiceLocator(brokerLocation),
+            StandardRetryPolicy.noRetries()
+        ),
+        targetBroker.bindings().jsonMapper()
+    );
+  }
+
   @Nullable
   private <T> T makeRequest(
       Function<ObjectMapper, RequestBuilder> request,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
index 9e2a3930ab5..7b2902c11b8 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
@@ -30,8 +30,11 @@ import org.apache.druid.server.security.AuthorizationResult;
 import org.apache.druid.sql.SqlStatementFactory;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.destination.IngestDestination;
+import org.apache.druid.sql.http.GetQueriesResponse;
+import org.apache.druid.sql.http.GetQueryReportResponse;
 import org.apache.druid.sql.http.QueryInfo;
 
+import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
 
@@ -102,9 +105,9 @@ public interface SqlEngine
   /**
    * Create a {@link QueryMaker} for an INSERT ... SELECT query.
    *
-   * @param destination      destination for the INSERT portion of the query
-   * @param relRoot          planned and validated rel for the SELECT portion 
of the query
-   * @param plannerContext   context for this query
+   * @param destination    destination for the INSERT portion of the query
+   * @param relRoot        planned and validated rel for the SELECT portion of 
the query
+   * @param plannerContext context for this query
    *
    * @return an executor for the provided query
    *
@@ -132,14 +135,41 @@ public interface SqlEngine
   /**
    * Returns a list of {@link QueryInfo} containing the currently running 
queries using this engine. Returns an empty
    * list if the operation is not supported.
+   *
+   * @param selfOnly               whether to only include queries running on 
this server. If false, this server should
+   *                               contact all other servers to build a full 
list of all running queries.
+   * @param authenticationResult   implementations should use this for 
filtering the list of visible queries
+   * @param stateReadAuthorization authorization for the STATE READ resource. 
If this is authorized, implementations
+   *                               should allow all queries to be visible
+   */
+  default GetQueriesResponse getRunningQueries(
+      boolean selfOnly,
+      AuthenticationResult authenticationResult,
+      AuthorizationResult stateReadAuthorization
+  )
+  {
+    return new GetQueriesResponse(List.of());
+  }
+
+  /**
+   * Retrieves the report for a query, if available.
+   *
+   * @param sqlQueryId             SQL query ID to retrieve the report for
+   * @param selfOnly               whether to only include queries running on 
this server. If false, this server should
+   *                               contact all other servers to find this 
query, if necessary.
+   * @param authenticationResult   implementations should use this to 
determine if a query should be visible to a user
+   * @param stateReadAuthorization authorization for the STATE READ resource. 
If this is authorized, implementations
+   *                               should allow all queries to be visible
    */
-  default List<QueryInfo> getRunningQueries(
+  @Nullable
+  default GetQueryReportResponse getQueryReport(
+      String sqlQueryId,
       boolean selfOnly,
       AuthenticationResult authenticationResult,
-      AuthorizationResult authorizationResult
+      AuthorizationResult stateReadAuthorization
   )
   {
-    return List.of();
+    return null;
   }
 
   /**
diff --git 
a/sql/src/main/java/org/apache/druid/sql/http/GetQueryReportResponse.java 
b/sql/src/main/java/org/apache/druid/sql/http/GetQueryReportResponse.java
new file mode 100644
index 00000000000..1c91dc436a2
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/http/GetQueryReportResponse.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.report.TaskReport;
+
+import java.util.Objects;
+
+/**
+ * Class returned by {@link SqlResource#doGetQueryReport}, the "get query 
report" API.
+ */
+public class GetQueryReportResponse
+{
+  private final QueryInfo queryInfo;
+  private final TaskReport.ReportMap reportMap;
+
+  @JsonCreator
+  public GetQueryReportResponse(
+      @JsonProperty("query") QueryInfo queryInfo,
+      @JsonProperty("report") TaskReport.ReportMap reportMap
+  )
+  {
+    this.queryInfo = queryInfo;
+    this.reportMap = reportMap;
+  }
+
+  @JsonProperty("query")
+  public QueryInfo getQueryInfo()
+  {
+    return queryInfo;
+  }
+
+  @JsonProperty("report")
+  public TaskReport.ReportMap getReportMap()
+  {
+    return reportMap;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    GetQueryReportResponse that = (GetQueryReportResponse) o;
+    return Objects.equals(queryInfo, that.queryInfo) && 
Objects.equals(reportMap, that.reportMap);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(queryInfo, reportMap);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "GetReportResponse{" +
+           "queryInfo=" + queryInfo +
+           ", report=" + reportMap +
+           '}';
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java 
b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
index 98e39e7be3a..e5f1a513d52 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
@@ -163,13 +163,61 @@ public class SqlResource
 
     // Get running queries from all engines that support it.
     for (SqlEngine sqlEngine : engines) {
-      queries.addAll(sqlEngine.getRunningQueries(selfOnly != null, 
authenticationResult, stateReadAccess));
+      queries.addAll(sqlEngine.getRunningQueries(selfOnly != null, 
authenticationResult, stateReadAccess).getQueries());
     }
 
     AuthorizationUtils.setRequestAuthorizationAttributeIfNeeded(request);
     return Response.ok().entity(new GetQueriesResponse(queries)).build();
   }
 
+  /**
+   * API to get query reports, for all engines that support such reports.
+   *
+   * @param sqlQueryId SQL query ID
+   * @param selfOnly   if true, check reports from this server only. If false, 
check reports on all servers.
+   * @param request    http request.
+   */
+  @GET
+  @Path("/queries/{sqlQueryId}/reports")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response doGetQueryReport(
+      @PathParam("sqlQueryId") final String sqlQueryId,
+      @QueryParam("selfOnly") final String selfOnly,
+      @Context final HttpServletRequest request
+  )
+  {
+    final AuthenticationResult authenticationResult = 
AuthorizationUtils.authenticationResultFromRequest(request);
+    final AuthorizationResult stateReadAccess = 
AuthorizationUtils.authorizeAllResourceActions(
+        authenticationResult,
+        Collections.singletonList(new ResourceAction(Resource.STATE_RESOURCE, 
Action.READ)),
+        authorizerMapper
+    );
+
+    final Collection<SqlEngine> engines = sqlEngineRegistry.getAllEngines();
+
+    // Get task report from the first engine that recognizes the SQL query ID.
+    GetQueryReportResponse retVal = null;
+    for (SqlEngine sqlEngine : engines) {
+      retVal = sqlEngine.getQueryReport(
+          sqlQueryId,
+          selfOnly != null,
+          authenticationResult,
+          stateReadAccess
+      );
+
+      if (retVal != null) {
+        break;
+      }
+    }
+
+    AuthorizationUtils.setRequestAuthorizationAttributeIfNeeded(request);
+    if (retVal == null) {
+      return Response.status(Status.NOT_FOUND).entity(new 
GetQueryReportResponse(null, null)).build();
+    } else {
+      return Response.ok().entity(retVal).build();
+    }
+  }
+
   /**
    * This method is defined as public so that tests can access it
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to