Repository: asterixdb
Updated Branches:
  refs/heads/master a7aaf7110 -> 7df888fff


[NO ISSUE][MISC] Introduce IClientRequest

- user model changes: no
- storage format changes: no
- interface changes: yes
  + IClientRequest: used to represent a client request
    that can be cancelled.

Details:
- Introduce IClientRequest to allow for multiple types of requests
  to be cancellable.

Change-Id: I8f65da1744ea7ecf26ea3f8a576ebaf4472ccd62
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2774
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <bamou...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/7df888ff
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/7df888ff
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/7df888ff

Branch: refs/heads/master
Commit: 7df888fffcb946f639b2c58ef4463d660244a567
Parents: a7aaf71
Author: Abdullah Alamoudi <bamou...@gmail.com>
Authored: Wed Jul 18 08:30:00 2018 -0700
Committer: abdullah alamoudi <bamou...@gmail.com>
Committed: Sun Jul 22 14:02:28 2018 -0700

----------------------------------------------------------------------
 .../algebra/extension/ExtensionStatement.java   |  6 +-
 .../asterix/translator/BaseClientRequest.java   | 54 +++++++++++++
 .../asterix/translator/ClientJobRequest.java    | 44 +++++++++++
 .../translator/IStatementExecutorContext.java   | 19 +++--
 .../NoOpStatementExecutorContext.java           |  8 +-
 .../api/http/ctx/StatementExecutorContext.java  | 13 ++--
 .../http/server/CcQueryCancellationServlet.java | 76 +++++++++++++++++++
 .../http/server/NCQueryCancellationServlet.java |  3 +-
 .../http/server/QueryCancellationServlet.java   | 79 --------------------
 .../api/http/server/QueryServiceServlet.java    |  2 +-
 .../asterix/app/message/CancelQueryRequest.java | 12 ++-
 .../asterix/app/translator/QueryTranslator.java | 11 ++-
 .../hyracks/bootstrap/CCApplication.java        |  4 +-
 .../servlet/QueryCancellationServletTest.java   | 15 ++--
 .../asterix/common/api/IClientRequest.java      | 38 ++++++++++
 .../metadata/declared/MetadataProvider.java     |  1 +
 16 files changed, 262 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java
