[ASTERIXDB-2058][HYR] Only Complete job cancellation after cleanup

- user model changes: no
- storage format changes: no
- interface changes: yes
  --IJobManager.cancel now takes a callback

details:
- Job cancellation now completes only after the job cleanup work
  has completed and not merely when the abort tasks are executed.
- The NCQueryServiceServlet actively cancels requests that passes
  5 minutes.
- Cancellation of timedout jobs is not done through the Http API
  but through message broker.
- Typically, requests might timeout when the servers are
  overloaded. When that is the case, there is a high chance http
  requests are to be rejected including requests to cancel
  previously submitted queries. This is the reason for using
  Message broker for this task.
- ExecuteStatementRequest used to execute the statement in
  a different executor thread even though it is itself is being
  executed in an executor thread and is not blocking anyone.
  This was fixed as well.

Change-Id: I14b4bbd512cc88e489254d8bf82edba0fd3a3db5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1961
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/87411c22
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/87411c22
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/87411c22

Branch: refs/heads/master
Commit: 87411c22cc30fb425a4839b20b3a259f28d1ef79
Parents: 5eb1303
Author: Abdullah Alamoudi <[email protected]>
Authored: Thu Aug 24 08:51:03 2017 -0700
Committer: abdullah alamoudi <[email protected]>
Committed: Thu Aug 24 11:09:08 2017 -0700

----------------------------------------------------------------------
 .../http/server/AbstractQueryApiServlet.java    |   4 +-
 .../asterix/api/http/server/ApiServlet.java     |   4 +-
 .../api/http/server/ClusterApiServlet.java      |   2 +-
 .../ClusterControllerDetailsApiServlet.java     |   2 +-
 .../api/http/server/ConnectorApiServlet.java    |   2 +-
 .../api/http/server/DiagnosticsApiServlet.java  |   3 +-
 .../asterix/api/http/server/Duration.java       | 236 +++++++++++++++++++
 .../api/http/server/NCQueryServiceServlet.java  |  43 +++-
 .../server/NodeControllerDetailsApiServlet.java |   2 +-
 .../http/server/QueryCancellationServlet.java   |   1 -
 .../api/http/server/QueryServiceServlet.java    |  39 +--
 .../api/http/server/RebalanceApiServlet.java    |   2 +-
 .../asterix/api/http/server/RestApiServlet.java |   4 +-
 .../api/http/server/ServletConstants.java       |  31 +++
 .../api/http/server/ShutdownApiServlet.java     |   2 +-
 .../api/http/server/VersionApiServlet.java      |   2 +-
 .../api/http/servlet/ServletConstants.java      |  31 ---
 .../app/active/ActiveEntityEventsListener.java  | 160 ++-----------
 .../apache/asterix/app/active/RecoveryTask.java | 188 +++++++++++++++
 .../asterix/app/message/CancelQueryRequest.java |  74 ++++++
 .../app/message/CancelQueryResponse.java        |  44 ++++
 .../message/ExecuteStatementRequestMessage.java |  90 ++++---
 .../ExecuteStatementResponseMessage.java        |   4 -
 .../hyracks/bootstrap/CCApplication.java        |  19 +-
 .../http/servlet/ConnectorApiServletTest.java   |   1 +
 .../servlet/QueryCancellationServletTest.java   |   1 +
 .../http/servlet/QueryServiceServletTest.java   |  46 ----
 .../api/http/servlet/VersionApiServletTest.java |   4 +-
 .../asterix/runtime/ParseDurationTest.java      | 161 +++++++++++++
 .../asterix/common/exceptions/ErrorCode.java    |   3 +
 .../main/resources/asx_errormsg/en.properties   |   3 +
 asterixdb/asterix-metadata/pom.xml              |   3 +-
 .../client/HyracksClientInterfaceFunctions.java |   3 +
 .../control/cc/executor/JobExecutor.java        |  24 +-
 .../hyracks/control/cc/job/IJobManager.java     |   5 +-
 .../hyracks/control/cc/job/JobManager.java      |  12 +-
 .../hyracks/control/cc/work/CancelJobWork.java  |   5 +-
 .../hyracks/control/cc/work/JobCleanupWork.java |   8 +-
 .../hyracks/control/cc/job/JobManagerTest.java  |   7 +-
 .../control/common/work/FutureValue.java        |   1 +
 .../control/common/work/NoOpCallback.java       |  38 +++
 41 files changed, 949 insertions(+), 365 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
index a4e72f7..c38b3fc 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR;
+import static 
org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static 
org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASET_ATTR;
 
 import java.io.PrintWriter;
 import java.util.UUID;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
index 7874aa3..58c282f 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR;
+import static 
org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static 
org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASET_ATTR;
 
 import java.awt.image.BufferedImage;
 import java.io.BufferedReader;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
index b24a9a1..4faab1e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
+import static 
org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
 
 import java.io.IOException;
 import java.io.PrintWriter;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
index 52d4d67..6dea30c 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static 
org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.io.IOException;
 import java.io.PrintWriter;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
index d9a63f7..52af643 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static 
org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.io.IOException;
 import java.io.PrintWriter;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
