Repository: asterixdb Updated Branches: refs/heads/master a7aaf7110 -> 7df888fff
[NO ISSUE][MISC] Introduce IClientRequest - user model changes: no - storage format changes: no - interface changes: yes + IClientRequest: used to represent a client request that can be cancelled. Details: - Introduce IClientRequest to allow for multiple types of requests to be cancellable. Change-Id: I8f65da1744ea7ecf26ea3f8a576ebaf4472ccd62 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2774 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: abdullah alamoudi <bamou...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/7df888ff Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/7df888ff Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/7df888ff Branch: refs/heads/master Commit: 7df888fffcb946f639b2c58ef4463d660244a567 Parents: a7aaf71 Author: Abdullah Alamoudi <bamou...@gmail.com> Authored: Wed Jul 18 08:30:00 2018 -0700 Committer: abdullah alamoudi <bamou...@gmail.com> Committed: Sun Jul 22 14:02:28 2018 -0700 ---------------------------------------------------------------------- .../algebra/extension/ExtensionStatement.java | 6 +- .../asterix/translator/BaseClientRequest.java | 54 +++++++++++++ .../asterix/translator/ClientJobRequest.java | 44 +++++++++++ .../translator/IStatementExecutorContext.java | 19 +++-- .../NoOpStatementExecutorContext.java | 8 +- .../api/http/ctx/StatementExecutorContext.java | 13 ++-- .../http/server/CcQueryCancellationServlet.java | 76 +++++++++++++++++++ .../http/server/NCQueryCancellationServlet.java | 3 +- .../http/server/QueryCancellationServlet.java | 79 -------------------- .../api/http/server/QueryServiceServlet.java | 2 +- .../asterix/app/message/CancelQueryRequest.java | 12 ++- .../asterix/app/translator/QueryTranslator.java | 11 
++- .../hyracks/bootstrap/CCApplication.java | 4 +- .../servlet/QueryCancellationServletTest.java | 15 ++-- .../asterix/common/api/IClientRequest.java | 38 ++++++++++ .../metadata/declared/MetadataProvider.java | 1 + 16 files changed, 262 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java index 15267aa..a37f802 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java @@ -22,6 +22,7 @@ import org.apache.asterix.lang.common.base.AbstractStatement; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.translator.IRequestParameters; import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -45,10 +46,11 @@ public abstract class ExtensionStatement extends AbstractStatement { * @param requestParameters * @param metadataProvider * @param resultSetId + * @param executorCtx * @throws HyracksDataException * @throws AlgebricksException */ public abstract void handle(IHyracksClientConnection hcc, IStatementExecutor statementExecutor, - IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId) 
- throws HyracksDataException, AlgebricksException; + IRequestParameters requestParameters, MetadataProvider metadataProvider, int resultSetId, + IStatementExecutorContext executorCtx) throws HyracksDataException, AlgebricksException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java new file mode 100644 index 0000000..a9bd856 --- /dev/null +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.translator; + +import org.apache.asterix.common.api.IClientRequest; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public abstract class BaseClientRequest implements IClientRequest { + protected final IStatementExecutorContext ctx; + protected final String contextId; + private boolean complete; + + public BaseClientRequest(IStatementExecutorContext ctx, String contextId) { + this.ctx = ctx; + this.contextId = contextId; + } + + @Override + public synchronized void complete() { + if (complete) { + return; + } + complete = true; + ctx.remove(contextId); + } + + @Override + public synchronized void cancel(ICcApplicationContext appCtx) throws HyracksDataException { + if (complete) { + return; + } + complete(); + doCancel(appCtx); + } + + protected abstract void doCancel(ICcApplicationContext appCtx) throws HyracksDataException; +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java new file mode 100644 index 0000000..520ce03 --- /dev/null +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.translator; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.JobId; + +public class ClientJobRequest extends BaseClientRequest { + private final JobId jobId; + + public ClientJobRequest(IStatementExecutorContext ctx, String clientCtxId, JobId jobId) { + super(ctx, clientCtxId); + this.jobId = jobId; + } + + @Override + protected void doCancel(ICcApplicationContext appCtx) throws HyracksDataException { + IHyracksClientConnection hcc = appCtx.getHcc(); + try { + hcc.cancelJob(jobId); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + ctx.remove(contextId); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java index 81e1ebf..78080f3 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java @@ -19,32 +19,31 @@ package org.apache.asterix.translator; -import 
org.apache.hyracks.api.job.JobId; +import org.apache.asterix.common.api.IClientRequest; /** - * The context for statement executors, which maintains the meta information of all queries. - * TODO(yingyi): also maintain the mapping from server generated request ids to jobs. + * The context for statement executors. Maintains ongoing user requests. */ public interface IStatementExecutorContext { /** - * Gets the Hyracks JobId from the user-provided client context id. + * Gets the client request from the user-provided client context id. * * @param clientContextId, * a user provided client context id. - * @return the Hyracks job id of class {@link org.apache.hyracks.api.job.JobId}. + * @return the client request */ - JobId getJobIdFromClientContextId(String clientContextId); + IClientRequest get(String clientContextId); /** - * Puts a client context id for a statement and the corresponding Hyracks job id. + * Puts a client context id for a statement and the corresponding request. * * @param clientContextId, * a user provided client context id. - * @param jobId, + * @param req, * the Hyracks job id of class {@link org.apache.hyracks.api.job.JobId}. */ - void put(String clientContextId, JobId jobId); + void put(String clientContextId, IClientRequest req); /** * Removes the information about the query corresponding to a user-provided client context id. @@ -52,5 +51,5 @@ public interface IStatementExecutorContext { * @param clientContextId, * a user provided client context id. 
*/ - JobId removeJobIdFromClientContextId(String clientContextId); + IClientRequest remove(String clientContextId); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java index ec181bb..c4e2859 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.translator; -import org.apache.hyracks.api.job.JobId; +import org.apache.asterix.common.api.IClientRequest; public class NoOpStatementExecutorContext implements IStatementExecutorContext { @@ -28,17 +28,17 @@ public class NoOpStatementExecutorContext implements IStatementExecutorContext { } @Override - public JobId getJobIdFromClientContextId(String clientContextId) { + public IClientRequest get(String clientContextId) { return null; } @Override - public void put(String clientContextId, JobId jobId) { + public void put(String clientContextId, IClientRequest req) { // Dummy for when a statement doesn't support cancellation } @Override - public JobId removeJobIdFromClientContextId(String clientContextId) { + public IClientRequest remove(String clientContextId) { return null; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java index 7c06762..a4da189 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java @@ -16,31 +16,30 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.asterix.api.http.ctx; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.asterix.common.api.IClientRequest; import org.apache.asterix.translator.IStatementExecutorContext; -import org.apache.hyracks.api.job.JobId; public class StatementExecutorContext implements IStatementExecutorContext { - private final Map<String, JobId> runningQueries = new ConcurrentHashMap<>(); + private final Map<String, IClientRequest> runningQueries = new ConcurrentHashMap<>(); @Override - public JobId getJobIdFromClientContextId(String clientContextId) { + public IClientRequest get(String clientContextId) { return runningQueries.get(clientContextId); } @Override - public void put(String clientContextId, JobId jobId) { - runningQueries.put(clientContextId, jobId); + public void put(String clientContextId, IClientRequest req) { + runningQueries.put(clientContextId, req); } @Override - public JobId removeJobIdFromClientContextId(String clientContextId) { + public IClientRequest remove(String clientContextId) { return runningQueries.remove(clientContextId); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java new file mode 100644 index 0000000..5f5692d --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.api.http.server; + +import java.io.IOException; +import java.util.concurrent.ConcurrentMap; + +import org.apache.asterix.api.http.server.QueryServiceServlet.Parameter; +import org.apache.asterix.common.api.IClientRequest; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.translator.IStatementExecutorContext; +import org.apache.hyracks.http.api.IServletRequest; +import org.apache.hyracks.http.api.IServletResponse; +import org.apache.hyracks.http.server.AbstractServlet; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import io.netty.handler.codec.http.HttpResponseStatus; + +/** + * The servlet provides a REST API for cancelling an on-going query. + */ +public class CcQueryCancellationServlet extends AbstractServlet { + private static final Logger LOGGER = LogManager.getLogger(); + private final ICcApplicationContext appCtx; + + public CcQueryCancellationServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx, + String... paths) { + super(ctx, paths); + this.appCtx = appCtx; + } + + @Override + protected void delete(IServletRequest request, IServletResponse response) throws IOException { + String clientContextId = request.getParameter(Parameter.CLIENT_ID.str()); + if (clientContextId == null) { + response.setStatus(HttpResponseStatus.BAD_REQUEST); + return; + } + IStatementExecutorContext executorCtx = + (IStatementExecutorContext) ctx.get(ServletConstants.RUNNING_QUERIES_ATTR); + IClientRequest req = executorCtx.get(clientContextId); + if (req == null) { + // response: NOT FOUND + response.setStatus(HttpResponseStatus.NOT_FOUND); + return; + } + try { + // Cancels the on-going job. 
+ req.cancel(appCtx); + // response: OK + response.setStatus(HttpResponseStatus.OK); + } catch (Exception e) { + LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e); + // response: INTERNAL SERVER ERROR + response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java index 5ad451f..60806d3 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java @@ -32,6 +32,7 @@ import org.apache.asterix.common.utils.RequestStatus; import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.http.api.IServletRequest; import org.apache.hyracks.http.api.IServletResponse; +import org.apache.hyracks.http.server.AbstractServlet; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -41,7 +42,7 @@ import io.netty.handler.codec.http.HttpResponseStatus; /** * The servlet provides a REST API on an NC for cancelling an on-going query. 
*/ -public class NCQueryCancellationServlet extends QueryCancellationServlet { +public class NCQueryCancellationServlet extends AbstractServlet { private static final Logger LOGGER = LogManager.getLogger(); private final INCServiceContext serviceCtx; private final INCMessageBroker messageBroker; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java deleted file mode 100644 index a8b3aef..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.api.http.server; - -import java.io.IOException; -import java.util.concurrent.ConcurrentMap; - -import org.apache.asterix.api.http.server.QueryServiceServlet.Parameter; -import org.apache.asterix.translator.IStatementExecutorContext; -import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.job.JobId; -import org.apache.hyracks.http.api.IServletRequest; -import org.apache.hyracks.http.api.IServletResponse; -import org.apache.hyracks.http.server.AbstractServlet; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import io.netty.handler.codec.http.HttpResponseStatus; - -/** - * The servlet provides a REST API for cancelling an on-going query. - */ -public class QueryCancellationServlet extends AbstractServlet { - private static final Logger LOGGER = LogManager.getLogger(); - - public QueryCancellationServlet(ConcurrentMap<String, Object> ctx, String... paths) { - super(ctx, paths); - } - - @Override - protected void delete(IServletRequest request, IServletResponse response) throws IOException { - String clientContextId = request.getParameter(Parameter.CLIENT_ID.str()); - if (clientContextId == null) { - response.setStatus(HttpResponseStatus.BAD_REQUEST); - return; - } - - // Retrieves the corresponding Hyracks job id. - IStatementExecutorContext runningQueries = - (IStatementExecutorContext) ctx.get(ServletConstants.RUNNING_QUERIES_ATTR); - IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(ServletConstants.HYRACKS_CONNECTION_ATTR); - JobId jobId = runningQueries.getJobIdFromClientContextId(clientContextId); - - if (jobId == null) { - // response: NOT FOUND - response.setStatus(HttpResponseStatus.NOT_FOUND); - return; - } - try { - // Cancels the on-going job. - hcc.cancelJob(jobId); - // Removes the cancelled query from the map activeQueries. 
- runningQueries.removeJobIdFromClientContextId(clientContextId); - // response: OK - response.setStatus(HttpResponseStatus.OK); - } catch (Exception e) { - LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e); - // response: INTERNAL SERVER ERROR - response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java index ce6185e..316646d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java @@ -53,9 +53,9 @@ import org.apache.asterix.translator.ExecutionPlans; import org.apache.asterix.translator.ExecutionPlansJsonPrintUtil; import org.apache.asterix.translator.IRequestParameters; import org.apache.asterix.translator.IStatementExecutor; +import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.asterix.translator.IStatementExecutor.ResultDelivery; import org.apache.asterix.translator.IStatementExecutor.Stats; -import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.asterix.translator.IStatementExecutorFactory; import org.apache.asterix.translator.ResultProperties; import org.apache.asterix.translator.SessionConfig; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java index a1c87d8..943aad3 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java @@ -18,15 +18,14 @@ */ package org.apache.asterix.app.message; +import org.apache.asterix.common.api.IClientRequest; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.utils.RequestStatus; import org.apache.asterix.hyracks.bootstrap.CCApplication; import org.apache.asterix.messaging.CCMessageBroker; import org.apache.asterix.translator.IStatementExecutorContext; -import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; @@ -51,17 +50,16 @@ public class CancelQueryRequest implements ICcAddressedMessage { ClusterControllerService ccs = (ClusterControllerService) appCtx.getServiceContext().getControllerService(); CCApplication application = (CCApplication) ccs.getApplication(); IStatementExecutorContext executorsCtx = application.getStatementExecutorContext(); - JobId jobId = executorsCtx.getJobIdFromClientContextId(contextId); + IClientRequest req = executorsCtx.get(contextId); RequestStatus status; - if (jobId == null) { + if (req == null) { LOGGER.log(Level.WARN, "No job found for context id " + contextId); status = RequestStatus.NOT_FOUND; } else { try { - IHyracksClientConnection hcc = application.getHcc(); - hcc.cancelJob(jobId); - executorsCtx.removeJobIdFromClientContextId(contextId); + 
req.cancel(appCtx); + executorsCtx.remove(contextId); status = RequestStatus.SUCCESS; } catch (Exception e) { LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 469a2dd..0d884ba 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -152,6 +152,7 @@ import org.apache.asterix.om.types.IAType; import org.apache.asterix.om.types.TypeSignature; import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory; import org.apache.asterix.translator.AbstractLangTranslator; +import org.apache.asterix.translator.ClientJobRequest; import org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement; import org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement; import org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement; @@ -408,7 +409,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen break; case EXTENSION: ((ExtensionStatement) stmt).handle(hcc, this, requestParameters, metadataProvider, - resultSetIdCounter); + resultSetIdCounter, ctx); break; default: throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, stmt.getSourceLocation(), @@ -2601,6 +2602,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen private static void createAndRunJob(IHyracksClientConnection hcc, EnumSet<JobFlag> jobFlags, Mutable<JobId> jId, IStatementCompiler 
compiler, IMetadataLocker locker, ResultDelivery resultDelivery, IResultPrinter printer, String clientContextId, IStatementExecutorContext ctx) throws Exception { + ClientJobRequest req = null; locker.lock(); try { final JobSpecification jobSpec = compiler.compile(); @@ -2609,7 +2611,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false); if (ctx != null && clientContextId != null) { - ctx.put(clientContextId, jobId); // Adds the running job into the context. + req = new ClientJobRequest(ctx, clientContextId, jobId); + ctx.put(clientContextId, req); // Adds the running job into the context. } if (jId != null) { jId.setValue(jobId); @@ -2624,8 +2627,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } finally { locker.unlock(); // No matter the job succeeds or fails, removes it into the context. - if (ctx != null && clientContextId != null) { - ctx.removeJobIdFromClientContextId(clientContextId); + if (req != null) { + req.complete(); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index b180ef0..f4d24e2 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -36,12 +36,12 @@ import org.apache.asterix.api.http.IQueryWebServerRegistrant; import org.apache.asterix.api.http.ctx.StatementExecutorContext; import org.apache.asterix.api.http.server.ActiveStatsApiServlet; import 
org.apache.asterix.api.http.server.ApiServlet; +import org.apache.asterix.api.http.server.CcQueryCancellationServlet; import org.apache.asterix.api.http.server.ClusterApiServlet; import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet; import org.apache.asterix.api.http.server.ConnectorApiServlet; import org.apache.asterix.api.http.server.DiagnosticsApiServlet; import org.apache.asterix.api.http.server.NodeControllerDetailsApiServlet; -import org.apache.asterix.api.http.server.QueryCancellationServlet; import org.apache.asterix.api.http.server.QueryResultApiServlet; import org.apache.asterix.api.http.server.QueryServiceServlet; import org.apache.asterix.api.http.server.QueryStatusApiServlet; @@ -271,7 +271,7 @@ public class CCApplication extends BaseCCApplication { protected IServlet createServlet(ConcurrentMap<String, Object> ctx, String key, String... paths) { switch (key) { case Servlets.RUNNING_REQUESTS: - return new QueryCancellationServlet(ctx, paths); + return new CcQueryCancellationServlet(ctx, appCtx, paths); case Servlets.QUERY_STATUS: return new QueryStatusApiServlet(ctx, appCtx, paths); case Servlets.QUERY_RESULT: http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java index d5262cf..eae82af 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java @@ -28,8 +28,10 @@ import static org.mockito.Mockito.when; import java.util.concurrent.ConcurrentHashMap; import 
org.apache.asterix.api.http.ctx.StatementExecutorContext; -import org.apache.asterix.api.http.server.QueryCancellationServlet; +import org.apache.asterix.api.http.server.CcQueryCancellationServlet; import org.apache.asterix.api.http.server.ServletConstants; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.translator.ClientJobRequest; import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.job.JobId; @@ -46,16 +48,17 @@ public class QueryCancellationServletTest { @Test public void testDelete() throws Exception { + ICcApplicationContext appCtx = mock(ICcApplicationContext.class); // Creates a query cancellation servlet. - QueryCancellationServlet cancellationServlet = - new QueryCancellationServlet(new ConcurrentHashMap<>(), new String[] { "/" }); + CcQueryCancellationServlet cancellationServlet = + new CcQueryCancellationServlet(new ConcurrentHashMap<>(), appCtx, new String[] { "/" }); // Adds mocked Hyracks client connection into the servlet context. IHyracksClientConnection mockHcc = mock(IHyracksClientConnection.class); cancellationServlet.ctx().put(ServletConstants.HYRACKS_CONNECTION_ATTR, mockHcc); // Adds a query context into the servlet context. IStatementExecutorContext queryCtx = new StatementExecutorContext(); cancellationServlet.ctx().put(ServletConstants.RUNNING_QUERIES_ATTR, queryCtx); - + Mockito.when(appCtx.getHcc()).thenReturn(mockHcc); // Tests the case that query is not in the map. IServletRequest mockRequest = mockRequest("1"); IServletResponse mockResponse = mock(IServletResponse.class); @@ -63,7 +66,7 @@ public class QueryCancellationServletTest { verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND); // Tests the case that query is in the map. 
- queryCtx.put("1", new JobId(1)); + queryCtx.put("1", new ClientJobRequest(queryCtx, "1", new JobId(1))); cancellationServlet.handle(mockRequest, mockResponse); verify(mockResponse, times(1)).setStatus(HttpResponseStatus.OK); @@ -73,7 +76,7 @@ public class QueryCancellationServletTest { verify(mockResponse, times(1)).setStatus(HttpResponseStatus.BAD_REQUEST); // Tests the case that the job cancellation hit some exception from Hyracks. - queryCtx.put("2", new JobId(2)); + queryCtx.put("2", new ClientJobRequest(queryCtx, "2", new JobId(2))); Mockito.doThrow(new Exception()).when(mockHcc).cancelJob(any()); mockRequest = mockRequest("2"); cancellationServlet.handle(mockRequest, mockResponse); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java new file mode 100644 index 0000000..30759de --- /dev/null +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.common.api; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public interface IClientRequest { + + /** + * Mark the request as complete; once complete, it can no longer be cancelled + */ + void complete(); + + /** + * Cancel a request + * + * @param appCtx + * @throws HyracksDataException + */ + void cancel(ICcApplicationContext appCtx) throws HyracksDataException; +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 7f8d31d..f2be6cd 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -1545,6 +1545,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } } + @Override public AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context) throws AlgebricksException {