index 15267aa..a37f802 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/extension/ExtensionStatement.java
@@ -22,6 +22,7 @@ import org.apache.asterix.lang.common.base.AbstractStatement;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -45,10 +46,11 @@ public abstract class ExtensionStatement extends 
AbstractStatement {
      * @param requestParameters
      * @param metadataProvider
      * @param resultSetId
+     * @param executorCtx
      * @throws HyracksDataException
      * @throws AlgebricksException
      */
     public abstract void handle(IHyracksClientConnection hcc, 
IStatementExecutor statementExecutor,
-            IRequestParameters requestParameters, MetadataProvider 
metadataProvider, int resultSetId)
-            throws HyracksDataException, AlgebricksException;
+            IRequestParameters requestParameters, MetadataProvider 
metadataProvider, int resultSetId,
+            IStatementExecutorContext executorCtx) throws 
HyracksDataException, AlgebricksException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
new file mode 100644
index 0000000..a9bd856
--- /dev/null
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/BaseClientRequest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Base class for cancellable client requests that are tracked in an
+ * {@link IStatementExecutorContext} under a client context id.
+ */
+public abstract class BaseClientRequest implements IClientRequest {
+    protected final IStatementExecutorContext ctx;
+    protected final String contextId;
+    // read and written only inside the synchronized methods below
+    private boolean complete;
+
+    /**
+     * @param ctx
+     *            the executor context this request is removed from once it completes
+     * @param contextId
+     *            the id passed to {@code ctx.remove} on completion
+     */
+    public BaseClientRequest(IStatementExecutorContext ctx, String contextId) {
+        this.ctx = ctx;
+        this.contextId = contextId;
+    }
+
+    /**
+     * Marks the request as complete and removes it from the executor context.
+     * Calls after the first one are no-ops.
+     */
+    @Override
+    public synchronized void complete() {
+        if (complete) {
+            return;
+        }
+        complete = true;
+        ctx.remove(contextId);
+    }
+
+    /**
+     * Cancels the request unless it has already completed.
+     * <p>
+     * The request is marked complete only after {@link #doCancel} returns normally, so when the
+     * cancellation throws, this class leaves the request registered and the caller can try again.
+     *
+     * @param appCtx
+     *            the application context handed to {@link #doCancel}
+     * @throws HyracksDataException
+     *             if {@link #doCancel} fails
+     */
+    @Override
+    public synchronized void cancel(ICcApplicationContext appCtx) throws HyracksDataException {
+        if (complete) {
+            return;
+        }
+        doCancel(appCtx);
+        complete();
+    }
+
+    /**
+     * Performs the request-specific cancellation. Invoked by {@link #cancel} while holding
+     * this object's monitor.
+     */
+    protected abstract void doCancel(ICcApplicationContext appCtx) throws HyracksDataException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
new file mode 100644
index 0000000..520ce03
--- /dev/null
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ClientJobRequest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.translator;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.JobId;
+
+/**
+ * A client request backed by a Hyracks job; cancelling the request cancels that job.
+ */
+public class ClientJobRequest extends BaseClientRequest {
+    private final JobId jobId;
+
+    /**
+     * @param ctx
+     *            the executor context the request is tracked in
+     * @param clientCtxId
+     *            the client context id the request is tracked under
+     * @param jobId
+     *            the job to cancel when the request is cancelled
+     */
+    public ClientJobRequest(IStatementExecutorContext ctx, String clientCtxId, JobId jobId) {
+        super(ctx, clientCtxId);
+        this.jobId = jobId;
+    }
+
+    /**
+     * Cancels the job through the Hyracks client connection of {@code appCtx}.
+     * <p>
+     * Removing the request from the executor context is left to
+     * {@link BaseClientRequest#complete()}. Removing it here as well could drop a newer
+     * request that was registered under the same client context id in the meantime.
+     *
+     * @throws HyracksDataException
+     *             wrapping any exception thrown by the cancel call
+     */
+    @Override
+    protected void doCancel(ICcApplicationContext appCtx) throws HyracksDataException {
+        IHyracksClientConnection hcc = appCtx.getHcc();
+        try {
+            hcc.cancelJob(jobId);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
index 81e1ebf..78080f3 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/IStatementExecutorContext.java
@@ -19,32 +19,31 @@
 
 package org.apache.asterix.translator;
 
-import org.apache.hyracks.api.job.JobId;
+import org.apache.asterix.common.api.IClientRequest;
 
 /**
- * The context for statement executors, which maintains the meta information 
of all queries.
- * TODO(yingyi): also maintain the mapping from server generated request ids 
to jobs.
+ * The context for statement executors. Maintains ongoing user requests.
  */
 public interface IStatementExecutorContext {
 
     /**
-     * Gets the Hyracks JobId from the user-provided client context id.
+     * Gets the client request from the user-provided client context id.
      *
      * @param clientContextId,
      *            a user provided client context id.
-     * @return the Hyracks job id of class {@link 
org.apache.hyracks.api.job.JobId}.
+     * @return the client request
      */
-    JobId getJobIdFromClientContextId(String clientContextId);
+    IClientRequest get(String clientContextId);
 
     /**
-     * Puts a client context id for a statement and the corresponding Hyracks 
job id.
+     * Puts a client context id for a statement and the corresponding request.
      *
      * @param clientContextId,
      *            a user provided client context id.
-     * @param jobId,
+     * @param req,
-     *            the Hyracks job id of class {@link org.apache.hyracks.api.job.JobId}.
+     *            the client request to associate with that id.
      */
-    void put(String clientContextId, JobId jobId);
+    void put(String clientContextId, IClientRequest req);
 
     /**
      * Removes the information about the query corresponding to a 
user-provided client context id.
@@ -52,5 +51,5 @@ public interface IStatementExecutorContext {
      * @param clientContextId,
      *            a user provided client context id.
      */
-    JobId removeJobIdFromClientContextId(String clientContextId);
+    IClientRequest remove(String clientContextId);
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
index ec181bb..c4e2859 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/NoOpStatementExecutorContext.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.translator;
 
-import org.apache.hyracks.api.job.JobId;
+import org.apache.asterix.common.api.IClientRequest;
 
 public class NoOpStatementExecutorContext implements IStatementExecutorContext 
{
 
@@ -28,17 +28,17 @@ public class NoOpStatementExecutorContext implements 
IStatementExecutorContext {
     }
 
     @Override
-    public JobId getJobIdFromClientContextId(String clientContextId) {
+    public IClientRequest get(String clientContextId) {
         return null;
     }
 
     @Override
-    public void put(String clientContextId, JobId jobId) {
+    public void put(String clientContextId, IClientRequest req) {
         // Dummy for when a statement doesn't support cancellation
     }
 
     @Override
-    public JobId removeJobIdFromClientContextId(String clientContextId) {
+    public IClientRequest remove(String clientContextId) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
index 7c06762..a4da189 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/ctx/StatementExecutorContext.java
@@ -16,31 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.asterix.api.http.ctx;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.asterix.common.api.IClientRequest;
 import org.apache.asterix.translator.IStatementExecutorContext;
-import org.apache.hyracks.api.job.JobId;
 
 public class StatementExecutorContext implements IStatementExecutorContext {
 
-    private final Map<String, JobId> runningQueries = new 
ConcurrentHashMap<>();
+    private final Map<String, IClientRequest> runningQueries = new 
ConcurrentHashMap<>();
 
     @Override
-    public JobId getJobIdFromClientContextId(String clientContextId) {
+    public IClientRequest get(String clientContextId) {
         return runningQueries.get(clientContextId);
     }
 
     @Override
-    public void put(String clientContextId, JobId jobId) {
-        runningQueries.put(clientContextId, jobId);
+    public void put(String clientContextId, IClientRequest req) {
+        runningQueries.put(clientContextId, req);
     }
 
     @Override
-    public JobId removeJobIdFromClientContextId(String clientContextId) {
+    public IClientRequest remove(String clientContextId) {
         return runningQueries.remove(clientContextId);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
new file mode 100644
index 0000000..5f5692d
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/CcQueryCancellationServlet.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.api.http.server;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.asterix.api.http.server.QueryServiceServlet.Parameter;
+import org.apache.asterix.common.api.IClientRequest;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.translator.IStatementExecutorContext;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Servlet that handles HTTP DELETE by cancelling the running request registered under the
+ * client context id given in the request.
+ */
+public class CcQueryCancellationServlet extends AbstractServlet {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final ICcApplicationContext appCtx;
+
+    public CcQueryCancellationServlet(ConcurrentMap<String, Object> ctx, ICcApplicationContext appCtx,
+            String... paths) {
+        super(ctx, paths);
+        this.appCtx = appCtx;
+    }
+
+    /**
+     * Responds with BAD_REQUEST when no client context id is supplied, NOT_FOUND when no request is
+     * registered under it, OK when the cancel call returns and INTERNAL_SERVER_ERROR when it throws.
+     */
+    @Override
+    protected void delete(IServletRequest request, IServletResponse response) throws IOException {
+        final String id = request.getParameter(Parameter.CLIENT_ID.str());
+        if (id == null) {
+            response.setStatus(HttpResponseStatus.BAD_REQUEST);
+            return;
+        }
+        final IClientRequest target =
+                ((IStatementExecutorContext) ctx.get(ServletConstants.RUNNING_QUERIES_ATTR)).get(id);
+        if (target == null) {
+            response.setStatus(HttpResponseStatus.NOT_FOUND);
+            return;
+        }
+        try {
+            target.cancel(appCtx);
+            response.setStatus(HttpResponseStatus.OK);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e);
+            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
index 5ad451f..60806d3 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
@@ -32,6 +32,7 @@ import org.apache.asterix.common.utils.RequestStatus;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -41,7 +42,7 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 /**
  * The servlet provides a REST API on an NC for cancelling an on-going query.
  */
-public class NCQueryCancellationServlet extends QueryCancellationServlet {
+public class NCQueryCancellationServlet extends AbstractServlet {
     private static final Logger LOGGER = LogManager.getLogger();
     private final INCServiceContext serviceCtx;
     private final INCMessageBroker messageBroker;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
deleted file mode 100644
index a8b3aef..0000000
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryCancellationServlet.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.api.http.server;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.asterix.api.http.server.QueryServiceServlet.Parameter;
-import org.apache.asterix.translator.IStatementExecutorContext;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
-import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.http.api.IServletRequest;
-import org.apache.hyracks.http.api.IServletResponse;
-import org.apache.hyracks.http.server.AbstractServlet;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import io.netty.handler.codec.http.HttpResponseStatus;
-
-/**
- * The servlet provides a REST API for cancelling an on-going query.
- */
-public class QueryCancellationServlet extends AbstractServlet {
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    public QueryCancellationServlet(ConcurrentMap<String, Object> ctx, 
String... paths) {
-        super(ctx, paths);
-    }
-
-    @Override
-    protected void delete(IServletRequest request, IServletResponse response) 
throws IOException {
-        String clientContextId = 
request.getParameter(Parameter.CLIENT_ID.str());
-        if (clientContextId == null) {
-            response.setStatus(HttpResponseStatus.BAD_REQUEST);
-            return;
-        }
-
-        // Retrieves the corresponding Hyracks job id.
-        IStatementExecutorContext runningQueries =
-                (IStatementExecutorContext) 
ctx.get(ServletConstants.RUNNING_QUERIES_ATTR);
-        IHyracksClientConnection hcc = (IHyracksClientConnection) 
ctx.get(ServletConstants.HYRACKS_CONNECTION_ATTR);
-        JobId jobId = 
runningQueries.getJobIdFromClientContextId(clientContextId);
-
-        if (jobId == null) {
-            // response: NOT FOUND
-            response.setStatus(HttpResponseStatus.NOT_FOUND);
-            return;
-        }
-        try {
-            // Cancels the on-going job.
-            hcc.cancelJob(jobId);
-            // Removes the cancelled query from the map activeQueries.
-            runningQueries.removeJobIdFromClientContextId(clientContextId);
-            // response: OK
-            response.setStatus(HttpResponseStatus.OK);
-        } catch (Exception e) {
-            LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", 
e);
-            // response: INTERNAL SERVER ERROR
-            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
index ce6185e..316646d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryServiceServlet.java
@@ -53,9 +53,9 @@ import org.apache.asterix.translator.ExecutionPlans;
 import org.apache.asterix.translator.ExecutionPlansJsonPrintUtil;
 import org.apache.asterix.translator.IRequestParameters;
 import org.apache.asterix.translator.IStatementExecutor;
+import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
 import org.apache.asterix.translator.IStatementExecutor.Stats;
-import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
 import org.apache.asterix.translator.ResultProperties;
 import org.apache.asterix.translator.SessionConfig;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
index a1c87d8..943aad3 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -18,15 +18,14 @@
  */
 package org.apache.asterix.app.message;
 
+import org.apache.asterix.common.api.IClientRequest;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.utils.RequestStatus;
 import org.apache.asterix.hyracks.bootstrap.CCApplication;
 import org.apache.asterix.messaging.CCMessageBroker;
 import org.apache.asterix.translator.IStatementExecutorContext;
-import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
@@ -51,17 +50,16 @@ public class CancelQueryRequest implements 
ICcAddressedMessage {
         ClusterControllerService ccs = (ClusterControllerService) 
appCtx.getServiceContext().getControllerService();
         CCApplication application = (CCApplication) ccs.getApplication();
         IStatementExecutorContext executorsCtx = 
application.getStatementExecutorContext();
-        JobId jobId = executorsCtx.getJobIdFromClientContextId(contextId);
+        IClientRequest req = executorsCtx.get(contextId);
         RequestStatus status;
 
-        if (jobId == null) {
+        if (req == null) {
             LOGGER.log(Level.WARN, "No job found for context id " + contextId);
             status = RequestStatus.NOT_FOUND;
         } else {
             try {
-                IHyracksClientConnection hcc = application.getHcc();
-                hcc.cancelJob(jobId);
-                executorsCtx.removeJobIdFromClientContextId(contextId);
+                req.cancel(appCtx);
+                executorsCtx.remove(contextId);
                 status = RequestStatus.SUCCESS;
             } catch (Exception e) {
                 LOGGER.log(Level.WARN, "unexpected exception thrown from 
cancel", e);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 469a2dd..0d884ba 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -152,6 +152,7 @@ import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.TypeSignature;
 import 
org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.asterix.translator.AbstractLangTranslator;
+import org.apache.asterix.translator.ClientJobRequest;
 import 
org.apache.asterix.translator.CompiledStatements.CompiledDeleteStatement;
 import 
org.apache.asterix.translator.CompiledStatements.CompiledInsertStatement;
 import 
org.apache.asterix.translator.CompiledStatements.CompiledLoadFromFileStatement;
@@ -408,7 +409,7 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                         break;
                     case EXTENSION:
                         ((ExtensionStatement) stmt).handle(hcc, this, 
requestParameters, metadataProvider,
-                                resultSetIdCounter);
+                                resultSetIdCounter, ctx);
                         break;
                     default:
                         throw new 
CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, 
stmt.getSourceLocation(),
@@ -2601,6 +2602,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
     private static void createAndRunJob(IHyracksClientConnection hcc, 
EnumSet<JobFlag> jobFlags, Mutable<JobId> jId,
             IStatementCompiler compiler, IMetadataLocker locker, 
ResultDelivery resultDelivery, IResultPrinter printer,
             String clientContextId, IStatementExecutorContext ctx) throws 
Exception {
+        ClientJobRequest req = null;
         locker.lock();
         try {
             final JobSpecification jobSpec = compiler.compile();
@@ -2609,7 +2611,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             }
             final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
             if (ctx != null && clientContextId != null) {
-                ctx.put(clientContextId, jobId); // Adds the running job into 
the context.
+                req = new ClientJobRequest(ctx, clientContextId, jobId);
+                ctx.put(clientContextId, req); // Adds the running job into 
the context.
             }
             if (jId != null) {
                 jId.setValue(jobId);
@@ -2624,8 +2627,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         } finally {
             locker.unlock();
             // No matter the job succeeds or fails, removes it into the 
context.
-            if (ctx != null && clientContextId != null) {
-                ctx.removeJobIdFromClientContextId(clientContextId);
+            if (req != null) {
+                req.complete();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index b180ef0..f4d24e2 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -36,12 +36,12 @@ import 
org.apache.asterix.api.http.IQueryWebServerRegistrant;
 import org.apache.asterix.api.http.ctx.StatementExecutorContext;
 import org.apache.asterix.api.http.server.ActiveStatsApiServlet;
 import org.apache.asterix.api.http.server.ApiServlet;
+import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
 import org.apache.asterix.api.http.server.ClusterApiServlet;
 import org.apache.asterix.api.http.server.ClusterControllerDetailsApiServlet;
 import org.apache.asterix.api.http.server.ConnectorApiServlet;
 import org.apache.asterix.api.http.server.DiagnosticsApiServlet;
 import org.apache.asterix.api.http.server.NodeControllerDetailsApiServlet;
-import org.apache.asterix.api.http.server.QueryCancellationServlet;
 import org.apache.asterix.api.http.server.QueryResultApiServlet;
 import org.apache.asterix.api.http.server.QueryServiceServlet;
 import org.apache.asterix.api.http.server.QueryStatusApiServlet;
@@ -271,7 +271,7 @@ public class CCApplication extends BaseCCApplication {
     protected IServlet createServlet(ConcurrentMap<String, Object> ctx, String 
key, String... paths) {
         switch (key) {
             case Servlets.RUNNING_REQUESTS:
-                return new QueryCancellationServlet(ctx, paths);
+                return new CcQueryCancellationServlet(ctx, appCtx, paths);
             case Servlets.QUERY_STATUS:
                 return new QueryStatusApiServlet(ctx, appCtx, paths);
             case Servlets.QUERY_RESULT:

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
index d5262cf..eae82af 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/http/servlet/QueryCancellationServletTest.java
@@ -28,8 +28,10 @@ import static org.mockito.Mockito.when;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.asterix.api.http.ctx.StatementExecutorContext;
-import org.apache.asterix.api.http.server.QueryCancellationServlet;
+import org.apache.asterix.api.http.server.CcQueryCancellationServlet;
 import org.apache.asterix.api.http.server.ServletConstants;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.translator.ClientJobRequest;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.job.JobId;
@@ -46,16 +48,17 @@ public class QueryCancellationServletTest {
 
     @Test
     public void testDelete() throws Exception {
+        ICcApplicationContext appCtx = mock(ICcApplicationContext.class);
         // Creates a query cancellation servlet.
-        QueryCancellationServlet cancellationServlet =
-                new QueryCancellationServlet(new ConcurrentHashMap<>(), new 
String[] { "/" });
+        CcQueryCancellationServlet cancellationServlet =
+                new CcQueryCancellationServlet(new ConcurrentHashMap<>(), 
appCtx, new String[] { "/" });
         // Adds mocked Hyracks client connection into the servlet context.
         IHyracksClientConnection mockHcc = 
mock(IHyracksClientConnection.class);
         
cancellationServlet.ctx().put(ServletConstants.HYRACKS_CONNECTION_ATTR, 
mockHcc);
         // Adds a query context into the servlet context.
         IStatementExecutorContext queryCtx = new StatementExecutorContext();
         cancellationServlet.ctx().put(ServletConstants.RUNNING_QUERIES_ATTR, 
queryCtx);
-
+        Mockito.when(appCtx.getHcc()).thenReturn(mockHcc);
         // Tests the case that query is not in the map.
         IServletRequest mockRequest = mockRequest("1");
         IServletResponse mockResponse = mock(IServletResponse.class);
@@ -63,7 +66,7 @@ public class QueryCancellationServletTest {
         verify(mockResponse, times(1)).setStatus(HttpResponseStatus.NOT_FOUND);
 
         // Tests the case that query is in the map.
-        queryCtx.put("1", new JobId(1));
+        queryCtx.put("1", new ClientJobRequest(queryCtx, "1", new JobId(1)));
         cancellationServlet.handle(mockRequest, mockResponse);
         verify(mockResponse, times(1)).setStatus(HttpResponseStatus.OK);
 
@@ -73,7 +76,7 @@ public class QueryCancellationServletTest {
         verify(mockResponse, 
times(1)).setStatus(HttpResponseStatus.BAD_REQUEST);
 
         // Tests the case that the job cancellation hit some exception from 
Hyracks.
-        queryCtx.put("2", new JobId(2));
+        queryCtx.put("2", new ClientJobRequest(queryCtx, "2", new JobId(2)));
         Mockito.doThrow(new Exception()).when(mockHcc).cancelJob(any());
         mockRequest = mockRequest("2");
         cancellationServlet.handle(mockRequest, mockResponse);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
new file mode 100644
index 0000000..30759de
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClientRequest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.api;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IClientRequest {
+
+    /**
+     * Marks the request as complete; a completed request can no longer be cancelled.
+     */
+    void complete();
+
+    /**
+     * Cancels this request.
+     *
+     * @param appCtx the application context supplied by the caller for performing the cancellation
+     * @throws HyracksDataException if cancelling the request fails
+     */
+    void cancel(ICcApplicationContext appCtx) throws HyracksDataException;
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7df888ff/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 7f8d31d..f2be6cd 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -1545,6 +1545,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         }
     }
 
+    @Override
     public AsterixTupleFilterFactory 
createTupleFilterFactory(IOperatorSchema[] inputSchemas,
             IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, 
JobGenContext context)
             throws AlgebricksException {

Reply via email to