index dcd0e70..9d2415d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static 
org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -34,7 +34,6 @@ import java.util.concurrent.Future;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.api.http.servlet.ServletConstants;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.http.api.IServletRequest;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/Duration.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/Duration.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/Duration.java
new file mode 100644
index 0000000..bdda750
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/Duration.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.Triple;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public enum Duration {
+    SEC("s", 9),
+    MILLI("ms", 6),
+    MICRO("µs", 3),
+    NANO("ns", 0);
+
+    static final long NANOSECONDS = 1;
+    static final long MICROSECONDS = 1000 * NANOSECONDS;
+    static final long MILLISECONDS = 1000 * MICROSECONDS;
+    static final long SECONDS = 1000 * MILLISECONDS;
+    static final long MINUTES = 60 * SECONDS;
+    static final long HOURS = 60 * MINUTES;
+
+    String unit;
+    int nanoDigits;
+
+    Duration(String unit, int nanoDigits) {
+        this.unit = unit;
+        this.nanoDigits = nanoDigits;
+    }
+
+    public static String formatNanos(long nanoTime) {
+        final String strTime = String.valueOf(nanoTime);
+        final int len = strTime.length();
+        for (Duration tu : Duration.values()) {
+            if (len > tu.nanoDigits) {
+                final String integer = strTime.substring(0, len - 
tu.nanoDigits);
+                final String fractional = strTime.substring(len - 
tu.nanoDigits);
+                return integer + (fractional.length() > 0 ? "." + fractional : 
"") + tu.unit;
+            }
+        }
+        return "illegal string value: " + strTime;
+    }
+
+    // ParseDuration parses a duration string.
+    // A duration string is a possibly signed sequence of
+    // decimal numbers, each with optional fraction and a unit suffix,
+    // such as "300ms", "-1.5h" or "2h45m".
+    // Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
+    // returns the duration in nano seconds
+    public static long parseDurationStringToNanos(String orig) throws 
HyracksDataException {
+        // [-+]?([0-9]*(\.[0-9]*)?[a-z]+)+
+        String s = orig;
+        long d = 0;
+        boolean neg = false;
+        char c;
+        // Consume [-+]?
+        if (!s.isEmpty()) {
+            c = s.charAt(0);
+            if (c == '-' || c == '+') {
+                neg = c == '-';
+                s = s.substring(1);
+            }
+        }
+
+        // Special case: if all that is left is "0", this is zero.
+        if ("0".equals(s)) {
+            return 0L;
+        }
+
+        if (s.isEmpty()) {
+            throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig);
+        }
+
+        while (!s.isEmpty()) {
+            long v = 0L; // integers before decimal
+            long f = 0L; // integers after decimal
+            double scale = 1.0; // value = v + f/scale
+            // The next character must be [0-9.]
+            if (!(s.charAt(0) == '.' || '0' <= s.charAt(0) && s.charAt(0) <= 
'9')) {
+                throw new RuntimeDataException(ErrorCode.INVALID_DURATION, 
orig);
+            }
+            // Consume [0-9]*
+            int pl = s.length();
+            Pair<Long, String> pair = leadingInt(s);
+            v = pair.getLeft();
+            s = pair.getRight();
+            boolean pre = pl != s.length(); // whether we consumed anything 
before a period
+
+            // Consume (\.[0-9]*)?
+            boolean post = false;
+            if (!s.isEmpty() && s.charAt(0) == '.') {
+                s = s.substring(1);
+                pl = s.length();
+                Triple<Long, Double, String> triple = leadingFraction(s);
+                f = triple.getLeft();
+                scale = triple.getMiddle();
+                s = triple.getRight();
+                post = pl != s.length();
+            }
+            if (!pre && !post) {
+                // no digits (e.g. ".s" or "-.s")
+                throw new RuntimeDataException(ErrorCode.INVALID_DURATION, 
orig);
+            }
+
+            // Consume unit.
+            int i = 0;
+            for (; i < s.length(); i++) {
+                c = s.charAt(i);
+                if (c == '.' || '0' <= c && c <= '9') {
+                    break;
+                }
+            }
+            if (i == 0) {
+                throw new RuntimeDataException(ErrorCode.INVALID_DURATION, 
orig);
+            }
+            String u = s.substring(0, i);
+            s = s.substring(i);
+            long unit = getUnit(u);
+            if (v > Long.MAX_VALUE / unit) {
+                // overflow
+                throw new RuntimeDataException(ErrorCode.INVALID_DURATION, 
orig);
+            }
+            v *= unit;
+            if (f > 0) {
+                // float64 is needed to be nanosecond accurate for fractions 
of hours.
+                // v >= 0 && (f*unit/scale) <= 3.6e+12 (ns/h, h is the largest 
unit)
+                v += (long) (((double) f * (double) unit) / scale);
+                if (v < 0) {
+                    // overflow
+                    throw new RuntimeDataException(ErrorCode.INVALID_DURATION, 
orig);
+                }
+            }
+            d += v;
+            if (d < 0) {
+                // overflow
+                throw new RuntimeDataException(ErrorCode.INVALID_DURATION, 
orig);
+            }
+        }
+
+        if (neg) {
+            d = -d;
+        }
+        return d;
+    }
+
+    private static final long getUnit(String unit) throws HyracksDataException 
{
+        switch (unit) {
+            case "ns":
+                return NANOSECONDS;
+            case "us":
+            case "µs":// U+00B5 = micro symbol
+            case "μs":// U+03BC = Greek letter mu
+                return MICROSECONDS;
+            case "ms":
+                return MILLISECONDS;
+            case "s":
+                return SECONDS;
+            case "m":
+                return MINUTES;
+            case "h":
+                return HOURS;
+            default:
+                throw new 
RuntimeDataException(ErrorCode.UNKNOWN_DURATION_UNIT, unit);
+        }
+    }
+
+    // leadingInt consumes the leading [0-9]* from s.
+    static Pair<Long, String> leadingInt(String origin) throws 
HyracksDataException {
+        String s = origin;
+        long x = 0L;
+        int i = 0;
+        for (; i < s.length(); i++) {
+            char c = s.charAt(i);
+            if (c < '0' || c > '9') {
+                break;
+            }
+            if (x > Long.MAX_VALUE / 10) {
+                throw new RuntimeDataException(ErrorCode.INVALID_DURATION, 
origin);
+            }
+            x = x * 10 + Character.getNumericValue(c);
+            if (x < 0) {
+                throw new RuntimeDataException(ErrorCode.INVALID_DURATION, 
origin);
+            }
+        }
+        return Pair.of(x, s.substring(i));
+    }
+
+    // leadingFraction consumes the leading [0-9]* from s.
+    // It is used only for fractions, so does not return an error on overflow,
+    // it just stops accumulating precision.
+    static Triple<Long, Double, String> leadingFraction(String s) {
+        int i = 0;
+        long x = 0L;
+        double scale = 1.0;
+        boolean overflow = false;
+        for (; i < s.length(); i++) {
+            char c = s.charAt(i);
+            if (c < '0' || c > '9') {
+                break;
+            }
+            if (overflow) {
+                continue;
+            }
+            if (x > (1 << 63 - 1) / 10) {
+                // It's possible for overflow to give a positive number, so 
take care.
+                overflow = true;
+                continue;
+            }
+            long y = x * 10 + Character.getNumericValue(c);
+            if (y < 0) {
+                overflow = true;
+                continue;
+            }
+            x = y;
+            scale *= 10;
+        }
+        return Triple.of(x, scale, s.substring(i));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 9547514..ef49c35 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -19,17 +19,20 @@
 
 package org.apache.asterix.api.http.server;
 
+import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
 import java.util.logging.Level;
 
-import io.netty.handler.codec.http.HttpResponseStatus;
 import org.apache.asterix.algebra.base.ILangExtension;
+import org.apache.asterix.app.message.CancelQueryRequest;
 import org.apache.asterix.app.message.ExecuteStatementRequestMessage;
 import org.apache.asterix.app.message.ExecuteStatementResponseMessage;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.messaging.api.MessageFuture;
 import org.apache.asterix.om.types.ARecordType;
@@ -41,11 +44,14 @@ import org.apache.hyracks.api.dataset.ResultSetId;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.ipc.exceptions.IPCException;
 
+import io.netty.handler.codec.http.HttpResponseStatus;
+
 /**
  * Query service servlet that can run on NC nodes.
  * Delegates query execution to CC, then serves the result.
  */
 public class NCQueryServiceServlet extends QueryServiceServlet {
+
     public NCQueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] 
paths, IApplicationContext appCtx,
             ILangExtension.Language queryLanguage) {
         super(ctx, paths, appCtx, queryLanguage, null, null, null);
@@ -63,13 +69,28 @@ public class NCQueryServiceServlet extends 
QueryServiceServlet {
         ExecuteStatementResponseMessage responseMsg;
         MessageFuture responseFuture = ncMb.registerMessageFuture();
         try {
+            if (param.clientContextID == null) {
+                param.clientContextID = UUID.randomUUID().toString();
+            }
+            long timeout = 
ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS;
+            if (param.timeout != null) {
+                timeout = java.util.concurrent.TimeUnit.NANOSECONDS
+                        
.toMillis(Duration.parseDurationStringToNanos(param.timeout));
+            }
             ExecuteStatementRequestMessage requestMsg =
                     new ExecuteStatementRequestMessage(ncCtx.getNodeId(), 
responseFuture.getFutureId(), queryLanguage,
                             statementsText, sessionOutput.config(), 
ccDelivery, param.clientContextID, handleUrl);
             outExecStartEnd[0] = System.nanoTime();
             ncMb.sendMessageToCC(requestMsg);
-            responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(
-                    ExecuteStatementResponseMessage.DEFAULT_TIMEOUT_MILLIS, 
java.util.concurrent.TimeUnit.MILLISECONDS);
+            try {
+                responseMsg = (ExecuteStatementResponseMessage) 
responseFuture.get(timeout,
+                        java.util.concurrent.TimeUnit.MILLISECONDS);
+            } catch (TimeoutException exception) {
+                RuntimeDataException hde = new 
RuntimeDataException(ErrorCode.QUERY_TIMEOUT, exception);
+                // cancel query
+                cancelQuery(ncMb, ncCtx.getNodeId(), param.clientContextID, 
hde);
+                throw hde;
+            }
             outExecStartEnd[1] = System.nanoTime();
         } finally {
             ncMb.deregisterMessageFuture(responseFuture.getFutureId());
@@ -97,6 +118,22 @@ public class NCQueryServiceServlet extends 
QueryServiceServlet {
         }
     }
 
+    private void cancelQuery(INCMessageBroker messageBroker, String nodeId, 
String clientContextID,
+            Exception exception) {
+        MessageFuture cancelQueryFuture = 
messageBroker.registerMessageFuture();
+        try {
+            CancelQueryRequest cancelQueryMessage =
+                    new CancelQueryRequest(nodeId, 
cancelQueryFuture.getFutureId(), clientContextID);
+            messageBroker.sendMessageToCC(cancelQueryMessage);
+            
cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS,
+                    java.util.concurrent.TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            exception.addSuppressed(e);
+        } finally {
+            
messageBroker.deregisterMessageFuture(cancelQueryFuture.getFutureId());
+        }
+    }
+
     @Override
     protected HttpResponseStatus handleExecuteStatementException(Throwable t) {
         if (t instanceof IPCException || t instanceof TimeoutException) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
index d9757c7..6291869 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static 
org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.io.IOException;
 import java.io.PrintWriter;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
index bfec146..3f07151 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
@@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.api.http.servlet.ServletConstants;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.JobId;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index 1f1d282..c630636 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -27,7 +27,6 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.algebra.base.ILangExtension;
-import org.apache.asterix.api.http.servlet.ServletConstants;
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.config.GlobalConfig;
@@ -108,7 +107,8 @@ public class QueryServiceServlet extends 
AbstractQueryApiServlet {
         FORMAT("format"),
         CLIENT_ID("client_context_id"),
         PRETTY("pretty"),
-        MODE("mode");
+        MODE("mode"),
+        TIMEOUT("timeout");
 
         private final String str;
 
@@ -154,39 +154,12 @@ public class QueryServiceServlet extends 
AbstractQueryApiServlet {
         }
     }
 
-    public enum TimeUnit {
-        SEC("s", 9),
-        MILLI("ms", 6),
-        MICRO("µs", 3),
-        NANO("ns", 0);
-
-        String unit;
-        int nanoDigits;
-
-        TimeUnit(String unit, int nanoDigits) {
-            this.unit = unit;
-            this.nanoDigits = nanoDigits;
-        }
-
-        public static String formatNanos(long nanoTime) {
-            final String strTime = String.valueOf(nanoTime);
-            final int len = strTime.length();
-            for (TimeUnit tu : TimeUnit.values()) {
-                if (len > tu.nanoDigits) {
-                    final String integer = strTime.substring(0, len - 
tu.nanoDigits);
-                    final String fractional = strTime.substring(len - 
tu.nanoDigits);
-                    return integer + (fractional.length() > 0 ? "." + 
fractional : "") + tu.unit;
-                }
-            }
-            return "illegal string value: " + strTime;
-        }
-    }
-
     static class RequestParameters {
         String host;
         String path;
         String statement;
         String format;
+        String timeout;
         boolean pretty;
         String clientContextID;
         String mode;
@@ -202,6 +175,7 @@ public class QueryServiceServlet extends 
AbstractQueryApiServlet {
                 on.put("pretty", pretty);
                 on.put("mode", mode);
                 on.put("clientContextID", clientContextID);
+                on.put("format", format);
                 return om.writer(new 
MinimalPrettyPrinter()).writeValueAsString(on);
             } catch (JsonProcessingException e) { // NOSONAR
                 return e.getMessage();
@@ -297,9 +271,9 @@ public class QueryServiceServlet extends 
AbstractQueryApiServlet {
         pw.print(ResultFields.METRICS.str());
         pw.print("\": {\n");
         pw.print("\t");
-        ResultUtil.printField(pw, Metrics.ELAPSED_TIME.str(), 
TimeUnit.formatNanos(elapsedTime));
+        ResultUtil.printField(pw, Metrics.ELAPSED_TIME.str(), 
Duration.formatNanos(elapsedTime));
         pw.print("\t");
-        ResultUtil.printField(pw, Metrics.EXECUTION_TIME.str(), 
TimeUnit.formatNanos(executionTime));
+        ResultUtil.printField(pw, Metrics.EXECUTION_TIME.str(), 
Duration.formatNanos(executionTime));
         pw.print("\t");
         ResultUtil.printField(pw, Metrics.RESULT_COUNT.str(), resultCount, 
true);
         pw.print("\t");
@@ -334,6 +308,7 @@ public class QueryServiceServlet extends 
AbstractQueryApiServlet {
                 param.pretty = getOptBoolean(jsonRequest, 
Parameter.PRETTY.str(), false);
                 param.mode = toLower(getOptText(jsonRequest, 
Parameter.MODE.str()));
                 param.clientContextID = getOptText(jsonRequest, 
Parameter.CLIENT_ID.str());
+                param.timeout = getOptText(jsonRequest, 
Parameter.TIMEOUT.str());
             } catch (JsonParseException | JsonMappingException e) {
                 // if the JSON parsing fails, the statement is empty and we 
get an empty statement error
                 GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), 
e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
index 27e2806..8536571 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static 
org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.io.PrintWriter;
 import java.util.ArrayDeque;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
index 18aae8e..1a7918c 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR;
+import static 
org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static 
org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASET_ATTR;
 
 import java.io.IOException;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
new file mode 100644
index 0000000..2fe37c3
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+public class ServletConstants {
+    public static final String HYRACKS_CONNECTION_ATTR = 
"org.apache.asterix.HYRACKS_CONNECTION";
+    public static final String HYRACKS_DATASET_ATTR = 
"org.apache.asterix.HYRACKS_DATASET";
+    public static final String ASTERIX_APP_CONTEXT_INFO_ATTR = 
"org.apache.asterix.APP_CONTEXT_INFO";
+    public static final String EXECUTOR_SERVICE_ATTR = 
"org.apache.asterix.EXECUTOR_SERVICE";
+    public static final String RUNNING_QUERIES_ATTR = 
"org.apache.asterix.RUNINNG_QUERIES";
+    public static final String SERVICE_CONTEXT_ATTR = 
"org.apache.asterix.SERVICE_CONTEXT";
+
+    private ServletConstants() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
index fdd106d..06e2383 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static 
org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.io.IOException;
 import java.io.PrintWriter;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java
index 5acba381..eeef8e8 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.api.http.server;
 
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
+import static 
org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
 
 import java.io.IOException;
 import java.io.PrintWriter;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
deleted file mode 100644
index b815d76..0000000
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.api.http.servlet;
-
-public class ServletConstants {
-    public static final String HYRACKS_CONNECTION_ATTR = 
"org.apache.asterix.HYRACKS_CONNECTION";
-    public static final String HYRACKS_DATASET_ATTR = 
"org.apache.asterix.HYRACKS_DATASET";
-    public static final String ASTERIX_APP_CONTEXT_INFO_ATTR = 
"org.apache.asterix.APP_CONTEXT_INFO";
-    public static final String EXECUTOR_SERVICE_ATTR = 
"org.apache.asterix.EXECUTOR_SERVICE";
-    public static final String RUNNING_QUERIES_ATTR = 
"org.apache.asterix.RUNINNG_QUERIES";
-    public static final String SERVICE_CONTEXT_ATTR = 
"org.apache.asterix.SERVICE_CONTEXT";
-
-    private ServletConstants() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index a16f678..3b4b974 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -34,14 +34,11 @@ import org.apache.asterix.active.ActiveRuntimeId;
 import org.apache.asterix.active.ActivityState;
 import org.apache.asterix.active.EntityId;
 import org.apache.asterix.active.IActiveEntityEventSubscriber;
-import org.apache.asterix.active.IRetryPolicy;
 import org.apache.asterix.active.IRetryPolicyFactory;
 import org.apache.asterix.active.NoRetryPolicyFactory;
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.active.message.ActivePartitionMessage.Event;
 import org.apache.asterix.active.message.StatsRequestMessage;
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
-import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -53,8 +50,6 @@ import 
org.apache.asterix.external.feed.watch.WaitForStateSubscriber;
 import org.apache.asterix.metadata.api.IActiveEntityController;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.utils.DatasetUtil;
-import org.apache.asterix.metadata.utils.MetadataLockUtil;
 import org.apache.asterix.translator.IStatementExecutor;
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -94,8 +89,7 @@ public abstract class ActiveEntityEventsListener implements 
IActiveEntityControl
     protected boolean isFetchingStats;
     protected int numRegistered;
     protected int numDeRegistered;
-    protected volatile Future<Void> recoveryTask;
-    protected volatile boolean cancelRecovery;
+    protected volatile RecoveryTask rt;
     protected volatile boolean suspended = false;
     // failures
     protected Exception jobFailure;
@@ -199,7 +193,8 @@ public abstract class ActiveEntityEventsListener implements 
IActiveEntityControl
             jobFailure = exceptions.isEmpty() ? new 
RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
                     : exceptions.get(0);
             setState(ActivityState.TEMPORARILY_FAILED);
-            if (prevState != ActivityState.SUSPENDING && prevState != 
ActivityState.RECOVERING) {
+            if (prevState != ActivityState.SUSPENDING && prevState != 
ActivityState.RECOVERING
+                    && prevState != ActivityState.RESUMING) {
                 recover();
             }
         } else {
@@ -356,112 +351,16 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl
     @Override
     public synchronized void recover() throws HyracksDataException {
         LOGGER.log(level, "Recover is called on " + entityId);
-        if (recoveryTask != null) {
-            LOGGER.log(level, "But recovery task for " + entityId + " is 
already there!! throwing an exception");
-            throw new RuntimeDataException(ErrorCode.DOUBLE_RECOVERY_ATTEMPTS);
-        }
         if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
             LOGGER.log(level, "But it has no recovery policy, so it is set to 
permanent failure");
             setState(ActivityState.PERMANENTLY_FAILED);
         } else {
             ExecutorService executor = 
appCtx.getServiceContext().getControllerService().getExecutor();
-            IRetryPolicy policy = retryPolicyFactory.create(this);
-            cancelRecovery = false;
             setState(ActivityState.TEMPORARILY_FAILED);
             LOGGER.log(level, "Recovery task has been submitted");
-            recoveryTask = executor.submit(() -> {
-                String nameBefore = Thread.currentThread().getName();
-                try {
-                    Thread.currentThread().setName("RecoveryTask (" + entityId 
+ ")");
-                    doRecover(policy);
-                } finally {
-                    Thread.currentThread().setName(nameBefore);
-                }
-                return null;
-            });
-        }
-    }
-
-    protected Void doRecover(IRetryPolicy policy)
-            throws AlgebricksException, HyracksDataException, 
InterruptedException {
-        LOGGER.log(level, "Actual Recovery task has started");
-        if (getState() != ActivityState.TEMPORARILY_FAILED) {
-            LOGGER.log(level, "but its state is not temp failure and so we're 
just returning");
-            return null;
-        }
-        LOGGER.log(level, "calling the policy");
-        while (policy.retry()) {
-            synchronized (this) {
-                if (cancelRecovery) {
-                    recoveryTask = null;
-                    notifyAll();
-                    return null;
-                }
-                while (clusterStateManager.getState() != ClusterState.ACTIVE) {
-                    if (cancelRecovery) {
-                        recoveryTask = null;
-                        notifyAll();
-                        return null;
-                    }
-                    wait();
-                }
-            }
-            waitForNonTransitionState();
-            IMetadataLockManager lockManager = 
metadataProvider.getApplicationContext().getMetadataLockManager();
-            
lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
-                    entityId.getDataverse() + '.' + entityId.getEntityName());
-            for (Dataset dataset : getDatasets()) {
-                MetadataLockUtil.modifyDatasetBegin(lockManager, 
metadataProvider.getLocks(),
-                        dataset.getDataverseName(), 
DatasetUtil.getFullyQualifiedName(dataset));
-            }
-            synchronized (this) {
-                try {
-                    if (cancelRecovery) {
-                        recoveryTask = null;
-                        notifyAll();
-                        return null;
-                    }
-                    setState(ActivityState.RECOVERING);
-                    doStart(metadataProvider);
-                    recoveryTask = null;
-                    notifyAll();
-                    return null;
-                } catch (Exception e) {
-                    LOGGER.log(level, "Attempt to revive " + entityId + " 
failed", e);
-                    setState(ActivityState.TEMPORARILY_FAILED);
-                    recoverFailure = e;
-                } finally {
-                    metadataProvider.getLocks().reset();
-                }
-                notifyAll();
-            }
-        }
-        // Recovery task is essntially over now either through failure or 
through cancellation(stop)
-        synchronized (this) {
-            recoveryTask = null;
-            notifyAll();
-            if (state != ActivityState.TEMPORARILY_FAILED) {
-                return null;
-            }
-        }
-        IMetadataLockManager lockManager = 
metadataProvider.getApplicationContext().getMetadataLockManager();
-        try {
-            
lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
-                    entityId.getDataverse() + '.' + entityId.getEntityName());
-            for (Dataset dataset : getDatasets()) {
-                MetadataLockUtil.modifyDatasetBegin(lockManager, 
metadataProvider.getLocks(), dataset.getDatasetName(),
-                        DatasetUtil.getFullyQualifiedName(dataset));
-            }
-            synchronized (this) {
-                if (state == ActivityState.TEMPORARILY_FAILED) {
-                    setState(ActivityState.PERMANENTLY_FAILED);
-                }
-                notifyAll();
-            }
-        } finally {
-            metadataProvider.getLocks().reset();
+            rt = new RecoveryTask(appCtx, this, retryPolicyFactory);
+            executor.submit(rt.recover());
         }
-        return null;
     }
 
     @Override
@@ -503,13 +402,10 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl
             throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, 
state);
         }
         if (state == ActivityState.TEMPORARILY_FAILED || state == 
ActivityState.PERMANENTLY_FAILED) {
-            if (recoveryTask != null) {
+            if (rt != null) {
                 setState(ActivityState.STOPPING);
-                cancelRecovery = true;
-                recoveryTask.cancel(true);
-                while (recoveryTask != null) {
-                    wait();
-                }
+                rt.cancel();
+                rt = null;
             }
             setState(ActivityState.STOPPED);
             try {
@@ -533,6 +429,10 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl
         }
     }
 
+    public RecoveryTask getRecoveryTask() {
+        return rt;
+    }
+
     @Override
     public void suspend(MetadataProvider metadataProvider) throws 
HyracksDataException, InterruptedException {
         WaitForStateSubscriber subscriber;
@@ -602,8 +502,9 @@ public abstract class ActiveEntityEventsListener implements 
IActiveEntityControl
             setState(ActivityState.RESUMING);
             WaitForStateSubscriber subscriber = new 
WaitForStateSubscriber(this,
                     EnumSet.of(ActivityState.RUNNING, 
ActivityState.TEMPORARILY_FAILED));
-            recoveryTask = 
metadataProvider.getApplicationContext().getServiceContext().getControllerService()
-                    .getExecutor().submit(() -> 
resumeOrRecover(metadataProvider));
+            rt = new RecoveryTask(appCtx, this, retryPolicyFactory);
+            
metadataProvider.getApplicationContext().getServiceContext().getControllerService().getExecutor()
+                    .submit(() -> rt.resumeOrRecover(metadataProvider));
             try {
                 subscriber.sync();
             } catch (Exception e) {
@@ -616,28 +517,6 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl
         }
     }
 
-    protected Void resumeOrRecover(MetadataProvider metadataProvider)
-            throws HyracksDataException, AlgebricksException, 
InterruptedException {
-        try {
-            doResume(metadataProvider);
-            synchronized (this) {
-                setState(ActivityState.RUNNING);
-                recoveryTask = null;
-            }
-        } catch (Exception e) {
-            LOGGER.log(Level.WARNING, "First attempt to resume " + entityId + 
" Failed", e);
-            setState(ActivityState.TEMPORARILY_FAILED);
-            if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
-                setState(ActivityState.PERMANENTLY_FAILED);
-            } else {
-                IRetryPolicy policy = retryPolicyFactory.create(this);
-                cancelRecovery = false;
-                doRecover(policy);
-            }
-        }
-        return null;
-    }
-
     @Override
     public boolean isActive() {
         return state != ActivityState.STOPPED && state != 
ActivityState.PERMANENTLY_FAILED;
@@ -652,15 +531,6 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl
         this.locations = locations;
     }
 
-    public Future<Void> getRecoveryTask() {
-        return recoveryTask;
-    }
-
-    public synchronized void cancelRecovery() {
-        cancelRecovery = true;
-        notifyAll();
-    }
-
     @Override
     public Exception getJobFailure() {
         return jobFailure;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
new file mode 100644
index 0000000..7b7de93
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.active;
+
+import java.util.concurrent.Callable;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.active.ActivityState;
+import org.apache.asterix.active.IRetryPolicy;
+import org.apache.asterix.active.IRetryPolicyFactory;
+import org.apache.asterix.active.NoRetryPolicyFactory;
+import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.api.IMetadataLockManager;
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.metadata.utils.MetadataLockUtil;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class RecoveryTask {
+
+    private static final Logger LOGGER = 
Logger.getLogger(RecoveryTask.class.getName());
+    private static final Level level = Level.INFO;
+    private final ActiveEntityEventsListener listener;
+    private volatile boolean cancelRecovery = false;
+    private final IRetryPolicyFactory retryPolicyFactory;
+    private final MetadataProvider metadataProvider;
+    private final IClusterStateManager clusterStateManager;
+    private Exception failure;
+
+    public RecoveryTask(ICcApplicationContext appCtx, 
ActiveEntityEventsListener listener,
+            IRetryPolicyFactory retryPolicyFactory) {
+        this.listener = listener;
+        this.retryPolicyFactory = retryPolicyFactory;
+        this.metadataProvider = new MetadataProvider(appCtx, null);
+        this.clusterStateManager = appCtx.getClusterStateManager();
+    }
+
+    public Callable<Void> recover() {
+        if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
+            return () -> {
+                return null;
+            };
+        }
+        IRetryPolicy policy = retryPolicyFactory.create(listener);
+        return () -> {
+            String nameBefore = Thread.currentThread().getName();
+            try {
+                Thread.currentThread().setName("RecoveryTask (" + 
listener.getEntityId() + ")");
+                doRecover(policy);
+            } finally {
+                Thread.currentThread().setName(nameBefore);
+            }
+            return null;
+        };
+    }
+
+    public void cancel() {
+        cancelRecovery = true;
+    }
+
+    protected Void resumeOrRecover(MetadataProvider metadataProvider)
+            throws HyracksDataException, AlgebricksException, 
InterruptedException {
+        try {
+            synchronized (listener) {
+                listener.doResume(metadataProvider);
+                listener.setState(ActivityState.RUNNING);
+            }
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "First attempt to resume " + 
listener.getEntityId() + " Failed", e);
+            synchronized (listener) {
+                if (listener.getState() == ActivityState.RESUMING) {
+                    // This will be the case if compilation failure
+                    // If the failure is a runtime failure, then the state
+                    // would've been set to temporarily failed already
+                    listener.setState(ActivityState.TEMPORARILY_FAILED);
+                }
+            }
+            if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
+                synchronized (listener) {
+                    if (!cancelRecovery) {
+                        listener.setState(ActivityState.PERMANENTLY_FAILED);
+                    }
+                }
+            } else {
+                IRetryPolicy policy = retryPolicyFactory.create(listener);
+                doRecover(policy);
+            }
+        }
+        return null;
+    }
+
+    protected Void doRecover(IRetryPolicy policy)
+            throws AlgebricksException, HyracksDataException, 
InterruptedException {
+        LOGGER.log(level, "Actual Recovery task has started");
+        if (listener.getState() != ActivityState.TEMPORARILY_FAILED) {
+            LOGGER.log(level, "but its state is not temp failure and so we're 
just returning");
+            return null;
+        }
+        LOGGER.log(level, "calling the policy");
+        while (policy.retry()) {
+            synchronized (listener) {
+                if (cancelRecovery) {
+                    return null;
+                }
+                while (clusterStateManager.getState() != ClusterState.ACTIVE) {
+                    if (cancelRecovery) {
+                        return null;
+                    }
+                    wait();
+                }
+            }
+            IMetadataLockManager lockManager = 
metadataProvider.getApplicationContext().getMetadataLockManager();
+            
lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
+                    listener.getEntityId().getDataverse() + '.' + 
listener.getEntityId().getEntityName());
+            for (Dataset dataset : listener.getDatasets()) {
+                MetadataLockUtil.modifyDatasetBegin(lockManager, 
metadataProvider.getLocks(),
+                        dataset.getDataverseName(), 
DatasetUtil.getFullyQualifiedName(dataset));
+            }
+            synchronized (listener) {
+                try {
+                    if (cancelRecovery) {
+                        return null;
+                    }
+                    listener.setState(ActivityState.RECOVERING);
+                    listener.doStart(metadataProvider);
+                    return null;
+                } catch (Exception e) {
+                    LOGGER.log(level, "Attempt to revive " + 
listener.getEntityId() + " failed", e);
+                    listener.setState(ActivityState.TEMPORARILY_FAILED);
+                    failure = e;
+                } finally {
+                    metadataProvider.getLocks().reset();
+                }
+                listener.notifyAll();
+            }
+        }
+        // Recovery task is essntially over now either through failure or 
through cancellation(stop)
+        synchronized (listener) {
+            listener.notifyAll();
+            if (listener.getState() != ActivityState.TEMPORARILY_FAILED) {
+                return null;
+            }
+        }
+        IMetadataLockManager lockManager = 
metadataProvider.getApplicationContext().getMetadataLockManager();
+        try {
+            
lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(),
+                    listener.getEntityId().getDataverse() + '.' + 
listener.getEntityId().getEntityName());
+            for (Dataset dataset : listener.getDatasets()) {
+                MetadataLockUtil.modifyDatasetBegin(lockManager, 
metadataProvider.getLocks(), dataset.getDatasetName(),
+                        DatasetUtil.getFullyQualifiedName(dataset));
+            }
+            synchronized (listener) {
+                if (listener.getState() == ActivityState.TEMPORARILY_FAILED) {
+                    listener.setState(ActivityState.PERMANENTLY_FAILED);
+                }
+                listener.notifyAll();
+            }
+        } finally {
+            metadataProvider.getLocks().reset();
+        }
+        return null;
+    }
+
+    public Exception getFailure() {
+        return failure;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
new file mode 100644
index 0000000..fb6ec37
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
+import org.apache.asterix.hyracks.bootstrap.CCApplication;
+import org.apache.asterix.messaging.CCMessageBroker;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class CancelQueryRequest implements ICcAddressedMessage {
+
+    private static final Logger LOGGER = 
Logger.getLogger(CancelQueryRequest.class.getName());
+    private static final long serialVersionUID = 1L;
+    private final String nodeId;
+    private final long reqId;
+    private final String contextId;
+
+    public CancelQueryRequest(String nodeId, long reqId, String contextId) {
+        this.nodeId = nodeId;
+        this.reqId = reqId;
+        this.contextId = contextId;
+    }
+
+    @Override
+    public void handle(ICcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
+        ClusterControllerService ccs = (ClusterControllerService) 
appCtx.getServiceContext().getControllerService();
+        CCApplication application = (CCApplication) ccs.getApplication();
+        IStatementExecutorContext executorsCtx = 
application.getStatementExecutorContext();
+        JobId jobId = executorsCtx.getJobIdFromClientContextId(contextId);
+
+        if (jobId == null) {
+            LOGGER.log(Level.WARN, "No job found for context id " + contextId);
+        } else {
+            try {
+                IHyracksClientConnection hcc = application.getHcc();
+                hcc.cancelJob(jobId);
+                executorsCtx.removeJobIdFromClientContextId(contextId);
+            } catch (Exception e) {
+                LOGGER.log(Level.WARN, "unexpected exception thrown from 
cancel", e);
+            }
+        }
+        CancelQueryResponse response = new CancelQueryResponse(reqId);
+        CCMessageBroker messageBroker = (CCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+        try {
+            messageBroker.sendApplicationMessageToNC(response, nodeId);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARN, "Failure sending response to nc", e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
new file mode 100644
index 0000000..4fbcf22
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.message;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.messaging.api.INcAddressedMessage;
+import org.apache.asterix.common.messaging.api.MessageFuture;
+import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class CancelQueryResponse implements INcAddressedMessage {
+
+    private static final long serialVersionUID = 1L;
+    private final long reqId;
+
+    public CancelQueryResponse(long reqId) {
+        this.reqId = reqId;
+    }
+
+    @Override
+    public void handle(INcApplicationContext appCtx) throws 
HyracksDataException, InterruptedException {
+        NCMessageBroker mb = (NCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
+        MessageFuture future = mb.deregisterMessageFuture(reqId);
+        if (future != null) {
+            future.complete(this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index e7919fa..5cee3d9 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -22,6 +22,7 @@ package org.apache.asterix.app.message;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -54,23 +55,18 @@ import 
org.apache.hyracks.control.cc.ClusterControllerService;
 
 public final class ExecuteStatementRequestMessage implements 
ICcAddressedMessage {
     private static final long serialVersionUID = 1L;
-
     private static final Logger LOGGER = 
Logger.getLogger(ExecuteStatementRequestMessage.class.getName());
-
+    //TODO: Make configurable: 
https://issues.apache.org/jira/browse/ASTERIXDB-2062
+    public static final long DEFAULT_NC_TIMEOUT_MILLIS = 
TimeUnit.MINUTES.toMillis(5);
+    //TODO: Make configurable: 
https://issues.apache.org/jira/browse/ASTERIXDB-2063
+    public static final long DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS = 
TimeUnit.MINUTES.toMillis(1);
     private final String requestNodeId;
-
     private final long requestMessageId;
-
     private final ILangExtension.Language lang;
-
     private final String statementsText;
-
     private final SessionConfig sessionConfig;
-
     private final IStatementExecutor.ResultDelivery delivery;
-
     private final String clientContextID;
-
     private final String handleUrl;
 
     public ExecuteStatementRequestMessage(String requestNodeId, long 
requestMessageId, ILangExtension.Language lang,
@@ -102,47 +98,41 @@ public final class ExecuteStatementRequestMessage 
implements ICcAddressedMessage
         IStorageComponentProvider storageComponentProvider = 
ccAppCtx.getStorageComponentProvider();
         IStatementExecutorFactory statementExecutorFactory = 
ccApp.getStatementExecutorFactory();
         IStatementExecutorContext statementExecutorContext = 
ccApp.getStatementExecutorContext();
-
-        ccSrv.getExecutor().submit(() -> {
-            ExecuteStatementResponseMessage responseMsg = new 
ExecuteStatementResponseMessage(requestMessageId);
-            try {
-                IParser parser = 
compilationProvider.getParserFactory().createParser(statementsText);
-                List<Statement> statements = parser.parse();
-                StringWriter outWriter = new StringWriter(256);
-                PrintWriter outPrinter = new PrintWriter(outWriter);
-                SessionOutput.ResultDecorator resultPrefix = 
ResultUtil.createPreResultDecorator();
-                SessionOutput.ResultDecorator resultPostfix = 
ResultUtil.createPostResultDecorator();
-                SessionOutput.ResultAppender appendHandle = 
ResultUtil.createResultHandleAppender(handleUrl);
-                SessionOutput.ResultAppender appendStatus = 
ResultUtil.createResultStatusAppender();
-                SessionOutput sessionOutput = new SessionOutput(sessionConfig, 
outPrinter, resultPrefix, resultPostfix,
-                        appendHandle, appendStatus);
-
-                IStatementExecutor.ResultMetadata outMetadata = new 
IStatementExecutor.ResultMetadata();
-
-                MetadataManager.INSTANCE.init();
-                IStatementExecutor translator = 
statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
-                        compilationProvider, storageComponentProvider);
-                translator.compileAndExecute(ccAppCtx.getHcc(), null, 
delivery, outMetadata,
-                        new IStatementExecutor.Stats(), clientContextID, 
statementExecutorContext);
-
-                outPrinter.close();
-                responseMsg.setResult(outWriter.toString());
-                responseMsg.setMetadata(outMetadata);
-            } catch (AlgebricksException | HyracksException | TokenMgrError
-                    | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
-                // we trust that "our" exceptions are serializable and have a 
comprehensible error message
-                GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, 
pe.getMessage(), pe);
-                responseMsg.setError(pe);
-            } catch (Exception e) {
-                GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected 
exception", e);
-                responseMsg.setError(new Exception(e.toString()));
-            }
-            try {
-                messageBroker.sendApplicationMessageToNC(responseMsg, 
requestNodeId);
-            } catch (Exception e) {
-                LOGGER.log(Level.WARNING, e.toString(), e);
-            }
-        });
+        ExecuteStatementResponseMessage responseMsg = new 
ExecuteStatementResponseMessage(requestMessageId);
+        try {
+            IParser parser = 
compilationProvider.getParserFactory().createParser(statementsText);
+            List<Statement> statements = parser.parse();
+            StringWriter outWriter = new StringWriter(256);
+            PrintWriter outPrinter = new PrintWriter(outWriter);
+            SessionOutput.ResultDecorator resultPrefix = 
ResultUtil.createPreResultDecorator();
+            SessionOutput.ResultDecorator resultPostfix = 
ResultUtil.createPostResultDecorator();
+            SessionOutput.ResultAppender appendHandle = 
ResultUtil.createResultHandleAppender(handleUrl);
+            SessionOutput.ResultAppender appendStatus = 
ResultUtil.createResultStatusAppender();
+            SessionOutput sessionOutput = new SessionOutput(sessionConfig, 
outPrinter, resultPrefix, resultPostfix,
+                    appendHandle, appendStatus);
+            IStatementExecutor.ResultMetadata outMetadata = new 
IStatementExecutor.ResultMetadata();
+            MetadataManager.INSTANCE.init();
+            IStatementExecutor translator = 
statementExecutorFactory.create(ccAppCtx, statements, sessionOutput,
+                    compilationProvider, storageComponentProvider);
+            translator.compileAndExecute(ccAppCtx.getHcc(), null, delivery, 
outMetadata, new IStatementExecutor.Stats(),
+                    clientContextID, statementExecutorContext);
+            outPrinter.close();
+            responseMsg.setResult(outWriter.toString());
+            responseMsg.setMetadata(outMetadata);
+        } catch (AlgebricksException | HyracksException | TokenMgrError
+                | org.apache.asterix.aqlplus.parser.TokenMgrError pe) {
+            // we trust that "our" exceptions are serializable and have a 
comprehensible error message
+            GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, pe.getMessage(), 
pe);
+            responseMsg.setError(pe);
+        } catch (Exception e) {
+            GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected 
exception", e);
+            responseMsg.setError(new Exception(e.toString()));
+        }
+        try {
+            messageBroker.sendApplicationMessageToNC(responseMsg, 
requestNodeId);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, e.toString(), e);
+        }
     }
 
     private String getRejectionReason(ClusterControllerService ccSrv) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
index 4f9aa0c..54f0a4e 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java
@@ -19,8 +19,6 @@
 
 package org.apache.asterix.app.message;
 
-import java.util.concurrent.TimeUnit;
-
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.asterix.common.messaging.api.MessageFuture;
@@ -31,8 +29,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 public final class ExecuteStatementResponseMessage implements 
INcAddressedMessage {
     private static final long serialVersionUID = 1L;
 
-    public static final long DEFAULT_TIMEOUT_MILLIS = 
TimeUnit.MINUTES.toMillis(5);
-
     private final long requestMessageId;
 
     private String result;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index e8636c8..9040ad1 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -21,8 +21,8 @@ package org.apache.asterix.hyracks.bootstrap;
 
 import static org.apache.asterix.algebra.base.ILangExtension.Language.AQL;
 import static org.apache.asterix.algebra.base.ILangExtension.Language.SQLPP;
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static 
org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
+import static 
org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 
 import java.util.Arrays;
 import java.util.List;
@@ -47,10 +47,10 @@ import 
org.apache.asterix.api.http.server.QueryServiceServlet;
 import org.apache.asterix.api.http.server.QueryStatusApiServlet;
 import org.apache.asterix.api.http.server.QueryWebInterfaceServlet;
 import org.apache.asterix.api.http.server.RebalanceApiServlet;
+import org.apache.asterix.api.http.server.ServletConstants;
 import org.apache.asterix.api.http.server.ShutdownApiServlet;
 import org.apache.asterix.api.http.server.UpdateApiServlet;
 import org.apache.asterix.api.http.server.VersionApiServlet;
-import org.apache.asterix.api.http.servlet.ServletConstants;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
 import org.apache.asterix.app.cc.CCExtensionManager;
 import org.apache.asterix.app.cc.ResourceIdManager;
@@ -106,6 +106,7 @@ public class CCApplication extends BaseCCApplication {
     protected WebManager webManager;
     protected CcApplicationContext appCtx;
     private IJobCapacityController jobCapacityController;
+    private IHyracksClientConnection hcc;
 
     @Override
     public void start(IServiceContext serviceCtx, String[] args) throws 
Exception {
@@ -124,6 +125,9 @@ public class CCApplication extends BaseCCApplication {
 
         ccServiceCtx.setThreadFactory(
                 new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new 
LifeCycleComponentManager()));
+        String strIP = 
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
+        int port = 
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
+        hcc = new HyracksConnection(strIP, port);
         ILibraryManager libraryManager = new ExternalLibraryManager();
         ResourceIdManager resourceIdManager = new ResourceIdManager();
         IReplicationStrategy repStrategy = 
ClusterProperties.INSTANCE.getReplicationStrategy();
@@ -196,7 +200,6 @@ public class CCApplication extends BaseCCApplication {
     protected HttpServer setupWebServer(ExternalProperties externalProperties) 
throws Exception {
         HttpServer webServer = new HttpServer(webManager.getBosses(), 
webManager.getWorkers(),
                 externalProperties.getWebInterfacePort());
-        IHyracksClientConnection hcc = getHcc();
         webServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
         webServer.addServlet(new ApiServlet(webServer.ctx(), new String[] { 
"/*" }, appCtx,
                 ccExtensionManager.getCompilationProvider(AQL), 
ccExtensionManager.getCompilationProvider(SQLPP),
@@ -207,7 +210,6 @@ public class CCApplication extends BaseCCApplication {
     protected HttpServer setupJSONAPIServer(ExternalProperties 
externalProperties) throws Exception {
         HttpServer jsonAPIServer =
                 new HttpServer(webManager.getBosses(), 
webManager.getWorkers(), externalProperties.getAPIServerPort());
-        IHyracksClientConnection hcc = getHcc();
         jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
         jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx);
         jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR,
@@ -252,7 +254,6 @@ public class CCApplication extends BaseCCApplication {
     protected HttpServer setupQueryWebServer(ExternalProperties 
externalProperties) throws Exception {
         HttpServer queryWebServer = new HttpServer(webManager.getBosses(), 
webManager.getWorkers(),
                 externalProperties.getQueryWebInterfacePort());
-        IHyracksClientConnection hcc = getHcc();
         queryWebServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
         queryWebServer.addServlet(new QueryWebInterfaceServlet(appCtx, 
queryWebServer.ctx(), new String[] { "/*" }));
         return queryWebServer;
@@ -357,9 +358,7 @@ public class CCApplication extends BaseCCApplication {
         return appCtx;
     }
 
-    protected IHyracksClientConnection getHcc() throws Exception {
-        String strIP = 
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
-        int port = 
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
-        return new HyracksConnection(strIP, port);
+    public IHyracksClientConnection getHcc() {
+        return hcc;
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
index 5abbe40..cd58d8f 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.asterix.api.http.server.ConnectorApiServlet;
+import org.apache.asterix.api.http.server.ServletConstants;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index 5f40a85..3cb46fe 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.asterix.api.http.ctx.StatementExecutorContext;
 import org.apache.asterix.api.http.server.QueryCancellationServlet;
+import org.apache.asterix.api.http.server.ServletConstants;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.JobId;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryServiceServletTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryServiceServletTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryServiceServletTest.java
deleted file mode 100644
index e0539ac..0000000
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryServiceServletTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.api.http.servlet;
-
-import org.apache.asterix.api.http.server.QueryServiceServlet;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class QueryServiceServletTest {
-
-    @Test
-    public void testTimeUnitFormatNanos() throws Exception {
-        Assert.assertEquals("123.456789012s", 
QueryServiceServlet.TimeUnit.formatNanos(123456789012l));
-        Assert.assertEquals("12.345678901s", 
QueryServiceServlet.TimeUnit.formatNanos(12345678901l));
-        Assert.assertEquals("1.234567890s", 
QueryServiceServlet.TimeUnit.formatNanos(1234567890l));
-        Assert.assertEquals("123.456789ms", 
QueryServiceServlet.TimeUnit.formatNanos(123456789l));
-        Assert.assertEquals("12.345678ms", 
QueryServiceServlet.TimeUnit.formatNanos(12345678l));
-        Assert.assertEquals("1.234567ms", 
QueryServiceServlet.TimeUnit.formatNanos(1234567l));
-        Assert.assertEquals("123.456µs", 
QueryServiceServlet.TimeUnit.formatNanos(123456l));
-        Assert.assertEquals("12.345µs", 
QueryServiceServlet.TimeUnit.formatNanos(12345l));
-        Assert.assertEquals("1.234µs", 
QueryServiceServlet.TimeUnit.formatNanos(1234l));
-        Assert.assertEquals("123ns", 
QueryServiceServlet.TimeUnit.formatNanos(123l));
-        Assert.assertEquals("12ns", 
QueryServiceServlet.TimeUnit.formatNanos(12l));
-        Assert.assertEquals("1ns", 
QueryServiceServlet.TimeUnit.formatNanos(1l));
-        Assert.assertEquals("-123.456789012s", 
QueryServiceServlet.TimeUnit.formatNanos(-123456789012l));
-        Assert.assertEquals("120.000000000s", 
QueryServiceServlet.TimeUnit.formatNanos(120000000000l));
-        Assert.assertEquals("-12ns", 
QueryServiceServlet.TimeUnit.formatNanos(-12l));
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java
index f994e98..e583c75 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java
@@ -19,8 +19,8 @@
 
 package org.apache.asterix.api.http.servlet;
 
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
-import static 
org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+import static 
org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR;
+import static 
org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 

Reply via email to