This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new aaf0aaa  Enable routing of SQL queries at Router (#11566)
aaf0aaa is described below

commit aaf0aaad8f3936358ab77415df7dcc68d3ee7800
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Aug 13 18:44:39 2021 +0530

    Enable routing of SQL queries at Router (#11566)
    
    This PR adds a new property druid.router.sql.enable which allows the
    Router to handle SQL queries when set to true.
    
    This change does not affect Avatica JDBC requests and they are still routed
    by hashing the Connection ID.
    
    To allow parsing of the request object as a SqlQuery (contained in module
    druid-sql),
    some classes have been moved from druid-server to druid-services with
    the same package name.
---
 docs/configuration/index.md                        |   1 +
 docs/design/router.md                              |  13 ++
 extensions-core/hdfs-storage/pom.xml               |   5 +
 .../java/org/apache/druid/query/QueryContexts.java |   9 +-
 .../org/apache/druid/query/QueryContextsTest.java  |  29 ++--
 server/pom.xml                                     |   4 -
 services/pom.xml                                   |  74 ++++++++
 .../druid/server/AsyncQueryForwardingServlet.java  | 186 ++++++++++++++-------
 .../apache/druid/server/http/RouterResource.java   |   0
 .../server/router/CoordinatorRuleManager.java      |   0
 .../JavaScriptTieredBrokerSelectorStrategy.java    |   0
 .../router/ManualTieredBrokerSelectorStrategy.java |  21 ++-
 .../PriorityTieredBrokerSelectorStrategy.java      |   0
 .../druid/server/router/QueryHostFinder.java       |  62 +++----
 .../druid/server/router/TieredBrokerConfig.java    |   0
 .../server/router/TieredBrokerHostSelector.java    |  53 +++++-
 .../TieredBrokerSelectorStrategiesProvider.java    |   0
 .../router/TieredBrokerSelectorStrategy.java       |  23 +++
 .../TimeBoundaryTieredBrokerSelectorStrategy.java  |   0
 .../server/AsyncQueryForwardingServletTest.java    |  87 +++++++---
 .../http/security/SecurityResourceFilterTest.java  |   0
 .../server/router/CoordinatorRuleManagerTest.java  |   0
 ...JavaScriptTieredBrokerSelectorStrategyTest.java |   0
 .../ManualTieredBrokerSelectorStrategyTest.java    |  52 ++++++
 .../druid/server/router/QueryHostFinderTest.java   |   0
 .../router/TieredBrokerHostSelectorTest.java       |  40 +++++
 26 files changed, 518 insertions(+), 141 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 251f5c4..b724e3c 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -2052,6 +2052,7 @@ Supported query contexts:
 |`druid.router.tierToBrokerMap`|Queries for a certain tier of data are routed 
to their appropriate Broker. This value should be an ordered JSON map of tiers 
to Broker names. The priority of Brokers is based on the 
ordering.|{"_default_tier": "<defaultBrokerServiceName>"}|
 |`druid.router.defaultRule`|The default rule for all datasources.|"_default"|
 |`druid.router.pollPeriod`|How often to poll for new rules.|PT1M|
+|`druid.router.sql.enable`|Enable routing of SQL queries. Possible values are 
`true` and `false`. When set to `true`, the Router uses the provided strategies 
to determine the broker service for a given SQL query.|`false`|
 |`druid.router.strategies`|Please see [Router 
Strategies](../design/router.md#router-strategies) for 
details.|[{"type":"timeBoundary"},{"type":"priority"}]|
 |`druid.router.avatica.balancer.type`|Class to use for balancing Avatica 
queries across Brokers. Please see [Avatica Query 
Balancing](../design/router.md#avatica-query-balancing).|rendezvousHash|
 |`druid.router.managementProxy.enabled`|Enables the Router's [management 
proxy](../design/router.md#router-as-management-proxy) functionality.|false|
diff --git a/docs/design/router.md b/docs/design/router.md
index c5a3b8d..06d1e1f 100644
--- a/docs/design/router.md
+++ b/docs/design/router.md
@@ -112,6 +112,7 @@ Queries with a priority set to less than minPriority are 
routed to the lowest pr
 #### manual
 
 This strategy reads the parameter `brokerService` from the query context and 
routes the query to that broker service. If no valid `brokerService` is 
specified in the query context, the field `defaultManualBrokerService` is used 
to determine target broker service given the value is valid and non-null. A 
value is considered valid if it is present in `druid.router.tierToBrokerMap`
+This strategy can route both Native and SQL queries (when enabled).
 
 *Example*: A strategy that routes queries to the Broker "druid:broker-hot" if 
no valid `brokerService` is found in the query context.
 
@@ -137,6 +138,18 @@ Allows defining arbitrary routing rules using a JavaScript 
function. The functio
 
 > JavaScript-based functionality is disabled by default. Please refer to the 
 > Druid [JavaScript programming guide](../development/javascript.md) for 
 > guidelines about using Druid's JavaScript functionality, including 
 > instructions on how to enable it.
 
+### Routing of SQL queries
+
+To enable routing of SQL queries, set `druid.router.sql.enable` to `true` 
(`false` by default). The broker service for a
+given SQL query is resolved using only the provided Router strategies. If not 
resolved using any of the strategies, the
+Router uses the `defaultBrokerServiceName`. This behavior is slightly 
different from native queries where the Router
+first tries to resolve the broker service using strategies, then using load rules, 
and finally falls back to the `defaultBrokerServiceName`
+if still not resolved.
+
+Routing of native queries is always enabled.
+
+Setting `druid.router.sql.enable` does not affect Avatica JDBC requests. They 
are routed based on connection ID as
+explained in the next section.
 
 ### Avatica query balancing
 
diff --git a/extensions-core/hdfs-storage/pom.xml 
b/extensions-core/hdfs-storage/pom.xml
index d16712d..47a9ec7 100644
--- a/extensions-core/hdfs-storage/pom.xml
+++ b/extensions-core/hdfs-storage/pom.xml
@@ -378,6 +378,11 @@
           <scope>test</scope>
         </dependency>
         <dependency>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
           <groupId>org.apache.druid</groupId>
           <artifactId>druid-server</artifactId>
           <version>${project.parent.version}</version>
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java 
b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
index 4a293e5..dd58dfc 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
@@ -329,6 +329,11 @@ public class QueryContexts
     return parseBoolean(query, ENABLE_DEBUG, DEFAULT_ENABLE_DEBUG);
   }
 
+  public static boolean isDebug(Map<String, Object> queryContext)
+  {
+    return parseBoolean(queryContext, ENABLE_DEBUG, DEFAULT_ENABLE_DEBUG);
+  }
+
   public static <T> Query<T> withMaxScatterGatherBytes(Query<T> query, long 
maxScatterGatherBytesLimit)
   {
     Object obj = query.getContextValue(MAX_SCATTER_GATHER_BYTES_KEY);
@@ -418,9 +423,9 @@ public class QueryContexts
     return query.getContextBoolean(RETURN_PARTIAL_RESULTS_KEY, defaultValue);
   }
 
-  public static <T> String getBrokerServiceName(Query<T> query)
+  public static String getBrokerServiceName(Map<String, Object> queryContext)
   {
-    return query.getContextValue(BROKER_SERVICE_NAME);
+    return queryContext == null ? null : (String) 
queryContext.get(BROKER_SERVICE_NAME);
   }
 
   static <T> long parseLong(Query<T> query, String key, long defaultValue)
diff --git 
a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java 
b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java
index 764a81a..3b693aa 100644
--- a/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java
+++ b/processing/src/test/java/org/apache/druid/query/QueryContextsTest.java
@@ -30,6 +30,7 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.util.HashMap;
+import java.util.Map;
 
 public class QueryContextsTest
 {
@@ -149,33 +150,21 @@ public class QueryContextsTest
   @Test
   public void testGetBrokerServiceName()
   {
-    Query<?> query = new TestQuery(
-        new TableDataSource("test"),
-        new 
MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))),
-        false,
-        new HashMap<>()
-    );
-
-    Assert.assertNull(QueryContexts.getBrokerServiceName(query));
+    Map<String, Object> queryContext = new HashMap<>();
+    Assert.assertNull(QueryContexts.getBrokerServiceName(queryContext));
 
-    query.getContext().put(QueryContexts.BROKER_SERVICE_NAME, "hotBroker");
-    Assert.assertEquals("hotBroker", 
QueryContexts.getBrokerServiceName(query));
+    queryContext.put(QueryContexts.BROKER_SERVICE_NAME, "hotBroker");
+    Assert.assertEquals("hotBroker", 
QueryContexts.getBrokerServiceName(queryContext));
   }
 
   @Test
   public void testGetBrokerServiceName_withNonStringValue()
   {
-    Query<?> query = new TestQuery(
-        new TableDataSource("test"),
-        new 
MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))),
-        false,
-        new HashMap<>()
-    );
-
-    query.getContext().put(QueryContexts.BROKER_SERVICE_NAME, 100);
+    Map<String, Object> queryContext = new HashMap<>();
+    queryContext.put(QueryContexts.BROKER_SERVICE_NAME, 100);
 
     exception.expect(ClassCastException.class);
