This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 69b2e809002 Dart resource deprecation (#18003)
69b2e809002 is described below
commit 69b2e8090023e550d12bf80dc3e056d77c8ebb40
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Mon Jun 2 14:16:02 2025 +0530
Dart resource deprecation (#18003)
Presently, DartSqlResource handles Dart queries, while SqlResource handles
native SQL queries. The former includes some Dart-specific code which needs to
be run for creating and cancelling queries.
This PR aims to unify these interfaces by adding an "engine selector" to
SqlResource to allow it to run native as well as Dart queries. This involves
pushing any engine-specific code into SqlEngine and refactoring the Dart
cancellation to be more generic.
Release Notes
Dart-specific endpoints have been removed and folded into SqlResource.
Added a new "engine" query context parameter. Its value can be "native" or
"msq-dart" and determines the engine used to run the query. The default
value is "native".
---
extensions-core/multi-stage-query/pom.xml | 5 +
.../msq/dart/DartResourcePermissionMapper.java | 6 +-
.../msq/dart/controller/http/DartQueryInfo.java | 4 +-
.../msq/dart/controller/http/DartSqlResource.java | 277 ---------------------
.../msq/dart/controller/sql/DartSqlClient.java | 8 +-
.../controller/sql/DartSqlClientFactoryImpl.java | 4 +-
.../msq/dart/controller/sql/DartSqlClientImpl.java | 29 ++-
.../msq/dart/controller/sql/DartSqlClients.java | 4 +-
.../msq/dart/controller/sql/DartSqlEngine.java | 118 ++++++++-
.../druid/msq/dart/guice/DartControllerModule.java | 28 ++-
.../druid/msq/dart/guice/DartWorkerModule.java | 19 ++
.../java/org/apache/druid/msq/exec/Controller.java | 5 +-
.../org/apache/druid/msq/sql/MSQTaskSqlEngine.java | 15 +-
.../dart/controller/http/DartQueryInfoTest.java | 32 +++
.../dart/controller/http/DartSqlResourceTest.java | 156 ++++++------
.../controller/http/GetQueriesResponseTest.java | 62 -----
.../dart/controller/sql/DartSqlClientImplTest.java | 9 +-
.../apache/druid/msq/exec/TestMSQSqlModule.java | 2 +-
.../druid/msq/test/DartComponentSupplier.java | 4 +-
.../org/apache/druid/msq/test/MSQTestBase.java | 4 +-
.../security/AbstractAuthConfigurationTest.java | 10 +-
.../java/org/apache/druid/query/QueryContext.java | 9 +
.../java/org/apache/druid/query/QueryContexts.java | 3 +-
.../apache/druid/server/security/AuthConfig.java | 2 +-
.../java/org/apache/druid/sql/DirectStatement.java | 11 +-
.../druid/sql/calcite/run/NativeSqlEngine.java | 44 +++-
.../apache/druid/sql/calcite/run/SqlEngine.java | 35 +++
.../druid/sql/calcite/view/ViewSqlEngine.java | 8 +
.../java/org/apache/druid/sql/guice/SqlModule.java | 8 +
.../java/org/apache/druid/sql/http/EngineInfo.java | 32 +--
.../apache/druid/sql}/http/GetQueriesResponse.java | 10 +-
.../java/org/apache/druid/sql/http/QueryInfo.java | 13 +-
.../apache/druid/sql/http/SqlEngineRegistry.java | 63 +++++
.../org/apache/druid/sql/http/SqlHttpModule.java | 3 +
.../org/apache/druid/sql/http/SqlResource.java | 85 ++++++-
...tpModule.java => SupportedEnginesResponse.java} | 27 +-
.../sql/calcite/CalciteScanSignatureTest.java | 7 +
.../druid/sql/calcite/IngestionTestSqlEngine.java | 7 +
.../calcite/planner/CalcitePlannerModuleTest.java | 7 +-
.../druid/sql/calcite/util/CalciteTests.java | 14 +-
.../druid/sql/http/GetQueriesResponseTest.java | 140 +++++++++++
.../apache/druid/sql/http/SqlHttpModuleTest.java | 2 +-
.../org/apache/druid/sql/http/SqlResourceTest.java | 22 +-
43 files changed, 817 insertions(+), 536 deletions(-)
diff --git a/extensions-core/multi-stage-query/pom.xml
b/extensions-core/multi-stage-query/pom.xml
index bc26993c072..77f18c78f62 100644
--- a/extensions-core/multi-stage-query/pom.xml
+++ b/extensions-core/multi-stage-query/pom.xml
@@ -196,6 +196,11 @@
<artifactId>commons-io</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <scope>provided</scope>
+ </dependency>
<!-- Tests -->
<dependency>
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/DartResourcePermissionMapper.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/DartResourcePermissionMapper.java
index 038d1b56c72..a69174517ec 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/DartResourcePermissionMapper.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/DartResourcePermissionMapper.java
@@ -20,13 +20,13 @@
package org.apache.druid.msq.dart;
import com.google.common.collect.ImmutableList;
-import org.apache.druid.msq.dart.controller.http.DartSqlResource;
import org.apache.druid.msq.dart.worker.http.DartWorkerResource;
import org.apache.druid.msq.rpc.ResourcePermissionMapper;
import org.apache.druid.msq.rpc.WorkerResource;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
+import org.apache.druid.sql.http.SqlResource;
import java.util.List;
@@ -34,7 +34,7 @@ public class DartResourcePermissionMapper implements
ResourcePermissionMapper
{
/**
* Permissions for admin APIs in {@link DartWorkerResource} and {@link
WorkerResource}. Note that queries from
- * end users go through {@link DartSqlResource}, which wouldn't use these
mappings.
+ * end users go through {@link SqlResource}, which wouldn't use these
mappings.
*/
@Override
public List<ResourceAction> getAdminPermissions()
@@ -47,7 +47,7 @@ public class DartResourcePermissionMapper implements
ResourcePermissionMapper
/**
* Permissions for per-query APIs in {@link DartWorkerResource} and {@link
WorkerResource}. Note that queries from
- * end users go through {@link DartSqlResource}, which wouldn't use these
mappings.
+ * end users go through {@link SqlResource}, which wouldn't use these
mappings.
*/
@Override
public List<ResourceAction> getQueryPermissions(String queryId)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java
index 2bc5d08704d..88610ee8f33 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartQueryInfo.java
@@ -27,6 +27,8 @@ import org.apache.druid.msq.dart.controller.ControllerHolder;
import org.apache.druid.msq.util.MSQTaskQueryMakerUtils;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.server.DruidNode;
+import org.apache.druid.sql.http.GetQueriesResponse;
+import org.apache.druid.sql.http.QueryInfo;
import org.joda.time.DateTime;
import java.util.Objects;
@@ -34,7 +36,7 @@ import java.util.Objects;
/**
* Class included in {@link GetQueriesResponse}.
*/
-public class DartQueryInfo
+public class DartQueryInfo implements QueryInfo
{
private final String sqlQueryId;
private final String dartQueryId;
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartSqlResource.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartSqlResource.java
deleted file mode 100644
index 5c9b3e5ff72..00000000000
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/DartSqlResource.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.dart.controller.http;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.Futures;
-import com.google.inject.Inject;
-import com.sun.jersey.api.core.HttpContext;
-import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.guice.annotations.Self;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.msq.dart.Dart;
-import org.apache.druid.msq.dart.controller.ControllerHolder;
-import org.apache.druid.msq.dart.controller.DartControllerRegistry;
-import org.apache.druid.msq.dart.controller.sql.DartSqlClients;
-import org.apache.druid.msq.indexing.error.CancellationReason;
-import org.apache.druid.query.QueryContexts;
-import org.apache.druid.server.DruidNode;
-import org.apache.druid.server.ResponseContextConfig;
-import org.apache.druid.server.initialization.ServerConfig;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.AuthenticationResult;
-import org.apache.druid.server.security.AuthorizationResult;
-import org.apache.druid.server.security.AuthorizationUtils;
-import org.apache.druid.server.security.AuthorizerMapper;
-import org.apache.druid.server.security.Resource;
-import org.apache.druid.server.security.ResourceAction;
-import org.apache.druid.sql.HttpStatement;
-import org.apache.druid.sql.SqlLifecycleManager;
-import org.apache.druid.sql.SqlStatementFactory;
-import org.apache.druid.sql.http.SqlQuery;
-import org.apache.druid.sql.http.SqlResource;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/**
- * Resource for Dart queries. API-compatible with {@link SqlResource}, so
clients can be pointed from
- * {@code /druid/v2/sql/} to {@code /druid/v2/sql/dart/} without code changes.
- */
-@Path(DartSqlResource.PATH + '/')
-public class DartSqlResource extends SqlResource
-{
- public static final String PATH = "/druid/v2/sql/dart";
-
- private static final Logger log = new Logger(DartSqlResource.class);
-
- private final DartControllerRegistry controllerRegistry;
- private final SqlLifecycleManager sqlLifecycleManager;
- private final DartSqlClients sqlClients;
- private final AuthorizerMapper authorizerMapper;
-
- // make dartqueryId a prefix the {{queeryid}}-{{startupTime}}-{{queryIndex}
- @Inject
- public DartSqlResource(
- final ObjectMapper jsonMapper,
- final AuthorizerMapper authorizerMapper,
- @Dart final SqlStatementFactory sqlStatementFactory,
- final DartControllerRegistry controllerRegistry,
- final SqlLifecycleManager sqlLifecycleManager,
- final DartSqlClients sqlClients,
- final ServerConfig serverConfig,
- final ResponseContextConfig responseContextConfig,
- @Self final DruidNode selfNode
- )
- {
- super(
- jsonMapper,
- authorizerMapper,
- sqlStatementFactory,
- sqlLifecycleManager,
- serverConfig,
- responseContextConfig,
- selfNode
- );
- this.controllerRegistry = controllerRegistry;
- this.sqlLifecycleManager = sqlLifecycleManager;
- this.sqlClients = sqlClients;
- this.authorizerMapper = authorizerMapper;
- }
-
- /**
- * API that allows callers to check if this resource is installed without
actually issuing a query. If installed,
- * this call returns 200 OK. If not installed, callers get 404 Not Found.
- */
- @GET
- @Path("/enabled")
- @Produces(MediaType.APPLICATION_JSON)
- public Response doGetEnabled(@Context final HttpServletRequest request)
- {
- AuthorizationUtils.setRequestAuthorizationAttributeIfNeeded(request);
- return Response.ok(ImmutableMap.of("enabled", true)).build();
- }
-
- /**
- * API to list all running queries.
- *
- * @param selfOnly if true, return queries running on this server. If false,
return queries running on all servers.
- * @param req http request
- */
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- public GetQueriesResponse doGetRunningQueries(
- @QueryParam("selfOnly") final String selfOnly,
- @Context final HttpServletRequest req
- )
- {
- final AuthenticationResult authenticationResult =
AuthorizationUtils.authenticationResultFromRequest(req);
- final AuthorizationResult stateReadAccess =
AuthorizationUtils.authorizeAllResourceActions(
- authenticationResult,
- Collections.singletonList(new ResourceAction(Resource.STATE_RESOURCE,
Action.READ)),
- authorizerMapper
- );
-
- final List<DartQueryInfo> queries =
- controllerRegistry.getAllHolders()
- .stream()
- .map(DartQueryInfo::fromControllerHolder)
- .collect(Collectors.toList());
-
- // Add queries from all other servers, if "selfOnly" is not set.
- if (selfOnly == null) {
- final List<GetQueriesResponse> otherQueries = FutureUtils.getUnchecked(
- Futures.successfulAsList(
- Iterables.transform(sqlClients.getAllClients(), client ->
client.getRunningQueries(true))),
- true
- );
-
- for (final GetQueriesResponse response : otherQueries) {
- if (response != null) {
- queries.addAll(response.getQueries());
- }
- }
- }
-
- // Sort queries by start time, breaking ties by query ID, so the list
comes back in a consistent and nice order.
-
queries.sort(Comparator.comparing(DartQueryInfo::getStartTime).thenComparing(DartQueryInfo::getDartQueryId));
-
- final GetQueriesResponse response;
- if (stateReadAccess.allowAccessWithNoRestriction()) {
- // User can READ STATE, so they can see all running queries, as well as
authentication details.
- response = new GetQueriesResponse(queries);
- } else {
- // User cannot READ STATE, so they can see only their own queries,
without authentication details.
- response = new GetQueriesResponse(
- queries.stream()
- .filter(
- query ->
- authenticationResult.getAuthenticatedBy() != null
- && authenticationResult.getIdentity() != null
- &&
Objects.equals(authenticationResult.getAuthenticatedBy(),
query.getAuthenticator())
- && Objects.equals(authenticationResult.getIdentity(),
query.getIdentity()))
- .map(DartQueryInfo::withoutAuthenticationResult)
- .collect(Collectors.toList())
- );
- }
-
- AuthorizationUtils.setRequestAuthorizationAttributeIfNeeded(req);
- return response;
- }
-
- /**
- * API to issue a query.
- */
- @POST
- @Produces(MediaType.APPLICATION_JSON)
- @Override
- public Response doPost(
- @Context final HttpServletRequest req,
- @Context final HttpContext httpContext
- )
- {
- return this.doPost(SqlQuery.from(httpContext), req);
- }
-
- @Override
- public Response doPost(
- final SqlQuery sqlQuery,
- final HttpServletRequest req
- )
- {
- final Map<String, Object> context = new HashMap<>(sqlQuery.getContext());
-
- return super.doPost(sqlQuery.withOverridenContext(context), req);
- }
-
- /**
- * API to cancel a query.
- */
- @DELETE
- @Path("{id}")
- @Produces(MediaType.APPLICATION_JSON)
- @Override
- public Response cancelQuery(
- @PathParam("id") String sqlQueryId,
- @Context final HttpServletRequest req
- )
- {
- log.debug("Received cancel request for query[%s]", sqlQueryId);
-
- List<SqlLifecycleManager.Cancelable> cancelables =
sqlLifecycleManager.getAll(sqlQueryId);
- if (cancelables.isEmpty()) {
- // Return ACCEPTED even if the query wasn't found. When the Router
broadcasts cancellation requests to all
- // Brokers, this ensures the user sees a successful request.
- AuthorizationUtils.setRequestAuthorizationAttributeIfNeeded(req);
- return Response.status(Response.Status.ACCEPTED).build();
- }
-
- final AuthorizationResult authResult = authorizeCancellation(req,
cancelables);
-
- if (authResult.allowAccessWithNoRestriction()) {
- sqlLifecycleManager.removeAll(sqlQueryId, cancelables);
-
- // Don't call cancel() on the cancelables. That just cancels native
queries, which is useless here. Instead,
- // get the controller and stop it.
- for (SqlLifecycleManager.Cancelable cancelable : cancelables) {
- final HttpStatement stmt = (HttpStatement) cancelable;
- final Object dartQueryId =
stmt.context().get(QueryContexts.CTX_DART_QUERY_ID);
- if (dartQueryId instanceof String) {
- final ControllerHolder holder = controllerRegistry.get((String)
dartQueryId);
- if (holder != null) {
- holder.cancel(CancellationReason.USER_REQUEST);
- }
- } else {
- log.warn(
- "%s[%s] for query[%s] is not a string, cannot cancel.",
- QueryContexts.CTX_DART_QUERY_ID,
- dartQueryId,
- sqlQueryId
- );
- }
- }
-
- // Return ACCEPTED even if the query wasn't found. When the Router
broadcasts cancellation requests to all
- // Brokers, this ensures the user sees a successful request.
- return Response.status(Response.Status.ACCEPTED).build();
- } else {
- return Response.status(Response.Status.FORBIDDEN).build();
- }
- }
-}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java
index 447da229d05..c8da492237a 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClient.java
@@ -20,13 +20,13 @@
package org.apache.druid.msq.dart.controller.sql;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.druid.msq.dart.controller.http.DartSqlResource;
-import org.apache.druid.msq.dart.controller.http.GetQueriesResponse;
+import org.apache.druid.sql.http.GetQueriesResponse;
+import org.apache.druid.sql.http.SqlResource;
import javax.servlet.http.HttpServletRequest;
/**
- * Client for the {@link DartSqlResource} resource.
+ * Client for the {@link SqlResource} resource for Dart queries.
*/
public interface DartSqlClient
{
@@ -36,7 +36,7 @@ public interface DartSqlClient
* @param selfOnly true if only queries from this server should be returned;
false if queries from all servers
* should be returned
*
- * @see DartSqlResource#doGetRunningQueries(String, HttpServletRequest) the
server side
+ * @see SqlResource#doGetRunningQueries(String, HttpServletRequest) the
server side
*/
ListenableFuture<GetQueriesResponse> getRunningQueries(boolean selfOnly);
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientFactoryImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientFactoryImpl.java
index c2355a43e31..3562f5c2ff8 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientFactoryImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientFactoryImpl.java
@@ -24,13 +24,13 @@ import com.google.inject.Inject;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.msq.dart.controller.http.DartSqlResource;
import org.apache.druid.rpc.FixedServiceLocator;
import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocation;
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.server.DruidNode;
+import org.apache.druid.sql.http.SqlResource;
/**
* Production implementation of {@link DartSqlClientFactory}.
@@ -55,7 +55,7 @@ public class DartSqlClientFactoryImpl implements
DartSqlClientFactory
{
final ServiceClient client = clientFactory.makeClient(
StringUtils.format("%s[dart-sql]", node.getHostAndPortToUse()),
- new
FixedServiceLocator(ServiceLocation.fromDruidNode(node).withBasePath(DartSqlResource.PATH)),
+ new
FixedServiceLocator(ServiceLocation.fromDruidNode(node).withBasePath(SqlResource.PATH)),
StandardRetryPolicy.noRetries()
);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java
index aebf7e4b90f..7b054220521 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImpl.java
@@ -24,11 +24,14 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import
org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
-import org.apache.druid.msq.dart.controller.http.GetQueriesResponse;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
+import org.apache.druid.sql.http.GetQueriesResponse;
+import org.apache.http.client.utils.URIBuilder;
import org.jboss.netty.handler.codec.http.HttpMethod;
+import java.net.URISyntaxException;
+
/**
* Production implementation of {@link DartSqlClient}.
*/
@@ -46,12 +49,22 @@ public class DartSqlClientImpl implements DartSqlClient
@Override
public ListenableFuture<GetQueriesResponse> getRunningQueries(final boolean
selfOnly)
{
- return FutureUtils.transform(
- client.asyncRequest(
- new RequestBuilder(HttpMethod.GET, selfOnly ? "/?selfOnly" : "/"),
- new BytesFullResponseHandler()
- ),
- holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(),
GetQueriesResponse.class)
- );
+ try {
+ URIBuilder builder = new URIBuilder("/queries");
+ if (selfOnly) {
+ builder.addParameter("selfOnly", null);
+ }
+
+ return FutureUtils.transform(
+ client.asyncRequest(
+ new RequestBuilder(HttpMethod.GET, builder.toString()),
+ new BytesFullResponseHandler()
+ ),
+ holder -> JacksonUtils.readValue(jsonMapper, holder.getContent(),
GetQueriesResponse.class)
+ );
+ }
+ catch (URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClients.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClients.java
index 733f69ee4bf..f28d895cf60 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClients.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlClients.java
@@ -30,8 +30,8 @@ import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
-import org.apache.druid.msq.dart.controller.http.DartSqlResource;
import org.apache.druid.server.DruidNode;
+import org.apache.druid.sql.http.SqlResource;
import javax.servlet.http.HttpServletRequest;
import java.util.Collection;
@@ -41,7 +41,7 @@ import java.util.Map;
/**
* Keeps {@link DartSqlClient} for all servers except ourselves. Currently the
purpose of this is to power
- * the "get all queries" API at {@link
DartSqlResource#doGetRunningQueries(String, HttpServletRequest)}.
+ * the "get all queries" API at {@link SqlResource#doGetRunningQueries(String,
String, HttpServletRequest)}.
*/
@ManageLifecycle
public class DartSqlClients implements DruidNodeDiscovery.Listener
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
index a75415f7d05..2836a1e3ceb 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/DartSqlEngine.java
@@ -20,26 +20,38 @@
package org.apache.druid.msq.dart.controller.sql;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
import com.google.inject.Inject;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.dart.Dart;
+import org.apache.druid.msq.dart.controller.ControllerHolder;
import org.apache.druid.msq.dart.controller.DartControllerContextFactory;
import org.apache.druid.msq.dart.controller.DartControllerRegistry;
+import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
import org.apache.druid.msq.dart.guice.DartControllerConfig;
import org.apache.druid.msq.exec.QueryKitSpecFactory;
+import org.apache.druid.msq.indexing.error.CancellationReason;
import org.apache.druid.msq.sql.DartQueryKitSpecFactory;
import org.apache.druid.msq.sql.MSQTaskSqlEngine;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
+import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.server.security.AuthorizationResult;
+import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.SqlToolbox;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.run.EngineFeature;
@@ -47,15 +59,22 @@ import org.apache.druid.sql.calcite.run.QueryMaker;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.run.SqlEngines;
import org.apache.druid.sql.destination.IngestDestination;
+import org.apache.druid.sql.http.GetQueriesResponse;
+import org.apache.druid.sql.http.QueryInfo;
+import java.util.Comparator;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
@LazySingleton
public class DartSqlEngine implements SqlEngine
{
- private static final String NAME = "msq-dart";
+ public static final String NAME = "msq-dart";
+ private static final Logger log = new Logger(DartSqlEngine.class);
private final DartControllerContextFactory controllerContextFactory;
private final DartControllerRegistry controllerRegistry;
@@ -64,6 +83,8 @@ public class DartSqlEngine implements SqlEngine
private final ServerConfig serverConfig;
private final QueryKitSpecFactory queryKitSpecFactory;
private final DefaultQueryConfig dartQueryConfig;
+ private final SqlToolbox toolbox;
+ private final DartSqlClients sqlClients;
@Inject
public DartSqlEngine(
@@ -72,7 +93,9 @@ public class DartSqlEngine implements SqlEngine
DartControllerConfig controllerConfig,
DartQueryKitSpecFactory queryKitSpecFactory,
ServerConfig serverConfig,
- @Dart DefaultQueryConfig dartQueryConfig
+ @Dart DefaultQueryConfig dartQueryConfig,
+ SqlToolbox toolbox,
+ DartSqlClients sqlClients
)
{
this(
@@ -82,7 +105,9 @@ public class DartSqlEngine implements SqlEngine
Execs.multiThreaded(controllerConfig.getConcurrentQueries(),
"dart-controller-%s"),
queryKitSpecFactory,
serverConfig,
- dartQueryConfig
+ dartQueryConfig,
+ toolbox,
+ sqlClients
);
}
@@ -93,7 +118,9 @@ public class DartSqlEngine implements SqlEngine
ExecutorService controllerExecutor,
QueryKitSpecFactory queryKitSpecFactory,
ServerConfig serverConfig,
- DefaultQueryConfig dartQueryConfig
+ DefaultQueryConfig dartQueryConfig,
+ SqlToolbox toolbox,
+ DartSqlClients sqlClients
)
{
this.controllerContextFactory = controllerContextFactory;
@@ -103,6 +130,8 @@ public class DartSqlEngine implements SqlEngine
this.queryKitSpecFactory = queryKitSpecFactory;
this.serverConfig = serverConfig;
this.dartQueryConfig = dartQueryConfig;
+ this.toolbox = toolbox;
+ this.sqlClients = sqlClients;
}
@Override
@@ -227,4 +256,85 @@ public class DartSqlEngine implements SqlEngine
final String dartQueryId = UUID.randomUUID().toString();
contextMap.put(QueryContexts.CTX_DART_QUERY_ID, dartQueryId);
}
+
+ @Override
+ public SqlStatementFactory getSqlStatementFactory()
+ {
+ return new SqlStatementFactory(toolbox.withEngine(this));
+ }
+
+ @Override
+ public List<QueryInfo> getRunningQueries(
+ boolean selfOnly,
+ AuthenticationResult authenticationResult,
+ AuthorizationResult authorizationResult
+ )
+ {
+ final List<DartQueryInfo> queries = controllerRegistry.getAllHolders()
+ .stream()
+
.map(DartQueryInfo::fromControllerHolder)
+
.collect(Collectors.toList());
+
+ // Add queries from all other servers, if "selfOnly" is false.
+ if (!selfOnly) {
+ final List<GetQueriesResponse> otherQueries = FutureUtils.getUnchecked(
+ Futures.successfulAsList(
+ Iterables.transform(sqlClients.getAllClients(), client ->
client.getRunningQueries(true))),
+ true
+ );
+
+ for (final GetQueriesResponse response : otherQueries) {
+ if (response != null) {
+ response.getQueries().stream()
+ .filter(queryInfo -> (queryInfo instanceof DartQueryInfo))
+ .map(queryInfo -> (DartQueryInfo) queryInfo)
+ .forEach(queries::add);
+ }
+ }
+ }
+
+ // Sort queries by start time, breaking ties by query ID, so the list
comes back in a consistent and nice order.
+
queries.sort(Comparator.comparing(DartQueryInfo::getStartTime).thenComparing(DartQueryInfo::getDartQueryId));
+
+ if (authorizationResult.allowAccessWithNoRestriction()) {
+ // User can READ STATE, so they can see all running queries, as well as
authentication details.
+ return List.copyOf(queries);
+ } else {
+ // User cannot READ STATE, so they can see only their own queries,
without authentication details.
+ return queries.stream()
+ .filter(
+ query ->
+ authenticationResult.getAuthenticatedBy() != null
+ && authenticationResult.getIdentity() != null
+ && Objects.equals(
+ authenticationResult.getAuthenticatedBy(),
+ query.getAuthenticator()
+ )
+ && Objects.equals(
+ authenticationResult.getIdentity(),
+ query.getIdentity()
+ ))
+ .map(DartQueryInfo::withoutAuthenticationResult)
+ .collect(Collectors.toList());
+ }
+ }
+
+ @Override
+ public void cancelQuery(PlannerContext plannerContext, QueryScheduler
queryScheduler)
+ {
+ final Object dartQueryId =
plannerContext.queryContext().get(QueryContexts.CTX_DART_QUERY_ID);
+ if (dartQueryId instanceof String) {
+ final ControllerHolder holder = controllerRegistry.get((String)
dartQueryId);
+ if (holder != null) {
+ holder.cancel(CancellationReason.USER_REQUEST);
+ }
+ } else {
+ log.warn(
+ "%s[%s] for query[%s] is not a string, cannot cancel.",
+ QueryContexts.CTX_DART_QUERY_ID,
+ dartQueryId,
+ plannerContext.getSqlQueryId()
+ );
+ }
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
index 0dae32f7cf0..3ee647e65ba 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
@@ -19,13 +19,15 @@
package org.apache.druid.msq.dart.guice;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.Provides;
+import com.google.inject.multibindings.Multibinder;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
-import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
@@ -40,7 +42,7 @@ import
org.apache.druid.msq.dart.controller.DartControllerContextFactoryImpl;
import org.apache.druid.msq.dart.controller.DartControllerRegistry;
import org.apache.druid.msq.dart.controller.DartMessageRelayFactoryImpl;
import org.apache.druid.msq.dart.controller.DartMessageRelays;
-import org.apache.druid.msq.dart.controller.http.DartSqlResource;
+import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
import org.apache.druid.msq.dart.controller.sql.DartSqlClientFactory;
import org.apache.druid.msq.dart.controller.sql.DartSqlClientFactoryImpl;
import org.apache.druid.msq.dart.controller.sql.DartSqlClients;
@@ -49,7 +51,10 @@ import org.apache.druid.msq.rpc.ResourcePermissionMapper;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlToolbox;
+import org.apache.druid.sql.calcite.run.SqlEngine;
+import java.util.Collections;
+import java.util.List;
import java.util.Properties;
/**
@@ -77,8 +82,6 @@ public class DartControllerModule implements DruidModule
JsonConfigProvider.bind(binder, DartModules.DART_PROPERTY_BASE +
".controller", DartControllerConfig.class);
JsonConfigProvider.bind(binder, DartModules.DART_PROPERTY_BASE +
".query", DefaultQueryConfig.class, Dart.class);
- Jerseys.addResource(binder, DartSqlResource.class);
-
LifecycleModule.register(binder, DartSqlClients.class);
LifecycleModule.register(binder, DartMessageRelays.class);
@@ -94,6 +97,10 @@ public class DartControllerModule implements DruidModule
binder.bind(ResourcePermissionMapper.class)
.annotatedWith(Dart.class)
.to(DartResourcePermissionMapper.class);
+ Multibinder.newSetBinder(binder, SqlEngine.class)
+ .addBinding()
+ .to(DartSqlEngine.class)
+ .in(LazySingleton.class);
}
@Provides
@@ -114,4 +121,17 @@ public class DartControllerModule implements DruidModule
return new DartMessageRelays(discoveryProvider, messageRelayFactory);
}
}
+
+ @Override
+ public List<? extends com.fasterxml.jackson.databind.Module>
getJacksonModules()
+ {
+ return Collections.<com.fasterxml.jackson.databind.Module>singletonList(
+ new SimpleModule("DartModule").registerSubtypes(
+ new NamedType(
+ DartQueryInfo.class,
+ DartSqlEngine.NAME
+ )
+ )
+ );
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
index e9bd59f53d8..bd565078f9b 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
@@ -19,6 +19,8 @@
package org.apache.druid.msq.dart.guice;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
@@ -43,7 +45,9 @@ import org.apache.druid.messages.server.Outbox;
import org.apache.druid.messages.server.OutboxImpl;
import org.apache.druid.msq.dart.Dart;
import org.apache.druid.msq.dart.DartResourcePermissionMapper;
+import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
+import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
import org.apache.druid.msq.dart.worker.DartDataSegmentProvider;
import org.apache.druid.msq.dart.worker.DartWorkerFactory;
import org.apache.druid.msq.dart.worker.DartWorkerFactoryImpl;
@@ -57,6 +61,8 @@ import org.apache.druid.server.DruidNode;
import org.apache.druid.server.security.AuthorizerMapper;
import java.io.File;
+import java.util.Collections;
+import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
@@ -150,4 +156,17 @@ public class DartWorkerModule implements DruidModule
return new OutboxImpl<>();
}
}
+
+ @Override
+ public List<? extends com.fasterxml.jackson.databind.Module>
getJacksonModules()
+ {
+ return Collections.<com.fasterxml.jackson.databind.Module>singletonList(
+ new SimpleModule("DartModule").registerSubtypes(
+ new NamedType(
+ DartQueryInfo.class,
+ DartSqlEngine.NAME
+ )
+ )
+ );
+ }
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java
index bf3b0197c29..d2fc2bd5b43 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Controller.java
@@ -22,7 +22,6 @@ package org.apache.druid.msq.exec;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
-import org.apache.druid.msq.dart.controller.http.DartSqlResource;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.client.ControllerChatHandler;
import org.apache.druid.msq.indexing.error.CancellationReason;
@@ -30,9 +29,11 @@ import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;
import org.apache.druid.query.QueryContexts;
+import org.apache.druid.sql.calcite.run.SqlEngine;
import javax.annotation.Nullable;
import java.util.List;
+import java.util.Map;
/**
* Interface for the controller of a multi-stage query. Each Controller is specific to a particular query.
@@ -45,7 +46,7 @@ public interface Controller
* Unique task/query ID for the batch query run by this controller.
*
* Controller IDs must be globally unique. For tasks, this is the task ID from {@link MSQControllerTask#getId()}.
- * For Dart, this is {@link QueryContexts#CTX_DART_QUERY_ID}, set by {@link DartSqlResource}.
+ * For Dart, this is {@link QueryContexts#CTX_DART_QUERY_ID}, set by {@link SqlEngine#initContextMap(Map)}.
*/
String queryId();
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
index 52ee833a110..1f1d0b1334f 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
@@ -54,6 +54,8 @@ import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.SqlToolbox;
import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.planner.Calcites;
@@ -87,26 +89,29 @@ public class MSQTaskSqlEngine implements SqlEngine
.build();
public static final List<String> TASK_STRUCT_FIELD_NAMES =
ImmutableList.of("TASK");
- private static final String NAME = "msq-task";
+ public static final String NAME = "msq-task";
private final OverlordClient overlordClient;
private final ObjectMapper jsonMapper;
private final MSQTerminalStageSpecFactory terminalStageSpecFactory;
private final QueryKitSpecFactory queryKitSpecFactory;
+ private final SqlToolbox sqlToolbox;
@Inject
public MSQTaskSqlEngine(
final OverlordClient overlordClient,
final ObjectMapper jsonMapper,
final MSQTerminalStageSpecFactory terminalStageSpecFactory,
- final MSQTaskQueryKitSpecFactory queryKitSpecFactory
+ final MSQTaskQueryKitSpecFactory queryKitSpecFactory,
+ final SqlToolbox sqlToolbox
)
{
this.overlordClient = overlordClient;
this.jsonMapper = jsonMapper;
this.terminalStageSpecFactory = terminalStageSpecFactory;
this.queryKitSpecFactory = queryKitSpecFactory;
+ this.sqlToolbox = sqlToolbox;
}
@Override
@@ -224,6 +229,12 @@ public class MSQTaskSqlEngine implements SqlEngine
);
}
+ @Override
+ public SqlStatementFactory getSqlStatementFactory()
+ {
+ return new SqlStatementFactory(sqlToolbox.withEngine(this));
+ }
+
/**
* Checks if the SELECT contains {@link DruidSqlInsert#SQL_INSERT_SEGMENT_GRANULARITY} in the context. This is a
* defensive cheeck because {@link org.apache.druid.sql.calcite.planner.DruidPlanner} should have called the
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java
index 98003872353..eb06ab03097 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java
@@ -19,11 +19,43 @@
package org.apache.druid.msq.dart.controller.http;
+import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.msq.dart.controller.ControllerHolder;
+import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
+import org.apache.druid.msq.dart.guice.DartWorkerModule;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.Map;
+
public class DartQueryInfoTest
{
+ @Test
+ void test_serde() throws Exception
+ {
+ DartQueryInfo dartQueryInfo = new DartQueryInfo(
+ "sid",
+ "did",
+ "SELECT 1",
+ "localhost:1001",
+ "",
+ "",
+ DateTimes.of("2000"),
+ ControllerHolder.State.RUNNING.toString()
+ );
+ ObjectMapper jsonMapper = new DefaultObjectMapper().registerModules(new
DartWorkerModule().getJacksonModules());
+ byte[] bytes = jsonMapper.writeValueAsBytes(dartQueryInfo);
+ DartQueryInfo deserialized = jsonMapper.readValue(bytes,
DartQueryInfo.class);
+ Assertions.assertEquals(dartQueryInfo, deserialized);
+
+ // Assert that the engine is present.
+ Map<String, Object> map = jsonMapper.readValue(bytes, Map.class);
+ Assertions.assertEquals(DartSqlEngine.NAME, map.get("engine"));
+ }
+
@Test
public void test_equals()
{
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
index 87efb900cb9..389e7c8abc4 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartSqlResourceTest.java
@@ -70,7 +70,6 @@ import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.sql.SqlLifecycleManager;
-import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.SqlToolbox;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
@@ -83,8 +82,13 @@ import org.apache.druid.sql.calcite.util.QueryFrameworkUtils;
import org.apache.druid.sql.calcite.util.TestTimelineServerView;
import org.apache.druid.sql.calcite.view.NoopViewManager;
import org.apache.druid.sql.hook.DruidHookDispatcher;
+import org.apache.druid.sql.http.EngineInfo;
+import org.apache.druid.sql.http.GetQueriesResponse;
import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.sql.http.SqlEngineRegistry;
import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.sql.http.SqlResource;
+import org.apache.druid.sql.http.SupportedEnginesResponse;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -100,6 +104,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -109,7 +114,7 @@ import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
/**
- * Functional test of {@link DartSqlResource}, {@link DartSqlEngine}, and {@link DartQueryMaker}.
+ * Functional test of {@link SqlResource}, {@link DartSqlEngine}, and {@link DartQueryMaker}.
* Other classes are mocked when possible.
*/
public class DartSqlResourceTest extends MSQTestBase
@@ -138,7 +143,7 @@ public class DartSqlResourceTest extends MSQTestBase
// Objects created in setUp() below this line.
- private DartSqlResource sqlResource;
+ private SqlResource sqlResource;
private DartControllerRegistry controllerRegistry;
private ExecutorService controllerExecutor;
private AutoCloseable mockCloser;
@@ -146,13 +151,13 @@ public class DartSqlResourceTest extends MSQTestBase
// Mocks below this line.
/**
- * Mock for {@link DartSqlClients}, which is used in tests of {@link DartSqlResource#doGetRunningQueries}.
+ * Mock for {@link DartSqlClients}, which is used in tests of {@link SqlResource#doGetRunningQueries}.
*/
@Mock
private DartSqlClients dartSqlClients;
/**
- * Mock for {@link DartSqlClient}, which is used in tests of {@link DartSqlResource#doGetRunningQueries}.
+ * Mock for {@link DartSqlClient}, which is used in tests of {@link SqlResource#doGetRunningQueries}.
*/
@Mock
private DartSqlClient dartSqlClient;
@@ -174,42 +179,6 @@ public class DartSqlResourceTest extends MSQTestBase
{
mockCloser = MockitoAnnotations.openMocks(this);
- final DartSqlEngine engine = new DartSqlEngine(
- new MSQTestControllerContext(
- objectMapper,
- injector,
- null /* not used in this test */,
- workerMemoryParameters,
- loadedSegmentsMetadata,
- TaskLockType.APPEND,
- QueryContext.empty()
- ) {
- @Override
- public ControllerQueryKernelConfig queryKernelConfig(String queryId,
MSQSpec querySpec)
- {
- return super.queryKernelConfig(queryId, querySpec).toBuilder()
- .workerIds(ImmutableList.of("some")).build();
- }
- },
- controllerRegistry = new DartControllerRegistry()
- {
- @Override
- public void register(ControllerHolder holder)
- {
- super.register(holder);
- controllerRegistered.countDown();
- }
- },
- objectMapper.convertValue(ImmutableMap.of(),
DartControllerConfig.class),
- controllerExecutor = Execs.multiThreaded(
- MAX_CONTROLLERS,
- StringUtils.encodeForFormat(getClass().getSimpleName() +
"-controller-exec")
- ),
- new DartQueryKitSpecFactory(new
TestTimelineServerView(Collections.emptyList())),
- new ServerConfig(),
- new DefaultQueryConfig(ImmutableMap.of("foo", "bar"))
- );
-
final DruidSchemaCatalog rootSchema =
QueryFrameworkUtils.createMockRootSchema(
CalciteTests.INJECTOR,
queryFramework().conglomerate(),
@@ -239,7 +208,7 @@ public class DartSqlResourceTest extends MSQTestBase
final SqlLifecycleManager lifecycleManager = new SqlLifecycleManager();
final SqlToolbox toolbox = new SqlToolbox(
- engine,
+ null,
plannerFactory,
NoopServiceEmitter.instance(),
NoopRequestLogger.instance(),
@@ -248,14 +217,51 @@ public class DartSqlResourceTest extends MSQTestBase
lifecycleManager
);
- sqlResource = new DartSqlResource(
+
+ final DartSqlEngine engine = new DartSqlEngine(
+ new MSQTestControllerContext(
+ objectMapper,
+ injector,
+ null /* not used in this test */,
+ workerMemoryParameters,
+ loadedSegmentsMetadata,
+ TaskLockType.APPEND,
+ QueryContext.empty()
+ ) {
+ @Override
+ public ControllerQueryKernelConfig queryKernelConfig(String queryId,
MSQSpec querySpec)
+ {
+ return super.queryKernelConfig(queryId, querySpec).toBuilder()
+ .workerIds(ImmutableList.of("some")).build();
+ }
+ },
+ controllerRegistry = new DartControllerRegistry()
+ {
+ @Override
+ public void register(ControllerHolder holder)
+ {
+ super.register(holder);
+ controllerRegistered.countDown();
+ }
+ },
+ objectMapper.convertValue(ImmutableMap.of(),
DartControllerConfig.class),
+ controllerExecutor = Execs.multiThreaded(
+ MAX_CONTROLLERS,
+ StringUtils.encodeForFormat(getClass().getSimpleName() +
"-controller-exec")
+ ),
+ new DartQueryKitSpecFactory(new
TestTimelineServerView(Collections.emptyList())),
+ new ServerConfig(),
+ new DefaultQueryConfig(ImmutableMap.of("foo", "bar")),
+ toolbox,
+ dartSqlClients
+ );
+
+ sqlResource = new SqlResource(
objectMapper,
CalciteTests.TEST_AUTHORIZER_MAPPER,
- new SqlStatementFactory(toolbox),
- controllerRegistry,
- lifecycleManager,
- dartSqlClients,
new ServerConfig() /* currently only used for error transform strategy
*/,
+ lifecycleManager,
+ new SqlEngineRegistry(Set.of(engine)),
ResponseContextConfig.newConfig(false),
SELF_NODE
);
@@ -284,12 +290,13 @@ public class DartSqlResourceTest extends MSQTestBase
@Test
public void test_getEnabled()
{
- final Response response = sqlResource.doGetEnabled(httpServletRequest);
- Assertions.assertEquals(Response.Status.OK.getStatusCode(),
response.getStatus());
+ Response response = sqlResource.getSupportedEngines(httpServletRequest);
+ Set<EngineInfo> supportedEngines = ((SupportedEnginesResponse)
response.getEntity()).getEngines();
+ Assertions.assertTrue(supportedEngines.contains(new
EngineInfo(DartSqlEngine.NAME)));
}
/**
- * Test where a superuser calls {@link DartSqlResource#doGetRunningQueries} with selfOnly enabled.
+ * Test where a superuser calls {@link SqlResource#doGetRunningQueries} with selfOnly enabled.
*/
@Test
public void test_getRunningQueries_selfOnly_superUser()
@@ -301,7 +308,7 @@ public class DartSqlResourceTest extends MSQTestBase
Assertions.assertEquals(
new
GetQueriesResponse(Collections.singletonList(DartQueryInfo.fromControllerHolder(holder))),
- sqlResource.doGetRunningQueries("", httpServletRequest)
+ sqlResource.doGetRunningQueries("", httpServletRequest).getEntity()
);
controllerRegistry.deregister(holder);
@@ -309,7 +316,7 @@ public class DartSqlResourceTest extends MSQTestBase
/**
* Test where {@link #REGULAR_USER_NAME} and {@link #DIFFERENT_REGULAR_USER_NAME} issue queries, and
- * {@link #REGULAR_USER_NAME} calls {@link DartSqlResource#doGetRunningQueries} with selfOnly enabled.
+ * {@link #REGULAR_USER_NAME} calls {@link SqlResource#doGetRunningQueries} with selfOnly enabled.
*/
@Test
public void test_getRunningQueries_selfOnly_regularUser()
@@ -325,7 +332,7 @@ public class DartSqlResourceTest extends MSQTestBase
Assertions.assertEquals(
new GetQueriesResponse(
Collections.singletonList(DartQueryInfo.fromControllerHolder(holder).withoutAuthenticationResult())),
- sqlResource.doGetRunningQueries("", httpServletRequest)
+ sqlResource.doGetRunningQueries("", httpServletRequest).getEntity()
);
controllerRegistry.deregister(holder);
@@ -333,7 +340,7 @@ public class DartSqlResourceTest extends MSQTestBase
}
/**
- * Test where a superuser calls {@link DartSqlResource#doGetRunningQueries} with selfOnly disabled.
+ * Test where a superuser calls {@link SqlResource#doGetRunningQueries} with selfOnly disabled.
*/
@Test
public void test_getRunningQueries_global_superUser()
@@ -366,14 +373,14 @@ public class DartSqlResourceTest extends MSQTestBase
remoteQueryInfo
)
),
- sqlResource.doGetRunningQueries(null, httpServletRequest)
+ sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity()
);
controllerRegistry.deregister(localHolder);
}
/**
- * Test where a superuser calls {@link DartSqlResource#doGetRunningQueries} with selfOnly disabled, and where the
+ * Test where a superuser calls {@link SqlResource#doGetRunningQueries} with selfOnly disabled, and where the
* remote server has a problem.
*/
@Test
@@ -393,7 +400,7 @@ public class DartSqlResourceTest extends MSQTestBase
// were able to fetch.)
Assertions.assertEquals(
new
GetQueriesResponse(ImmutableList.of(DartQueryInfo.fromControllerHolder(localHolder))),
- sqlResource.doGetRunningQueries(null, httpServletRequest)
+ sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity()
);
controllerRegistry.deregister(localHolder);
@@ -401,7 +408,7 @@ public class DartSqlResourceTest extends MSQTestBase
/**
* Test where {@link #REGULAR_USER_NAME} and {@link #DIFFERENT_REGULAR_USER_NAME} issue queries, and
- * {@link #REGULAR_USER_NAME} calls {@link DartSqlResource#doGetRunningQueries} with selfOnly disabled.
+ * {@link #REGULAR_USER_NAME} calls {@link SqlResource#doGetRunningQueries} with selfOnly disabled.
*/
@Test
public void test_getRunningQueries_global_regularUser()
@@ -430,7 +437,7 @@ public class DartSqlResourceTest extends MSQTestBase
Assertions.assertEquals(
new GetQueriesResponse(
ImmutableList.of(DartQueryInfo.fromControllerHolder(localHolder).withoutAuthenticationResult())),
- sqlResource.doGetRunningQueries(null, httpServletRequest)
+ sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity()
);
controllerRegistry.deregister(localHolder);
@@ -438,7 +445,7 @@ public class DartSqlResourceTest extends MSQTestBase
/**
* Test where {@link #REGULAR_USER_NAME} and {@link #DIFFERENT_REGULAR_USER_NAME} issue queries, and
- * {@link #DIFFERENT_REGULAR_USER_NAME} calls {@link DartSqlResource#doGetRunningQueries} with selfOnly disabled.
+ * {@link #DIFFERENT_REGULAR_USER_NAME} calls {@link SqlResource#doGetRunningQueries} with selfOnly disabled.
*/
@Test
public void test_getRunningQueries_global_differentRegularUser()
@@ -466,7 +473,7 @@ public class DartSqlResourceTest extends MSQTestBase
// The endpoint returns only the query issued by DIFFERENT_REGULAR_USER_NAME.
Assertions.assertEquals(
new
GetQueriesResponse(ImmutableList.of(remoteQueryInfo.withoutAuthenticationResult())),
- sqlResource.doGetRunningQueries(null, httpServletRequest)
+ sqlResource.doGetRunningQueries(null, httpServletRequest).getEntity()
);
controllerRegistry.deregister(holder);
@@ -490,7 +497,7 @@ public class DartSqlResourceTest extends MSQTestBase
false,
false,
false,
- Collections.emptyMap(),
+ Map.of(QueryContexts.ENGINE, DartSqlEngine.NAME),
Collections.emptyList()
);
@@ -517,7 +524,7 @@ public class DartSqlResourceTest extends MSQTestBase
false,
false,
false,
- Collections.emptyMap(),
+ Map.of(QueryContexts.ENGINE, DartSqlEngine.NAME),
Collections.emptyList()
);
@@ -545,7 +552,7 @@ public class DartSqlResourceTest extends MSQTestBase
false,
false,
false,
- Collections.emptyMap(),
+ Map.of(QueryContexts.ENGINE, DartSqlEngine.NAME),
Collections.emptyList()
);
@@ -562,7 +569,6 @@ public class DartSqlResourceTest extends MSQTestBase
assertThat((String) e.get("errorMessage"),
CoreMatchers.startsWith("InvalidNullByte: "));
}
- @Test
public void test_doPost_regularUser_fullReport() throws Exception
{
final MockAsyncContext asyncContext = new MockAsyncContext();
@@ -580,7 +586,7 @@ public class DartSqlResourceTest extends MSQTestBase
false,
false,
false,
- ImmutableMap.of(QueryContexts.CTX_FULL_REPORT, true),
+ ImmutableMap.of(QueryContexts.CTX_FULL_REPORT, true,
QueryContexts.ENGINE, DartSqlEngine.NAME),
Collections.emptyList()
);
@@ -621,7 +627,11 @@ public class DartSqlResourceTest extends MSQTestBase
false,
false,
false,
- ImmutableMap.of(QueryContexts.CTX_FULL_REPORT, true,
QueryContexts.TIMEOUT_KEY, 1),
+ ImmutableMap.of(
+ QueryContexts.CTX_FULL_REPORT, true,
+ QueryContexts.TIMEOUT_KEY, 1,
+ QueryContexts.ENGINE, DartSqlEngine.NAME
+ ),
Collections.emptyList()
);
@@ -662,7 +672,7 @@ public class DartSqlResourceTest extends MSQTestBase
false,
false,
false,
- ImmutableMap.of(QueryContexts.CTX_FULL_REPORT, true),
+ ImmutableMap.of(QueryContexts.CTX_FULL_REPORT, true,
QueryContexts.ENGINE, DartSqlEngine.NAME),
Collections.emptyList()
);
@@ -734,7 +744,11 @@ public class DartSqlResourceTest extends MSQTestBase
false,
false,
false,
- ImmutableMap.of(QueryContexts.CTX_SQL_QUERY_ID, sqlQueryId,
QueryContexts.CTX_FULL_REPORT, fullReport),
+ ImmutableMap.of(
+ QueryContexts.CTX_SQL_QUERY_ID, sqlQueryId,
+ QueryContexts.CTX_FULL_REPORT, fullReport,
+ QueryContexts.ENGINE, DartSqlEngine.NAME
+ ),
Collections.emptyList()
);
@@ -790,12 +804,12 @@ public class DartSqlResourceTest extends MSQTestBase
.thenReturn(makeAuthenticationResult(REGULAR_USER_NAME));
final Response cancellationResponse =
sqlResource.cancelQuery("nonexistent", httpServletRequest);
- Assertions.assertEquals(Response.Status.ACCEPTED.getStatusCode(),
cancellationResponse.getStatus());
+ Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
cancellationResponse.getStatus());
}
/**
* Add a mock {@link ControllerHolder} to {@link #controllerRegistry}, with a query run by the given user.
- * Used by methods that test {@link DartSqlResource#doGetRunningQueries}.
+ * Used by methods that test {@link SqlResource#doGetRunningQueries}.
*
* @return the mock holder
*/
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponseTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponseTest.java
deleted file mode 100644
index bffaace5745..00000000000
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponseTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.dart.controller.http;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import nl.jqno.equalsverifier.EqualsVerifier;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.msq.dart.controller.ControllerHolder;
-import org.apache.druid.segment.TestHelper;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.util.Collections;
-
-public class GetQueriesResponseTest
-{
- @Test
- public void test_serde() throws Exception
- {
- final ObjectMapper jsonMapper = TestHelper.JSON_MAPPER;
- final GetQueriesResponse response = new GetQueriesResponse(
- Collections.singletonList(
- new DartQueryInfo(
- "xyz",
- "abc",
- "SELECT 1",
- "localhost:1001",
- "auth",
- "anon",
- DateTimes.of("2000"),
- ControllerHolder.State.RUNNING.toString()
- )
- )
- );
- final GetQueriesResponse response2 =
- jsonMapper.readValue(jsonMapper.writeValueAsBytes(response),
GetQueriesResponse.class);
- Assertions.assertEquals(response, response2);
- }
-
- @Test
- public void test_equals()
- {
- EqualsVerifier.forClass(GetQueriesResponse.class).usingGetClass().verify();
- }
-}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
index 114ea9c7207..34be09e5bb0 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/sql/DartSqlClientImplTest.java
@@ -27,9 +27,10 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.msq.dart.controller.ControllerHolder;
import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
-import org.apache.druid.msq.dart.controller.http.GetQueriesResponse;
+import org.apache.druid.msq.dart.guice.DartWorkerModule;
import org.apache.druid.rpc.MockServiceClient;
import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.sql.http.GetQueriesResponse;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.jupiter.api.AfterEach;
@@ -49,7 +50,7 @@ public class DartSqlClientImplTest
@BeforeEach
public void setup()
{
- jsonMapper = new DefaultObjectMapper();
+ jsonMapper = new DefaultObjectMapper().registerModules(new
DartWorkerModule().getJacksonModules());
serviceClient = new MockServiceClient();
dartSqlClient = new DartSqlClientImpl(serviceClient, jsonMapper);
}
@@ -79,7 +80,7 @@ public class DartSqlClientImplTest
);
serviceClient.expectAndRespond(
- new RequestBuilder(HttpMethod.GET, "/"),
+ new RequestBuilder(HttpMethod.GET, "/queries"),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(getQueriesResponse)
@@ -108,7 +109,7 @@ public class DartSqlClientImplTest
);
serviceClient.expectAndRespond(
- new RequestBuilder(HttpMethod.GET, "/?selfOnly"),
+ new RequestBuilder(HttpMethod.GET, "/queries?selfOnly"),
HttpResponseStatus.OK,
ImmutableMap.of(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON),
jsonMapper.writeValueAsBytes(getQueriesResponse)
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java
index 792287f7fcc..0d72c6f2b50 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java
@@ -56,7 +56,7 @@ public class TestMSQSqlModule extends TestDruidModule
MSQTestOverlordServiceClient indexingServiceClient,
MSQTaskQueryKitSpecFactory queryKitSpecFactory)
{
- return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper, new
SegmentGenerationTerminalStageSpecFactory(), queryKitSpecFactory);
+ return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper, new
SegmentGenerationTerminalStageSpecFactory(), queryKitSpecFactory, null);
}
@Provides
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
index c45c540b1c5..641836f5a26 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
@@ -44,11 +44,13 @@ import org.apache.druid.sql.avatica.DartDruidMeta;
import org.apache.druid.sql.avatica.DruidMeta;
import org.apache.druid.sql.calcite.TempDirProducer;
import org.apache.druid.sql.calcite.run.SqlEngine;
+import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.DruidModuleCollection;
import
org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier;
import org.apache.druid.sql.calcite.util.datasets.TestDataSet;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -113,7 +115,7 @@ public class DartComponentSupplier extends
AbstractMSQComponentSupplierDelegate
@Provides
final DruidNodeDiscoveryProvider getDiscoveryProvider()
{
- return null;
+ return new
CalciteTests.FakeDruidNodeDiscoveryProvider(Collections.emptyMap());
}
@Override
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 9e5b407a66f..2495251dec0 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -217,7 +217,6 @@ import org.mockito.Mockito;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -581,7 +580,8 @@ public class MSQTestBase extends BaseCalciteQueryTest
indexingServiceClient,
qf.queryJsonMapper().copy().registerModules(new
MSQSqlModule().getJacksonModules()),
new SegmentGenerationTerminalStageSpecFactory(),
- injector.getInstance(MSQTaskQueryKitSpecFactory.class)
+ injector.getInstance(MSQTaskQueryKitSpecFactory.class),
+ null
);
PlannerFactory plannerFactory = new PlannerFactory(
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java
index d3fea48f56e..6f3b049896a 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/security/AbstractAuthConfigurationTest.java
@@ -36,6 +36,8 @@ import
org.apache.druid.java.util.http.client.CredentialedHttpClient;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.auth.BasicCredentials;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.http.SqlTaskStatus;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
@@ -113,6 +115,10 @@ public abstract class AbstractAuthConfigurationTest
new Resource("auth_test", ResourceType.DATASOURCE),
Action.READ
),
+ new ResourceAction(
+ new Resource(QueryContexts.ENGINE, ResourceType.QUERY_CONTEXT),
+ Action.WRITE
+ ),
new ResourceAction(
new Resource("auth_test_ctx", ResourceType.QUERY_CONTEXT),
Action.WRITE
@@ -815,7 +821,9 @@ public abstract class AbstractAuthConfigurationTest
HttpResponseStatus expectedStatus
) throws Exception
{
- return makeSQLQueryRequest(httpClient, query, "/druid/v2/sql/dart",
context, expectedStatus);
+ final Map<String, Object> dartContext = new HashMap<>(context);
+ dartContext.put(QueryContexts.ENGINE, DartSqlEngine.NAME);
+ return makeSQLQueryRequest(httpClient, query, "/druid/v2/sql",
dartContext, expectedStatus);
}
protected StatusResponseHolder makeSQLQueryRequest(
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java
b/processing/src/main/java/org/apache/druid/query/QueryContext.java
index 7dfa5569946..d5368425ed0 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContext.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java
@@ -471,6 +471,15 @@ public class QueryContext
return getLong(QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE);
}
+ public String getEngine()
+ {
+ return QueryContexts.parseString(
+ context,
+ QueryContexts.ENGINE,
+ QueryContexts.DEFAULT_ENGINE
+ );
+ }
+
public boolean hasTimeout()
{
return getTimeout() != QueryContexts.NO_TIMEOUT;
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
index 6ecfaddaa78..d288775977c 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
@@ -29,7 +29,6 @@ import org.apache.druid.java.util.common.Numbers;
import org.apache.druid.java.util.common.StringUtils;
import javax.annotation.Nullable;
-
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.HashMap;
@@ -90,6 +89,7 @@ public class QueryContexts
public static final String UNCOVERED_INTERVALS_LIMIT_KEY =
"uncoveredIntervalsLimit";
public static final String MIN_TOP_N_THRESHOLD = "minTopNThreshold";
public static final String CATALOG_VALIDATION_ENABLED =
"catalogValidationEnabled";
+ public static final String ENGINE = "engine";
// this flag controls whether the topN engine can use the 'pooled' algorithm
when query granularity is set to
// anything other than 'ALL' and the cardinality + number of aggregators
would require more size than is available
// in the buffers and so must reset the cursor to use multiple passes. This
is likely slower than the default
@@ -158,6 +158,7 @@ public class QueryContexts
public static final boolean DEFAULT_ENABLE_JOIN_FILTER_REWRITE = true;
public static final boolean
DEFAULT_ENABLE_JOIN_FILTER_REWRITE_VALUE_COLUMN_FILTERS = false;
public static final CloneQueryMode DEFAULT_CLONE_QUERY_MODE =
CloneQueryMode.EXCLUDECLONES;
+ public static final String DEFAULT_ENGINE = "native";
public static final boolean DEFAULT_ENABLE_REWRITE_JOIN_TO_FILTER = true;
public static final long DEFAULT_ENABLE_JOIN_FILTER_REWRITE_MAX_SIZE = 10000;
public static final boolean DEFAULT_ENABLE_SQL_JOIN_LEFT_SCAN_DIRECT = false;
diff --git
a/server/src/main/java/org/apache/druid/server/security/AuthConfig.java
b/server/src/main/java/org/apache/druid/server/security/AuthConfig.java
index c099e766f7e..3a664594d89 100644
--- a/server/src/main/java/org/apache/druid/server/security/AuthConfig.java
+++ b/server/src/main/java/org/apache/druid/server/security/AuthConfig.java
@@ -57,7 +57,7 @@ public class AuthConfig
public static final Set<String> ALLOWED_CONTEXT_KEYS = ImmutableSet.of(
// Set in the Avatica server path
QueryContexts.CTX_SQL_STRINGIFY_ARRAYS,
- // Set in DartSqlResource
+ // Set in DartSqlEngine
QueryContexts.CTX_DART_QUERY_ID,
// Set by the Router
QueryContexts.CTX_SQL_QUERY_ID
diff --git a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
index faf8d64375c..f4f36c28b73 100644
--- a/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/DirectStatement.java
@@ -25,7 +25,6 @@ import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryException;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.server.QueryResponse;
@@ -36,7 +35,6 @@ import org.apache.druid.sql.calcite.planner.PlannerResult;
import org.apache.druid.sql.calcite.planner.PrepareResult;
import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
/**
* Lifecycle for direct SQL statement execution, which means that the query
@@ -68,8 +66,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
*/
public class DirectStatement extends AbstractStatement implements Cancelable
{
- private static final Logger log = new Logger(DirectStatement.class);
-
/**
* Represents the execution plan for a query with the ability to run
* that plan (once).
@@ -302,12 +298,9 @@ public class DirectStatement extends AbstractStatement
implements Cancelable
return;
}
state = State.CANCELLED;
- final CopyOnWriteArrayList<String> nativeQueryIds =
plannerContext.getNativeQueryIds();
- for (String nativeQueryId : nativeQueryIds) {
- log.debug("Canceling native query [%s]", nativeQueryId);
- sqlToolbox.queryScheduler.cancelQuery(nativeQueryId);
- }
+ // Delegate cancellation to the engine, since how a query is cancelled is engine-specific.
+ sqlToolbox.engine.cancelQuery(plannerContext, sqlToolbox.queryScheduler);
}
@Override
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java
b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java
index 16228a19e12..1562da6d92c 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java
@@ -20,6 +20,7 @@
package org.apache.druid.sql.calcite.run;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import org.apache.calcite.rel.RelRoot;
@@ -27,10 +28,14 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.JoinAlgorithm;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.server.QueryLifecycleFactory;
+import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.SqlToolbox;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlReplace;
import org.apache.druid.sql.calcite.planner.PlannerContext;
@@ -39,10 +44,13 @@ import org.apache.druid.sql.destination.IngestDestination;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
@LazySingleton
public class NativeSqlEngine implements SqlEngine
{
+ private static final Logger LOG = new Logger(NativeSqlEngine.class);
+
public static final Set<String> SYSTEM_CONTEXT_PARAMETERS = ImmutableSet.of(
TimeBoundaryQuery.MAX_TIME_ARRAY_OUTPUT_NAME,
TimeBoundaryQuery.MIN_TIME_ARRAY_OUTPUT_NAME,
@@ -54,19 +62,34 @@ public class NativeSqlEngine implements SqlEngine
DruidSqlReplace.SQL_REPLACE_TIME_CHUNKS
);
- private static final String NAME = "native";
+ public static final String NAME = "native";
private final QueryLifecycleFactory queryLifecycleFactory;
private final ObjectMapper jsonMapper;
+ private final SqlStatementFactory sqlStatementFactory;
@Inject
public NativeSqlEngine(
final QueryLifecycleFactory queryLifecycleFactory,
- final ObjectMapper jsonMapper
+ final ObjectMapper jsonMapper,
+ final SqlToolbox toolbox
+ )
+ {
+ this.queryLifecycleFactory = queryLifecycleFactory;
+ this.jsonMapper = jsonMapper;
+ this.sqlStatementFactory = new
SqlStatementFactory(toolbox.withEngine(this));
+ }
+
+ @VisibleForTesting
+ public NativeSqlEngine(
+ final QueryLifecycleFactory queryLifecycleFactory,
+ final ObjectMapper jsonMapper,
+ final SqlStatementFactory sqlStatementFactory
)
{
this.queryLifecycleFactory = queryLifecycleFactory;
this.jsonMapper = jsonMapper;
+ this.sqlStatementFactory = sqlStatementFactory;
}
@Override
@@ -164,4 +187,21 @@ public class NativeSqlEngine implements SqlEngine
throw InvalidSqlInput.exception("Join algorithm [%s] is not supported by
engine [%s]", joinAlgorithm, NAME);
}
}
+
+ @Override
+ public SqlStatementFactory getSqlStatementFactory()
+ {
+ return sqlStatementFactory;
+ }
+
+ @Override
+ public void cancelQuery(PlannerContext plannerContext, QueryScheduler
queryScheduler)
+ {
+ final CopyOnWriteArrayList<String> nativeQueryIds =
plannerContext.getNativeQueryIds();
+
+ for (String nativeQueryId : nativeQueryIds) {
+ LOG.debug("Canceling native query [%s]", nativeQueryId);
+ queryScheduler.cancelQuery(nativeQueryId);
+ }
+ }
}
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
index 3f906b9ea40..9e2a3930ab5 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/SqlEngine.java
@@ -23,9 +23,16 @@ import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.tools.ValidationException;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.security.AuthenticationResult;
+import org.apache.druid.server.security.AuthorizationResult;
+import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.destination.IngestDestination;
+import org.apache.druid.sql.http.QueryInfo;
+import java.util.List;
import java.util.Map;
/**
@@ -116,4 +123,32 @@ public interface SqlEngine
default void initContextMap(Map<String, Object> contextMap)
{
}
+
+ /**
+ * Returns a {@link SqlStatementFactory} which uses this engine to create
statements.
+ */
+ SqlStatementFactory getSqlStatementFactory();
+
+ /**
+ * Returns a list of {@link QueryInfo} containing the currently running
queries using this engine. Returns an empty
+ * list if the operation is not supported.
+ */
+ default List<QueryInfo> getRunningQueries(
+ boolean selfOnly,
+ AuthenticationResult authenticationResult,
+ AuthorizationResult authorizationResult
+ )
+ {
+ return List.of();
+ }
+
+ /**
+ * Cancels a currently running query given the {@link PlannerContext} for
the query.
+ */
+ default void cancelQuery(PlannerContext plannerContext, QueryScheduler
queryScheduler)
+ {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.UNSUPPORTED)
+ .build("Engine[%s] does not support canceling
queries", name());
+ }
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java
b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java
index 7563b45d52b..34064916655 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java
@@ -22,6 +22,7 @@ package org.apache.druid.sql.calcite.view;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.run.QueryMaker;
@@ -126,4 +127,11 @@ public class ViewSqlEngine implements SqlEngine
// Can't have views of INSERT or REPLACE statements.
throw new UnsupportedOperationException();
}
+
+ @Override
+ public SqlStatementFactory getSqlStatementFactory()
+ {
+ // View engine does not execute queries.
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java
b/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java
index 56d0d2d5d41..06b3b8b8b19 100644
--- a/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java
+++ b/sql/src/main/java/org/apache/druid/sql/guice/SqlModule.java
@@ -26,6 +26,7 @@ import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
+import com.google.inject.multibindings.Multibinder;
import org.apache.druid.catalog.model.TableDefnRegistry;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.PolyBind;
@@ -44,6 +45,7 @@ import
org.apache.druid.sql.calcite.planner.CalcitePlannerModule;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
import org.apache.druid.sql.calcite.planner.PlannerFactory;
import org.apache.druid.sql.calcite.run.NativeSqlEngine;
+import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.DruidCalciteSchemaModule;
import org.apache.druid.sql.calcite.schema.DruidSchemaManager;
import org.apache.druid.sql.calcite.schema.NoopDruidSchemaManager;
@@ -124,6 +126,12 @@ public class SqlModule implements Module
// Default do-nothing catalog resolver
binder.bind(CatalogResolver.class).toInstance(CatalogResolver.NULL_RESOLVER);
+
+ // Bind the engine
+ Multibinder.newSetBinder(binder, SqlEngine.class)
+ .addBinding()
+ .to(NativeSqlEngine.class)
+ .in(LazySingleton.class);
}
private boolean isEnabled()
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponse.java
b/sql/src/main/java/org/apache/druid/sql/http/EngineInfo.java
similarity index 63%
copy from
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponse.java
copy to sql/src/main/java/org/apache/druid/sql/http/EngineInfo.java
index 2d1f87f860c..cd969a18a10 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponse.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/EngineInfo.java
@@ -17,31 +17,27 @@
* under the License.
*/
-package org.apache.druid.msq.dart.controller.http;
+package org.apache.druid.sql.http;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import java.util.List;
import java.util.Objects;
-/**
- * Class returned by {@link DartSqlResource#doGetRunningQueries}, the "list
all queries" API.
- */
-public class GetQueriesResponse
+public class EngineInfo
{
- private final List<DartQueryInfo> queries;
+ private final String name;
@JsonCreator
- public GetQueriesResponse(@JsonProperty("queries") List<DartQueryInfo>
queries)
+ public EngineInfo(@JsonProperty("name") String name)
{
- this.queries = queries;
+ this.name = name;
}
@JsonProperty
- public List<DartQueryInfo> getQueries()
+ public String getName()
{
- return queries;
+ return name;
}
@Override
@@ -53,21 +49,13 @@ public class GetQueriesResponse
if (o == null || getClass() != o.getClass()) {
return false;
}
- GetQueriesResponse response = (GetQueriesResponse) o;
- return Objects.equals(queries, response.queries);
+ EngineInfo that = (EngineInfo) o;
+ return Objects.equals(name, that.name);
}
@Override
public int hashCode()
{
- return Objects.hashCode(queries);
- }
-
- @Override
- public String toString()
- {
- return "GetQueriesResponse{" +
- "queries=" + queries +
- '}';
+ return Objects.hashCode(name);
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponse.java
b/sql/src/main/java/org/apache/druid/sql/http/GetQueriesResponse.java
similarity index 83%
rename from
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponse.java
rename to sql/src/main/java/org/apache/druid/sql/http/GetQueriesResponse.java
index 2d1f87f860c..6f889be4572 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/http/GetQueriesResponse.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/GetQueriesResponse.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.druid.msq.dart.controller.http;
+package org.apache.druid.sql.http;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -26,20 +26,20 @@ import java.util.List;
import java.util.Objects;
/**
- * Class returned by {@link DartSqlResource#doGetRunningQueries}, the "list
all queries" API.
+ * Class returned by {@link SqlResource#doGetRunningQueries}, the "list all
queries" API.
*/
public class GetQueriesResponse
{
- private final List<DartQueryInfo> queries;
+ private final List<QueryInfo> queries;
@JsonCreator
- public GetQueriesResponse(@JsonProperty("queries") List<DartQueryInfo>
queries)
+ public GetQueriesResponse(@JsonProperty("queries") List<QueryInfo> queries)
{
this.queries = queries;
}
@JsonProperty
- public List<DartQueryInfo> getQueries()
+ public List<QueryInfo> getQueries()
{
return queries;
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java
b/sql/src/main/java/org/apache/druid/sql/http/QueryInfo.java
similarity index 74%
copy from
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java
copy to sql/src/main/java/org/apache/druid/sql/http/QueryInfo.java
index 98003872353..d3ec14cde50 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/http/DartQueryInfoTest.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/QueryInfo.java
@@ -17,16 +17,11 @@
* under the License.
*/
-package org.apache.druid.msq.dart.controller.http;
+package org.apache.druid.sql.http;
-import nl.jqno.equalsverifier.EqualsVerifier;
-import org.junit.jupiter.api.Test;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
-public class DartQueryInfoTest
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "engine")
+public interface QueryInfo
{
- @Test
- public void test_equals()
- {
- EqualsVerifier.forClass(DartQueryInfo.class).usingGetClass().verify();
- }
}
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlEngineRegistry.java
b/sql/src/main/java/org/apache/druid/sql/http/SqlEngineRegistry.java
new file mode 100644
index 00000000000..1cf5b23ef24
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlEngineRegistry.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.http;
+
+import com.google.inject.Inject;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.server.initialization.jetty.BadRequestException;
+import org.apache.druid.sql.calcite.run.SqlEngine;
+
+import javax.validation.constraints.NotNull;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class SqlEngineRegistry
+{
+
+ private final Map<String, SqlEngine> engines;
+
+ @Inject
+ public SqlEngineRegistry(Set<SqlEngine> engineSet)
+ {
+ engines = engineSet.stream().collect(Collectors.toMap(SqlEngine::name,
engine -> engine));
+ }
+
+ @NotNull
+ public SqlEngine getEngine(final String engineName)
+ {
+ SqlEngine engine = engines.getOrDefault(engineName == null ?
QueryContexts.DEFAULT_ENGINE : engineName, null);
+ if (engine == null) {
+ throw new BadRequestException("Unsupported engine");
+ }
+ return engine;
+ }
+
+ public Set<String> getSupportedEngines()
+ {
+ return engines.keySet();
+ }
+
+ public Collection<SqlEngine> getAllEngines()
+ {
+ return engines.values();
+ }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java
b/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java
index 1fc64ccded0..2a12de5f66c 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java
@@ -21,8 +21,10 @@ package org.apache.druid.sql.http;
import com.google.inject.Binder;
import com.google.inject.Module;
+import com.google.inject.multibindings.Multibinder;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.sql.calcite.run.SqlEngine;
/**
* The Module responsible for providing bindings to the SQL http endpoint
@@ -33,6 +35,7 @@ public class SqlHttpModule implements Module
public void configure(Binder binder)
{
binder.bind(SqlResource.class).in(LazySingleton.class);
+ Multibinder.newSetBinder(binder, SqlEngine.class);
Jerseys.addResource(binder, SqlResource.class);
}
}
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
index 5199500bd49..2c4ed3c38a0 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
@@ -20,11 +20,11 @@
package org.apache.druid.sql.http;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.sun.jersey.api.core.HttpContext;
import org.apache.druid.common.exception.SanitizableException;
-import org.apache.druid.guice.annotations.NativeQuery;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
@@ -35,71 +35,128 @@ import org.apache.druid.server.QueryResponse;
import org.apache.druid.server.QueryResultPusher;
import org.apache.druid.server.ResponseContextConfig;
import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.server.security.Action;
+import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.AuthorizationResult;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.DirectStatement.ResultSet;
import org.apache.druid.sql.HttpStatement;
import org.apache.druid.sql.SqlLifecycleManager;
import org.apache.druid.sql.SqlLifecycleManager.Cancelable;
import org.apache.druid.sql.SqlRowTransformer;
-import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.calcite.run.SqlEngine;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-@Path("/druid/v2/sql/")
+@Path(SqlResource.PATH)
public class SqlResource
{
+ public static final String PATH = "/druid/v2/sql/";
public static final String SQL_QUERY_ID_RESPONSE_HEADER =
"X-Druid-SQL-Query-Id";
public static final String SQL_HEADER_RESPONSE_HEADER =
"X-Druid-SQL-Header-Included";
public static final String SQL_HEADER_VALUE = "yes";
+
private static final Logger log = new Logger(SqlResource.class);
- public static final SqlResourceQueryMetricCounter QUERY_METRIC_COUNTER = new
SqlResourceQueryMetricCounter();
+ private static final SqlResourceQueryMetricCounter QUERY_METRIC_COUNTER =
new SqlResourceQueryMetricCounter();
private final ObjectMapper jsonMapper;
private final AuthorizerMapper authorizerMapper;
- private final SqlStatementFactory sqlStatementFactory;
- private final SqlLifecycleManager sqlLifecycleManager;
private final ServerConfig serverConfig;
private final ResponseContextConfig responseContextConfig;
private final DruidNode selfNode;
+ private final SqlLifecycleManager sqlLifecycleManager;
+ private final SqlEngineRegistry sqlEngineRegistry;
+ @VisibleForTesting
@Inject
- protected SqlResource(
+ public SqlResource(
final ObjectMapper jsonMapper,
final AuthorizerMapper authorizerMapper,
- final @NativeQuery SqlStatementFactory sqlStatementFactory,
- final SqlLifecycleManager sqlLifecycleManager,
final ServerConfig serverConfig,
+ final SqlLifecycleManager sqlLifecycleManager,
+ final SqlEngineRegistry sqlEngineRegistry,
ResponseContextConfig responseContextConfig,
@Self DruidNode selfNode
)
{
+ this.sqlEngineRegistry = Preconditions.checkNotNull(sqlEngineRegistry,
"sqlEngineRegistry");
this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper");
this.authorizerMapper = Preconditions.checkNotNull(authorizerMapper,
"authorizerMapper");
- this.sqlStatementFactory = Preconditions.checkNotNull(sqlStatementFactory,
"sqlStatementFactory");
- this.sqlLifecycleManager = Preconditions.checkNotNull(sqlLifecycleManager,
"sqlLifecycleManager");
this.serverConfig = Preconditions.checkNotNull(serverConfig,
"serverConfig");
this.responseContextConfig = responseContextConfig;
this.selfNode = selfNode;
+ this.sqlLifecycleManager = Preconditions.checkNotNull(sqlLifecycleManager,
"sqlLifecycleManager");
+ }
+
+ @GET
+ @Path("/engines")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getSupportedEngines(@Context final HttpServletRequest
request)
+ {
+ AuthorizationUtils.setRequestAuthorizationAttributeIfNeeded(request);
+ Set<EngineInfo> engines = sqlEngineRegistry.getSupportedEngines()
+ .stream()
+ .map(EngineInfo::new)
+ .collect(Collectors.toSet());
+ return Response.ok(new SupportedEnginesResponse(engines)).build();
+ }
+
+ /**
+ * API to list all running queries, for all engines that support such
listings.
+ *
+ * @param selfOnly if present in the request (with any value), return only queries running on this server. If absent,
return queries running on all servers.
+ * @param request http request.
+ */
+ @GET
+ @Path("/queries")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response doGetRunningQueries(
+ @QueryParam("selfOnly") final String selfOnly,
+ @Context final HttpServletRequest request
+ )
+ {
+ final AuthenticationResult authenticationResult =
AuthorizationUtils.authenticationResultFromRequest(request);
+ final AuthorizationResult stateReadAccess =
AuthorizationUtils.authorizeAllResourceActions(
+ authenticationResult,
+ Collections.singletonList(new ResourceAction(Resource.STATE_RESOURCE,
Action.READ)),
+ authorizerMapper
+ );
+
+ final Collection<SqlEngine> engines = sqlEngineRegistry.getAllEngines();
+ final List<QueryInfo> queries = new ArrayList<>();
+
+ // Get running queries from all engines that support it.
+ for (SqlEngine sqlEngine : engines) {
+ queries.addAll(sqlEngine.getRunningQueries(selfOnly != null,
authenticationResult, stateReadAccess));
+ }
+
+ AuthorizationUtils.setRequestAuthorizationAttributeIfNeeded(request);
+ return Response.ok().entity(new GetQueriesResponse(queries)).build();
}
@POST
@@ -114,14 +171,16 @@ public class SqlResource
}
/**
- * This method is defined as public so that subclasses like Dart or test can
access it
+ * This method is defined as public so that tests can access it
*/
public Response doPost(
final SqlQuery sqlQuery,
final HttpServletRequest req
)
{
- final HttpStatement stmt = sqlStatementFactory.httpStatement(sqlQuery,
req);
+ final String engineName = sqlQuery.queryContext().getEngine();
+ final SqlEngine engine = sqlEngineRegistry.getEngine(engineName);
+ final HttpStatement stmt =
engine.getSqlStatementFactory().httpStatement(sqlQuery, req);
final String sqlQueryId = stmt.sqlQueryId();
final String currThreadName = Thread.currentThread().getName();
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java
b/sql/src/main/java/org/apache/druid/sql/http/SupportedEnginesResponse.java
similarity index 62%
copy from sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java
copy to
sql/src/main/java/org/apache/druid/sql/http/SupportedEnginesResponse.java
index 1fc64ccded0..89108f4ab17 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlHttpModule.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SupportedEnginesResponse.java
@@ -19,20 +19,27 @@
package org.apache.druid.sql.http;
-import com.google.inject.Binder;
-import com.google.inject.Module;
-import org.apache.druid.guice.Jerseys;
-import org.apache.druid.guice.LazySingleton;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Set;
/**
- * The Module responsible for providing bindings to the SQL http endpoint
+ * Class returned by {@link SqlResource#getSupportedEngines}, the supported
engines API.
*/
-public class SqlHttpModule implements Module
+public class SupportedEnginesResponse
{
- @Override
- public void configure(Binder binder)
+ private final Set<EngineInfo> engines;
+
+ @JsonCreator
+ public SupportedEnginesResponse(@JsonProperty("engines") Set<EngineInfo>
engines)
+ {
+ this.engines = engines;
+ }
+
+ @JsonProperty
+ public Set<EngineInfo> getEngines()
{
- binder.bind(SqlResource.class).in(LazySingleton.class);
- Jerseys.addResource(binder, SqlResource.class);
+ return engines;
}
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteScanSignatureTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteScanSignatureTest.java
index 02a2a168034..2010c335480 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteScanSignatureTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteScanSignatureTest.java
@@ -27,6 +27,7 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.tools.ValidationException;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.sql.SqlStatementFactory;
import
org.apache.druid.sql.calcite.CalciteScanSignatureTest.ScanSignatureComponentSupplier;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.PlannerContext;
@@ -183,6 +184,12 @@ public class CalciteScanSignatureTest extends
BaseCalciteQueryTest
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public SqlStatementFactory getSqlStatementFactory()
+ {
+ throw new UnsupportedOperationException();
+ }
}
}
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java
b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java
index 8cb2a974e6d..f2b9a26239d 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java
@@ -26,6 +26,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.run.QueryMaker;
@@ -126,4 +127,10 @@ public class IngestionTestSqlEngine implements SqlEngine
return new TestInsertQueryMaker(destination, signature);
}
+
+ @Override
+ public SqlStatementFactory getSqlStatementFactory()
+ {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
index 52aadd54839..eb803186d4c 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
@@ -41,6 +41,7 @@ import org.apache.druid.segment.join.JoinableFactoryWrapper;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.rule.ExtensionCalciteRuleProvider;
@@ -186,7 +187,7 @@ public class CalcitePlannerModuleTest extends
CalciteTestBase
PlannerContext context = PlannerContext.create(
toolbox,
"SELECT 1",
- new NativeSqlEngine(queryLifecycleFactory, mapper),
+ new NativeSqlEngine(queryLifecycleFactory, mapper,
(SqlStatementFactory) null),
Collections.emptyMap(),
null
);
@@ -206,7 +207,7 @@ public class CalcitePlannerModuleTest extends
CalciteTestBase
PlannerContext contextWithBloat = PlannerContext.create(
toolbox,
"SELECT 1",
- new NativeSqlEngine(queryLifecycleFactory, mapper),
+ new NativeSqlEngine(queryLifecycleFactory, mapper,
(SqlStatementFactory) null),
Collections.singletonMap(BLOAT_PROPERTY, BLOAT),
null
);
@@ -214,7 +215,7 @@ public class CalcitePlannerModuleTest extends
CalciteTestBase
PlannerContext contextWithoutBloat = PlannerContext.create(
toolbox,
"SELECT 1",
- new NativeSqlEngine(queryLifecycleFactory, mapper),
+ new NativeSqlEngine(queryLifecycleFactory, mapper,
(SqlStatementFactory) null),
Collections.emptyMap(),
null
);
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index c314a4e075a..5e59e8a1e03 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -79,6 +79,7 @@ import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.sql.SqlStatementFactory;
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.aggregation.SqlAggregationModule;
import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
@@ -280,7 +281,16 @@ public class CalciteTests
final QueryRunnerFactoryConglomerate conglomerate
)
{
- return new NativeSqlEngine(createMockQueryLifecycleFactory(walker,
conglomerate), getJsonMapper());
+ return createMockSqlEngine(walker, conglomerate, null);
+ }
+
+ public static NativeSqlEngine createMockSqlEngine(
+ final QuerySegmentWalker walker,
+ final QueryRunnerFactoryConglomerate conglomerate,
+ final SqlStatementFactory sqlStatementFactory
+ )
+ {
+ return new NativeSqlEngine(createMockQueryLifecycleFactory(walker,
conglomerate), getJsonMapper(), sqlStatementFactory);
}
public static QueryLifecycleFactory createMockQueryLifecycleFactory(
@@ -522,7 +532,7 @@ public class CalciteTests
/**
* A fake {@link DruidNodeDiscoveryProvider} for {@link
#createMockSystemSchema}.
*/
- private static class FakeDruidNodeDiscoveryProvider extends
DruidNodeDiscoveryProvider
+ public static class FakeDruidNodeDiscoveryProvider extends
DruidNodeDiscoveryProvider
{
private final Map<NodeRole, FakeDruidNodeDiscovery> nodeDiscoveries;
diff --git
a/sql/src/test/java/org/apache/druid/sql/http/GetQueriesResponseTest.java
b/sql/src/test/java/org/apache/druid/sql/http/GetQueriesResponseTest.java
new file mode 100644
index 00000000000..cc650ae2725
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/http/GetQueriesResponseTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.http;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.segment.TestHelper;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+public class GetQueriesResponseTest
+{
+ private static ObjectMapper jsonMapper;
+
+ @BeforeAll
+ static void setUp()
+ {
+ jsonMapper =
TestHelper.makeJsonMapper().registerModules(getJacksonModules());
+ }
+
+ @Test
+ public void test_serde() throws Exception
+ {
+ final GetQueriesResponse response = new GetQueriesResponse(
+ Collections.singletonList(
+ new TestQueryInfo(
+ "query",
+ "xyz",
+ "abc"
+ )
+ )
+ );
+ final GetQueriesResponse response2 =
+ jsonMapper.readValue(jsonMapper.writeValueAsBytes(response),
GetQueriesResponse.class);
+ Assertions.assertEquals(response, response2);
+ }
+
+ @Test
+ public void test_equals()
+ {
+ EqualsVerifier.forClass(GetQueriesResponse.class).usingGetClass().verify();
+ }
+
+ static class TestQueryInfo implements QueryInfo
+ {
+ private final String query;
+ private final String identity;
+ private final String authenticator;
+
+ @JsonCreator
+ public TestQueryInfo(
+ @JsonProperty("query") String query,
+ @JsonProperty("identity") String identity,
+ @JsonProperty("authenticator") String authenticator
+ )
+ {
+ this.query = query;
+ this.identity = identity;
+ this.authenticator = authenticator;
+ }
+
+ @JsonProperty
+ public String getQuery()
+ {
+ return query;
+ }
+
+ @JsonProperty
+ public String getIdentity()
+ {
+ return identity;
+ }
+
+ @JsonProperty
+ public String getAuthenticator()
+ {
+ return authenticator;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestQueryInfo that = (TestQueryInfo) o;
+ return Objects.equals(query, that.query)
+ && Objects.equals(identity, that.identity)
+ && Objects.equals(authenticator, that.authenticator);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(query, identity, authenticator);
+ }
+ }
+
+ private static List<? extends Module> getJacksonModules()
+ {
+ return Collections.<com.fasterxml.jackson.databind.Module>singletonList(
+ new SimpleModule("TestModule").registerSubtypes(
+ new NamedType(
+ TestQueryInfo.class,
+ "test"
+ )
+ )
+ );
+ }
+}
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java
b/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java
index ff9c41eab9a..483ca1a09d1 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java
@@ -66,7 +66,7 @@ public class SqlHttpModuleTest
binder -> {
binder.bind(ObjectMapper.class).annotatedWith(Json.class).toInstance(jsonMpper);
binder.bind(AuthorizerMapper.class).toInstance(new
AuthorizerMapper(Collections.emptyMap()));
- binder.bind(NativeSqlEngine.class).toProvider(Providers.of(new
NativeSqlEngine(null, null)));
+ binder.bind(NativeSqlEngine.class).toProvider(Providers.of(new
NativeSqlEngine(null, null, (SqlStatementFactory) null)));
binder.bind(DruidNode.class).annotatedWith(Self.class).toInstance(SqlResourceTest.DUMMY_DRUID_NODE);
binder.bind(ResponseContextConfig.class).toInstance(SqlResourceTest.TEST_RESPONSE_CONTEXT_CONFIG);
binder.bind(SqlStatementFactory.class)
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
index f18873668ea..dce1953a9d8 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
@@ -278,9 +278,8 @@ public class SqlResourceTest extends CalciteTestBase
stubServiceEmitter = new StubServiceEmitter("test", "test");
final AuthConfig authConfig = new AuthConfig();
final DefaultQueryConfig defaultQueryConfig = new
DefaultQueryConfig(ImmutableMap.of());
- engine = CalciteTests.createMockSqlEngine(walker, conglomerate);
final SqlToolbox sqlToolbox = new SqlToolbox(
- engine,
+ null,
plannerFactory,
stubServiceEmitter,
testRequestLogger,
@@ -297,7 +296,7 @@ public class SqlResourceTest extends CalciteTestBase
)
{
TestHttpStatement stmt = new TestHttpStatement(
- sqlToolbox,
+ sqlToolbox.withEngine(engine),
sqlQuery,
req,
validateAndAuthorizeLatchSupplier,
@@ -323,12 +322,13 @@ public class SqlResourceTest extends CalciteTestBase
throw new UnsupportedOperationException();
}
};
+ engine = CalciteTests.createMockSqlEngine(walker, conglomerate,
sqlStatementFactory);
resource = new SqlResource(
JSON_MAPPER,
CalciteTests.TEST_AUTHORIZER_MAPPER,
- sqlStatementFactory,
- lifecycleManager,
new ServerConfig(),
+ lifecycleManager,
+ new SqlEngineRegistry(Set.of(engine)),
TEST_RESPONSE_CONTEXT_CONFIG,
DUMMY_DRUID_NODE
);
@@ -402,6 +402,14 @@ public class SqlResourceTest extends CalciteTestBase
Assert.assertTrue(lifecycleManager.getAll("id").isEmpty());
}
+ @Test
+ public void test_getEnabled()
+ {
+ Response response = resource.getSupportedEngines(req);
+ Set<EngineInfo> supportedEngines = ((SupportedEnginesResponse)
response.getEntity()).getEngines();
+ Assert.assertTrue(supportedEngines.contains(new
EngineInfo(NativeSqlEngine.NAME)));
+ }
+
@Test
public void testCountStarWithMissingIntervalsContext() throws Exception
{
@@ -1632,8 +1640,6 @@ public class SqlResourceTest extends CalciteTestBase
resource = new SqlResource(
JSON_MAPPER,
CalciteTests.TEST_AUTHORIZER_MAPPER,
- sqlStatementFactory,
- lifecycleManager,
new ServerConfig()
{
@Override
@@ -1648,6 +1654,8 @@ public class SqlResourceTest extends CalciteTestBase
return new
AllowedRegexErrorResponseTransformStrategy(ImmutableList.of());
}
},
+ lifecycleManager,
+ new SqlEngineRegistry(Set.of(engine)),
TEST_RESPONSE_CONTEXT_CONFIG,
DUMMY_DRUID_NODE
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]