This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d3623c2dff4 Add sys.queries table. (#18923)
d3623c2dff4 is described below
commit d3623c2dff47d9511a473e9a6f02f1bd006fa84f
Author: Gian Merlino <[email protected]>
AuthorDate: Sun Jan 25 15:41:18 2026 -0800
Add sys.queries table. (#18923)
The sys.queries table provides insight into currently-running queries.
It provides the same information as the /druid/v2/sql/queries API. As such,
it currently only works with Dart.
In this patch the table is documented, but off by default. It can be
enabled by setting druid.sql.planner.enableSysQueriesTable = true.
This patch additionally adds an "includeComplete" parameter to
/druid/v2/sql/queries, which is used by the implementation of the
sys.queries table, to allow it to show information for recently-completed
queries.
---
docs/configuration/index.md | 1 +
docs/querying/sql-metadata-tables.md | 35 +-
embedded-tests/pom.xml | 5 +-
.../auth/AbstractAuthConfigurationTest.java | 5 +
.../embedded/auth/BasicAuthConfigurationTest.java | 20 +-
.../embedded/auth/EmbeddedBasicAuthResource.java | 97 +++++-
.../embedded/msq/EmbeddedDartReportApiTest.java | 355 ++++++++++++++++++++-
.../msq/dart/controller/ControllerHolder.java | 55 +++-
.../dart/controller/DartControllerRegistry.java | 59 +++-
.../msq/dart/controller/QueryInfoAndReport.java | 2 +-
.../msq/dart/controller/http/DartQueryInfo.java | 18 +-
.../msq/dart/controller/sql/DartSqlClient.java | 9 +-
.../msq/dart/controller/sql/DartSqlClientImpl.java | 5 +-
.../msq/dart/controller/sql/DartSqlEngine.java | 21 +-
.../druid/msq/exec/CaptureReportQueryListener.java | 90 ++++++
.../controller/DartControllerRegistryTest.java | 42 ++-
.../dart/controller/http/DartQueryInfoTest.java | 4 +-
.../dart/controller/http/DartSqlResourceTest.java | 27 +-
.../dart/controller/sql/DartSqlClientImplTest.java | 33 +-
.../druid/sql/calcite/planner/PlannerConfig.java | 25 +-
.../apache/druid/sql/calcite/run/SqlEngine.java | 3 +
.../druid/sql/calcite/schema/SystemSchema.java | 187 ++++++++++-
.../java/org/apache/druid/sql/http/QueryInfo.java | 19 ++
.../org/apache/druid/sql/http/SqlResource.java | 17 +-
.../{QueryInfo.java => StandardQueryState.java} | 18 +-
.../schema/DruidCalciteSchemaModuleTest.java | 3 +
.../druid/sql/calcite/schema/SystemSchemaTest.java | 80 ++++-
.../druid/sql/calcite/util/CalciteTests.java | 6 +-
.../druid/sql/http/GetQueriesResponseTest.java | 18 ++
29 files changed, 1146 insertions(+), 113 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 6e0f34583c6..f6ddebcb0b2 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1892,6 +1892,7 @@ The Druid SQL server is configured through the following
properties on the Broke
|`druid.sql.planner.useNativeQueryExplain`|If true, `EXPLAIN PLAN FOR` will
return the explain plan as a JSON representation of equivalent native query(s),
else it will return the original version of explain plan generated by Calcite.
It can be overridden per query with `useNativeQueryExplain` context key.|true|
|`druid.sql.planner.maxNumericInFilters`|Max limit for the amount of numeric
values that can be compared for a string type dimension when the entire SQL
WHERE clause of a query translates to an [OR](../querying/filters.md#or) of
[Bound filter](../querying/filters.md#bound-filter). By default, Druid does not
restrict the amount of numeric Bound Filters on String columns, although this
situation may block other queries from running. Set this property to a smaller
value to prevent Druid fro [...]
|`druid.sql.approxCountDistinct.function`|Implementation to use for the
[`APPROX_COUNT_DISTINCT` function](../querying/sql-aggregations.md). Without
extensions loaded, the only valid value is `APPROX_COUNT_DISTINCT_BUILTIN` (a
HyperLogLog, or HLL, based implementation). If the [DataSketches
extension](../development/extensions-core/datasketches-extension.md) is loaded,
this can also be `APPROX_COUNT_DISTINCT_DS_HLL` (alternative HLL
implementation) or `APPROX_COUNT_DISTINCT_DS_THETA`.<br [...]
+|`druid.sql.planner.enableSysQueriesTable`|**Experimental.** Whether to enable
the [`sys.queries` table](../querying/sql-metadata-tables.md#queries-table),
which provides information about currently running and recently completed SQL
queries. Currently only queries from the Dart (MSQ) engine are shown.|false|
:::info
Previous versions of Druid had properties named
`druid.sql.planner.maxQueryCount` and
`druid.sql.planner.maxSemiJoinRowsInMemory`.
diff --git a/docs/querying/sql-metadata-tables.md
b/docs/querying/sql-metadata-tables.md
index 1346faebb71..802d88c567c 100644
--- a/docs/querying/sql-metadata-tables.md
+++ b/docs/querying/sql-metadata-tables.md
@@ -335,4 +335,37 @@ For example, to retrieve properties for a specific server,
use the query
```sql
SELECT * FROM sys.server_properties WHERE server='192.168.1.1:8081'
-```
\ No newline at end of file
+```
+
+### QUERIES table
+
+:::info
+ The `sys.queries` table is an experimental feature. You must enable it by
setting the runtime property
+ `druid.sql.planner.enableSysQueriesTable=true` on Broker processes. The main
reason this table is experimental
+ is that it only shows queries from the [Dart](dart.md) engine, which is also
experimental.
+:::
+
+The queries table provides information about currently running and recently
completed SQL queries.
+
+|Column|Type|Notes|
+|------|-----|-----|
+|id|VARCHAR|Execution ID for the query. For Dart queries, this is the
`dartQueryId`.|
+|engine|VARCHAR|SQL engine that executed the query, e.g., `msq-dart`|
+|state|VARCHAR|Query status: `ACCEPTED`, `RUNNING`, `SUCCESS`, `FAILED`, or
`CANCELED`|
+|info|VARCHAR|JSON-serialized query information including `sqlQueryId`, `sql`,
`identity`, `startTime`, and other engine-specific details|
+
+For example, to retrieve all recently completed Dart queries:
+
+```sql
+SELECT *
+FROM sys.queries
+WHERE
+ engine = 'msq-dart'
+ AND state IN ('SUCCESS', 'FAILED', 'CANCELED')
+```
+
+:::info
+ The retention of completed query information is controlled by Dart controller
configuration.
+ See `druid.msq.dart.controller.maxRetainedReportCount` and
`druid.msq.dart.controller.maxRetainedReportDuration`
+ for details on how long completed queries are retained.
+:::
diff --git a/embedded-tests/pom.xml b/embedded-tests/pom.xml
index 63782673dfe..fc7a720b9c3 100644
--- a/embedded-tests/pom.xml
+++ b/embedded-tests/pom.xml
@@ -209,7 +209,10 @@
</exclusion>
</exclusions>
</dependency>
-
+ <dependency>
+ <groupId>com.opencsv</groupId>
+ <artifactId>opencsv</artifactId>
+ </dependency>
<!-- Test dependencies -->
<dependency>
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/AbstractAuthConfigurationTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/AbstractAuthConfigurationTest.java
index 9cc75989ac0..851b8caebfd 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/AbstractAuthConfigurationTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/AbstractAuthConfigurationTest.java
@@ -1006,6 +1006,11 @@ public abstract class AbstractAuthConfigurationTest
extends EmbeddedClusterTestB
);
}
+ protected EmbeddedCoordinator getCoordinator()
+ {
+ return coordinator;
+ }
+
protected String getCoordinatorUrl()
{
return getServerUrl(coordinator);
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/BasicAuthConfigurationTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/BasicAuthConfigurationTest.java
index ce356d185f8..d80844612c9 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/BasicAuthConfigurationTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/BasicAuthConfigurationTest.java
@@ -196,18 +196,14 @@ public class BasicAuthConfigurationTest extends
AbstractAuthConfigurationTest
List<ResourceAction> permissions
)
{
- // Setup authentication by creating user and password
- postAsAdmin(null, "/authentication/db/basic/users/%s", user);
-
- final BasicAuthenticatorCredentialUpdate credentials
- = new BasicAuthenticatorCredentialUpdate(password, 5000);
- postAsAdmin(credentials, "/authentication/db/basic/users/%s/credentials",
user);
-
- // Setup authorization by assigning a role to the user
- postAsAdmin(null, "/authorization/db/basic/users/%s", user);
- postAsAdmin(null, "/authorization/db/basic/roles/%s", role);
- postAsAdmin(null, "/authorization/db/basic/users/%s/roles/%s", user, role);
- postAsAdmin(permissions, "/authorization/db/basic/roles/%s/permissions",
role);
+ EmbeddedBasicAuthResource.createUserWithPermissions(
+ getHttpClient(User.ADMIN),
+ getCoordinator(),
+ user,
+ password,
+ role,
+ permissions
+ );
}
private void postAsAdmin(
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/EmbeddedBasicAuthResource.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/EmbeddedBasicAuthResource.java
index 3a54954287c..798e3834c7e 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/EmbeddedBasicAuthResource.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/auth/EmbeddedBasicAuthResource.java
@@ -20,15 +20,25 @@
package org.apache.druid.testing.embedded.auth;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.security.basic.BasicSecurityDruidModule;
+import
org.apache.druid.security.basic.authentication.entity.BasicAuthenticatorCredentialUpdate;
+import org.apache.druid.server.DruidNode;
+import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedDruidServer;
import org.apache.druid.testing.embedded.EmbeddedResource;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.List;
/**
* Resource to enable the basic auth extension in embedded tests.
*/
public class EmbeddedBasicAuthResource implements EmbeddedResource
{
+ public static final String ADMIN_USER = "admin";
public static final String ADMIN_PASSWORD = "priest";
public static final String SYSTEM_PASSWORD = "warlock";
public static final String SYSTEM_USER = "druid_system";
@@ -65,19 +75,100 @@ public class EmbeddedBasicAuthResource implements
EmbeddedResource
{
// Do nothing
}
-
+
private String authenticatorProp(String name)
{
return StringUtils.format("druid.auth.authenticator.%s.%s",
AUTHENTICATOR_NAME, name);
}
-
+
private String authorizerProp(String name)
{
return StringUtils.format("druid.auth.authorizer.%s.%s", AUTHORIZER_NAME,
name);
}
-
+
private String escalatorProp(String name)
{
return StringUtils.format("druid.escalator.%s", name);
}
+
+ /**
+ * Creates a user with specified permissions using the basic auth security
API.
+ *
+ * @param adminClient HTTP client authenticated as admin
+ * @param coordinator the coordinator server to make API calls against
+ * @param username the username to create
+ * @param password the password for the user
+ * @param roleName the role name to create and assign
+ * @param permissions the permissions to grant to the role
+ */
+ public static void createUserWithPermissions(
+ HttpClient adminClient,
+ EmbeddedDruidServer<?> coordinator,
+ String username,
+ String password,
+ String roleName,
+ List<ResourceAction> permissions
+ )
+ {
+ final DruidNode coordinatorDruidNode = coordinator.bindings().selfNode();
+ final String baseUrl = StringUtils.format(
+ "%s://%s/druid-ext/basic-security",
+ coordinatorDruidNode.getServiceScheme(),
+ coordinatorDruidNode.getHostAndPortToUse()
+ );
+
+ // Create user in authentication DB
+ HttpUtil.makeRequest(
+ adminClient,
+ HttpMethod.POST,
+ StringUtils.format("%s/authentication/db/basic/users/%s", baseUrl,
username),
+ null,
+ HttpResponseStatus.OK
+ );
+
+ // Set password
+ HttpUtil.makeRequest(
+ adminClient,
+ HttpMethod.POST,
+ StringUtils.format("%s/authentication/db/basic/users/%s/credentials",
baseUrl, username),
+ new BasicAuthenticatorCredentialUpdate(password, 5000),
+ HttpResponseStatus.OK
+ );
+
+ // Create user in authorization DB
+ HttpUtil.makeRequest(
+ adminClient,
+ HttpMethod.POST,
+ StringUtils.format("%s/authorization/db/basic/users/%s", baseUrl,
username),
+ null,
+ HttpResponseStatus.OK
+ );
+
+ // Create role
+ HttpUtil.makeRequest(
+ adminClient,
+ HttpMethod.POST,
+ StringUtils.format("%s/authorization/db/basic/roles/%s", baseUrl,
roleName),
+ null,
+ HttpResponseStatus.OK
+ );
+
+ // Assign role to user
+ HttpUtil.makeRequest(
+ adminClient,
+ HttpMethod.POST,
+ StringUtils.format("%s/authorization/db/basic/users/%s/roles/%s",
baseUrl, username, roleName),
+ null,
+ HttpResponseStatus.OK
+ );
+
+ // Grant permissions
+ HttpUtil.makeRequest(
+ adminClient,
+ HttpMethod.POST,
+ StringUtils.format("%s/authorization/db/basic/roles/%s/permissions",
baseUrl, roleName),
+ permissions,
+ HttpResponseStatus.OK
+ );
+ }
}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDartReportApiTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDartReportApiTest.java
index 891cba4e3b7..c0787c2e21c 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDartReportApiTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/EmbeddedDartReportApiTest.java
@@ -19,15 +19,23 @@
package org.apache.druid.testing.embedded.msq;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.error.DruidException;
import org.apache.druid.guice.SleepModule;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.http.client.CredentialedHttpClient;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.auth.BasicCredentials;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
import org.apache.druid.msq.dart.controller.sql.DartSqlClients;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
@@ -35,7 +43,14 @@ import
org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.http.ClientSqlQuery;
import org.apache.druid.server.metrics.LatchableEmitter;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.Resource;
+import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.sql.http.GetQueriesResponse;
import org.apache.druid.sql.http.GetQueryReportResponse;
+import org.apache.druid.sql.http.QueryInfo;
+import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
@@ -43,16 +58,27 @@ import
org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.EmbeddedHistorical;
import org.apache.druid.testing.embedded.EmbeddedIndexer;
import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.auth.EmbeddedBasicAuthResource;
+import org.apache.druid.testing.embedded.auth.HttpUtil;
import org.apache.druid.testing.embedded.indexing.MoreResources;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.Collectors;
/**
* Embedded test for the Dart report API at {@code
/druid/v2/sql/queries/{id}/reports}.
@@ -62,6 +88,10 @@ public class EmbeddedDartReportApiTest extends
EmbeddedClusterTestBase
{
private static final int MAX_RETAINED_REPORT_COUNT = 10;
+ // Authentication constants - use shared constants from
EmbeddedBasicAuthResource where available
+ private static final String REGULAR_USER = "regularUser";
+ private static final String REGULAR_PASSWORD = "helloworld";
+
private final EmbeddedBroker broker1 = new EmbeddedBroker();
private final EmbeddedBroker broker2 = new EmbeddedBroker();
private final EmbeddedIndexer indexer = new EmbeddedIndexer();
@@ -71,12 +101,15 @@ public class EmbeddedDartReportApiTest extends
EmbeddedClusterTestBase
private EmbeddedMSQApis msqApis;
private String ingestedDataSource;
+ private HttpClient adminClient;
+ private HttpClient regularUserClient;
private void configureBroker(EmbeddedBroker broker, int port)
{
broker.addProperty("druid.msq.dart.controller.heapFraction", "0.5")
.addProperty("druid.msq.dart.controller.maxRetainedReportCount",
String.valueOf(MAX_RETAINED_REPORT_COUNT))
.addProperty("druid.query.default.context.maxConcurrentStages", "1")
+ .addProperty("druid.sql.planner.enableSysQueriesTable", "true")
.addProperty("druid.plaintextPort", String.valueOf(port));
}
@@ -100,6 +133,7 @@ public class EmbeddedDartReportApiTest extends
EmbeddedClusterTestBase
return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
.addCommonProperty("druid.msq.dart.enabled",
"true")
+ .addResource(new EmbeddedBasicAuthResource())
.useLatchableEmitter()
.addServer(coordinator)
.addServer(overlord)
@@ -115,6 +149,10 @@ public class EmbeddedDartReportApiTest extends
EmbeddedClusterTestBase
{
msqApis = new EmbeddedMSQApis(cluster, overlord);
+ // Set up HTTP clients for admin and regular user
+ setupAdminClient();
+ setupRegularUserAndClient();
+
// Ingest test data once, using batch ingestion.
ingestedDataSource = EmbeddedClusterApis.createTestDatasourceName();
final String taskId = IdUtils.getRandomId();
@@ -127,6 +165,41 @@ public class EmbeddedDartReportApiTest extends
EmbeddedClusterTestBase
cluster.callApi().waitForAllSegmentsToBeAvailable(ingestedDataSource,
coordinator, broker2);
}
+ private void setupAdminClient()
+ {
+ adminClient = new CredentialedHttpClient(
+ new BasicCredentials(EmbeddedBasicAuthResource.ADMIN_USER,
EmbeddedBasicAuthResource.ADMIN_PASSWORD),
+ broker1.bindings().globalHttpClient()
+ );
+ }
+
+ /**
+ * Creates a regular user with only datasource read permission (no STATE
READ).
+ * Username is {@link #REGULAR_USER}, password is {@link #REGULAR_PASSWORD}.
+ */
+ private void setupRegularUserAndClient()
+ {
+ // Grant permissions: datasource read access to all datasources,
sys.queries table access, but no STATE READ
+ final List<ResourceAction> permissions = ImmutableList.of(
+ new ResourceAction(new Resource(".*", ResourceType.DATASOURCE),
Action.READ),
+ new ResourceAction(new Resource("queries", ResourceType.SYSTEM_TABLE),
Action.READ)
+ );
+
+ EmbeddedBasicAuthResource.createUserWithPermissions(
+ adminClient,
+ coordinator,
+ REGULAR_USER,
+ REGULAR_PASSWORD,
+ "regularRole",
+ permissions
+ );
+
+ regularUserClient = new CredentialedHttpClient(
+ new BasicCredentials(REGULAR_USER, REGULAR_PASSWORD),
+ broker1.bindings().globalHttpClient()
+ );
+ }
+
@Test
@Timeout(60)
public void test_getQueryReport_forCompletedDartQuery()
@@ -174,6 +247,46 @@ public class EmbeddedDartReportApiTest extends
EmbeddedClusterTestBase
Assertions.assertInstanceOf(MSQTaskReport.class,
reportResponse.getReportMap().get(MSQTaskReport.REPORT_KEY));
}
+ @Test
+ @Timeout(60)
+ public void test_sysQueries_returnsRecentlyFinishedQuery() throws IOException
+ {
+ final String sqlQueryId = UUID.randomUUID().toString();
+
+ // Run a Dart query with a specific SQL query ID
+ final String result = cluster.callApi().runSql(
+ "SET engine = 'msq-dart';\n"
+ + "SET sqlQueryId = '%s';\n"
+ + "SELECT COUNT(*) FROM \"%s\"",
+ sqlQueryId,
+ ingestedDataSource
+ );
+
+ // Verify the query returned results.
+ Assertions.assertEquals("10", result);
+
+ // Query sys.queries to find the recently-finished query.
+ final String sysQueriesText = cluster.callApi().runSql(
+ "SELECT engine, state, info FROM sys.queries\n"
+ + "WHERE engine = 'msq-dart'\n"
+ + "AND info LIKE '%%%s%%'",
+ sqlQueryId
+ ).trim();
+
+ // Verify the query appears in sys.queries with SUCCESS state.
+ final String[] sysQueriesResult =
CsvInputFormat.createOpenCsvParser().parseLine(sysQueriesText);
+
+ Assertions.assertEquals("msq-dart", sysQueriesResult[0]);
+ Assertions.assertEquals("SUCCESS", sysQueriesResult[1]);
+
+ final DartQueryInfo sysQueriesQueryInfo = (DartQueryInfo)
broker1.bindings().jsonMapper().readValue(
+ sysQueriesResult[2],
+ QueryInfo.class
+ );
+
+ Assertions.assertEquals(sqlQueryId, sysQueriesQueryInfo.getSqlQueryId());
+ }
+
@Test
@Timeout(60)
public void test_getQueryReport_notFound()
@@ -262,7 +375,7 @@ public class EmbeddedDartReportApiTest extends
EmbeddedClusterTestBase
Assertions.assertEquals(sql, runningQueryInfo.getSql());
Assertions.assertEquals(sqlQueryId, runningQueryInfo.getSqlQueryId());
- // Verify the report is an MSQTaskReport with RUNNING status
+ // Verify the report is an MSQTaskReport with RUNNING state
final MSQTaskReport runningMsqReport =
(MSQTaskReport)
runningReport.getReportMap().get(MSQTaskReport.REPORT_KEY);
Assertions.assertNotNull(runningMsqReport, "MSQ report should not be
null");
@@ -317,7 +430,159 @@ public class EmbeddedDartReportApiTest extends
EmbeddedClusterTestBase
}
/**
- * Polls the report API until a report is available.
+ * Test that admin (with STATE READ permission) can see queries from all
users,
+ * while regular user (without STATE READ) can only see their own queries.
+ */
+ @Test
+ @Timeout(60)
+ public void test_getRunningQueries_authorization()
+ {
+ final String adminQueryId = "admin-query-" + UUID.randomUUID();
+ final String regularUserQueryId = "regular-query-" + UUID.randomUUID();
+
+ // Run a query as admin
+ runSqlWithClient(
+ StringUtils.format(
+ "SET engine = 'msq-dart';\n"
+ + "SET sqlQueryId = '%s';\n"
+ + "SELECT COUNT(*) FROM \"%s\"",
+ adminQueryId,
+ ingestedDataSource
+ ),
+ adminClient
+ );
+
+ // Run a query as regular user
+ runSqlWithClient(
+ StringUtils.format(
+ "SET engine = 'msq-dart';\n"
+ + "SET sqlQueryId = '%s';\n"
+ + "SELECT COUNT(*) FROM \"%s\"",
+ regularUserQueryId,
+ ingestedDataSource
+ ),
+ regularUserClient
+ );
+
+ // Admin should see both queries
+ final GetQueriesResponse adminResponse =
getRunningQueriesWithClient(adminClient);
+ Assertions.assertNotNull(adminResponse);
+
+ final List<String> adminVisibleSqlQueryIds = getSqlQueryIds(adminResponse);
+ Assertions.assertTrue(adminVisibleSqlQueryIds.contains(adminQueryId));
+
Assertions.assertTrue(adminVisibleSqlQueryIds.contains(regularUserQueryId));
+
+ // Admin can get either query report
+ Assertions.assertNotNull(getReportWithClient(adminQueryId, adminClient));
+ Assertions.assertNotNull(getReportWithClient(regularUserQueryId,
adminClient));
+
+ // Regular user should only see their own query
+ final GetQueriesResponse regularUserResponse =
getRunningQueriesWithClient(regularUserClient);
+ Assertions.assertNotNull(regularUserResponse);
+
+ final List<String> regularUserVisibleSqlQueryIds =
getSqlQueryIds(regularUserResponse);
+
Assertions.assertFalse(regularUserVisibleSqlQueryIds.contains(adminQueryId));
+
Assertions.assertTrue(regularUserVisibleSqlQueryIds.contains(regularUserQueryId));
+
+ // Regular user can get only their own query report
+ final RuntimeException e = Assertions.assertThrows(
+ RuntimeException.class,
+ () -> getReportWithClient(adminQueryId, regularUserClient)
+ );
+ MatcherAssert.assertThat(e,
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("404 Not
Found")));
+ Assertions.assertNotNull(getReportWithClient(regularUserQueryId,
regularUserClient));
+ }
+
+ /**
+ * Test that admin (with STATE READ permission) can see queries from all
users in sys.queries,
+ * while regular user (without STATE READ) can only see their own queries.
+ */
+ @Test
+ @Timeout(60)
+ public void test_sysQueries_authorization() throws IOException
+ {
+ final String adminQueryId = "admin-sys-query-" + UUID.randomUUID();
+ final String regularUserQueryId = "regular-sys-query-" + UUID.randomUUID();
+
+ // Run a query as admin
+ runSqlWithClient(
+ StringUtils.format(
+ "SET engine = 'msq-dart';\n"
+ + "SET sqlQueryId = '%s';\n"
+ + "SELECT COUNT(*) FROM \"%s\"",
+ adminQueryId,
+ ingestedDataSource
+ ),
+ adminClient
+ );
+
+ // Run a query as regular user
+ runSqlWithClient(
+ StringUtils.format(
+ "SET engine = 'msq-dart';\n"
+ + "SET sqlQueryId = '%s';\n"
+ + "SELECT COUNT(*) FROM \"%s\"",
+ regularUserQueryId,
+ ingestedDataSource
+ ),
+ regularUserClient
+ );
+
+ // Admin queries sys.queries and should see both queries
+ final List<String> adminVisibleSqlQueryIds =
getSqlQueryIdsFromSysQueries(adminClient);
+ Assertions.assertTrue(adminVisibleSqlQueryIds.contains(adminQueryId));
+
Assertions.assertTrue(adminVisibleSqlQueryIds.contains(regularUserQueryId));
+
+ // Regular user queries sys.queries and should only see their own query
+ final List<String> regularUserVisibleSqlQueryIds =
getSqlQueryIdsFromSysQueries(regularUserClient);
+
Assertions.assertFalse(regularUserVisibleSqlQueryIds.contains(adminQueryId));
+
Assertions.assertTrue(regularUserVisibleSqlQueryIds.contains(regularUserQueryId));
+ }
+
+ /**
+ * Extracts SQL query IDs from a {@link GetQueriesResponse} for Dart queries
only.
+ */
+ private static List<String> getSqlQueryIds(GetQueriesResponse response)
+ {
+ return response.getQueries().stream()
+ .filter(q -> q instanceof DartQueryInfo)
+ .map(q -> ((DartQueryInfo) q).getSqlQueryId())
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Queries sys.queries and extracts SQL query IDs for Dart queries.
+ * The info column contains JSON with the sqlQueryId field.
+ */
+ private List<String> getSqlQueryIdsFromSysQueries(HttpClient httpClient)
throws IOException
+ {
+ final String sysQueriesResult = runSqlWithClient(
+ "SELECT info FROM sys.queries WHERE engine = 'msq-dart'",
+ httpClient
+ ).trim();
+
+ if (sysQueriesResult.isEmpty()) {
+ return List.of();
+ }
+
+ final List<String> sqlQueryIds = new ArrayList<>();
+ for (String line : sysQueriesResult.split("\n")) {
+ // Each line is a CSV row with the info JSON field; parse it to handle
proper CSV escaping
+ final String[] csvFields =
CsvInputFormat.createOpenCsvParser().parseLine(line);
+ final DartQueryInfo info = (DartQueryInfo)
broker1.bindings().jsonMapper().readValue(
+ csvFields[0],
+ QueryInfo.class
+ );
+ if (info.getSqlQueryId() == null) {
+ throw DruidException.defensive("Missing sqlQueryId in info[%s]", info);
+ }
+ sqlQueryIds.add(info.getSqlQueryId());
+ }
+ return sqlQueryIds;
+ }
+
+ /**
+ * Polls the report API on {@link #broker1} until a report is available.
*/
private GetQueryReportResponse waitForReport(String sqlQueryId)
{
@@ -337,4 +602,90 @@ public class EmbeddedDartReportApiTest extends
EmbeddedClusterTestBase
}
throw new ISE("Timed out after[%,d] ms waiting for query to be in RUNNING
state", timeout);
}
+
+ /**
+ * Gets running queries from {@link #broker1} using the provided HTTP client
for authentication.
+ */
+ private GetQueryReportResponse getReportWithClient(String queryId,
HttpClient httpClient)
+ {
+ final String brokerUrl = getBrokerUrl(broker1);
+ final String url = StringUtils.format(
+ "%s/druid/v2/sql/queries/%s/reports",
+ brokerUrl,
+ StringUtils.urlEncode(queryId)
+ );
+
+ final StatusResponseHolder response = HttpUtil.makeRequest(
+ httpClient,
+ HttpMethod.GET,
+ url,
+ null,
+ HttpResponseStatus.OK
+ );
+
+ try {
+ return broker1.bindings().jsonMapper().readValue(response.getContent(),
GetQueryReportResponse.class);
+ }
+ catch (JsonProcessingException e) {
+ throw DruidException.defensive(e, "Failed to parse
GetQueryReportResponse");
+ }
+ }
+
+ /**
+ * Gets running queries from {@link #broker1} using the provided HTTP client
for authentication.
+ */
+ private GetQueriesResponse getRunningQueriesWithClient(HttpClient httpClient)
+ {
+ final String brokerUrl = getBrokerUrl(broker1);
+ final String url = brokerUrl + "/druid/v2/sql/queries?includeComplete";
+
+ final StatusResponseHolder response = HttpUtil.makeRequest(
+ httpClient,
+ HttpMethod.GET,
+ url,
+ null,
+ HttpResponseStatus.OK
+ );
+
+ try {
+ return broker1.bindings().jsonMapper().readValue(response.getContent(),
GetQueriesResponse.class);
+ }
+ catch (JsonProcessingException e) {
+ throw DruidException.defensive(e, "Failed to parse GetQueriesResponse");
+ }
+ }
+
+ /**
+ * Submits a SQL query to {@link #broker1} using the provided HTTP client
for authentication.
+ */
+ public String runSqlWithClient(
+ String sql,
+ HttpClient httpClient
+ )
+ {
+ final ClientSqlQuery query = new ClientSqlQuery(
+ sql,
+ ResultFormat.CSV.name(),
+ false,
+ false,
+ false,
+ Map.of(),
+ null
+ );
+
+ final String brokerUrl = getBrokerUrl(broker1);
+ final StatusResponseHolder response = HttpUtil.makeRequest(
+ httpClient,
+ HttpMethod.POST,
+ brokerUrl + "/druid/v2/sql",
+ query,
+ HttpResponseStatus.OK
+ );
+ return response.getContent();
+ }
+
+ private String getBrokerUrl(EmbeddedBroker broker)
+ {
+ return broker.bindings().selfNode().getUriToUse().toString();
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
index 6bbffdd6fa4..a0b9f7c34f9 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/ControllerHolder.java
@@ -21,13 +21,16 @@ package org.apache.druid.msq.dart.controller;
import com.google.common.base.Preconditions;
import org.apache.druid.msq.dart.worker.WorkerId;
+import org.apache.druid.msq.exec.CaptureReportQueryListener;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.msq.exec.ControllerContext;
import org.apache.druid.msq.exec.QueryListener;
import org.apache.druid.msq.indexing.error.CancellationReason;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.indexing.error.WorkerFailedFault;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.sql.http.StandardQueryState;
import org.joda.time.DateTime;
import java.util.concurrent.atomic.AtomicReference;
@@ -42,17 +45,39 @@ public class ControllerHolder
/**
* Query has been accepted, but not yet {@link
Controller#run(QueryListener)}.
*/
- ACCEPTED,
+ ACCEPTED(StandardQueryState.ACCEPTED),
/**
* Query has had {@link Controller#run(QueryListener)} called.
*/
- RUNNING,
+ RUNNING(StandardQueryState.RUNNING),
/**
* Query has been canceled.
*/
- CANCELED
+ CANCELED(StandardQueryState.CANCELED),
+
+ /**
+ * Query has exited successfully.
+ */
+ SUCCESS(StandardQueryState.SUCCESS),
+
+ /**
+ * Query has failed.
+ */
+ FAILED(StandardQueryState.FAILED);
+
+ private final String statusString;
+
+ State(String statusString)
+ {
+ this.statusString = statusString;
+ }
+
+ public String getStatusString()
+ {
+ return statusString;
+ }
}
private final Controller controller;
@@ -152,7 +177,12 @@ public class ControllerHolder
*/
public void cancel(CancellationReason reason)
{
- if (state.getAndSet(State.CANCELED) == State.RUNNING) {
+ if (state.compareAndSet(State.ACCEPTED, State.CANCELED)) {
+ // No need to call stop() since run() wasn't called.
+ return;
+ }
+
+ if (state.compareAndSet(State.RUNNING, State.CANCELED)) {
controller.stop(reason);
}
}
@@ -166,10 +196,25 @@ public class ControllerHolder
public boolean run(final QueryListener listener) throws Exception
{
if (state.compareAndSet(State.ACCEPTED, State.RUNNING)) {
- controller.run(listener);
+ final CaptureReportQueryListener reportListener = new
CaptureReportQueryListener(listener);
+ controller.run(reportListener);
+ updateStateOnQueryComplete(reportListener.getReport());
return true;
} else {
return false;
}
}
+
+ private void updateStateOnQueryComplete(final MSQTaskReportPayload report)
+ {
+ switch (report.getStatus().getStatus()) {
+ case SUCCESS:
+ state.compareAndSet(State.RUNNING, State.SUCCESS);
+ break;
+
+ case FAILED:
+ state.compareAndSet(State.RUNNING, State.FAILED);
+ break;
+ }
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java
index b80a6b6daf5..9beab028d1c 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerRegistry.java
@@ -35,10 +35,14 @@ import org.apache.druid.msq.exec.Controller;
import org.joda.time.Period;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -68,7 +72,7 @@ public class DartControllerRegistry
private final LinkedHashMap<String, QueryInfoAndReport> completeReports =
new LinkedHashMap<>();
/**
- * Map of SQL query ID -> Dart query ID. Used by {@link
#getQueryInfoAndReportBySqlQueryId(String)}. Contains an
+ * Map of SQL query ID -> Dart query ID. Used by {@link
#getQueryDetailsBySqlQueryId(String)}. Contains an
* entry for every query in either {@link #controllerMap} or {@link
#completeReports}.
*
* It is possible for the same SQL query ID to map to multiple Dart query
IDs, because SQL query IDs can be set
@@ -195,21 +199,12 @@ public class DartControllerRegistry
* Gets execution details and report for a query.
*/
@Nullable
- public QueryInfoAndReport getQueryInfoAndReport(final String queryId)
+ public QueryInfoAndReport getQueryDetails(final String queryId)
{
final ControllerHolder runningController = getController(queryId);
if (runningController != null) {
- final TaskReport.ReportMap liveReportMap =
runningController.getController().liveReports();
- if (liveReportMap != null) {
- return new QueryInfoAndReport(
- DartQueryInfo.fromControllerHolder(runningController),
- liveReportMap,
- DateTimes.nowUtc()
- );
- } else {
- return null;
- }
+ return getQueryDetails(runningController);
} else {
synchronized (completeReports) {
return completeReports.get(queryId);
@@ -221,13 +216,40 @@ public class DartControllerRegistry
* Gets execution details and report for a query by SQL query ID.
*/
@Nullable
- public QueryInfoAndReport getQueryInfoAndReportBySqlQueryId(final String
sqlQueryId)
+ public QueryInfoAndReport getQueryDetailsBySqlQueryId(final String
sqlQueryId)
{
final String dartQueryId = sqlQueryIdToDartQueryId.get(sqlQueryId);
if (dartQueryId == null) {
return null;
}
- return getQueryInfoAndReport(dartQueryId);
+ return getQueryDetails(dartQueryId);
+ }
+
+ /**
+ * Gets execution details and reports for all completed queries.
+ */
+ public List<QueryInfoAndReport> getAllQueryDetails(final boolean
includeComplete)
+ {
+ final Set<String> queryIds = new HashSet<>();
+ final List<QueryInfoAndReport> retVal = new ArrayList<>();
+
+ for (final ControllerHolder controllerHolder : getAllControllers()) {
+ if (queryIds.add(controllerHolder.getController().queryId())) {
+ retVal.add(getQueryDetails(controllerHolder));
+ }
+ }
+
+ if (includeComplete) {
+ synchronized (completeReports) {
+ for (Map.Entry<String, QueryInfoAndReport> entry :
completeReports.entrySet()) {
+ if (queryIds.add(entry.getKey())) {
+ retVal.add(entry.getValue());
+ }
+ }
+ }
+ }
+
+ return retVal;
}
/**
@@ -252,4 +274,13 @@ public class DartControllerRegistry
}
}
}
+
+ private static QueryInfoAndReport getQueryDetails(final ControllerHolder
controllerHolder)
+ {
+ return new QueryInfoAndReport(
+ DartQueryInfo.fromControllerHolder(controllerHolder),
+ controllerHolder.getController().liveReports(),
+ DateTimes.nowUtc()
+ );
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/QueryInfoAndReport.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/QueryInfoAndReport.java
index de163ec6b0a..ffb144658ec 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/QueryInfoAndReport.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/QueryInfoAndReport.java
@@ -27,7 +27,7 @@ import javax.annotation.Nullable;
import java.util.Objects;
/**
- * Object returned by {@link
DartControllerRegistry#getQueryInfoAndReport(String)}.
+ * Object returned by {@link DartControllerRegistry#getQueryDetails(String)}.
*/
public class QueryInfoAndReport
{
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java
index 88610ee8f33..ca6f3861d95 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.msq.dart.controller.ControllerHolder;
+import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
import org.apache.druid.msq.util.MSQTaskQueryMakerUtils;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.server.DruidNode;
@@ -79,7 +80,7 @@ public class DartQueryInfo implements QueryInfo
holder.getAuthenticationResult().getAuthenticatedBy(),
holder.getAuthenticationResult().getIdentity(),
holder.getStartTime(),
- holder.getState().toString()
+ holder.getState().getStatusString()
);
}
@@ -151,12 +152,25 @@ public class DartQueryInfo implements QueryInfo
return startTime;
}
+ @Override
@JsonProperty
- public String getState()
+ public String state()
{
return state;
}
+ @Override
+ public String engine()
+ {
+ return DartSqlEngine.NAME;
+ }
+
+ @Override
+ public String executionId()
+ {
+ return dartQueryId;
+ }
+
/**
* Returns a copy of this instance with {@link #getAuthenticator()} and
{@link #getIdentity()} nulled.
*/
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java
index e94aac0cb4a..e64fffa2b3a 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java
@@ -34,12 +34,13 @@ public interface DartSqlClient
/**
* Get information about all currently-running queries on this server.
*
- * @param selfOnly true if only queries from this server should be returned;
false if queries from all servers
- * should be returned
+ * @param selfOnly true if only queries from this server should be
returned; false if queries from all servers
+ * should be returned
+ * @param includeComplete true if completed queries should be included in
the response
*
- * @see SqlResource#doGetRunningQueries(String, HttpServletRequest) the
server side
+ * @see SqlResource#doGetRunningQueries(String, String, HttpServletRequest)
the server side
*/
- ListenableFuture<GetQueriesResponse> getRunningQueries(boolean selfOnly);
+ ListenableFuture<GetQueriesResponse> getRunningQueries(boolean selfOnly,
boolean includeComplete);
/**
* Get query report for a particular SQL query ID on this server.
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java
index d2c0e8f1c26..b22fcef1312 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java
@@ -49,13 +49,16 @@ public class DartSqlClientImpl implements DartSqlClient
}
@Override
- public ListenableFuture<GetQueriesResponse> getRunningQueries(final boolean
selfOnly)
+ public ListenableFuture<GetQueriesResponse> getRunningQueries(final boolean
selfOnly, final boolean includeComplete)
{
try {
URIBuilder builder = new URIBuilder("/queries");
if (selfOnly) {
builder.addParameter("selfOnly", null);
}
+ if (includeComplete) {
+ builder.addParameter("includeComplete", null);
+ }
return FutureUtils.transform(
client.asyncRequest(
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
index 0dbc8307e32..954c44d2305 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
@@ -66,6 +66,7 @@ import org.apache.druid.sql.http.GetQueriesResponse;
import org.apache.druid.sql.http.GetQueryReportResponse;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -275,21 +276,27 @@ public class DartSqlEngine implements SqlEngine
@Override
public GetQueriesResponse getRunningQueries(
boolean selfOnly,
+ boolean includeComplete,
AuthenticationResult authenticationResult,
AuthorizationResult stateReadAuthorization
)
{
- final List<DartQueryInfo> queries =
- controllerRegistry.getAllControllers()
- .stream()
- .map(DartQueryInfo::fromControllerHolder)
- .collect(Collectors.toList());
+ final List<QueryInfoAndReport> queryDetails =
controllerRegistry.getAllQueryDetails(includeComplete);
+ final List<DartQueryInfo> queries = new ArrayList<>(queryDetails.size());
+
+ for (final QueryInfoAndReport queryDetail : queryDetails) {
+ queries.add(queryDetail.getQueryInfo());
+ }
// Add queries from all other servers, if "selfOnly" is false.
if (!selfOnly) {
final List<GetQueriesResponse> otherQueries = FutureUtils.getUnchecked(
Futures.successfulAsList(
- Iterables.transform(sqlClients.getAllClients(), client ->
client.getRunningQueries(true))),
+ Iterables.transform(
+ sqlClients.getAllClients(),
+ client -> client.getRunningQueries(true, includeComplete)
+ )
+ ),
true
);
@@ -329,7 +336,7 @@ public class DartSqlEngine implements SqlEngine
final AuthorizationResult stateReadAuthorization
)
{
- QueryInfoAndReport infoAndReport =
controllerRegistry.getQueryInfoAndReportBySqlQueryId(sqlQueryId);
+ QueryInfoAndReport infoAndReport =
controllerRegistry.getQueryDetailsBySqlQueryId(sqlQueryId);
if (infoAndReport == null && !selfOnly) {
final List<GetQueryReportResponse> otherReports =
FutureUtils.getUnchecked(
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/CaptureReportQueryListener.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/CaptureReportQueryListener.java
new file mode 100644
index 00000000000..597ffc82692
--- /dev/null
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/CaptureReportQueryListener.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.msq.indexing.report.MSQResultsReport;
+import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * A {@link QueryListener} wrapper that captures the report from {@link
#onQueryComplete(MSQTaskReportPayload)}.
+ */
+public class CaptureReportQueryListener implements QueryListener
+{
+ private final QueryListener delegate;
+
+ @Nullable
+ private volatile MSQTaskReportPayload report;
+
+ public CaptureReportQueryListener(final QueryListener delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ /**
+ * Retrieves the report. Can only be called once the query is complete.
+ */
+ public MSQTaskReportPayload getReport()
+ {
+ if (report == null) {
+ throw DruidException.defensive("Query not complete, cannot call
getReport()");
+ }
+
+ return report;
+ }
+
+ @Override
+ public boolean readResults()
+ {
+ return delegate.readResults();
+ }
+
+ @Override
+ public void onResultsStart(
+ final List<MSQResultsReport.ColumnAndType> signature,
+ @Nullable final List<SqlTypeName> sqlTypeNames
+ )
+ {
+ delegate.onResultsStart(signature, sqlTypeNames);
+ }
+
+ @Override
+ public boolean onResultRow(final Object[] row)
+ {
+ return delegate.onResultRow(row);
+ }
+
+ @Override
+ public void onResultsComplete()
+ {
+ delegate.onResultsComplete();
+ }
+
+ @Override
+ public void onQueryComplete(final MSQTaskReportPayload report)
+ {
+ this.report = report;
+ delegate.onQueryComplete(report);
+ }
+}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java
index 3a19d796641..94d5be39150 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerRegistryTest.java
@@ -97,8 +97,8 @@ public class DartControllerRegistryTest
Assertions.assertEquals(0, registry.getAllControllers().size());
Assertions.assertNull(registry.getController("dart1"));
- Assertions.assertNull(registry.getQueryInfoAndReport("dart1"));
- Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("sql1"));
+ Assertions.assertNull(registry.getQueryDetails("dart1"));
+ Assertions.assertNull(registry.getQueryDetailsBySqlQueryId("sql1"));
}
@Test
@@ -118,13 +118,13 @@ public class DartControllerRegistryTest
Assertions.assertNull(registry.getController("dart1"));
// But report is retained
- final QueryInfoAndReport infoAndReport =
registry.getQueryInfoAndReport("dart1");
+ final QueryInfoAndReport infoAndReport = registry.getQueryDetails("dart1");
Assertions.assertNotNull(infoAndReport);
Assertions.assertEquals("dart1",
infoAndReport.getQueryInfo().getDartQueryId());
Assertions.assertSame(report,
infoAndReport.getReportMap().get(MSQTaskReport.REPORT_KEY));
// And can be looked up by SQL query ID
- final QueryInfoAndReport infoAndReportBySql =
registry.getQueryInfoAndReportBySqlQueryId("sql1");
+ final QueryInfoAndReport infoAndReportBySql =
registry.getQueryDetailsBySqlQueryId("sql1");
Assertions.assertNotNull(infoAndReportBySql);
Assertions.assertEquals("dart1",
infoAndReportBySql.getQueryInfo().getDartQueryId());
Assertions.assertEquals("sql1",
infoAndReportBySql.getQueryInfo().getSqlQueryId());
@@ -142,8 +142,8 @@ public class DartControllerRegistryTest
registry.register(holder);
registry.deregister(holder, reportMap);
- Assertions.assertNull(registry.getQueryInfoAndReport("dart1"));
- Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("sql1"));
+ Assertions.assertNull(registry.getQueryDetails("dart1"));
+ Assertions.assertNull(registry.getQueryDetailsBySqlQueryId("sql1"));
}
@Test
@@ -171,12 +171,12 @@ public class DartControllerRegistryTest
registry.register(holder);
- final QueryInfoAndReport infoAndReport =
registry.getQueryInfoAndReport("dart1");
+ final QueryInfoAndReport infoAndReport = registry.getQueryDetails("dart1");
Assertions.assertNotNull(infoAndReport);
Assertions.assertEquals("dart1",
infoAndReport.getQueryInfo().getDartQueryId());
// Also works by SQL query ID
- final QueryInfoAndReport infoAndReportBySql =
registry.getQueryInfoAndReportBySqlQueryId("sql1");
+ final QueryInfoAndReport infoAndReportBySql =
registry.getQueryDetailsBySqlQueryId("sql1");
Assertions.assertNotNull(infoAndReportBySql);
Assertions.assertEquals("dart1",
infoAndReportBySql.getQueryInfo().getDartQueryId());
@@ -204,12 +204,10 @@ public class DartControllerRegistryTest
registry.register(holder);
- // Returns null when no live reports are available
- Assertions.assertNull(registry.getQueryInfoAndReport("dart1"));
-
- // But the sqlQueryId mapping should still work after deregister with
report
- registry.deregister(holder, null);
- Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("sql1"));
+ // Returns null report when no live reports are available
+ final QueryInfoAndReport infoAndReport = registry.getQueryDetails("dart1");
+ Assertions.assertEquals("dart1",
infoAndReport.getQueryInfo().getDartQueryId());
+ Assertions.assertNull(infoAndReport.getReportMap());
}
@Test
@@ -227,22 +225,22 @@ public class DartControllerRegistryTest
}
// Only the last 2 reports should be retained
- Assertions.assertNull(registry.getQueryInfoAndReport("dart1"));
- Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("sql1"));
+ Assertions.assertNull(registry.getQueryDetails("dart1"));
+ Assertions.assertNull(registry.getQueryDetailsBySqlQueryId("sql1"));
- Assertions.assertNotNull(registry.getQueryInfoAndReport("dart2"));
-
Assertions.assertNotNull(registry.getQueryInfoAndReportBySqlQueryId("sql2"));
+ Assertions.assertNotNull(registry.getQueryDetails("dart2"));
+ Assertions.assertNotNull(registry.getQueryDetailsBySqlQueryId("sql2"));
- Assertions.assertNotNull(registry.getQueryInfoAndReport("dart3"));
-
Assertions.assertNotNull(registry.getQueryInfoAndReportBySqlQueryId("sql3"));
+ Assertions.assertNotNull(registry.getQueryDetails("dart3"));
+ Assertions.assertNotNull(registry.getQueryDetailsBySqlQueryId("sql3"));
}
@Test
- public void test_getQueryInfoAndReportBySqlQueryId_notFound()
+ public void test_getQueryDetailsBySqlQueryId_notFound()
{
final DartControllerRegistry registry = new
DartControllerRegistry(makeConfig(10, Period.hours(1)));
-
Assertions.assertNull(registry.getQueryInfoAndReportBySqlQueryId("nonexistent"));
+ Assertions.assertNull(registry.getQueryDetailsBySqlQueryId("nonexistent"));
}
@Test
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java
index eb06ab03097..7c09f0a170e 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java
@@ -23,9 +23,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.msq.dart.controller.ControllerHolder;
import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
import org.apache.druid.msq.dart.guice.DartWorkerModule;
+import org.apache.druid.sql.http.StandardQueryState;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -44,7 +44,7 @@ public class DartQueryInfoTest
"",
"",
DateTimes.of("2000"),
- ControllerHolder.State.RUNNING.toString()
+ StandardQueryState.RUNNING
);
ObjectMapper jsonMapper = new DefaultObjectMapper().registerModules(new
DartWorkerModule().getJacksonModules());
byte[] bytes = jsonMapper.writeValueAsBytes(dartQueryInfo);
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
index 92f7a735052..4cf48396142 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
@@ -91,6 +91,7 @@ import org.apache.druid.sql.http.SqlEngineRegistry;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.sql.http.SqlResource;
import org.apache.druid.sql.http.SqlResourceQueryResultPusherFactory;
+import org.apache.druid.sql.http.StandardQueryState;
import org.apache.druid.sql.http.SupportedEnginesResponse;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.AfterEach;
@@ -322,7 +323,7 @@ public class DartSqlResourceTest extends MSQTestBase
Assertions.assertEquals(
new
GetQueriesResponse(Collections.singletonList(DartQueryInfo.fromControllerHolder(holder))),
- sqlResource.doGetRunningQueries("", httpServletRequest).getEntity()
+ sqlResource.doGetRunningQueries("", null,
httpServletRequest).getEntity()
);
controllerRegistry.deregister(holder, null);
@@ -346,7 +347,7 @@ public class DartSqlResourceTest extends MSQTestBase
Assertions.assertEquals(
new GetQueriesResponse(
Collections.singletonList(DartQueryInfo.fromControllerHolder(holder).withoutAuthenticationResult())),
- sqlResource.doGetRunningQueries("", httpServletRequest).getEntity()
+ sqlResource.doGetRunningQueries("", null,
httpServletRequest).getEntity()
);
controllerRegistry.deregister(holder, null);
@@ -374,9 +375,9 @@ public class DartSqlResourceTest extends MSQTestBase
AUTHENTICATOR_NAME,
DIFFERENT_REGULAR_USER_NAME,
DateTimes.of("2001"),
- ControllerHolder.State.RUNNING.toString()
+ StandardQueryState.RUNNING
);
- Mockito.when(dartSqlClient.getRunningQueries(true))
+ Mockito.when(dartSqlClient.getRunningQueries(true, false))
.thenReturn(Futures.immediateFuture(new
GetQueriesResponse(Collections.singletonList(remoteQueryInfo))));
// With selfOnly = null, the endpoint returns both queries.
@@ -387,7 +388,7 @@ public class DartSqlResourceTest extends MSQTestBase
remoteQueryInfo
)
),
- sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity()
+ sqlResource.doGetRunningQueries(null, null,
httpServletRequest).getEntity()
);
controllerRegistry.deregister(localHolder, null);
@@ -407,14 +408,14 @@ public class DartSqlResourceTest extends MSQTestBase
final ControllerHolder localHolder =
setUpMockRunningQuery(REGULAR_USER_NAME);
// Remote call fails.
- Mockito.when(dartSqlClient.getRunningQueries(true))
+ Mockito.when(dartSqlClient.getRunningQueries(true, false))
.thenReturn(Futures.immediateFailedFuture(new
IOException("something went wrong")));
// We only see local queries, because the remote call failed. (The entire
call doesn't fail; we see what we
// were able to fetch.)
Assertions.assertEquals(
new
GetQueriesResponse(ImmutableList.of(DartQueryInfo.fromControllerHolder(localHolder))),
- sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity()
+ sqlResource.doGetRunningQueries(null, null,
httpServletRequest).getEntity()
);
controllerRegistry.deregister(localHolder, null);
@@ -442,16 +443,16 @@ public class DartSqlResourceTest extends MSQTestBase
AUTHENTICATOR_NAME,
DIFFERENT_REGULAR_USER_NAME,
DateTimes.of("2000"),
- ControllerHolder.State.RUNNING.toString()
+ StandardQueryState.RUNNING
);
- Mockito.when(dartSqlClient.getRunningQueries(true))
+ Mockito.when(dartSqlClient.getRunningQueries(true, false))
.thenReturn(Futures.immediateFuture(new
GetQueriesResponse(Collections.singletonList(remoteQueryInfo))));
// The endpoint returns only the query issued by REGULAR_USER_NAME.
Assertions.assertEquals(
new GetQueriesResponse(
ImmutableList.of(DartQueryInfo.fromControllerHolder(localHolder).withoutAuthenticationResult())),
- sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity()
+ sqlResource.doGetRunningQueries(null, null,
httpServletRequest).getEntity()
);
controllerRegistry.deregister(localHolder, null);
@@ -479,15 +480,15 @@ public class DartSqlResourceTest extends MSQTestBase
AUTHENTICATOR_NAME,
DIFFERENT_REGULAR_USER_NAME,
DateTimes.of("2000"),
- ControllerHolder.State.RUNNING.toString()
+ StandardQueryState.RUNNING
);
- Mockito.when(dartSqlClient.getRunningQueries(true))
+ Mockito.when(dartSqlClient.getRunningQueries(true, false))
.thenReturn(Futures.immediateFuture(new
GetQueriesResponse(Collections.singletonList(remoteQueryInfo))));
// The endpoint returns only the query issued by
DIFFERENT_REGULAR_USER_NAME.
Assertions.assertEquals(
new
GetQueriesResponse(ImmutableList.of(remoteQueryInfo.withoutAuthenticationResult())),
- sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity()
+ sqlResource.doGetRunningQueries(null, null,
httpServletRequest).getEntity()
);
controllerRegistry.deregister(holder, null);
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
index 34be09e5bb0..cdd1276c481 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
@@ -86,7 +86,7 @@ public class DartSqlClientImplTest
jsonMapper.writeValueAsBytes(getQueriesResponse)
);
- final ListenableFuture<GetQueriesResponse> result =
dartSqlClient.getRunningQueries(false);
+ final ListenableFuture<GetQueriesResponse> result =
dartSqlClient.getRunningQueries(false, false);
Assertions.assertEquals(getQueriesResponse, result.get());
}
@@ -115,7 +115,36 @@ public class DartSqlClientImplTest
jsonMapper.writeValueAsBytes(getQueriesResponse)
);
- final ListenableFuture<GetQueriesResponse> result =
dartSqlClient.getRunningQueries(true);
+ final ListenableFuture<GetQueriesResponse> result =
dartSqlClient.getRunningQueries(true, false);
+ Assertions.assertEquals(getQueriesResponse, result.get());
+ }
+
+ @Test
+ public void test_getMessages_includeComplete() throws Exception
+ {
+ final GetQueriesResponse getQueriesResponse = new GetQueriesResponse(
+ ImmutableList.of(
+ new DartQueryInfo(
+ "sid",
+ "did",
+ "SELECT 1",
+ "localhost:1001",
+ "",
+ "",
+ DateTimes.of("2000"),
+ ControllerHolder.State.RUNNING.toString()
+ )
+ )
+ );
+
+ serviceClient.expectAndRespond(
+ new RequestBuilder(HttpMethod.GET, "/queries?includeComplete"),
+ HttpResponseStatus.OK,
+ ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
+ jsonMapper.writeValueAsBytes(getQueriesResponse)
+ );
+
+ final ListenableFuture<GetQueriesResponse> result =
dartSqlClient.getRunningQueries(false, true);
Assertions.assertEquals(getQueriesResponse, result.get());
}
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
index 353501b78c4..d8c77c69589 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java
@@ -80,6 +80,9 @@ public class PlannerConfig
@JsonProperty
private String nativeQuerySqlPlanningMode =
QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_COUPLED; // can be COUPLED or
DECOUPLED
+ @JsonProperty
+ private boolean enableSysQueriesTable = false;
+
public int getMaxNumericInFilters()
{
return maxNumericInFilters;
@@ -149,6 +152,14 @@ public class PlannerConfig
return nativeQuerySqlPlanningMode;
}
+ /**
+ * Returns whether the sys.queries table is enabled.
+ */
+ public boolean isEnableSysQueriesTable()
+ {
+ return enableSysQueriesTable;
+ }
+
public PlannerConfig withOverrides(final Map<String, Object> queryContext)
{
if (queryContext.isEmpty()) {
@@ -177,6 +188,7 @@ public class PlannerConfig
&& useNativeQueryExplain == that.useNativeQueryExplain
&& forceExpressionVirtualColumns ==
that.forceExpressionVirtualColumns
&& maxNumericInFilters == that.maxNumericInFilters
+ && enableSysQueriesTable == that.enableSysQueriesTable
&& Objects.equals(sqlTimeZone, that.sqlTimeZone)
&& Objects.equals(nativeQuerySqlPlanningMode,
that.nativeQuerySqlPlanningMode);
}
@@ -197,7 +209,8 @@ public class PlannerConfig
useNativeQueryExplain,
forceExpressionVirtualColumns,
maxNumericInFilters,
- nativeQuerySqlPlanningMode
+ nativeQuerySqlPlanningMode,
+ enableSysQueriesTable
);
}
@@ -213,6 +226,7 @@ public class PlannerConfig
", sqlTimeZone=" + sqlTimeZone +
", useNativeQueryExplain=" + useNativeQueryExplain +
", nativeQuerySqlPlanningMode=" + nativeQuerySqlPlanningMode +
+ ", enableSysQueriesTable=" + enableSysQueriesTable +
'}';
}
@@ -247,6 +261,7 @@ public class PlannerConfig
private boolean forceExpressionVirtualColumns;
private int maxNumericInFilters;
private String nativeQuerySqlPlanningMode;
+ private boolean enableSysQueriesTable;
public Builder(PlannerConfig base)
{
@@ -266,6 +281,7 @@ public class PlannerConfig
forceExpressionVirtualColumns = base.isForceExpressionVirtualColumns();
maxNumericInFilters = base.getMaxNumericInFilters();
nativeQuerySqlPlanningMode = base.getNativeQuerySqlPlanningMode();
+ enableSysQueriesTable = base.isEnableSysQueriesTable();
}
public Builder requireTimeCondition(boolean option)
@@ -340,6 +356,12 @@ public class PlannerConfig
return this;
}
+ public Builder enableSysQueriesTable(boolean option)
+ {
+ this.enableSysQueriesTable = option;
+ return this;
+ }
+
public Builder withOverrides(final Map<String, Object> queryContext)
{
useApproximateCountDistinct = QueryContexts.parseBoolean(
@@ -436,6 +458,7 @@ public class PlannerConfig
config.maxNumericInFilters = maxNumericInFilters;
config.forceExpressionVirtualColumns = forceExpressionVirtualColumns;
config.nativeQuerySqlPlanningMode = nativeQuerySqlPlanningMode;
+ config.enableSysQueriesTable = enableSysQueriesTable;
return config;
}
}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
index 7b2902c11b8..f47328a21aa 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
@@ -138,12 +138,15 @@ public interface SqlEngine
*
* @param selfOnly whether to only include queries running on
this server. If false, this server should
* contact all other servers to build a full
list of all running queries.
+ * @param includeComplete whether to include completed queries in the
response. The number of completed queries
+ * returned is determined by engine-specific
retention settings.
* @param authenticationResult implementations should use this for
filtering the list of visible queries
* @param stateReadAuthorization authorization for the STATE READ resource.
If this is authorized, implementations
* should allow all queries to be visible
*/
default GetQueriesResponse getRunningQueries(
boolean selfOnly,
+ boolean includeComplete,
AuthenticationResult authenticationResult,
AuthorizationResult stateReadAuthorization
)
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index 8b15f30bab1..4440582d9a0 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
+import com.google.inject.Provider;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.calcite.DataContext;
@@ -75,8 +76,13 @@ import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.table.RowSignatures;
+import org.apache.druid.sql.http.GetQueriesResponse;
+import org.apache.druid.sql.http.QueryInfo;
+import org.apache.druid.sql.http.SqlEngineRegistry;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.SegmentStatusInCluster;
@@ -102,6 +108,7 @@ public class SystemSchema extends AbstractSchema
private static final String SERVER_SEGMENTS_TABLE = "server_segments";
private static final String TASKS_TABLE = "tasks";
private static final String SUPERVISOR_TABLE = "supervisors";
+ private static final String QUERIES_TABLE = "queries";
private static final Function<SegmentStatusInCluster,
Iterable<ResourceAction>>
SEGMENT_STATUS_IN_CLUSTER_RA_GENERATOR = segment ->
@@ -226,6 +233,24 @@ public class SystemSchema extends AbstractSchema
.add("spec", ColumnType.STRING)
.build();
+ static final RowSignature QUERIES_SIGNATURE = RowSignature
+ .builder()
+ .add("id", ColumnType.STRING)
+ .add("engine", ColumnType.STRING)
+ .add("state", ColumnType.STRING)
+ .add("info", ColumnType.STRING)
+ .build();
+
+ /**
+ * Index of the "info" column in {@link #QUERIES_SIGNATURE}. Used for
projection pushdown.
+ */
+ private static final int QUERIES_INFO_INDEX =
QUERIES_SIGNATURE.indexOf("info");
+
+ /**
+ * List of [0..n) where n is the size of {@link #QUERIES_SIGNATURE}.
+ */
+ private static final int[] QUERIES_PROJECT_ALL = IntStream.range(0,
QUERIES_SIGNATURE.size()).toArray();
+
private final Map<String, Table> tableMap;
@Inject
@@ -239,13 +264,16 @@ public class SystemSchema extends AbstractSchema
final OverlordClient overlordClient,
final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider,
final ObjectMapper jsonMapper,
- @EscalatedClient final HttpClient httpClient
+ @EscalatedClient final HttpClient httpClient,
+ final Provider<SqlEngineRegistry> sqlEngineRegistryProvider,
+ final PlannerConfig plannerConfig
)
{
Preconditions.checkNotNull(serverView, "serverView");
- this.tableMap = ImmutableMap.of(
- SEGMENTS_TABLE,
- new SegmentsTable(druidSchema, metadataView, jsonMapper,
authorizerMapper),
+
+ final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
+ builder.put(SEGMENTS_TABLE, new SegmentsTable(druidSchema, metadataView,
jsonMapper, authorizerMapper));
+ builder.put(
SERVERS_TABLE,
new ServersTable(
druidNodeDiscoveryProvider,
@@ -254,16 +282,21 @@ public class SystemSchema extends AbstractSchema
overlordClient,
coordinatorClient,
jsonMapper
- ),
- SERVER_SEGMENTS_TABLE,
- new ServerSegmentsTable(serverView, authorizerMapper),
- TASKS_TABLE,
- new TasksTable(overlordClient, authorizerMapper),
- SUPERVISOR_TABLE,
- new SupervisorsTable(overlordClient, authorizerMapper),
+ )
+ );
+ builder.put(SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView,
authorizerMapper));
+ builder.put(TASKS_TABLE, new TasksTable(overlordClient, authorizerMapper));
+ builder.put(SUPERVISOR_TABLE, new SupervisorsTable(overlordClient,
authorizerMapper));
+ builder.put(
SystemServerPropertiesTable.TABLE_NAME,
new SystemServerPropertiesTable(druidNodeDiscoveryProvider,
authorizerMapper, httpClient, jsonMapper)
);
+
+ if (plannerConfig.isEnableSysQueriesTable()) {
+ builder.put(QUERIES_TABLE, new QueriesTable(sqlEngineRegistryProvider,
jsonMapper, authorizerMapper));
+ }
+
+ this.tableMap = builder.build();
}
@Override
@@ -1184,4 +1217,136 @@ public class SystemSchema extends AbstractSchema
}
return projectedRow;
}
+
+ /**
+ * This table contains currently running and recently completed queries from
all SQL engines.
+ * Enabled based on {@link PlannerConfig#isEnableSysQueriesTable()}.
+ */
+ static class QueriesTable extends AbstractTable implements
ProjectableFilterableTable
+ {
+ private final Provider<SqlEngineRegistry> sqlEngineRegistryProvider;
+ private final ObjectMapper jsonMapper;
+ private final AuthorizerMapper authorizerMapper;
+
+ public QueriesTable(
+ final Provider<SqlEngineRegistry> sqlEngineRegistryProvider,
+ final ObjectMapper jsonMapper,
+ final AuthorizerMapper authorizerMapper
+ )
+ {
+ this.sqlEngineRegistryProvider = sqlEngineRegistryProvider;
+ this.jsonMapper = jsonMapper;
+ this.authorizerMapper = authorizerMapper;
+ }
+
+ @Override
+ public RelDataType getRowType(final RelDataTypeFactory typeFactory)
+ {
+ return RowSignatures.toRelDataType(QUERIES_SIGNATURE, typeFactory);
+ }
+
+ @Override
+ public TableType getJdbcTableType()
+ {
+ return TableType.SYSTEM_TABLE;
+ }
+
+ @Override
+ public Enumerable<Object[]> scan(
+ final DataContext root,
+ final List<RexNode> filters,
+ @Nullable final int[] projects
+ )
+ {
+ final AuthenticationResult authenticationResult = (AuthenticationResult)
Preconditions.checkNotNull(
+ root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT),
+ "authenticationResult in dataContext"
+ );
+
+ // Check STATE READ authorization
+ final AuthorizationResult stateReadAuthorization =
AuthorizationUtils.authorizeAllResourceActions(
+ authenticationResult,
+ Collections.singletonList(new
ResourceAction(Resource.STATE_RESOURCE, Action.READ)),
+ authorizerMapper
+ );
+
+ // Get queries from all engines
+ final List<QueryInfo> allQueries = new ArrayList<>();
+ for (final SqlEngine sqlEngine :
sqlEngineRegistryProvider.get().getAllEngines()) {
+ final GetQueriesResponse response = sqlEngine.getRunningQueries(
+ false, // selfOnly false to get queries from all servers
+ true, // includeComplete true to include all queries
+ authenticationResult,
+ stateReadAuthorization
+ );
+ allQueries.addAll(response.getQueries());
+ }
+
+ // Determine if we need to serialize the info field (based on projection
pushdown)
+ final int[] nonNullProjects = projects == null ? QUERIES_PROJECT_ALL :
projects;
+ final boolean includeInfo = containsIndex(nonNullProjects,
QUERIES_INFO_INDEX);
+
+ // Build rows
+ final FluentIterable<Object[]> results = FluentIterable
+ .from(allQueries)
+ .transform(queryInfo -> buildQueryRow(queryInfo, includeInfo,
jsonMapper))
+ .transform(row -> projectQueriesRow(row, nonNullProjects));
+
+ return Linq4j.asEnumerable(results);
+ }
+
+ /**
+ * Build a full row for a query.
+ */
+ private static Object[] buildQueryRow(
+ final QueryInfo queryInfo,
+ final boolean includeInfo,
+ final ObjectMapper jsonMapper
+ )
+ {
+ final Object[] row = new Object[QUERIES_SIGNATURE.size()];
+ row[0] = queryInfo.executionId();
+ row[1] = queryInfo.engine();
+ row[2] = queryInfo.state();
+
+ // Only serialize info if it's in the projection
+ if (includeInfo) {
+ try {
+ row[3] = jsonMapper.writeValueAsString(queryInfo);
+ }
+ catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ row[3] = null;
+ }
+
+ return row;
+ }
+
+ /**
+ * Project a row to include only the columns in the projection.
+ */
+ private static Object[] projectQueriesRow(final Object[] row, final int[]
projects)
+ {
+ final Object[] projectedRow = new Object[projects.length];
+ for (int i = 0; i < projects.length; i++) {
+ projectedRow[i] = row[projects[i]];
+ }
+ return projectedRow;
+ }
+
+ /**
+ * Check if an array contains a specific index.
+ */
+ private static boolean containsIndex(final int[] array, final int index)
+ {
+ for (final int i : array) {
+ if (i == index) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
}
diff --git a/sql/src/main/java/org/apache/druid/sql/http/QueryInfo.java
b/sql/src/main/java/org/apache/druid/sql/http/QueryInfo.java
index d3ec14cde50..fa8da407bba 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/QueryInfo.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/QueryInfo.java
@@ -21,7 +21,26 @@ package org.apache.druid.sql.http;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+/**
+ * Information about a SQL query. Implementations are engine-specific.
+ */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "engine")
public interface QueryInfo
{
+ /**
+ * Returns the engine name for this query, matching the JSON "engine"
property.
+ */
+ String engine();
+
+ /**
+ * Returns the state of this query, which may be an engine-specific string.
Standard strings
+ * are in {@link StandardQueryState}, although engines can use additional
strings if they like.
+ */
+ String state();
+
+ /**
+ * Returns the execution ID for this query. This is the system-generated ID
used internally,
+ * such as the dartQueryId for Dart queries.
+ */
+ String executionId();
}
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
index e5f1a513d52..d5b6876129b 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
@@ -140,14 +140,18 @@ public class SqlResource
/**
* API to list all running queries, for all engines that supports such
listings.
*
- * @param selfOnly if true, return queries running on this server. If false,
return queries running on all servers.
- * @param request http request.
+ * @param selfOnly if present, return queries running on this server
only. If absent, return queries
+ * running on all servers.
+ * @param includeComplete if present, include completed queries in the
response. The number of completed queries
+ * returned is determined by engine-specific
retention settings.
+ * @param request http request.
*/
@GET
@Path("/queries")
@Produces(MediaType.APPLICATION_JSON)
public Response doGetRunningQueries(
@QueryParam("selfOnly") final String selfOnly,
+ @QueryParam("includeComplete") final String includeComplete,
@Context final HttpServletRequest request
)
{
@@ -163,7 +167,14 @@ public class SqlResource
// Get running queries from all engines that support it.
for (SqlEngine sqlEngine : engines) {
- queries.addAll(sqlEngine.getRunningQueries(selfOnly != null,
authenticationResult, stateReadAccess).getQueries());
+ queries.addAll(
+ sqlEngine.getRunningQueries(
+ selfOnly != null,
+ includeComplete != null,
+ authenticationResult,
+ stateReadAccess
+ ).getQueries()
+ );
}
AuthorizationUtils.setRequestAuthorizationAttributeIfNeeded(request);
diff --git a/sql/src/main/java/org/apache/druid/sql/http/QueryInfo.java
b/sql/src/main/java/org/apache/druid/sql/http/StandardQueryState.java
similarity index 65%
copy from sql/src/main/java/org/apache/druid/sql/http/QueryInfo.java
copy to sql/src/main/java/org/apache/druid/sql/http/StandardQueryState.java
index d3ec14cde50..67645d4cbe7 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/QueryInfo.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/StandardQueryState.java
@@ -19,9 +19,19 @@
package org.apache.druid.sql.http;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "engine")
-public interface QueryInfo
+/**
+ * Standard strings returned by {@link QueryInfo#state()}. Engines can add
their own if they like.
+ */
+public class StandardQueryState
{
+ public static final String ACCEPTED = "ACCEPTED";
+ public static final String RUNNING = "RUNNING";
+ public static final String SUCCESS = "SUCCESS";
+ public static final String FAILED = "FAILED";
+ public static final String CANCELED = "CANCELED";
+
+ private StandardQueryState()
+ {
+ // No instantiation.
+ }
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java
index 55e72a7a682..e308a0331ef 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java
@@ -55,6 +55,7 @@ import org.apache.druid.server.security.Escalator;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
import org.apache.druid.sql.calcite.view.ViewManager;
import org.easymock.EasyMock;
@@ -106,6 +107,7 @@ public class DruidCalciteSchemaModuleTest extends
CalciteTestBase
@BeforeEach
public void setUp()
{
+
EasyMock.expect(plannerConfig.isEnableSysQueriesTable()).andReturn(false).anyTimes();
EasyMock.replay(plannerConfig);
target = new DruidCalciteSchemaModule();
injector = Guice.createInjector(
@@ -134,6 +136,7 @@ public class DruidCalciteSchemaModuleTest extends
CalciteTestBase
.toInstance(CentralizedDatasourceSchemaConfig.create());
binder.bind(HttpClient.class).toInstance(httpClient);
binder.bind(HttpClient.class).annotatedWith(EscalatedClient.class).toInstance(httpClient);
+ binder.bind(new TypeLiteral<Set<SqlEngine>>()
{}).toInstance(ImmutableSet.of());
},
new LifecycleModule(),
target);
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index ad000977383..10cada8563e 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -102,12 +102,18 @@ import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
+import org.apache.druid.sql.calcite.planner.PlannerConfig;
+import org.apache.druid.sql.calcite.run.SqlEngine;
+import org.apache.druid.sql.calcite.schema.SystemSchema.QueriesTable;
import org.apache.druid.sql.calcite.schema.SystemSchema.SegmentsTable;
import org.apache.druid.sql.calcite.table.RowSignatures;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.TestDataBuilder;
import org.apache.druid.sql.calcite.util.TestTimelineServerView;
+import org.apache.druid.sql.http.GetQueriesResponse;
+import org.apache.druid.sql.http.QueryInfo;
+import org.apache.druid.sql.http.SqlEngineRegistry;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
@@ -280,7 +286,9 @@ public class SystemSchemaTest extends CalciteTestBase
overlordClient,
druidNodeDiscoveryProvider,
MAPPER,
- httpClient
+ httpClient,
+ () -> new SqlEngineRegistry(Collections.emptySet()),
+ new PlannerConfig()
);
}
@@ -1587,6 +1595,76 @@ public class SystemSchemaTest extends CalciteTestBase
}
+ @Test
+ public void testQueriesTable()
+ {
+ // Create mock SqlEngine that returns test queries
+ final SqlEngine mockEngine = EasyMock.createMock(SqlEngine.class);
+ EasyMock.expect(mockEngine.name()).andReturn("native").anyTimes();
+ EasyMock.expect(mockEngine.getRunningQueries(
+ EasyMock.eq(false),
+ EasyMock.eq(true),
+ EasyMock.anyObject(),
+ EasyMock.anyObject()
+ )).andReturn(new GetQueriesResponse(ImmutableList.of(
+ createTestQueryInfo("query-1", "native", "RUNNING"),
+ createTestQueryInfo("query-2", "native", "COMPLETED")
+ ))).once();
+ EasyMock.replay(mockEngine);
+
+ final SqlEngineRegistry registry = new
SqlEngineRegistry(ImmutableSet.of(mockEngine));
+ final QueriesTable queriesTable = new QueriesTable(() -> registry, MAPPER,
authMapper);
+
+ final DataContext dataContext = createDataContext(Users.SUPER);
+ final List<Object[]> rows = queriesTable.scan(dataContext,
Collections.emptyList(), null).toList();
+
+ Assert.assertEquals(2, rows.size());
+
+ // Verify first row
+ Assert.assertEquals("query-1", rows.get(0)[0]);
+ Assert.assertEquals("native", rows.get(0)[1]);
+ Assert.assertEquals("RUNNING", rows.get(0)[2]);
+ Assert.assertNotNull(rows.get(0)[3]); // info should be serialized JSON
+
+ // Verify second row
+ Assert.assertEquals("query-2", rows.get(1)[0]);
+ Assert.assertEquals("native", rows.get(1)[1]);
+ Assert.assertEquals("COMPLETED", rows.get(1)[2]);
+ Assert.assertNotNull(rows.get(1)[3]); // info should be serialized JSON
+
+ // Verify value types
+ verifyTypes(rows, SystemSchema.QUERIES_SIGNATURE);
+
+ EasyMock.verify(mockEngine);
+ }
+
+ /**
+ * Creates a test QueryInfo implementation for testing purposes.
+ */
+ private QueryInfo createTestQueryInfo(final String executionId, final String
engine, final String state)
+ {
+ return new QueryInfo()
+ {
+ @Override
+ public String engine()
+ {
+ return engine;
+ }
+
+ @Override
+ public String state()
+ {
+ return state;
+ }
+
+ @Override
+ public String executionId()
+ {
+ return executionId;
+ }
+ };
+ }
+
private String getStatusPropertiesUrl(DiscoveryDruidNode discoveryDruidNode)
{
return
discoveryDruidNode.getDruidNode().getUriToUse().resolve("/status/properties").toString();
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 59151d352e2..069b4ead356 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -89,6 +89,7 @@ import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.schema.MetadataSegmentView;
import org.apache.druid.sql.calcite.schema.SystemSchema;
import
org.apache.druid.sql.calcite.util.testoperator.CalciteTestOperatorModule;
+import org.apache.druid.sql.http.SqlEngineRegistry;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Duration;
@@ -98,6 +99,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -419,7 +421,9 @@ public class CalciteTests
overlordClient,
provider,
getJsonMapper(),
- new FakeHttpClient()
+ new FakeHttpClient(),
+ () -> new SqlEngineRegistry(Collections.emptySet()),
+ new PlannerConfig()
);
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/http/GetQueriesResponseTest.java
b/sql/src/test/java/org/apache/druid/sql/http/GetQueriesResponseTest.java
index cc650ae2725..196cf1eada9 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/GetQueriesResponseTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/GetQueriesResponseTest.java
@@ -104,6 +104,24 @@ public class GetQueriesResponseTest
return authenticator;
}
+ @Override
+ public String engine()
+ {
+ return "test";
+ }
+
+ @Override
+ public String state()
+ {
+ return "RUNNING";
+ }
+
+ @Override
+ public String executionId()
+ {
+ return "test-execution-id";
+ }
+
@Override
public boolean equals(Object o)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]