Repository: tajo
Updated Branches:
  refs/heads/branch-0.8.1 a31884860 -> 590cf8464


TAJO-819: KillQuery does not work for running query on TajoWorker. (jaehwa)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/590cf846
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/590cf846
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/590cf846

Branch: refs/heads/branch-0.8.1
Commit: 590cf8464f6c1f4c12a0856a5c2ab698c548fac8
Parents: a318848
Author: blrunner <[email protected]>
Authored: Wed May 21 17:03:43 2014 +0900
Committer: blrunner <[email protected]>
Committed: Wed May 21 17:03:43 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/client/TajoAdmin.java  |   9 +-
 .../java/org/apache/tajo/client/TajoClient.java |   9 +-
 .../apache/tajo/master/querymaster/Query.java   |  11 +-
 .../tajo/master/querymaster/QueryUnit.java      |  11 +-
 .../master/querymaster/QueryUnitAttempt.java    |  19 +++-
 .../tajo/master/querymaster/SubQuery.java       |  13 ++-
 .../master/rm/TajoWorkerResourceManager.java    | 106 ++++++++++---------
 .../tajo/webapp/QueryExecutorServlet.java       |  22 +++-
 .../main/java/org/apache/tajo/worker/Task.java  |   2 +-
 .../src/main/resources/webapps/admin/query.jsp  |  25 ++++-
 11 files changed, 159 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 915311e..f1ad4d5 100644
--- a/CHANGES
+++ b/CHANGES
@@ -14,6 +14,8 @@ Release 0.8.1 - unreleased
 
   BUGS
 
+    TAJO-819: KillQuery does not work for running query on TajoWorker. (jaehwa)
+
     TAJO-827: SUM() overflow in the case of INT4. (Hyoungjun Kim via hyunsik)
 
     TAJO-833: NPE occurs when using the column as a alias name in the multiple 

http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java 
b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
index 9a0478c..4c45568 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
@@ -21,6 +21,7 @@ package org.apache.tajo.client;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.cli.*;
 import org.apache.commons.lang.StringUtils;
