Repository: tajo Updated Branches: refs/heads/master 1baf8dce6 -> ddd39213d
TAJO-1593: Add missing stop condition to Taskrunner. (jinho) Closes #562 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/ddd39213 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/ddd39213 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/ddd39213 Branch: refs/heads/master Commit: ddd39213d87fd76e9e80099e92010fd71b3d363e Parents: 1baf8dc Author: Jinho Kim <[email protected]> Authored: Fri May 8 23:42:03 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Fri May 8 23:42:03 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 ++ .../apache/tajo/worker/ExecutionBlockContext.java | 15 ++++++++++----- .../main/java/org/apache/tajo/worker/TaskRunner.java | 1 + .../java/org/apache/tajo/rpc/AsyncRpcServer.java | 6 +++++- .../java/org/apache/tajo/rpc/BlockingRpcServer.java | 6 +++++- 5 files changed, 23 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/ddd39213/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 9788307..0aad306 100644 --- a/CHANGES +++ b/CHANGES @@ -121,6 +121,8 @@ Release 0.11.0 - unreleased BUG FIXES + TAJO-1593: Add missing stop condition to Taskrunner. (jinho) + TAJO-1556: "insert into select" with reordered column list does not work. (Contributed by Yongjin Choi, Committed by jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/ddd39213/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index 270000a..0cc3304 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -35,9 +35,8 @@ import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.plan.serder.PlanProto; -import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcClientManager; +import org.apache.tajo.rpc.*; +import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.NetUtils; @@ -50,6 +49,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -307,7 +307,10 @@ public class ExecutionBlockContext { getWorkerContext().getHashShuffleAppenderManager().close(ebId); if (shuffles == null) { reporterBuilder.addAllIntermediateEntries(intermediateEntries); - stub.doneExecutionBlock(null, reporterBuilder.build(), NullCallback.get()); + + CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>(); + stub.doneExecutionBlock(callFuture.getController(), reporterBuilder.build(), callFuture); + callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); return; } @@ -340,7 +343,9 @@ public class ExecutionBlockContext { } } try { - stub.doneExecutionBlock(null, reporterBuilder.build(), NullCallback.get()); + CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>(); + stub.doneExecutionBlock(callFuture.getController(), reporterBuilder.build(), callFuture); + callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (Throwable e) { // can't send report to query master LOG.fatal(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/tajo/blob/ddd39213/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java index 31f25f0..774f358 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java @@ -241,6 +241,7 @@ public class TaskRunner extends AbstractService { // immediately. if (taskRequest.getShouldDie()) { LOG.info("Received ShouldDie flag:" + getId()); + break; } else { getContext().getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc(); LOG.info("Accumulated Received Task: " + (++receivedNum)); http://git-wip-us.apache.org/repos/asf/tajo/blob/ddd39213/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java index 22f47b0..134b3cf 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java @@ -142,7 +142,11 @@ public class AsyncRpcServer extends NettyServerBase { ctx.close(); } Throwable rootCause = ExceptionUtils.getRootCause(cause); - LOG.fatal(ExceptionUtils.getMessage(rootCause), rootCause); + if(rootCause == null) { + LOG.fatal(ExceptionUtils.getMessage(cause), cause); + } else { + LOG.fatal(ExceptionUtils.getMessage(rootCause), rootCause); + } } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/ddd39213/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java index 93c28e3..007ada5 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java @@ -133,7 +133,11 @@ public class BlockingRpcServer extends NettyServerBase { ctx.close(); } Throwable rootCause = ExceptionUtils.getRootCause(cause); - LOG.fatal(ExceptionUtils.getMessage(rootCause), rootCause); + if(rootCause == null) { + LOG.fatal(ExceptionUtils.getMessage(cause), cause); + } else { + LOG.fatal(ExceptionUtils.getMessage(rootCause), rootCause); + } } } }
