Repository: tajo Updated Branches: refs/heads/master 23f81373f -> 60858b25c
TAJO-985: Client API should be non-blocking. (jinho) Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/60858b25 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/60858b25 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/60858b25 Branch: refs/heads/master Commit: 60858b25c46fceedf70b8583f1d7ef1383a19c6b Parents: 23f8137 Author: jhkim <[email protected]> Authored: Mon Aug 18 13:25:02 2014 +0900 Committer: jhkim <[email protected]> Committed: Mon Aug 18 13:25:02 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../main/java/org/apache/tajo/cli/TajoCli.java | 10 +-- .../java/org/apache/tajo/client/TajoAdmin.java | 2 +- .../java/org/apache/tajo/client/TajoClient.java | 26 ++++-- .../apache/tajo/master/querymaster/Query.java | 25 ++++-- .../tajo/master/querymaster/QueryMaster.java | 14 ++++ .../master/querymaster/QueryMasterTask.java | 4 +- .../tajo/master/querymaster/SubQuery.java | 85 +++++++++++++------- .../tajo/worker/TajoResourceAllocator.java | 2 +- .../tajo/worker/TajoWorkerClientService.java | 12 +-- .../org/apache/tajo/client/TestTajoClient.java | 2 +- 11 files changed, 124 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 14eb1e3..dee8fb5 100644 --- a/CHANGES +++ b/CHANGES @@ -112,6 +112,8 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-985: Client API should be non-blocking. (jinho) + TAJO-1006: Fix wrong storage unit for kilo bytes and others. (hyunsik) TAJO-1000: TextDatum.asChar() is incorrect, if client charset is different. http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java index 81427ab..fdc766e 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java @@ -549,22 +549,20 @@ public class TajoCli { while (true) { // TODO - configurable status = client.getQueryStatus(queryId); - if(status.getState() == QueryState.QUERY_MASTER_INIT || status.getState() == QueryState.QUERY_MASTER_LAUNCHED) { + if(TajoClient.isInPreNewState(status.getState())) { Thread.sleep(Math.min(20 * initRetries, 1000)); initRetries++; continue; } - if (status.getState() == QueryState.QUERY_RUNNING || status.getState() == QueryState.QUERY_SUCCEEDED) { + if (TajoClient.isInRunningState(status.getState()) || status.getState() == QueryState.QUERY_SUCCEEDED) { displayFormatter.printProgress(sout, status); } - if (status.getState() != QueryState.QUERY_RUNNING && - status.getState() != QueryState.QUERY_NOT_ASSIGNED && - status.getState() != QueryState.QUERY_KILL_WAIT) { + if (TajoClient.isInCompleteState(status.getState()) && status.getState() != QueryState.QUERY_KILL_WAIT) { break; } else { - Thread.sleep(Math.min(200 * progressRetries, 1000)); + Thread.sleep(Math.min(100 * progressRetries, 1000)); progressRetries += 2; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java index 4f38858..58b7184 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java @@ -194,7 +194,7 @@ public class TajoAdmin { long end = queryInfo.getFinishTime(); long start = queryInfo.getStartTime(); String executionTime = decimalF.format((end-start) / 1000) + " sec"; - if (!TajoClient.isQueryRunnning(queryInfo.getState())) { + if (TajoClient.isInCompleteState(queryInfo.getState())) { writer.write("Finished Time: " + df.format(queryInfo.getFinishTime())); writer.write("\n"); } http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java index 333c8d6..3a90a48 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java @@ -446,12 +446,26 @@ public class TajoClient implements Closeable { return new QueryStatus(res); } - public static boolean isQueryRunnning(QueryState state) { - return state == QueryState.QUERY_NEW || - state == QueryState.QUERY_RUNNING || - state == QueryState.QUERY_MASTER_LAUNCHED || + /* query submit */ + public static boolean isInPreNewState(QueryState state) { + return state == QueryState.QUERY_NOT_ASSIGNED || state == QueryState.QUERY_MASTER_INIT || - state == QueryState.QUERY_NOT_ASSIGNED; + state == QueryState.QUERY_MASTER_LAUNCHED; + } + + /* query submitted. but is not running */ + public static boolean isInInitState(QueryState state) { + return state == QueryState.QUERY_NEW || state == QueryState.QUERY_INIT; + } + + /* query started. but is not complete */ + public static boolean isInRunningState(QueryState state) { + return isInInitState(state) || state == QueryState.QUERY_RUNNING; + } + + /* query complete */ + public static boolean isInCompleteState(QueryState state) { + return !isInPreNewState(state) && !isInRunningState(state); } public ResultSet getQueryResult(QueryId queryId) @@ -507,7 +521,7 @@ public class TajoClient implements Closeable { } QueryStatus status = getQueryStatus(queryId); - while(status != null && isQueryRunnning(status.getState())) { + while(status != null && !isInCompleteState(status.getState())) { try { Thread.sleep(500); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index 8111ef6..4f4aaab 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -94,6 +94,7 @@ public class Query implements EventHandler<QueryEvent> { // State Machine private final StateMachine<QueryState, QueryEventType, QueryEvent> stateMachine; + private QueryState queryState; // Transition Handler private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); @@ -231,10 +232,11 @@ public class Query implements EventHandler<QueryEvent> { this.writeLock = readWriteLock.writeLock(); stateMachine = stateMachineFactory.make(this); + queryState = stateMachine.getCurrentState(); } public float getProgress() { - QueryState state = getStateMachine().getCurrentState(); + QueryState state = getState(); if (state == QueryState.QUERY_SUCCEEDED) { return 1.0f; } else { @@ -253,6 +255,7 @@ public class Query implements EventHandler<QueryEvent> { } } else { subProgresses[idx] = 0.0f; + finished = false; } idx++; } @@ -337,7 +340,7 @@ public class Query implements EventHandler<QueryEvent> { return this.subqueries.values(); } - public QueryState getState() { + public QueryState getSynchronizedState() { readLock.lock(); try { return stateMachine.getCurrentState(); @@ -346,6 +349,11 @@ public class Query implements EventHandler<QueryEvent> { } } + /* non-blocking call for client API */ + public QueryState getState() { + return queryState; + } + public ExecutionBlockCursor getExecutionBlockCursor() { return cursor; } @@ -783,13 +791,13 @@ public class Query implements EventHandler<QueryEvent> { query.erroredSubQueryCount++; } else { LOG.error(String.format("Invalid SubQuery (%s) State %s at %s", - castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getState().name())); + castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getSynchronizedState().name())); query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR)); } // if a subquery is succeeded and a query is running if (castEvent.getState() == SubQueryState.SUCCEEDED && // latest subquery succeeded - query.getState() == QueryState.QUERY_RUNNING && // current state is not in KILL_WAIT, FAILED, or ERROR. + query.getSynchronizedState() == QueryState.QUERY_RUNNING && // current state is not in KILL_WAIT, FAILED, or ERROR. hasNext(query)) { // there remains at least one subquery. executeNextBlock(query); } else { // if a query is completed due to finished, kill, failure, or error @@ -842,21 +850,22 @@ public class Query implements EventHandler<QueryEvent> { LOG.info("Processing " + event.getQueryId() + " of type " + event.getType()); try { writeLock.lock(); - QueryState oldState = getState(); + QueryState oldState = getSynchronizedState(); try { getStateMachine().doTransition(event.getType(), event); + queryState = getSynchronizedState(); } catch (InvalidStateTransitonException e) { LOG.error("Can't handle this event at current state" + ", type:" + event + ", oldState:" + oldState.name() - + ", nextState:" + getState().name() + + ", nextState:" + getSynchronizedState().name() , e); eventHandler.handle(new QueryEvent(this.id, QueryEventType.INTERNAL_ERROR)); } //notify the eventhandler of state change - if (oldState != getState()) { - LOG.info(id + " Query Transitioned from " + oldState + " to " + getState()); + if (oldState != getSynchronizedState()) { + LOG.info(id + " Query Transitioned from " + oldState + " to " + getSynchronizedState()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java index aed69b2..88589f9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java @@ -53,6 +53,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -95,6 +97,8 @@ public class QueryMaster extends CompositeService implements EventHandler { private RpcConnectionPool connPool; + private ExecutorService eventExecutor; + public QueryMaster(TajoWorker.WorkerContext workerContext) { super(QueryMaster.class.getName()); this.workerContext = workerContext; @@ -140,6 +144,7 @@ public class QueryMaster extends CompositeService implements EventHandler { finishedQueryMasterTaskCleanThread = new FinishedQueryMasterTaskCleanThread(); finishedQueryMasterTaskCleanThread.start(); + eventExecutor = Executors.newSingleThreadExecutor(); super.start(); } @@ -160,6 +165,11 @@ public class QueryMaster extends CompositeService implements EventHandler { if(finishedQueryMasterTaskCleanThread != null) { finishedQueryMasterTaskCleanThread.interrupt(); } + + if(eventExecutor != null){ + eventExecutor.shutdown(); + } + super.stop(); LOG.info("QueryMaster stop"); @@ -312,6 +322,10 @@ public class QueryMaster extends CompositeService implements EventHandler { return conf; } + public ExecutorService getEventExecutor(){ + return eventExecutor; + } + public TajoAsyncDispatcher getDispatcher() { return dispatcher; } http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 5885a1d..1889a56 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -296,7 +296,7 @@ public class QueryMasterTask extends CompositeService { QueryId queryId = event.getQueryId(); LOG.info("Query completion notified from " + queryId); - while (!isTerminatedState(query.getState())) { + while (!isTerminatedState(query.getSynchronizedState())) { try { synchronized (this) { wait(10); @@ -305,7 +305,7 @@ public class QueryMasterTask extends CompositeService { LOG.error(e); } } - LOG.info("Query final state: " + query.getState()); + LOG.info("Query final state: " + query.getSynchronizedState()); queryMasterContext.stopQuery(queryId); } http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index b6fe9da..8eff8a4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -87,6 +87,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { private AbstractTaskScheduler taskScheduler; private QueryMasterTask.QueryMasterTaskContext context; private final List<String> diagnostics = new ArrayList<String>(); + private SubQueryState subQueryState; private long startTime; private long finishTime; @@ -261,6 +262,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { this.readLock = readWriteLock.readLock(); this.writeLock = readWriteLock.writeLock(); stateMachine = stateMachineFactory.make(this); + subQueryState = stateMachine.getCurrentState(); } public static boolean isRunningState(SubQueryState state) { @@ -323,7 +325,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { readLock.lock(); try { if (getState() == SubQueryState.NEW) { - return 0; + return 0.0f; } else { tempTasks = new ArrayList<QueryUnit>(tasks.values()); } @@ -338,7 +340,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } } - return totalProgress/(float)tempTasks.size(); + if (totalProgress > 0.0f) { + return (float) Math.floor((totalProgress / (float) tempTasks.size()) * 1000.0f) / 1000.0f; + } else { + return 0.0f; + } } public int getSucceededObjectCount() { @@ -466,7 +472,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { return getId().compareTo(other.getId()); } - public SubQueryState getState() { + public SubQueryState getSynchronizedState() { readLock.lock(); try { return stateMachine.getCurrentState(); @@ -475,6 +481,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } } + /* non-blocking call for client API */ + public SubQueryState getState() { + return subQueryState; + } + public static TableStats[] computeStatFromUnionBlock(SubQuery subQuery) { TableStats[] stat = new TableStats[]{new TableStats(), new TableStats()}; long[] avgRows = new long[]{0, 0}; @@ -584,19 +595,20 @@ public class SubQuery implements EventHandler<SubQueryEvent> { @Override public void handle(SubQueryEvent event) { if (LOG.isDebugEnabled()) { - LOG.debug("Processing " + event.getSubQueryId() + " of type " + event.getType() + ", preState=" + getState()); + LOG.debug("Processing " + event.getSubQueryId() + " of type " + event.getType() + ", preState=" + getSynchronizedState()); } try { writeLock.lock(); - SubQueryState oldState = getState(); + SubQueryState oldState = getSynchronizedState(); try { getStateMachine().doTransition(event.getType(), event); + subQueryState = getSynchronizedState(); } catch (InvalidStateTransitonException e) { LOG.error("Can't handle this event at current state" + ", eventType:" + event.getType().name() + ", oldState:" + oldState.name() - + ", nextState:" + getState().name() + + ", nextState:" + getSynchronizedState().name() , e); eventHandler.handle(new SubQueryEvent(getId(), SubQueryEventType.SQ_INTERNAL_ERROR)); @@ -604,9 +616,9 @@ public class SubQuery implements EventHandler<SubQueryEvent> { // notify the eventhandler of state change if (LOG.isDebugEnabled()) { - if (oldState != getState()) { + if (oldState != getSynchronizedState()) { LOG.debug(getId() + " SubQuery Transitioned from " + oldState + " to " - + getState()); + + getSynchronizedState()); } } } finally { @@ -622,7 +634,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { SubQueryEvent, SubQueryState> { @Override - public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) { + public SubQueryState transition(final SubQuery subQuery, SubQueryEvent subQueryEvent) { subQuery.setStartTime(); ExecutionBlock execBlock = subQuery.getBlock(); SubQueryState state; @@ -633,24 +645,39 @@ public class SubQuery implements EventHandler<SubQueryEvent> { subQuery.finalizeStats(); state = SubQueryState.SUCCEEDED; } else { - ExecutionBlock parent = subQuery.getMasterPlan().getParent(subQuery.getBlock()); - DataChannel channel = subQuery.getMasterPlan().getChannel(subQuery.getId(), parent.getId()); - setShuffleIfNecessary(subQuery, channel); - initTaskScheduler(subQuery); - schedule(subQuery); - subQuery.totalScheduledObjectsCount = subQuery.getTaskScheduler().remainingScheduledObjectNum(); - LOG.info(subQuery.totalScheduledObjectsCount + " objects are scheduled"); - - if (subQuery.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks - subQuery.stopScheduler(); - subQuery.finalizeStats(); - subQuery.eventHandler.handle(new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.SUCCEEDED)); - return SubQueryState.SUCCEEDED; - } else { - subQuery.taskScheduler.start(); - allocateContainers(subQuery); - return SubQueryState.INITED; - } + // execute pre-processing asyncronously + subQuery.getContext().getQueryMasterContext().getEventExecutor() + .submit(new Runnable() { + @Override + public void run() { + try { + ExecutionBlock parent = subQuery.getMasterPlan().getParent(subQuery.getBlock()); + DataChannel channel = subQuery.getMasterPlan().getChannel(subQuery.getId(), parent.getId()); + setShuffleIfNecessary(subQuery, channel); + initTaskScheduler(subQuery); + schedule(subQuery); + subQuery.totalScheduledObjectsCount = subQuery.getTaskScheduler().remainingScheduledObjectNum(); + LOG.info(subQuery.totalScheduledObjectsCount + " objects are scheduled"); + + if (subQuery.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks + subQuery.stopScheduler(); + subQuery.finalizeStats(); + subQuery.eventHandler.handle(new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.SUCCEEDED)); + } else { + subQuery.taskScheduler.start(); + allocateContainers(subQuery); + + } + } catch (Exception e) { + LOG.error("SubQuery (" + subQuery.getId() + ") ERROR: ", e); + subQuery.setFinishTime(); + subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(), e.getMessage())); + subQuery.eventHandler.handle(new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.ERROR)); + } + } + } + ); + state = SubQueryState.INITED; } } catch (Exception e) { LOG.error("SubQuery (" + subQuery.getId() + ") ERROR: ", e); @@ -863,7 +890,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { long aggregatedVolume = 0; for (ExecutionBlock childBlock : masterPlan.getChilds(execBlock)) { SubQuery subquery = context.getSubQuery(childBlock.getId()); - if (subquery == null || subquery.getState() != SubQueryState.SUCCEEDED) { + if (subquery == null || subquery.getSynchronizedState() != SubQueryState.SUCCEEDED) { aggregatedVolume += getInputVolume(masterPlan, context, childBlock); } else { aggregatedVolume += subquery.getResultStats().getNumBytes(); @@ -1145,7 +1172,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { subQuery.abort(SubQueryState.KILLED); return SubQueryState.KILLED; } else { - LOG.error("Invalid State " + subQuery.getState() + " State"); + LOG.error("Invalid State " + subQuery.getSynchronizedState() + " State"); subQuery.abort(SubQueryState.ERROR); return SubQueryState.ERROR; } http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index 79f14a2..e09a69e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -300,7 +300,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { containers.add(container); } - SubQueryState state = queryTaskContext.getSubQuery(executionBlockId).getState(); + SubQueryState state = queryTaskContext.getSubQuery(executionBlockId).getSynchronizedState(); if (!SubQuery.isRunningState(state)) { try { List<ContainerId> containerIds = new ArrayList<ContainerId>(); http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java index abd4e98..d25013c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java @@ -133,7 +133,7 @@ public class TajoWorkerClientService extends AbstractService { RpcController controller, ClientProtos.GetQueryResultRequest request) throws ServiceException { QueryId queryId = new QueryId(request.getQueryId()); - Query query = workerContext.getQueryMaster().getQueryMasterTask(queryId, true).getQuery(); + QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true); ClientProtos.GetQueryResultResponse.Builder builder = ClientProtos.GetQueryResultResponse.newBuilder(); try { @@ -142,12 +142,12 @@ public class TajoWorkerClientService extends AbstractService { LOG.warn("Can't get current user name"); } - if(query == null) { + if(queryMasterTask == null || queryMasterTask.getQuery() == null) { builder.setErrorMessage("No Query for " + queryId); } else { - switch (query.getState()) { + switch (queryMasterTask.getState()) { case QUERY_SUCCEEDED: - builder.setTableDesc(query.getResultDesc().getProto()); + builder.setTableDesc(queryMasterTask.getQuery().getResultDesc().getProto()); break; case QUERY_FAILED: case QUERY_ERROR: @@ -196,10 +196,10 @@ public class TajoWorkerClientService extends AbstractService { Query query = queryMasterTask.getQuery(); if (query != null) { - builder.setState(query.getState()); + builder.setState(queryMasterTask.getState()); builder.setProgress(query.getProgress()); builder.setSubmitTime(query.getAppSubmitTime()); - if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) { + if (queryMasterTask.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) { builder.setFinishTime(query.getFinishTime()); } else { builder.setFinishTime(System.currentTimeMillis()); http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java index 3d6ed2b..2e69666 100644 --- a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java +++ b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java @@ -658,7 +658,7 @@ public class TestTajoClient { QueryStatus queryStatus = client.getQueryStatus(queryId); assertNotNull(queryStatus); - assertTrue(!TajoClient.isQueryRunnning(queryStatus.getState())); + assertTrue(TajoClient.isInCompleteState(queryStatus.getState())); TajoResultSet resultSet = (TajoResultSet) client.getQueryResult(queryId); assertNotNull(resultSet);
