This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 5b3465ddb884e140e7056780e756bb8b96232002 Author: Ali Alsuliman <[email protected]> AuthorDate: Thu Jul 18 00:03:58 2024 +0300 [ASTERIXDB-3343][API] Add servlet to get active/completed requests - user model changes: no - storage format changes: no - interface changes: yes Backports from: [ASTERIXDB-3343][API] Add servlet to get completed requests (cherry picked from commit 88c25279458badf088ae36ecef2bf50a66d9638c) Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18109 [ASTERIXDB-3343][API] Include job details in active/completed requests (cherry picked from commit b767d093235ee01b2b39d98f64592f0ffd822cf4) Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18110 [ASTERIXDB-3343][API] Capture job state changes in client requests Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18279 [ASTERIXDB-3343][API] Add redact param to redact active/completed requests Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18283 [ASTERIXDB-3343][API] Return new list when getting completed requests Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18306 Ext-ref: MB-62288 Change-Id: Iaf0ad268cb0629c1314d983cb30a499d554dafd4 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18495 Tested-by: Ali Alsuliman <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- .../asterix/translator/BaseClientRequest.java | 17 +++- .../apache/asterix/translator/ClientRequest.java | 106 ++++++++++++++++++++- .../api/http/server/AbstractRequestsServlet.java | 68 +++++++++++++ ...tionServlet.java => ActiveRequestsServlet.java} | 19 ++-- .../api/http/server/CompletedRequestsServlet.java} | 26 ++++- .../http/server/NCQueryCancellationServlet.java | 2 +- .../app/active/ActiveNotificationHandler.java | 10 +- .../asterix/app/translator/QueryTranslator.java | 26 ++--- .../asterix/hyracks/bootstrap/CCApplication.java | 5 +- .../http/servlet/QueryCancellationServletTest.java 
| 13 ++- .../asterix/test/active/ActiveStatsTest.java | 6 +- .../test/active/TestClusterControllerActor.java | 7 +- .../active_requests.2.pollquery.sqlpp | 2 +- .../completed_requests.4.query.sqlpp} | 8 +- .../misc/active_requests/active_requests.2.regex | 2 +- .../completed_requests.4.regexjson | 19 ++++ .../apache/asterix/common/api/IClientRequest.java | 47 ++++++++- .../apache/asterix/common/api/IReceptionist.java | 4 +- .../apache/asterix/common/api/IRequestTracker.java | 3 +- .../runtime/job/listener/NodeJobTracker.java | 9 +- .../asterix/runtime/utils/RequestTracker.java | 69 +++++++++++++- .../runtime/job/listener/NodeJobTrackerTest.java | 7 +- .../hyracks/api/job/IJobLifecycleListener.java | 31 +++--- .../apache/hyracks/api/job/JobSpecification.java | 10 ++ .../control/cc/application/CCServiceContext.java | 16 ++-- .../hyracks/control/cc/executor/JobExecutor.java | 2 +- .../apache/hyracks/control/cc/job/JobManager.java | 8 +- .../org/apache/hyracks/control/cc/job/JobRun.java | 12 ++- .../control/cc/result/ResultDirectoryService.java | 9 +- .../integration/TestJobLifecycleListener.java | 9 +- 30 files changed, 472 insertions(+), 100 deletions(-) diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java index 99cda09f7c..0fafc47fbd 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java @@ -20,7 +20,6 @@ package org.apache.asterix.translator; import org.apache.asterix.common.api.IClientRequest; import org.apache.asterix.common.api.IRequestReference; -import org.apache.asterix.common.api.RequestReference; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.om.base.ADateTime; import 
org.apache.hyracks.api.exceptions.HyracksDataException; @@ -93,15 +92,25 @@ public abstract class BaseClientRequest implements IClientRequest { return JSONUtil.convertNodeUnchecked(asJson()); } - protected ObjectNode asJson() { + @Override + public ObjectNode asJson() { + return putJson(); + } + + @Override + public ObjectNode asRedactedJson() { + return putJson(); + } + + private ObjectNode putJson() { ObjectNode json = JSONUtil.createObject(); json.put("uuid", requestReference.getUuid()); json.put("requestTime", new ADateTime(requestReference.getTime()).toSimpleString()); json.put("elapsedTime", getElapsedTimeInSecs()); json.put("node", requestReference.getNode()); json.put("state", state.getLabel()); - json.put("userAgent", ((RequestReference) requestReference).getUserAgent()); - json.put("remoteAddr", ((RequestReference) requestReference).getRemoteAddr()); + json.put("userAgent", requestReference.getUserAgent()); + json.put("remoteAddr", requestReference.getRemoteAddr()); json.put("cancellable", cancellable); return json; } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java index c19bb026a5..31f197968d 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientRequest.java @@ -18,11 +18,22 @@ */ package org.apache.asterix.translator; +import static org.apache.hyracks.api.job.resource.IJobCapacityController.JobSubmissionStatus.QUEUE; + +import java.util.List; +import java.util.concurrent.TimeUnit; + import org.apache.asterix.common.api.ICommonRequestParameters; import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.om.base.AMutableDateTime; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; 
import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.job.resource.IJobCapacityController; +import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity; +import org.apache.hyracks.api.util.ExceptionUtils; +import org.apache.hyracks.util.LogRedactionUtil; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -32,13 +43,16 @@ public class ClientRequest extends BaseClientRequest { protected final Thread executor; protected final String statement; protected final String clientContextId; + protected final JobState jobState; protected volatile JobId jobId; + private volatile String plan; // can be null public ClientRequest(ICommonRequestParameters requestParameters) { super(requestParameters.getRequestReference()); this.clientContextId = requestParameters.getClientContextId(); this.statement = requestParameters.getStatement(); this.executor = Thread.currentThread(); + this.jobState = new JobState(); } @Override @@ -46,6 +60,10 @@ public class ClientRequest extends BaseClientRequest { return clientContextId; } + public void setPlan(String plan) { + this.plan = plan; + } + public synchronized void setJobId(JobId jobId) { this.jobId = jobId; setRunning(); @@ -76,11 +94,93 @@ public class ClientRequest extends BaseClientRequest { } @Override - protected ObjectNode asJson() { + public ObjectNode asJson() { ObjectNode json = super.asJson(); - json.put("jobId", jobId != null ? jobId.toString() : null); - json.put("statement", statement); + return asJson(json, false); + } + + @Override + public ObjectNode asRedactedJson() { + ObjectNode json = super.asRedactedJson(); + return asJson(json, true); + } + + private ObjectNode asJson(ObjectNode json, boolean redact) { + putJobDetails(json, redact); + json.put("statement", redact ? LogRedactionUtil.statement(statement) : statement); json.put("clientContextID", clientContextId); + if (plan != null) { + json.put("plan", redact ? 
LogRedactionUtil.userData(plan) : plan); + } return json; } + + @Override + public void jobCreated(JobId jobId, IReadOnlyClusterCapacity requiredClusterCapacity, + IJobCapacityController.JobSubmissionStatus status) { + jobState.createTime = System.currentTimeMillis(); + jobState.status = status == QUEUE ? JobStatus.PENDING : JobStatus.RUNNING; + jobState.requiredCPUs = requiredClusterCapacity.getAggregatedCores(); + jobState.requiredMemoryInBytes = requiredClusterCapacity.getAggregatedMemoryByteSize(); + } + + @Override + public void jobStarted(JobId jobId) { + jobState.startTime = System.currentTimeMillis(); + jobState.status = JobStatus.RUNNING; + } + + @Override + public void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) { + jobState.endTime = System.currentTimeMillis(); + jobState.status = jobStatus; + if (exceptions != null && !exceptions.isEmpty()) { + jobState.errorMsg = processException(exceptions.get(0)); + } + } + + protected String processException(Exception e) { + return ExceptionUtils.unwrap(e).getMessage(); + } + + private void putJobDetails(ObjectNode json, boolean redact) { + try { + json.put("jobId", jobId != null ? jobId.toString() : null); + putJobState(json, jobState, redact); + } catch (Throwable th) { + // ignore + } + } + + private static void putJobState(ObjectNode json, JobState state, boolean redact) { + AMutableDateTime dateTime = new AMutableDateTime(0); + putTime(json, state.createTime, "jobCreateTime", dateTime); + putTime(json, state.startTime, "jobStartTime", dateTime); + putTime(json, state.endTime, "jobEndTime", dateTime); + long queueTime = (state.startTime > 0 ? 
state.startTime : System.currentTimeMillis()) - state.createTime; + json.put("jobQueueTime", TimeUnit.MILLISECONDS.toSeconds(queueTime)); + json.put("jobStatus", String.valueOf(state.status)); + json.put("jobRequiredCPUs", state.requiredCPUs); + json.put("jobRequiredMemory", state.requiredMemoryInBytes); + if (state.errorMsg != null) { + json.put("error", redact ? LogRedactionUtil.userData(state.errorMsg) : state.errorMsg); + } + } + + private static void putTime(ObjectNode json, long time, String label, AMutableDateTime dateTime) { + if (time > 0) { + dateTime.setValue(time); + json.put(label, dateTime.toSimpleString()); + } + } + + static class JobState { + volatile long createTime; + volatile long startTime; + volatile long endTime; + volatile long requiredMemoryInBytes; + volatile int requiredCPUs; + volatile JobStatus status; + volatile String errorMsg; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java new file mode 100644 index 0000000000..b6c6a713fd --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/AbstractRequestsServlet.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.api.http.server; + +import java.util.Collection; +import java.util.concurrent.ConcurrentMap; + +import org.apache.asterix.common.api.IClientRequest; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; +import org.apache.hyracks.http.server.AbstractServlet; +import org.apache.hyracks.http.server.utils.HttpUtil; +import org.apache.hyracks.util.JSONUtil; + +import com.fasterxml.jackson.databind.node.ArrayNode; + +import io.netty.handler.codec.http.HttpResponseStatus; + +public abstract class AbstractRequestsServlet extends AbstractServlet { + + public static final String REDACT_PARAM = "redact"; + protected final ICcApplicationContext appCtx; + + public AbstractRequestsServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx, String... 
paths) { + super(ctx, paths); + this.appCtx = appCtx; + } + + @Override + protected void get(IServletRequest request, IServletResponse response) throws Exception { + ArrayNode requestsJson = JSONUtil.createArray(); + Collection<IClientRequest> requests = getRequests(); + String redact = request.getParameter(REDACT_PARAM); + if (Boolean.parseBoolean(redact)) { + for (IClientRequest req : requests) { + requestsJson.add(req.asRedactedJson()); + } + } else { + for (IClientRequest req : requests) { + requestsJson.add(req.asJson()); + } + } + HttpUtil.setContentType(response, HttpUtil.ContentType.APPLICATION_JSON, request); + response.setStatus(HttpResponseStatus.OK); + JSONUtil.writeNode(response.writer(), requestsJson); + response.writer().flush(); + } + + abstract Collection<IClientRequest> getRequests(); + +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveRequestsServlet.java similarity index 83% rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java rename to asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveRequestsServlet.java index 7ba28675b6..7e00e9ae93 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ActiveRequestsServlet.java @@ -19,6 +19,7 @@ package org.apache.asterix.api.http.server; import java.io.IOException; +import java.util.Collection; import java.util.concurrent.ConcurrentMap; import org.apache.asterix.api.http.server.QueryServiceRequestParameters.Parameter; @@ -27,7 +28,6 @@ import org.apache.asterix.common.api.IRequestTracker; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.hyracks.http.api.IServletRequest; import 
org.apache.hyracks.http.api.IServletResponse; -import org.apache.hyracks.http.server.AbstractServlet; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -35,24 +35,27 @@ import org.apache.logging.log4j.Logger; import io.netty.handler.codec.http.HttpResponseStatus; /** - * The servlet provides a REST API for cancelling an on-going query. + * The servlet provides a REST API for getting the running queries or cancelling an on-going one. */ -public class CcQueryCancellationServlet extends AbstractServlet { +public class ActiveRequestsServlet extends AbstractRequestsServlet { public static final String REQUEST_UUID_PARAM_NAME = "request_id"; private static final Logger LOGGER = LogManager.getLogger(); - private final ICcApplicationContext appCtx; - public CcQueryCancellationServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx, - String... paths) { - super(ctx, paths); - this.appCtx = appCtx; + public ActiveRequestsServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx, String... 
paths) { + super(ctx, appCtx, paths); + } + + @Override + public Collection<IClientRequest> getRequests() { + return appCtx.getRequestTracker().getRunningRequests(); } @Override protected void delete(IServletRequest request, IServletResponse response) throws IOException { String uuid = request.getParameter(REQUEST_UUID_PARAM_NAME); String clientCtxId = request.getParameter(Parameter.CLIENT_ID.str()); + LOGGER.debug("received cancel request, uuid={}, clientCtxId={}", uuid, clientCtxId); if (uuid == null && clientCtxId == null) { response.setStatus(HttpResponseStatus.BAD_REQUEST); return; diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CompletedRequestsServlet.java similarity index 53% copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp copy to asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CompletedRequestsServlet.java index 7eee42e6b1..92eacbb0fa 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CompletedRequestsServlet.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -16,7 +16,23 @@ * specific language governing permissions and limitations * under the License. 
*/ - -- param client_context_id=ensure_running_query - -- polltimeoutsecs=15 -SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid FROM active_requests() rqst -WHERE rqst.clientContextID = 'sleep_async_query'; \ No newline at end of file +package org.apache.asterix.api.http.server; + +import java.util.Collection; +import java.util.concurrent.ConcurrentMap; + +import org.apache.asterix.common.api.IClientRequest; +import org.apache.asterix.common.dataflow.ICcApplicationContext; + +public class CompletedRequestsServlet extends AbstractRequestsServlet { + + public CompletedRequestsServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx, String... paths) { + super(ctx, appCtx, paths); + } + + @Override + public Collection<IClientRequest> getRequests() { + return appCtx.getRequestTracker().getCompletedRequests(); + } + +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java index b2134dcfc6..5dd9430840 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.api.http.server; -import static org.apache.asterix.api.http.server.CcQueryCancellationServlet.REQUEST_UUID_PARAM_NAME; +import static org.apache.asterix.api.http.server.ActiveRequestsServlet.REQUEST_UUID_PARAM_NAME; import static org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS; import java.util.concurrent.ConcurrentMap; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java index 662884d48b..0f2780f51e 100644 --- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java @@ -41,6 +41,7 @@ import org.apache.hyracks.api.job.IJobLifecycleListener; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.api.util.SingleThreadEventProcessor; import org.apache.hyracks.util.ExitUtil; import org.apache.logging.log4j.Level; @@ -89,7 +90,8 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active // *** IJobLifecycleListener @Override - public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException { + public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification, + IJobCapacityController.JobSubmissionStatus status) throws HyracksDataException { Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME); if (!(property instanceof EntityId)) { if (property != null) { @@ -118,7 +120,7 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active } @Override - public synchronized void notifyJobStart(JobId jobId) throws HyracksException { + public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException { EntityId entityId = jobId2EntityId.get(jobId); if (entityId != null) { add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId, null)); @@ -127,8 +129,8 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active } @Override - public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) - throws HyracksException { + public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, + List<Exception> exceptions) throws 
HyracksException { EntityId entityId = jobId2EntityId.get(jobId); if (entityId != null) { LOGGER.debug("notified of ingestion job finish {}", jobId); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 6b40cbfffd..31b903b6e1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -3664,8 +3664,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen public JobSpecification handleInsertUpsertStatement(MetadataProvider metadataProvider, Statement stmt, IHyracksClientConnection hcc, IResultSet resultSet, ResultDelivery resultDelivery, - ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, - Map<String, IAObject> stmtParams, IStatementRewriter stmtRewriter) throws Exception { + ResultMetadata outMetadata, Stats stats, IRequestParameters reqParams, Map<String, IAObject> stmtParams, + IStatementRewriter stmtRewriter) throws Exception { InsertStatement stmtInsertUpsert = (InsertStatement) stmt; String datasetName = stmtInsertUpsert.getDatasetName(); metadataProvider.validateDatabaseObjectName(stmtInsertUpsert.getDataverseName(), datasetName, @@ -3702,10 +3702,11 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw e; } }; - + IRequestTracker requestTracker = appCtx.getRequestTracker(); + ClientRequest clientRequest = (ClientRequest) requestTracker.get(reqParams.getRequestReference().getUuid()); if (stmtInsertUpsert.getReturnExpression() != null) { deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, - requestParameters, false); + reqParams, false, clientRequest); } else { locker.lock(); try { @@ -4702,13 +4703,13 @@ public class 
QueryTranslator extends AbstractLangTranslator implements IStatemen } }; deliverResult(hcc, resultSet, compiler, metadataProvider, locker, resultDelivery, outMetadata, stats, - requestParameters, true); + requestParameters, true, clientRequest); } private void deliverResult(IHyracksClientConnection hcc, IResultSet resultSet, IStatementCompiler compiler, MetadataProvider metadataProvider, IMetadataLocker locker, ResultDelivery resultDelivery, - ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable) - throws Exception { + ResultMetadata outMetadata, Stats stats, IRequestParameters requestParameters, boolean cancellable, + ClientRequest clientRequest) throws Exception { final ResultSetId resultSetId = metadataProvider.getResultSetId(); switch (resultDelivery) { case ASYNC: @@ -4724,7 +4725,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen case IMMEDIATE: createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { final ResultReader resultReader = new ResultReader(resultSet, id, resultSetId); - updateJobStats(id, stats, metadataProvider.getResultSetId()); + updateJobStats(id, stats, metadataProvider.getResultSetId(), clientRequest); responsePrinter.addResultPrinter(new ResultsPrinter(appCtx, resultReader, metadataProvider.findOutputRecordType(), stats, sessionOutput)); responsePrinter.printResults(); @@ -4732,7 +4733,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen break; case DEFERRED: createAndRunJob(hcc, jobFlags, null, compiler, locker, resultDelivery, id -> { - updateJobStats(id, stats, metadataProvider.getResultSetId()); + updateJobStats(id, stats, metadataProvider.getResultSetId(), clientRequest); responsePrinter.addResultPrinter( new ResultHandlePrinter(sessionOutput, new ResultHandle(id, resultSetId))); responsePrinter.printResults(); @@ -4747,7 +4748,8 @@ public class QueryTranslator extends AbstractLangTranslator implements 
IStatemen } } - private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId) throws HyracksDataException { + private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId, ClientRequest clientRequest) + throws HyracksDataException { final ClusterControllerService controllerService = (ClusterControllerService) appCtx.getServiceContext().getControllerService(); org.apache.asterix.translator.ResultMetadata resultMetadata = @@ -4757,6 +4759,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (jobFlags.contains(JobFlag.PROFILE_RUNTIME)) { stats.setJobProfile(resultMetadata.getJobProfile()); } + clientRequest.setPlan(apiFramework.getExecutionPlans().getOptimizedLogicalPlan()); stats.updateTotalWarningsCount(resultMetadata.getTotalWarningsCount()); WarningUtil.mergeWarnings(resultMetadata.getWarnings(), warningCollector); } @@ -4836,6 +4839,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen appCtx.getReceptionist().ensureSchedulable(schedulableRequest); // ensure request not cancelled before running job ensureNotCancelled(clientRequest); + jobSpec.setRequestId(clientRequest.getId()); final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false); if (LOGGER.isInfoEnabled()) { LOGGER.info("Created job {} for query uuid:{}, clientContextID:{}", jobId, @@ -5208,7 +5212,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen protected void trackRequest(IRequestParameters requestParameters) throws HyracksDataException { final IClientRequest clientRequest = appCtx.getReceptionist().requestReceived(requestParameters); - appCtx.getRequestTracker().track(clientRequest); + this.appCtx.getRequestTracker().track(clientRequest); } protected void validateStatements(IRequestParameters requestParameters) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index bc67823916..31ec44cb58 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -37,9 +37,9 @@ import java.util.ServiceLoader; import java.util.concurrent.ConcurrentMap; import org.apache.asterix.api.http.IQueryWebServerRegistrant; +import org.apache.asterix.api.http.server.ActiveRequestsServlet; import org.apache.asterix.api.http.server.ActiveStatsApiServlet; import org.apache.asterix.api.http.server.ApiServlet; -import org.apache.asterix.api.http.server.CcQueryCancellationServlet; import org.apache.asterix.api.http.server.ClusterApiServlet; import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet; import org.apache.asterix.api.http.server.ConnectorApiServlet; @@ -171,6 +171,7 @@ public class CCApplication extends BaseCCApplication { ccServiceCtx.setDistributedState(proxy); MetadataManager.initialize(proxy, metadataProperties, appCtx); ccServiceCtx.addJobLifecycleListener(appCtx.getActiveNotificationHandler()); + ccServiceCtx.addJobLifecycleListener(appCtx.getRequestTracker()); // create event loop groups webManager = new WebManager(); @@ -315,7 +316,7 @@ public class CCApplication extends BaseCCApplication { ConcurrentMap<String, Object> ctx = server.ctx(); switch (key) { case Servlets.RUNNING_REQUESTS: - return new CcQueryCancellationServlet(ctx, appCtx, paths); + return new ActiveRequestsServlet(ctx, appCtx, paths); case Servlets.QUERY_STATUS: return new QueryStatusApiServlet(ctx, appCtx, paths); case Servlets.QUERY_RESULT: diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java index 68fb9a8419..673611d69a 100644 --- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java @@ -27,7 +27,7 @@ import static org.mockito.Mockito.when; import java.util.concurrent.ConcurrentHashMap; -import org.apache.asterix.api.http.server.CcQueryCancellationServlet; +import org.apache.asterix.api.http.server.ActiveRequestsServlet; import org.apache.asterix.api.http.server.ServletConstants; import org.apache.asterix.app.translator.RequestParameters; import org.apache.asterix.common.api.RequestReference; @@ -35,8 +35,10 @@ import org.apache.asterix.common.config.ExternalProperties; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.runtime.utils.RequestTracker; import org.apache.asterix.translator.ClientRequest; +import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; import org.junit.Test; @@ -57,8 +59,8 @@ public class QueryCancellationServletTest { RequestTracker tracker = new RequestTracker(appCtx); Mockito.when(appCtx.getRequestTracker()).thenReturn(tracker); // Creates a query cancellation servlet. - CcQueryCancellationServlet cancellationServlet = - new CcQueryCancellationServlet(new ConcurrentHashMap<>(), appCtx, new String[] { "/" }); + ActiveRequestsServlet cancellationServlet = + new ActiveRequestsServlet(new ConcurrentHashMap<>(), appCtx, new String[] { "/" }); // Adds mocked Hyracks client connection into the servlet context. 
IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class); cancellationServlet.ctx().put(ServletConstants.HYRACKS_CONNECTION_ATTR, mockHcc); @@ -66,6 +68,11 @@ public class QueryCancellationServletTest { // Tests the case that query is not in the map. IServletRequest mockRequest = mockRequest("1"); IServletResponse mockResponse = mock(IServletResponse.class); + ICCServiceContext mockCCServiceCtx = mock(ICCServiceContext.class); + ClusterControllerService mockCCService = mock(ClusterControllerService.class); + Mockito.when(appCtx.getServiceContext()).thenReturn(mockCCServiceCtx); + Mockito.when(appCtx.getServiceContext().getControllerService()).thenReturn(mockCCService); + Mockito.when(mockCCServiceCtx.getControllerService()).thenReturn(mockCCService); cancellationServlet.handle(mockRequest, mockResponse); verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java index cb123bfb92..c9d862c0a5 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java @@ -53,6 +53,7 @@ import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartit import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -135,8 +136,9 @@ public class ActiveStatsTest { TestUserActor user = new TestUserActor("Xikui", mdProvider, null); Action start = user.startActivity(eventsListener); startingSubscriber.sync(); - activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec); - 
activeJobNotificationHandler.notifyJobStart(jobId); + activeJobNotificationHandler.notifyJobCreation(jobId, jobSpec, + IJobCapacityController.JobSubmissionStatus.EXECUTE); + activeJobNotificationHandler.notifyJobStart(jobId, jobSpec); try { eventsListener.refreshStats(1000); } catch (HyracksDataException e) { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java index 52d4225156..883a0cb069 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestClusterControllerActor.java @@ -28,6 +28,7 @@ import org.apache.asterix.metadata.entities.Dataset; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.mockito.Mockito; public class TestClusterControllerActor extends Actor { @@ -49,8 +50,8 @@ public class TestClusterControllerActor extends Actor { JobSpecification jobSpecification = Mockito.mock(JobSpecification.class); Mockito.when(jobSpecification.getProperty(ActiveNotificationHandler.ACTIVE_ENTITY_PROPERTY_NAME)) .thenReturn(entityId); - handler.notifyJobCreation(jobId, jobSpecification); - handler.notifyJobStart(jobId); + handler.notifyJobCreation(jobId, jobSpecification, IJobCapacityController.JobSubmissionStatus.EXECUTE); + handler.notifyJobStart(jobId, null); } }; add(startJob); @@ -72,7 +73,7 @@ public class TestClusterControllerActor extends Actor { Action delivery = new Action() { @Override protected void doExecute(MetadataProvider actorMdProvider) throws Exception { - handler.notifyJobFinish(jobId, jobStatus, exceptions); + handler.notifyJobFinish(jobId, null, jobStatus, exceptions); } }; add(delivery); diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp index 7eee42e6b1..07f8eea144 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp @@ -18,5 +18,5 @@ */ -- param client_context_id=ensure_running_query -- polltimeoutsecs=15 -SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid FROM active_requests() rqst +SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid, rqst.jobStatus FROM active_requests() rqst WHERE rqst.clientContextID = 'sleep_async_query'; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp similarity index 78% copy from asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp index 7eee42e6b1..de35939f09 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/active_requests/active_requests.2.pollquery.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/misc/completed_requests/completed_requests.4.query.sqlpp @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ - -- param client_context_id=ensure_running_query - -- polltimeoutsecs=15 -SELECT rqst.cancellable, rqst.jobId, rqst.state, rqst.uuid FROM active_requests() rqst -WHERE rqst.clientContextID = 'sleep_async_query'; \ No newline at end of file + -- param client_context_id=ensure_completed_query + -- param ignoreextrafields=true +SELECT VALUE (SELECT VALUE r FROM completed_requests() r +WHERE r.state="completed" AND r.clientContextID = "completed_requests_query")[0]; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex index e31fe3b3c6..170838e5bf 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/active_requests/active_requests.2.regex @@ -1 +1 @@ -/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*" \}/ \ No newline at end of file +/\{ "cancellable": true, "jobId": "JID:.*", "state": "running", "uuid": ".*", "jobStatus": "RUNNING" \}/ \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson new file mode 100644 index 0000000000..15affd15a9 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/misc/completed_requests/completed_requests.4.regexjson @@ -0,0 +1,19 @@ +{ + "cancellable": true, + "clientContextID": "completed_requests_query", + "elapsedTime": "R{.*}", + "jobCreateTime": "R{.*}", + "jobEndTime": "R{.*}", + "jobId": "R{.*}", + "jobQueueTime": "R{.*}", + "jobRequiredCPUs": "R{.*}", + "jobRequiredMemory": "R{.*}", + "jobStartTime": "R{.*}", + "jobStatus": "TERMINATED", + 
"node": "R{.*}", + "remoteAddr": "R{.*}", + "requestTime": "R{.*}", + "state": "completed", + "userAgent": "R{.*}", + "uuid": "R{.*}" +} \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java index 921fb64095..ee518dd664 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java @@ -18,8 +18,16 @@ */ package org.apache.asterix.common.api; +import java.util.List; + import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.job.resource.IJobCapacityController; +import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity; + +import com.fasterxml.jackson.databind.node.ObjectNode; public interface IClientRequest { @@ -86,7 +94,44 @@ public interface IClientRequest { void cancel(ICcApplicationContext appCtx) throws HyracksDataException; /** - * @return A json representation of this request + * @return A json string representation of this request */ String toJson(); + + /** + * @return A json node representation of this request + */ + ObjectNode asJson(); + + /** + * @return A redacted json node representation of this request + */ + ObjectNode asRedactedJson(); + + /** + * Called when the job is created. + * + * @param jobId the job id + * @param requiredClusterCapacity the required resources by the job + * @param status the status of the job; whether it will be executed or queued + */ + void jobCreated(JobId jobId, IReadOnlyClusterCapacity requiredClusterCapacity, + IJobCapacityController.JobSubmissionStatus status); + + /** + * Called when the job starts running. 
+ * + * @param jobId the job id + */ + void jobStarted(JobId jobId); + + /** + * Called when the job finishes. + * + * @param jobId the job id + * @param jobStatus the final job status + * @param exceptions exceptions encountered if any + */ + void jobFinished(JobId jobId, JobStatus jobStatus, List<Exception> exceptions); + } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java index 95ed22ea61..153eea9a69 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IReceptionist.java @@ -34,9 +34,9 @@ public interface IReceptionist { /** * Generates a {@link IClientRequest} based on the requests parameters * - * @param requestParameters + * @param requestParameters the request parameters * @return the client request - * @throws HyracksDataException + * @throws HyracksDataException HyracksDataException */ IClientRequest requestReceived(ICommonRequestParameters requestParameters) throws HyracksDataException; diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java index 0019015df8..5a99f09ba8 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IRequestTracker.java @@ -21,8 +21,9 @@ package org.apache.asterix.common.api; import java.util.Collection; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.IJobLifecycleListener; -public interface IRequestTracker { +public interface IRequestTracker extends IJobLifecycleListener { /** * Starts tracking {@code request} diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java index a2ebd9a7b8..02f20f8e2f 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/NodeJobTracker.java @@ -36,6 +36,7 @@ import org.apache.hyracks.api.constraints.expressions.ConstantExpression; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.util.annotations.ThreadSafe; @ThreadSafe @@ -44,17 +45,19 @@ public class NodeJobTracker implements INodeJobTracker { private final Map<String, Set<JobId>> nodeJobs = new HashMap<>(); @Override - public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) { + public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec, + IJobCapacityController.JobSubmissionStatus status) { getJobParticipatingNodes(spec).stream().map(nodeJobs::get).forEach(jobsSet -> jobsSet.add(jobId)); } @Override - public synchronized void notifyJobStart(JobId jobId) { + public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) { // nothing to do } @Override - public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) { + public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, + List<Exception> exceptions) { nodeJobs.values().forEach(jobsSet -> jobsSet.remove(jobId)); } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java index c9425c6065..a754eb63ee 
100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/RequestTracker.java @@ -21,6 +21,8 @@ package org.apache.asterix.runtime.utils; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -29,20 +31,32 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.asterix.common.api.IClientRequest; import org.apache.asterix.common.api.IRequestTracker; import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.commons.collections4.queue.CircularFifoQueue; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.job.resource.IJobCapacityController; public class RequestTracker implements IRequestTracker { private final Map<String, IClientRequest> runningRequests = new ConcurrentHashMap<>(); private final Map<String, IClientRequest> clientIdRequests = new ConcurrentHashMap<>(); - private final CircularFifoQueue<IClientRequest> completedRequests; + private final Map<String, IClientRequest> completedRequests; private final ICcApplicationContext ccAppCtx; private final AtomicLong numRequests; public RequestTracker(ICcApplicationContext ccAppCtx) { this.ccAppCtx = ccAppCtx; - completedRequests = new CircularFifoQueue<>(ccAppCtx.getExternalProperties().getRequestsArchiveSize()); + int archiveSize = ccAppCtx.getExternalProperties().getRequestsArchiveSize(); + completedRequests = new LinkedHashMap<>(archiveSize) { + private static final long serialVersionUID = 1L; + + @Override + protected boolean 
removeEldestEntry(Map.Entry<String, IClientRequest> eldest) { + return size() > archiveSize; + } + }; numRequests = new AtomicLong(0); } @@ -94,7 +108,7 @@ public class RequestTracker implements IRequestTracker { @Override public synchronized Collection<IClientRequest> getCompletedRequests() { - return Collections.unmodifiableCollection(new ArrayList<>(completedRequests)); + return Collections.unmodifiableCollection(new ArrayList<>(completedRequests.values())); } private void cancel(IClientRequest request) throws HyracksDataException { @@ -114,10 +128,55 @@ public class RequestTracker implements IRequestTracker { } private synchronized void archive(IClientRequest request) { - completedRequests.add(request); + completedRequests.put(request.getId(), request); } public long getTotalNumberOfRequests() { return numRequests.get(); } + + @Override + public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status) + throws HyracksException { + String requestId = spec.getRequestId(); + if (requestId != null) { + IClientRequest request = getRequest(requestId); + if (request != null) { + request.jobCreated(jobId, spec.getRequiredClusterCapacity(), status); + } + } + } + + @Override + public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException { + String requestId = spec.getRequestId(); + if (requestId != null) { + IClientRequest request = getRequest(requestId); + if (request != null) { + request.jobStarted(jobId); + } + } + } + + @Override + public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions) + throws HyracksException { + String requestId = spec.getRequestId(); + if (requestId != null) { + IClientRequest request = getRequest(requestId); + if (request != null) { + request.jobFinished(jobId, jobStatus, exceptions); + } + } + } + + private IClientRequest getRequest(String requestId) { + IClientRequest clientRequest = 
runningRequests.get(requestId); + if (clientRequest != null) { + return clientRequest; + } + synchronized (this) { + return completedRequests.get(requestId); + } + } } diff --git a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java index be75ecbdec..82d8cb771d 100644 --- a/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java +++ b/asterixdb/asterix-runtime/src/test/java/org/apache/asterix/runtime/job/listener/NodeJobTrackerTest.java @@ -26,6 +26,7 @@ import org.apache.hyracks.api.constraints.expressions.LValueConstraintExpression import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -50,18 +51,18 @@ public class NodeJobTrackerTest { jobSpec.getUserConstraints().add(new Constraint(lValueMock, unknownLocation)); JobId jobId = new JobId(1); - nodeJobTracker.notifyJobCreation(jobId, jobSpec); + nodeJobTracker.notifyJobCreation(jobId, jobSpec, IJobCapacityController.JobSubmissionStatus.EXECUTE); // make sure nc1 has a pending job Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1); Assert.assertTrue(nodeJobTracker.getPendingJobs(unknown).isEmpty()); Assert.assertTrue(nodeJobTracker.getPendingJobs(nc2).isEmpty()); - nodeJobTracker.notifyJobFinish(jobId, JobStatus.TERMINATED, null); + nodeJobTracker.notifyJobFinish(jobId, jobSpec, JobStatus.TERMINATED, null); // make sure nc1 doesn't have pending jobs anymore Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty()); // make sure node doesn't have pending jobs after failure jobId = new JobId(2); - nodeJobTracker.notifyJobCreation(jobId, jobSpec); + 
nodeJobTracker.notifyJobCreation(jobId, jobSpec, IJobCapacityController.JobSubmissionStatus.EXECUTE); Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).size() == 1); nodeJobTracker.notifyNodeFailure(Collections.singleton(nc1)); Assert.assertTrue(nodeJobTracker.getPendingJobs(nc1).isEmpty()); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java index 338c3315d4..6773dde29d 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/IJobLifecycleListener.java @@ -21,6 +21,7 @@ package org.apache.hyracks.api.job; import java.util.List; import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.api.job.resource.IJobCapacityController; /** * A listener for job related events @@ -29,27 +30,33 @@ public interface IJobLifecycleListener { /** * Notify the listener that a job has been created * - * @param jobId - * @param spec - * @throws HyracksException + * @param jobId the job id + * @param spec the job specification + * @param status the status of the job; whether it will be executed or queued + * @throws HyracksException HyracksException */ - void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException; + void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status) + throws HyracksException; /** * Notify the listener that the job has started on the cluster controller * - * @param jobId - * @throws HyracksException + * @param jobId the job id + * @param spec the job specification + * + * @throws HyracksException HyracksException */ - void notifyJobStart(JobId jobId) throws HyracksException; + void notifyJobStart(JobId jobId, JobSpecification spec) throws 
HyracksException; /** * Notify the listener that the job has been terminated, passing exceptions in case of failure * - * @param jobId - * @param jobStatus - * @param exceptions - * @throws HyracksException + * @param jobId the job id + * @param spec the job specification + * @param jobStatus the job status + * @param exceptions the job exceptions + * @throws HyracksException HyracksException */ - void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException; + void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions) + throws HyracksException; } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java index 58336a0808..6d0ce6bc06 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobSpecification.java @@ -79,6 +79,8 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist private long maxWarnings; + private String requestId; + private IJobletEventListenerFactory jobletEventListenerFactory; private IGlobalJobDataFactory globalJobDataFactory; @@ -254,6 +256,14 @@ public class JobSpecification implements Serializable, IOperatorDescriptorRegist this.connectorPolicyAssignmentPolicy = connectorPolicyAssignmentPolicy; } + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public String getRequestId() { + return requestId; + } + public void setFrameSize(int frameSize) { this.frameSize = frameSize; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java index de166dd3df..555d37e089 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/application/CCServiceContext.java @@ -37,6 +37,7 @@ import org.apache.hyracks.api.job.IJobLifecycleListener; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.api.service.IControllerService; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.application.ServiceContext; @@ -79,22 +80,23 @@ public class CCServiceContext extends ServiceContext implements ICCServiceContex jobLifecycleListeners.add(jobLifecycleListener); } - public synchronized void notifyJobStart(JobId jobId) throws HyracksException { + public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException { for (IJobLifecycleListener l : jobLifecycleListeners) { - l.notifyJobStart(jobId); + l.notifyJobStart(jobId, spec); } } - public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) - throws HyracksException { + public synchronized void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, + List<Exception> exceptions) throws HyracksException { for (IJobLifecycleListener l : jobLifecycleListeners) { - l.notifyJobFinish(jobId, jobStatus, exceptions); + l.notifyJobFinish(jobId, spec, jobStatus, exceptions); } } - public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException { + public synchronized void 
notifyJobCreation(JobId jobId, JobSpecification spec, + IJobCapacityController.JobSubmissionStatus status) throws HyracksException { for (IJobLifecycleListener l : jobLifecycleListeners) { - l.notifyJobCreation(jobId, spec); + l.notifyJobCreation(jobId, spec, status); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java index da840afe98..567d20cc55 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java @@ -115,7 +115,7 @@ public class JobExecutor { public void startJob() throws HyracksException { startRunnableActivityClusters(); - ccs.getContext().notifyJobStart(jobRun.getJobId()); + ccs.getContext().notifyJobStart(jobRun.getJobId(), jobRun.getJobSpecification()); } public void cancelJob(IResultCallback<Void> callback) throws HyracksException { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index ad971887f7..d803c88fb3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -112,7 +112,7 @@ public class JobManager implements IJobManager { JobSpecification job = jobRun.getJobSpecification(); IJobCapacityController.JobSubmissionStatus status = jobCapacityController.allocate(job); CCServiceContext 
serviceCtx = ccs.getContext(); - serviceCtx.notifyJobCreation(jobRun.getJobId(), job); + serviceCtx.notifyJobCreation(jobRun.getJobId(), job, status); switch (status) { case QUEUE: queueJob(jobRun); @@ -148,7 +148,8 @@ public class JobManager implements IJobManager { CCServiceContext serviceCtx = ccs.getContext(); if (serviceCtx != null) { try { - serviceCtx.notifyJobFinish(jobId, JobStatus.FAILURE_BEFORE_EXECUTION, exceptions); + serviceCtx.notifyJobFinish(jobId, jobRun.getJobSpecification(), JobStatus.FAILURE_BEFORE_EXECUTION, + exceptions); } catch (Exception e) { LOGGER.error("Exception notifying cancel on pending job {}", jobId, e); throw HyracksDataException.create(e); @@ -224,7 +225,8 @@ public class JobManager implements IJobManager { Throwable caughtException = null; CCServiceContext serviceCtx = ccs.getContext(); try { - serviceCtx.notifyJobFinish(jobId, run.getPendingStatus(), run.getPendingExceptions()); + serviceCtx.notifyJobFinish(jobId, run.getJobSpecification(), run.getPendingStatus(), + run.getPendingExceptions()); } catch (Exception e) { LOGGER.error("Exception notifying job finish {}", jobId, e); caughtException = e; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java index 0cc09b4b0f..06e955ae91 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobRun.java @@ -82,13 +82,13 @@ public class JobRun implements IJobStatusConditionVariable { private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicyMap; - private long createTime; + private final long createTime; - private long startTime; + private volatile long startTime; private String 
startTimeZoneId; - private long endTime; + private volatile long endTime; private JobStatus status; @@ -98,7 +98,7 @@ public class JobRun implements IJobStatusConditionVariable { private List<Exception> pendingExceptions; - private Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations; + private final Map<OperatorDescriptorId, Map<Integer, String>> operatorLocations; private JobRun(DeploymentId deploymentId, JobId jobId, Set<JobFlag> jobFlags, JobSpecification spec, ActivityClusterGraph acg) { @@ -218,6 +218,10 @@ public class JobRun implements IJobStatusConditionVariable { this.endTime = endTime; } + public long getQueueWaitTimeInMillis() { + return startTime > 0 ? startTime - createTime : System.currentTimeMillis() - createTime; + } + public void registerOperatorLocation(OperatorDescriptorId op, int partition, String location) { operatorLocations.computeIfAbsent(op, k -> new HashMap<>()).put(partition, location); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java index 80d7630524..b6274d92b3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java @@ -34,6 +34,7 @@ import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.api.result.IJobResultCallback; import org.apache.hyracks.api.result.IResultMetadata; import 
org.apache.hyracks.api.result.IResultStateRecord; @@ -77,7 +78,8 @@ public class ResultDirectoryService extends AbstractResultManager implements IRe } @Override - public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException { + public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec, + IJobCapacityController.JobSubmissionStatus status) throws HyracksException { if (jobResultLocations.get(jobId) != null) { throw HyracksDataException.create(ErrorCode.MORE_THAN_ONE_RESULT, jobId); } @@ -85,12 +87,13 @@ public class ResultDirectoryService extends AbstractResultManager implements IRe } @Override - public synchronized void notifyJobStart(JobId jobId) throws HyracksException { + public synchronized void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException { jobResultLocations.get(jobId).getRecord().start(); } @Override - public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException { + public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions) + throws HyracksException { if (exceptions == null || exceptions.isEmpty()) { final ResultJobRecord resultJobRecord = getResultJobRecord(jobId); if (resultJobRecord == null) { diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java index 008be29584..19fdcfee01 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TestJobLifecycleListener.java @@ -30,6 +30,7 @@ 
import org.apache.hyracks.api.job.IJobLifecycleListener; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -47,7 +48,8 @@ public class TestJobLifecycleListener implements IJobLifecycleListener { private final Set<JobId> finishWithoutStart = new HashSet<>(); @Override - public void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException { + public void notifyJobCreation(JobId jobId, JobSpecification spec, IJobCapacityController.JobSubmissionStatus status) + throws HyracksException { if (created.containsKey(jobId)) { LOGGER.log(Level.WARN, "Job " + jobId + "has been created before"); increment(doubleCreated, jobId); @@ -62,7 +64,7 @@ public class TestJobLifecycleListener implements IJobLifecycleListener { } @Override - public void notifyJobStart(JobId jobId) throws HyracksException { + public void notifyJobStart(JobId jobId, JobSpecification spec) throws HyracksException { if (!created.containsKey(jobId)) { LOGGER.log(Level.WARN, "Job " + jobId + "has not been created"); startWithoutCreate.add(jobId); @@ -75,7 +77,8 @@ public class TestJobLifecycleListener implements IJobLifecycleListener { } @Override - public void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException { + public void notifyJobFinish(JobId jobId, JobSpecification spec, JobStatus jobStatus, List<Exception> exceptions) + throws HyracksException { if (!started.contains(jobId)) { LOGGER.log(Level.WARN, "Job " + jobId + "has not been started"); finishWithoutStart.add(jobId);
