abhishekagarwal87 commented on a change in pull request #11566:
URL: https://github.com/apache/druid/pull/11566#discussion_r686528076
##########
File path:
services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
##########
@@ -151,6 +157,9 @@ public AsyncQueryForwardingServlet(
this.queryMetricsFactory = queryMetricsFactory;
this.authenticatorMapper = authenticatorMapper;
this.protobufTranslation = new ProtobufTranslationImpl();
+ this.routeSqlQueries = Boolean.parseBoolean(
+ properties.getProperty("druid.router.sql.enable", "false")
Review comment:
nit: for consistency, can this be declared as a static variable?
##########
File path:
services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
##########
@@ -259,23 +244,22 @@ protected void service(HttpServletRequest request,
HttpServletResponse response)
request.setAttribute(QUERY_ATTRIBUTE, inputQuery);
}
catch (IOException e) {
- log.warn(e, "Exception parsing query");
- final String errorMessage = e.getMessage() == null ? "no error
message" : e.getMessage();
- requestLogger.logNativeQuery(
- RequestLogLine.forNative(
- null,
- DateTimes.nowUtc(),
- request.getRemoteAddr(),
- new QueryStats(ImmutableMap.of("success", false, "exception",
errorMessage))
- )
- );
- response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
- response.setContentType(MediaType.APPLICATION_JSON);
- objectMapper.writeValue(
- response.getOutputStream(),
- ImmutableMap.of("error", errorMessage)
- );
-
+ handleQueryParseException(request, response, objectMapper, e, true);
+ return;
+ }
+ catch (Exception e) {
+ handleException(response, objectMapper, e);
+ return;
+ }
+ } else if (routeSqlQueries && isSqlQueryEndpoint &&
HttpMethod.DELETE.is(method)) {
+ targetServer = hostFinder.pickDefaultServer();
+ broadcastQueryCancelRequest(request, targetServer);
+ } else if (routeSqlQueries && isSqlQueryEndpoint &&
HttpMethod.POST.is(method)) {
+ try {
+ targetServer = getTargetServerForSql(request, objectMapper);
+ }
+ catch (IOException e) {
+ handleQueryParseException(request, response, objectMapper, e, false);
Review comment:
just wondering why would there be an IOException in getting a target
server for sql. what could be the reasons?
##########
File path:
services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
##########
@@ -259,23 +244,22 @@ protected void service(HttpServletRequest request,
HttpServletResponse response)
request.setAttribute(QUERY_ATTRIBUTE, inputQuery);
}
catch (IOException e) {
- log.warn(e, "Exception parsing query");
- final String errorMessage = e.getMessage() == null ? "no error
message" : e.getMessage();
- requestLogger.logNativeQuery(
- RequestLogLine.forNative(
- null,
- DateTimes.nowUtc(),
- request.getRemoteAddr(),
- new QueryStats(ImmutableMap.of("success", false, "exception",
errorMessage))
- )
- );
- response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
- response.setContentType(MediaType.APPLICATION_JSON);
- objectMapper.writeValue(
- response.getOutputStream(),
- ImmutableMap.of("error", errorMessage)
- );
-
+ handleQueryParseException(request, response, objectMapper, e, true);
+ return;
+ }
+ catch (Exception e) {
+ handleException(response, objectMapper, e);
+ return;
+ }
+ } else if (routeSqlQueries && isSqlQueryEndpoint &&
HttpMethod.DELETE.is(method)) {
Review comment:
I don't think we will ever get here.
##########
File path:
services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
##########
@@ -292,6 +276,99 @@ protected void service(HttpServletRequest request,
HttpServletResponse response)
doService(request, response);
}
+ /**
+ * Issues async query cancellation requests to all Brokers (except the given
+ * targetServer). Query cancellation on the targetServer is handled by the
+ * proxy servlet.
+ */
+ private void broadcastQueryCancelRequest(HttpServletRequest request, Server
targetServer)
+ {
+ // send query cancellation to all brokers this query may have gone to
+ // to keep the code simple, the proxy servlet will also send a request to
the default targetServer.
+ for (final Server server : hostFinder.getAllServers()) {
+ if (server.getHost().equals(targetServer.getHost())) {
+ continue;
+ }
+
+ // issue async requests
+ Response.CompleteListener completeListener = result -> {
+ if (result.isFailed()) {
+ log.warn(
+ result.getFailure(),
+ "Failed to forward cancellation request to [%s]",
+ server.getHost()
+ );
+ }
+ };
+
+ Request broadcastReq = broadcastClient
+ .newRequest(rewriteURI(request, server.getScheme(),
server.getHost()))
+ .method(HttpMethod.DELETE)
+ .timeout(CANCELLATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ copyRequestHeaders(request, broadcastReq);
+ broadcastReq.send(completeListener);
+ }
+
+ interruptedQueryCount.incrementAndGet();
+ }
+
+ private Server getTargetServerForSql(
+ HttpServletRequest request,
+ ObjectMapper objectMapper
+ ) throws IOException
+ {
+ SqlQuery inputSqlQuery = objectMapper.readValue(request.getInputStream(),
SqlQuery.class);
+ request.setAttribute(SQL_QUERY_ATTRIBUTE, inputSqlQuery);
+
+ return inputSqlQuery != null
Review comment:
is it expected to be null? I would suggest logging an error if the
sqlQuery is null.
##########
File path:
services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
##########
@@ -257,6 +267,37 @@ public String getDefaultServiceName()
return new Pair<>(brokerServiceName, nodesHolder.pick());
}
+ public Pair<String, Server> selectForSql(SqlQuery sqlQuery)
+ {
+ synchronized (lock) {
+ if (!ruleManager.isStarted() || !started) {
+ return getDefaultLookup();
+ }
+ }
+
+ // Resolve brokerServiceName using Tier selector strategies
+ String brokerServiceName = null;
+ for (TieredBrokerSelectorStrategy strategy : strategies) {
+ final Optional<String> optionalName =
strategy.getBrokerServiceName(tierConfig, sqlQuery);
+ if (optionalName.isPresent()) {
+ brokerServiceName = optionalName.get();
+ break;
+ }
+ }
+
+ // Use defaut if not resolved by strategies
+ if (brokerServiceName == null) {
+ log.error(
+ "No brokerServiceName found for SQL Query [%s]. Using default
selector for [%s].",
Review comment:
I think this could lead to a lot of unnecessary logging. It is not
exactly an error statement. You should maybe log an info if `debug` flag is set
to true in the query contexts.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]