This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 93735f5df89 Dart: Serve reports for running and recently-finished
queries. (#18886)
93735f5df89 is described below
commit 93735f5df890f90e22e613549fe4d65533975765
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Jan 8 11:04:15 2026 -0800
Dart: Serve reports for running and recently-finished queries. (#18886)
This patch adds query reports, and a mapping from SQL query ID to
Dart query ID, to DartControllerRegistry. It is made available through
a new getQueryReport method on SqlEngine, and through a new
"/druid/v2/sql/queries/<sqlQueryId>/report" endpoint on SqlResource.
---
.../embedded/msq/EmbeddedDartReportApiTest.java | 340 +++++++++++++++++++++
.../testing/embedded/msq/EmbeddedMSQApis.java | 95 ++++++
.../druid/msq/counters/CounterSnapshotsTree.java | 21 ++
.../msq/counters/NilQueryCounterSnapshot.java | 9 +
.../dart/controller/ControllerMessageListener.java | 4 +-
.../dart/controller/DartControllerRegistry.java | 201 +++++++++++-
.../msq/dart/controller/QueryInfoAndReport.java | 93 ++++++
.../controller/messages/ControllerMessage.java | 3 +-
.../msq/dart/controller/messages/PostCounters.java | 107 +++++++
.../msq/dart/controller/sql/DartQueryMaker.java | 57 ++--
.../msq/dart/controller/sql/DartSqlClient.java | 12 +
.../controller/sql/DartSqlClientFactoryImpl.java | 3 +-
.../msq/dart/controller/sql/DartSqlClientImpl.java | 24 ++
.../msq/dart/controller/sql/DartSqlClients.java | 6 +-
.../msq/dart/controller/sql/DartSqlEngine.java | 109 +++++--
.../druid/msq/dart/guice/DartControllerConfig.java | 24 ++
.../msq/dart/worker/DartControllerClient.java | 12 +-
.../druid/msq/dart/worker/DartWorkerContext.java | 16 +-
.../msq/indexing/IndexerControllerContext.java | 8 +
.../druid/msq/indexing/IndexerWorkerContext.java | 8 +
.../druid/msq/indexing/MSQControllerTask.java | 4 +-
.../msq/indexing/TaskReportQueryListener.java | 6 +-
.../indexing/client/IndexerControllerClient.java | 17 +-
.../druid/msq/util/MultiStageQueryContext.java | 15 +
.../msq/counters/NilQueryCounterSnapshotTest.java | 67 ++++
.../controller/DartControllerRegistryTest.java | 314 +++++++++++++++++++
.../dart/controller/http/DartSqlResourceTest.java | 20 +-
.../controller/messages/ControllerMessageTest.java | 29 ++
.../msq/indexing/TaskReportQueryListenerTest.java | 8 +-
.../apache/druid/client/broker/BrokerClient.java | 19 ++
.../druid/client/broker/BrokerClientImpl.java | 35 +++
.../testing/embedded/EmbeddedClusterApis.java | 13 +
.../testing/embedded/EmbeddedServiceClient.java | 50 +++
.../apache/druid/sql/calcite/run/SqlEngine.java | 42 ++-
.../druid/sql/http/GetQueryReportResponse.java | 82 +++++
.../org/apache/druid/sql/http/SqlResource.java | 50 ++-
36 files changed, 1824 insertions(+), 99 deletions(-)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDartReportApiTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDartReportApiTest.java
new file mode 100644
index 00000000000..891cba4e3b7
--- /dev/null
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDartReportApiTest.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.msq;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.guice.SleepModule;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
+import org.apache.druid.msq.dart.controller.sql.DartSqlClients;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.http.ClientSqlQuery;
+import org.apache.druid.server.metrics.LatchableEmitter;
+import org.apache.druid.sql.http.GetQueryReportResponse;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedClusterApis;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Embedded test for the Dart report API at {@code
/druid/v2/sql/queries/{id}/reports}.
+ * Uses batch ingestion to avoid dependency on Kafka/Docker.
+ */
+public class EmbeddedDartReportApiTest extends EmbeddedClusterTestBase
+{
+ private static final int MAX_RETAINED_REPORT_COUNT = 10;
+
+ private final EmbeddedBroker broker1 = new EmbeddedBroker();
+ private final EmbeddedBroker broker2 = new EmbeddedBroker();
+ private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+ private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+ private final EmbeddedHistorical historical = new EmbeddedHistorical();
+ private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+
+ private EmbeddedMSQApis msqApis;
+ private String ingestedDataSource;
+
+ private void configureBroker(EmbeddedBroker broker, int port)
+ {
+ broker.addProperty("druid.msq.dart.controller.heapFraction", "0.5")
+ .addProperty("druid.msq.dart.controller.maxRetainedReportCount",
String.valueOf(MAX_RETAINED_REPORT_COUNT))
+ .addProperty("druid.query.default.context.maxConcurrentStages", "1")
+ .addProperty("druid.plaintextPort", String.valueOf(port));
+ }
+
+ @Override
+ protected EmbeddedDruidCluster createCluster()
+ {
+ coordinator.addProperty("druid.manager.segments.useIncrementalCache",
"always");
+ overlord.addProperty("druid.manager.segments.pollDuration", "PT0.1s");
+
+ // Enable Dart with report retention on both brokers, with different ports
+ configureBroker(broker1, 7082);
+ configureBroker(broker2, 7083);
+
+ historical.addProperty("druid.msq.dart.worker.heapFraction", "0.5")
+ .addProperty("druid.msq.dart.worker.concurrentQueries", "1");
+
+ indexer.setServerMemory(400_000_000)
+ .addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
+ .addProperty("druid.processing.numThreads", "2")
+ .addProperty("druid.worker.capacity", "4");
+
+ return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
+ .addCommonProperty("druid.msq.dart.enabled",
"true")
+ .useLatchableEmitter()
+ .addServer(coordinator)
+ .addServer(overlord)
+ .addServer(broker1)
+ .addServer(broker2)
+ .addServer(indexer)
+ .addServer(historical)
+ .addExtension(SleepModule.class);
+ }
+
+ @BeforeAll
+ protected void setupData()
+ {
+ msqApis = new EmbeddedMSQApis(cluster, overlord);
+
+ // Ingest test data once, using batch ingestion.
+ ingestedDataSource = EmbeddedClusterApis.createTestDatasourceName();
+ final String taskId = IdUtils.getRandomId();
+ final IndexTask task =
MoreResources.Task.BASIC_INDEX.get().dataSource(ingestedDataSource).withId(taskId);
+ cluster.callApi().onLeaderOverlord(o -> o.runTask(taskId, task));
+ cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+
+ // Wait for segments to be available on both brokers
+ cluster.callApi().waitForAllSegmentsToBeAvailable(ingestedDataSource,
coordinator, broker1);
+ cluster.callApi().waitForAllSegmentsToBeAvailable(ingestedDataSource,
coordinator, broker2);
+ }
+
+ @Test
+ @Timeout(60)
+ public void test_getQueryReport_forCompletedDartQuery()
+ {
+ final String sqlQueryId = UUID.randomUUID().toString();
+ final String sql = StringUtils.format("SELECT COUNT(*) FROM \"%s\"",
ingestedDataSource);
+
+ // Run a Dart query with a specific SQL query ID
+ final String result = cluster.callApi().onAnyBroker(
+ b -> b.submitSqlQuery(
+ new ClientSqlQuery(
+ sql,
+ "CSV",
+ false,
+ false,
+ false,
+ ImmutableMap.of(
+ QueryContexts.ENGINE, "msq-dart",
+ QueryContexts.CTX_SQL_QUERY_ID, sqlQueryId
+ ),
+ null
+ )
+ )
+ ).trim();
+
+ // Verify the query result: COUNT(*) should be 10, based on the CSV_10_DAYS data
+ Assertions.assertEquals("10", result);
+
+ // Now fetch the report using the SQL query ID
+ final GetQueryReportResponse reportResponse =
msqApis.getDartQueryReport(sqlQueryId, broker1);
+
+ // Verify the report response
+ Assertions.assertNotNull(reportResponse, "Report response should not be
null");
+ Assertions.assertNotNull(reportResponse.getQueryInfo(), "Query info should
not be null");
+ Assertions.assertNotNull(reportResponse.getReportMap(), "Report should not
be null");
+
+ // Verify the query info
+ final DartQueryInfo queryInfo = (DartQueryInfo)
reportResponse.getQueryInfo();
+ Assertions.assertEquals(sql, queryInfo.getSql());
+ Assertions.assertEquals(sqlQueryId, queryInfo.getSqlQueryId());
+ Assertions.assertNotNull(queryInfo.getDartQueryId());
+
+ // Verify the report is an MSQTaskReport
+ Assertions.assertInstanceOf(TaskReport.ReportMap.class,
reportResponse.getReportMap());
+ Assertions.assertInstanceOf(MSQTaskReport.class,
reportResponse.getReportMap().get(MSQTaskReport.REPORT_KEY));
+ }
+
+ @Test
+ @Timeout(60)
+ public void test_getQueryReport_notFound()
+ {
+ // Try to get a report for a non-existent query
+ final GetQueryReportResponse reportResponse =
msqApis.getDartQueryReport("nonexistent-query-id", broker1);
+
+ // Verify the response is null (not found)
+ Assertions.assertNull(reportResponse, "Report response should be null for
non-existent query");
+ }
+
+ @Test
+ @Timeout(60)
+ public void test_getQueryReport_fromBothBrokers()
+ {
+ final String sqlQueryId = UUID.randomUUID().toString();
+ final String sql = StringUtils.format("SELECT COUNT(*) FROM \"%s\"",
ingestedDataSource);
+
+ // Run a Dart query on any broker
+ final String result = cluster.callApi().onAnyBroker(
+ b -> b.submitSqlQuery(
+ new ClientSqlQuery(
+ sql,
+ "CSV",
+ false,
+ false,
+ false,
+ ImmutableMap.of(
+ QueryContexts.ENGINE, "msq-dart",
+ QueryContexts.CTX_SQL_QUERY_ID, sqlQueryId
+ ),
+ null
+ )
+ )
+ ).trim();
+
+ // Verify the query returned results
+ Assertions.assertEquals("10", result);
+
+ // Verify both brokers have discovered each other
+ final var sqlClients1 =
broker1.bindings().getInstance(DartSqlClients.class);
+ final var sqlClients2 =
broker2.bindings().getInstance(DartSqlClients.class);
+ Assertions.assertEquals(1, sqlClients1.getAllClients().size(), "Broker1
should have 1 client (broker2)");
+ Assertions.assertEquals(1, sqlClients2.getAllClients().size(), "Broker2
should have 1 client (broker1)");
+
+ // Fetch the report from both brokers, to verify cross-broker lookup is
working
+ final GetQueryReportResponse reportFromBroker1 =
msqApis.getDartQueryReport(sqlQueryId, broker1);
+ final GetQueryReportResponse reportFromBroker2 =
msqApis.getDartQueryReport(sqlQueryId, broker2);
+
+ // Verify the report content
+ for (GetQueryReportResponse report : Arrays.asList(reportFromBroker1,
reportFromBroker2)) {
+ Assertions.assertNotNull(report);
+ final DartQueryInfo queryInfo = (DartQueryInfo) report.getQueryInfo();
+ Assertions.assertEquals(sqlQueryId, queryInfo.getSqlQueryId());
+ Assertions.assertEquals(sql, queryInfo.getSql());
+ Assertions.assertNotNull(queryInfo.getDartQueryId());
+ Assertions.assertInstanceOf(TaskReport.ReportMap.class,
report.getReportMap());
+ Assertions.assertInstanceOf(MSQTaskReport.class,
report.getReportMap().get(MSQTaskReport.REPORT_KEY));
+ }
+ }
+
+ @Test
+ @Timeout(60)
+ public void test_getQueryReport_forRunningAndCanceledQuery()
+ {
+ final String sqlQueryId = UUID.randomUUID().toString();
+
+ // Use SLEEP to make the query run for a while.
+ // Need to use a non-constant expression to make this happen at runtime
rather than planning time.
+ final String sql =
+ StringUtils.format("SELECT SLEEP(TIMESTAMP_TO_MILLIS(__time) * 0 + 60)
FROM \"%s\"", ingestedDataSource);
+
+ // Step 1: Issue the query asynchronously.
+ final ListenableFuture<String> queryFuture =
+ msqApis.submitDartSqlAsync(sql, Map.of(QueryContexts.CTX_SQL_QUERY_ID,
sqlQueryId), broker1);
+
+ // Step 2: Get the report.
+ final GetQueryReportResponse runningReport = waitForReport(sqlQueryId);
+
+ Assertions.assertNotNull(runningReport, "Report should be available for
running query");
+ Assertions.assertNotNull(runningReport.getQueryInfo(), "Query info should
not be null");
+ Assertions.assertNotNull(runningReport.getReportMap(), "Report should not
be null");
+
+ // Verify the query info
+ final DartQueryInfo runningQueryInfo = (DartQueryInfo)
runningReport.getQueryInfo();
+ Assertions.assertEquals(sql, runningQueryInfo.getSql());
+ Assertions.assertEquals(sqlQueryId, runningQueryInfo.getSqlQueryId());
+
+ // Verify the report is an MSQTaskReport with RUNNING status
+ final MSQTaskReport runningMsqReport =
+ (MSQTaskReport)
runningReport.getReportMap().get(MSQTaskReport.REPORT_KEY);
+ Assertions.assertNotNull(runningMsqReport, "MSQ report should not be
null");
+
+ final MSQTaskReportPayload runningPayload = runningMsqReport.getPayload();
+ Assertions.assertNotNull(runningPayload, "Payload should not be null");
+ Assertions.assertNotNull(runningPayload.getStatus(), "Status should not be
null");
+ Assertions.assertEquals(
+ TaskState.RUNNING,
+ runningPayload.getStatus().getStatus(),
+ "Query should be in RUNNING state"
+ );
+
+ // Step 3: Cancel the query
+ final boolean canceled = msqApis.cancelDartQuery(sqlQueryId, broker1);
+ Assertions.assertTrue(canceled, "Query cancellation should be accepted");
+
+ // Step 4: Wait for the sqlQuery/time metric to be emitted (signals query
completion)
+ final LatchableEmitter emitter = broker1.latchableEmitter();
+ emitter.waitForEvent(
+ event -> event.hasMetricName("sqlQuery/time")
+ .hasDimension("id", sqlQueryId)
+ );
+
+ // Step 5: Fetch the report again - should now be in FAILED state
+ final GetQueryReportResponse canceledReport =
msqApis.getDartQueryReport(sqlQueryId, broker1);
+
+ Assertions.assertNotNull(canceledReport, "Report should be available for
canceled query");
+ Assertions.assertNotNull(canceledReport.getReportMap(), "Report map should
not be null");
+
+ final MSQTaskReport canceledMsqReport =
+ (MSQTaskReport)
canceledReport.getReportMap().get(MSQTaskReport.REPORT_KEY);
+ Assertions.assertNotNull(canceledMsqReport, "MSQ report should not be null
for canceled query");
+
+ final MSQTaskReportPayload canceledPayload =
canceledMsqReport.getPayload();
+ Assertions.assertNotNull(canceledPayload, "Payload should not be null for
canceled query");
+ Assertions.assertNotNull(canceledPayload.getStatus(), "Status should not
be null for canceled query");
+ Assertions.assertEquals(
+ TaskState.FAILED,
+ canceledPayload.getStatus().getStatus(),
+ "canceled query should be in FAILED state"
+ );
+
+ // The query future should complete with an error due to cancellation
+ try {
+ queryFuture.get();
+ Assertions.fail("Query should have failed due to cancellation");
+ }
+ catch (Exception ignored) {
+ // Expected - query was canceled
+ }
+ }
+
+ /**
+ * Polls the report API on broker1 every 100 ms until a report for the given SQL query ID is available.
+ *
+ * @param sqlQueryId SQL query ID whose report is awaited
+ *
+ * @return the first non-null report returned by the report API
+ *
+ * @throws ISE if no report becomes available within 30 seconds
+ */
+ private GetQueryReportResponse waitForReport(String sqlQueryId)
+ {
+ final long timeout = 30_000;
+ final long deadline = System.currentTimeMillis() + timeout;
+ while (System.currentTimeMillis() < deadline) {
+ final GetQueryReportResponse report = msqApis.getDartQueryReport(sqlQueryId, broker1);
+ if (report != null) {
+ return report;
+ }
+ try {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e) {
+ // Restore the interrupt flag so code further up the stack can still observe the interruption.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ // This method only waits for a report to exist; it does not inspect the query state.
+ throw new ISE("Timed out after[%,d] ms waiting for report of query[%s] to be available", timeout, sqlQueryId);
+ }
+}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
index d656618db3d..115d1c22f16 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedMSQApis.java
@@ -19,6 +19,9 @@
package org.apache.druid.testing.embedded.msq;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.report.TaskReport;
@@ -29,11 +32,17 @@ import
org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.http.ClientSqlQuery;
import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.rpc.HttpResponseException;
+import org.apache.druid.sql.http.GetQueryReportResponse;
import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import javax.annotation.Nullable;
+import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -150,4 +159,90 @@ public class EmbeddedMSQApis
return taskReportPayload;
}
+
+ /**
+ * Gets the query report for a Dart query by its SQL query ID, fetching from
a specific broker.
+ * Returns null if the query is not found.
+ *
+ * @param sqlQueryId the SQL query ID
+ * @param targetBroker the broker to fetch the report from
+ */
+ @Nullable
+ public GetQueryReportResponse getDartQueryReport(String sqlQueryId,
EmbeddedBroker targetBroker)
+ {
+ try {
+ final String responseJson = cluster.callApi().onTargetBroker(
+ targetBroker,
+ b -> b.getQueryReport(sqlQueryId, false /* allow cross-broker
forwarding */)
+ );
+ return parseReportResponse(responseJson,
targetBroker.bindings().jsonMapper());
+ }
+ catch (RuntimeException e) {
+ // Check if this is a 404 Not Found response
+ final Throwable cause = e.getCause();
+ if (cause instanceof HttpResponseException) {
+ if (((HttpResponseException)
cause).getResponse().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
+ return null;
+ }
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Submits a Dart SQL query asynchronously to a specific broker.
+ *
+ * @param sql the SQL query
+ * @param context additional context parameters
+ * @param targetBroker the broker to submit the query to
+ *
+ * @return a future that resolves when the query completes
+ */
+ public ListenableFuture<String> submitDartSqlAsync(
+ String sql,
+ Map<String, Object> context,
+ EmbeddedBroker targetBroker
+ )
+ {
+ final Map<String, Object> fullContext = new HashMap<>(context);
+ fullContext.put(QueryContexts.ENGINE, DartSqlEngine.NAME);
+
+ return cluster.callApi().onTargetBrokerAsync(
+ targetBroker,
+ b -> b.submitSqlQuery(
+ new ClientSqlQuery(
+ sql,
+ ResultFormat.CSV.name(),
+ false,
+ false,
+ false,
+ fullContext,
+ null
+ )
+ )
+ );
+ }
+
+ /**
+ * Cancels a Dart SQL query by its SQL query ID.
+ *
+ * @param sqlQueryId the SQL query ID to cancel
+ * @param targetBroker the broker where the query is running
+ *
+ * @return true if the cancellation was accepted
+ */
+ public boolean cancelDartQuery(String sqlQueryId, EmbeddedBroker
targetBroker)
+ {
+ return cluster.callApi().onTargetBroker(targetBroker, b ->
b.cancelSqlQuery(sqlQueryId));
+ }
+
+ private static GetQueryReportResponse parseReportResponse(String
responseJson, ObjectMapper jsonMapper)
+ {
+ try {
+ return jsonMapper.readValue(responseJson, GetQueryReportResponse.class);
+ }
+ catch (JsonProcessingException e) {
+ throw DruidException.defensive(e, "Failed to parse query report
response[%s]", responseJson);
+ }
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterSnapshotsTree.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterSnapshotsTree.java
index dce2fe7ac3a..706c9a6871d 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterSnapshotsTree.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterSnapshotsTree.java
@@ -109,6 +109,27 @@ public class CounterSnapshotsTree
}
}
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final CounterSnapshotsTree that = (CounterSnapshotsTree) o;
+ return copyMap().equals(that.copyMap());
+ }
+
+ @Override
+ public int hashCode()
+ {
+ synchronized (snapshotsMap) {
+ return snapshotsMap.hashCode();
+ }
+ }
+
@Override
public String toString()
{
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/NilQueryCounterSnapshot.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/NilQueryCounterSnapshot.java
index eca3f6d6228..03be3fb1a0b 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/counters/NilQueryCounterSnapshot.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/counters/NilQueryCounterSnapshot.java
@@ -19,6 +19,7 @@
package org.apache.druid.msq.counters;
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonTypeName;
/**
@@ -29,11 +30,19 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
@JsonTypeName("nil")
public class NilQueryCounterSnapshot implements QueryCounterSnapshot
{
+ private static final NilQueryCounterSnapshot INSTANCE = new
NilQueryCounterSnapshot();
+
private NilQueryCounterSnapshot()
{
// Singleton
}
+ @JsonCreator
+ public static NilQueryCounterSnapshot instance()
+ {
+ return INSTANCE;
+ }
+
@Override
public int hashCode()
{
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerMessageListener.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerMessageListener.java
index 5cedd13baf0..e9d6fbb3010 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerMessageListener.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerMessageListener.java
@@ -44,7 +44,7 @@ public class ControllerMessageListener implements
MessageListener<ControllerMess
@Override
public void messageReceived(ControllerMessage message)
{
- final ControllerHolder holder =
controllerRegistry.get(message.getQueryId());
+ final ControllerHolder holder =
controllerRegistry.getController(message.getQueryId());
if (holder != null) {
message.handle(holder.getController());
}
@@ -59,7 +59,7 @@ public class ControllerMessageListener implements
MessageListener<ControllerMess
@Override
public void serverRemoved(DruidNode node)
{
- for (final ControllerHolder holder : controllerRegistry.getAllHolders()) {
+ for (final ControllerHolder holder :
controllerRegistry.getAllControllers()) {
final Controller controller = holder.getController();
final WorkerId workerId = WorkerId.fromDruidNode(node,
controller.queryId());
holder.workerOffline(workerId);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java
index 847dbf75980..b80a6b6daf5 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java
@@ -19,45 +19,166 @@
package org.apache.druid.msq.dart.controller;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import com.google.inject.Inject;
import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.ManageLifecycle;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
+import org.apache.druid.msq.dart.guice.DartControllerConfig;
import org.apache.druid.msq.exec.Controller;
+import org.joda.time.Period;
import javax.annotation.Nullable;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
- * Registry for actively-running {@link Controller}.
+ * Registry for actively-running {@link Controller} and recently-completed
{@link TaskReport}.
*/
+@ManageLifecycle
public class DartControllerRegistry
{
+ /**
+ * Minimum interval, in milliseconds, between scheduled runs of {@link #cleanupExpiredReports()}.
+ */
+ private static final long MIN_CLEANUP_CHECK_MILLIS = 10_000;
+
+ private final DartControllerConfig config;
+
+ /**
+ * Map of Dart query ID -> controller for currently-running queries.
+ */
private final ConcurrentHashMap<String, ControllerHolder> controllerMap =
new ConcurrentHashMap<>();
+ /**
+ * Map of Dart query ID -> timestamped report for completed queries.
+ */
+ @GuardedBy("completeReports")
+ private final LinkedHashMap<String, QueryInfoAndReport> completeReports =
new LinkedHashMap<>();
+
+ /**
+ * Map of SQL query ID -> Dart query ID. Used by {@link
#getQueryInfoAndReportBySqlQueryId(String)}. Contains an
+ * entry for every query in either {@link #controllerMap} or {@link
#completeReports}.
+ *
+ * It is possible for the same SQL query ID to map to multiple Dart query IDs, because SQL query IDs can be set
+ * by the user, and uniqueness is not required. If this case occurs, we go with the first one encountered
+ * and ignore the others.
+ */
+ private final ConcurrentHashMap<String, String> sqlQueryIdToDartQueryId =
new ConcurrentHashMap<>();
+
+ /**
+ * Executor for cleaning up reports older than {@link
DartControllerConfig#getMaxRetainedReportDuration()}.
+ */
+ private ScheduledExecutorService cleanupExec;
+
+ @Inject
+ public DartControllerRegistry(final DartControllerConfig config)
+ {
+ this.config = config;
+ }
+
+ @LifecycleStart
+ public void start()
+ {
+ // Schedule periodic cleanup of expired reports.
+ if (!config.getMaxRetainedReportDuration().equals(Period.ZERO)) {
+ final String threadNameFormat =
StringUtils.format("%s-ReportCleanupExec-%%s", getClass().getSimpleName());
+ final long cleanupPeriodMs = Math.max(
+ MIN_CLEANUP_CHECK_MILLIS,
+
config.getMaxRetainedReportDuration().toStandardDuration().getMillis() / 10
+ );
+ cleanupExec = Execs.scheduledSingleThreaded(threadNameFormat);
+ cleanupExec.scheduleAtFixedRate(
+ this::cleanupExpiredReports,
+ cleanupPeriodMs,
+ cleanupPeriodMs,
+ TimeUnit.MILLISECONDS
+ );
+ }
+ }
+
+ @LifecycleStop
+ public void stop()
+ {
+ if (cleanupExec != null) {
+ cleanupExec.shutdown();
+ }
+ }
+
/**
* Add a controller. Throws {@link DruidException} if a controller with the
same {@link Controller#queryId()} is
* already registered.
*/
public void register(ControllerHolder holder)
{
- if (controllerMap.putIfAbsent(holder.getController().queryId(), holder) !=
null) {
- throw DruidException.defensive("Controller[%s] already registered",
holder.getController().queryId());
+ final String dartQueryId = holder.getController().queryId();
+ if (controllerMap.putIfAbsent(dartQueryId, holder) != null) {
+ throw DruidException.defensive("Controller[%s] already registered",
dartQueryId);
}
+ sqlQueryIdToDartQueryId.putIfAbsent(holder.getSqlQueryId(), dartQueryId);
}
/**
- * Remove a controller from the registry.
+ * Remove a controller from the registry. Optionally registers a report that
will be available for some
+ * time afterwards, based on {@link
DartControllerConfig#getMaxRetainedReportCount()} and
+ * {@link DartControllerConfig#getMaxRetainedReportDuration()}.
*/
- public void deregister(ControllerHolder holder)
+ public void deregister(ControllerHolder holder, @Nullable
TaskReport.ReportMap completeReport)
{
+ final String dartQueryId = holder.getController().queryId();
+
// Remove only if the current mapping for the queryId is this specific
controller.
- controllerMap.remove(holder.getController().queryId(), holder);
+ final boolean didRemove = controllerMap.remove(dartQueryId, holder);
+
+ // Add completeReport to completeReports, if present, and if we actually
did deregister this specific controller.
+ if (didRemove && completeReport != null &&
config.getMaxRetainedReportCount() > 0) {
+ synchronized (completeReports) {
+ // Remove reports if size is greater than maxRetainedReportCount - 1.
+ int reportsToRemove = completeReports.size() -
config.getMaxRetainedReportCount() + 1;
+ if (reportsToRemove > 0) {
+ for (Iterator<Map.Entry<String, QueryInfoAndReport>> it =
completeReports.entrySet().iterator();
+ it.hasNext() && reportsToRemove > 0;
+ reportsToRemove--) {
+ final QueryInfoAndReport evictedReport = it.next().getValue();
+ it.remove();
+ sqlQueryIdToDartQueryId.remove(
+ evictedReport.getQueryInfo().getSqlQueryId(),
+ evictedReport.getQueryInfo().getDartQueryId()
+ );
+ }
+ }
+
+ completeReports.put(
+ dartQueryId,
+ new QueryInfoAndReport(
+ DartQueryInfo.fromControllerHolder(holder),
+ completeReport,
+ DateTimes.nowUtc()
+ )
+ );
+ }
+ } else if (didRemove) {
+ // Report not retained, but controller was removed; clean up the SQL
query ID mapping.
+ sqlQueryIdToDartQueryId.remove(holder.getSqlQueryId(), dartQueryId);
+ }
}
/**
- * Return a specific controller holder, or null if it doesn't exist.
+ * Return a specific controller holder by Dart query ID, or null if it
doesn't exist.
*/
@Nullable
- public ControllerHolder get(final String queryId)
+ public ControllerHolder getController(final String queryId)
{
return controllerMap.get(queryId);
}
@@ -65,8 +186,70 @@ public class DartControllerRegistry
/**
* Returns all actively-running {@link Controller}.
*/
- public Collection<ControllerHolder> getAllHolders()
+ public Collection<ControllerHolder> getAllControllers()
{
return controllerMap.values();
}
+
+ /**
+ * Gets execution details and report for a query by Dart query ID.
+ */
+ @Nullable
+ public QueryInfoAndReport getQueryInfoAndReport(final String queryId)
+ {
+ final ControllerHolder runningController = getController(queryId);
+
+ if (runningController != null) {
+ final TaskReport.ReportMap liveReportMap =
runningController.getController().liveReports();
+ if (liveReportMap != null) {
+ return new QueryInfoAndReport(
+ DartQueryInfo.fromControllerHolder(runningController),
+ liveReportMap,
+ DateTimes.nowUtc()
+ );
+ } else {
+ return null;
+ }
+ } else {
+ synchronized (completeReports) {
+ return completeReports.get(queryId);
+ }
+ }
+ }
+
+ /**
+ * Gets execution details and report for a query by SQL query ID.
+ */
+ @Nullable
+ public QueryInfoAndReport getQueryInfoAndReportBySqlQueryId(final String
sqlQueryId)
+ {
+ final String dartQueryId = sqlQueryIdToDartQueryId.get(sqlQueryId);
+ if (dartQueryId == null) {
+ return null;
+ }
+ return getQueryInfoAndReport(dartQueryId);
+ }
+
+ /**
+ * Removes reports that have exceeded {@link
DartControllerConfig#getMaxRetainedReportDuration()}.
+ */
+ private void cleanupExpiredReports()
+ {
+ final long thresholdTimestamp =
DateTimes.nowUtc().minus(config.getMaxRetainedReportDuration()).getMillis();
+
+ synchronized (completeReports) {
+ final Iterator<Map.Entry<String, QueryInfoAndReport>> it =
completeReports.entrySet().iterator();
+ while (it.hasNext()) {
+ final QueryInfoAndReport report = it.next().getValue();
+ if (report.getTimestamp().getMillis() < thresholdTimestamp) {
+ it.remove();
+
sqlQueryIdToDartQueryId.remove(report.getQueryInfo().getSqlQueryId(),
report.getQueryInfo().getDartQueryId());
+ } else {
+ // Entries are added in order of increasing timestamp, so we can
stop looking once we
+ // find a non-expired entry.
+ break;
+ }
+ }
+ }
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/QueryInfoAndReport.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/QueryInfoAndReport.java
new file mode 100644
index 00000000000..de163ec6b0a
--- /dev/null
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/QueryInfoAndReport.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.controller;
+
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Object returned by {@link
DartControllerRegistry#getQueryInfoAndReport(String)}.
+ */
+public class QueryInfoAndReport
+{
+ private final DartQueryInfo queryInfo;
+ @Nullable
+ private final TaskReport.ReportMap reportMap;
+ private final DateTime timestamp;
+
+ public QueryInfoAndReport(DartQueryInfo queryInfo, @Nullable
TaskReport.ReportMap reportMap, DateTime timestamp)
+ {
+ this.queryInfo = queryInfo;
+ this.reportMap = reportMap;
+ this.timestamp = timestamp;
+ }
+
+ public DartQueryInfo getQueryInfo()
+ {
+ return queryInfo;
+ }
+
+ @Nullable
+ public TaskReport.ReportMap getReportMap()
+ {
+ return reportMap;
+ }
+
+ /**
+ * Timestamp at which the query details or report were last updated. Generally
"now" for currently-running queries,
+ * and the query finish time for completed queries.
+ */
+ public DateTime getTimestamp()
+ {
+ return timestamp;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ QueryInfoAndReport that = (QueryInfoAndReport) o;
+ return Objects.equals(queryInfo, that.queryInfo)
+ && Objects.equals(reportMap, that.reportMap)
+ && Objects.equals(timestamp, that.timestamp);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(queryInfo, reportMap, timestamp);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "QueryInfoAndReport{" +
+ "queryInfo=" + queryInfo +
+ ", report=" + reportMap +
+ ", timestamp=" + timestamp +
+ '}';
+ }
+}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/messages/ControllerMessage.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/messages/ControllerMessage.java
index 454e23bbc9c..63fa5d95f05 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/messages/ControllerMessage.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/messages/ControllerMessage.java
@@ -33,7 +33,8 @@ import org.apache.druid.msq.exec.Controller;
@JsonSubTypes.Type(value = DoneReadingInput.class, name =
"doneReadingInput"),
@JsonSubTypes.Type(value = ResultsComplete.class, name =
"resultsComplete"),
@JsonSubTypes.Type(value = WorkerError.class, name = "workerError"),
- @JsonSubTypes.Type(value = WorkerWarning.class, name = "workerWarning")
+ @JsonSubTypes.Type(value = WorkerWarning.class, name = "workerWarning"),
+ @JsonSubTypes.Type(value = PostCounters.class, name = "postCounters")
})
public interface ControllerMessage
{
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/messages/PostCounters.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/messages/PostCounters.java
new file mode 100644
index 00000000000..64436fd39d2
--- /dev/null
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/messages/PostCounters.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.controller.messages;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.msq.counters.CounterSnapshotsTree;
+import org.apache.druid.msq.exec.Controller;
+import org.apache.druid.msq.exec.ControllerClient;
+
+import java.util.Objects;
+
+/**
+ * Message for {@link ControllerClient#postCounters}.
+ */
+public class PostCounters implements ControllerMessage
+{
+ private final String queryId;
+ private final String workerId;
+ private final CounterSnapshotsTree counters;
+
+ @JsonCreator
+ public PostCounters(
+ @JsonProperty("queryId") final String queryId,
+ @JsonProperty("workerId") final String workerId,
+ @JsonProperty("counters") final CounterSnapshotsTree counters
+ )
+ {
+ this.queryId = Preconditions.checkNotNull(queryId, "queryId");
+ this.workerId = Preconditions.checkNotNull(workerId, "workerId");
+ this.counters = Preconditions.checkNotNull(counters, "counters");
+ }
+
+ @Override
+ @JsonProperty
+ public String getQueryId()
+ {
+ return queryId;
+ }
+
+ @JsonProperty
+ public String getWorkerId()
+ {
+ return workerId;
+ }
+
+ @JsonProperty("counters")
+ public CounterSnapshotsTree getCounters()
+ {
+ return counters;
+ }
+
+ @Override
+ public void handle(final Controller controller)
+ {
+ controller.updateCounters(workerId, counters);
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final PostCounters that = (PostCounters) o;
+ return Objects.equals(queryId, that.queryId)
+ && Objects.equals(workerId, that.workerId)
+ && Objects.equals(counters, that.counters);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(queryId, workerId, counters);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "PostCounters{" +
+ "queryId='" + queryId + '\'' +
+ ", workerId='" + workerId + '\'' +
+ ", counters=" + counters +
+ '}';
+ }
+}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
index c309ef0d419..5270c9f63d8 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartQueryMaker.java
@@ -23,6 +23,7 @@ import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.error.DruidException;
+import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.io.LimitedOutputStream;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Either;
@@ -31,7 +32,6 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.dart.controller.ControllerHolder;
import org.apache.druid.msq.dart.controller.DartControllerContextFactory;
@@ -46,12 +46,13 @@ import org.apache.druid.msq.exec.ResultsContext;
import org.apache.druid.msq.indexing.LegacyMSQSpec;
import org.apache.druid.msq.indexing.QueryDefMSQSpec;
import org.apache.druid.msq.indexing.TaskReportQueryListener;
-import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
+import org.apache.druid.msq.indexing.destination.MSQDestination;
import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.CancellationReason;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQStatusReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.msq.querykit.MultiQueryKit;
import org.apache.druid.msq.sql.MSQTaskQueryMaker;
@@ -114,7 +115,7 @@ public class DartQueryMaker implements QueryMaker
private final DartControllerConfig controllerConfig;
/**
- * Executor for {@link #runWithoutReport(ControllerHolder)}. Number of
thread is equal to
+ * Executor for {@link #runWithIterator(ControllerHolder)}. Number of threads
is equal to
* {@link DartControllerConfig#getConcurrentQueries()}, which limits the
number of concurrent controllers.
*/
private final ExecutorService controllerExecutor;
@@ -230,12 +231,12 @@ public class DartQueryMaker implements QueryMaker
// runWithReport, runWithIterator are responsible for calling
controllerRegistry.deregister(controllerHolder)
// when their work is done.
final Sequence<Object[]> results =
- fullReport ? runWithReport(controllerHolder) :
runWithoutReport(controllerHolder);
+ fullReport ? runWithReport(controllerHolder) :
runWithIterator(controllerHolder);
return QueryResponse.withEmptyContext(results);
}
catch (Throwable e) {
// Error while calling runWithReport or runWithIterator. Deregister
controller immediately.
- controllerRegistry.deregister(controllerHolder);
+ controllerRegistry.deregister(controllerHolder, null);
throw e;
}
}
@@ -256,16 +257,16 @@ public class DartQueryMaker implements QueryMaker
* Run a query and return the full report, buffered in memory up to
* {@link DartControllerConfig#getMaxQueryReportSize()}.
*
- * Arranges for {@link DartControllerRegistry#deregister(ControllerHolder)}
to be called upon completion (either
- * success or failure).
+ * Arranges for {@link DartControllerRegistry#deregister} to be called upon
completion (either success or failure).
*/
private Sequence<Object[]> runWithReport(final ControllerHolder
controllerHolder)
{
- final Future<Map<String, Object>> reportFuture;
+ final Future<TaskReport.ReportMap> reportFuture;
// Run in controllerExecutor. Control doesn't really *need* to be moved to
another thread, but we have to
// use the controllerExecutor anyway, to ensure we respect the
concurrentQueries configuration.
reportFuture = controllerExecutor.submit(() -> {
+ TaskReport.ReportMap retVal = null;
final String threadName = Thread.currentThread().getName();
try {
@@ -273,7 +274,6 @@ public class DartQueryMaker implements QueryMaker
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final TaskReportQueryListener queryListener = new
TaskReportQueryListener(
- TaskReportMSQDestination.instance(),
() -> new LimitedOutputStream(
baos,
controllerConfig.getMaxQueryReportSize(),
@@ -286,12 +286,13 @@ public class DartQueryMaker implements QueryMaker
),
plannerContext.getJsonMapper(),
controllerHolder.getController().queryId(),
- Collections.emptyMap()
+ Collections.emptyMap(),
+ MSQDestination.UNLIMITED
);
if (controllerHolder.run(queryListener)) {
- return plannerContext.getJsonMapper()
- .readValue(baos.toByteArray(),
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);
+ retVal = plannerContext.getJsonMapper()
+ .readValue(baos.toByteArray(),
TaskReport.ReportMap.class);
} else {
// Controller was canceled before it ran.
throw MSQErrorReport
@@ -305,9 +306,11 @@ public class DartQueryMaker implements QueryMaker
}
}
finally {
- controllerRegistry.deregister(controllerHolder);
+ controllerRegistry.deregister(controllerHolder, retVal);
Thread.currentThread().setName(threadName);
}
+
+ return retVal;
});
// Return a sequence that reads one row (the report) from reportFuture.
@@ -342,10 +345,9 @@ public class DartQueryMaker implements QueryMaker
/**
* Run a query and return the results only, streamed back using {@link
ResultIteratorMaker}.
*
- * Arranges for {@link DartControllerRegistry#deregister(ControllerHolder)}
to be called upon completion (either
- * success or failure).
+ * Arranges for {@link DartControllerRegistry#deregister} to be called upon
completion (either success or failure).
*/
- private Sequence<Object[]> runWithoutReport(final ControllerHolder
controllerHolder)
+ private Sequence<Object[]> runWithIterator(final ControllerHolder
controllerHolder)
{
return new BaseSequence<>(new ResultIteratorMaker(controllerHolder));
}
@@ -364,7 +366,7 @@ public class DartQueryMaker implements QueryMaker
}
/**
- * Helper for {@link #runWithoutReport(ControllerHolder)}.
+ * Helper for {@link #runWithIterator(ControllerHolder)}.
*/
class ResultIteratorMaker implements BaseSequence.IteratorMaker<Object[],
ResultIterator>
{
@@ -423,7 +425,20 @@ public class DartQueryMaker implements QueryMaker
throw e;
}
finally {
- controllerRegistry.deregister(controllerHolder);
+ final MSQTaskReport taskReport;
+
+ if (resultIterator.report != null) {
+ taskReport = new MSQTaskReport(
+ controllerHolder.getController().queryId(),
+ resultIterator.report
+ );
+ } else {
+ taskReport = null;
+ }
+
+ final TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
+ reportMap.put(MSQTaskReport.REPORT_KEY, taskReport);
+ controllerRegistry.deregister(controllerHolder, reportMap);
Thread.currentThread().setName(threadName);
}
});
@@ -450,7 +465,7 @@ public class DartQueryMaker implements QueryMaker
}
/**
- * Helper for {@link ResultIteratorMaker}, which is in turn a helper for
{@link #runWithoutReport(ControllerHolder)}.
+ * Helper for {@link ResultIteratorMaker}, which is in turn a helper for
{@link #runWithIterator(ControllerHolder)}.
*/
static class ResultIterator implements Iterator<Object[]>, QueryListener
{
@@ -471,6 +486,7 @@ public class DartQueryMaker implements QueryMaker
private Either<Throwable, Object[]> current;
private volatile boolean complete;
+ private volatile MSQTaskReportPayload report;
@Nullable
private final Duration timeout;
@@ -563,7 +579,8 @@ public class DartQueryMaker implements QueryMaker
public void onQueryComplete(MSQTaskReportPayload report)
{
try {
- complete = true;
+ this.report = report;
+ this.complete = true;
final MSQStatusReport statusReport = report.getStatus();
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java
index c8da492237a..e94aac0cb4a 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.dart.controller.sql;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.sql.http.GetQueriesResponse;
+import org.apache.druid.sql.http.GetQueryReportResponse;
import org.apache.druid.sql.http.SqlResource;
import javax.servlet.http.HttpServletRequest;
@@ -39,4 +40,15 @@ public interface DartSqlClient
* @see SqlResource#doGetRunningQueries(String, HttpServletRequest) the
server side
*/
ListenableFuture<GetQueriesResponse> getRunningQueries(boolean selfOnly);
+
+ /**
+ * Get query report for a particular SQL query ID on this server.
+ *
+ * @param sqlQueryId SQL query ID
+ * @param selfOnly true if only queries from this server should be
examined; false if queries from all servers
+ * should be examined
+ *
+ * @see SqlResource#doGetQueryReport(String, String, HttpServletRequest)
the server side
+ */
+ ListenableFuture<GetQueryReportResponse> getQueryReport(String sqlQueryId,
boolean selfOnly);
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientFactoryImpl.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientFactoryImpl.java
index 3562f5c2ff8..5046bf801eb 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientFactoryImpl.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientFactoryImpl.java
@@ -55,7 +55,8 @@ public class DartSqlClientFactoryImpl implements
DartSqlClientFactory
{
final ServiceClient client = clientFactory.makeClient(
StringUtils.format("%s[dart-sql]", node.getHostAndPortToUse()),
- new
FixedServiceLocator(ServiceLocation.fromDruidNode(node).withBasePath(SqlResource.PATH)),
+ new FixedServiceLocator(ServiceLocation.fromDruidNode(node)
+
.withBasePath(StringUtils.maybeRemoveTrailingSlash(SqlResource.PATH))),
StandardRetryPolicy.noRetries()
);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java
index 7b054220521..d2c0e8f1c26 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java
@@ -22,11 +22,13 @@ package org.apache.druid.msq.dart.controller.sql;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import
org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.sql.http.GetQueriesResponse;
+import org.apache.druid.sql.http.GetQueryReportResponse;
import org.apache.http.client.utils.URIBuilder;
import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -67,4 +69,26 @@ public class DartSqlClientImpl implements DartSqlClient
throw new RuntimeException(e);
}
}
+
+ @Override
+ public ListenableFuture<GetQueryReportResponse> getQueryReport(String
sqlQueryId, boolean selfOnly)
+ {
+ try {
+ URIBuilder builder = new
URIBuilder(StringUtils.format("/queries/%s/reports",
StringUtils.urlEncode(sqlQueryId)));
+ if (selfOnly) {
+ builder.addParameter("selfOnly", null);
+ }
+
+ return FutureUtils.transform(
+ client.asyncRequest(
+ new RequestBuilder(HttpMethod.GET, builder.toString()),
+ new BytesFullResponseHandler()
+ ),
+ holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(),
GetQueryReportResponse.class)
+ );
+ }
+ catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClients.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClients.java
index f28d895cf60..02b2b652661 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClients.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClients.java
@@ -33,15 +33,15 @@ import
org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.server.DruidNode;
import org.apache.druid.sql.http.SqlResource;
-import javax.servlet.http.HttpServletRequest;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
- * Keeps {@link DartSqlClient} for all servers except ourselves. Currently the
purpose of this is to power
- * the "get all queries" API at {@link SqlResource#doGetRunningQueries(String,
String, HttpServletRequest)}.
+ * Keeps {@link DartSqlClient} for all servers except ourselves. Currently,
the purpose of this is to power
+ * the "get all queries" API at {@link SqlResource#doGetRunningQueries} and
the "get query report" API
+ * at {@link SqlResource#doGetQueryReport}.
*/
@ManageLifecycle
public class DartSqlClients implements DruidNodeDiscovery.Listener
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
index f61ed243fb3..0dbc8307e32 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
@@ -30,6 +30,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
@@ -37,6 +38,7 @@ import org.apache.druid.msq.dart.Dart;
import org.apache.druid.msq.dart.controller.ControllerHolder;
import org.apache.druid.msq.dart.controller.DartControllerContextFactory;
import org.apache.druid.msq.dart.controller.DartControllerRegistry;
+import org.apache.druid.msq.dart.controller.QueryInfoAndReport;
import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
import org.apache.druid.msq.dart.guice.DartControllerConfig;
import org.apache.druid.msq.exec.QueryKitSpecFactory;
@@ -61,8 +63,9 @@ import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.run.SqlEngines;
import org.apache.druid.sql.destination.IngestDestination;
import org.apache.druid.sql.http.GetQueriesResponse;
-import org.apache.druid.sql.http.QueryInfo;
+import org.apache.druid.sql.http.GetQueryReportResponse;
+import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -270,16 +273,17 @@ public class DartSqlEngine implements SqlEngine
}
@Override
- public List<QueryInfo> getRunningQueries(
+ public GetQueriesResponse getRunningQueries(
boolean selfOnly,
AuthenticationResult authenticationResult,
- AuthorizationResult authorizationResult
+ AuthorizationResult stateReadAuthorization
)
{
- final List<DartQueryInfo> queries = controllerRegistry.getAllHolders()
- .stream()
-
.map(DartQueryInfo::fromControllerHolder)
-
.collect(Collectors.toList());
+ final List<DartQueryInfo> queries =
+ controllerRegistry.getAllControllers()
+ .stream()
+ .map(DartQueryInfo::fromControllerHolder)
+ .collect(Collectors.toList());
// Add queries from all other servers, if "selfOnly" is false.
if (!selfOnly) {
@@ -302,27 +306,82 @@ public class DartSqlEngine implements SqlEngine
// Sort queries by start time, breaking ties by query ID, so the list
comes back in a consistent and nice order.
queries.sort(Comparator.comparing(DartQueryInfo::getStartTime).thenComparing(DartQueryInfo::getDartQueryId));
- if (authorizationResult.allowAccessWithNoRestriction()) {
+ if (stateReadAuthorization.allowAccessWithNoRestriction()) {
// User can READ STATE, so they can see all running queries, as well as
authentication details.
- return List.copyOf(queries);
+ return new GetQueriesResponse(List.copyOf(queries));
} else {
// User cannot READ STATE, so they can see only their own queries,
without authentication details.
- return queries.stream()
- .filter(
- query ->
- authenticationResult.getAuthenticatedBy() != null
- && authenticationResult.getIdentity() != null
- && Objects.equals(
- authenticationResult.getAuthenticatedBy(),
- query.getAuthenticator()
- )
- && Objects.equals(
- authenticationResult.getIdentity(),
- query.getIdentity()
- ))
- .map(DartQueryInfo::withoutAuthenticationResult)
- .collect(Collectors.toList());
+ return new GetQueriesResponse(
+ queries.stream()
+ .filter(query -> isOwnQuery(authenticationResult, query))
+ .map(DartQueryInfo::withoutAuthenticationResult)
+ .collect(Collectors.toList())
+ );
+ }
+ }
+
+ @Override
+ @Nullable
+ public GetQueryReportResponse getQueryReport(
+ final String sqlQueryId,
+ final boolean selfOnly,
+ final AuthenticationResult authenticationResult,
+ final AuthorizationResult stateReadAuthorization
+ )
+ {
+ QueryInfoAndReport infoAndReport =
controllerRegistry.getQueryInfoAndReportBySqlQueryId(sqlQueryId);
+
+ if (infoAndReport == null && !selfOnly) {
+ final List<GetQueryReportResponse> otherReports =
FutureUtils.getUnchecked(
+ Futures.successfulAsList(
+ Iterables.transform(sqlClients.getAllClients(), client ->
client.getQueryReport(sqlQueryId, true))
+ ),
+ true
+ );
+
+ for (final GetQueryReportResponse otherReport : otherReports) {
+ // Check for non-null report with non-null content (a 404 response
returns GetQueryReportResponse with null fields)
+ if (otherReport != null && otherReport.getQueryInfo() != null) {
+ infoAndReport = new QueryInfoAndReport(
+ (DartQueryInfo) otherReport.getQueryInfo(),
+ otherReport.getReportMap(),
+ DateTimes.utc(0)
+ );
+ break;
+ }
+ }
+ }
+
+ if (infoAndReport == null) {
+ return null;
}
+
+ if (stateReadAuthorization.allowAccessWithNoRestriction()) {
+ // User can READ STATE, so they can see any report.
+ return new GetQueryReportResponse(infoAndReport.getQueryInfo(),
infoAndReport.getReportMap());
+ } else {
+ // User cannot READ STATE, so they can see only their own queries,
without authentication details.
+ final DartQueryInfo queryInfo = infoAndReport.getQueryInfo();
+ if (isOwnQuery(authenticationResult, queryInfo)) {
+ return new
GetQueryReportResponse(queryInfo.withoutAuthenticationResult(),
infoAndReport.getReportMap());
+ } else {
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Returns whether the given query belongs to the authenticated user.
+ */
+ private static boolean isOwnQuery(
+ final AuthenticationResult authenticationResult,
+ final DartQueryInfo queryInfo
+ )
+ {
+ return authenticationResult.getAuthenticatedBy() != null
+ && authenticationResult.getIdentity() != null
+ && Objects.equals(authenticationResult.getAuthenticatedBy(),
queryInfo.getAuthenticator())
+ && Objects.equals(authenticationResult.getIdentity(),
queryInfo.getIdentity());
}
@Override
@@ -330,7 +389,7 @@ public class DartSqlEngine implements SqlEngine
{
final Object dartQueryId =
plannerContext.queryContext().get(QueryContexts.CTX_DART_QUERY_ID);
if (dartQueryId instanceof String) {
- final ControllerHolder holder = controllerRegistry.get((String)
dartQueryId);
+ final ControllerHolder holder =
controllerRegistry.getController((String) dartQueryId);
if (holder != null) {
holder.cancel(CancellationReason.USER_REQUEST);
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerConfig.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerConfig.java
index f97f895314c..203b565cba5 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerConfig.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerConfig.java
@@ -21,6 +21,7 @@ package org.apache.druid.msq.dart.guice;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.msq.exec.MemoryIntrospector;
+import org.joda.time.Period;
/**
* Runtime configuration for controllers (which run on Brokers).
@@ -42,6 +43,19 @@ public class DartControllerConfig
@JsonProperty("heapFraction")
private double heapFraction = DEFAULT_HEAP_FRACTION;
+ /**
+ * Maximum number of retained reports to store in-memory for completed
queries.
+ * Set to zero to avoid storing any reports at all.
+ */
+ @JsonProperty("maxRetainedReportCount")
+ private int maxRetainedReportCount = 0;
+
+ /**
+ * Maximum age of retained reports. Set to zero to disable age-based
expiration.
+ */
+ @JsonProperty("maxRetainedReportDuration")
+ private Period maxRetainedReportDuration = Period.ZERO;
+
public int getConcurrentQueries()
{
return concurrentQueries;
@@ -56,4 +70,14 @@ public class DartControllerConfig
{
return heapFraction;
}
+
+ public int getMaxRetainedReportCount()
+ {
+ return maxRetainedReportCount;
+ }
+
+ public Period getMaxRetainedReportDuration()
+ {
+ return maxRetainedReportDuration;
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartControllerClient.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartControllerClient.java
index 23d83d00549..97608df6ac7 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartControllerClient.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartControllerClient.java
@@ -27,6 +27,7 @@ import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
import org.apache.druid.msq.dart.controller.messages.DoneReadingInput;
import org.apache.druid.msq.dart.controller.messages.PartialKeyStatistics;
+import org.apache.druid.msq.dart.controller.messages.PostCounters;
import org.apache.druid.msq.dart.controller.messages.ResultsComplete;
import org.apache.druid.msq.dart.controller.messages.WorkerError;
import org.apache.druid.msq.dart.controller.messages.WorkerWarning;
@@ -47,6 +48,7 @@ public class DartControllerClient implements ControllerClient
private final Outbox<ControllerMessage> outbox;
private final String queryId;
private final String controllerHost;
+ private final boolean liveReportCounters;
/**
* Currently-outstanding futures. These are tracked so they can be canceled
in {@link #close()}.
@@ -56,12 +58,14 @@ public class DartControllerClient implements
ControllerClient
public DartControllerClient(
final Outbox<ControllerMessage> outbox,
final String queryId,
- final String controllerHost
+ final String controllerHost,
+ final boolean liveReportCounters
)
{
this.outbox = outbox;
this.queryId = queryId;
this.controllerHost = controllerHost;
+ this.liveReportCounters = liveReportCounters;
}
@Override
@@ -102,9 +106,11 @@ public class DartControllerClient implements
ControllerClient
}
@Override
- public void postCounters(String workerId, CounterSnapshotsTree snapshotsTree)
+ public void postCounters(final String workerId, final CounterSnapshotsTree
snapshotsTree)
{
- // Do nothing. Live counters are not sent to the controller in this mode.
+ if (liveReportCounters) {
+ sendMessage(new PostCounters(queryId, workerId, snapshotsTree));
+ }
}
@Override
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
index 64f3884810a..62eff338119 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.messages.server.Outbox;
import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
+import org.apache.druid.msq.dart.controller.messages.PostCounters;
import org.apache.druid.msq.exec.ControllerClient;
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.exec.FrameContext;
@@ -62,6 +63,12 @@ import java.io.File;
*/
public class DartWorkerContext implements WorkerContext
{
+ /**
+ * Default for {@link MultiStageQueryContext#CTX_LIVE_REPORT_COUNTERS}. Off
by default since older Dart controllers
+ * don't understand the {@link PostCounters} message, and because it adds
some overhead.
+ */
+ public static final boolean DEFAULT_LIVE_REPORT_COUNTERS = false;
+
private final String queryId;
private final String controllerHost;
private final WorkerId workerId;
@@ -70,7 +77,6 @@ public class DartWorkerContext implements WorkerContext
private final PolicyEnforcer policyEnforcer;
private final Injector injector;
private final DartWorkerClient workerClient;
- private final DruidProcessingConfig processingConfig;
private final SegmentWrangler segmentWrangler;
private final GroupingEngine groupingEngine;
private final DataSegmentProvider dataSegmentProvider;
@@ -119,7 +125,6 @@ public class DartWorkerContext implements WorkerContext
this.policyEnforcer = policyEnforcer;
this.injector = injector;
this.workerClient = workerClient;
- this.processingConfig = processingConfig;
this.segmentWrangler = segmentWrangler;
this.groupingEngine = groupingEngine;
this.dataSegmentProvider = dataSegmentProvider;
@@ -193,7 +198,12 @@ public class DartWorkerContext implements WorkerContext
@Override
public ControllerClient makeControllerClient()
{
- return new DartControllerClient(outbox, queryId, controllerHost);
+ return new DartControllerClient(
+ outbox,
+ queryId,
+ controllerHost,
+ MultiStageQueryContext.getLiveReportCounters(queryContext,
DEFAULT_LIVE_REPORT_COUNTERS)
+ );
}
@Override
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 2d7f958aca4..13a31edb3de 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -334,6 +334,14 @@ public class IndexerControllerContext implements
ControllerContext
builder.put(BaseQuery.SQL_QUERY_ID,
queryContext.get(QueryContexts.CTX_SQL_QUERY_ID));
}
+ if
(queryContext.containsKey(MultiStageQueryContext.CTX_LIVE_REPORT_COUNTERS)) {
+ // No default for this one, because we want the default to be assigned
on the worker.
+ builder.put(
+ MultiStageQueryContext.CTX_LIVE_REPORT_COUNTERS,
+
queryContext.getBoolean(MultiStageQueryContext.CTX_LIVE_REPORT_COUNTERS)
+ );
+ }
+
MSQDestination destination = querySpec.getDestination();
if (destination.toSelectDestination() != null) {
builder.put(
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index beec439ea19..68e46827b08 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -67,6 +67,11 @@ import java.io.File;
public class IndexerWorkerContext implements WorkerContext
{
+ /**
+ * Default for {@link MultiStageQueryContext#CTX_LIVE_REPORT_COUNTERS}. On
by default for tasks.
+ */
+ public static final boolean DEFAULT_LIVE_REPORT_COUNTERS = true;
+
private static final Logger log = new Logger(IndexerWorkerContext.class);
private final MSQWorkerTask task;
@@ -81,6 +86,7 @@ public class IndexerWorkerContext implements WorkerContext
private final MemoryIntrospector memoryIntrospector;
private final ProcessingBuffersProvider processingBuffersProvider;
private final int maxConcurrentStages;
+ private final boolean liveReportCounters;
private final boolean includeAllCounters;
private final int threadCount;
@@ -117,6 +123,7 @@ public class IndexerWorkerContext implements WorkerContext
queryContext,
IndexerControllerContext.DEFAULT_MAX_CONCURRENT_STAGES
);
+ this.liveReportCounters =
MultiStageQueryContext.getLiveReportCounters(queryContext,
DEFAULT_LIVE_REPORT_COUNTERS);
this.includeAllCounters =
MultiStageQueryContext.getIncludeAllCounters(queryContext);
// Compute thread count once in constructor
@@ -246,6 +253,7 @@ public class IndexerWorkerContext implements WorkerContext
new SpecificTaskRetryPolicy(task.getControllerTaskId(),
StandardRetryPolicy.unlimited())
),
jsonMapper(),
+ liveReportCounters,
controllerLocator
);
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
index c381713587b..02939322d30 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java
@@ -285,11 +285,11 @@ public class MSQControllerTask extends AbstractTask
implements ClientTaskQuery,
);
final TaskReportQueryListener queryListener = new TaskReportQueryListener(
- querySpec.getDestination(),
() ->
toolbox.getTaskReportFileWriter().openReportOutputStream(getId()),
toolbox.getJsonMapper(),
getId(),
- getContext()
+ getContext(),
+ querySpec.getDestination().getRowsInTaskReport()
);
controller.run(queryListener);
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/TaskReportQueryListener.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/TaskReportQueryListener.java
index be73a3cbfdd..4c21ca005ee 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/TaskReportQueryListener.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/TaskReportQueryListener.java
@@ -74,19 +74,19 @@ public class TaskReportQueryListener implements
QueryListener
private boolean resultsCurrentlyOpen;
public TaskReportQueryListener(
- final MSQDestination destination,
final OutputStreamSupplier reportSink,
final ObjectMapper jsonMapper,
final String taskId,
- final Map<String, Object> taskContext
+ final Map<String, Object> taskContext,
+ final long rowsInTaskReport
)
{
- this.rowsInTaskReport = destination.getRowsInTaskReport();
this.reportSink = reportSink;
this.jsonMapper = jsonMapper;
this.serializers = jsonMapper.getSerializerProviderInstance();
this.taskId = taskId;
this.taskContext = taskContext;
+ this.rowsInTaskReport = rowsInTaskReport;
}
@Override
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerControllerClient.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerControllerClient.java
index 1a420d69b6c..a0f5e015fdb 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerControllerClient.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerControllerClient.java
@@ -46,16 +46,19 @@ public class IndexerControllerClient implements
ControllerClient
{
private final ServiceClient serviceClient;
private final ObjectMapper jsonMapper;
+ private final boolean liveReportCounters;
private final Closeable baggage;
public IndexerControllerClient(
final ServiceClient serviceClient,
final ObjectMapper jsonMapper,
+ final boolean liveReportCounters,
final Closeable baggage
)
{
this.serviceClient = serviceClient;
this.jsonMapper = jsonMapper;
+ this.liveReportCounters = liveReportCounters;
this.baggage = baggage;
}
@@ -99,12 +102,14 @@ public class IndexerControllerClient implements
ControllerClient
@Override
public void postCounters(String workerId, CounterSnapshotsTree
snapshotsTree) throws IOException
{
- final String path = StringUtils.format("/counters/%s",
StringUtils.urlEncode(workerId));
- doRequest(
- new RequestBuilder(HttpMethod.POST, path)
- .jsonContent(jsonMapper, snapshotsTree),
- IgnoreHttpResponseHandler.INSTANCE
- );
+ if (liveReportCounters) {
+ final String path = StringUtils.format("/counters/%s",
StringUtils.urlEncode(workerId));
+ doRequest(
+ new RequestBuilder(HttpMethod.POST, path)
+ .jsonContent(jsonMapper, snapshotsTree),
+ IgnoreHttpResponseHandler.INSTANCE
+ );
+ }
}
@Override
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 11e2784d29d..8f93ce02529 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -208,6 +208,13 @@ public class MultiStageQueryContext
public static final String CTX_INCLUDE_ALL_COUNTERS = "includeAllCounters";
public static final boolean DEFAULT_INCLUDE_ALL_COUNTERS = true;
+ /**
+ * Whether workers should send live counter updates to the controller (via
the message relay for Dart, or via HTTP for tasks). When enabled, workers
+ * periodically send counter snapshots to the controller, allowing the
controller to have more up-to-date progress
+ * information.
+ */
+ public static final String CTX_LIVE_REPORT_COUNTERS = "liveReportCounters";
+
public static final String CTX_FORCE_TIME_SORT =
DimensionsSpec.PARAMETER_FORCE_TIME_SORT;
private static final boolean DEFAULT_FORCE_TIME_SORT =
DimensionsSpec.DEFAULT_FORCE_TIME_SORT;
@@ -516,6 +523,14 @@ public class MultiStageQueryContext
return queryContext.getBoolean(CTX_INCLUDE_ALL_COUNTERS,
DEFAULT_INCLUDE_ALL_COUNTERS);
}
+ /**
+ * See {@link #CTX_LIVE_REPORT_COUNTERS}.
+ */
+ public static boolean getLiveReportCounters(final QueryContext queryContext,
final boolean defaultValue)
+ {
+ return queryContext.getBoolean(CTX_LIVE_REPORT_COUNTERS, defaultValue);
+ }
+
public static boolean isForceSegmentSortByTime(final QueryContext
queryContext)
{
return queryContext.getBoolean(CTX_FORCE_TIME_SORT,
DEFAULT_FORCE_TIME_SORT);
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/counters/NilQueryCounterSnapshotTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/counters/NilQueryCounterSnapshotTest.java
new file mode 100644
index 00000000000..6d8d33ace0b
--- /dev/null
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/counters/NilQueryCounterSnapshotTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.counters;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.segment.TestHelper;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class NilQueryCounterSnapshotTest
+{
+ private ObjectMapper objectMapper;
+
+ @BeforeEach
+ public void setUp()
+ {
+ objectMapper = TestHelper.JSON_MAPPER.copy();
+ objectMapper.registerModules(new MSQIndexingModule().getJacksonModules());
+ }
+
+ @Test
+ public void testSerde() throws Exception
+ {
+ final NilQueryCounterSnapshot snapshot =
NilQueryCounterSnapshot.instance();
+ final String json = objectMapper.writeValueAsString(snapshot);
+ final QueryCounterSnapshot deserialized = objectMapper.readValue(json,
QueryCounterSnapshot.class);
+ Assertions.assertEquals(snapshot, deserialized);
+ Assertions.assertSame(NilQueryCounterSnapshot.instance(), deserialized);
+ }
+
+ @Test
+ public void testDeserializeFromUnknownType() throws Exception
+ {
+ final String json = "{\"type\": \"nonexistent\"}";
+ final QueryCounterSnapshot deserialized = objectMapper.readValue(json,
QueryCounterSnapshot.class);
+ Assertions.assertEquals(NilQueryCounterSnapshot.instance(), deserialized);
+ Assertions.assertSame(NilQueryCounterSnapshot.instance(), deserialized);
+ }
+
+ @Test
+ public void testEqualsAndHashCode()
+ {
+ EqualsVerifier.forClass(NilQueryCounterSnapshot.class)
+ .usingGetClass()
+ .verify();
+ }
+}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java
new file mode 100644
index 00000000000..3a19d796641
--- /dev/null
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.controller;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.indexer.report.TaskReport;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.msq.dart.guice.DartControllerConfig;
+import org.apache.druid.msq.exec.Controller;
+import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.msq.indexing.report.MSQTaskReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.joda.time.Period;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+
+public class DartControllerRegistryTest
+{
+ private static final String AUTHENTICATOR_NAME = "authn";
+
+ private AutoCloseable mockCloser;
+
+ @Mock
+ private MSQTaskReportPayload reportPayload;
+
+ @BeforeEach
+ public void setUp()
+ {
+ mockCloser = MockitoAnnotations.openMocks(this);
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception
+ {
+ mockCloser.close();
+ }
+
+ @Test
+ public void test_register_addsToRegistry()
+ {
+ final DartControllerRegistry registry = new
DartControllerRegistry(makeConfig(0, Period.ZERO));
+ final ControllerHolder holder = makeControllerHolder("dart1", "sql1",
"user1");
+
+ registry.register(holder);
+
+ Assertions.assertEquals(1, registry.getAllControllers().size());
+ Assertions.assertSame(holder, registry.getController("dart1"));
+ }
+
+ @Test
+ public void test_register_duplicateThrows()
+ {
+ final DartControllerRegistry registry = new
DartControllerRegistry(makeConfig(0, Period.ZERO));
+ final ControllerHolder holder1 = makeControllerHolder("dart1", "sql1",
"user1");
+ final ControllerHolder holder2 = makeControllerHolder("dart1", "sql2",
"user2");
+
+ registry.register(holder1);
+
+ Assertions.assertThrows(DruidException.class, () ->
registry.register(holder2));
+ }
+
+ @Test
+ public void test_deregister_noReport_removesFromRegistry()
+ {
+ final DartControllerRegistry registry = new
DartControllerRegistry(makeConfig(10, Period.hours(1)));
+ final ControllerHolder holder = makeControllerHolder("dart1", "sql1",
"user1");
+
+ registry.register(holder);
+ registry.deregister(holder, null);
+
+ Assertions.assertEquals(0, registry.getAllControllers().size());
+ Assertions.assertNull(registry.getController("dart1"));
+ Assertions.assertNull(registry.getQueryInfoAndReport("dart1"));
+ Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("sql1"));
+ }
+
+ @Test
+ public void test_deregister_withReport_retainsReport()
+ {
+ final DartControllerRegistry registry = new
DartControllerRegistry(makeConfig(10, Period.hours(1)));
+ final ControllerHolder holder = makeControllerHolder("dart1", "sql1",
"user1");
+ final TaskReport report = new MSQTaskReport("dart1", reportPayload);
+
+ final TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
+ reportMap.put(MSQTaskReport.REPORT_KEY, report);
+ registry.register(holder);
+ registry.deregister(holder, reportMap);
+
+ // Controller is removed
+ Assertions.assertEquals(0, registry.getAllControllers().size());
+ Assertions.assertNull(registry.getController("dart1"));
+
+ // But report is retained
+ final QueryInfoAndReport infoAndReport =
registry.getQueryInfoAndReport("dart1");
+ Assertions.assertNotNull(infoAndReport);
+ Assertions.assertEquals("dart1",
infoAndReport.getQueryInfo().getDartQueryId());
+ Assertions.assertSame(report,
infoAndReport.getReportMap().get(MSQTaskReport.REPORT_KEY));
+
+ // And can be looked up by SQL query ID
+ final QueryInfoAndReport infoAndReportBySql =
registry.getQueryInfoAndReportBySqlQueryId("sql1");
+ Assertions.assertNotNull(infoAndReportBySql);
+ Assertions.assertEquals("dart1",
infoAndReportBySql.getQueryInfo().getDartQueryId());
+ Assertions.assertEquals("sql1",
infoAndReportBySql.getQueryInfo().getSqlQueryId());
+ }
+
+ @Test
+ public void
test_deregister_withReport_zeroRetainedCount_doesNotRetainReport()
+ {
+ final DartControllerRegistry registry = new
DartControllerRegistry(makeConfig(0, Period.hours(1)));
+ final ControllerHolder holder = makeControllerHolder("dart1", "sql1",
"user1");
+ final TaskReport report = new MSQTaskReport("dart1", reportPayload);
+
+ final TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
+ reportMap.put(MSQTaskReport.REPORT_KEY, report);
+ registry.register(holder);
+ registry.deregister(holder, reportMap);
+
+ Assertions.assertNull(registry.getQueryInfoAndReport("dart1"));
+ Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("sql1"));
+ }
+
+ @Test
+ public void test_getQueryInfoAndReport_runningQuery()
+ {
+ final DartControllerRegistry registry = new
DartControllerRegistry(makeConfig(10, Period.hours(1)));
+ final Controller controller = Mockito.mock(Controller.class);
+ final ControllerContext controllerContext =
Mockito.mock(ControllerContext.class);
+ Mockito.when(controller.queryId()).thenReturn("dart1");
+
Mockito.when(controller.getControllerContext()).thenReturn(controllerContext);
+
Mockito.when(controllerContext.selfNode()).thenReturn(Mockito.mock(DruidNode.class));
+
+ // Set up live reports
+ final TaskReport.ReportMap liveReportMap = new TaskReport.ReportMap();
+ liveReportMap.put(MSQTaskReport.REPORT_KEY, new MSQTaskReport("dart1",
reportPayload));
+ Mockito.when(controller.liveReports()).thenReturn(liveReportMap);
+
+ final ControllerHolder holder = new ControllerHolder(
+ controller,
+ "sql1",
+ "SELECT 1",
+ makeAuthenticationResult("user1"),
+ DateTimes.of("2000")
+ );
+
+ registry.register(holder);
+
+ final QueryInfoAndReport infoAndReport =
registry.getQueryInfoAndReport("dart1");
+ Assertions.assertNotNull(infoAndReport);
+ Assertions.assertEquals("dart1",
infoAndReport.getQueryInfo().getDartQueryId());
+
+ // Also works by SQL query ID
+ final QueryInfoAndReport infoAndReportBySql =
registry.getQueryInfoAndReportBySqlQueryId("sql1");
+ Assertions.assertNotNull(infoAndReportBySql);
+ Assertions.assertEquals("dart1",
infoAndReportBySql.getQueryInfo().getDartQueryId());
+
+ registry.deregister(holder, null);
+ }
+
+ @Test
+ public void test_getQueryInfoAndReport_runningQuery_noLiveReports()
+ {
+ final DartControllerRegistry registry = new
DartControllerRegistry(makeConfig(10, Period.hours(1)));
+ final Controller controller = Mockito.mock(Controller.class);
+ final ControllerContext controllerContext =
Mockito.mock(ControllerContext.class);
+ Mockito.when(controller.queryId()).thenReturn("dart1");
+
Mockito.when(controller.getControllerContext()).thenReturn(controllerContext);
+
Mockito.when(controllerContext.selfNode()).thenReturn(Mockito.mock(DruidNode.class));
+ Mockito.when(controller.liveReports()).thenReturn(null);
+
+ final ControllerHolder holder = new ControllerHolder(
+ controller,
+ "sql1",
+ "SELECT 1",
+ makeAuthenticationResult("user1"),
+ DateTimes.of("2000")
+ );
+
+ registry.register(holder);
+
+ // Returns null when no live reports are available
+ Assertions.assertNull(registry.getQueryInfoAndReport("dart1"));
+
+ // With no report passed to deregister, lookup by SQL query ID returns null
+ registry.deregister(holder, null);
+ Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("sql1"));
+ }
+
+ @Test
+ public void test_reportEviction_byCount()
+ {
+ final DartControllerRegistry registry = new
DartControllerRegistry(makeConfig(2, Period.hours(1)));
+
+ // Register and deregister 3 queries with reports
+ for (int i = 1; i <= 3; i++) {
+ final ControllerHolder holder = makeControllerHolder("dart" + i, "sql" +
i, "user1");
+ final TaskReport.ReportMap reportMap = new TaskReport.ReportMap();
+ reportMap.put(MSQTaskReport.REPORT_KEY, new MSQTaskReport("dart" + i,
reportPayload));
+ registry.register(holder);
+ registry.deregister(holder, reportMap);
+ }
+
+ // Only the last 2 reports should be retained
+ Assertions.assertNull(registry.getQueryInfoAndReport("dart1"));
+ Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("sql1"));
+
+ Assertions.assertNotNull(registry.getQueryInfoAndReport("dart2"));
+
Assertions.assertNotNull(registry.getQueryInfoAndReportBySqlQueryId("sql2"));
+
+ Assertions.assertNotNull(registry.getQueryInfoAndReport("dart3"));
+
Assertions.assertNotNull(registry.getQueryInfoAndReportBySqlQueryId("sql3"));
+ }
+
+ @Test
+ public void test_getQueryInfoAndReportBySqlQueryId_notFound()
+ {
+ final DartControllerRegistry registry = new
DartControllerRegistry(makeConfig(10, Period.hours(1)));
+
+
Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("nonexistent"));
+ }
+
+ @Test
+ public void test_getAllControllers()
+ {
+ final DartControllerRegistry registry = new
DartControllerRegistry(makeConfig(10, Period.hours(1)));
+ final ControllerHolder holder1 = makeControllerHolder("dart1", "sql1",
"user1");
+ final ControllerHolder holder2 = makeControllerHolder("dart2", "sql2",
"user2");
+
+ registry.register(holder1);
+ registry.register(holder2);
+
+ Assertions.assertEquals(2, registry.getAllControllers().size());
+ Assertions.assertTrue(registry.getAllControllers().contains(holder1));
+ Assertions.assertTrue(registry.getAllControllers().contains(holder2));
+
+ registry.deregister(holder1, null);
+ registry.deregister(holder2, null);
+ }
+
+ private ControllerHolder makeControllerHolder(
+ final String dartQueryId,
+ final String sqlQueryId,
+ final String identity
+ )
+ {
+ final Controller controller = Mockito.mock(Controller.class);
+ final ControllerContext controllerContext =
Mockito.mock(ControllerContext.class);
+ Mockito.when(controller.queryId()).thenReturn(dartQueryId);
+
Mockito.when(controller.getControllerContext()).thenReturn(controllerContext);
+
Mockito.when(controllerContext.selfNode()).thenReturn(Mockito.mock(DruidNode.class));
+
+ return new ControllerHolder(
+ controller,
+ sqlQueryId,
+ "SELECT 1",
+ makeAuthenticationResult(identity),
+ DateTimes.of("2000")
+ );
+ }
+
+ private static AuthenticationResult makeAuthenticationResult(final String
identity)
+ {
+ return new AuthenticationResult(identity, null, AUTHENTICATOR_NAME,
Collections.emptyMap());
+ }
+
+ private static DartControllerConfig makeConfig(
+ final int maxRetainedReportCount,
+ final Period maxRetainedReportDuration
+ )
+ {
+ return new DartControllerConfig()
+ {
+ @Override
+ @JsonProperty("maxRetainedReportCount")
+ public int getMaxRetainedReportCount()
+ {
+ return maxRetainedReportCount;
+ }
+
+ @Override
+ @JsonProperty("maxRetainedReportDuration")
+ public Period getMaxRetainedReportDuration()
+ {
+ return maxRetainedReportDuration;
+ }
+ };
+ }
+}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
index 64335b3a76c..92f7a735052 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
@@ -244,7 +244,7 @@ public class DartSqlResourceTest extends MSQTestBase
return
super.queryKernelConfig(querySpec).toBuilder().workerIds(ImmutableList.of("some")).build();
}
},
- controllerRegistry = new DartControllerRegistry()
+ controllerRegistry = new DartControllerRegistry(new
DartControllerConfig())
{
@Override
public void register(ControllerHolder holder)
@@ -298,7 +298,7 @@ public class DartSqlResourceTest extends MSQTestBase
// Ensure that controllerRegistry has nothing in it at the conclusion of
each test. Verifies that controllers
// are fully cleaned up.
- Assertions.assertEquals(0, controllerRegistry.getAllHolders().size(),
"controllerRegistry.getAllHolders().size()");
+ Assertions.assertEquals(0, controllerRegistry.getAllControllers().size(),
"controllerRegistry.getAllHolders().size()");
}
@Test
@@ -325,7 +325,7 @@ public class DartSqlResourceTest extends MSQTestBase
sqlResource.doGetRunningQueries("", httpServletRequest).getEntity()
);
- controllerRegistry.deregister(holder);
+ controllerRegistry.deregister(holder, null);
}
/**
@@ -342,15 +342,15 @@ public class DartSqlResourceTest extends MSQTestBase
final ControllerHolder holder2 =
setUpMockRunningQuery(DIFFERENT_REGULAR_USER_NAME);
// Regular users can see only their own queries, without authentication
details.
- Assertions.assertEquals(2, controllerRegistry.getAllHolders().size());
+ Assertions.assertEquals(2, controllerRegistry.getAllControllers().size());
Assertions.assertEquals(
new GetQueriesResponse(
Collections.singletonList(DartQueryInfo.fromControllerHolder(holder).withoutAuthenticationResult())),
sqlResource.doGetRunningQueries("", httpServletRequest).getEntity()
);
- controllerRegistry.deregister(holder);
- controllerRegistry.deregister(holder2);
+ controllerRegistry.deregister(holder, null);
+ controllerRegistry.deregister(holder2, null);
}
/**
@@ -390,7 +390,7 @@ public class DartSqlResourceTest extends MSQTestBase
sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity()
);
- controllerRegistry.deregister(localHolder);
+ controllerRegistry.deregister(localHolder, null);
}
/**
@@ -417,7 +417,7 @@ public class DartSqlResourceTest extends MSQTestBase
sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity()
);
- controllerRegistry.deregister(localHolder);
+ controllerRegistry.deregister(localHolder, null);
}
/**
@@ -454,7 +454,7 @@ public class DartSqlResourceTest extends MSQTestBase
sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity()
);
- controllerRegistry.deregister(localHolder);
+ controllerRegistry.deregister(localHolder, null);
}
/**
@@ -490,7 +490,7 @@ public class DartSqlResourceTest extends MSQTestBase
sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity()
);
- controllerRegistry.deregister(holder);
+ controllerRegistry.deregister(holder, null);
}
@Test
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/messages/ControllerMessageTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/messages/ControllerMessageTest.java
index 427faf4aee6..d323dead298 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/messages/ControllerMessageTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/messages/ControllerMessageTest.java
@@ -22,6 +22,9 @@ package org.apache.druid.msq.dart.controller.messages;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.msq.counters.CounterSnapshots;
+import org.apache.druid.msq.counters.CounterSnapshotsTree;
+import org.apache.druid.msq.counters.NilQueryCounterSnapshot;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.error.UnknownFault;
@@ -34,6 +37,7 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Collections;
+import java.util.Map;
public class ControllerMessageTest
{
@@ -69,6 +73,13 @@ public class ControllerMessageTest
Collections.singletonList(MSQErrorReport.fromFault("task", null,
null, UnknownFault.forMessage("oops")))
)
);
+ assertSerde(
+ new PostCounters(
+ STAGE_ID.getQueryId(),
+ "worker-1",
+ new CounterSnapshotsTree()
+ )
+ );
}
@Test
@@ -79,6 +90,24 @@ public class ControllerMessageTest
EqualsVerifier.forClass(ResultsComplete.class).usingGetClass().verify();
EqualsVerifier.forClass(WorkerError.class).usingGetClass().verify();
EqualsVerifier.forClass(WorkerWarning.class).usingGetClass().verify();
+ EqualsVerifier.forClass(PostCounters.class)
+ .usingGetClass()
+ .withPrefabValues(
+ CounterSnapshotsTree.class,
+ CounterSnapshotsTree.fromMap(
+ Map.of(
+ 1,
+ Map.of(1, new CounterSnapshots(Map.of("foo",
NilQueryCounterSnapshot.instance())))
+ )
+ ),
+ CounterSnapshotsTree.fromMap(
+ Map.of(
+ 1,
+ Map.of(1, new CounterSnapshots(Map.of("bar",
NilQueryCounterSnapshot.instance())))
+ )
+ )
+ )
+ .verify();
}
private void assertSerde(final ControllerMessage message) throws IOException
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/TaskReportQueryListenerTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/TaskReportQueryListenerTest.java
index 252419f7227..e50da1c75ed 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/TaskReportQueryListenerTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/TaskReportQueryListenerTest.java
@@ -72,11 +72,11 @@ public class TaskReportQueryListenerTest
public void test_taskReportDestination() throws IOException
{
final TaskReportQueryListener listener = new TaskReportQueryListener(
- TaskReportMSQDestination.instance(),
Suppliers.ofInstance(baos)::get,
JSON_MAPPER,
TASK_ID,
- TASK_CONTEXT
+ TASK_CONTEXT,
+ TaskReportMSQDestination.instance().getRowsInTaskReport()
);
Assert.assertTrue(listener.readResults());
@@ -140,11 +140,11 @@ public class TaskReportQueryListenerTest
public void test_durableDestination() throws IOException
{
final TaskReportQueryListener listener = new TaskReportQueryListener(
- DurableStorageMSQDestination.instance(),
Suppliers.ofInstance(baos)::get,
JSON_MAPPER,
TASK_ID,
- TASK_CONTEXT
+ TASK_CONTEXT,
+ DurableStorageMSQDestination.instance().getRowsInTaskReport()
);
Assert.assertTrue(listener.readResults());
diff --git
a/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java
b/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java
index 377866c48ee..60d03fe8d84 100644
--- a/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java
+++ b/server/src/main/java/org/apache/druid/client/broker/BrokerClient.java
@@ -65,4 +65,23 @@ public interface BrokerClient
* Updates the broker with the given {@link CoordinatorDynamicConfig}.
*/
ListenableFuture<Boolean>
updateCoordinatorDynamicConfig(CoordinatorDynamicConfig config);
+
+ /**
+ * Gets the report for a SQL query by its SQL query ID.
+ *
+ * @param sqlQueryId the SQL query ID
+ * @param selfOnly if true, only check the local broker; if false, check all brokers
+ *
+ * @return future that resolves to the JSON response from the report API, as a string
+ */
+ ListenableFuture<String> getQueryReport(String sqlQueryId, boolean selfOnly);
+
+ /**
+ * Cancels a SQL query by its SQL query ID.
+ *
+ * @param sqlQueryId the SQL query ID to cancel
+ *
+ * @return future that resolves to true if the cancellation was accepted
+ */
+ ListenableFuture<Boolean> cancelSqlQuery(String sqlQueryId);
}
diff --git
a/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java
b/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java
index 52f7c3184d5..2d318bb3f32 100644
--- a/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java
+++ b/server/src/main/java/org/apache/druid/client/broker/BrokerClientImpl.java
@@ -128,5 +128,40 @@ public class BrokerClientImpl implements BrokerClient
}
);
}
+
+  /**
+   * Issues a GET against the broker's per-query reports path and hands back the response content.
+   *
+   * @param sqlQueryId the SQL query ID; URL-encoded before being placed in the request path
+   * @param selfOnly   if true, the {@code selfOnly} query parameter is appended to the path
+   *
+   * @return future that resolves to the response content, read as UTF-8
+   */
+  @Override
+  public ListenableFuture<String> getQueryReport(String sqlQueryId, boolean selfOnly)
+  {
+    final String encodedId = StringUtils.urlEncode(sqlQueryId);
+    final String querySuffix = selfOnly ? "?selfOnly" : "";
+    final RequestBuilder reportRequest = new RequestBuilder(
+        HttpMethod.GET,
+        StringUtils.format("/druid/v2/sql/queries/%s/reports%s", encodedId, querySuffix)
+    );
+
+    return FutureUtils.transform(
+        client.asyncRequest(reportRequest, new StringFullResponseHandler(StandardCharsets.UTF_8)),
+        FullResponseHolder::getContent
+    );
+  }
+
+  /**
+   * Issues a DELETE for the given SQL query ID and reports whether the response status was ACCEPTED.
+   *
+   * @param sqlQueryId the SQL query ID to cancel; URL-encoded before being placed in the request path
+   *
+   * @return future that resolves to true when the response status equals {@code HttpResponseStatus.ACCEPTED}
+   */
+  @Override
+  public ListenableFuture<Boolean> cancelSqlQuery(String sqlQueryId)
+  {
+    final RequestBuilder cancelRequest = new RequestBuilder(
+        HttpMethod.DELETE,
+        StringUtils.format("/druid/v2/sql/%s", StringUtils.urlEncode(sqlQueryId))
+    );
+
+    return FutureUtils.transform(
+        client.asyncRequest(cancelRequest, new BytesFullResponseHandler()),
+        response -> response.getStatus().equals(HttpResponseStatus.ACCEPTED)
+    );
+  }
}
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
index a628ad26bbe..1ed70fdbdca 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
@@ -126,6 +126,19 @@ public class EmbeddedClusterApis implements
EmbeddedResource
return client.onAnyBroker(brokerApi);
}
+  /**
+   * Runs the given API call against one specific broker by delegating to the underlying client.
+   *
+   * @param targetBroker broker that should receive the call
+   * @param brokerApi    function that receives a {@link BrokerClient} and returns a future
+   *
+   * @return the value produced by the underlying client's {@code onTargetBroker}
+   */
+  public <T> T onTargetBroker(
+      EmbeddedBroker targetBroker,
+      Function<BrokerClient, ListenableFuture<T>> brokerApi
+  )
+  {
+    final T result = client.onTargetBroker(targetBroker, brokerApi);
+    return result;
+  }
+
+  /**
+   * Asynchronous counterpart of the targeted-broker call: delegates to the underlying client and returns
+   * its future as-is.
+   *
+   * @param targetBroker broker that should receive the call
+   * @param brokerApi    function that receives a {@link BrokerClient} and returns a future
+   *
+   * @return the future produced by the underlying client's {@code onTargetBrokerAsync}
+   */
+  public <T> ListenableFuture<T> onTargetBrokerAsync(
+      EmbeddedBroker targetBroker,
+      Function<BrokerClient, ListenableFuture<T>> brokerApi
+  )
+  {
+    final ListenableFuture<T> pending = client.onTargetBrokerAsync(targetBroker, brokerApi);
+    return pending;
+  }
+
/**
* Submits the given SQL query to any of the brokers (using {@code
BrokerClient})
* of the cluster.
diff --git
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
index beacab52920..90e2eb90048 100644
---
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
+++
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
@@ -25,20 +25,26 @@ import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.broker.Broker;
import org.apache.druid.client.broker.BrokerClient;
+import org.apache.druid.client.broker.BrokerClientImpl;
import org.apache.druid.client.coordinator.Coordinator;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.IndexingService;
import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.discovery.NodeRole;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.rpc.FixedServiceLocator;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceClientFactoryImpl;
+import org.apache.druid.rpc.ServiceLocation;
import org.apache.druid.rpc.ServiceLocator;
+import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.guice.ServiceClientModule;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.security.Escalator;
@@ -173,6 +179,50 @@ public class EmbeddedServiceClient
return makeRequest(request, resultType, brokerServiceClient,
getMapper(EmbeddedBroker.class));
}
+  /**
+   * Runs an API call against one particular broker. In contrast to {@link #onAnyBroker(Function)},
+   * the caller chooses which broker instance receives the call.
+   *
+   * @param targetBroker the specific broker to target
+   * @param brokerApi    function that receives a {@link BrokerClient} and returns a future
+   *
+   * @return the outcome of passing the returned future to {@code getResult}
+   */
+  public <T> T onTargetBroker(
+      EmbeddedBroker targetBroker,
+      Function<BrokerClient, ListenableFuture<T>> brokerApi
+  )
+  {
+    final BrokerClient targetedClient = createBrokerClientForBroker(targetBroker);
+    final ListenableFuture<T> pending = brokerApi.apply(targetedClient);
+    return getResult(pending);
+  }
+
+  /**
+   * Runs an API call against one particular broker without resolving the future; the future returned
+   * by {@code brokerApi} is handed back to the caller unchanged.
+   *
+   * @param targetBroker the specific broker to target
+   * @param brokerApi    function that receives a {@link BrokerClient} and returns a future
+   *
+   * @return the future returned by {@code brokerApi}
+   */
+  public <T> ListenableFuture<T> onTargetBrokerAsync(
+      EmbeddedBroker targetBroker,
+      Function<BrokerClient, ListenableFuture<T>> brokerApi
+  )
+  {
+    final BrokerClient targetedClient = createBrokerClientForBroker(targetBroker);
+    return brokerApi.apply(targetedClient);
+  }
+
+  /**
+   * Builds a {@link BrokerClient} pinned to a single broker. The client is made from the target broker's
+   * own {@link EscalatedGlobal} {@link ServiceClientFactory}, uses a {@link FixedServiceLocator} pointing
+   * at that broker's self node, and is configured with {@code StandardRetryPolicy.noRetries()}.
+   *
+   * @param targetBroker broker whose bindings supply the location, client factory and JSON mapper
+   */
+  private BrokerClient createBrokerClientForBroker(EmbeddedBroker targetBroker)
+  {
+    final ServiceLocator fixedLocator = new FixedServiceLocator(
+        ServiceLocation.fromDruidNode(targetBroker.bindings().selfNode())
+    );
+    final ServiceClientFactory escalatedFactory =
+        targetBroker.bindings().getInstance(ServiceClientFactory.class, EscalatedGlobal.class);
+    final ServiceClient pinnedClient = escalatedFactory.makeClient(
+        NodeRole.BROKER.getJsonName(),
+        fixedLocator,
+        StandardRetryPolicy.noRetries()
+    );
+
+    return new BrokerClientImpl(pinnedClient, targetBroker.bindings().jsonMapper());
+  }
+
@Nullable
private <T> T makeRequest(
Function<ObjectMapper, RequestBuilder> request,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
index 9e2a3930ab5..7b2902c11b8 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
@@ -30,8 +30,11 @@ import org.apache.druid.server.security.AuthorizationResult;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.destination.IngestDestination;
+import org.apache.druid.sql.http.GetQueriesResponse;
+import org.apache.druid.sql.http.GetQueryReportResponse;
import org.apache.druid.sql.http.QueryInfo;
+import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
@@ -102,9 +105,9 @@ public interface SqlEngine
/**
* Create a {@link QueryMaker} for an INSERT ... SELECT query.
*
- * @param destination destination for the INSERT portion of the query
- * @param relRoot planned and validated rel for the SELECT portion
of the query
- * @param plannerContext context for this query
+ * @param destination destination for the INSERT portion of the query
+ * @param relRoot planned and validated rel for the SELECT portion of
the query
+ * @param plannerContext context for this query
*
* @return an executor for the provided query
*
@@ -132,14 +135,41 @@ public interface SqlEngine
/**
* Returns a list of {@link QueryInfo} containing the currently running
queries using this engine. Returns an empty
* list if the operation is not supported.
+ *
+ * @param selfOnly whether to only include queries running on
this server. If false, this server should
+ * contact all other servers to build a full
list of all running queries.
+ * @param authenticationResult implementations should use this for
filtering the list of visible queries
+ * @param stateReadAuthorization authorization for the STATE READ resource.
If this is authorized, implementations
+ * should allow all queries to be visible
+ */
+ default GetQueriesResponse getRunningQueries(
+ boolean selfOnly,
+ AuthenticationResult authenticationResult,
+ AuthorizationResult stateReadAuthorization
+ )
+ {
+ return new GetQueriesResponse(List.of());
+ }
+
+ /**
+ * Retrieves the report for a query, if available.
+ *
+ * @param sqlQueryId SQL query ID to retrieve the report for
+ * @param selfOnly whether to only include queries running on
this server. If false, this server should
+ * contact all other servers to find this
query, if necessary.
+ * @param authenticationResult implementations should use this to
determine if a query should be visible to a user
+ * @param stateReadAuthorization authorization for the STATE READ resource.
If this is authorized, implementations
+ * should allow all queries to be visible
*/
- default List<QueryInfo> getRunningQueries(
+ @Nullable
+ default GetQueryReportResponse getQueryReport(
+ String sqlQueryId,
boolean selfOnly,
AuthenticationResult authenticationResult,
- AuthorizationResult authorizationResult
+ AuthorizationResult stateReadAuthorization
)
{
- return List.of();
+ return null;
}
/**
diff --git
a/sql/src/main/java/org/apache/druid/sql/http/GetQueryReportResponse.java
b/sql/src/main/java/org/apache/druid/sql/http/GetQueryReportResponse.java
new file mode 100644
index 00000000000..1c91dc436a2
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/http/GetQueryReportResponse.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexer.report.TaskReport;
+
+import java.util.Objects;
+
+/**
+ * Class returned by {@link SqlResource#doGetQueryReport}, the "get query report" API. Serialized as a
+ * JSON object with the properties {@code query} and {@code report}. Both properties are null in the
+ * not-found response built by {@link SqlResource#doGetQueryReport}.
+ */
+public class GetQueryReportResponse
+{
+  private final QueryInfo queryInfo;
+  private final TaskReport.ReportMap reportMap;
+
+  /**
+   * @param queryInfo info about the query, bound to the JSON property {@code query}
+   * @param reportMap report for the query, bound to the JSON property {@code report}
+   */
+  @JsonCreator
+  public GetQueryReportResponse(
+      @JsonProperty("query") QueryInfo queryInfo,
+      @JsonProperty("report") TaskReport.ReportMap reportMap
+  )
+  {
+    this.queryInfo = queryInfo;
+    this.reportMap = reportMap;
+  }
+
+  /**
+   * Query info, serialized as the JSON property {@code query}.
+   */
+  @JsonProperty("query")
+  public QueryInfo getQueryInfo()
+  {
+    return queryInfo;
+  }
+
+  /**
+   * Report map, serialized as the JSON property {@code report}.
+   */
+  @JsonProperty("report")
+  public TaskReport.ReportMap getReportMap()
+  {
+    return reportMap;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    // Identity short-circuit: avoids comparing both fields when an object is compared with itself.
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    GetQueryReportResponse that = (GetQueryReportResponse) o;
+    return Objects.equals(queryInfo, that.queryInfo) && Objects.equals(reportMap, that.reportMap);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(queryInfo, reportMap);
+  }
+
+  @Override
+  public String toString()
+  {
+    // Label matches the class name so log output can be traced back to this type.
+    return "GetQueryReportResponse{" +
+           "queryInfo=" + queryInfo +
+           ", report=" + reportMap +
+           '}';
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
index 98e39e7be3a..e5f1a513d52 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
@@ -163,13 +163,61 @@ public class SqlResource
// Get running queries from all engines that support it.
for (SqlEngine sqlEngine : engines) {
- queries.addAll(sqlEngine.getRunningQueries(selfOnly != null,
authenticationResult, stateReadAccess));
+ queries.addAll(sqlEngine.getRunningQueries(selfOnly != null,
authenticationResult, stateReadAccess).getQueries());
}
AuthorizationUtils.setRequestAuthorizationAttributeIfNeeded(request);
return Response.ok().entity(new GetQueriesResponse(queries)).build();
}
+  /**
+   * Serves the report for a single SQL query. Every registered engine is asked in turn and the first
+   * non-null answer wins. When no engine returns a report, the response is 404 with a
+   * {@link GetQueryReportResponse} whose fields are both null.
+   *
+   * @param sqlQueryId SQL query ID
+   * @param selfOnly   if present, check reports from this server only. If absent, check reports on all servers.
+   * @param request    http request.
+   */
+  @GET
+  @Path("/queries/{sqlQueryId}/reports")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response doGetQueryReport(
+      @PathParam("sqlQueryId") final String sqlQueryId,
+      @QueryParam("selfOnly") final String selfOnly,
+      @Context final HttpServletRequest request
+  )
+  {
+    final AuthenticationResult authenticationResult = AuthorizationUtils.authenticationResultFromRequest(request);
+    final AuthorizationResult stateReadAccess = AuthorizationUtils.authorizeAllResourceActions(
+        authenticationResult,
+        Collections.singletonList(new ResourceAction(Resource.STATE_RESOURCE, Action.READ)),
+        authorizerMapper
+    );
+
+    // Only the presence of the query parameter matters, not its value.
+    final boolean localOnly = selfOnly != null;
+
+    // Stop at the first engine that recognizes the SQL query ID.
+    GetQueryReportResponse found = null;
+    for (SqlEngine sqlEngine : sqlEngineRegistry.getAllEngines()) {
+      found = sqlEngine.getQueryReport(sqlQueryId, localOnly, authenticationResult, stateReadAccess);
+      if (found != null) {
+        break;
+      }
+    }
+
+    AuthorizationUtils.setRequestAuthorizationAttributeIfNeeded(request);
+
+    if (found != null) {
+      return Response.ok().entity(found).build();
+    }
+    return Response.status(Status.NOT_FOUND).entity(new GetQueryReportResponse(null, null)).build();
+  }
+
+
/**
* This method is defined as public so that tests can access it
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]