TAJO-1584: Remove QueryMaster client sharing in TajoMaster and TajoWorker. Closes #559
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/04167bdc Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/04167bdc Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/04167bdc Branch: refs/heads/index_support Commit: 04167bdc3bb04b53c5a245a9c18b6426ade82a26 Parents: b6b9d46 Author: Jinho Kim <[email protected]> Authored: Wed May 6 18:13:40 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Wed May 6 18:13:40 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../java/org/apache/tajo/conf/TajoConf.java | 2 - .../org/apache/tajo/master/QueryInProgress.java | 31 ++--- .../querymaster/QueryMasterManagerService.java | 135 ++++++++----------- .../tajo/worker/ExecutionBlockContext.java | 32 +++-- .../java/org/apache/tajo/worker/TajoWorker.java | 1 + .../tajo/worker/TajoWorkerManagerService.java | 2 + .../main/java/org/apache/tajo/worker/Task.java | 4 +- .../java/org/apache/tajo/worker/TaskRunner.java | 43 +++--- .../src/main/proto/QueryMasterProtocol.proto | 14 +- .../org/apache/tajo/rpc/NettyClientBase.java | 7 +- .../org/apache/tajo/rpc/RpcClientManager.java | 9 ++ 12 files changed, 133 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index a790655..ebd88cd 100644 --- a/CHANGES +++ b/CHANGES @@ -24,6 +24,9 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1584: Remove QueryMaster client sharing in TajoMaster and TajoWorker. + (jinho) + TAJO-1563: Improve RPC error handling. (jinho) TAJO-1311: Enable Scattered Hash Shuffle for CTAS statement. (jaehwa) http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index bfba290..46e7618 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -224,8 +224,6 @@ public class TajoConf extends Configuration { HIVEUSEEXPLICITRCFILEHEADER("tajo.exec.rcfile.use.explicit.header", true, Validators.bool()), // RPC -------------------------------------------------------------------- - RPC_POOL_MAX_IDLE("tajo.rpc.pool.idle.max", 10), - // Internal RPC Client INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM("tajo.internal.rpc.client.worker-thread-num", Runtime.getRuntime().availableProcessors() * 2), http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java index d2286cf..6a074a2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.StringUtils; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerAllocatedResource; import org.apache.tajo.ipc.QueryMasterProtocol; @@ -31,14 +32,13 @@ import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto; import org.apache.tajo.master.rm.WorkerResourceManager; import org.apache.tajo.plan.logical.LogicalRootNode; -import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcClientManager; +import org.apache.tajo.rpc.*; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.session.Session; import org.apache.tajo.util.NetUtils; import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -92,7 +92,9 @@ public class QueryInProgress { try { getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED); if (queryMasterRpcClient != null) { - queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get()); + CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>(); + queryMasterRpcClient.killQuery(callFuture.getController(), queryId.getProto(), callFuture); + callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); } } catch (Throwable e) { catchException("Failed to kill query " + queryId + " by exception " + e, e); @@ -111,9 +113,7 @@ public class QueryInProgress { masterContext.getResourceManager().releaseQueryMaster(queryId); - if(queryMasterRpc != null) { - RpcClientManager.cleanup(queryMasterRpc); - } + RpcClientManager.cleanup(queryMasterRpc); try { masterContext.getHistoryWriter().appendAndFlush(queryInfo); @@ -156,8 +156,9 @@ public class QueryInProgress { private void connectQueryMaster() throws Exception { InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort()); LOG.info("Connect to QueryMaster:" + addr); - queryMasterRpc = - RpcClientManager.getInstance().getClient(addr, QueryMasterProtocol.class, true); + + RpcClientManager.cleanup(queryMasterRpc); + queryMasterRpc = RpcClientManager.getInstance().newClient(addr, QueryMasterProtocol.class, true); queryMasterRpcClient = queryMasterRpc.getStub(); } @@ -177,11 +178,7 @@ public class QueryInProgress { if(queryMasterRpcClient == null) { connectQueryMaster(); } - if(queryMasterRpcClient == null) { - LOG.info("No QueryMaster connection info."); - //TODO wait - return; - } + LOG.info("Call executeQuery to :" + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId); @@ -192,11 +189,15 @@ public class QueryInProgress { .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr())) .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build()); - queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get()); + CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>(); + queryMasterRpcClient.executeQuery(callFuture.getController(), builder.build(), callFuture); + callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + querySubmitted.set(true); getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_MASTER_LAUNCHED); } catch (Exception e) { LOG.error("Failed to submit query " + queryId + " to master by exception " + e, e); + catchException(e.getMessage(), e); } finally { writeLock.unlock(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java index 85cc553..59933a7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java @@ -115,10 +115,6 @@ public class QueryMasterManagerService extends CompositeService return bindAddr; } - public String getHostAndPort() { - return bindAddr.getHostName() + ":" + bindAddr.getPort(); - } - @Override public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request, RpcCallback<TajoWorkerProtocol.TaskRequestProto> done) { @@ -136,127 +132,106 @@ public class QueryMasterManagerService extends CompositeService } } catch (Exception e) { LOG.error(e.getMessage(), e); + controller.setFailed(e.getMessage()); } } @Override public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request, - RpcCallback<PrimitiveProtos.BoolProto> done) { - try { - QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId()); - TaskAttemptId attemptId = new TaskAttemptId(request.getId()); - QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId); - if (queryMasterTask == null) { - queryMasterTask = queryMaster.getQueryMasterTask(queryId, true); - } - Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId()); - Task task = sq.getTask(attemptId.getTaskId()); - TaskAttempt attempt = task.getAttempt(attemptId.getId()); + RpcCallback<PrimitiveProtos.NullProto> done) { + QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId()); + TaskAttemptId attemptId = new TaskAttemptId(request.getId()); + QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId); + if (queryMasterTask == null) { + queryMasterTask = queryMaster.getQueryMasterTask(queryId, true); + } + Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId()); + Task task = sq.getTask(attemptId.getTaskId()); + TaskAttempt attempt = task.getAttempt(attemptId.getId()); - if(LOG.isDebugEnabled()){ - LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name())); - } + if(LOG.isDebugEnabled()){ + LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name())); + } - if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) { - LOG.warn(attemptId + " Killed"); - attempt.handle( - new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED)); - } else { - queryMasterTask.getEventHandler().handle( - new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request)); - } - done.run(TajoWorker.TRUE_PROTO); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - done.run(TajoWorker.FALSE_PROTO); + if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) { + LOG.warn(attemptId + " Killed"); + attempt.handle( + new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED)); + } else { + queryMasterTask.getEventHandler().handle( + new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request)); } + + done.run(TajoWorker.NULL_PROTO); } @Override public void ping(RpcController controller, TajoIdProtos.ExecutionBlockIdProto requestProto, - RpcCallback<PrimitiveProtos.BoolProto> done) { - done.run(TajoWorker.TRUE_PROTO); + RpcCallback<PrimitiveProtos.NullProto> done) { + done.run(TajoWorker.NULL_PROTO); } @Override public void fatalError(RpcController controller, TajoWorkerProtocol.TaskFatalErrorReport report, - RpcCallback<PrimitiveProtos.BoolProto> done) { - try { - QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask( - new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId())); - if (queryMasterTask != null) { - queryMasterTask.handleTaskFailed(report); - } else { - LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId())); - } - done.run(TajoWorker.TRUE_PROTO); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - done.run(TajoWorker.FALSE_PROTO); + RpcCallback<PrimitiveProtos.NullProto> done) { + QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask( + new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId())); + if (queryMasterTask != null) { + queryMasterTask.handleTaskFailed(report); + } else { + LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId())); } + done.run(TajoWorker.NULL_PROTO); } @Override public void done(RpcController controller, TajoWorkerProtocol.TaskCompletionReport report, - RpcCallback<PrimitiveProtos.BoolProto> done) { - try { - QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask( - new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId())); - if (queryMasterTask != null) { - queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report)); - } - done.run(TajoWorker.TRUE_PROTO); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - done.run(TajoWorker.FALSE_PROTO); + RpcCallback<PrimitiveProtos.NullProto> done) { + QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask( + new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId())); + if (queryMasterTask != null) { + queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report)); } + done.run(TajoWorker.NULL_PROTO); } @Override public void doneExecutionBlock( RpcController controller, TajoWorkerProtocol.ExecutionBlockReport request, - RpcCallback<PrimitiveProtos.BoolProto> done) { + RpcCallback<PrimitiveProtos.NullProto> done) { QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId())); if (queryMasterTask != null) { ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId()); queryMasterTask.getEventHandler().handle(new StageShuffleReportEvent(ebId, request)); } - done.run(TajoWorker.TRUE_PROTO); + done.run(TajoWorker.NULL_PROTO); } @Override public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto request, - RpcCallback<PrimitiveProtos.BoolProto> done) { + RpcCallback<PrimitiveProtos.NullProto> done) { QueryId queryId = new QueryId(request); QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId); if (queryMasterTask != null) { - Query query = queryMasterTask.getQuery(); - if (query != null) { - query.handle(new QueryEvent(queryId, QueryEventType.KILL)); - } + queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL)); } + done.run(TajoWorker.NULL_PROTO); } @Override public void executeQuery(RpcController controller, TajoWorkerProtocol.QueryExecutionRequestProto request, - RpcCallback<PrimitiveProtos.BoolProto> done) { - try { - workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc(); - - QueryId queryId = new QueryId(request.getQueryId()); - LOG.info("Receive executeQuery request:" + queryId); - queryMaster.handle(new QueryStartEvent(queryId, - new Session(request.getSession()), - new QueryContext(workerContext.getQueryMaster().getContext().getConf(), - request.getQueryContext()), request.getExprInJson().getValue(), - request.getLogicalPlanJson().getValue())); - done.run(TajoWorker.TRUE_PROTO); - } catch (Exception e) { - workerContext.getWorkerSystemMetrics().counter("querymaster", "errorQuery").inc(); - LOG.error(e.getMessage(), e); - done.run(TajoWorker.FALSE_PROTO); - } + RpcCallback<PrimitiveProtos.NullProto> done) { + workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc(); + + QueryId queryId = new QueryId(request.getQueryId()); + LOG.info("Receive executeQuery request:" + queryId); + queryMaster.handle(new QueryStartEvent(queryId, + new Session(request.getSession()), + new QueryContext(workerContext.getQueryMaster().getContext().getConf(), + request.getQueryContext()), request.getExprInJson().getValue(), + request.getLogicalPlanJson().getValue())); + done.run(TajoWorker.NULL_PROTO); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index 0d26e6c..cd4b6a6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.tajo.ipc.TajoWorkerProtocol.*; +import static org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface; public class ExecutionBlockContext { /** class logger */ @@ -78,6 +79,8 @@ public class ExecutionBlockContext { private TajoQueryEngine queryEngine; private RpcClientManager connManager; private InetSocketAddress qmMasterAddr; + private NettyClientBase client; + private QueryMasterProtocol.QueryMasterProtocolService.Interface stub; private WorkerConnectionInfo queryMaster; private TajoConf systemConf; // for the doAs block @@ -132,16 +135,14 @@ public class ExecutionBlockContext { // initialize DFS and LocalFileSystems this.taskOwner = taskOwner; + this.stub = getRpcClient().getStub(); this.reporter.startReporter(); - // resource intiailization try{ this.resource.initialize(queryContext, plan); } catch (Throwable e) { try { - NettyClientBase client = getQueryMasterConnection(); - QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub(); - stub.killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get()); + getStub().killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get()); } catch (Throwable t) { //ignore } @@ -153,9 +154,20 @@ public class ExecutionBlockContext { return resource; } - public NettyClientBase getQueryMasterConnection() + private NettyClientBase getRpcClient() throws NoSuchMethodException, ConnectException, ClassNotFoundException { - return connManager.getClient(qmMasterAddr, QueryMasterProtocol.class, true); + if (client != null) return client; + + client = connManager.newClient(qmMasterAddr, QueryMasterProtocol.class, true); + return client; + } + + public Interface getStub() { + return stub; + } + + public boolean isStopped() { + return stop.get(); } public void stop(){ @@ -184,6 +196,7 @@ public class ExecutionBlockContext { tasks.clear(); resource.release(); + RpcClientManager.cleanup(client); } public TajoConf getConf() { @@ -282,8 +295,7 @@ public class ExecutionBlockContext { /* This case is that worker did not ran tasks */ if(completedTasksNum.get() == 0) return; - NettyClientBase client = getQueryMasterConnection(); - QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub(); + Interface stub = getStub(); ExecutionBlockReport.Builder reporterBuilder = ExecutionBlockReport.newBuilder(); reporterBuilder.setEbId(ebId.getProto()); @@ -379,10 +391,8 @@ public class ExecutionBlockContext { public void run() { while (!reporterStop.get() && !Thread.interrupted()) { - NettyClientBase client = null; try { - client = getQueryMasterConnection(); - QueryMasterProtocol.QueryMasterProtocolService.Interface masterStub = client.getStub(); + Interface masterStub = getStub(); if(tasks.size() == 0){ masterStub.ping(null, getExecutionBlockId().getProto(), NullCallback.get()); http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 79b83e4..b666f80 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -77,6 +77,7 @@ import static org.apache.tajo.conf.TajoConf.ConfVars; public class TajoWorker extends CompositeService { public static final PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build(); public static final PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build(); + public static final PrimitiveProtos.NullProto NULL_PROTO = PrimitiveProtos.NullProto.newBuilder().build(); private static final Log LOG = LogFactory.getLog(TajoWorker.class); http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java index 71d96c4..bbf8564 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java @@ -127,6 +127,7 @@ public class TajoWorkerManagerService extends CompositeService done.run(TajoWorker.TRUE_PROTO); } catch (Throwable t) { LOG.error(t.getMessage(), t); + controller.setFailed(t.getMessage()); done.run(TajoWorker.FALSE_PROTO); } } @@ -142,6 +143,7 @@ public class TajoWorkerManagerService extends CompositeService done.run(TajoWorker.TRUE_PROTO); } catch (Exception e) { LOG.error(e.getMessage(), e); + controller.setFailed(e.getMessage()); done.run(TajoWorker.FALSE_PROTO); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index a983f78..53ed73e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -440,9 +440,7 @@ public class Task { executionBlockContext.completedTasksNum.incrementAndGet(); context.getHashShuffleAppenderManager().finalizeTask(taskId); - NettyClientBase client = executionBlockContext.getQueryMasterConnection(); - - QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = client.getStub(); + QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub(); if (context.isStopped()) { context.setExecutorProgress(0.0f); http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java index 6076913..31f25f0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java @@ -194,24 +194,8 @@ public class TaskRunner extends AbstractService { CallFuture<TaskRequestProto> callFuture = null; TaskRequestProto taskRequest = null; - while(!stopped) { - NettyClientBase client; - try { - client = executionBlockContext.getQueryMasterConnection(); - } catch (ConnectException ce) { - // NettyClientBase throws ConnectTimeoutException if connection was failed - stop(); - getContext().stopTaskRunner(getId()); - LOG.error("Connecting to QueryMaster was failed.", ce); - break; - } catch (Throwable t) { - LOG.fatal("Unable to handle exception: " + t.getMessage(), t); - stop(); - getContext().stopTaskRunner(getId()); - break; - } - - QueryMasterProtocolService.Interface qmClientService = client.getStub(); + while(!stopped && !executionBlockContext.isStopped()) { + QueryMasterProtocolService.Interface qmClientService = executionBlockContext.getStub(); try { if (callFuture == null) { @@ -243,8 +227,12 @@ public class TaskRunner extends AbstractService { } continue; } catch (ExecutionException ee) { - LOG.error(ee.getMessage(), ee); - break; + if(!getContext().isStopped()){ + LOG.error(ee.getMessage(), ee); + } else { + /* EB is stopped */ + break; + } } if (taskRequest != null) { @@ -253,9 +241,6 @@ public class TaskRunner extends AbstractService { // immediately. if (taskRequest.getShouldDie()) { LOG.info("Received ShouldDie flag:" + getId()); - stop(); - //notify to TaskRunnerManager - getContext().stopTaskRunner(getId()); } else { getContext().getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc(); LOG.info("Accumulated Received Task: " + (++receivedNum)); @@ -268,7 +253,7 @@ public class TaskRunner extends AbstractService { } LOG.info("Initializing: " + taskAttemptId); - Task task; + Task task = null; try { task = new Task(getId(), getTaskBaseDir(), taskAttemptId, executionBlockContext, new TaskRequestImpl(taskRequest)); @@ -283,20 +268,22 @@ public class TaskRunner extends AbstractService { } catch (Throwable t) { LOG.error(t.getMessage(), t); fatalError(qmClientService, taskAttemptId, t.getMessage()); + if(task != null) { + task.cleanupTask(); + } } finally { callFuture = null; taskRequest = null; } } - } else { - stop(); - //notify to TaskRunnerManager - getContext().stopTaskRunner(getId()); } } catch (Throwable t) { LOG.fatal(t.getMessage(), t); } } + stop(); + //notify to TaskRunnerManager + getContext().stopTaskRunner(getId()); } }); taskLauncher.start(); http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/proto/QueryMasterProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto b/tajo-core/src/main/proto/QueryMasterProtocol.proto index ae20309..855c2c6 100644 --- a/tajo-core/src/main/proto/QueryMasterProtocol.proto +++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto @@ -34,13 +34,13 @@ package hadoop.yarn; service QueryMasterProtocolService { //from Worker rpc getTask(GetTaskRequestProto) returns (TaskRequestProto); - rpc statusUpdate (TaskStatusProto) returns (BoolProto); - rpc ping (ExecutionBlockIdProto) returns (BoolProto); - rpc fatalError(TaskFatalErrorReport) returns (BoolProto); - rpc done (TaskCompletionReport) returns (BoolProto); - rpc doneExecutionBlock(ExecutionBlockReport) returns (BoolProto); + rpc statusUpdate (TaskStatusProto) returns (NullProto); + rpc ping (ExecutionBlockIdProto) returns (NullProto); + rpc fatalError(TaskFatalErrorReport) returns (NullProto); + rpc done (TaskCompletionReport) returns (NullProto); + rpc doneExecutionBlock(ExecutionBlockReport) returns (NullProto); //from TajoMaster's QueryJobManager - rpc killQuery(QueryIdProto) returns (BoolProto); - rpc executeQuery(QueryExecutionRequestProto) returns (BoolProto); + rpc killQuery(QueryIdProto) returns (NullProto); + rpc executeQuery(QueryExecutionRequestProto) returns (NullProto); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java index 8f6f9ed..0d86527 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java @@ -195,7 +195,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable if (maxRetries > retries) { retries++; - LOG.warn(future.cause() + " Try to reconnect : " + getKey().addr); + LOG.warn(getErrorMessage(ExceptionUtils.getMessage(future.cause())) + " Try to reconnect : " + getKey().addr); try { Thread.sleep(RpcConstants.DEFAULT_PAUSE); } catch (InterruptedException e) { @@ -246,8 +246,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable private String getErrorMessage(String message) { return "Exception [" + getKey().protocolClass.getCanonicalName() + - "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress) - getChannel().remoteAddress()) + ")]: " + message; + "(" + getKey().addr + ")]: " + message; } @Override @@ -332,7 +331,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable throws Exception { Throwable rootCause = ExceptionUtils.getRootCause(cause); - LOG.error(getKey().addr + "," + getKey().protocolClass + "," + ExceptionUtils.getMessage(rootCause), rootCause); + LOG.error(getErrorMessage(ExceptionUtils.getMessage(rootCause)), rootCause); if (cause instanceof RecoverableException) { sendException((RecoverableException) cause); http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java index f8def7f..111754e 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java @@ -130,6 +130,15 @@ public class RpcClientManager { return newClient(new RpcConnectionKey(addr, protocolClass, asyncMode), retries, timeout, timeUnit, enablePing); } + public synchronized <T extends NettyClientBase> T newClient(InetSocketAddress addr, + Class<?> protocolClass, + boolean asyncMode) + throws NoSuchMethodException, ClassNotFoundException, ConnectException { + + return newClient(new RpcConnectionKey(addr, protocolClass, asyncMode), + retries, getTimeoutSeconds(), TimeUnit.SECONDS, true); + } + public synchronized <T extends NettyClientBase> T newClient(RpcConnectionKey key, int retries, long timeout,
