[ASTERIXDB-2058][HYR] Only Complete job cancellation after cleanup - user model changes: no - storage format changes: no - interface changes: yes --IJobManager.cancel now takes a callback
details: - Job cancellation now completes only after the job cleanup work has completed and not merely when the abort tasks are executed. - The NCQueryServiceServlet actively cancels requests that passes 5 minutes. - Cancellation of timedout jobs is not done through the Http API but through message broker. - Typically, requests might timeout when the servers are overloaded. When that is the case, there is a high chance http requests are to be rejected including requests to cancel previously submitted queries. This is the reason for using Message broker for this task. - ExecuteStatementRequest used to execute the statement in a different executor thread even though it is itself is being executed in an executor thread and is not blocking anyone. This was fixed as well. Change-Id: I14b4bbd512cc88e489254d8bf82edba0fd3a3db5 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1961 Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/87411c22 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/87411c22 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/87411c22 Branch: refs/heads/master Commit: 87411c22cc30fb425a4839b20b3a259f28d1ef79 Parents: 5eb1303 Author: Abdullah Alamoudi <[email protected]> Authored: Thu Aug 24 08:51:03 2017 -0700 Committer: abdullah alamoudi <[email protected]> Committed: Thu Aug 24 11:09:08 2017 -0700 ---------------------------------------------------------------------- .../http/server/AbstractQueryApiServlet.java | 4 +- .../asterix/api/http/server/ApiServlet.java | 4 +- .../api/http/server/ClusterApiServlet.java | 2 +- .../ClusterControllerDetailsApiServlet.java | 2 +- .../api/http/server/ConnectorApiServlet.java | 2 +- .../api/http/server/DiagnosticsApiServlet.java | 3 +- .../asterix/api/http/server/Duration.java | 236 +++++++++++++++++++ .../api/http/server/NCQueryServiceServlet.java | 43 +++- .../server/NodeControllerDetailsApiServlet.java | 2 +- .../http/server/QueryCancellationServlet.java | 1 - .../api/http/server/QueryServiceServlet.java | 39 +-- .../api/http/server/RebalanceApiServlet.java | 2 +- .../asterix/api/http/server/RestApiServlet.java | 4 +- .../api/http/server/ServletConstants.java | 31 +++ .../api/http/server/ShutdownApiServlet.java | 2 +- .../api/http/server/VersionApiServlet.java | 2 +- .../api/http/servlet/ServletConstants.java | 31 --- .../app/active/ActiveEntityEventsListener.java | 160 ++----------- .../apache/asterix/app/active/RecoveryTask.java | 188 +++++++++++++++ .../asterix/app/message/CancelQueryRequest.java | 74 ++++++ .../app/message/CancelQueryResponse.java | 44 ++++ .../message/ExecuteStatementRequestMessage.java | 90 ++++--- .../ExecuteStatementResponseMessage.java | 4 - .../hyracks/bootstrap/CCApplication.java | 19 +- .../http/servlet/ConnectorApiServletTest.java | 1 + .../servlet/QueryCancellationServletTest.java | 1 + .../http/servlet/QueryServiceServletTest.java | 46 ---- .../api/http/servlet/VersionApiServletTest.java | 4 +- .../asterix/runtime/ParseDurationTest.java | 161 +++++++++++++ .../asterix/common/exceptions/ErrorCode.java | 3 + .../main/resources/asx_errormsg/en.properties | 3 + asterixdb/asterix-metadata/pom.xml | 3 +- .../client/HyracksClientInterfaceFunctions.java | 3 + .../control/cc/executor/JobExecutor.java | 24 +- .../hyracks/control/cc/job/IJobManager.java | 5 +- .../hyracks/control/cc/job/JobManager.java | 12 +- .../hyracks/control/cc/work/CancelJobWork.java | 5 +- .../hyracks/control/cc/work/JobCleanupWork.java | 8 +- .../hyracks/control/cc/job/JobManagerTest.java | 7 +- .../control/common/work/FutureValue.java | 1 + .../control/common/work/NoOpCallback.java | 38 +++ 41 files changed, 949 insertions(+), 365 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java index a4e72f7..c38b3fc 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractQueryApiServlet.java @@ -18,8 +18,8 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASET_ATTR; import java.io.PrintWriter; import java.util.UUID; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java index 7874aa3..58c282f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ApiServlet.java @@ -18,8 +18,8 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASET_ATTR; import java.awt.image.BufferedImage; import java.io.BufferedReader; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java index b24a9a1..4faab1e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR; import java.io.IOException; import java.io.PrintWriter; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java index 52d4d67..6dea30c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterControllerDetailsApiServlet.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; import java.io.IOException; import java.io.PrintWriter; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java index d9a63f7..52af643 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; import java.io.IOException; import java.io.PrintWriter; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java index dcd0e70..9d2415d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/DiagnosticsApiServlet.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; import java.io.IOException; import java.io.PrintWriter; @@ -34,7 +34,6 @@ import java.util.concurrent.Future; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.api.http.servlet.ServletConstants; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.http.api.IServletRequest; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/Duration.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/Duration.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/Duration.java new file mode 100644 index 0000000..bdda750 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/Duration.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.api.http.server; + +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public enum Duration { + SEC("s", 9), + MILLI("ms", 6), + MICRO("µs", 3), + NANO("ns", 0); + + static final long NANOSECONDS = 1; + static final long MICROSECONDS = 1000 * NANOSECONDS; + static final long MILLISECONDS = 1000 * MICROSECONDS; + static final long SECONDS = 1000 * MILLISECONDS; + static final long MINUTES = 60 * SECONDS; + static final long HOURS = 60 * MINUTES; + + String unit; + int nanoDigits; + + Duration(String unit, int nanoDigits) { + this.unit = unit; + this.nanoDigits = nanoDigits; + } + + public static String formatNanos(long nanoTime) { + final String strTime = String.valueOf(nanoTime); + final int len = strTime.length(); + for (Duration tu : Duration.values()) { + if (len > tu.nanoDigits) { + final String integer = strTime.substring(0, len - tu.nanoDigits); + final String fractional = strTime.substring(len - tu.nanoDigits); + return integer + (fractional.length() > 0 ? "." + fractional : "") + tu.unit; + } + } + return "illegal string value: " + strTime; + } + + // ParseDuration parses a duration string. + // A duration string is a possibly signed sequence of + // decimal numbers, each with optional fraction and a unit suffix, + // such as "300ms", "-1.5h" or "2h45m". + // Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". + // returns the duration in nano seconds + public static long parseDurationStringToNanos(String orig) throws HyracksDataException { + // [-+]?([0-9]*(\.[0-9]*)?[a-z]+)+ + String s = orig; + long d = 0; + boolean neg = false; + char c; + // Consume [-+]? + if (!s.isEmpty()) { + c = s.charAt(0); + if (c == '-' || c == '+') { + neg = c == '-'; + s = s.substring(1); + } + } + + // Special case: if all that is left is "0", this is zero. + if ("0".equals(s)) { + return 0L; + } + + if (s.isEmpty()) { + throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig); + } + + while (!s.isEmpty()) { + long v = 0L; // integers before decimal + long f = 0L; // integers after decimal + double scale = 1.0; // value = v + f/scale + // The next character must be [0-9.] + if (!(s.charAt(0) == '.' || '0' <= s.charAt(0) && s.charAt(0) <= '9')) { + throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig); + } + // Consume [0-9]* + int pl = s.length(); + Pair<Long, String> pair = leadingInt(s); + v = pair.getLeft(); + s = pair.getRight(); + boolean pre = pl != s.length(); // whether we consumed anything before a period + + // Consume (\.[0-9]*)? + boolean post = false; + if (!s.isEmpty() && s.charAt(0) == '.') { + s = s.substring(1); + pl = s.length(); + Triple<Long, Double, String> triple = leadingFraction(s); + f = triple.getLeft(); + scale = triple.getMiddle(); + s = triple.getRight(); + post = pl != s.length(); + } + if (!pre && !post) { + // no digits (e.g. ".s" or "-.s") + throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig); + } + + // Consume unit. + int i = 0; + for (; i < s.length(); i++) { + c = s.charAt(i); + if (c == '.' || '0' <= c && c <= '9') { + break; + } + } + if (i == 0) { + throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig); + } + String u = s.substring(0, i); + s = s.substring(i); + long unit = getUnit(u); + if (v > Long.MAX_VALUE / unit) { + // overflow + throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig); + } + v *= unit; + if (f > 0) { + // float64 is needed to be nanosecond accurate for fractions of hours. + // v >= 0 && (f*unit/scale) <= 3.6e+12 (ns/h, h is the largest unit) + v += (long) (((double) f * (double) unit) / scale); + if (v < 0) { + // overflow + throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig); + } + } + d += v; + if (d < 0) { + // overflow + throw new RuntimeDataException(ErrorCode.INVALID_DURATION, orig); + } + } + + if (neg) { + d = -d; + } + return d; + } + + private static final long getUnit(String unit) throws HyracksDataException { + switch (unit) { + case "ns": + return NANOSECONDS; + case "us": + case "µs":// U+00B5 = micro symbol + case "μs":// U+03BC = Greek letter mu + return MICROSECONDS; + case "ms": + return MILLISECONDS; + case "s": + return SECONDS; + case "m": + return MINUTES; + case "h": + return HOURS; + default: + throw new RuntimeDataException(ErrorCode.UNKNOWN_DURATION_UNIT, unit); + } + } + + // leadingInt consumes the leading [0-9]* from s. + static Pair<Long, String> leadingInt(String origin) throws HyracksDataException { + String s = origin; + long x = 0L; + int i = 0; + for (; i < s.length(); i++) { + char c = s.charAt(i); + if (c < '0' || c > '9') { + break; + } + if (x > Long.MAX_VALUE / 10) { + throw new RuntimeDataException(ErrorCode.INVALID_DURATION, origin); + } + x = x * 10 + Character.getNumericValue(c); + if (x < 0) { + throw new RuntimeDataException(ErrorCode.INVALID_DURATION, origin); + } + } + return Pair.of(x, s.substring(i)); + } + + // leadingFraction consumes the leading [0-9]* from s. + // It is used only for fractions, so does not return an error on overflow, + // it just stops accumulating precision. + static Triple<Long, Double, String> leadingFraction(String s) { + int i = 0; + long x = 0L; + double scale = 1.0; + boolean overflow = false; + for (; i < s.length(); i++) { + char c = s.charAt(i); + if (c < '0' || c > '9') { + break; + } + if (overflow) { + continue; + } + if (x > (1 << 63 - 1) / 10) { + // It's possible for overflow to give a positive number, so take care. + overflow = true; + continue; + } + long y = x * 10 + Character.getNumericValue(c); + if (y < 0) { + overflow = true; + continue; + } + x = y; + scale *= 10; + } + return Triple.of(x, scale, s.substring(i)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java index 9547514..ef49c35 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java @@ -19,17 +19,20 @@ package org.apache.asterix.api.http.server; +import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeoutException; import java.util.logging.Level; -import io.netty.handler.codec.http.HttpResponseStatus; import org.apache.asterix.algebra.base.ILangExtension; +import org.apache.asterix.app.message.CancelQueryRequest; import org.apache.asterix.app.message.ExecuteStatementRequestMessage; import org.apache.asterix.app.message.ExecuteStatementResponseMessage; import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.config.GlobalConfig; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.messaging.api.MessageFuture; import org.apache.asterix.om.types.ARecordType; @@ -41,11 +44,14 @@ import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.ipc.exceptions.IPCException; +import io.netty.handler.codec.http.HttpResponseStatus; + /** * Query service servlet that can run on NC nodes. * Delegates query execution to CC, then serves the result. */ public class NCQueryServiceServlet extends QueryServiceServlet { + public NCQueryServiceServlet(ConcurrentMap<String, Object> ctx, String[] paths, IApplicationContext appCtx, ILangExtension.Language queryLanguage) { super(ctx, paths, appCtx, queryLanguage, null, null, null); @@ -63,13 +69,28 @@ public class NCQueryServiceServlet extends QueryServiceServlet { ExecuteStatementResponseMessage responseMsg; MessageFuture responseFuture = ncMb.registerMessageFuture(); try { + if (param.clientContextID == null) { + param.clientContextID = UUID.randomUUID().toString(); + } + long timeout = ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS; + if (param.timeout != null) { + timeout = java.util.concurrent.TimeUnit.NANOSECONDS + .toMillis(Duration.parseDurationStringToNanos(param.timeout)); + } ExecuteStatementRequestMessage requestMsg = new ExecuteStatementRequestMessage(ncCtx.getNodeId(), responseFuture.getFutureId(), queryLanguage, statementsText, sessionOutput.config(), ccDelivery, param.clientContextID, handleUrl); outExecStartEnd[0] = System.nanoTime(); ncMb.sendMessageToCC(requestMsg); - responseMsg = (ExecuteStatementResponseMessage) responseFuture.get( - ExecuteStatementResponseMessage.DEFAULT_TIMEOUT_MILLIS, java.util.concurrent.TimeUnit.MILLISECONDS); + try { + responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(timeout, + java.util.concurrent.TimeUnit.MILLISECONDS); + } catch (TimeoutException exception) { + RuntimeDataException hde = new RuntimeDataException(ErrorCode.QUERY_TIMEOUT, exception); + // cancel query + cancelQuery(ncMb, ncCtx.getNodeId(), param.clientContextID, hde); + throw hde; + } outExecStartEnd[1] = System.nanoTime(); } finally { ncMb.deregisterMessageFuture(responseFuture.getFutureId()); @@ -97,6 +118,22 @@ public class NCQueryServiceServlet extends QueryServiceServlet { } } + private void cancelQuery(INCMessageBroker messageBroker, String nodeId, String clientContextID, + Exception exception) { + MessageFuture cancelQueryFuture = messageBroker.registerMessageFuture(); + try { + CancelQueryRequest cancelQueryMessage = + new CancelQueryRequest(nodeId, cancelQueryFuture.getFutureId(), clientContextID); + messageBroker.sendMessageToCC(cancelQueryMessage); + cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS, + java.util.concurrent.TimeUnit.MILLISECONDS); + } catch (Exception e) { + exception.addSuppressed(e); + } finally { + messageBroker.deregisterMessageFuture(cancelQueryFuture.getFutureId()); + } + } + @Override protected HttpResponseStatus handleExecuteStatementException(Throwable t) { if (t instanceof IPCException || t instanceof TimeoutException) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java index d9757c7..6291869 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NodeControllerDetailsApiServlet.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; import java.io.IOException; import java.io.PrintWriter; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java index bfec146..3f07151 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java @@ -23,7 +23,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.api.http.servlet.ServletConstants; import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.job.JobId; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index 1f1d282..c630636 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -27,7 +27,6 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.algebra.base.ILangExtension; -import org.apache.asterix.api.http.servlet.ServletConstants; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.api.IClusterManagementWork; import org.apache.asterix.common.config.GlobalConfig; @@ -108,7 +107,8 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { FORMAT("format"), CLIENT_ID("client_context_id"), PRETTY("pretty"), - MODE("mode"); + MODE("mode"), + TIMEOUT("timeout"); private final String str; @@ -154,39 +154,12 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { } } - public enum TimeUnit { - SEC("s", 9), - MILLI("ms", 6), - MICRO("µs", 3), - NANO("ns", 0); - - String unit; - int nanoDigits; - - TimeUnit(String unit, int nanoDigits) { - this.unit = unit; - this.nanoDigits = nanoDigits; - } - - public static String formatNanos(long nanoTime) { - final String strTime = String.valueOf(nanoTime); - final int len = strTime.length(); - for (TimeUnit tu : TimeUnit.values()) { - if (len > tu.nanoDigits) { - final String integer = strTime.substring(0, len - tu.nanoDigits); - final String fractional = strTime.substring(len - tu.nanoDigits); - return integer + (fractional.length() > 0 ? "." + fractional : "") + tu.unit; - } - } - return "illegal string value: " + strTime; - } - } - static class RequestParameters { String host; String path; String statement; String format; + String timeout; boolean pretty; String clientContextID; String mode; @@ -202,6 +175,7 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { on.put("pretty", pretty); on.put("mode", mode); on.put("clientContextID", clientContextID); + on.put("format", format); return om.writer(new MinimalPrettyPrinter()).writeValueAsString(on); } catch (JsonProcessingException e) { // NOSONAR return e.getMessage(); @@ -297,9 +271,9 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { pw.print(ResultFields.METRICS.str()); pw.print("\": {\n"); pw.print("\t"); - ResultUtil.printField(pw, Metrics.ELAPSED_TIME.str(), TimeUnit.formatNanos(elapsedTime)); + ResultUtil.printField(pw, Metrics.ELAPSED_TIME.str(), Duration.formatNanos(elapsedTime)); pw.print("\t"); - ResultUtil.printField(pw, Metrics.EXECUTION_TIME.str(), TimeUnit.formatNanos(executionTime)); + ResultUtil.printField(pw, Metrics.EXECUTION_TIME.str(), Duration.formatNanos(executionTime)); pw.print("\t"); ResultUtil.printField(pw, Metrics.RESULT_COUNT.str(), resultCount, true); pw.print("\t"); @@ -334,6 +308,7 @@ public class QueryServiceServlet extends AbstractQueryApiServlet { param.pretty = getOptBoolean(jsonRequest, Parameter.PRETTY.str(), false); param.mode = toLower(getOptText(jsonRequest, Parameter.MODE.str())); param.clientContextID = getOptText(jsonRequest, Parameter.CLIENT_ID.str()); + param.timeout = getOptText(jsonRequest, Parameter.TIMEOUT.str()); } catch (JsonParseException | JsonMappingException e) { // if the JSON parsing fails, the statement is empty and we get an empty statement error GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java index 27e2806..8536571 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; import java.io.PrintWriter; import java.util.ArrayDeque; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java index 18aae8e..1a7918c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RestApiServlet.java @@ -18,8 +18,8 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_DATASET_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_DATASET_ATTR; import java.io.IOException; import java.util.List; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java new file mode 100644 index 0000000..2fe37c3 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ServletConstants.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.api.http.server; + +public class ServletConstants { + public static final String HYRACKS_CONNECTION_ATTR = "org.apache.asterix.HYRACKS_CONNECTION"; + public static final String HYRACKS_DATASET_ATTR = "org.apache.asterix.HYRACKS_DATASET"; + public static final String ASTERIX_APP_CONTEXT_INFO_ATTR = "org.apache.asterix.APP_CONTEXT_INFO"; + public static final String EXECUTOR_SERVICE_ATTR = "org.apache.asterix.EXECUTOR_SERVICE"; + public static final String RUNNING_QUERIES_ATTR = "org.apache.asterix.RUNINNG_QUERIES"; + public static final String SERVICE_CONTEXT_ATTR = "org.apache.asterix.SERVICE_CONTEXT"; + + private ServletConstants() { + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java index fdd106d..06e2383 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ShutdownApiServlet.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; import java.io.IOException; import java.io.PrintWriter; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java index 5acba381..eeef8e8 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/VersionApiServlet.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR; import java.io.IOException; import java.io.PrintWriter; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java deleted file mode 100644 index b815d76..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/servlet/ServletConstants.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.api.http.servlet; - -public class ServletConstants { - public static final String HYRACKS_CONNECTION_ATTR = "org.apache.asterix.HYRACKS_CONNECTION"; - public static final String HYRACKS_DATASET_ATTR = "org.apache.asterix.HYRACKS_DATASET"; - public static final String ASTERIX_APP_CONTEXT_INFO_ATTR = "org.apache.asterix.APP_CONTEXT_INFO"; - public static final String EXECUTOR_SERVICE_ATTR = "org.apache.asterix.EXECUTOR_SERVICE"; - public static final String RUNNING_QUERIES_ATTR = "org.apache.asterix.RUNINNG_QUERIES"; - public static final String SERVICE_CONTEXT_ATTR = "org.apache.asterix.SERVICE_CONTEXT"; - - private ServletConstants() { - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index a16f678..3b4b974 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -34,14 +34,11 @@ import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.ActivityState; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveEntityEventSubscriber; -import org.apache.asterix.active.IRetryPolicy; import org.apache.asterix.active.IRetryPolicyFactory; import org.apache.asterix.active.NoRetryPolicyFactory; import org.apache.asterix.active.message.ActivePartitionMessage; import org.apache.asterix.active.message.ActivePartitionMessage.Event; import org.apache.asterix.active.message.StatsRequestMessage; -import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; -import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ErrorCode; @@ -53,8 +50,6 @@ import org.apache.asterix.external.feed.watch.WaitForStateSubscriber; import org.apache.asterix.metadata.api.IActiveEntityController; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; -import org.apache.asterix.metadata.utils.DatasetUtil; -import org.apache.asterix.metadata.utils.MetadataLockUtil; import org.apache.asterix.translator.IStatementExecutor; import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; @@ -94,8 +89,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl protected boolean isFetchingStats; protected int numRegistered; protected int numDeRegistered; - protected volatile Future<Void> recoveryTask; - protected volatile boolean cancelRecovery; + protected volatile RecoveryTask rt; protected volatile boolean suspended = false; // failures protected Exception jobFailure; @@ -199,7 +193,8 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION) : exceptions.get(0); setState(ActivityState.TEMPORARILY_FAILED); - if (prevState != ActivityState.SUSPENDING && prevState != ActivityState.RECOVERING) { + if (prevState != ActivityState.SUSPENDING && prevState != ActivityState.RECOVERING + && prevState != ActivityState.RESUMING) { recover(); } } else { @@ -356,112 +351,16 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl @Override public synchronized void recover() throws HyracksDataException { LOGGER.log(level, "Recover is called on " + entityId); - if (recoveryTask != null) { - LOGGER.log(level, "But recovery task for " + entityId + " is already there!! throwing an exception"); - throw new RuntimeDataException(ErrorCode.DOUBLE_RECOVERY_ATTEMPTS); - } if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) { LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure"); setState(ActivityState.PERMANENTLY_FAILED); } else { ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor(); - IRetryPolicy policy = retryPolicyFactory.create(this); - cancelRecovery = false; setState(ActivityState.TEMPORARILY_FAILED); LOGGER.log(level, "Recovery task has been submitted"); - recoveryTask = executor.submit(() -> { - String nameBefore = Thread.currentThread().getName(); - try { - Thread.currentThread().setName("RecoveryTask (" + entityId + ")"); - doRecover(policy); - } finally { - Thread.currentThread().setName(nameBefore); - } - return null; - }); - } - } - - protected Void doRecover(IRetryPolicy policy) - throws AlgebricksException, HyracksDataException, InterruptedException { - LOGGER.log(level, "Actual Recovery task has started"); - if (getState() != ActivityState.TEMPORARILY_FAILED) { - LOGGER.log(level, "but its state is not temp failure and so we're just returning"); - return null; - } - LOGGER.log(level, "calling the policy"); - while (policy.retry()) { - synchronized (this) { - if (cancelRecovery) { - recoveryTask = null; - notifyAll(); - return null; - } - while (clusterStateManager.getState() != ClusterState.ACTIVE) { - if (cancelRecovery) { - recoveryTask = null; - notifyAll(); - return null; - } - wait(); - } - } - waitForNonTransitionState(); - IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager(); - lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), - entityId.getDataverse() + '.' + entityId.getEntityName()); - for (Dataset dataset : getDatasets()) { - MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), - dataset.getDataverseName(), DatasetUtil.getFullyQualifiedName(dataset)); - } - synchronized (this) { - try { - if (cancelRecovery) { - recoveryTask = null; - notifyAll(); - return null; - } - setState(ActivityState.RECOVERING); - doStart(metadataProvider); - recoveryTask = null; - notifyAll(); - return null; - } catch (Exception e) { - LOGGER.log(level, "Attempt to revive " + entityId + " failed", e); - setState(ActivityState.TEMPORARILY_FAILED); - recoverFailure = e; - } finally { - metadataProvider.getLocks().reset(); - } - notifyAll(); - } - } - // Recovery task is essntially over now either through failure or through cancellation(stop) - synchronized (this) { - recoveryTask = null; - notifyAll(); - if (state != ActivityState.TEMPORARILY_FAILED) { - return null; - } - } - IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager(); - try { - lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), - entityId.getDataverse() + '.' + entityId.getEntityName()); - for (Dataset dataset : getDatasets()) { - MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataset.getDatasetName(), - DatasetUtil.getFullyQualifiedName(dataset)); - } - synchronized (this) { - if (state == ActivityState.TEMPORARILY_FAILED) { - setState(ActivityState.PERMANENTLY_FAILED); - } - notifyAll(); - } - } finally { - metadataProvider.getLocks().reset(); + rt = new RecoveryTask(appCtx, this, retryPolicyFactory); + executor.submit(rt.recover()); } - return null; } @Override @@ -503,13 +402,10 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_CANNOT_BE_STOPPED, entityId, state); } if (state == ActivityState.TEMPORARILY_FAILED || state == ActivityState.PERMANENTLY_FAILED) { - if (recoveryTask != null) { + if (rt != null) { setState(ActivityState.STOPPING); - cancelRecovery = true; - recoveryTask.cancel(true); - while (recoveryTask != null) { - wait(); - } + rt.cancel(); + rt = null; } setState(ActivityState.STOPPED); try { @@ -533,6 +429,10 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl } } + public RecoveryTask getRecoveryTask() { + return rt; + } + @Override public void suspend(MetadataProvider metadataProvider) throws HyracksDataException, InterruptedException { WaitForStateSubscriber subscriber; @@ -602,8 +502,9 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl setState(ActivityState.RESUMING); WaitForStateSubscriber subscriber = new WaitForStateSubscriber(this, EnumSet.of(ActivityState.RUNNING, ActivityState.TEMPORARILY_FAILED)); - recoveryTask = metadataProvider.getApplicationContext().getServiceContext().getControllerService() - .getExecutor().submit(() -> resumeOrRecover(metadataProvider)); + rt = new RecoveryTask(appCtx, this, retryPolicyFactory); + metadataProvider.getApplicationContext().getServiceContext().getControllerService().getExecutor() + .submit(() -> rt.resumeOrRecover(metadataProvider)); try { subscriber.sync(); } catch (Exception e) { @@ -616,28 +517,6 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl } } - protected Void resumeOrRecover(MetadataProvider metadataProvider) - throws HyracksDataException, AlgebricksException, InterruptedException { - try { - doResume(metadataProvider); - synchronized (this) { - setState(ActivityState.RUNNING); - recoveryTask = null; - } - } catch (Exception e) { - LOGGER.log(Level.WARNING, "First attempt to resume " + entityId + " Failed", e); - setState(ActivityState.TEMPORARILY_FAILED); - if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) { - setState(ActivityState.PERMANENTLY_FAILED); - } else { - IRetryPolicy policy = retryPolicyFactory.create(this); - cancelRecovery = false; - doRecover(policy); - } - } - return null; - } - @Override public boolean isActive() { return state != ActivityState.STOPPED && state != ActivityState.PERMANENTLY_FAILED; @@ -652,15 +531,6 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl this.locations = locations; } - public Future<Void> getRecoveryTask() { - return recoveryTask; - } - - public synchronized void cancelRecovery() { - cancelRecovery = true; - notifyAll(); - } - @Override public Exception getJobFailure() { return jobFailure; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java new file mode 100644 index 0000000..7b7de93 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/RecoveryTask.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.active; + +import java.util.concurrent.Callable; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.active.ActivityState; +import org.apache.asterix.active.IRetryPolicy; +import org.apache.asterix.active.IRetryPolicyFactory; +import org.apache.asterix.active.NoRetryPolicyFactory; +import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; +import org.apache.asterix.common.api.IMetadataLockManager; +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.utils.DatasetUtil; +import org.apache.asterix.metadata.utils.MetadataLockUtil; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class RecoveryTask { + + private static final Logger LOGGER = Logger.getLogger(RecoveryTask.class.getName()); + private static final Level level = Level.INFO; + private final ActiveEntityEventsListener listener; + private volatile boolean cancelRecovery = false; + private final IRetryPolicyFactory retryPolicyFactory; + private final MetadataProvider metadataProvider; + private final IClusterStateManager clusterStateManager; + private Exception failure; + + public RecoveryTask(ICcApplicationContext appCtx, ActiveEntityEventsListener listener, + IRetryPolicyFactory retryPolicyFactory) { + this.listener = listener; + this.retryPolicyFactory = retryPolicyFactory; + this.metadataProvider = new MetadataProvider(appCtx, null); + this.clusterStateManager = appCtx.getClusterStateManager(); + } + + public Callable<Void> recover() { + if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) { + return () -> { + return null; + }; + } + IRetryPolicy policy = retryPolicyFactory.create(listener); + return () -> { + String nameBefore = Thread.currentThread().getName(); + try { + Thread.currentThread().setName("RecoveryTask (" + listener.getEntityId() + ")"); + doRecover(policy); + } finally { + Thread.currentThread().setName(nameBefore); + } + return null; + }; + } + + public void cancel() { + cancelRecovery = true; + } + + protected Void resumeOrRecover(MetadataProvider metadataProvider) + throws HyracksDataException, AlgebricksException, InterruptedException { + try { + synchronized (listener) { + listener.doResume(metadataProvider); + listener.setState(ActivityState.RUNNING); + } + } catch (Exception e) { + LOGGER.log(Level.WARNING, "First attempt to resume " + listener.getEntityId() + " Failed", e); + synchronized (listener) { + if (listener.getState() == ActivityState.RESUMING) { + // This will be the case if compilation failure + // If the failure is a runtime failure, then the state + // would've been set to temporarily failed already + listener.setState(ActivityState.TEMPORARILY_FAILED); + } + } + if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) { + synchronized (listener) { + if (!cancelRecovery) { + listener.setState(ActivityState.PERMANENTLY_FAILED); + } + } + } else { + IRetryPolicy policy = retryPolicyFactory.create(listener); + doRecover(policy); + } + } + return null; + } + + protected Void doRecover(IRetryPolicy policy) + throws AlgebricksException, HyracksDataException, InterruptedException { + LOGGER.log(level, "Actual Recovery task has started"); + if (listener.getState() != ActivityState.TEMPORARILY_FAILED) { + LOGGER.log(level, "but its state is not temp failure and so we're just returning"); + return null; + } + LOGGER.log(level, "calling the policy"); + while (policy.retry()) { + synchronized (listener) { + if (cancelRecovery) { + return null; + } + while (clusterStateManager.getState() != ClusterState.ACTIVE) { + if (cancelRecovery) { + return null; + } + wait(); + } + } + IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager(); + lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), + listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName()); + for (Dataset dataset : listener.getDatasets()) { + MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), + dataset.getDataverseName(), DatasetUtil.getFullyQualifiedName(dataset)); + } + synchronized (listener) { + try { + if (cancelRecovery) { + return null; + } + listener.setState(ActivityState.RECOVERING); + listener.doStart(metadataProvider); + return null; + } catch (Exception e) { + LOGGER.log(level, "Attempt to revive " + listener.getEntityId() + " failed", e); + listener.setState(ActivityState.TEMPORARILY_FAILED); + failure = e; + } finally { + metadataProvider.getLocks().reset(); + } + listener.notifyAll(); + } + } + // Recovery task is essntially over now either through failure or through cancellation(stop) + synchronized (listener) { + listener.notifyAll(); + if (listener.getState() != ActivityState.TEMPORARILY_FAILED) { + return null; + } + } + IMetadataLockManager lockManager = metadataProvider.getApplicationContext().getMetadataLockManager(); + try { + lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), + listener.getEntityId().getDataverse() + '.' + listener.getEntityId().getEntityName()); + for (Dataset dataset : listener.getDatasets()) { + MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataset.getDatasetName(), + DatasetUtil.getFullyQualifiedName(dataset)); + } + synchronized (listener) { + if (listener.getState() == ActivityState.TEMPORARILY_FAILED) { + listener.setState(ActivityState.PERMANENTLY_FAILED); + } + listener.notifyAll(); + } + } finally { + metadataProvider.getLocks().reset(); + } + return null; + } + + public Exception getFailure() { + return failure; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java new file mode 100644 index 0000000..fb6ec37 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.messaging.api.ICcAddressedMessage; +import org.apache.asterix.hyracks.bootstrap.CCApplication; +import org.apache.asterix.messaging.CCMessageBroker; +import org.apache.asterix.translator.IStatementExecutorContext; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +public class CancelQueryRequest implements ICcAddressedMessage { + + private static final Logger LOGGER = Logger.getLogger(CancelQueryRequest.class.getName()); + private static final long serialVersionUID = 1L; + private final String nodeId; + private final long reqId; + private final String contextId; + + public CancelQueryRequest(String nodeId, long reqId, String contextId) { + this.nodeId = nodeId; + this.reqId = reqId; + this.contextId = contextId; + } + + @Override + public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService(); + CCApplication application = (CCApplication) ccs.getApplication(); + IStatementExecutorContext executorsCtx = application.getStatementExecutorContext(); + JobId jobId = executorsCtx.getJobIdFromClientContextId(contextId); + + if (jobId == null) { + LOGGER.log(Level.WARN, "No job found for context id " + contextId); + } else { + try { + IHyracksClientConnection hcc = application.getHcc(); + hcc.cancelJob(jobId); + executorsCtx.removeJobIdFromClientContextId(contextId); + } catch (Exception e) { + LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e); + } + } + CancelQueryResponse response = new CancelQueryResponse(reqId); + CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + try { + messageBroker.sendApplicationMessageToNC(response, nodeId); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Failure sending response to nc", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java new file mode 100644 index 0000000..4fbcf22 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.message; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.messaging.api.INcAddressedMessage; +import org.apache.asterix.common.messaging.api.MessageFuture; +import org.apache.asterix.messaging.NCMessageBroker; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class CancelQueryResponse implements INcAddressedMessage { + + private static final long serialVersionUID = 1L; + private final long reqId; + + public CancelQueryResponse(long reqId) { + this.reqId = reqId; + } + + @Override + public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { + NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker(); + MessageFuture future = mb.deregisterMessageFuture(reqId); + if (future != null) { + future.complete(this); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java index e7919fa..5cee3d9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java @@ -22,6 +22,7 @@ package org.apache.asterix.app.message; import java.io.PrintWriter; import java.io.StringWriter; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -54,23 +55,18 @@ import org.apache.hyracks.control.cc.ClusterControllerService; public final class ExecuteStatementRequestMessage implements ICcAddressedMessage { private static final long serialVersionUID = 1L; - private static final Logger LOGGER = Logger.getLogger(ExecuteStatementRequestMessage.class.getName()); - + //TODO: Make configurable: https://issues.apache.org/jira/browse/ASTERIXDB-2062 + public static final long DEFAULT_NC_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5); + //TODO: Make configurable: https://issues.apache.org/jira/browse/ASTERIXDB-2063 + public static final long DEFAULT_QUERY_CANCELLATION_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1); private final String requestNodeId; - private final long requestMessageId; - private final ILangExtension.Language lang; - private final String statementsText; - private final SessionConfig sessionConfig; - private final IStatementExecutor.ResultDelivery delivery; - private final String clientContextID; - private final String handleUrl; public ExecuteStatementRequestMessage(String requestNodeId, long requestMessageId, ILangExtension.Language lang, @@ -102,47 +98,41 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider(); IStatementExecutorFactory statementExecutorFactory = ccApp.getStatementExecutorFactory(); IStatementExecutorContext statementExecutorContext = ccApp.getStatementExecutorContext(); - - ccSrv.getExecutor().submit(() -> { - ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId); - try { - IParser parser = compilationProvider.getParserFactory().createParser(statementsText); - List<Statement> statements = parser.parse(); - StringWriter outWriter = new StringWriter(256); - PrintWriter outPrinter = new PrintWriter(outWriter); - SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator(); - SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator(); - SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl); - SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender(); - SessionOutput sessionOutput = new SessionOutput(sessionConfig, outPrinter, resultPrefix, resultPostfix, - appendHandle, appendStatus); - - IStatementExecutor.ResultMetadata outMetadata = new IStatementExecutor.ResultMetadata(); - - MetadataManager.INSTANCE.init(); - IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput, - compilationProvider, storageComponentProvider); - translator.compileAndExecute(ccAppCtx.getHcc(), null, delivery, outMetadata, - new IStatementExecutor.Stats(), clientContextID, statementExecutorContext); - - outPrinter.close(); - responseMsg.setResult(outWriter.toString()); - responseMsg.setMetadata(outMetadata); - } catch (AlgebricksException | HyracksException | TokenMgrError - | org.apache.asterix.aqlplus.parser.TokenMgrError pe) { - // we trust that "our" exceptions are serializable and have a comprehensible error message - GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, pe.getMessage(), pe); - responseMsg.setError(pe); - } catch (Exception e) { - GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", e); - responseMsg.setError(new Exception(e.toString())); - } - try { - messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId); - } catch (Exception e) { - LOGGER.log(Level.WARNING, e.toString(), e); - } - }); + ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId); + try { + IParser parser = compilationProvider.getParserFactory().createParser(statementsText); + List<Statement> statements = parser.parse(); + StringWriter outWriter = new StringWriter(256); + PrintWriter outPrinter = new PrintWriter(outWriter); + SessionOutput.ResultDecorator resultPrefix = ResultUtil.createPreResultDecorator(); + SessionOutput.ResultDecorator resultPostfix = ResultUtil.createPostResultDecorator(); + SessionOutput.ResultAppender appendHandle = ResultUtil.createResultHandleAppender(handleUrl); + SessionOutput.ResultAppender appendStatus = ResultUtil.createResultStatusAppender(); + SessionOutput sessionOutput = new SessionOutput(sessionConfig, outPrinter, resultPrefix, resultPostfix, + appendHandle, appendStatus); + IStatementExecutor.ResultMetadata outMetadata = new IStatementExecutor.ResultMetadata(); + MetadataManager.INSTANCE.init(); + IStatementExecutor translator = statementExecutorFactory.create(ccAppCtx, statements, sessionOutput, + compilationProvider, storageComponentProvider); + translator.compileAndExecute(ccAppCtx.getHcc(), null, delivery, outMetadata, new IStatementExecutor.Stats(), + clientContextID, statementExecutorContext); + outPrinter.close(); + responseMsg.setResult(outWriter.toString()); + responseMsg.setMetadata(outMetadata); + } catch (AlgebricksException | HyracksException | TokenMgrError + | org.apache.asterix.aqlplus.parser.TokenMgrError pe) { + // we trust that "our" exceptions are serializable and have a comprehensible error message + GlobalConfig.ASTERIX_LOGGER.log(Level.WARNING, pe.getMessage(), pe); + responseMsg.setError(pe); + } catch (Exception e) { + GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", e); + responseMsg.setError(new Exception(e.toString())); + } + try { + messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId); + } catch (Exception e) { + LOGGER.log(Level.WARNING, e.toString(), e); + } } private String getRejectionReason(ClusterControllerService ccSrv) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java index 4f9aa0c..54f0a4e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java @@ -19,8 +19,6 @@ package org.apache.asterix.app.message; -import java.util.concurrent.TimeUnit; - import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.common.messaging.api.MessageFuture; @@ -31,8 +29,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public final class ExecuteStatementResponseMessage implements INcAddressedMessage { private static final long serialVersionUID = 1L; - public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5); - private final long requestMessageId; private String result; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index e8636c8..9040ad1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -21,8 +21,8 @@ package org.apache.asterix.hyracks.bootstrap; import static org.apache.asterix.algebra.base.ILangExtension.Language.AQL; import static org.apache.asterix.algebra.base.ILangExtension.Language.SQLPP; -import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; import java.util.Arrays; import java.util.List; @@ -47,10 +47,10 @@ import org.apache.asterix.api.http.server.QueryServiceServlet; import org.apache.asterix.api.http.server.QueryStatusApiServlet; import org.apache.asterix.api.http.server.QueryWebInterfaceServlet; import org.apache.asterix.api.http.server.RebalanceApiServlet; +import org.apache.asterix.api.http.server.ServletConstants; import org.apache.asterix.api.http.server.ShutdownApiServlet; import org.apache.asterix.api.http.server.UpdateApiServlet; import org.apache.asterix.api.http.server.VersionApiServlet; -import org.apache.asterix.api.http.servlet.ServletConstants; import org.apache.asterix.app.active.ActiveNotificationHandler; import org.apache.asterix.app.cc.CCExtensionManager; import org.apache.asterix.app.cc.ResourceIdManager; @@ -106,6 +106,7 @@ public class CCApplication extends BaseCCApplication { protected WebManager webManager; protected CcApplicationContext appCtx; private IJobCapacityController jobCapacityController; + private IHyracksClientConnection hcc; @Override public void start(IServiceContext serviceCtx, String[] args) throws Exception { @@ -124,6 +125,9 @@ public class CCApplication extends BaseCCApplication { ccServiceCtx.setThreadFactory( new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager())); + String strIP = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress(); + int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort(); + hcc = new HyracksConnection(strIP, port); ILibraryManager libraryManager = new ExternalLibraryManager(); ResourceIdManager resourceIdManager = new ResourceIdManager(); IReplicationStrategy repStrategy = ClusterProperties.INSTANCE.getReplicationStrategy(); @@ -196,7 +200,6 @@ public class CCApplication extends BaseCCApplication { protected HttpServer setupWebServer(ExternalProperties externalProperties) throws Exception { HttpServer webServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties.getWebInterfacePort()); - IHyracksClientConnection hcc = getHcc(); webServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc); webServer.addServlet(new ApiServlet(webServer.ctx(), new String[] { "/*" }, appCtx, ccExtensionManager.getCompilationProvider(AQL), ccExtensionManager.getCompilationProvider(SQLPP), @@ -207,7 +210,6 @@ public class CCApplication extends BaseCCApplication { protected HttpServer setupJSONAPIServer(ExternalProperties externalProperties) throws Exception { HttpServer jsonAPIServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties.getAPIServerPort()); - IHyracksClientConnection hcc = getHcc(); jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc); jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx); jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR, @@ -252,7 +254,6 @@ public class CCApplication extends BaseCCApplication { protected HttpServer setupQueryWebServer(ExternalProperties externalProperties) throws Exception { HttpServer queryWebServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties.getQueryWebInterfacePort()); - IHyracksClientConnection hcc = getHcc(); queryWebServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc); queryWebServer.addServlet(new QueryWebInterfaceServlet(appCtx, queryWebServer.ctx(), new String[] { "/*" })); return queryWebServer; @@ -357,9 +358,7 @@ public class CCApplication extends BaseCCApplication { return appCtx; } - protected IHyracksClientConnection getHcc() throws Exception { - String strIP = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress(); - int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort(); - return new HyracksConnection(strIP, port); + public IHyracksClientConnection getHcc() { + return hcc; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java index 5abbe40..cd58d8f 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/ConnectorApiServletTest.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.asterix.api.http.server.ConnectorApiServlet; +import org.apache.asterix.api.http.server.ServletConstants; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java index 5f40a85..3cb46fe 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.asterix.api.http.ctx.StatementExecutorContext; import org.apache.asterix.api.http.server.QueryCancellationServlet; +import org.apache.asterix.api.http.server.ServletConstants; import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.job.JobId; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryServiceServletTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryServiceServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryServiceServletTest.java deleted file mode 100644 index e0539ac..0000000 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryServiceServletTest.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.asterix.api.http.servlet; - -import org.apache.asterix.api.http.server.QueryServiceServlet; -import org.junit.Assert; -import org.junit.Test; - -public class QueryServiceServletTest { - - @Test - public void testTimeUnitFormatNanos() throws Exception { - Assert.assertEquals("123.456789012s", QueryServiceServlet.TimeUnit.formatNanos(123456789012l)); - Assert.assertEquals("12.345678901s", QueryServiceServlet.TimeUnit.formatNanos(12345678901l)); - Assert.assertEquals("1.234567890s", QueryServiceServlet.TimeUnit.formatNanos(1234567890l)); - Assert.assertEquals("123.456789ms", QueryServiceServlet.TimeUnit.formatNanos(123456789l)); - Assert.assertEquals("12.345678ms", QueryServiceServlet.TimeUnit.formatNanos(12345678l)); - Assert.assertEquals("1.234567ms", QueryServiceServlet.TimeUnit.formatNanos(1234567l)); - Assert.assertEquals("123.456µs", QueryServiceServlet.TimeUnit.formatNanos(123456l)); - Assert.assertEquals("12.345µs", QueryServiceServlet.TimeUnit.formatNanos(12345l)); - Assert.assertEquals("1.234µs", QueryServiceServlet.TimeUnit.formatNanos(1234l)); - Assert.assertEquals("123ns", QueryServiceServlet.TimeUnit.formatNanos(123l)); - Assert.assertEquals("12ns", QueryServiceServlet.TimeUnit.formatNanos(12l)); - Assert.assertEquals("1ns", QueryServiceServlet.TimeUnit.formatNanos(1l)); - Assert.assertEquals("-123.456789012s", QueryServiceServlet.TimeUnit.formatNanos(-123456789012l)); - Assert.assertEquals("120.000000000s", QueryServiceServlet.TimeUnit.formatNanos(120000000000l)); - Assert.assertEquals("-12ns", QueryServiceServlet.TimeUnit.formatNanos(-12l)); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/87411c22/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java index f994e98..e583c75 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/VersionApiServletTest.java @@ -19,8 +19,8 @@ package org.apache.asterix.api.http.servlet; -import static org.apache.asterix.api.http.servlet.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR; -import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.ASTERIX_APP_CONTEXT_INFO_ATTR; +import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNECTION_ATTR; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when;