-    QueryContexts.getBrokerServiceName(query);
+    QueryContexts.getBrokerServiceName(queryContext);
   }
 
   @Test
@@ -188,6 +177,7 @@ public class QueryContextsTest
         ImmutableMap.of()
     );
     Assert.assertFalse(QueryContexts.isDebug(query));
+    Assert.assertFalse(QueryContexts.isDebug(query.getContext()));
   }
 
   @Test
@@ -200,5 +190,6 @@ public class QueryContextsTest
         ImmutableMap.of(QueryContexts.ENABLE_DEBUG, true)
     );
     Assert.assertTrue(QueryContexts.isDebug(query));
+    Assert.assertTrue(QueryContexts.isDebug(query.getContext()));
   }
 }
diff --git a/server/pom.xml b/server/pom.xml
index a9569a3..677866f 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -323,10 +323,6 @@
             <groupId>com.fasterxml.jackson.module</groupId>
             <artifactId>jackson-module-guice</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.calcite.avatica</groupId>
-            <artifactId>avatica-core</artifactId>
-        </dependency>
 
         <!-- Tests -->
         <dependency>
diff --git a/services/pom.xml b/services/pom.xml
index d37cc1d..1b80915 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -82,6 +82,18 @@
         </dependency>
         <dependency>
             <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-http</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-proxy</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-server</artifactId>
         </dependency>
         <dependency>
@@ -89,6 +101,10 @@
             <artifactId>curator-framework</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.calcite.avatica</groupId>
+            <artifactId>avatica-core</artifactId>
+        </dependency>
+        <dependency>
             <groupId>joda-time</groupId>
             <artifactId>joda-time</artifactId>
         </dependency>
@@ -98,13 +114,37 @@
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
         </dependency>
         <dependency>
+            <groupId>com.fasterxml.jackson.jaxrs</groupId>
+            <artifactId>jackson-jaxrs-smile-provider</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.opencsv</groupId>
             <artifactId>opencsv</artifactId>
         </dependency>
         <dependency>
+            <groupId>javax.validation</groupId>
+            <artifactId>validation-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.errorprone</groupId>
+            <artifactId>error_prone_annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>javax.ws.rs</groupId>
+            <artifactId>jsr311-api</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.eclipse.jetty</groupId>
             <artifactId>jetty-servlet</artifactId>
         </dependency>
@@ -113,6 +153,10 @@
             <artifactId>jetty-rewrite</artifactId>
         </dependency>
         <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-server</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.google.inject.extensions</groupId>
             <artifactId>guice-multibindings</artifactId>
         </dependency>
@@ -126,6 +170,10 @@
         </dependency>
         <dependency>
             <groupId>io.netty</groupId>
+            <artifactId>netty</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
             <artifactId>netty-common</artifactId>
         </dependency>
         <dependency>
