abhishekagarwal87 commented on a change in pull request #11566:
URL: https://github.com/apache/druid/pull/11566#discussion_r686528076



##########
File path: 
services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
##########
@@ -151,6 +157,9 @@ public AsyncQueryForwardingServlet(
     this.queryMetricsFactory = queryMetricsFactory;
     this.authenticatorMapper = authenticatorMapper;
     this.protobufTranslation = new ProtobufTranslationImpl();
+    this.routeSqlQueries = Boolean.parseBoolean(
+        properties.getProperty("druid.router.sql.enable", "false")

Review comment:
       nit: for consistency, can this be declared as a static variable? 

##########
File path: 
services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
##########
@@ -259,23 +244,22 @@ protected void service(HttpServletRequest request, 
HttpServletResponse response)
         request.setAttribute(QUERY_ATTRIBUTE, inputQuery);
       }
       catch (IOException e) {
-        log.warn(e, "Exception parsing query");
-        final String errorMessage = e.getMessage() == null ? "no error 
message" : e.getMessage();
-        requestLogger.logNativeQuery(
-            RequestLogLine.forNative(
-                null,
-                DateTimes.nowUtc(),
-                request.getRemoteAddr(),
-                new QueryStats(ImmutableMap.of("success", false, "exception", 
errorMessage))
-            )
-        );
-        response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-        response.setContentType(MediaType.APPLICATION_JSON);
-        objectMapper.writeValue(
-            response.getOutputStream(),
-            ImmutableMap.of("error", errorMessage)
-        );
-
+        handleQueryParseException(request, response, objectMapper, e, true);
+        return;
+      }
+      catch (Exception e) {
+        handleException(response, objectMapper, e);
+        return;
+      }
+    } else if (routeSqlQueries && isSqlQueryEndpoint && 
HttpMethod.DELETE.is(method)) {
+      targetServer = hostFinder.pickDefaultServer();
+      broadcastQueryCancelRequest(request, targetServer);
+    } else if (routeSqlQueries && isSqlQueryEndpoint && 
HttpMethod.POST.is(method)) {
+      try {
+        targetServer = getTargetServerForSql(request, objectMapper);
+      }
+      catch (IOException e) {
+        handleQueryParseException(request, response, objectMapper, e, false);

Review comment:
       just wondering why would there be an IOException in getting a target 
server for sql. what could be the reasons? 

##########
File path: 
services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
##########
@@ -259,23 +244,22 @@ protected void service(HttpServletRequest request, 
HttpServletResponse response)
         request.setAttribute(QUERY_ATTRIBUTE, inputQuery);
       }
       catch (IOException e) {
-        log.warn(e, "Exception parsing query");
-        final String errorMessage = e.getMessage() == null ? "no error 
message" : e.getMessage();
-        requestLogger.logNativeQuery(
-            RequestLogLine.forNative(
-                null,
-                DateTimes.nowUtc(),
-                request.getRemoteAddr(),
-                new QueryStats(ImmutableMap.of("success", false, "exception", 
errorMessage))
-            )
-        );
-        response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-        response.setContentType(MediaType.APPLICATION_JSON);
-        objectMapper.writeValue(
-            response.getOutputStream(),
-            ImmutableMap.of("error", errorMessage)
-        );
-
+        handleQueryParseException(request, response, objectMapper, e, true);
+        return;
+      }
+      catch (Exception e) {
+        handleException(response, objectMapper, e);
+        return;
+      }
+    } else if (routeSqlQueries && isSqlQueryEndpoint && 
HttpMethod.DELETE.is(method)) {

Review comment:
       I don't think we will ever get here. 

##########
File path: 
services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
##########
@@ -292,6 +276,99 @@ protected void service(HttpServletRequest request, 
HttpServletResponse response)
     doService(request, response);
   }
 
+  /**
+   * Issues async query cancellation requests to all Brokers (except the given
+   * targetServer). Query cancellation on the targetServer is handled by the
+   * proxy servlet.
+   */
+  private void broadcastQueryCancelRequest(HttpServletRequest request, Server 
targetServer)
+  {
+    // send query cancellation to all brokers this query may have gone to
+    // to keep the code simple, the proxy servlet will also send a request to 
the default targetServer.
+    for (final Server server : hostFinder.getAllServers()) {
+      if (server.getHost().equals(targetServer.getHost())) {
+        continue;
+      }
+
+      // issue async requests
+      Response.CompleteListener completeListener = result -> {
+        if (result.isFailed()) {
+          log.warn(
+              result.getFailure(),
+              "Failed to forward cancellation request to [%s]",
+              server.getHost()
+          );
+        }
+      };
+
+      Request broadcastReq = broadcastClient
+          .newRequest(rewriteURI(request, server.getScheme(), 
server.getHost()))
+          .method(HttpMethod.DELETE)
+          .timeout(CANCELLATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+      copyRequestHeaders(request, broadcastReq);
+      broadcastReq.send(completeListener);
+    }
+
+    interruptedQueryCount.incrementAndGet();
+  }
+
+  private Server getTargetServerForSql(
+      HttpServletRequest request,
+      ObjectMapper objectMapper
+  ) throws IOException
+  {
+    SqlQuery inputSqlQuery = objectMapper.readValue(request.getInputStream(), 
SqlQuery.class);
+    request.setAttribute(SQL_QUERY_ATTRIBUTE, inputSqlQuery);
+
+    return inputSqlQuery != null

Review comment:
       is it expected to be null? I would suggest logging an error if the 
sqlQuery is null. 

##########
File path: 
services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
##########
@@ -257,6 +267,37 @@ public String getDefaultServiceName()
     return new Pair<>(brokerServiceName, nodesHolder.pick());
   }
 
+  public Pair<String, Server> selectForSql(SqlQuery sqlQuery)
+  {
+    synchronized (lock) {
+      if (!ruleManager.isStarted() || !started) {
+        return getDefaultLookup();
+      }
+    }
+
+    // Resolve brokerServiceName using Tier selector strategies
+    String brokerServiceName = null;
+    for (TieredBrokerSelectorStrategy strategy : strategies) {
+      final Optional<String> optionalName = 
strategy.getBrokerServiceName(tierConfig, sqlQuery);
+      if (optionalName.isPresent()) {
+        brokerServiceName = optionalName.get();
+        break;
+      }
+    }
+
+    // Use defaut if not resolved by strategies
+    if (brokerServiceName == null) {
+      log.error(
+          "No brokerServiceName found for SQL Query [%s]. Using default 
selector for [%s].",

Review comment:
       I think this could lead to a lot of unnecessary logging. It is not 
exactly an error statement. You should maybe log an info if `debug` flag is set 
to true in the query contexts. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to