This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 82049bb  Cancel API for sqls (#11643)
82049bb is described below

commit 82049bbf0a6409c09889ba9b3ea6131c9da86705
Author: Jihoon Son <[email protected]>
AuthorDate: Sun Sep 5 10:57:45 2021 -0700

    Cancel API for sqls (#11643)
    
    * initial work
    
    * reduce lock in sqlLifecycle
    
    * Integration test for sql canceling
    
    * javadoc, cleanup, more tests
    
    * log level to debug
    
    * fix test
    
    * checkstyle
    
    * fix flaky test; address comments
    
    * rowTransformer
    
    * cancelled state
    
    * use lock
    
    * explode instead of noop
    
    * oops
    
    * unused import
    
    * less aggressive with state
    
    * fix calcite charset
    
    * don't emit metrics when you are not authorized
---
 integration-tests/pom.xml                          |   6 +
 .../clients/AbstractQueryResourceTestClient.java   |  21 +-
 .../testing/clients/QueryResourceTestClient.java   |  10 -
 .../testing/clients/SqlResourceTestClient.java     |  11 -
 .../testing/utils/AbstractTestQueryHelper.java     |   6 +
 .../apache/druid/tests/query/ITSqlCancelTest.java  | 146 +++++++++
 .../org/apache/druid/server/QueryScheduler.java    |  18 +-
 .../druid/server/AsyncQueryForwardingServlet.java  |   4 +-
 .../java/org/apache/druid/sql/SqlLifecycle.java    | 365 ++++++++++++---------
 .../org/apache/druid/sql/SqlLifecycleFactory.java  |   7 +-
 .../org/apache/druid/sql/SqlLifecycleManager.java  | 108 ++++++
 .../org/apache/druid/sql/SqlRowTransformer.java    |  81 +++++
 .../apache/druid/sql/avatica/DruidStatement.java   |  17 +-
 .../druid/sql/calcite/planner/PlannerContext.java  |   4 +-
 .../org/apache/druid/sql/http/SqlResource.java     | 145 +++++---
 .../apache/druid/sql/SqlLifecycleManagerTest.java  | 142 ++++++++
 .../org/apache/druid/sql/SqlLifecycleTest.java     |  27 +-
 .../apache/druid/sql/SqlRowTransformerTest.java    | 134 ++++++++
 .../druid/sql/avatica/DruidAvaticaHandlerTest.java |   7 +
 .../druid/sql/calcite/util/CalciteTests.java       |   3 +-
 .../org/apache/druid/sql/guice/SqlModuleTest.java  |   7 +-
 .../apache/druid/sql/http/SqlHttpModuleTest.java   |  10 +-
 .../org/apache/druid/sql/http/SqlResourceTest.java | 332 +++++++++++++++++--
 23 files changed, 1316 insertions(+), 295 deletions(-)

diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index ab9a638..11c71c8 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -219,6 +219,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.druid.extensions</groupId>
+            <artifactId>druid-testing-tools</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.druid.extensions</groupId>
             <artifactId>simple-client-sslcontext</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java
index ce2703c..907ac8d 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/AbstractQueryResourceTestClient.java
@@ -46,6 +46,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 public abstract class AbstractQueryResourceTestClient<QueryType>
 {
@@ -132,8 +133,6 @@ public abstract class 
AbstractQueryResourceTestClient<QueryType>
     this.acceptHeader = acceptHeader;
   }
 
-  public abstract String getBrokerURL();
-
   public List<Map<String, Object>> query(String url, QueryType query)
   {
     try {
@@ -154,7 +153,7 @@ public abstract class 
AbstractQueryResourceTestClient<QueryType>
       if (!response.getStatus().equals(HttpResponseStatus.OK)) {
         throw new ISE(
             "Error while querying[%s] status[%s] content[%s]",
-            getBrokerURL(),
+            url,
             response.getStatus(),
             new String(response.getContent(), StandardCharsets.UTF_8)
         );
@@ -190,4 +189,20 @@ public abstract class 
AbstractQueryResourceTestClient<QueryType>
       throw new RuntimeException(e);
     }
   }
+
+  public HttpResponseStatus cancelQuery(String url, long timeoutMs)
+  {
+    try {
+      Request request = new Request(HttpMethod.DELETE, new URL(url));
+      Future<StatusResponseHolder> future = httpClient.go(
+          request,
+          StatusResponseHandler.getInstance()
+      );
+      StatusResponseHolder responseHolder = future.get(timeoutMs, 
TimeUnit.MILLISECONDS);
+      return responseHolder.getStatus();
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/QueryResourceTestClient.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/QueryResourceTestClient.java
index 33820fe..b1045d7 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/QueryResourceTestClient.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/QueryResourceTestClient.java
@@ -23,7 +23,6 @@ package org.apache.druid.testing.clients;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
 import org.apache.druid.guice.annotations.Smile;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.query.Query;
 import org.apache.druid.testing.IntegrationTestingConfig;
@@ -58,15 +57,6 @@ public class QueryResourceTestClient extends 
AbstractQueryResourceTestClient<Que
     super(jsonMapper, smileMapper, httpClient, routerUrl, contentType, accept);
   }
 
-  @Override
-  public String getBrokerURL()
-  {
-    return StringUtils.format(
-        "%s/druid/v2/",
-        routerUrl
-    );
-  }
-
   /**
    * clone a new instance of current object with given encoding.
    * Note: For {@link AbstractQueryResourceTestClient#queryAsync(String, 
Object)} operation, contentType could only be application/json
diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/SqlResourceTestClient.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/SqlResourceTestClient.java
index b429318..d41c9c6 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/SqlResourceTestClient.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/clients/SqlResourceTestClient.java
@@ -21,7 +21,6 @@ package org.apache.druid.testing.clients;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.sql.http.SqlQuery;
 import org.apache.druid.testing.IntegrationTestingConfig;
@@ -43,14 +42,4 @@ public class SqlResourceTestClient extends 
AbstractQueryResourceTestClient<SqlQu
     // so no need to pass smile ObjectMapper
     super(jsonMapper, null, httpClient, config.getRouterUrl(), 
MediaType.APPLICATION_JSON, null);
   }
-
-  @Override
-  public String getBrokerURL()
-  {
-    return StringUtils.format(
-        "%s/druid/v2/sql/",
-        routerUrl
-    );
-  }
-
 }
diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java
index bd6866a..fce45a8 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/utils/AbstractTestQueryHelper.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.Druids;
@@ -86,6 +87,11 @@ public abstract class 
AbstractTestQueryHelper<QueryResultType extends AbstractQu
 
   public abstract String getQueryURL(String schemeAndHost);
 
+  public String getCancelUrl(String schemaAndHost, String idToCancel)
+  {
+    return StringUtils.format("%s/%s", getQueryURL(schemaAndHost), idToCancel);
+  }
+
   public void testQueriesFromFile(String filePath) throws Exception
   {
     testQueriesFromFile(getQueryURL(broker), filePath);
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/query/ITSqlCancelTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITSqlCancelTest.java
new file mode 100644
index 0000000..73b7746
--- /dev/null
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITSqlCancelTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.query.QueryException;
+import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.sql.http.SqlQuery;
+import org.apache.druid.testing.IntegrationTestingConfig;
+import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
+import org.apache.druid.testing.clients.SqlResourceTestClient;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.SqlTestQueryHelper;
+import org.apache.druid.tests.TestNGGroup;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+@Test(groups = TestNGGroup.QUERY)
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITSqlCancelTest
+{
+  private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
+
+  /**
+   * This query will run exactly for 15 seconds.
+   */
+  private static final String QUERY
+      = "SELECT sleep(CASE WHEN added > 0 THEN 1 ELSE 0 END) FROM 
wikipedia_editstream WHERE added > 0 LIMIT 15";
+
+  private static final int NUM_QUERIES = 3;
+
+  @Inject
+  private CoordinatorResourceTestClient coordinatorClient;
+  @Inject
+  private SqlTestQueryHelper sqlHelper;
+  @Inject
+  private SqlResourceTestClient sqlClient;
+  @Inject
+  private IntegrationTestingConfig config;
+  @Inject
+  private ObjectMapper jsonMapper;
+
+  @BeforeMethod
+  public void before()
+  {
+    // ensure that wikipedia segments are loaded completely
+    ITRetryUtil.retryUntilTrue(
+        () -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), 
"wikipedia segment load"
+    );
+  }
+
+  @Test
+  public void testCancelValidQuery() throws Exception
+  {
+    final String queryId = "sql-cancel-test";
+    final List<Future<StatusResponseHolder>> queryResponseFutures = new 
ArrayList<>();
+    for (int i = 0; i < NUM_QUERIES; i++) {
+      queryResponseFutures.add(
+          sqlClient.queryAsync(
+              sqlHelper.getQueryURL(config.getRouterUrl()),
+              new SqlQuery(QUERY, null, false, ImmutableMap.of("sqlQueryId", 
queryId), null)
+          )
+      );
+    }
+
+    // Wait until the sqlLifecycle is authorized and registered
+    Thread.sleep(1000);
+    final HttpResponseStatus responseStatus = sqlClient.cancelQuery(
+        sqlHelper.getCancelUrl(config.getRouterUrl(), queryId),
+        1000
+    );
+    if (!responseStatus.equals(HttpResponseStatus.ACCEPTED)) {
+      throw new RE("Failed to cancel query [%s]", queryId);
+    }
+
+    for (Future<StatusResponseHolder> queryResponseFuture : 
queryResponseFutures) {
+      final StatusResponseHolder queryResponse = queryResponseFuture.get(1, 
TimeUnit.SECONDS);
+      if 
(!queryResponse.getStatus().equals(HttpResponseStatus.INTERNAL_SERVER_ERROR)) {
+        throw new ISE("Query is not canceled after cancel request");
+      }
+      QueryException queryException = 
jsonMapper.readValue(queryResponse.getContent(), QueryException.class);
+      if 
(!QueryInterruptedException.QUERY_CANCELLED.equals(queryException.getErrorCode()))
 {
+        throw new ISE(
+            "Expected error code [%s], actual [%s]",
+            QueryInterruptedException.QUERY_CANCELLED,
+            queryException.getErrorCode()
+        );
+      }
+    }
+  }
+
+  @Test
+  public void testCancelInvalidQuery() throws Exception
+  {
+    final Future<StatusResponseHolder> queryResponseFuture = sqlClient
+        .queryAsync(
+            sqlHelper.getQueryURL(config.getRouterUrl()),
+            new SqlQuery(QUERY, null, false, ImmutableMap.of("sqlQueryId", 
"validId"), null)
+        );
+
+    // Wait until the sqlLifecycle is authorized and registered
+    Thread.sleep(1000);
+    final HttpResponseStatus responseStatus = sqlClient.cancelQuery(
+        sqlHelper.getCancelUrl(config.getRouterUrl(), "invalidId"),
+        1000
+    );
+    if (!responseStatus.equals(HttpResponseStatus.NOT_FOUND)) {
+      throw new RE("Expected http response [%s], actual response [%s]", 
HttpResponseStatus.NOT_FOUND, responseStatus);
+    }
+
+    final StatusResponseHolder queryResponse = queryResponseFuture.get(30, 
TimeUnit.SECONDS);
+    if (!queryResponse.getStatus().equals(HttpResponseStatus.OK)) {
+      throw new ISE("Query is not canceled after cancel request");
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/QueryScheduler.java 
b/server/src/main/java/org/apache/druid/server/QueryScheduler.java
index 46f2580..d959e48 100644
--- a/server/src/main/java/org/apache/druid/server/QueryScheduler.java
+++ b/server/src/main/java/org/apache/druid/server/QueryScheduler.java
@@ -64,12 +64,26 @@ public class QueryScheduler implements QueryWatcher
   private final QueryPrioritizationStrategy prioritizationStrategy;
   private final QueryLaningStrategy laningStrategy;
   private final BulkheadRegistry laneRegistry;
+
   /**
-   * mapping of query id to set of futures associated with the query
+   * mapping of query id to set of futures associated with the query.
+   * This map is synchronized as there are 2 threads, query execution thread 
and query canceling thread,
+   * that can access the map at the same time.
+   *
+   * The updates (additions and removals) on this and {@link 
#queryDatasources} are racy
+   * as those updates are not being done atomically on those 2 maps,
+   * but it is OK in most cases since they will be cleaned up once the query 
is done.
    */
   private final SetMultimap<String, ListenableFuture<?>> queryFutures;
+
   /**
-   * mapping of query id to set of datasource names that are being queried, 
used for authorization
+   * mapping of query id to set of datasource names that are being queried, 
used for authorization.
+   * This map is synchronized as there are 2 threads, query execution thread 
and query canceling thread,
+   * that can access the map at the same time.
+   *
+   * The updates (additions and removals) on this and {@link #queryFutures} 
are racy
+   * as those updates are not being done atomically on those 2 maps,
+   * but it is OK in most cases since they will be cleaned up once the query 
is done.
    */
   private final SetMultimap<String, String> queryDatasources;
 
diff --git 
a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
 
b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
index 0a2da62..6779c06 100644
--- 
a/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
+++ 
b/services/src/main/java/org/apache/druid/server/AsyncQueryForwardingServlet.java
@@ -228,7 +228,7 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
       targetServer = hostFinder.findServerAvatica(connectionId);
       byte[] requestBytes = objectMapper.writeValueAsBytes(requestMap);
       request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestBytes);
-    } else if (isNativeQueryEndpoint && HttpMethod.DELETE.is(method)) {
+    } else if (HttpMethod.DELETE.is(method)) {
       // query cancellation request
       targetServer = hostFinder.pickDefaultServer();
       broadcastQueryCancelRequest(request, targetServer);
@@ -285,8 +285,6 @@ public class AsyncQueryForwardingServlet extends 
AsyncProxyServlet implements Qu
    */
   private void broadcastQueryCancelRequest(HttpServletRequest request, Server 
targetServer)
   {
-    // send query cancellation to all brokers this query may have gone to
-    // to keep the code simple, the proxy servlet will also send a request to 
the default targetServer.
     for (final Server server : hostFinder.getAllServers()) {
       if (server.getHost().equals(targetServer.getHost())) {
         continue;
diff --git a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java 
b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
index c30aba0..98a7c22 100644
--- a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
+++ b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
@@ -24,7 +24,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.calcite.avatica.remote.TypedValue;
-import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
@@ -40,6 +39,7 @@ import 
org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.QueryTimeoutException;
+import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.QueryStats;
 import org.apache.druid.server.RequestLogLine;
 import org.apache.druid.server.log.RequestLogger;
@@ -64,7 +64,9 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -77,11 +79,10 @@ import java.util.stream.Collectors;
  * <li>Validation and Authorization ({@link 
#validateAndAuthorize(HttpServletRequest)} or {@link 
#validateAndAuthorize(AuthenticationResult)})</li>
  * <li>Planning ({@link #plan()})</li>
  * <li>Execution ({@link #execute()})</li>
- * <li>Logging ({@link #emitLogsAndMetrics(Throwable, String, long)})</li>
+ * <li>Logging ({@link #finalizeStateAndEmitLogsAndMetrics(Throwable, String, 
long)})</li>
  * </ol>
  *
- * <p>Unlike QueryLifecycle, this class is designed to be <b>thread safe</b> 
so that it can be used in multi-threaded
- * scenario (JDBC) without external synchronization.
+ * Every method in this class must be called by the same thread except for 
{@link #cancel()}.
  */
 public class SqlLifecycle
 {
@@ -90,34 +91,33 @@ public class SqlLifecycle
   private final PlannerFactory plannerFactory;
   private final ServiceEmitter emitter;
   private final RequestLogger requestLogger;
+  private final QueryScheduler queryScheduler;
   private final long startMs;
   private final long startNs;
-  private final Object lock = new Object();
 
-  @GuardedBy("lock")
+  /**
+   * This lock coordinates the access to {@link #state} as there is a 
happens-before relationship
+   * between {@link #cancel} and {@link #transition}.
+   */
+  private final Object stateLock = new Object();
+  @GuardedBy("stateLock")
   private State state = State.NEW;
 
   // init during intialize
-  @GuardedBy("lock")
   private String sql;
-  @GuardedBy("lock")
   private Map<String, Object> queryContext;
-  @GuardedBy("lock")
   private List<TypedValue> parameters;
   // init during plan
-  @GuardedBy("lock")
   private PlannerContext plannerContext;
-  @GuardedBy("lock")
   private ValidationResult validationResult;
-  @GuardedBy("lock")
   private PrepareResult prepareResult;
-  @GuardedBy("lock")
   private PlannerResult plannerResult;
 
   public SqlLifecycle(
       PlannerFactory plannerFactory,
       ServiceEmitter emitter,
       RequestLogger requestLogger,
+      QueryScheduler queryScheduler,
       long startMs,
       long startNs
   )
@@ -125,6 +125,7 @@ public class SqlLifecycle
     this.plannerFactory = plannerFactory;
     this.emitter = emitter;
     this.requestLogger = requestLogger;
+    this.queryScheduler = queryScheduler;
     this.startMs = startMs;
     this.startNs = startNs;
     this.parameters = Collections.emptyList();
@@ -137,15 +138,12 @@ public class SqlLifecycle
    */
   public String initialize(String sql, Map<String, Object> queryContext)
   {
-    synchronized (lock) {
-      transition(State.NEW, State.INITIALIZED);
-      this.sql = sql;
-      this.queryContext = contextWithSqlId(queryContext);
-      return sqlQueryId();
-    }
+    transition(State.NEW, State.INITIALIZED);
+    this.sql = sql;
+    this.queryContext = contextWithSqlId(queryContext);
+    return sqlQueryId();
   }
 
-  @GuardedBy("lock")
   private Map<String, Object> contextWithSqlId(Map<String, Object> 
queryContext)
   {
     Map<String, Object> newContext = new HashMap<>();
@@ -161,7 +159,6 @@ public class SqlLifecycle
     return newContext;
   }
 
-  @GuardedBy("lock")
   private String sqlQueryId()
   {
     return (String) this.queryContext.get(PlannerContext.CTX_SQL_QUERY_ID);
@@ -173,11 +170,9 @@ public class SqlLifecycle
    */
   public void setParameters(List<TypedValue> parameters)
   {
-    synchronized (lock) {
-      this.parameters = parameters;
-      if (this.plannerContext != null) {
-        this.plannerContext.setParameters(parameters);
-      }
+    this.parameters = parameters;
+    if (this.plannerContext != null) {
+      this.plannerContext.setParameters(parameters);
     }
   }
 
@@ -189,21 +184,21 @@ public class SqlLifecycle
    */
   public void validateAndAuthorize(AuthenticationResult authenticationResult)
   {
-    synchronized (lock) {
+    synchronized (stateLock) {
       if (state == State.AUTHORIZED) {
         return;
       }
-      transition(State.INITIALIZED, State.AUTHORIZING);
-      validate(authenticationResult);
-      Access access = doAuthorize(
-          AuthorizationUtils.authorizeAllResourceActions(
-              authenticationResult,
-              Iterables.transform(validationResult.getResources(), 
AuthorizationUtils.RESOURCE_READ_RA_GENERATOR),
-              plannerFactory.getAuthorizerMapper()
-          )
-      );
-      checkAccess(access);
     }
+    transition(State.INITIALIZED, State.AUTHORIZING);
+    validate(authenticationResult);
+    Access access = doAuthorize(
+        AuthorizationUtils.authorizeAllResourceActions(
+            authenticationResult,
+            Iterables.transform(validationResult.getResources(), 
AuthorizationUtils.RESOURCE_READ_RA_GENERATOR),
+            plannerFactory.getAuthorizerMapper()
+        )
+    );
+    checkAccess(access);
   }
 
   /**
@@ -215,22 +210,19 @@ public class SqlLifecycle
    */
   public void validateAndAuthorize(HttpServletRequest req)
   {
-    synchronized (lock) {
-      transition(State.INITIALIZED, State.AUTHORIZING);
-      AuthenticationResult authResult = 
AuthorizationUtils.authenticationResultFromRequest(req);
-      validate(authResult);
-      Access access = doAuthorize(
-          AuthorizationUtils.authorizeAllResourceActions(
-              req,
-              Iterables.transform(validationResult.getResources(), 
AuthorizationUtils.RESOURCE_READ_RA_GENERATOR),
-              plannerFactory.getAuthorizerMapper()
-          )
-      );
-      checkAccess(access);
-    }
+    transition(State.INITIALIZED, State.AUTHORIZING);
+    AuthenticationResult authResult = 
AuthorizationUtils.authenticationResultFromRequest(req);
+    validate(authResult);
+    Access access = doAuthorize(
+        AuthorizationUtils.authorizeAllResourceActions(
+            req,
+            Iterables.transform(validationResult.getResources(), 
AuthorizationUtils.RESOURCE_READ_RA_GENERATOR),
+            plannerFactory.getAuthorizerMapper()
+        )
+    );
+    checkAccess(access);
   }
 
-  @GuardedBy("lock")
   private ValidationResult validate(AuthenticationResult authenticationResult)
   {
     try (DruidPlanner planner = plannerFactory.createPlanner(queryContext)) {
@@ -251,7 +243,6 @@ public class SqlLifecycle
     }
   }
 
-  @GuardedBy("lock")
   private Access doAuthorize(final Access authorizationResult)
   {
     if (!authorizationResult.isAllowed()) {
@@ -263,7 +254,6 @@ public class SqlLifecycle
     return authorizationResult;
   }
 
-  @GuardedBy("lock")
   private void checkAccess(Access access)
   {
     plannerContext.setAuthorizationResult(access);
@@ -280,22 +270,22 @@ public class SqlLifecycle
    */
   public PrepareResult prepare() throws RelConversionException
   {
-    synchronized (lock) {
+    synchronized (stateLock) {
       if (state != State.AUTHORIZED) {
         throw new ISE("Cannot prepare because current state[%s] is not [%s].", 
state, State.AUTHORIZED);
       }
-      Preconditions.checkNotNull(plannerContext, "Cannot prepare, 
plannerContext is null");
-      try (DruidPlanner planner = 
plannerFactory.createPlannerWithContext(plannerContext)) {
-        this.prepareResult = planner.prepare(sql);
-        return prepareResult;
-      }
-      // we can't collapse catch clauses since SqlPlanningException has 
type-sensitive constructors.
-      catch (SqlParseException e) {
-        throw new SqlPlanningException(e);
-      }
-      catch (ValidationException e) {
-        throw new SqlPlanningException(e);
-      }
+    }
+    Preconditions.checkNotNull(plannerContext, "Cannot prepare, plannerContext 
is null");
+    try (DruidPlanner planner = 
plannerFactory.createPlannerWithContext(plannerContext)) {
+      this.prepareResult = planner.prepare(sql);
+      return prepareResult;
+    }
+    // we can't collapse catch clauses since SqlPlanningException has 
type-sensitive constructors.
+    catch (SqlParseException e) {
+      throw new SqlPlanningException(e);
+    }
+    catch (ValidationException e) {
+      throw new SqlPlanningException(e);
     }
   }
 
@@ -304,36 +294,48 @@ public class SqlLifecycle
    *
    * If successful, the lifecycle will first transition from {@link 
State#AUTHORIZED} to {@link State#PLANNED}.
    */
-  public PlannerContext plan() throws RelConversionException
+  public void plan() throws RelConversionException
   {
-    synchronized (lock) {
-      transition(State.AUTHORIZED, State.PLANNED);
-      Preconditions.checkNotNull(plannerContext, "Cannot plan, plannerContext 
is null");
-      try (DruidPlanner planner = 
plannerFactory.createPlannerWithContext(plannerContext)) {
-        this.plannerResult = planner.plan(sql);
-      }
-      // we can't collapse catch clauses since SqlPlanningException has 
type-sensitive constructors.
-      catch (SqlParseException e) {
-        throw new SqlPlanningException(e);
-      }
-      catch (ValidationException e) {
-        throw new SqlPlanningException(e);
-      }
-      return plannerContext;
+    transition(State.AUTHORIZED, State.PLANNED);
+    Preconditions.checkNotNull(plannerContext, "Cannot plan, plannerContext is 
null");
+    try (DruidPlanner planner = 
plannerFactory.createPlannerWithContext(plannerContext)) {
+      this.plannerResult = planner.plan(sql);
+    }
+    // we can't collapse catch clauses since SqlPlanningException has 
type-sensitive constructors.
+    catch (SqlParseException e) {
+      throw new SqlPlanningException(e);
+    }
+    catch (ValidationException e) {
+      throw new SqlPlanningException(e);
     }
   }
 
   /**
+   * This method must be called after {@link #plan()}.
+   */
+  public SqlRowTransformer createRowTransformer()
+  {
+    assert plannerContext != null;
+    assert plannerResult != null;
+
+    return new SqlRowTransformer(plannerContext.getTimeZone(), 
plannerResult.rowType());
+  }
+
+  @VisibleForTesting
+  PlannerContext getPlannerContext()
+  {
+    return plannerContext;
+  }
+
+  /**
    * Execute the fully planned query.
    *
    * If successful, the lifecycle will first transition from {@link 
State#PLANNED} to {@link State#EXECUTING}.
    */
   public Sequence<Object[]> execute()
   {
-    synchronized (lock) {
-      transition(State.PLANNED, State.EXECUTING);
-      return plannerResult.run();
-    }
+    transition(State.PLANNED, State.EXECUTING);
+    return plannerResult.run();
   }
 
   @VisibleForTesting
@@ -354,7 +356,9 @@ public class SqlLifecycle
       result = execute();
     }
     catch (Throwable e) {
-      emitLogsAndMetrics(e, null, -1);
+      if (!(e instanceof ForbiddenException)) {
+        finalizeStateAndEmitLogsAndMetrics(e, null, -1);
+      }
       throw e;
     }
 
@@ -363,7 +367,7 @@ public class SqlLifecycle
       @Override
       public void after(boolean isDone, Throwable thrown)
       {
-        emitLogsAndMetrics(thrown, null, -1);
+        finalizeStateAndEmitLogsAndMetrics(thrown, null, -1);
       }
     });
   }
@@ -372,15 +376,34 @@ public class SqlLifecycle
   @VisibleForTesting
   public ValidationResult runAnalyzeResources(AuthenticationResult 
authenticationResult)
   {
-    synchronized (lock) {
-      return validate(authenticationResult);
-    }
+    return validate(authenticationResult);
   }
 
-  public RelDataType rowType()
+  public Set<Resource> getAuthorizedResources()
   {
-    synchronized (lock) {
-      return plannerResult != null ? plannerResult.rowType() : 
prepareResult.getRowType();
+    assert validationResult != null;
+    return validationResult.getResources();
+  }
+
+  /**
+   * Cancel all native queries associated to this lifecycle.
+   *
+   * This method is thread-safe.
+   */
+  public void cancel()
+  {
+    synchronized (stateLock) {
+      if (state == State.CANCELLED) {
+        return;
+      }
+      state = State.CANCELLED;
+    }
+
+    final CopyOnWriteArrayList<String> nativeQueryIds = 
plannerContext.getNativeQueryIds();
+
+    for (String nativeQueryId : nativeQueryIds) {
+      log.debug("canceling native query [%s]", nativeQueryId);
+      queryScheduler.cancelQuery(nativeQueryId);
     }
   }
 
@@ -391,104 +414,121 @@ public class SqlLifecycle
    * @param remoteAddress remote address, for logging; or null if unknown
    * @param bytesWritten  number of bytes written; will become a query/bytes 
metric if >= 0
    */
-  public void emitLogsAndMetrics(
+  public void finalizeStateAndEmitLogsAndMetrics(
       @Nullable final Throwable e,
       @Nullable final String remoteAddress,
       final long bytesWritten
   )
   {
-    synchronized (lock) {
-      if (sql == null) {
-        // Never initialized, don't log or emit anything.
-        return;
-      }
-
-      if (state == State.DONE) {
-        log.warn("Tried to emit logs and metrics twice for query[%s]!", 
sqlQueryId());
-      }
-
-      state = State.DONE;
+    if (sql == null) {
+      // Never initialized, don't log or emit anything.
+      return;
+    }
 
-      final boolean success = e == null;
-      final long queryTimeNs = System.nanoTime() - startNs;
+    synchronized (stateLock) {
+      assert state != State.UNAUTHORIZED; // should not emit below metrics 
when the query fails to authorize
 
-      try {
-        ServiceMetricEvent.Builder metricBuilder = 
ServiceMetricEvent.builder();
-        if (plannerContext != null) {
-          metricBuilder.setDimension("id", plannerContext.getSqlQueryId());
-          metricBuilder.setDimension("nativeQueryIds", 
plannerContext.getNativeQueryIds().toString());
-        }
-        if (validationResult != null) {
-          metricBuilder.setDimension(
-              "dataSource",
-              
validationResult.getResources().stream().map(Resource::getName).collect(Collectors.toList()).toString()
-          );
-        }
-        metricBuilder.setDimension("remoteAddress", 
StringUtils.nullToEmptyNonDruidDataString(remoteAddress));
-        metricBuilder.setDimension("success", String.valueOf(success));
-        emitter.emit(metricBuilder.build("sqlQuery/time", 
TimeUnit.NANOSECONDS.toMillis(queryTimeNs)));
-        if (bytesWritten >= 0) {
-          emitter.emit(metricBuilder.build("sqlQuery/bytes", bytesWritten));
+      if (state != State.CANCELLED) {
+        if (state == State.DONE) {
+          log.warn("Tried to emit logs and metrics twice for query[%s]!", 
sqlQueryId());
         }
 
-        final Map<String, Object> statsMap = new LinkedHashMap<>();
-        statsMap.put("sqlQuery/time", 
TimeUnit.NANOSECONDS.toMillis(queryTimeNs));
-        statsMap.put("sqlQuery/bytes", bytesWritten);
-        statsMap.put("success", success);
-        statsMap.put("context", queryContext);
-        if (plannerContext != null) {
-          statsMap.put("identity", 
plannerContext.getAuthenticationResult().getIdentity());
-          queryContext.put("nativeQueryIds", 
plannerContext.getNativeQueryIds().toString());
-        }
-        if (e != null) {
-          statsMap.put("exception", e.toString());
+        state = State.DONE;
+      }
+    }
 
-          if (e instanceof QueryInterruptedException || e instanceof 
QueryTimeoutException) {
-            statsMap.put("interrupted", true);
-            statsMap.put("reason", e.toString());
-          }
-        }
+    final boolean success = e == null;
+    final long queryTimeNs = System.nanoTime() - startNs;
 
-        requestLogger.logSqlQuery(
-            RequestLogLine.forSql(
-                sql,
-                queryContext,
-                DateTimes.utc(startMs),
-                remoteAddress,
-                new QueryStats(statsMap)
-            )
+    try {
+      ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
+      if (plannerContext != null) {
+        metricBuilder.setDimension("id", plannerContext.getSqlQueryId());
+        metricBuilder.setDimension("nativeQueryIds", 
plannerContext.getNativeQueryIds().toString());
+      }
+      if (validationResult != null) {
+        metricBuilder.setDimension(
+            "dataSource",
+            
validationResult.getResources().stream().map(Resource::getName).collect(Collectors.toList()).toString()
         );
       }
-      catch (Exception ex) {
-        log.error(ex, "Unable to log SQL [%s]!", sql);
+      metricBuilder.setDimension("remoteAddress", 
StringUtils.nullToEmptyNonDruidDataString(remoteAddress));
+      metricBuilder.setDimension("success", String.valueOf(success));
+      emitter.emit(metricBuilder.build("sqlQuery/time", 
TimeUnit.NANOSECONDS.toMillis(queryTimeNs)));
+      if (bytesWritten >= 0) {
+        emitter.emit(metricBuilder.build("sqlQuery/bytes", bytesWritten));
+      }
+
+      final Map<String, Object> statsMap = new LinkedHashMap<>();
+      statsMap.put("sqlQuery/time", 
TimeUnit.NANOSECONDS.toMillis(queryTimeNs));
+      statsMap.put("sqlQuery/bytes", bytesWritten);
+      statsMap.put("success", success);
+      statsMap.put("context", queryContext);
+      if (plannerContext != null) {
+        statsMap.put("identity", 
plannerContext.getAuthenticationResult().getIdentity());
+        queryContext.put("nativeQueryIds", 
plannerContext.getNativeQueryIds().toString());
       }
+      if (e != null) {
+        statsMap.put("exception", e.toString());
+
+        if (e instanceof QueryInterruptedException || e instanceof 
QueryTimeoutException) {
+          statsMap.put("interrupted", true);
+          statsMap.put("reason", e.toString());
+        }
+      }
+
+      requestLogger.logSqlQuery(
+          RequestLogLine.forSql(
+              sql,
+              queryContext,
+              DateTimes.utc(startMs),
+              remoteAddress,
+              new QueryStats(statsMap)
+          )
+      );
+    }
+    catch (Exception ex) {
+      log.error(ex, "Unable to log SQL [%s]!", sql);
     }
   }
 
   @VisibleForTesting
   public State getState()
   {
-    synchronized (lock) {
+    synchronized (stateLock) {
       return state;
     }
   }
 
   @VisibleForTesting
-  public Map<String, Object> getQueryContext()
+  Map<String, Object> getQueryContext()
   {
-    synchronized (lock) {
-      return queryContext;
-    }
+    return queryContext;
   }
 
-  @GuardedBy("lock")
   private void transition(final State from, final State to)
   {
-    if (state != from) {
-      throw new ISE("Cannot transition from[%s] to[%s] because current 
state[%s] is not [%s].", from, to, state, from);
-    }
+    synchronized (stateLock) {
+      if (state == State.CANCELLED) {
+        throw new QueryInterruptedException(
+            QueryInterruptedException.QUERY_CANCELLED,
+            StringUtils.format("Query is canceled [%s]", sqlQueryId()),
+            null,
+            null
+        );
+      }
+      if (state != from) {
+        throw new ISE(
+            "Cannot transition from[%s] to[%s] because current state[%s] is 
not [%s].",
+            from,
+            to,
+            state,
+            from
+        );
+      }
 
-    state = to;
+      state = to;
+    }
   }
 
   enum State
@@ -499,7 +539,10 @@ public class SqlLifecycle
     AUTHORIZED,
     PLANNED,
     EXECUTING,
+
+    // final states
     UNAUTHORIZED,
-    DONE
+    CANCELLED, // query is cancelled. can be transitioned to this state only 
after AUTHORIZED.
+    DONE // query could either succeed or fail
   }
 }
diff --git a/sql/src/main/java/org/apache/druid/sql/SqlLifecycleFactory.java 
b/sql/src/main/java/org/apache/druid/sql/SqlLifecycleFactory.java
index 2507894..948492d 100644
--- a/sql/src/main/java/org/apache/druid/sql/SqlLifecycleFactory.java
+++ b/sql/src/main/java/org/apache/druid/sql/SqlLifecycleFactory.java
@@ -22,6 +22,7 @@ package org.apache.druid.sql;
 import com.google.inject.Inject;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.log.RequestLogger;
 import org.apache.druid.sql.calcite.planner.PlannerFactory;
 
@@ -31,17 +32,20 @@ public class SqlLifecycleFactory
   private final PlannerFactory plannerFactory;
   private final ServiceEmitter emitter;
   private final RequestLogger requestLogger;
+  private final QueryScheduler queryScheduler;
 
   @Inject
   public SqlLifecycleFactory(
       PlannerFactory plannerFactory,
       ServiceEmitter emitter,
-      RequestLogger requestLogger
+      RequestLogger requestLogger,
+      QueryScheduler queryScheduler
   )
   {
     this.plannerFactory = plannerFactory;
     this.emitter = emitter;
     this.requestLogger = requestLogger;
+    this.queryScheduler = queryScheduler;
   }
 
   public SqlLifecycle factorize()
@@ -50,6 +54,7 @@ public class SqlLifecycleFactory
         plannerFactory,
         emitter,
         requestLogger,
+        queryScheduler,
         System.currentTimeMillis(),
         System.nanoTime()
     );
diff --git a/sql/src/main/java/org/apache/druid/sql/SqlLifecycleManager.java 
b/sql/src/main/java/org/apache/druid/sql/SqlLifecycleManager.java
new file mode 100644
index 0000000..8b222eb
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/SqlLifecycleManager.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql;
+
+import com.google.common.collect.ImmutableList;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.sql.SqlLifecycle.State;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class manages only _authorized_ {@link SqlLifecycle}s submitted via 
HTTP,
+ * such as {@link org.apache.druid.sql.http.SqlResource}. The main use case of 
this class is
+ * tracking running queries so that the cancel API can identify the lifecycles 
to cancel.
+ *
+ * This class is thread-safe as there are 2 or more threads that can access 
lifecycles at the same time
+ * for query running or query canceling.
+ *
+ * For managing and canceling native queries, see {@link 
org.apache.druid.server.QueryScheduler}.
+ * As its name indicates, it also performs resource scheduling for native 
queries based on query lanes
+ * {@link org.apache.druid.server.QueryLaningStrategy}.
+ *
+ * @see org.apache.druid.server.QueryScheduler#cancelQuery(String)
+ */
+@LazySingleton
+public class SqlLifecycleManager
+{
+  private final Object lock = new Object();
+
+  @GuardedBy("lock")
+  private final Map<String, List<SqlLifecycle>> sqlLifecycles = new 
HashMap<>();
+
+  public void add(String sqlQueryId, SqlLifecycle lifecycle)
+  {
+    synchronized (lock) {
+      assert lifecycle.getState() == State.AUTHORIZED;
+      sqlLifecycles.computeIfAbsent(sqlQueryId, k -> new ArrayList<>())
+                   .add(lifecycle);
+    }
+  }
+
+  /**
+   * Removes the given lifecycle of the given query ID.
+   * This method uses {@link Object#equals} to find the lifecycle matched to 
the given parameter.
+   */
+  public void remove(String sqlQueryId, SqlLifecycle lifecycle)
+  {
+    synchronized (lock) {
+      List<SqlLifecycle> lifecycles = sqlLifecycles.get(sqlQueryId);
+      if (lifecycles != null) {
+        lifecycles.remove(lifecycle);
+        if (lifecycles.isEmpty()) {
+          sqlLifecycles.remove(sqlQueryId);
+        }
+      }
+    }
+  }
+
+  /**
+   * For the given sqlQueryId, this method removes all lifecycles that match 
to the given list of lifecycles.
+   * This method uses {@link Object#equals} for matching lifecycles.
+   */
+  public void removeAll(String sqlQueryId, List<SqlLifecycle> 
lifecyclesToRemove)
+  {
+    synchronized (lock) {
+      List<SqlLifecycle> lifecycles = sqlLifecycles.get(sqlQueryId);
+      if (lifecycles != null) {
+        lifecycles.removeAll(lifecyclesToRemove);
+        if (lifecycles.isEmpty()) {
+          sqlLifecycles.remove(sqlQueryId);
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns a snapshot of the lifecycles for the given sqlQueryId.
+   */
+  public List<SqlLifecycle> getAll(String sqlQueryId)
+  {
+    synchronized (lock) {
+      List<SqlLifecycle> lifecycles = sqlLifecycles.get(sqlQueryId);
+      return lifecycles == null ? Collections.emptyList() : 
ImmutableList.copyOf(lifecycles);
+    }
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/SqlRowTransformer.java 
b/sql/src/main/java/org/apache/druid/sql/SqlRowTransformer.java
new file mode 100644
index 0000000..5570c42
--- /dev/null
+++ b/sql/src/main/java/org/apache/druid/sql/SqlRowTransformer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.ISODateTimeFormat;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class transforms the values of TIMESTAMP or DATE type for sql query 
results.
+ * The transformation is required only when the sql query is submitted to 
{@link org.apache.druid.sql.http.SqlResource}.
+ */
+public class SqlRowTransformer
+{
+  private final DateTimeZone timeZone;
+  private final List<String> fieldList;
+
+  // Remember which columns are time-typed, so we can emit ISO8601 instead of 
millis values.
+  private final boolean[] timeColumns;
+  private final boolean[] dateColumns;
+
+  SqlRowTransformer(DateTimeZone timeZone, RelDataType rowType)
+  {
+    this.timeZone = timeZone;
+    this.fieldList = new ArrayList<>(rowType.getFieldCount());
+    this.timeColumns = new boolean[rowType.getFieldCount()];
+    this.dateColumns = new boolean[rowType.getFieldCount()];
+    for (int i = 0; i < rowType.getFieldCount(); i++) {
+      final SqlTypeName sqlTypeName = 
rowType.getFieldList().get(i).getType().getSqlTypeName();
+      timeColumns[i] = sqlTypeName == SqlTypeName.TIMESTAMP;
+      dateColumns[i] = sqlTypeName == SqlTypeName.DATE;
+      fieldList.add(rowType.getFieldList().get(i).getName());
+    }
+  }
+
+  public List<String> getFieldList()
+  {
+    return fieldList;
+  }
+
+  @Nullable
+  public Object transform(Object[] row, int i)
+  {
+    if (row[i] == null) {
+      return null;
+    } else if (timeColumns[i]) {
+      return ISODateTimeFormat.dateTime().print(
+          Calcites.calciteTimestampToJoda((long) row[i], timeZone)
+      );
+    } else if (dateColumns[i]) {
+      return ISODateTimeFormat.dateTime().print(
+          Calcites.calciteDateToJoda((int) row[i], timeZone)
+      );
+    } else {
+      return row[i];
+    }
+  }
+}
diff --git a/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java 
b/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java
index ff4f268..560116c 100644
--- a/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java
+++ b/sql/src/main/java/org/apache/druid/sql/avatica/DruidStatement.java
@@ -58,6 +58,7 @@ public class DruidStatement implements Closeable
   private final String connectionId;
   private final int statementId;
   private final Map<String, Object> queryContext;
+  @GuardedBy("lock")
   private final SqlLifecycle sqlLifecycle;
   private final Runnable onClose;
   private final Object lock = new Object();
@@ -261,14 +262,6 @@ public class DruidStatement implements Closeable
     }
   }
 
-  public RelDataType getRowType()
-  {
-    synchronized (lock) {
-      ensure(State.PREPARED, State.RUNNING, State.DONE);
-      return sqlLifecycle.rowType();
-    }
-  }
-
   public long getCurrentOffset()
   {
     synchronized (lock) {
@@ -348,7 +341,9 @@ public class DruidStatement implements Closeable
         // First close. Run the onClose function.
         try {
           onClose.run();
-          sqlLifecycle.emitLogsAndMetrics(t, null, -1);
+          synchronized (lock) {
+            sqlLifecycle.finalizeStateAndEmitLogsAndMetrics(t, null, -1);
+          }
         }
         catch (Throwable t1) {
           t.addSuppressed(t1);
@@ -362,7 +357,9 @@ public class DruidStatement implements Closeable
       // First close. Run the onClose function.
       try {
         if (!(this.throwable instanceof ForbiddenException)) {
-          sqlLifecycle.emitLogsAndMetrics(this.throwable, null, -1);
+          synchronized (lock) {
+            sqlLifecycle.finalizeStateAndEmitLogsAndMetrics(this.throwable, 
null, -1);
+          }
         }
         onClose.run();
       }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
index 0d7f979..59d4bd8 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
@@ -71,7 +71,7 @@ public class PlannerContext
   private final Map<String, Object> queryContext;
   private final String sqlQueryId;
   private final boolean stringifyArrays;
-  private final List<String> nativeQueryIds = new CopyOnWriteArrayList<>();
+  private final CopyOnWriteArrayList<String> nativeQueryIds = new 
CopyOnWriteArrayList<>();
   // bindings for dynamic parameters to bind during planning
   private List<TypedValue> parameters = Collections.emptyList();
   // result of authentication, providing identity to authorize set of 
resources produced by validation
@@ -204,7 +204,7 @@ public class PlannerContext
     return sqlQueryId;
   }
 
-  public List<String> getNativeQueryIds()
+  public CopyOnWriteArrayList<String> getNativeQueryIds()
   {
     return nativeQueryIds;
   }
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java 
b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
index b880209..232cf5b 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
@@ -22,14 +22,14 @@ package org.apache.druid.sql.http;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.io.CountingOutputStream;
 import com.google.inject.Inject;
 import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Yielder;
 import org.apache.druid.java.util.common.guava.Yielders;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -39,19 +39,24 @@ import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.QueryTimeoutException;
 import org.apache.druid.query.QueryUnsupportedException;
 import org.apache.druid.query.ResourceLimitExceededException;
+import org.apache.druid.server.security.Access;
+import org.apache.druid.server.security.AuthorizationUtils;
+import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.server.security.Resource;
 import org.apache.druid.sql.SqlLifecycle;
 import org.apache.druid.sql.SqlLifecycleFactory;
+import org.apache.druid.sql.SqlLifecycleManager;
 import org.apache.druid.sql.SqlPlanningException;
-import org.apache.druid.sql.calcite.planner.Calcites;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
-import org.joda.time.DateTimeZone;
-import org.joda.time.format.ISODateTimeFormat;
+import org.apache.druid.sql.SqlRowTransformer;
 
+import javax.annotation.Nullable;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
@@ -59,8 +64,9 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 @Path("/druid/v2/sql/")
 public class SqlResource
@@ -68,16 +74,22 @@ public class SqlResource
   private static final Logger log = new Logger(SqlResource.class);
 
   private final ObjectMapper jsonMapper;
+  private final AuthorizerMapper authorizerMapper;
   private final SqlLifecycleFactory sqlLifecycleFactory;
+  private final SqlLifecycleManager sqlLifecycleManager;
 
   @Inject
   public SqlResource(
       @Json ObjectMapper jsonMapper,
-      SqlLifecycleFactory sqlLifecycleFactory
+      AuthorizerMapper authorizerMapper,
+      SqlLifecycleFactory sqlLifecycleFactory,
+      SqlLifecycleManager sqlLifecycleManager
   )
   {
     this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "jsonMapper");
+    this.authorizerMapper = Preconditions.checkNotNull(authorizerMapper, 
"authorizerMapper");
     this.sqlLifecycleFactory = Preconditions.checkNotNull(sqlLifecycleFactory, 
"sqlLifecycleFactory");
+    this.sqlLifecycleManager = Preconditions.checkNotNull(sqlLifecycleManager, 
"sqlLifecycleManager");
   }
 
   @POST
@@ -98,24 +110,14 @@ public class SqlResource
 
       lifecycle.setParameters(sqlQuery.getParameterList());
       lifecycle.validateAndAuthorize(req);
-      final PlannerContext plannerContext = lifecycle.plan();
-      final DateTimeZone timeZone = plannerContext.getTimeZone();
-
-      // Remember which columns are time-typed, so we can emit ISO8601 instead 
of millis values.
-      // Also store list of all column names, for X-Druid-Sql-Columns header.
-      final List<RelDataTypeField> fieldList = 
lifecycle.rowType().getFieldList();
-      final boolean[] timeColumns = new boolean[fieldList.size()];
-      final boolean[] dateColumns = new boolean[fieldList.size()];
-      final String[] columnNames = new String[fieldList.size()];
-
-      for (int i = 0; i < fieldList.size(); i++) {
-        final SqlTypeName sqlTypeName = 
fieldList.get(i).getType().getSqlTypeName();
-        timeColumns[i] = sqlTypeName == SqlTypeName.TIMESTAMP;
-        dateColumns[i] = sqlTypeName == SqlTypeName.DATE;
-        columnNames[i] = fieldList.get(i).getName();
-      }
+      // must add after lifecycle is authorized
+      sqlLifecycleManager.add(sqlQueryId, lifecycle);
+
+      lifecycle.plan();
 
-      final Yielder<Object[]> yielder0 = Yielders.each(lifecycle.execute());
+      final SqlRowTransformer rowTransformer = 
lifecycle.createRowTransformer();
+      final Sequence<Object[]> sequence = lifecycle.execute();
+      final Yielder<Object[]> yielder0 = Yielders.each(sequence);
 
       try {
         return Response
@@ -130,30 +132,15 @@ public class SqlResource
                     writer.writeResponseStart();
 
                     if (sqlQuery.includeHeader()) {
-                      writer.writeHeader(Arrays.asList(columnNames));
+                      writer.writeHeader(rowTransformer.getFieldList());
                     }
 
                     while (!yielder.isDone()) {
                       final Object[] row = yielder.get();
                       writer.writeRowStart();
-                      for (int i = 0; i < fieldList.size(); i++) {
-                        final Object value;
-
-                        if (row[i] == null) {
-                          value = null;
-                        } else if (timeColumns[i]) {
-                          value = ISODateTimeFormat.dateTime().print(
-                              Calcites.calciteTimestampToJoda((long) row[i], 
timeZone)
-                          );
-                        } else if (dateColumns[i]) {
-                          value = ISODateTimeFormat.dateTime().print(
-                              Calcites.calciteDateToJoda((int) row[i], 
timeZone)
-                          );
-                        } else {
-                          value = row[i];
-                        }
-
-                        writer.writeRowField(fieldList.get(i).getName(), 
value);
+                      for (int i = 0; i < 
rowTransformer.getFieldList().size(); i++) {
+                        final Object value = rowTransformer.transform(row, i);
+                        
writer.writeRowField(rowTransformer.getFieldList().get(i), value);
                       }
                       writer.writeRowEnd();
                       yielder = yielder.next(null);
@@ -168,7 +155,7 @@ public class SqlResource
                   }
                   finally {
                     yielder.close();
-                    lifecycle.emitLogsAndMetrics(e, remoteAddr, os.getCount());
+                    endLifecycle(sqlQueryId, lifecycle, e, remoteAddr, 
os.getCount());
                   }
                 }
             )
@@ -182,27 +169,28 @@ public class SqlResource
       }
     }
     catch (QueryCapacityExceededException cap) {
-      lifecycle.emitLogsAndMetrics(cap, remoteAddr, -1);
+      endLifecycle(sqlQueryId, lifecycle, cap, remoteAddr, -1);
       return buildNonOkResponse(QueryCapacityExceededException.STATUS_CODE, 
cap);
     }
     catch (QueryUnsupportedException unsupported) {
-      lifecycle.emitLogsAndMetrics(unsupported, remoteAddr, -1);
+      endLifecycle(sqlQueryId, lifecycle, unsupported, remoteAddr, -1);
       return buildNonOkResponse(QueryUnsupportedException.STATUS_CODE, 
unsupported);
     }
     catch (QueryTimeoutException timeout) {
-      lifecycle.emitLogsAndMetrics(timeout, remoteAddr, -1);
+      endLifecycle(sqlQueryId, lifecycle, timeout, remoteAddr, -1);
       return buildNonOkResponse(QueryTimeoutException.STATUS_CODE, timeout);
     }
     catch (SqlPlanningException | ResourceLimitExceededException e) {
-      lifecycle.emitLogsAndMetrics(e, remoteAddr, -1);
+      endLifecycle(sqlQueryId, lifecycle, e, remoteAddr, -1);
       return buildNonOkResponse(BadQueryException.STATUS_CODE, e);
     }
     catch (ForbiddenException e) {
+      endLifecycleWithoutEmittingMetrics(sqlQueryId, lifecycle);
       throw e; // let ForbiddenExceptionMapper handle this
     }
     catch (Exception e) {
       log.warn(e, "Failed to handle query: %s", sqlQuery);
-      lifecycle.emitLogsAndMetrics(e, remoteAddr, -1);
+      endLifecycle(sqlQueryId, lifecycle, e, remoteAddr, -1);
 
       final Exception exceptionToReport;
 
@@ -222,11 +210,66 @@ public class SqlResource
     }
   }
 
-  Response buildNonOkResponse(int status, Exception e) throws 
JsonProcessingException
+  private void endLifecycleWithoutEmittingMetrics(
+      String sqlQueryId,
+      SqlLifecycle lifecycle
+  )
+  {
+    sqlLifecycleManager.remove(sqlQueryId, lifecycle);
+  }
+
+  private void endLifecycle(
+      String sqlQueryId,
+      SqlLifecycle lifecycle,
+      @Nullable final Throwable e,
+      @Nullable final String remoteAddress,
+      final long bytesWritten
+  )
+  {
+    lifecycle.finalizeStateAndEmitLogsAndMetrics(e, remoteAddress, 
bytesWritten);
+    sqlLifecycleManager.remove(sqlQueryId, lifecycle);
+  }
+
+  private Response buildNonOkResponse(int status, Exception e) throws 
JsonProcessingException
   {
     return Response.status(status)
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .entity(jsonMapper.writeValueAsBytes(e))
                    .build();
   }
+
+  @DELETE
+  @Path("{id}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response cancelQuery(
+      @PathParam("id") String sqlQueryId,
+      @Context final HttpServletRequest req
+  )
+  {
+    log.debug("Received cancel request for query [%s]", sqlQueryId);
+
+    List<SqlLifecycle> lifecycles = sqlLifecycleManager.getAll(sqlQueryId);
+    if (lifecycles.isEmpty()) {
+      return Response.status(Status.NOT_FOUND).build();
+    }
+    Set<Resource> resources = lifecycles
+        .stream()
+        .flatMap(lifecycle -> lifecycle.getAuthorizedResources().stream())
+        .collect(Collectors.toSet());
+    Access access = AuthorizationUtils.authorizeAllResourceActions(
+        req,
+        Iterables.transform(resources, 
AuthorizationUtils.RESOURCE_READ_RA_GENERATOR),
+        authorizerMapper
+    );
+
+    if (access.isAllowed()) {
+      // should remove only the lifecycles in the snapshot.
+      sqlLifecycleManager.removeAll(sqlQueryId, lifecycles);
+      lifecycles.forEach(SqlLifecycle::cancel);
+      return Response.status(Status.ACCEPTED).build();
+    } else {
+      // Return 404 for authorization failures as well
+      return Response.status(Status.NOT_FOUND).build();
+    }
+  }
 }
diff --git 
a/sql/src/test/java/org/apache/druid/sql/SqlLifecycleManagerTest.java 
b/sql/src/test/java/org/apache/druid/sql/SqlLifecycleManagerTest.java
new file mode 100644
index 0000000..8ddef9f
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/SqlLifecycleManagerTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.sql.SqlLifecycle.State;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.List;
+
+public class SqlLifecycleManagerTest
+{
+  private SqlLifecycleManager lifecycleManager;
+
+  @Before
+  public void setup()
+  {
+    lifecycleManager = new SqlLifecycleManager();
+  }
+
+  @Test
+  public void testAddNonAuthorizedLifeCycle()
+  {
+    SqlLifecycle lifecycle = mockLifecycle(State.INITIALIZED);
+    Assert.assertThrows(AssertionError.class, () -> 
lifecycleManager.add("sqlId", lifecycle));
+  }
+
+  @Test
+  public void testAddAuthorizedLifecycle()
+  {
+    final String sqlId = "sqlId";
+    SqlLifecycle lifecycle = mockLifecycle(State.AUTHORIZED);
+    lifecycleManager.add(sqlId, lifecycle);
+    Assert.assertEquals(ImmutableList.of(lifecycle), 
lifecycleManager.getAll(sqlId));
+  }
+
+  @Test
+  public void testRemoveValidLifecycle()
+  {
+    final String sqlId = "sqlId";
+    SqlLifecycle lifecycle = mockLifecycle(State.AUTHORIZED);
+    lifecycleManager.add(sqlId, lifecycle);
+    Assert.assertEquals(ImmutableList.of(lifecycle), 
lifecycleManager.getAll(sqlId));
+    lifecycleManager.remove(sqlId, lifecycle);
+    Assert.assertEquals(ImmutableList.of(), lifecycleManager.getAll(sqlId));
+  }
+
+  @Test
+  public void testRemoveInvalidSqlQueryId()
+  {
+    final String sqlId = "sqlId";
+    SqlLifecycle lifecycle = mockLifecycle(State.AUTHORIZED);
+    lifecycleManager.add(sqlId, lifecycle);
+    Assert.assertEquals(ImmutableList.of(lifecycle), 
lifecycleManager.getAll(sqlId));
+    lifecycleManager.remove("invalid", lifecycle);
+    Assert.assertEquals(ImmutableList.of(lifecycle), 
lifecycleManager.getAll(sqlId));
+  }
+
+  @Test
+  public void testRemoveValidSqlQueryIdDifferntLifecycleObject()
+  {
+    final String sqlId = "sqlId";
+    SqlLifecycle lifecycle = mockLifecycle(State.AUTHORIZED);
+    lifecycleManager.add(sqlId, lifecycle);
+    Assert.assertEquals(ImmutableList.of(lifecycle), 
lifecycleManager.getAll(sqlId));
+    lifecycleManager.remove(sqlId, mockLifecycle(State.AUTHORIZED));
+    Assert.assertEquals(ImmutableList.of(lifecycle), 
lifecycleManager.getAll(sqlId));
+  }
+
+  @Test
+  public void testRemoveAllValidSqlQueryIdSubsetOfLifecycles()
+  {
+    final String sqlId = "sqlId";
+    final List<SqlLifecycle> lifecycles = ImmutableList.of(
+        mockLifecycle(State.AUTHORIZED),
+        mockLifecycle(State.AUTHORIZED),
+        mockLifecycle(State.AUTHORIZED)
+    );
+    lifecycles.forEach(lifecycle -> lifecycleManager.add(sqlId, lifecycle));
+    Assert.assertEquals(lifecycles, lifecycleManager.getAll(sqlId));
+    lifecycleManager.removeAll(sqlId, ImmutableList.of(lifecycles.get(0), 
lifecycles.get(1)));
+    Assert.assertEquals(ImmutableList.of(lifecycles.get(2)), 
lifecycleManager.getAll(sqlId));
+  }
+
+  @Test
+  public void testRemoveAllInvalidSqlQueryId()
+  {
+    final String sqlId = "sqlId";
+    final List<SqlLifecycle> lifecycles = ImmutableList.of(
+        mockLifecycle(State.AUTHORIZED),
+        mockLifecycle(State.AUTHORIZED),
+        mockLifecycle(State.AUTHORIZED)
+    );
+    lifecycles.forEach(lifecycle -> lifecycleManager.add(sqlId, lifecycle));
+    Assert.assertEquals(lifecycles, lifecycleManager.getAll(sqlId));
+    lifecycleManager.removeAll("invalid", ImmutableList.of(lifecycles.get(0), 
lifecycles.get(1)));
+    Assert.assertEquals(lifecycles, lifecycleManager.getAll(sqlId));
+  }
+
+  @Test
+  public void testGetAllReturnsListCopy()
+  {
+    final String sqlId = "sqlId";
+    final List<SqlLifecycle> lifecycles = ImmutableList.of(
+        mockLifecycle(State.AUTHORIZED),
+        mockLifecycle(State.AUTHORIZED),
+        mockLifecycle(State.AUTHORIZED)
+    );
+    lifecycles.forEach(lifecycle -> lifecycleManager.add(sqlId, lifecycle));
+    final List<SqlLifecycle> lifecyclesFromGetAll = 
lifecycleManager.getAll(sqlId);
+    lifecycleManager.removeAll(sqlId, lifecyclesFromGetAll);
+    Assert.assertEquals(lifecycles, lifecyclesFromGetAll);
+    Assert.assertTrue(lifecycleManager.getAll(sqlId).isEmpty());
+  }
+
+  private static SqlLifecycle mockLifecycle(State state)
+  {
+    SqlLifecycle lifecycle = Mockito.mock(SqlLifecycle.class);
+    Mockito.when(lifecycle.getState()).thenReturn(state);
+    return lifecycle;
+  }
+}
diff --git a/sql/src/test/java/org/apache/druid/sql/SqlLifecycleTest.java 
b/sql/src/test/java/org/apache/druid/sql/SqlLifecycleTest.java
index 6fd383b..6065912 100644
--- a/sql/src/test/java/org/apache/druid/sql/SqlLifecycleTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/SqlLifecycleTest.java
@@ -30,6 +30,7 @@ import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
 import org.apache.druid.query.QueryContexts;
+import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.log.RequestLogger;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.AuthConfig;
@@ -51,6 +52,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 public class SqlLifecycleTest
 {
@@ -65,7 +67,12 @@ public class SqlLifecycleTest
     this.plannerFactory = EasyMock.createMock(PlannerFactory.class);
     this.serviceEmitter = EasyMock.createMock(ServiceEmitter.class);
     this.requestLogger = EasyMock.createMock(RequestLogger.class);
-    this.sqlLifecycleFactory = new SqlLifecycleFactory(plannerFactory, 
serviceEmitter, requestLogger);
+    this.sqlLifecycleFactory = new SqlLifecycleFactory(
+        plannerFactory,
+        serviceEmitter,
+        requestLogger,
+        QueryStackTests.DEFAULT_NOOP_SCHEDULER
+    );
   }
 
   @Test
@@ -142,8 +149,8 @@ public class SqlLifecycleTest
     mockPlanner.close();
     EasyMock.expectLastCall();
     EasyMock.replay(plannerFactory, serviceEmitter, requestLogger, 
mockPlanner, mockPlannerContext, mockPrepareResult, mockPlanResult);
-    PlannerContext context = lifecycle.plan();
-    Assert.assertEquals(mockPlannerContext, context);
+    lifecycle.plan();
+    Assert.assertEquals(mockPlannerContext, lifecycle.getPlannerContext());
     Assert.assertEquals(SqlLifecycle.State.PLANNED, lifecycle.getState());
     EasyMock.verify(plannerFactory, serviceEmitter, requestLogger, 
mockPlanner, mockPlannerContext, mockPrepareResult, mockPlanResult);
     EasyMock.reset(plannerFactory, serviceEmitter, requestLogger, mockPlanner, 
mockPlannerContext, mockPrepareResult, mockPlanResult);
@@ -158,7 +165,8 @@ public class SqlLifecycleTest
 
     // test emit
     EasyMock.expect(mockPlannerContext.getSqlQueryId()).andReturn("id").once();
-    
EasyMock.expect(mockPlannerContext.getNativeQueryIds()).andReturn(ImmutableList.of("id")).times(2);
+    CopyOnWriteArrayList<String> nativeQueryIds = new 
CopyOnWriteArrayList<>(ImmutableList.of("id"));
+    
EasyMock.expect(mockPlannerContext.getNativeQueryIds()).andReturn(nativeQueryIds).times(2);
     
EasyMock.expect(mockPlannerContext.getAuthenticationResult()).andReturn(CalciteTests.REGULAR_USER_AUTH_RESULT).once();
 
     serviceEmitter.emit(EasyMock.anyObject(ServiceEventBuilder.class));
@@ -169,7 +177,7 @@ public class SqlLifecycleTest
     EasyMock.expectLastCall();
     EasyMock.replay(plannerFactory, serviceEmitter, requestLogger, 
mockPlanner, mockPlannerContext, mockPrepareResult, mockPlanResult);
 
-    lifecycle.emitLogsAndMetrics(null, null, 10);
+    lifecycle.finalizeStateAndEmitLogsAndMetrics(null, null, 10);
     Assert.assertEquals(SqlLifecycle.State.DONE, lifecycle.getState());
     EasyMock.verify(plannerFactory, serviceEmitter, requestLogger, 
mockPlanner, mockPlannerContext, mockPrepareResult, mockPlanResult);
     EasyMock.reset(plannerFactory, serviceEmitter, requestLogger, mockPlanner, 
mockPlannerContext, mockPrepareResult, mockPlanResult);
@@ -244,8 +252,8 @@ public class SqlLifecycleTest
     mockPlanner.close();
     EasyMock.expectLastCall();
     EasyMock.replay(plannerFactory, serviceEmitter, requestLogger, 
mockPlanner, mockPlannerContext, mockPrepareResult, mockPlanResult);
-    PlannerContext context = lifecycle.plan();
-    Assert.assertEquals(mockPlannerContext, context);
+    lifecycle.plan();
+    Assert.assertEquals(mockPlannerContext, lifecycle.getPlannerContext());
     Assert.assertEquals(SqlLifecycle.State.PLANNED, lifecycle.getState());
     EasyMock.verify(plannerFactory, serviceEmitter, requestLogger, 
mockPlanner, mockPlannerContext, mockPrepareResult, mockPlanResult);
     EasyMock.reset(plannerFactory, serviceEmitter, requestLogger, mockPlanner, 
mockPlannerContext, mockPrepareResult, mockPlanResult);
@@ -260,7 +268,8 @@ public class SqlLifecycleTest
 
     // test emit
     EasyMock.expect(mockPlannerContext.getSqlQueryId()).andReturn("id").once();
-    
EasyMock.expect(mockPlannerContext.getNativeQueryIds()).andReturn(ImmutableList.of("id")).times(2);
+    CopyOnWriteArrayList<String> nativeQueryIds = new 
CopyOnWriteArrayList<>(ImmutableList.of("id"));
+    
EasyMock.expect(mockPlannerContext.getNativeQueryIds()).andReturn(nativeQueryIds).times(2);
     
EasyMock.expect(mockPlannerContext.getAuthenticationResult()).andReturn(CalciteTests.REGULAR_USER_AUTH_RESULT).once();
 
     serviceEmitter.emit(EasyMock.anyObject(ServiceEventBuilder.class));
@@ -271,7 +280,7 @@ public class SqlLifecycleTest
     EasyMock.expectLastCall();
     EasyMock.replay(plannerFactory, serviceEmitter, requestLogger, 
mockPlanner, mockPlannerContext, mockPrepareResult, mockPlanResult);
 
-    lifecycle.emitLogsAndMetrics(null, null, 10);
+    lifecycle.finalizeStateAndEmitLogsAndMetrics(null, null, 10);
     Assert.assertEquals(SqlLifecycle.State.DONE, lifecycle.getState());
     EasyMock.verify(plannerFactory, serviceEmitter, requestLogger, 
mockPlanner, mockPlannerContext, mockPrepareResult, mockPlanResult);
     EasyMock.reset(plannerFactory, serviceEmitter, requestLogger, mockPlanner, 
mockPlannerContext, mockPrepareResult, mockPlanResult);
diff --git a/sql/src/test/java/org/apache/druid/sql/SqlRowTransformerTest.java 
b/sql/src/test/java/org/apache/druid/sql/SqlRowTransformerTest.java
new file mode 100644
index 0000000..1ca9db2
--- /dev/null
+++ b/sql/src/test/java/org/apache/druid/sql/SqlRowTransformerTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.planner.DruidTypeSystem;
+import org.apache.druid.sql.calcite.util.CalciteTestBase;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.ISODateTimeFormat;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class SqlRowTransformerTest extends CalciteTestBase
+{
+  private RelDataType rowType;
+
+  @Before
+  public void setup()
+  {
+    final RelDataTypeFactory typeFactory = new 
SqlTypeFactoryImpl(DruidTypeSystem.INSTANCE);
+    rowType = typeFactory.createStructType(
+        ImmutableList.of(
+            typeFactory.createSqlType(SqlTypeName.TIMESTAMP),
+            typeFactory.createSqlType(SqlTypeName.DATE),
+            typeFactory.createSqlType(SqlTypeName.VARCHAR),
+            typeFactory.createSqlType(SqlTypeName.VARCHAR)
+        ),
+        ImmutableList.of(
+            "timestamp_col",
+            "date_col",
+            "string_col",
+            "null"
+        )
+    );
+  }
+
+  @Test
+  public void testTransformUTC()
+  {
+    SqlRowTransformer transformer = new SqlRowTransformer(
+        DateTimeZone.UTC,
+        rowType
+    );
+    DateTime timestamp = DateTimes.of("2021-08-01T12:00:00");
+    DateTime date = DateTimes.of("2021-01-01");
+    Object[] expectedRow = new Object[]{
+        ISODateTimeFormat.dateTime().print(timestamp),
+        ISODateTimeFormat.dateTime().print(date),
+        "string",
+        null
+    };
+    Object[] row = new Object[]{
+        Calcites.jodaToCalciteTimestamp(timestamp, DateTimeZone.UTC),
+        Calcites.jodaToCalciteDate(date, DateTimeZone.UTC),
+        expectedRow[2],
+        null
+    };
+    Assert.assertArrayEquals(
+        expectedRow,
+        IntStream.range(0, expectedRow.length).mapToObj(i -> 
transformer.transform(row, i)).toArray()
+    );
+  }
+
+  @Test
+  public void testTransformNonUTC()
+  {
+    DateTimeZone timeZone = DateTimes.inferTzFromString("Asia/Seoul");
+    SqlRowTransformer transformer = new SqlRowTransformer(
+        timeZone,
+        rowType
+    );
+    DateTime timestamp = new DateTime("2021-08-01T12:00:00", timeZone);
+    DateTime date = new DateTime("2021-01-01", timeZone);
+    Object[] expectedRow = new Object[]{
+        ISODateTimeFormat.dateTime().withZone(timeZone).print(timestamp),
+        ISODateTimeFormat.dateTime().withZone(timeZone).print(date),
+        "string",
+        null
+    };
+    Object[] row = new Object[]{
+        Calcites.jodaToCalciteTimestamp(timestamp, timeZone),
+        Calcites.jodaToCalciteDate(date, timeZone),
+        expectedRow[2],
+        null
+    };
+    Assert.assertArrayEquals(
+        expectedRow,
+        IntStream.range(0, expectedRow.length).mapToObj(i -> 
transformer.transform(row, i)).toArray()
+    );
+  }
+
+  @Test
+  public void testGetFieldList()
+  {
+    SqlRowTransformer transformer = new SqlRowTransformer(
+        DateTimeZone.UTC,
+        rowType
+    );
+
+    Assert.assertEquals(
+        
rowType.getFieldList().stream().map(RelDataTypeField::getName).collect(Collectors.toList()),
+        transformer.getFieldList()
+    );
+  }
+}
diff --git 
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java 
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index ef72cbd..3278bb6 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -39,6 +39,7 @@ import 
org.apache.calcite.avatica.server.AbstractAvaticaHandler;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.guice.GuiceInjectors;
+import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.initialization.Initialization;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Pair;
@@ -48,6 +49,8 @@ import 
org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.server.QueryLifecycleFactory;
+import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.QuerySchedulerProvider;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.RequestLogLine;
 import org.apache.druid.server.log.RequestLogger;
@@ -195,6 +198,10 @@ public abstract class DruidAvaticaHandlerTest extends 
CalciteTestBase
                       .toInstance(CalciteTests.DRUID_SCHEMA_NAME);
                 
binder.bind(AvaticaServerConfig.class).toInstance(AVATICA_CONFIG);
                 binder.bind(ServiceEmitter.class).to(NoopServiceEmitter.class);
+                
binder.bind(QuerySchedulerProvider.class).in(LazySingleton.class);
+                binder.bind(QueryScheduler.class)
+                      .toProvider(QuerySchedulerProvider.class)
+                      .in(LazySingleton.class);
               }
             }
         )
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
index 569e831..4e39482 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java
@@ -784,7 +784,8 @@ public class CalciteTests
     return new SqlLifecycleFactory(
         plannerFactory,
         new ServiceEmitter("dummy", "dummy", new NoopEmitter()),
-        new NoopRequestLogger()
+        new NoopRequestLogger(),
+        QueryStackTests.DEFAULT_NOOP_SCHEDULER
     );
   }
 
diff --git a/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java 
b/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java
index b5c4aa7..e80ae48 100644
--- a/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java
@@ -51,6 +51,8 @@ import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.segment.join.JoinableFactory;
 import org.apache.druid.segment.loading.SegmentLoader;
+import org.apache.druid.server.QueryScheduler;
+import org.apache.druid.server.QuerySchedulerProvider;
 import org.apache.druid.server.log.NoopRequestLogger;
 import org.apache.druid.server.log.RequestLogger;
 import org.apache.druid.server.security.AuthorizerMapper;
@@ -192,7 +194,10 @@ public class SqlModuleTest
               
binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(lookupExtractorFactoryContainerProvider);
               binder.bind(JoinableFactory.class).toInstance(joinableFactory);
               binder.bind(SegmentLoader.class).toInstance(segmentLoader);
-
+              
binder.bind(QuerySchedulerProvider.class).in(LazySingleton.class);
+              binder.bind(QueryScheduler.class)
+                    .toProvider(QuerySchedulerProvider.class)
+                    .in(LazySingleton.class);
             },
             new SqlModule(props),
             new TestViewManagerModule()
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java 
b/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java
index e0abc58..c035f7c 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlHttpModuleTest.java
@@ -24,8 +24,11 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.TypeLiteral;
+import org.apache.druid.guice.DruidGuiceExtensions;
+import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.annotations.JSR311Resource;
 import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.sql.SqlLifecycleFactory;
 import org.easymock.EasyMockRunner;
 import org.easymock.Mock;
@@ -34,6 +37,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
+import java.util.Collections;
 import java.util.Set;
 
 @RunWith(EasyMockRunner.class)
@@ -52,11 +56,15 @@ public class SqlHttpModuleTest
   {
     target = new SqlHttpModule();
     injector = Guice.createInjector(
+        new LifecycleModule(),
+        new DruidGuiceExtensions(),
         binder -> {
           
binder.bind(ObjectMapper.class).annotatedWith(Json.class).toInstance(jsonMpper);
           
binder.bind(SqlLifecycleFactory.class).toInstance(sqlLifecycleFactory);
+          binder.bind(AuthorizerMapper.class).toInstance(new 
AuthorizerMapper(Collections.emptyMap()));
         },
-        target);
+        target
+    );
   }
 
   @Test
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java 
b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
index dd0cee1..02f596a 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
@@ -29,15 +29,19 @@ import 
com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.calcite.avatica.SqlType;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.tools.RelConversionException;
 import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.common.guava.SettableSupplier;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.NonnullPair;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.LazySequence;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryCapacityExceededException;
@@ -51,13 +55,16 @@ import 
org.apache.druid.query.ResourceLimitExceededException;
 import org.apache.druid.server.QueryScheduler;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.initialization.ServerConfig;
+import org.apache.druid.server.log.RequestLogger;
 import org.apache.druid.server.log.TestRequestLogger;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.druid.server.scheduling.HiLoQueryLaningStrategy;
 import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.ForbiddenException;
+import org.apache.druid.sql.SqlLifecycle;
 import org.apache.druid.sql.SqlLifecycleFactory;
+import org.apache.druid.sql.SqlLifecycleManager;
 import org.apache.druid.sql.SqlPlanningException.PlanningError;
 import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
@@ -79,6 +86,7 @@ import org.junit.rules.TemporaryFolder;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -87,7 +95,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -107,6 +117,12 @@ public class SqlResourceTest extends CalciteTestBase
   private SqlResource resource;
   private HttpServletRequest req;
   private ListeningExecutorService executorService;
+  private SqlLifecycleManager lifecycleManager;
+
+  private CountDownLatch lifecycleAddLatch;
+  private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
validateAndAuthorizeLatchSupplier = new SettableSupplier<>();
+  private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
planLatchSupplier = new SettableSupplier<>();
+  private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
executeLatchSupplier = new SettableSupplier<>();
 
   private boolean sleep = false;
 
@@ -204,13 +220,45 @@ public class SqlResourceTest extends CalciteTestBase
         CalciteTests.DRUID_SCHEMA_NAME
     );
 
+    lifecycleManager = new SqlLifecycleManager()
+    {
+      @Override
+      public void add(String sqlQueryId, SqlLifecycle lifecycle)
+      {
+        super.add(sqlQueryId, lifecycle);
+        if (lifecycleAddLatch != null) {
+          lifecycleAddLatch.countDown();
+        }
+      }
+    };
+    final ServiceEmitter emitter = new NoopServiceEmitter();
     resource = new SqlResource(
         JSON_MAPPER,
+        CalciteTests.TEST_AUTHORIZER_MAPPER,
         new SqlLifecycleFactory(
             plannerFactory,
-            new NoopServiceEmitter(),
-            testRequestLogger
+            emitter,
+            testRequestLogger,
+            scheduler
         )
+        {
+          @Override
+          public SqlLifecycle factorize()
+          {
+            return new TestSqlLifecycle(
+                plannerFactory,
+                emitter,
+                testRequestLogger,
+                scheduler,
+                System.currentTimeMillis(),
+                System.nanoTime(),
+                validateAndAuthorizeLatchSupplier,
+                planLatchSupplier,
+                executeLatchSupplier
+            );
+          }
+        },
+        lifecycleManager
     );
   }
 
@@ -220,6 +268,7 @@ public class SqlResourceTest extends CalciteTestBase
     walker.close();
     walker = null;
     executorService.shutdownNow();
+    executorService.awaitTermination(2, TimeUnit.SECONDS);
   }
 
   @Test
@@ -243,7 +292,7 @@ public class SqlResourceTest extends CalciteTestBase
 
     try {
       resource.doPost(
-          new SqlQuery("select count(*) from forbiddenDatasource", null, 
false, null, null),
+          createSimpleQueryWithId("id", "select count(*) from 
forbiddenDatasource"),
           testRequest
       );
       Assert.fail("doPost did not throw ForbiddenException for an unauthorized 
query");
@@ -252,13 +301,14 @@ public class SqlResourceTest extends CalciteTestBase
       // expected
     }
     Assert.assertEquals(0, testRequestLogger.getSqlQueryLogs().size());
+    Assert.assertTrue(lifecycleManager.getAll("id").isEmpty());
   }
 
   @Test
   public void testCountStar() throws Exception
   {
     final List<Map<String, Object>> rows = doPost(
-        new SqlQuery("SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo", 
null, false, null, null)
+        createSimpleQueryWithId("id", "SELECT COUNT(*) AS cnt, 'foo' AS TheFoo 
FROM druid.foo")
     ).rhs;
 
     Assert.assertEquals(
@@ -268,6 +318,7 @@ public class SqlResourceTest extends CalciteTestBase
         rows
     );
     checkSqlRequestLog(true);
+    Assert.assertTrue(lifecycleManager.getAll("id").isEmpty());
   }
 
 
@@ -275,7 +326,10 @@ public class SqlResourceTest extends CalciteTestBase
   public void testCountStarExtendedCharacters() throws Exception
   {
     final List<Map<String, Object>> rows = doPost(
-        new SqlQuery("SELECT COUNT(*) AS cnt FROM druid.lotsocolumns WHERE 
dimMultivalEnumerated = 'ㅑ ㅓ ㅕ ㅗ ㅛ ㅜ ㅠ ㅡ ㅣ'", null, false, null, null)
+        createSimpleQueryWithId(
+            "id",
+            "SELECT COUNT(*) AS cnt FROM druid.lotsocolumns WHERE 
dimMultivalEnumerated = 'ㅑ ㅓ ㅕ ㅗ ㅛ ㅜ ㅠ ㅡ ㅣ'"
+        )
     ).rhs;
 
     Assert.assertEquals(
@@ -285,6 +339,7 @@ public class SqlResourceTest extends CalciteTestBase
         rows
     );
     checkSqlRequestLog(true);
+    Assert.assertTrue(lifecycleManager.getAll("id").isEmpty());
   }
 
   @Test
@@ -490,7 +545,11 @@ public class SqlResourceTest extends CalciteTestBase
   public void testArrayLinesResultFormat() throws Exception
   {
     final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo 
LIMIT 2";
-    final String response = doPostRaw(new SqlQuery(query, 
ResultFormat.ARRAYLINES, false, null, null)).rhs;
+    final Pair<QueryException, String> pair = doPostRaw(
+        new SqlQuery(query, ResultFormat.ARRAYLINES, false, null, null)
+    );
+    Assert.assertNull(pair.lhs);
+    final String response = pair.rhs;
     final String nullStr = NullHandling.replaceWithDefault() ? "" : null;
     final List<String> lines = Splitter.on('\n').splitToList(response);
 
@@ -531,7 +590,11 @@ public class SqlResourceTest extends CalciteTestBase
   public void testArrayLinesResultFormatWithHeader() throws Exception
   {
     final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo 
LIMIT 2";
-    final String response = doPostRaw(new SqlQuery(query, 
ResultFormat.ARRAYLINES, true, null, null)).rhs;
+    final Pair<QueryException, String> pair = doPostRaw(
+        new SqlQuery(query, ResultFormat.ARRAYLINES, true, null, null)
+    );
+    Assert.assertNull(pair.lhs);
+    final String response = pair.rhs;
     final String nullStr = NullHandling.replaceWithDefault() ? "" : null;
     final List<String> lines = Splitter.on('\n').splitToList(response);
 
@@ -622,7 +685,11 @@ public class SqlResourceTest extends CalciteTestBase
   public void testObjectLinesResultFormat() throws Exception
   {
     final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo 
LIMIT 2";
-    final String response = doPostRaw(new SqlQuery(query, 
ResultFormat.OBJECTLINES, false, null, null)).rhs;
+    final Pair<QueryException, String> pair = doPostRaw(
+        new SqlQuery(query, ResultFormat.OBJECTLINES, false, null, null)
+    );
+    Assert.assertNull(pair.lhs);
+    final String response = pair.rhs;
     final String nullStr = NullHandling.replaceWithDefault() ? "" : null;
     final Function<Map<String, Object>, Map<String, Object>> transformer = m 
-> {
       return Maps.transformEntries(
@@ -675,7 +742,11 @@ public class SqlResourceTest extends CalciteTestBase
   public void testCsvResultFormat() throws Exception
   {
     final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo 
LIMIT 2";
-    final String response = doPostRaw(new SqlQuery(query, ResultFormat.CSV, 
false, null, null)).rhs;
+    final Pair<QueryException, String> pair = doPostRaw(
+        new SqlQuery(query, ResultFormat.CSV, false, null, null)
+    );
+    Assert.assertNull(pair.lhs);
+    final String response = pair.rhs;
     final List<String> lines = Splitter.on('\n').splitToList(response);
 
     Assert.assertEquals(
@@ -693,7 +764,11 @@ public class SqlResourceTest extends CalciteTestBase
   public void testCsvResultFormatWithHeaders() throws Exception
   {
     final String query = "SELECT *, CASE dim2 WHEN '' THEN dim2 END FROM foo 
LIMIT 2";
-    final String response = doPostRaw(new SqlQuery(query, ResultFormat.CSV, 
true, null, null)).rhs;
+    final Pair<QueryException, String> pair = doPostRaw(
+        new SqlQuery(query, ResultFormat.CSV, true, null, null)
+    );
+    Assert.assertNull(pair.lhs);
+    final String response = pair.rhs;
     final List<String> lines = Splitter.on('\n').splitToList(response);
 
     Assert.assertEquals(
@@ -736,13 +811,7 @@ public class SqlResourceTest extends CalciteTestBase
   public void testCannotParse() throws Exception
   {
     final QueryException exception = doPost(
-        new SqlQuery(
-            "FROM druid.foo",
-            ResultFormat.OBJECT,
-            false,
-            null,
-            null
-        )
+        createSimpleQueryWithId("id", "FROM druid.foo")
     ).lhs;
 
     Assert.assertNotNull(exception);
@@ -750,19 +819,14 @@ public class SqlResourceTest extends CalciteTestBase
     Assert.assertEquals(PlanningError.SQL_PARSE_ERROR.getErrorClass(), 
exception.getErrorClass());
     Assert.assertTrue(exception.getMessage().contains("Encountered \"FROM\" at 
line 1, column 1."));
     checkSqlRequestLog(false);
+    Assert.assertTrue(lifecycleManager.getAll("id").isEmpty());
   }
 
   @Test
   public void testCannotValidate() throws Exception
   {
     final QueryException exception = doPost(
-        new SqlQuery(
-            "SELECT dim4 FROM druid.foo",
-            ResultFormat.OBJECT,
-            false,
-            null,
-            null
-        )
+        createSimpleQueryWithId("id", "SELECT dim4 FROM druid.foo")
     ).lhs;
 
     Assert.assertNotNull(exception);
@@ -770,6 +834,7 @@ public class SqlResourceTest extends CalciteTestBase
     Assert.assertEquals(PlanningError.VALIDATION_ERROR.getErrorClass(), 
exception.getErrorClass());
     Assert.assertTrue(exception.getMessage().contains("Column 'dim4' not found 
in any table"));
     checkSqlRequestLog(false);
+    Assert.assertTrue(lifecycleManager.getAll("id").isEmpty());
   }
 
   @Test
@@ -777,7 +842,7 @@ public class SqlResourceTest extends CalciteTestBase
   {
     // SELECT + ORDER unsupported
     final QueryException exception = doPost(
-        new SqlQuery("SELECT dim1 FROM druid.foo ORDER BY dim1", 
ResultFormat.OBJECT, false, null, null)
+        createSimpleQueryWithId("id", "SELECT dim1 FROM druid.foo ORDER BY 
dim1")
     ).lhs;
 
     Assert.assertNotNull(exception);
@@ -788,6 +853,7 @@ public class SqlResourceTest extends CalciteTestBase
                  .contains("Cannot build plan for query: SELECT dim1 FROM 
druid.foo ORDER BY dim1")
     );
     checkSqlRequestLog(false);
+    Assert.assertTrue(lifecycleManager.getAll("id").isEmpty());
   }
 
   @Test
@@ -798,7 +864,7 @@ public class SqlResourceTest extends CalciteTestBase
             "SELECT DISTINCT dim1 FROM foo",
             ResultFormat.OBJECT,
             false,
-            ImmutableMap.of("maxMergingDictionarySize", 1),
+            ImmutableMap.of("maxMergingDictionarySize", 1, "sqlQueryId", "id"),
             null
         )
     ).lhs;
@@ -807,6 +873,7 @@ public class SqlResourceTest extends CalciteTestBase
     Assert.assertEquals(exception.getErrorCode(), 
ResourceLimitExceededException.ERROR_CODE);
     Assert.assertEquals(exception.getErrorClass(), 
ResourceLimitExceededException.class.getName());
     checkSqlRequestLog(false);
+    Assert.assertTrue(lifecycleManager.getAll("id").isEmpty());
   }
 
   @Test
@@ -815,7 +882,7 @@ public class SqlResourceTest extends CalciteTestBase
     String errorMessage = "This will be support in Druid 9999";
     SqlQuery badQuery = EasyMock.createMock(SqlQuery.class);
     EasyMock.expect(badQuery.getQuery()).andReturn("SELECT ANSWER TO LIFE");
-    EasyMock.expect(badQuery.getContext()).andReturn(ImmutableMap.of());
+    
EasyMock.expect(badQuery.getContext()).andReturn(ImmutableMap.of("sqlQueryId", 
"id"));
     EasyMock.expect(badQuery.getParameterList()).andThrow(new 
QueryUnsupportedException(errorMessage));
     EasyMock.replay(badQuery);
     final QueryException exception = doPost(badQuery).lhs;
@@ -823,6 +890,7 @@ public class SqlResourceTest extends CalciteTestBase
     Assert.assertNotNull(exception);
     Assert.assertEquals(QueryUnsupportedException.ERROR_CODE, 
exception.getErrorCode());
     Assert.assertEquals(QueryUnsupportedException.class.getName(), 
exception.getErrorClass());
+    Assert.assertTrue(lifecycleManager.getAll("id").isEmpty());
   }
 
   @Test
@@ -830,6 +898,7 @@ public class SqlResourceTest extends CalciteTestBase
   {
     sleep = true;
     final int numQueries = 3;
+    final String sqlQueryId = "tooManyRequestsTest";
 
     List<Future<Pair<QueryException, List<Map<String, Object>>>>> futures = 
new ArrayList<>(numQueries);
     for (int i = 0; i < numQueries; i++) {
@@ -840,7 +909,7 @@ public class SqlResourceTest extends CalciteTestBase
                   "SELECT COUNT(*) AS cnt, 'foo' AS TheFoo FROM druid.foo",
                   null,
                   false,
-                  ImmutableMap.of("priority", -5),
+                  ImmutableMap.of("priority", -5, "sqlQueryId", sqlQueryId),
                   null
               ),
               makeExpectedReq()
@@ -874,12 +943,14 @@ public class SqlResourceTest extends CalciteTestBase
     Assert.assertEquals(2, success);
     Assert.assertEquals(1, limited);
     Assert.assertEquals(3, testRequestLogger.getSqlQueryLogs().size());
+    Assert.assertTrue(lifecycleManager.getAll(sqlQueryId).isEmpty());
   }
 
   @Test
   public void testQueryTimeoutException() throws Exception
   {
-    Map<String, Object> queryContext = 
ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1);
+    final String sqlQueryId = "timeoutTest";
+    Map<String, Object> queryContext = 
ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1, "sqlQueryId", sqlQueryId);
     final QueryException timeoutException = doPost(
         new SqlQuery(
             "SELECT CAST(__time AS DATE), dim1, dim2, dim3 FROM druid.foo 
GROUP by __time, dim1, dim2, dim3 ORDER BY dim2 DESC",
@@ -892,7 +963,95 @@ public class SqlResourceTest extends CalciteTestBase
     Assert.assertNotNull(timeoutException);
     Assert.assertEquals(timeoutException.getErrorCode(), 
QueryTimeoutException.ERROR_CODE);
     Assert.assertEquals(timeoutException.getErrorClass(), 
QueryTimeoutException.class.getName());
+    Assert.assertTrue(lifecycleManager.getAll(sqlQueryId).isEmpty());
+
+  }
+
+  @Test
+  public void testCancelBetweenValidateAndPlan() throws Exception
+  {
+    final String sqlQueryId = "toCancel";
+    lifecycleAddLatch = new CountDownLatch(1);
+    CountDownLatch validateAndAuthorizeLatch = new CountDownLatch(1);
+    validateAndAuthorizeLatchSupplier.set(new 
NonnullPair<>(validateAndAuthorizeLatch, true));
+    CountDownLatch planLatch = new CountDownLatch(1);
+    planLatchSupplier.set(new NonnullPair<>(planLatch, false));
+    Future<Response> future = executorService.submit(
+        () -> resource.doPost(
+            createSimpleQueryWithId(sqlQueryId, "SELECT DISTINCT dim1 FROM 
foo"),
+            makeExpectedReq()
+        )
+    );
+    Assert.assertTrue(validateAndAuthorizeLatch.await(1, TimeUnit.SECONDS));
+    Assert.assertTrue(lifecycleAddLatch.await(1, TimeUnit.SECONDS));
+    Response response = resource.cancelQuery(sqlQueryId, 
mockRequestForCancel());
+    planLatch.countDown();
+    Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+    Assert.assertTrue(lifecycleManager.getAll(sqlQueryId).isEmpty());
+
+    response = future.get();
+    Assert.assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
response.getStatus());
+    QueryException exception = JSON_MAPPER.readValue((byte[]) 
response.getEntity(), QueryException.class);
+    Assert.assertEquals(
+        QueryInterruptedException.QUERY_CANCELLED,
+        exception.getErrorCode()
+    );
+  }
+
+  @Test
+  public void testCancelBetweenPlanAndExecute() throws Exception
+  {
+    final String sqlQueryId = "toCancel";
+    CountDownLatch planLatch = new CountDownLatch(1);
+    planLatchSupplier.set(new NonnullPair<>(planLatch, true));
+    CountDownLatch execLatch = new CountDownLatch(1);
+    executeLatchSupplier.set(new NonnullPair<>(execLatch, false));
+    Future<Response> future = executorService.submit(
+        () -> resource.doPost(
+            createSimpleQueryWithId(sqlQueryId, "SELECT DISTINCT dim1 FROM 
foo"),
+            makeExpectedReq()
+        )
+    );
+    Assert.assertTrue(planLatch.await(1, TimeUnit.SECONDS));
+    Response response = resource.cancelQuery(sqlQueryId, 
mockRequestForCancel());
+    execLatch.countDown();
+    Assert.assertEquals(Status.ACCEPTED.getStatusCode(), response.getStatus());
+
+    Assert.assertTrue(lifecycleManager.getAll(sqlQueryId).isEmpty());
+
+    response = future.get();
+    Assert.assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
response.getStatus());
+    QueryException exception = JSON_MAPPER.readValue((byte[]) 
response.getEntity(), QueryException.class);
+    Assert.assertEquals(
+        QueryInterruptedException.QUERY_CANCELLED,
+        exception.getErrorCode()
+    );
+  }
+
+  @Test
+  public void testCancelInvalidQuery() throws Exception
+  {
+    final String sqlQueryId = "validQuery";
+    CountDownLatch planLatch = new CountDownLatch(1);
+    planLatchSupplier.set(new NonnullPair<>(planLatch, true));
+    CountDownLatch execLatch = new CountDownLatch(1);
+    executeLatchSupplier.set(new NonnullPair<>(execLatch, false));
+    Future<Response> future = executorService.submit(
+        () -> resource.doPost(
+            createSimpleQueryWithId(sqlQueryId, "SELECT DISTINCT dim1 FROM 
foo"),
+            makeExpectedReq()
+        )
+    );
+    Assert.assertTrue(planLatch.await(1, TimeUnit.SECONDS));
+    Response response = resource.cancelQuery("invalidQuery", 
mockRequestForCancel());
+    Assert.assertEquals(Status.NOT_FOUND.getStatusCode(), 
response.getStatus());
 
+    Assert.assertFalse(lifecycleManager.getAll(sqlQueryId).isEmpty());
+
+    execLatch.countDown();
+    response = future.get();
+    Assert.assertEquals(Status.OK.getStatusCode(), response.getStatus());
   }
 
   @SuppressWarnings("unchecked")
@@ -913,6 +1072,10 @@ public class SqlResourceTest extends CalciteTestBase
     }
   }
 
+  private static SqlQuery createSimpleQueryWithId(String sqlQueryId, String 
sql)
+  {
+    return new SqlQuery(sql, null, false, ImmutableMap.of("sqlQueryId", 
sqlQueryId), null);
+  }
 
   private Pair<QueryException, List<Map<String, Object>>> doPost(final 
SqlQuery query) throws Exception
   {
@@ -1000,4 +1163,115 @@ public class SqlResourceTest extends CalciteTestBase
     EasyMock.replay(req);
     return req;
   }
+
+  private HttpServletRequest mockRequestForCancel()
+  {
+    HttpServletRequest req = EasyMock.createNiceMock(HttpServletRequest.class);
+    EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
+            .andReturn(CalciteTests.REGULAR_USER_AUTH_RESULT)
+            .anyTimes();
+    
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
+    EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED))
+            .andReturn(null)
+            .anyTimes();
+    req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.replay(req);
+    return req;
+  }
+
+  private static class TestSqlLifecycle extends SqlLifecycle
+  {
+    private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
validateAndAuthorizeLatchSupplier;
+    private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
planLatchSupplier;
+    private final SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
executeLatchSupplier;
+
+    private TestSqlLifecycle(
+        PlannerFactory plannerFactory,
+        ServiceEmitter emitter,
+        RequestLogger requestLogger,
+        QueryScheduler queryScheduler,
+        long startMs,
+        long startNs,
+        SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
validateAndAuthorizeLatchSupplier,
+        SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
planLatchSupplier,
+        SettableSupplier<NonnullPair<CountDownLatch, Boolean>> 
executeLatchSupplier
+    )
+    {
+      super(plannerFactory, emitter, requestLogger, queryScheduler, startMs, 
startNs);
+      this.validateAndAuthorizeLatchSupplier = 
validateAndAuthorizeLatchSupplier;
+      this.planLatchSupplier = planLatchSupplier;
+      this.executeLatchSupplier = executeLatchSupplier;
+    }
+
+    @Override
+    public void validateAndAuthorize(HttpServletRequest req)
+    {
+      if (validateAndAuthorizeLatchSupplier.get() != null) {
+        if (validateAndAuthorizeLatchSupplier.get().rhs) {
+          super.validateAndAuthorize(req);
+          validateAndAuthorizeLatchSupplier.get().lhs.countDown();
+        } else {
+          try {
+            if (!validateAndAuthorizeLatchSupplier.get().lhs.await(1, 
TimeUnit.SECONDS)) {
+              throw new RuntimeException("Latch timed out");
+            }
+          }
+          catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+          super.validateAndAuthorize(req);
+        }
+      } else {
+        super.validateAndAuthorize(req);
+      }
+    }
+
+    @Override
+    public void plan() throws RelConversionException
+    {
+      if (planLatchSupplier.get() != null) {
+        if (planLatchSupplier.get().rhs) {
+          super.plan();
+          planLatchSupplier.get().lhs.countDown();
+        } else {
+          try {
+            if (!planLatchSupplier.get().lhs.await(1, TimeUnit.SECONDS)) {
+              throw new RuntimeException("Latch timed out");
+            }
+          }
+          catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+          super.plan();
+        }
+      } else {
+        super.plan();
+      }
+    }
+
+    @Override
+    public Sequence<Object[]> execute()
+    {
+      if (executeLatchSupplier.get() != null) {
+        if (executeLatchSupplier.get().rhs) {
+          Sequence<Object[]> sequence = super.execute();
+          executeLatchSupplier.get().lhs.countDown();
+          return sequence;
+        } else {
+          try {
+            if (!executeLatchSupplier.get().lhs.await(1, TimeUnit.SECONDS)) {
+              throw new RuntimeException("Latch timed out");
+            }
+          }
+          catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+          return super.execute();
+        }
+      } else {
+        return super.execute();
+      }
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to