@@ -162,6 +210,27 @@
         </dependency>
         <!-- Test Dependencies -->
         <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-core</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-server</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid</groupId>
+            <artifactId>druid-processing</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
@@ -176,6 +245,11 @@
             <artifactId>hamcrest-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/server/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java 
b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
similarity index 81%
rename from 
server/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
rename to 
services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
index 253fe1d..0a2da62 100644
--- 
a/server/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
+++ 
b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
@@ -52,6 +52,7 @@ import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.Authenticator;
 import org.apache.druid.server.security.AuthenticatorMapper;
+import org.apache.druid.sql.http.SqlQuery;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.client.api.Request;
 import org.eclipse.jetty.client.api.Response;
@@ -68,6 +69,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response.Status;
 import java.io.IOException;
 import java.util.Map;
+import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -88,8 +90,12 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
   private static final String SCHEME_ATTRIBUTE = 
"org.apache.druid.proxy.to.host.scheme";
   private static final String QUERY_ATTRIBUTE = "org.apache.druid.proxy.query";
   private static final String AVATICA_QUERY_ATTRIBUTE = 
"org.apache.druid.proxy.avaticaQuery";
+  private static final String SQL_QUERY_ATTRIBUTE = 
"org.apache.druid.proxy.sqlQuery";
   private static final String OBJECTMAPPER_ATTRIBUTE = 
"org.apache.druid.proxy.objectMapper";
 
+  private static final String PROPERTY_SQL_ENABLE = "druid.router.sql.enable";
+  private static final String PROPERTY_SQL_ENABLE_DEFAULT = "false";
+
   private static final int CANCELLATION_TIMEOUT_MILLIS = 500;
 
   private final AtomicLong successfulQueryCount = new AtomicLong();
@@ -124,6 +130,8 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
   private final AuthenticatorMapper authenticatorMapper;
   private final ProtobufTranslation protobufTranslation;
 
+  private final boolean routeSqlQueries;
+
   private HttpClient broadcastClient;
 
   @Inject
@@ -137,7 +145,8 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
       ServiceEmitter emitter,
       RequestLogger requestLogger,
       GenericQueryMetricsFactory queryMetricsFactory,
-      AuthenticatorMapper authenticatorMapper
+      AuthenticatorMapper authenticatorMapper,
+      Properties properties
   )
   {
     this.warehouse = warehouse;
@@ -151,6 +160,9 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
     this.queryMetricsFactory = queryMetricsFactory;
     this.authenticatorMapper = authenticatorMapper;
     this.protobufTranslation = new ProtobufTranslationImpl();
+    this.routeSqlQueries = Boolean.parseBoolean(
+        properties.getProperty(PROPERTY_SQL_ENABLE, 
PROPERTY_SQL_ENABLE_DEFAULT)
+    );
   }
 
   @Override
@@ -195,7 +207,8 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
 
     // The Router does not have the ability to look inside SQL queries and 
route them intelligently, so just treat
     // them as a generic request.
-    final boolean isQueryEndpoint = requestURI.startsWith("/druid/v2") && 
!requestURI.startsWith("/druid/v2/sql");
+    final boolean isNativeQueryEndpoint = requestURI.startsWith("/druid/v2") 
&& !requestURI.startsWith("/druid/v2/sql");
+    final boolean isSqlQueryEndpoint = requestURI.startsWith("/druid/v2/sql");
 
     final boolean isAvaticaJson = 
requestURI.startsWith("/druid/v2/sql/avatica");
     final boolean isAvaticaPb = 
requestURI.startsWith("/druid/v2/sql/avatica-protobuf");
@@ -215,36 +228,11 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
       targetServer = hostFinder.findServerAvatica(connectionId);
       byte[] requestBytes = objectMapper.writeValueAsBytes(requestMap);
       request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestBytes);