+import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
@@ -407,11 +408,13 @@ public class TajoAdmin {
 
   public void processKill(Writer writer, String queryIdStr)
       throws IOException, ServiceException {
-    boolean killedSuccessfully = 
tajoClient.killQuery(TajoIdUtils.parseQueryId(queryIdStr));
-    if (killedSuccessfully) {
+    QueryStatus status = 
tajoClient.killQuery(TajoIdUtils.parseQueryId(queryIdStr));
+    if (status.getState() == TajoProtos.QueryState.QUERY_KILLED) {
       writer.write(queryIdStr + " is killed successfully.\n");
+    } else if (status.getState() == TajoProtos.QueryState.QUERY_KILL_WAIT) {
+      writer.write(queryIdStr + " will be finished after a while.\n");
     } else {
-      writer.write("killing query is failed.");
+      writer.write("ERROR:" + status.getErrorMessage());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java 
b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index d05a375..a804baf 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -796,7 +796,7 @@ public class TajoClient implements Closeable {
     }.withRetries();
   }
 
-  public boolean killQuery(final QueryId queryId)
+  public QueryStatus killQuery(final QueryId queryId)
       throws ServiceException, IOException {
 
     QueryStatus status = getQueryStatus(queryId);
@@ -816,7 +816,9 @@ public class TajoClient implements Closeable {
 
       long currentTimeMillis = System.currentTimeMillis();
       long timeKillIssued = currentTimeMillis;
-      while ((currentTimeMillis < timeKillIssued + 10000L) && 
(status.getState() != QueryState.QUERY_KILLED)) {
+      while ((currentTimeMillis < timeKillIssued + 10000L)
+          && ((status.getState() != QueryState.QUERY_KILLED)
+          || (status.getState() == QueryState.QUERY_KILL_WAIT))) {
         try {
           Thread.sleep(100L);
         } catch(InterruptedException ie) {
@@ -825,13 +827,12 @@ public class TajoClient implements Closeable {
         currentTimeMillis = System.currentTimeMillis();
         status = getQueryStatus(queryId);
       }
-      return status.getState() == QueryState.QUERY_KILLED;
     } catch(Exception e) {
       LOG.debug("Error when checking for application status", e);
-      return false;
     } finally {
       connPool.releaseConnection(tmClient);
     }
+    return status;
   }
 
   public List<CatalogProtos.FunctionDescProto> getFunctions(final String 
functionName) throws ServiceException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index a8f5b31..e61fcba 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -164,8 +164,9 @@ public class Query implements EventHandler<QueryEvent> {
               QueryEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able transitions
-          .addTransition(QueryState.QUERY_KILL_WAIT, 
QueryState.QUERY_KILL_WAIT,
-              EnumSet.of(QueryEventType.KILL))
+          .addTransition(QueryState.QUERY_KILL_WAIT, 
EnumSet.of(QueryState.QUERY_KILLED),
+              QueryEventType.KILL,
+              QUERY_COMPLETED_TRANSITION)
 
           // Transitions from FAILED state
           .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
@@ -685,7 +686,11 @@ public class Query implements EventHandler<QueryEvent> {
       try {
         getStateMachine().doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state", e);
+        LOG.error("Can't handle this event at current state"
+            + ", type:" + event
+            + ", oldState:" + oldState.name()
+            + ", nextState:" + getState().name()
+            , e);
         eventHandler.handle(new QueryEvent(this.id, 
QueryEventType.INTERNAL_ERROR));
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 42fbf8a..8aa6c05 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -154,6 +154,11 @@ public class QueryUnit implements EventHandler<TaskEvent> {
           .addTransition(TaskState.FAILED, TaskState.FAILED,
               EnumSet.of(TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_KILLED, 
TaskEventType.T_ATTEMPT_SUCCEEDED))
 
+          // Transitions from KILLED state
+          .addTransition(TaskState.KILLED, TaskState.KILLED,
+              TaskEventType.T_ATTEMPT_KILLED,
+              new KillTaskTransition())
+
           .installTopology();
 
   private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine;
@@ -582,7 +587,11 @@ public class QueryUnit implements EventHandler<TaskEvent> {
       try {
         stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state", e);
+        LOG.error("Can't handle this event at current state"
+            + ", eventType:" + event.getType().name()
+            + ", oldState:" + oldState.name()
+            + ", nextState:" + getState().name()
+            , e);
         eventHandler.handle(new 
QueryEvent(TajoIdUtils.parseQueryId(getId().toString()),
             QueryEventType.INTERNAL_ERROR));
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index 7993ce9..aa7f5c4 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -97,6 +97,9 @@ public class QueryUnitAttempt implements 
EventHandler<TaskAttemptEvent> {
       .addTransition(TaskAttemptState.TA_ASSIGNED, 
TaskAttemptState.TA_KILL_WAIT,
           TaskAttemptEventType.TA_KILL,
           new KillTaskTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_KILL,
+          new KillTaskTransition())
       .addTransition(TaskAttemptState.TA_ASSIGNED,
           EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED),
           TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
@@ -144,7 +147,7 @@ public class QueryUnitAttempt implements 
EventHandler<TaskAttemptEvent> {
           TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition())
       .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED,
           TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
-       // Ignore-able transitions
+      // Ignore-able transitions
       .addTransition(TaskAttemptState.TA_SUCCEEDED, 
TaskAttemptState.TA_SUCCEEDED,
           TaskAttemptEventType.TA_KILL)
 
@@ -155,7 +158,13 @@ public class QueryUnitAttempt implements 
EventHandler<TaskAttemptEvent> {
       .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
           EnumSet.of(
               TaskAttemptEventType.TA_UPDATE))
-
+      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+          EnumSet.of(
+              TaskAttemptEventType.TA_LOCAL_KILLED,
+              TaskAttemptEventType.TA_KILL,
+              TaskAttemptEventType.TA_ASSIGNED,
+              TaskAttemptEventType.TA_DONE),
+          new TaskKilledCompleteTransition())
       .installTopology();
 
   private final StateMachine<TaskAttemptState, TaskAttemptEventType, 
TaskAttemptEvent>
@@ -417,7 +426,11 @@ public class QueryUnitAttempt implements 
EventHandler<TaskAttemptEvent> {
       try {
         stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state of " + 
event.getTaskAttemptId() + ")", e);
+        LOG.error("Can't handle this event at current state of " + 
event.getTaskAttemptId() + ")"
+            + ", eventType:" + event.getType().name()
+            + ", oldState:" + oldState.name()
+            + ", nextState:" + getState().name()
+            , e);
         eventHandler.handle(
             new 
SubQueryDiagnosticsUpdateEvent(event.getTaskAttemptId().getQueryUnitId().getExecutionBlockId(),
                 "Can't handle this event at current state of " + 
event.getTaskAttemptId() + ")"));

http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 8929e8d..f00b99b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -187,7 +187,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
               SubQueryEventType.SQ_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
-              // Transitions from SUCCEEDED state
+          // Transitions from SUCCEEDED state
           .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
               SubQueryEventType.SQ_CONTAINER_ALLOCATED,
               CONTAINERS_CANCEL_TRANSITION)
@@ -197,7 +197,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
           .addTransition(SubQueryState.SUCCEEDED, SubQueryState.ERROR,
               SubQueryEventType.SQ_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
-              // Ignore-able events
+          // Ignore-able events
           .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
               EnumSet.of(
                   SubQueryEventType.SQ_START,
@@ -235,7 +235,8 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
                   SubQueryEventType.SQ_START,
                   SubQueryEventType.SQ_KILL,
                   SubQueryEventType.SQ_FAILED,
-                  SubQueryEventType.SQ_INTERNAL_ERROR))
+                  SubQueryEventType.SQ_INTERNAL_ERROR,
+                  SubQueryEventType.SQ_SUBQUERY_COMPLETED))
 
           .installTopology();
 
@@ -593,7 +594,11 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
       try {
         getStateMachine().doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
-        LOG.error("Can't handle this event at current state", e);
+        LOG.error("Can't handle this event at current state"
+            + ", eventType:" + event.getType().name()
+            + ", oldState:" + oldState.name()
+            + ", nextState:" + getState().name()
+            , e);
         eventHandler.handle(new SubQueryEvent(getId(),
             SubQueryEventType.SQ_INTERNAL_ERROR));
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 2c3572c..09d5161 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -37,6 +37,7 @@ import org.apache.tajo.util.ApplicationIdUtils;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -85,6 +86,9 @@ public class TajoWorkerResourceManager implements 
WorkerResourceManager {
   private Map<YarnProtos.ContainerIdProto, AllocatedWorkerResource> 
allocatedResourceMap =
       new HashMap<YarnProtos.ContainerIdProto, AllocatedWorkerResource>();
 
+  private final Set<QueryId> stoppedQueryIds =
+      Collections.newSetFromMap(new ConcurrentHashMap<QueryId, Boolean>());
+
   public TajoWorkerResourceManager(TajoMaster.MasterContext masterContext) {
     this.masterContext = masterContext;
     init(masterContext.getConf());
@@ -365,59 +369,64 @@ public class TajoWorkerResourceManager implements 
WorkerResourceManager {
                 ", liveWorkers=" + liveWorkerResources.size());
           }
 
-          List<AllocatedWorkerResource> allocatedWorkerResources = 
chooseWorkers(resourceRequest);
+          // TajoWorkerResourceManager occasionally cannot return allocated disk slots,
+          // because remaining resource requests can still be pending after the QueryMaster stops.
+          // Thus we need to check whether the query has already been stopped.
+          if (!stoppedQueryIds.contains(resourceRequest.queryId)) {
+            List<AllocatedWorkerResource> allocatedWorkerResources = 
chooseWorkers(resourceRequest);
 
-          if(allocatedWorkerResources.size() > 0) {
-            if(resourceRequest.queryMasterRequest) {
-              startQueryMaster(resourceRequest.queryId, 
allocatedWorkerResources.get(0));
-            } else {
-              List<TajoMasterProtocol.WorkerAllocatedResource> 
allocatedResources =
-                  new ArrayList<TajoMasterProtocol.WorkerAllocatedResource>();
-
-              for(AllocatedWorkerResource eachWorker: 
allocatedWorkerResources) {
-                NodeId nodeId = 
NodeId.newInstance(eachWorker.workerResource.getAllocatedHost(),
-                    eachWorker.workerResource.getPeerRpcPort());
-
-                TajoWorkerContainerId containerId = new 
TajoWorkerContainerId();
-
-                containerId.setApplicationAttemptId(
-                    
ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId));
-                containerId.setId(containerIdSeq.incrementAndGet());
-
-                YarnProtos.ContainerIdProto containerIdProto = 
containerId.getProto();
-                
allocatedResources.add(TajoMasterProtocol.WorkerAllocatedResource.newBuilder()
-                    .setContainerId(containerIdProto)
-                    .setNodeId(nodeId.toString())
-                    
.setWorkerHost(eachWorker.workerResource.getAllocatedHost())
-                    
.setQueryMasterPort(eachWorker.workerResource.getQueryMasterPort())
-                    .setPeerRpcPort(eachWorker.workerResource.getPeerRpcPort())
-                    
.setWorkerPullServerPort(eachWorker.workerResource.getPullServerPort())
-                    .setAllocatedMemoryMB(eachWorker.allocatedMemoryMB)
-                    .setAllocatedDiskSlots(eachWorker.allocatedDiskSlots)
-                    .build());
-
-                synchronized(workerResourceLock) {
-                  allocatedResourceMap.put(containerIdProto, eachWorker);
+            if(allocatedWorkerResources.size() > 0) {
+              if(resourceRequest.queryMasterRequest) {
+                startQueryMaster(resourceRequest.queryId, 
allocatedWorkerResources.get(0));
+              } else {
+                List<TajoMasterProtocol.WorkerAllocatedResource> 
allocatedResources =
+                    new 
ArrayList<TajoMasterProtocol.WorkerAllocatedResource>();
+
+                for(AllocatedWorkerResource eachWorker: 
allocatedWorkerResources) {
+                  NodeId nodeId = 
NodeId.newInstance(eachWorker.workerResource.getAllocatedHost(),
+                      eachWorker.workerResource.getPeerRpcPort());
+
+                  TajoWorkerContainerId containerId = new 
TajoWorkerContainerId();
+
+                  containerId.setApplicationAttemptId(
+                      
ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId));
+                  containerId.setId(containerIdSeq.incrementAndGet());
+
+                  YarnProtos.ContainerIdProto containerIdProto = 
containerId.getProto();
+                  
allocatedResources.add(TajoMasterProtocol.WorkerAllocatedResource.newBuilder()
+                      .setContainerId(containerIdProto)
+                      .setNodeId(nodeId.toString())
+                      
.setWorkerHost(eachWorker.workerResource.getAllocatedHost())
+                      
.setQueryMasterPort(eachWorker.workerResource.getQueryMasterPort())
+                      
.setPeerRpcPort(eachWorker.workerResource.getPeerRpcPort())
+                      
.setWorkerPullServerPort(eachWorker.workerResource.getPullServerPort())
+                      .setAllocatedMemoryMB(eachWorker.allocatedMemoryMB)
+                      .setAllocatedDiskSlots(eachWorker.allocatedDiskSlots)
+                      .build());
+
+                  synchronized(workerResourceLock) {
+                    allocatedResourceMap.put(containerIdProto, eachWorker);
+                  }
                 }
-              }
 
-              
resourceRequest.callBack.run(TajoMasterProtocol.WorkerResourceAllocationResponse.newBuilder()
-                  
.setExecutionBlockId(resourceRequest.request.getExecutionBlockId())
-                  .addAllWorkerAllocatedResource(allocatedResources)
-                  .build()
-              );
-            }
-          } else {
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("=========================================");
-              LOG.debug("Available Workers");
-              for(String liveWorker: liveWorkerResources) {
-                LOG.debug(allWorkerResourceMap.get(liveWorker).toString());
+                
resourceRequest.callBack.run(TajoMasterProtocol.WorkerResourceAllocationResponse.newBuilder()
+                    
.setExecutionBlockId(resourceRequest.request.getExecutionBlockId())
+                    .addAllWorkerAllocatedResource(allocatedResources)
+                    .build()
+                );
+              }
+            } else {
+              if(LOG.isDebugEnabled()) {
+                LOG.debug("=========================================");
+                LOG.debug("Available Workers");
+                for(String liveWorker: liveWorkerResources) {
+                  LOG.debug(allWorkerResourceMap.get(liveWorker).toString());
+                }
+                LOG.debug("=========================================");
               }
-              LOG.debug("=========================================");
+              requestQueue.put(resourceRequest);
+              Thread.sleep(100);
             }
-            requestQueue.put(resourceRequest);
-            Thread.sleep(100);
           }
         } catch(InterruptedException ie) {
           LOG.error(ie);
@@ -628,6 +637,7 @@ public class TajoWorkerResourceManager implements 
WorkerResourceManager {
       } else {
         queryMasterWorkerResource = queryMasterMap.remove(queryId);
         
queryMasterWorkerResource.releaseQueryMasterTask(queryMasterDefaultDiskSlot, 
queryMasterDefaultMemoryMB);
+        stoppedQueryIds.add(queryId);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java 
b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
index faeadaf..1b09e9e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
@@ -13,6 +13,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.jdbc.TajoResultSet;
 import org.apache.tajo.util.JSPUtil;
+import org.apache.tajo.util.TajoIdUtils;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
 
@@ -170,7 +171,24 @@ public class QueryExecutorServlet extends HttpServlet {
           }
           queryRunners.clear();
         }
+      } else if("killQuery".equals(action)) {
+        String queryId = request.getParameter("queryId");
+        if(queryId == null || queryId.trim().isEmpty()) {
+          errorResponse(response, "No queryId parameter");
+          return;
+        }
+        QueryStatus status = 
tajoClient.killQuery(TajoIdUtils.parseQueryId(queryId));
+
+        if (status.getState() == TajoProtos.QueryState.QUERY_KILLED) {
+          returnValue.put("successMessage", queryId + " is killed 
successfully.");
+        } else if (status.getState() == TajoProtos.QueryState.QUERY_KILL_WAIT) 
{
+          returnValue.put("successMessage", queryId + " will be finished after 
a while.");
+        } else {
+          errorResponse(response, "ERROR:" + status.getErrorMessage());
+          return;
+        }
       }
+
       returnValue.put("success", "true");
       writeHttpResponse(response, returnValue);
     } catch (Exception e) {
@@ -337,9 +355,9 @@ public class QueryExecutorServlet extends HttpServlet {
                 queryResult = new ArrayList<List<Object>>();
 
                 if(sizeLimit < resultSize) {
-                    numOfRows = (long)((float)(desc.getStats().getNumRows()) * 
((float)sizeLimit / (float)resultSize));
+                  numOfRows = (long)((float)(desc.getStats().getNumRows()) * 
((float)sizeLimit / (float)resultSize));
                 } else {
-                    numOfRows = desc.getStats().getNumRows();
+                  numOfRows = desc.getStats().getNumRows();
                 }
                 int rowCount = 0;
                 boolean hasMoreData = false;

http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 30f56ee..848a9cf 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -560,7 +560,7 @@ public class Task {
       int retryWaitTime = 1000;
 
       try { // for releasing fetch latch
-        while(retryNum < maxRetryNum) {
+        while(!killed && retryNum < maxRetryNum) {
           if (retryNum > 0) {
             try {
               Thread.sleep(retryWaitTime);

http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp 
b/tajo-core/src/main/resources/webapps/admin/query.jsp
index 42e0b9d..52c1348 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -59,6 +59,28 @@
   <link rel="stylesheet" type = "text/css" href = "/static/style.css" />
   <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
   <title>Tajo</title>
+    <script src="/static/js/jquery.js" type="text/javascript"></script>
+    <script type="text/javascript">
+
+        function killQuery(queryId) {
+            $.ajax({
+                type: "POST",
+                url: "query_exec",
+                data: { action: "killQuery", queryId: queryId }
+            })
+                    .done(function(msg) {
+                        var resultJson = $.parseJSON(msg);
+                        if(resultJson.success == "false") {
+                            alert(resultJson.errorMessage);
+                        } else {
+                            alert(resultJson.successMessage);
+                            location.reload();
+                        }
+                    })
+        }
+
+
+    </script>
 </head>
 <body>
 <%@ include file="header.jsp"%>
@@ -72,7 +94,7 @@
   } else {
 %>
   <table width="100%" border="1" class='border_table'>
-    <tr></tr><th>QueryId</th><th>Query 
Master</th><th>Started</th><th>Progress</th><th>Time</th><th>Status</th></th><th>sql</th></tr>
+    <tr></tr><th>QueryId</th><th>Query 
Master</th><th>Started</th><th>Progress</th><th>Time</th><th>Status</th></th><th>sql</th><th>Kill
 Query</th></tr>
     <%
       for(QueryInProgress eachQuery: runningQueries) {
         long time = System.currentTimeMillis() - 
eachQuery.getQueryInfo().getStartTime();
@@ -87,6 +109,7 @@
       <td><%=StringUtils.formatTime(time)%></td>
       <td><%=eachQuery.getQueryInfo().getQueryState()%></td>
       <td><%=eachQuery.getQueryInfo().getSql()%></td>
+      <td><input id="btnSubmit" type="submit" value="Kill" 
onClick="javascript:killQuery('<%=eachQuery.getQueryId()%>');"></td>
     </tr>
     <%
       }

Reply via email to