Repository: tajo Updated Branches: refs/heads/branch-0.8.1 a31884860 -> 590cf8464
TAJO-819: KillQuery does not work for running query on TajoWorker. (jaehwa) Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/590cf846 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/590cf846 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/590cf846 Branch: refs/heads/branch-0.8.1 Commit: 590cf8464f6c1f4c12a0856a5c2ab698c548fac8 Parents: a318848 Author: blrunner <[email protected]> Authored: Wed May 21 17:03:43 2014 +0900 Committer: blrunner <[email protected]> Committed: Wed May 21 17:03:43 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/client/TajoAdmin.java | 9 +- .../java/org/apache/tajo/client/TajoClient.java | 9 +- .../apache/tajo/master/querymaster/Query.java | 11 +- .../tajo/master/querymaster/QueryUnit.java | 11 +- .../master/querymaster/QueryUnitAttempt.java | 19 +++- .../tajo/master/querymaster/SubQuery.java | 13 ++- .../master/rm/TajoWorkerResourceManager.java | 106 ++++++++++--------- .../tajo/webapp/QueryExecutorServlet.java | 22 +++- .../main/java/org/apache/tajo/worker/Task.java | 2 +- .../src/main/resources/webapps/admin/query.jsp | 25 ++++- 11 files changed, 159 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 915311e..f1ad4d5 100644 --- a/CHANGES +++ b/CHANGES @@ -14,6 +14,8 @@ Release 0.8.1 - unreleased BUGS + TAJO-819: KillQuery does not work for running query on TajoWorker. (jaehwa) + TAJO-827: SUM() overflow in the case of INT4. (Hyoungjun Kim via hyunsik) TAJO-833: NPE occurs when using the column as a alias name in the multiple http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java index 9a0478c..4c45568 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java @@ -21,6 +21,7 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; import org.apache.commons.cli.*; import org.apache.commons.lang.StringUtils; +import org.apache.tajo.TajoProtos; import org.apache.tajo.TajoProtos.QueryState; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo; @@ -407,11 +408,13 @@ public class TajoAdmin { public void processKill(Writer writer, String queryIdStr) throws IOException, ServiceException { - boolean killedSuccessfully = tajoClient.killQuery(TajoIdUtils.parseQueryId(queryIdStr)); - if (killedSuccessfully) { + QueryStatus status = tajoClient.killQuery(TajoIdUtils.parseQueryId(queryIdStr)); + if (status.getState() == TajoProtos.QueryState.QUERY_KILLED) { writer.write(queryIdStr + " is killed successfully.\n"); + } else if (status.getState() == TajoProtos.QueryState.QUERY_KILL_WAIT) { + writer.write(queryIdStr + " will be finished after a while.\n"); } else { - writer.write("killing query is failed."); + writer.write("ERROR:" + status.getErrorMessage()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java index d05a375..a804baf 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java @@ -796,7 +796,7 @@ public class TajoClient implements Closeable { }.withRetries(); } - public boolean killQuery(final QueryId queryId) + public QueryStatus killQuery(final QueryId queryId) throws ServiceException, IOException { QueryStatus status = getQueryStatus(queryId); @@ -816,7 +816,9 @@ public class TajoClient implements Closeable { long currentTimeMillis = System.currentTimeMillis(); long timeKillIssued = currentTimeMillis; - while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState() != QueryState.QUERY_KILLED)) { + while ((currentTimeMillis < timeKillIssued + 10000L) + && ((status.getState() != QueryState.QUERY_KILLED) + || (status.getState() == QueryState.QUERY_KILL_WAIT))) { try { Thread.sleep(100L); } catch(InterruptedException ie) { @@ -825,13 +827,12 @@ public class TajoClient implements Closeable { currentTimeMillis = System.currentTimeMillis(); status = getQueryStatus(queryId); } - return status.getState() == QueryState.QUERY_KILLED; } catch(Exception e) { LOG.debug("Error when checking for application status", e); - return false; } finally { connPool.releaseConnection(tmClient); } + return status; } public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException { http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index a8f5b31..e61fcba 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -164,8 +164,9 @@ public class Query implements EventHandler<QueryEvent> { QueryEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) // Ignore-able transitions - .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT, - EnumSet.of(QueryEventType.KILL)) + .addTransition(QueryState.QUERY_KILL_WAIT, EnumSet.of(QueryState.QUERY_KILLED), + QueryEventType.KILL, + QUERY_COMPLETED_TRANSITION) // Transitions from FAILED state .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED, @@ -685,7 +686,11 @@ public class Query implements EventHandler<QueryEvent> { try { getStateMachine().doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state", e); + LOG.error("Can't handle this event at current state" + + ", type:" + event + + ", oldState:" + oldState.name() + + ", nextState:" + getState().name() + , e); eventHandler.handle(new QueryEvent(this.id, QueryEventType.INTERNAL_ERROR)); } http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java index 42fbf8a..8aa6c05 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java @@ -154,6 +154,11 @@ public class QueryUnit implements EventHandler<TaskEvent> { .addTransition(TaskState.FAILED, TaskState.FAILED, EnumSet.of(TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED)) + // Transitions from KILLED state + .addTransition(TaskState.KILLED, TaskState.KILLED, + TaskEventType.T_ATTEMPT_KILLED, + new KillTaskTransition()) + .installTopology(); private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine; @@ -582,7 +587,11 @@ public class QueryUnit implements EventHandler<TaskEvent> { try { stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state", e); + LOG.error("Can't handle this event at current state" + + ", eventType:" + event.getType().name() + + ", oldState:" + oldState.name() + + ", nextState:" + getState().name() + , e); eventHandler.handle(new QueryEvent(TajoIdUtils.parseQueryId(getId().toString()), QueryEventType.INTERNAL_ERROR)); } http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java index 7993ce9..aa7f5c4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java @@ -97,6 +97,9 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> { .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT, TaskAttemptEventType.TA_KILL, new KillTaskTransition()) + .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILLED, + TaskAttemptEventType.TA_KILL, + new KillTaskTransition()) .addTransition(TaskAttemptState.TA_ASSIGNED, EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED), TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition()) @@ -144,7 +147,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> { TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition()) .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED, TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition()) - // Ignore-able transitions + // Ignore-able transitions .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED, TaskAttemptEventType.TA_KILL) @@ -155,7 +158,13 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> { .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED, EnumSet.of( TaskAttemptEventType.TA_UPDATE)) - + .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED, + EnumSet.of( + TaskAttemptEventType.TA_LOCAL_KILLED, + TaskAttemptEventType.TA_KILL, + TaskAttemptEventType.TA_ASSIGNED, + TaskAttemptEventType.TA_DONE), + new TaskKilledCompleteTransition()) .installTopology(); private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> @@ -417,7 +426,11 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> { try { stateMachine.doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")", e); + LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")" + + ", eventType:" + event.getType().name() + + ", oldState:" + oldState.name() + + ", nextState:" + getState().name() + , e); eventHandler.handle( new SubQueryDiagnosticsUpdateEvent(event.getTaskAttemptId().getQueryUnitId().getExecutionBlockId(), "Can't handle this event at current state of " + event.getTaskAttemptId() + ")")); http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 8929e8d..f00b99b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -187,7 +187,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { SubQueryEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - // Transitions from SUCCEEDED state + // Transitions from SUCCEEDED state .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED, SubQueryEventType.SQ_CONTAINER_ALLOCATED, CONTAINERS_CANCEL_TRANSITION) @@ -197,7 +197,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { .addTransition(SubQueryState.SUCCEEDED, SubQueryState.ERROR, SubQueryEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - // Ignore-able events + // Ignore-able events .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED, EnumSet.of( SubQueryEventType.SQ_START, @@ -235,7 +235,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> { SubQueryEventType.SQ_START, SubQueryEventType.SQ_KILL, SubQueryEventType.SQ_FAILED, - SubQueryEventType.SQ_INTERNAL_ERROR)) + SubQueryEventType.SQ_INTERNAL_ERROR, + SubQueryEventType.SQ_SUBQUERY_COMPLETED)) .installTopology(); @@ -593,7 +594,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> { try { getStateMachine().doTransition(event.getType(), event); } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state", e); + LOG.error("Can't handle this event at current state" + + ", eventType:" + event.getType().name() + + ", oldState:" + oldState.name() + + ", nextState:" + getState().name() + , e); eventHandler.handle(new SubQueryEvent(getId(), SubQueryEventType.SQ_INTERNAL_ERROR)); } http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java index 2c3572c..09d5161 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java @@ -37,6 +37,7 @@ import org.apache.tajo.util.ApplicationIdUtils; import java.io.IOException; import java.util.*; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -85,6 +86,9 @@ public class TajoWorkerResourceManager implements WorkerResourceManager { private Map<YarnProtos.ContainerIdProto, AllocatedWorkerResource> allocatedResourceMap = new HashMap<YarnProtos.ContainerIdProto, AllocatedWorkerResource>(); + private final Set<QueryId> stoppedQueryIds = + Collections.newSetFromMap(new ConcurrentHashMap<QueryId, Boolean>()); + public TajoWorkerResourceManager(TajoMaster.MasterContext masterContext) { this.masterContext = masterContext; init(masterContext.getConf()); @@ -365,59 +369,64 @@ public class TajoWorkerResourceManager implements WorkerResourceManager { ", liveWorkers=" + liveWorkerResources.size()); } - List<AllocatedWorkerResource> allocatedWorkerResources = chooseWorkers(resourceRequest); + // TajoWorkerResourceManager can't return allocated disk slots occasionally. + // Because the rest resource request can remains after QueryMaster stops. + // Thus we need to find whether QueryId stopped or not. + if (!stoppedQueryIds.contains(resourceRequest.queryId)) { + List<AllocatedWorkerResource> allocatedWorkerResources = chooseWorkers(resourceRequest); - if(allocatedWorkerResources.size() > 0) { - if(resourceRequest.queryMasterRequest) { - startQueryMaster(resourceRequest.queryId, allocatedWorkerResources.get(0)); - } else { - List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources = - new ArrayList<TajoMasterProtocol.WorkerAllocatedResource>(); - - for(AllocatedWorkerResource eachWorker: allocatedWorkerResources) { - NodeId nodeId = NodeId.newInstance(eachWorker.workerResource.getAllocatedHost(), - eachWorker.workerResource.getPeerRpcPort()); - - TajoWorkerContainerId containerId = new TajoWorkerContainerId(); - - containerId.setApplicationAttemptId( - ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId)); - containerId.setId(containerIdSeq.incrementAndGet()); - - YarnProtos.ContainerIdProto containerIdProto = containerId.getProto(); - allocatedResources.add(TajoMasterProtocol.WorkerAllocatedResource.newBuilder() - .setContainerId(containerIdProto) - .setNodeId(nodeId.toString()) - .setWorkerHost(eachWorker.workerResource.getAllocatedHost()) - .setQueryMasterPort(eachWorker.workerResource.getQueryMasterPort()) - .setPeerRpcPort(eachWorker.workerResource.getPeerRpcPort()) - .setWorkerPullServerPort(eachWorker.workerResource.getPullServerPort()) - .setAllocatedMemoryMB(eachWorker.allocatedMemoryMB) - .setAllocatedDiskSlots(eachWorker.allocatedDiskSlots) - .build()); - - synchronized(workerResourceLock) { - allocatedResourceMap.put(containerIdProto, eachWorker); + if(allocatedWorkerResources.size() > 0) { + if(resourceRequest.queryMasterRequest) { + startQueryMaster(resourceRequest.queryId, allocatedWorkerResources.get(0)); + } else { + List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources = + new ArrayList<TajoMasterProtocol.WorkerAllocatedResource>(); + + for(AllocatedWorkerResource eachWorker: allocatedWorkerResources) { + NodeId nodeId = NodeId.newInstance(eachWorker.workerResource.getAllocatedHost(), + eachWorker.workerResource.getPeerRpcPort()); + + TajoWorkerContainerId containerId = new TajoWorkerContainerId(); + + containerId.setApplicationAttemptId( + ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId)); + containerId.setId(containerIdSeq.incrementAndGet()); + + YarnProtos.ContainerIdProto containerIdProto = containerId.getProto(); + allocatedResources.add(TajoMasterProtocol.WorkerAllocatedResource.newBuilder() + .setContainerId(containerIdProto) + .setNodeId(nodeId.toString()) + .setWorkerHost(eachWorker.workerResource.getAllocatedHost()) + .setQueryMasterPort(eachWorker.workerResource.getQueryMasterPort()) + .setPeerRpcPort(eachWorker.workerResource.getPeerRpcPort()) + .setWorkerPullServerPort(eachWorker.workerResource.getPullServerPort()) + .setAllocatedMemoryMB(eachWorker.allocatedMemoryMB) + .setAllocatedDiskSlots(eachWorker.allocatedDiskSlots) + .build()); + + synchronized(workerResourceLock) { + allocatedResourceMap.put(containerIdProto, eachWorker); + } } - } - resourceRequest.callBack.run(TajoMasterProtocol.WorkerResourceAllocationResponse.newBuilder() - .setExecutionBlockId(resourceRequest.request.getExecutionBlockId()) - .addAllWorkerAllocatedResource(allocatedResources) - .build() - ); - } - } else { - if(LOG.isDebugEnabled()) { - LOG.debug("========================================="); - LOG.debug("Available Workers"); - for(String liveWorker: liveWorkerResources) { - LOG.debug(allWorkerResourceMap.get(liveWorker).toString()); + resourceRequest.callBack.run(TajoMasterProtocol.WorkerResourceAllocationResponse.newBuilder() + .setExecutionBlockId(resourceRequest.request.getExecutionBlockId()) + .addAllWorkerAllocatedResource(allocatedResources) + .build() + ); + } + } else { + if(LOG.isDebugEnabled()) { + LOG.debug("========================================="); + LOG.debug("Available Workers"); + for(String liveWorker: liveWorkerResources) { + LOG.debug(allWorkerResourceMap.get(liveWorker).toString()); + } + LOG.debug("========================================="); } - LOG.debug("========================================="); + requestQueue.put(resourceRequest); + Thread.sleep(100); } - requestQueue.put(resourceRequest); - Thread.sleep(100); } } catch(InterruptedException ie) { LOG.error(ie); @@ -628,6 +637,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager { } else { queryMasterWorkerResource = queryMasterMap.remove(queryId); queryMasterWorkerResource.releaseQueryMasterTask(queryMasterDefaultDiskSlot, queryMasterDefaultMemoryMB); + stoppedQueryIds.add(queryId); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java index faeadaf..1b09e9e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java +++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java @@ -13,6 +13,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.jdbc.TajoResultSet; import org.apache.tajo.util.JSPUtil; +import org.apache.tajo.util.TajoIdUtils; import org.codehaus.jackson.map.DeserializationConfig; import org.codehaus.jackson.map.ObjectMapper; @@ -170,7 +171,24 @@ public class QueryExecutorServlet extends HttpServlet { } queryRunners.clear(); } + } else if("killQuery".equals(action)) { + String queryId = request.getParameter("queryId"); + if(queryId == null || queryId.trim().isEmpty()) { + errorResponse(response, "No queryId parameter"); + return; + } + QueryStatus status = tajoClient.killQuery(TajoIdUtils.parseQueryId(queryId)); + + if (status.getState() == TajoProtos.QueryState.QUERY_KILLED) { + returnValue.put("successMessage", queryId + " is killed successfully."); + } else if (status.getState() == TajoProtos.QueryState.QUERY_KILL_WAIT) { + returnValue.put("successMessage", queryId + " will be finished after a while."); + } else { + errorResponse(response, "ERROR:" + status.getErrorMessage()); + return; + } } + returnValue.put("success", "true"); writeHttpResponse(response, returnValue); } catch (Exception e) { @@ -337,9 +355,9 @@ public class QueryExecutorServlet extends HttpServlet { queryResult = new ArrayList<List<Object>>(); if(sizeLimit < resultSize) { - numOfRows = (long)((float)(desc.getStats().getNumRows()) * ((float)sizeLimit / (float)resultSize)); + numOfRows = (long)((float)(desc.getStats().getNumRows()) * ((float)sizeLimit / (float)resultSize)); } else { - numOfRows = desc.getStats().getNumRows(); + numOfRows = desc.getStats().getNumRows(); } int rowCount = 0; boolean hasMoreData = false; http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 30f56ee..848a9cf 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -560,7 +560,7 @@ public class Task { int retryWaitTime = 1000; try { // for releasing fetch latch - while(retryNum < maxRetryNum) { + while(!killed && retryNum < maxRetryNum) { if (retryNum > 0) { try { Thread.sleep(retryWaitTime); http://git-wip-us.apache.org/repos/asf/tajo/blob/590cf846/tajo-core/src/main/resources/webapps/admin/query.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp index 42e0b9d..52c1348 100644 --- a/tajo-core/src/main/resources/webapps/admin/query.jsp +++ b/tajo-core/src/main/resources/webapps/admin/query.jsp @@ -59,6 +59,28 @@ <link rel="stylesheet" type = "text/css" href = "/static/style.css" /> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title>Tajo</title> + <script src="/static/js/jquery.js" type="text/javascript"></script> + <script type="text/javascript"> + + function killQuery(queryId) { + $.ajax({ + type: "POST", + url: "query_exec", + data: { action: "killQuery", queryId: queryId } + }) + .done(function(msg) { + var resultJson = $.parseJSON(msg); + if(resultJson.success == "false") { + alert(resultJson.errorMessage); + } else { + alert(resultJson.successMessage); + location.reload(); + } + }) + } + + + </script> </head> <body> <%@ include file="header.jsp"%> @@ -72,7 +94,7 @@ } else { %> <table width="100%" border="1" class='border_table'> - <tr></tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Progress</th><th>Time</th><th>Status</th></th><th>sql</th></tr> + <tr></tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Progress</th><th>Time</th><th>Status</th></th><th>sql</th><th>Kill Query</th></tr> <% for(QueryInProgress eachQuery: runningQueries) { long time = System.currentTimeMillis() - eachQuery.getQueryInfo().getStartTime(); @@ -87,6 +109,7 @@ <td><%=StringUtils.formatTime(time)%></td> <td><%=eachQuery.getQueryInfo().getQueryState()%></td> <td><%=eachQuery.getQueryInfo().getSql()%></td> + <td><input id="btnSubmit" type="submit" value="Kill" onClick="javascript:killQuery('<%=eachQuery.getQueryId()%>');"></td> </tr> <% }