-    } else if (isQueryEndpoint && HttpMethod.DELETE.is(method)) {
+    } else if (isNativeQueryEndpoint && HttpMethod.DELETE.is(method)) {
       // query cancellation request
       targetServer = hostFinder.pickDefaultServer();
-
-      for (final Server server : hostFinder.getAllServers()) {
-        // send query cancellation to all brokers this query may have gone to
-        // to keep the code simple, the proxy servlet will also send a request 
to the default targetServer.
-        if (!server.getHost().equals(targetServer.getHost())) {
-          // issue async requests
-          Response.CompleteListener completeListener = result -> {
-            if (result.isFailed()) {
-              log.warn(
-                  result.getFailure(),
-                  "Failed to forward cancellation request to [%s]",
-                  server.getHost()
-              );
-            }
-          };
-
-          Request broadcastReq = broadcastClient
-              .newRequest(rewriteURI(request, server.getScheme(), 
server.getHost()))
-              .method(HttpMethod.DELETE)
-              .timeout(CANCELLATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
-
-          copyRequestHeaders(request, broadcastReq);
-          broadcastReq.send(completeListener);
-        }
-        interruptedQueryCount.incrementAndGet();
-      }
-    } else if (isQueryEndpoint && HttpMethod.POST.is(method)) {
+      broadcastQueryCancelRequest(request, targetServer);
+    } else if (isNativeQueryEndpoint && HttpMethod.POST.is(method)) {
       // query request
       try {
         Query inputQuery = objectMapper.readValue(request.getInputStream(), 
Query.class);
@@ -259,23 +247,21 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
         request.setAttribute(QUERY_ATTRIBUTE, inputQuery);
       }
       catch (IOException e) {
-        log.warn(e, "Exception parsing query");
-        final String errorMessage = e.getMessage() == null ? "no error 
message" : e.getMessage();
-        requestLogger.logNativeQuery(
-            RequestLogLine.forNative(
-                null,
-                DateTimes.nowUtc(),
-                request.getRemoteAddr(),
-                new QueryStats(ImmutableMap.of("success", false, "exception", 
errorMessage))
-            )
-        );
-        response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-        response.setContentType(MediaType.APPLICATION_JSON);
-        objectMapper.writeValue(
-            response.getOutputStream(),
-            ImmutableMap.of("error", errorMessage)
-        );
-
+        handleQueryParseException(request, response, objectMapper, e, true);
+        return;
+      }
+      catch (Exception e) {
+        handleException(response, objectMapper, e);
+        return;
+      }
+    } else if (routeSqlQueries && isSqlQueryEndpoint && 
HttpMethod.POST.is(method)) {
+      try {
+        SqlQuery inputSqlQuery = 
objectMapper.readValue(request.getInputStream(), SqlQuery.class);
+        request.setAttribute(SQL_QUERY_ATTRIBUTE, inputSqlQuery);
+        targetServer = hostFinder.findServerSql(inputSqlQuery);
+      }
+      catch (IOException e) {
+        handleQueryParseException(request, response, objectMapper, e, false);
         return;
       }
       catch (Exception e) {
@@ -292,6 +278,86 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
     doService(request, response);
   }
 
+  /**
+   * Issues async query cancellation requests to all Brokers (except the given
+   * targetServer). Query cancellation on the targetServer is handled by the
+   * proxy servlet.
+   */
+  private void broadcastQueryCancelRequest(HttpServletRequest request, Server 
targetServer)
+  {
+    // send query cancellation to all brokers this query may have gone to
+    // to keep the code simple, the proxy servlet will also send a request to 
the default targetServer.
+    for (final Server server : hostFinder.getAllServers()) {
+      if (server.getHost().equals(targetServer.getHost())) {
+        continue;
+      }
+
+      // issue async requests
+      Response.CompleteListener completeListener = result -> {
+        if (result.isFailed()) {
+          log.warn(
+              result.getFailure(),
+              "Failed to forward cancellation request to [%s]",
+              server.getHost()
+          );
+        }
+      };
+
+      Request broadcastReq = broadcastClient
+          .newRequest(rewriteURI(request, server.getScheme(), 
server.getHost()))
+          .method(HttpMethod.DELETE)
+          .timeout(CANCELLATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+      copyRequestHeaders(request, broadcastReq);
+      broadcastReq.send(completeListener);
+    }
+
+    interruptedQueryCount.incrementAndGet();
+  }
+
+  private void handleQueryParseException(
+      HttpServletRequest request,
+      HttpServletResponse response,
+      ObjectMapper objectMapper,
+      IOException parseException,
+      boolean isNativeQuery
+  ) throws IOException
+  {
+    log.warn(parseException, "Exception parsing query");
+
+    // Log the error message
+    final String errorMessage = parseException.getMessage() == null
+                                ? "no error message" : 
parseException.getMessage();
+    if (isNativeQuery) {
+      requestLogger.logNativeQuery(
+          RequestLogLine.forNative(
+              null,
+              DateTimes.nowUtc(),
+              request.getRemoteAddr(),
+              new QueryStats(ImmutableMap.of("success", false, "exception", 
errorMessage))
+          )
+      );
+    } else {
+      requestLogger.logSqlQuery(
+          RequestLogLine.forSql(
+              null,
+              null,
+              DateTimes.nowUtc(),
+              request.getRemoteAddr(),
+              new QueryStats(ImmutableMap.of("success", false, "exception", 
errorMessage))
+          )
+      );
+    }
+
+    // Write to the response
+    response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+    response.setContentType(MediaType.APPLICATION_JSON);
+    objectMapper.writeValue(
+        response.getOutputStream(),
+        ImmutableMap.of("error", errorMessage)
+    );
+  }
+
   protected void doService(
       HttpServletRequest request,
       HttpServletResponse response
@@ -317,16 +383,11 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
     }
 
     final Query query = (Query) clientRequest.getAttribute(QUERY_ATTRIBUTE);
+    final SqlQuery sqlQuery = (SqlQuery) 
clientRequest.getAttribute(SQL_QUERY_ATTRIBUTE);
     if (query != null) {
-      final ObjectMapper objectMapper = (ObjectMapper) 
clientRequest.getAttribute(OBJECTMAPPER_ATTRIBUTE);
-      try {
-        byte[] bytes = objectMapper.writeValueAsBytes(query);
-        proxyRequest.content(new BytesContentProvider(bytes));
-        proxyRequest.getHeaders().put(HttpHeader.CONTENT_LENGTH, 
String.valueOf(bytes.length));
-      }
-      catch (JsonProcessingException e) {
-        throw new RuntimeException(e);
-      }
+      setProxyRequestContent(proxyRequest, clientRequest, query);
+    } else if (sqlQuery != null) {
+      setProxyRequestContent(proxyRequest, clientRequest, sqlQuery);
     }
 
     // Since we can't see the request object on the remote side, we can't 
check whether the remote side actually
@@ -358,6 +419,19 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
     );
   }
 
+  private void setProxyRequestContent(Request proxyRequest, HttpServletRequest 
clientRequest, Object content)
+  {
+    final ObjectMapper objectMapper = (ObjectMapper) 
clientRequest.getAttribute(OBJECTMAPPER_ATTRIBUTE);
+    try {
+      byte[] bytes = objectMapper.writeValueAsBytes(content);
+      proxyRequest.content(new BytesContentProvider(bytes));
+      proxyRequest.getHeaders().put(HttpHeader.CONTENT_LENGTH, 
String.valueOf(bytes.length));
+    }
+    catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @Override
   protected Response.Listener newProxyResponseListener(HttpServletRequest 
request, HttpServletResponse response)
   {
diff --git 
a/server/src/main/java/org/apache/druid/server/http/RouterResource.java 
b/services/src/main/java/org/apache/druid/server/http/RouterResource.java
similarity index 100%
rename from 
server/src/main/java/org/apache/druid/server/http/RouterResource.java
rename to 
services/src/main/java/org/apache/druid/server/http/RouterResource.java
diff --git 
a/server/src/main/java/org/apache/druid/server/router/CoordinatorRuleManager.java
 
b/services/src/main/java/org/apache/druid/server/router/CoordinatorRuleManager.java
similarity index 100%
rename from 
server/src/main/java/org/apache/druid/server/router/CoordinatorRuleManager.java
rename to 
services/src/main/java/org/apache/druid/server/router/CoordinatorRuleManager.java
diff --git 
a/server/src/main/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategy.java
 
b/services/src/main/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategy.java
similarity index 100%
rename from 
server/src/main/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategy.java
rename to 
services/src/main/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategy.java
diff --git 
a/server/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java
 
b/services/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java
similarity index 87%
rename from 
server/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java
rename to 
services/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java
index 2f1d45d..c16ec0c 100644
--- 
a/server/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java
+++ 
b/services/src/main/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategy.java
@@ -27,8 +27,10 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryContexts;
+import org.apache.druid.sql.http.SqlQuery;
 
 import javax.annotation.Nullable;
+import java.util.Map;
 
 /**
  * Implementation of {@link TieredBrokerSelectorStrategy} which uses the 
parameter
@@ -58,8 +60,25 @@ public class ManualTieredBrokerSelectorStrategy implements 
TieredBrokerSelectorS
   @Override
   public Optional<String> getBrokerServiceName(TieredBrokerConfig tierConfig, 
Query query)
   {
+    return getBrokerServiceName(tierConfig, query.getContext());
+  }
+
+  @Override
+  public Optional<String> getBrokerServiceName(TieredBrokerConfig config, 
SqlQuery sqlQuery)
+  {
+    return getBrokerServiceName(config, sqlQuery.getContext());
+  }
+
+  /**
+   * Determines the Broker service name from the given query context.
+   */
+  private Optional<String> getBrokerServiceName(
+      TieredBrokerConfig tierConfig,
+      Map<String, Object> queryContext
+  )
+  {
     try {
-      final String contextBrokerService = 
QueryContexts.getBrokerServiceName(query);
+      final String contextBrokerService = 
QueryContexts.getBrokerServiceName(queryContext);
 
       if (isValidBrokerService(contextBrokerService, tierConfig)) {
         // If the broker service in the query context is valid, use that
diff --git 
a/server/src/main/java/org/apache/druid/server/router/PriorityTieredBrokerSelectorStrategy.java
 
b/services/src/main/java/org/apache/druid/server/router/PriorityTieredBrokerSelectorStrategy.java
similarity index 100%
rename from 
server/src/main/java/org/apache/druid/server/router/PriorityTieredBrokerSelectorStrategy.java
rename to 
services/src/main/java/org/apache/druid/server/router/PriorityTieredBrokerSelectorStrategy.java
diff --git 
a/server/src/main/java/org/apache/druid/server/router/QueryHostFinder.java 
b/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java
similarity index 78%
rename from 
server/src/main/java/org/apache/druid/server/router/QueryHostFinder.java
rename to 
services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java
index 0494f92..6a7a078 100644
--- a/server/src/main/java/org/apache/druid/server/router/QueryHostFinder.java
+++ b/services/src/main/java/org/apache/druid/server/router/QueryHostFinder.java
@@ -25,9 +25,9 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.query.Query;
+import org.apache.druid.sql.http.SqlQuery;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
@@ -66,21 +66,20 @@ public class QueryHostFinder
 
   public Collection<Server> getAllServers()
   {
-    return ((Collection<List<Server>>) 
hostSelector.getAllBrokers().values()).stream()
-                                                                             
.flatMap(Collection::stream)
-                                                                             
.collect(Collectors.toList());
+    return hostSelector.getAllBrokers().values().stream()
+                       .flatMap(Collection::stream)
+                       .collect(Collectors.toList());
   }
 
   public Server findServerAvatica(String connectionId)
   {
     Server chosenServer = 
avaticaConnectionBalancer.pickServer(getAllServers(), connectionId);
-    if (chosenServer == null) {
-      log.makeAlert(
-          "Catastrophic failure! No servers found at all! Failing request!"
-      ).emit();
+    assertServerFound(
+        chosenServer,
+        "No server found for Avatica request with connectionId[%s]",
+        connectionId
+    );
 
-      throw new ISE("No server found for Avatica request with 
connectionId[%s]", connectionId);
-    }
     log.debug(
         "Balancer class [%s] sending request with connectionId[%s] to server: 
%s",
         avaticaConnectionBalancer.getClass(),
@@ -90,35 +89,24 @@ public class QueryHostFinder
     return chosenServer;
   }
 
+  public Server findServerSql(SqlQuery sqlQuery)
+  {
+    Server server = findServerInner(hostSelector.selectForSql(sqlQuery));
+    assertServerFound(server, "No server found for SQL Query [%s]", 
sqlQuery.getQuery());
+    return server;
+  }
+
   public <T> Server pickServer(Query<T> query)
   {
     Server server = findServer(query);
-
-    if (server == null) {
-      log.makeAlert(
-          "Catastrophic failure! No servers found at all! Failing request!"
-      ).emit();
-
-      throw new ISE("No server found for query[%s]", query);
-    }
-
-    log.debug("Selected [%s]", server.getHost());
-
+    assertServerFound(server, "No server found for query[%s]", query);
     return server;
   }
 
   public Server pickDefaultServer()
   {
     Server server = findDefaultServer();
-
-    if (server == null) {
-      log.makeAlert(
-          "Catastrophic failure! No servers found at all! Failing request!"
-      ).emit();
-
-      throw new ISE("No default server found!");
-    }
-
+    assertServerFound(server, "No default server found!");
     return server;
   }
 
@@ -155,4 +143,18 @@ public class QueryHostFinder
 
     return server;
   }
+
+  private void assertServerFound(Server server, String messageFormat, 
Object... args)
+  {
+    if (server != null) {
+      log.debug("Selected [%s]", server.getHost());
+      return;
+    }
+
+    log.makeAlert(
+        "Catastrophic failure! No servers found at all! Failing request!"
+    ).emit();
+
+    throw new ISE(messageFormat, args);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/router/TieredBrokerConfig.java 
b/services/src/main/java/org/apache/druid/server/router/TieredBrokerConfig.java
similarity index 100%
rename from 
server/src/main/java/org/apache/druid/server/router/TieredBrokerConfig.java
rename to 
services/src/main/java/org/apache/druid/server/router/TieredBrokerConfig.java
diff --git 
a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
 
b/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
similarity index 85%
rename from 
server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
rename to 
services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
index a15aa5b..ae03665 100644
--- 
a/server/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
+++ 
b/services/src/main/java/org/apache/druid/server/router/TieredBrokerHostSelector.java
@@ -36,8 +36,10 @@ import 
org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryContexts;
 import org.apache.druid.server.coordinator.rules.LoadRule;
 import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.sql.http.SqlQuery;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
@@ -50,7 +52,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  */
-public class TieredBrokerHostSelector<T>
+public class TieredBrokerHostSelector
 {
   private static EmittingLogger log = new 
EmittingLogger(TieredBrokerHostSelector.class);
 
@@ -181,7 +183,7 @@ public class TieredBrokerHostSelector<T>
     return tierConfig.getDefaultBrokerServiceName();
   }
 
-  public Pair<String, Server> select(final Query<T> query)
+  public <T> Pair<String, Server> select(final Query<T> query)
   {
     synchronized (lock) {
       if (!ruleManager.isStarted() || !started) {
@@ -243,8 +245,17 @@ public class TieredBrokerHostSelector<T>
       brokerServiceName = tierConfig.getDefaultBrokerServiceName();
     }
 
-    NodesHolder nodesHolder = servers.get(brokerServiceName);
+    return getServerPair(brokerServiceName);
+  }
 
+  /**
+   * Finds a server for the given brokerServiceName and returns a pair 
containing
+   * the brokerServiceName and the found server. Uses the default broker 
service
+   * if no server is found for the given brokerServiceName.
+   */
+  private Pair<String, Server> getServerPair(String brokerServiceName)
+  {
+    NodesHolder nodesHolder = servers.get(brokerServiceName);
     if (nodesHolder == null) {
       log.error(
           "No nodesHolder found for brokerServiceName[%s]. Using default 
selector for[%s]",
@@ -257,6 +268,42 @@ public class TieredBrokerHostSelector<T>
     return new Pair<>(brokerServiceName, nodesHolder.pick());
   }
 
+  public Pair<String, Server> selectForSql(SqlQuery sqlQuery)
+  {
+    synchronized (lock) {
+      if (!started) {
+        return getDefaultLookup();
+      }
+    }
+
+    // Resolve brokerServiceName using Tier selector strategies
+    String brokerServiceName = null;
+    for (TieredBrokerSelectorStrategy strategy : strategies) {
+      final Optional<String> optionalName = 
strategy.getBrokerServiceName(tierConfig, sqlQuery);
+      if (optionalName.isPresent()) {
+        brokerServiceName = optionalName.get();
+        break;
+      }
+    }
+
+    // Use default if not resolved by strategies
+    if (brokerServiceName == null) {
+      brokerServiceName = tierConfig.getDefaultBrokerServiceName();
+
+      // Log if query debugging is enabled
+      if (QueryContexts.isDebug(sqlQuery.getContext())) {
+        log.info(
+            "No brokerServiceName found for SQL Query [%s], Context [%s]. 
Using default selector for [%s].",
+            sqlQuery.getQuery(),
+            sqlQuery.getContext(),
+            tierConfig.getDefaultBrokerServiceName()
+        );
+      }
+    }
+
+    return getServerPair(brokerServiceName);
+  }
+
   public Pair<String, Server> getDefaultLookup()
   {
     final String brokerServiceName = tierConfig.getDefaultBrokerServiceName();
diff --git 
a/server/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategiesProvider.java
 
b/services/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategiesProvider.java
similarity index 100%
rename from 
server/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategiesProvider.java
rename to 
services/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategiesProvider.java
diff --git 
a/server/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java
 
b/services/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java
similarity index 65%
rename from 
server/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java
rename to 
services/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java
index 06f3a98..aee4ef8 100644
--- 
a/server/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java
+++ 
b/services/src/main/java/org/apache/druid/server/router/TieredBrokerSelectorStrategy.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.google.common.base.Optional;
 import org.apache.druid.query.Query;
+import org.apache.druid.sql.http.SqlQuery;
 
 /**
  */
@@ -36,5 +37,27 @@ import org.apache.druid.query.Query;
 
 public interface TieredBrokerSelectorStrategy
 {
+
+  /**
+   * Tries to determine the name of the Broker service to which the given 
native
+   * query should be routed.
+   *
+   * @param config Config containing tier to broker service map
+   * @param query  Native (JSON) query to be routed
+   * @return An empty Optional if the service name could not be determined.
+   */
   Optional<String> getBrokerServiceName(TieredBrokerConfig config, Query 
query);
+
+  /**
+   * Tries to determine the name of the Broker service to which the given 
SqlQuery
+   * should be routed. The default implementation returns an empty Optional.
+   *
+   * @param config   Config containing tier to broker service map
+   * @param sqlQuery SQL query to be routed
+   * @return An empty Optional if the service name could not be determined.
+   */
+  default Optional<String> getBrokerServiceName(TieredBrokerConfig config, 
SqlQuery sqlQuery)
+  {
+    return Optional.absent();
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/router/TimeBoundaryTieredBrokerSelectorStrategy.java
 
b/services/src/main/java/org/apache/druid/server/router/TimeBoundaryTieredBrokerSelectorStrategy.java
similarity index 100%
rename from 
server/src/main/java/org/apache/druid/server/router/TimeBoundaryTieredBrokerSelectorStrategy.java
rename to 
services/src/main/java/org/apache/druid/server/router/TimeBoundaryTieredBrokerSelectorStrategy.java
diff --git 
a/server/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
 
b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
similarity index 88%
rename from 
server/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
rename to 
services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
index b7767be..9a92289 100644
--- 
a/server/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
+++ 
b/services/src/test/java/org/apache/druid/server/AsyncQueryForwardingServletTest.java
@@ -64,6 +64,8 @@ import org.apache.druid.server.security.AllowAllAuthorizer;
 import org.apache.druid.server.security.AuthenticatorMapper;
 import org.apache.druid.server.security.Authorizer;
 import org.apache.druid.server.security.AuthorizerMapper;
+import org.apache.druid.sql.http.ResultFormat;
+import org.apache.druid.sql.http.SqlQuery;
 import org.easymock.EasyMock;
 import org.eclipse.jetty.client.HttpClient;
 import org.eclipse.jetty.server.Handler;
@@ -89,6 +91,7 @@ import java.net.URL;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.Deflater;
@@ -191,9 +194,22 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
   }
 
   @Test
+  public void testSqlQueryProxy() throws Exception
+  {
+    final SqlQuery query = new SqlQuery("SELECT * FROM foo", 
ResultFormat.ARRAY, false, null, null);
+    final QueryHostFinder hostFinder = 
EasyMock.createMock(QueryHostFinder.class);
+    EasyMock.expect(hostFinder.findServerSql(query))
+            .andReturn(new TestServer("http", "1.2.3.4", 9999)).once();
+    EasyMock.replay(hostFinder);
+
+    Properties properties = new Properties();
+    properties.setProperty("druid.router.sql.enable", "true");
+    verifyServletCallsForQuery(query, true, hostFinder, properties);
+  }
+
+  @Test
   public void testQueryProxy() throws Exception
   {
-    final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
     final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
                                         .dataSource("foo")
                                         .intervals("2000/P1D")
@@ -205,6 +221,20 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
     EasyMock.expect(hostFinder.pickServer(query)).andReturn(new 
TestServer("http", "1.2.3.4", 9999)).once();
     EasyMock.replay(hostFinder);
 
+    verifyServletCallsForQuery(query, false, hostFinder, new Properties());
+  }
+
+  /**
+   * Verifies that the Servlet calls the right methods the right number of 
times.
+   */
+  private void verifyServletCallsForQuery(
+      Object query,
+      boolean isSql,
+      QueryHostFinder hostFinder,
+      Properties properties
+  ) throws Exception
+  {
+    final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
     final HttpServletRequest requestMock = 
EasyMock.createMock(HttpServletRequest.class);
     final ByteArrayInputStream inputStream = new 
ByteArrayInputStream(jsonMapper.writeValueAsBytes(query));
     final ServletInputStream servletInputStream = new ServletInputStream()
@@ -242,10 +272,13 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
     
EasyMock.expect(requestMock.getContentType()).andReturn("application/json").times(2);
     requestMock.setAttribute("org.apache.druid.proxy.objectMapper", 
jsonMapper);
     EasyMock.expectLastCall();
-    EasyMock.expect(requestMock.getRequestURI()).andReturn("/druid/v2/");
+    EasyMock.expect(requestMock.getRequestURI()).andReturn(isSql ? 
"/druid/v2/sql" : "/druid/v2/");
     EasyMock.expect(requestMock.getMethod()).andReturn("POST");
     
EasyMock.expect(requestMock.getInputStream()).andReturn(servletInputStream);
-    requestMock.setAttribute("org.apache.druid.proxy.query", query);
+    requestMock.setAttribute(
+        isSql ? "org.apache.druid.proxy.sqlQuery" : 
"org.apache.druid.proxy.query",
+        query
+    );
     requestMock.setAttribute("org.apache.druid.proxy.to.host", "1.2.3.4:9999");
     requestMock.setAttribute("org.apache.druid.proxy.to.host.scheme", "http");
     EasyMock.expectLastCall();
@@ -262,7 +295,8 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
         new NoopServiceEmitter(),
         new NoopRequestLogger(),
         new DefaultGenericQueryMetricsFactory(),
-        new AuthenticatorMapper(ImmutableMap.of())
+        new AuthenticatorMapper(ImmutableMap.of()),
+        properties
     )
     {
       @Override
@@ -354,7 +388,8 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
               new NoopServiceEmitter(),
               new NoopRequestLogger(),
               new DefaultGenericQueryMetricsFactory(),
-              new AuthenticatorMapper(ImmutableMap.of())
+              new AuthenticatorMapper(ImmutableMap.of()),
+              new Properties()
           )
           {
             @Override
@@ -477,32 +512,32 @@ public class AsyncQueryForwardingServletTest extends 
BaseJettyTest
     final int maxNumRows = 1000;
 
     final List<? extends Service.Request> avaticaRequests = ImmutableList.of(
-            new Service.CatalogsRequest(connectionId),
-            new Service.SchemasRequest(connectionId, "druid", null),
-            new Service.TablesRequest(connectionId, "druid", "druid", null, 
null),
-            new Service.ColumnsRequest(connectionId, "druid", "druid", 
"someTable", null),
-            new Service.PrepareAndExecuteRequest(
-                    connectionId,
-                    statementId,
-                    query,
-                    maxNumRows
-            ),
-            new Service.PrepareRequest(connectionId, query, maxNumRows),
-            new Service.ExecuteRequest(
-                    new Meta.StatementHandle(connectionId, statementId, null),
-                    ImmutableList.of(),
-                    maxNumRows
-            ),
-            new Service.CloseStatementRequest(connectionId, statementId),
-            new Service.CloseConnectionRequest(connectionId)
+        new Service.CatalogsRequest(connectionId),
+        new Service.SchemasRequest(connectionId, "druid", null),
+        new Service.TablesRequest(connectionId, "druid", "druid", null, null),
+        new Service.ColumnsRequest(connectionId, "druid", "druid", 
"someTable", null),
+        new Service.PrepareAndExecuteRequest(
+            connectionId,
+            statementId,
+            query,
+            maxNumRows
+        ),
+        new Service.PrepareRequest(connectionId, query, maxNumRows),
+        new Service.ExecuteRequest(
+            new Meta.StatementHandle(connectionId, statementId, null),
+            ImmutableList.of(),
+            maxNumRows
+        ),
+        new Service.CloseStatementRequest(connectionId, statementId),
+        new Service.CloseConnectionRequest(connectionId)
     );
 
 
     for (Service.Request request : avaticaRequests) {
       Assert.assertEquals(
-              "failed",
-              connectionId,
-              
AsyncQueryForwardingServlet.getAvaticaProtobufConnectionId(request)
+          "failed",
+          connectionId,
+          AsyncQueryForwardingServlet.getAvaticaProtobufConnectionId(request)
       );
     }
   }
diff --git 
a/server/src/test/java/org/apache/druid/server/http/security/SecurityResourceFilterTest.java
 
b/services/src/test/java/org/apache/druid/server/http/security/SecurityResourceFilterTest.java
similarity index 100%
rename from 
server/src/test/java/org/apache/druid/server/http/security/SecurityResourceFilterTest.java
rename to 
services/src/test/java/org/apache/druid/server/http/security/SecurityResourceFilterTest.java
diff --git 
a/server/src/test/java/org/apache/druid/server/router/CoordinatorRuleManagerTest.java
 
b/services/src/test/java/org/apache/druid/server/router/CoordinatorRuleManagerTest.java
similarity index 100%
rename from 
server/src/test/java/org/apache/druid/server/router/CoordinatorRuleManagerTest.java
rename to 
services/src/test/java/org/apache/druid/server/router/CoordinatorRuleManagerTest.java
diff --git 
a/server/src/test/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategyTest.java
 
b/services/src/test/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategyTest.java
similarity index 100%
rename from 
server/src/test/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategyTest.java
rename to 
services/src/test/java/org/apache/druid/server/router/JavaScriptTieredBrokerSelectorStrategyTest.java
diff --git 
a/server/src/test/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategyTest.java
 
b/services/src/test/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategyTest.java
similarity index 82%
rename from 
server/src/test/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategyTest.java
rename to 
services/src/test/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategyTest.java
index d5b85ee..68855fc 100644
--- 
a/server/src/test/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategyTest.java
+++ 
b/services/src/test/java/org/apache/druid/server/router/ManualTieredBrokerSelectorStrategyTest.java
@@ -28,11 +28,13 @@ import org.apache.druid.query.Druids;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.sql.http.SqlQuery;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -200,6 +202,56 @@ public class ManualTieredBrokerSelectorStrategyTest
     );
   }
 
+  @Test
+  public void testGetBrokerServiceName_forSql()
+  {
+    final ManualTieredBrokerSelectorStrategy strategy =
+        new ManualTieredBrokerSelectorStrategy(null);
+
+    assertEquals(
+        Optional.absent(),
+        strategy.getBrokerServiceName(tieredBrokerConfig, 
createSqlQueryWithContext(null))
+    );
+    assertEquals(
+        Optional.absent(),
+        strategy.getBrokerServiceName(
+            tieredBrokerConfig,
+            createSqlQueryWithContext(
+                ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, 
Names.INVALID_BROKER)
+            )
+        )
+    );
+    assertEquals(
+        Optional.of(Names.BROKER_SVC_HOT),
+        strategy.getBrokerServiceName(
+            tieredBrokerConfig,
+            createSqlQueryWithContext(
+                ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, 
Names.BROKER_SVC_HOT)
+            )
+        )
+    );
+    assertEquals(
+        Optional.of(Names.BROKER_SVC_COLD),
+        strategy.getBrokerServiceName(
+            tieredBrokerConfig,
+            createSqlQueryWithContext(
+                ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, 
Names.BROKER_SVC_COLD)
+            )
+        )
+    );
+  }
+
+  private SqlQuery createSqlQueryWithContext(Map<String, Object> queryContext)
+  {
+    return new SqlQuery(
+        "SELECT * FROM test",
+        null,
+        false,
+        queryContext,
+        null
+    );
+  }
+
   /**
    * Test constants.
    */
diff --git 
a/server/src/test/java/org/apache/druid/server/router/QueryHostFinderTest.java 
b/services/src/test/java/org/apache/druid/server/router/QueryHostFinderTest.java
similarity index 100%
rename from 
server/src/test/java/org/apache/druid/server/router/QueryHostFinderTest.java
rename to 
services/src/test/java/org/apache/druid/server/router/QueryHostFinderTest.java
diff --git 
a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java
 
b/services/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java
similarity index 92%
rename from 
server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java
rename to 
services/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java
index a4d78d1..bf1bfed 100644
--- 
a/server/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java
+++ 
b/services/src/test/java/org/apache/druid/server/router/TieredBrokerHostSelectorTest.java
@@ -44,6 +44,7 @@ import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
 import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.sql.http.SqlQuery;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Assert;
@@ -56,6 +57,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  */
@@ -334,6 +336,33 @@ public class TieredBrokerHostSelectorTest
   }
 
   @Test
+  public void testSelectForSql()
+  {
+    Assert.assertEquals(
+        brokerSelector.getDefaultServiceName(),
+        brokerSelector.selectForSql(
+            createSqlQueryWithContext(null)
+        ).lhs
+    );
+    Assert.assertEquals(
+        "hotBroker",
+        brokerSelector.selectForSql(
+            createSqlQueryWithContext(
+                ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, "hotBroker")
+            )
+        ).lhs
+    );
+    Assert.assertEquals(
+        "coldBroker",
+        brokerSelector.selectForSql(
+            createSqlQueryWithContext(
+                ImmutableMap.of(QueryContexts.BROKER_SERVICE_NAME, 
"coldBroker")
+            )
+        ).lhs
+    );
+  }
+
+  @Test
   public void testGetAllBrokers()
   {
     Assert.assertEquals(
@@ -356,6 +385,17 @@ public class TieredBrokerHostSelectorTest
     );
   }
 
+  private SqlQuery createSqlQueryWithContext(Map<String, Object> queryContext)
+  {
+    return new SqlQuery(
+        "SELECT * FROM test",
+        null,
+        false,
+        queryContext,
+        null
+    );
+  }
+
   private static class TestRuleManager extends CoordinatorRuleManager
   {
     public TestRuleManager(

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to