TAJO-1015: Add executionblock event in worker. (jinho) Closes #124
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/15450e86 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/15450e86 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/15450e86 Branch: refs/heads/master Commit: 15450e868ae6a985deb988e679d19e1364c95526 Parents: 64416de Author: jhkim <[email protected]> Authored: Mon Sep 15 17:42:38 2014 +0900 Committer: jhkim <[email protected]> Committed: Mon Sep 15 17:42:38 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/conf/TajoConf.java | 3 + .../apache/tajo/master/TajoContainerProxy.java | 4 +- .../querymaster/QueryMasterManagerService.java | 5 +- .../tajo/master/querymaster/SubQuery.java | 2 +- .../tajo/worker/ExecutionBlockContext.java | 437 +++++++++++++++++++ .../worker/ExecutionBlockSharedResource.java | 36 +- .../tajo/worker/TajoResourceAllocator.java | 41 +- .../java/org/apache/tajo/worker/TajoWorker.java | 82 ++-- .../tajo/worker/TajoWorkerManagerService.java | 46 +- .../main/java/org/apache/tajo/worker/Task.java | 285 ++++-------- .../apache/tajo/worker/TaskAttemptContext.java | 47 +- .../java/org/apache/tajo/worker/TaskRunner.java | 277 ++---------- .../apache/tajo/worker/TaskRunnerManager.java | 253 ++++------- .../tajo/worker/event/TaskRunnerEvent.java | 41 ++ .../tajo/worker/event/TaskRunnerStartEvent.java | 51 +++ .../tajo/worker/event/TaskRunnerStopEvent.java | 28 ++ .../src/main/proto/QueryMasterProtocol.proto | 2 +- .../src/main/proto/TajoWorkerProtocol.proto | 5 +- .../org/apache/tajo/worker/TestFetcher.java | 2 + 20 files changed, 924 insertions(+), 725 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 6def7f8..61fcec0 100644 --- a/CHANGES +++ b/CHANGES @@ -431,6 +431,8 @@ Release 0.9.0 - unreleased SUB TASKS + TAJO-1015: Add executionblock event in worker. (jinho) + TAJO-783: Remove yarn-related code from tajo-core. (hyunsik) Release 0.8.0 http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 9aead24..a089b54 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -262,6 +262,9 @@ public class TajoConf extends Configuration { // Client RPC RPC_CLIENT_WORKER_THREAD_NUM("tajo.rpc.client.worker-thread-num", 4), + SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM("tajo.shuffle.rpc.client.worker-thread-num", + Runtime.getRuntime().availableProcessors()), + //Client service RPC Server MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM("tajo.master.service.rpc.server.worker-thread-num", Runtime.getRuntime().availableProcessors() * 1), http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index f3e4b72..c317ba5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -101,7 +101,7 @@ public class TajoContainerProxy extends ContainerProxy { TajoWorkerProtocol.RunExecutionBlockRequestProto request = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder() - .setExecutionBlockId(executionBlockId.toString()) + .setExecutionBlockId(executionBlockId.getProto()) .setQueryMasterHost(myAddr.getHostName()) .setQueryMasterPort(myAddr.getPort()) .setNodeId(container.getNodeId().toString()) @@ -111,7 +111,7 @@ public class TajoContainerProxy extends ContainerProxy { .setPlanJson(planJson) .build(); - tajoWorkerRpcClient.executeExecutionBlock(null, request, NullCallback.get()); + tajoWorkerRpcClient.startExecutionBlock(null, request, NullCallback.get()); } catch (Exception e) { LOG.error(e.getMessage(), e); } finally { http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java index 4f3c2ab..862dfef 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java @@ -40,8 +40,6 @@ import org.apache.tajo.util.NetUtils; import org.apache.tajo.worker.TajoWorker; import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; public class QueryMasterManagerService extends CompositeService implements QueryMasterProtocol.QueryMasterProtocolService.Interface { @@ -177,7 +175,7 @@ public class QueryMasterManagerService extends CompositeService @Override public void ping(RpcController controller, - TajoIdProtos.QueryUnitAttemptIdProto attemptId, + TajoIdProtos.ExecutionBlockIdProto requestProto, RpcCallback<PrimitiveProtos.BoolProto> done) { done.run(TajoWorker.TRUE_PROTO); } @@ -225,6 +223,7 @@ public class QueryMasterManagerService extends CompositeService ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId()); queryMasterTask.getQuery().getSubQuery(ebId).receiveExecutionBlockReport(request); } + done.run(TajoWorker.TRUE_PROTO); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 850ffe8..92ef9f9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -348,7 +348,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } if (totalProgress > 0.0f) { - return (float) Math.floor((totalProgress / (float) tempTasks.size()) * 1000.0f) / 1000.0f; + return (float) Math.floor((totalProgress / (float) Math.max(tempTasks.size(), 1)) * 1000.0f) / 1000.0f; } else { return 0.0f; } http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java new file mode 100644 index 0000000..306ab66 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -0,0 +1,437 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ipc.QueryMasterProtocol; +import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.NullCallback; +import org.apache.tajo.rpc.RpcChannelFactory; +import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.storage.HashShuffleAppenderManager; +import org.apache.tajo.storage.StorageUtil; +import org.apache.tajo.util.Pair; +import org.apache.tajo.worker.event.TaskRunnerStartEvent; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.tajo.ipc.TajoWorkerProtocol.*; + +public class ExecutionBlockContext { + /** class logger */ + private static final Log LOG = LogFactory.getLog(ExecutionBlockContext.class); + + private TaskRunnerManager manager; + public AtomicInteger completedTasksNum = new AtomicInteger(); + public AtomicInteger succeededTasksNum = new AtomicInteger(); + public AtomicInteger killedTasksNum = new AtomicInteger(); + public AtomicInteger failedTasksNum = new AtomicInteger(); + + private ClientSocketChannelFactory channelFactory; + // for temporal or intermediate files + private FileSystem localFS; + // for input files + private FileSystem defaultFS; + private ExecutionBlockId executionBlockId; + private QueryContext queryContext; + private String plan; + + private ExecutionBlockSharedResource resource; + + private TajoQueryEngine queryEngine; + private RpcConnectionPool connPool; + private InetSocketAddress qmMasterAddr; + private TajoConf systemConf; + // for the doAs block + private UserGroupInformation taskOwner; + + private Reporter reporter; + + private AtomicBoolean stop = new AtomicBoolean(); + + // It keeps all of the query unit attempts while a TaskRunner is running. + private final ConcurrentMap<QueryUnitAttemptId, Task> tasks = Maps.newConcurrentMap(); + + private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap(); + + public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, InetSocketAddress queryMaster) + throws Throwable { + this.manager = manager; + this.executionBlockId = event.getExecutionBlockId(); + this.connPool = RpcConnectionPool.getPool(manager.getTajoConf()); + this.qmMasterAddr = queryMaster; + this.systemConf = manager.getTajoConf(); + this.reporter = new Reporter(); + this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf); + this.localFS = FileSystem.getLocal(systemConf); + + // Setup QueryEngine according to the query plan + // Here, we can setup row-based query engine or columnar query engine. + this.queryEngine = new TajoQueryEngine(systemConf); + this.queryContext = event.getQueryContext(); + this.plan = event.getPlan(); + this.resource = new ExecutionBlockSharedResource(); + + init(); + } + + public void init() throws Throwable { + + LOG.info("Tajo Root Dir: " + systemConf.getVar(TajoConf.ConfVars.ROOT_DIR)); + LOG.info("Worker Local Dir: " + systemConf.getVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR)); + + LOG.info("QueryMaster Address:" + qmMasterAddr); + + UserGroupInformation.setConfiguration(systemConf); + // TODO - 'load credential' should be implemented + // Getting taskOwner + UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(systemConf.getVar(TajoConf.ConfVars.USERNAME)); + + // initialize DFS and LocalFileSystems + this.taskOwner = taskOwner; + this.reporter.startReporter(); + + // resource intiailization + try{ + this.resource.initialize(queryContext, plan); + } catch (Throwable e) { + getQueryMasterStub().killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get()); + throw e; + } + } + + public ExecutionBlockSharedResource getSharedResource() { + return resource; + } + + public QueryMasterProtocol.QueryMasterProtocolService.Interface getQueryMasterStub() throws Exception { + NettyClientBase clientBase = null; + try { + clientBase = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true); + return clientBase.getStub(); + } finally { + connPool.releaseConnection(clientBase); + } + } + + public void stop(){ + if(stop.getAndSet(true)){ + return; + } + + try { + reporter.stop(); + } catch (InterruptedException e) { + LOG.error(e); + } + + // If ExecutionBlock is stopped, all running or pending tasks will be marked as failed. + for (Task task : tasks.values()) { + if (task.getStatus() == TajoProtos.TaskAttemptState.TA_PENDING || + task.getStatus() == TajoProtos.TaskAttemptState.TA_RUNNING) { + task.setState(TajoProtos.TaskAttemptState.TA_FAILED); + try{ + task.abort(); + } catch (Throwable e){ + LOG.error(e); + } + } + } + tasks.clear(); + + resource.release(); + + try { + releaseShuffleChannelFactory(); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + } + } + + public TajoConf getConf() { + return manager.getTajoConf(); + } + + public FileSystem getLocalFS() { + return localFS; + } + + public FileSystem getDefaultFS() { + return defaultFS; + } + + public LocalDirAllocator getLocalDirAllocator() { + return manager.getWorkerContext().getLocalDirAllocator(); + } + + public TajoQueryEngine getTQueryEngine() { + return queryEngine; + } + + // for the local temporal dir + public Path createBaseDir() throws IOException { + // the base dir for an output dir + String baseDir = getBaseOutputDir(executionBlockId).toString(); + Path baseDirPath = localFS.makeQualified(getLocalDirAllocator().getLocalPathForWrite(baseDir, systemConf)); + return baseDirPath; + } + + public static Path getBaseOutputDir(ExecutionBlockId executionBlockId) { + Path workDir = + StorageUtil.concatPath( + executionBlockId.getQueryId().toString(), + "output", + String.valueOf(executionBlockId.getId())); + return workDir; + } + + public static Path getBaseInputDir(ExecutionBlockId executionBlockId) { + Path workDir = + StorageUtil.concatPath( + executionBlockId.getQueryId().toString(), + "in", + executionBlockId.toString()); + return workDir; + } + + public ExecutionBlockId getExecutionBlockId() { + return executionBlockId; + } + + public Map<QueryUnitAttemptId, Task> getTasks() { + return tasks; + } + + public Task getTask(QueryUnitAttemptId queryUnitAttemptId){ + return tasks.get(queryUnitAttemptId); + } + + public void stopTaskRunner(String id){ + manager.stopTaskRunner(id); + } + + public TaskRunner getTaskRunner(String taskRunnerId){ + return manager.getTaskRunner(taskRunnerId); + } + + public void addTaskHistory(String taskRunnerId, QueryUnitAttemptId quAttemptId, TaskHistory taskHistory) { + histories.get(taskRunnerId).addTaskHistory(quAttemptId, taskHistory); + } + + public TaskRunnerHistory createTaskRunnerHistory(TaskRunner runner){ + histories.putIfAbsent(runner.getId(), new TaskRunnerHistory(runner.getContainerId(), executionBlockId)); + return histories.get(runner.getId()); + } + + public TajoWorker.WorkerContext getWorkerContext(){ + return manager.getWorkerContext(); + } + + protected ClientSocketChannelFactory getShuffleChannelFactory(){ + if(channelFactory == null) { + int workerNum = getConf().getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM); + channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", workerNum); + } + return channelFactory; + } + + protected void releaseShuffleChannelFactory(){ + if(channelFactory != null) { + channelFactory.shutdown(); + channelFactory.releaseExternalResources(); + channelFactory = null; + } + } + + private void sendExecutionBlockReport(ExecutionBlockReport reporter) throws Exception { + getQueryMasterStub().doneExecutionBlock(null, reporter, NullCallback.get()); + } + + protected void reportExecutionBlock(ExecutionBlockId ebId) { + ExecutionBlockReport.Builder reporterBuilder = ExecutionBlockReport.newBuilder(); + reporterBuilder.setEbId(ebId.getProto()); + reporterBuilder.setReportSuccess(true); + reporterBuilder.setSucceededTasks(succeededTasksNum.get()); + try { + List<IntermediateEntryProto> intermediateEntries = Lists.newArrayList(); + List<HashShuffleAppenderManager.HashShuffleIntermediate> shuffles = + getWorkerContext().getHashShuffleAppenderManager().close(ebId); + if (shuffles == null) { + reporterBuilder.addAllIntermediateEntries(intermediateEntries); + sendExecutionBlockReport(reporterBuilder.build()); + return; + } + + IntermediateEntryProto.Builder intermediateBuilder = IntermediateEntryProto.newBuilder(); + IntermediateEntryProto.PageProto.Builder pageBuilder = IntermediateEntryProto.PageProto.newBuilder(); + FailureIntermediateProto.Builder failureBuilder = FailureIntermediateProto.newBuilder(); + + for (HashShuffleAppenderManager.HashShuffleIntermediate eachShuffle: shuffles) { + List<IntermediateEntryProto.PageProto> pages = Lists.newArrayList(); + List<FailureIntermediateProto> failureIntermediateItems = Lists.newArrayList(); + + for (Pair<Long, Integer> eachPage: eachShuffle.getPages()) { + pageBuilder.clear(); + pageBuilder.setPos(eachPage.getFirst()); + pageBuilder.setLength(eachPage.getSecond()); + pages.add(pageBuilder.build()); + } + + for(Pair<Long, Pair<Integer, Integer>> eachFailure: eachShuffle.getFailureTskTupleIndexes()) { + failureBuilder.clear(); + failureBuilder.setPagePos(eachFailure.getFirst()); + failureBuilder.setStartRowNum(eachFailure.getSecond().getFirst()); + failureBuilder.setEndRowNum(eachFailure.getSecond().getSecond()); + failureIntermediateItems.add(failureBuilder.build()); + } + intermediateBuilder.clear(); + + intermediateBuilder.setEbId(ebId.getProto()) + .setHost(getWorkerContext().getTajoWorkerManagerService().getBindAddr().getHostName() + ":" + + getWorkerContext().getPullServerPort()) + .setTaskId(-1) + .setAttemptId(-1) + .setPartId(eachShuffle.getPartId()) + .setVolume(eachShuffle.getVolume()) + .addAllPages(pages) + .addAllFailures(failureIntermediateItems); + intermediateEntries.add(intermediateBuilder.build()); + } + + // send intermediateEntries to QueryMaster + reporterBuilder.addAllIntermediateEntries(intermediateEntries); + + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + reporterBuilder.setReportSuccess(false); + if (e.getMessage() == null) { + reporterBuilder.setReportErrorMessage(e.getClass().getSimpleName()); + } else { + reporterBuilder.setReportErrorMessage(e.getMessage()); + } + } + try { + sendExecutionBlockReport(reporterBuilder.build()); + } catch (Throwable e) { + // can't send report to query master + LOG.fatal(e.getMessage(), e); + throw new RuntimeException(e); + } + } + + protected class Reporter { + private Thread reporterThread; + private AtomicBoolean reporterStop = new AtomicBoolean(); + private static final int PROGRESS_INTERVAL = 1000; + private static final int MAX_RETRIES = 10; + + public Reporter() { + this.reporterThread = new Thread(createReporterThread()); + this.reporterThread.setName("Task reporter"); + } + + public void startReporter(){ + this.reporterThread.start(); + } + + Runnable createReporterThread() { + + return new Runnable() { + int remainingRetries = MAX_RETRIES; + QueryMasterProtocol.QueryMasterProtocolService.Interface masterStub; + @Override + public void run() { + while (!reporterStop.get() && !Thread.interrupted()) { + try { + masterStub = getQueryMasterStub(); + + if(tasks.size() == 0){ + masterStub.ping(null, getExecutionBlockId().getProto(), NullCallback.get()); + } else { + for (Task task : new ArrayList<Task>(tasks.values())){ + + if (task.isRunning() && task.isProgressChanged()) { + task.updateProgress(); + masterStub.statusUpdate(null, task.getReport(), NullCallback.get()); + task.getContext().setProgressChanged(false); + } else { + task.updateProgress(); + } + } + } + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + remainingRetries -=1; + if (remainingRetries == 0) { + ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0); + LOG.warn("Last retry, exiting "); + throw new RuntimeException(t); + } + } finally { + if (remainingRetries > 0 && !reporterStop.get()) { + synchronized (reporterThread) { + try { + reporterThread.wait(PROGRESS_INTERVAL); + } catch (InterruptedException e) { + } + } + } + } + } + } + }; + } + + public void stop() throws InterruptedException { + if (reporterStop.getAndSet(true)) { + return; + } + + if (reporterThread != null) { + // Intent of the lock is to not send an interupt in the middle of an + // umbilical.ping or umbilical.statusUpdate + synchronized (reporterThread) { + //Interrupt if sleeping. Otherwise wait for the RPC call to return. + reporterThread.notifyAll(); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java index a70fbfd..e77e265 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java @@ -32,14 +32,12 @@ import org.apache.tajo.engine.planner.logical.LogicalNode; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.util.Pair; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; public class ExecutionBlockSharedResource { private static Log LOG = LogFactory.getLog(ExecutionBlockSharedResource.class); private AtomicBoolean initializing = new AtomicBoolean(false); private volatile Boolean resourceInitSuccess = new Boolean(false); - private CountDownLatch initializedResourceLatch = new CountDownLatch(1); // Query private QueryContext context; @@ -50,27 +48,18 @@ public class ExecutionBlockSharedResource { private LogicalNode plan; private boolean codeGenEnabled = false; - public void initialize(final QueryContext context, final String planJson) throws InterruptedException { + public void initialize(final QueryContext context, final String planJson) { if (!initializing.getAndSet(true)) { - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - try { - ExecutionBlockSharedResource.this.context = context; - initPlan(planJson); - initCodeGeneration(); - resourceInitSuccess = true; - } catch (Throwable t) { - LOG.error(t); - LOG.error(ExceptionUtils.getStackTrace(t)); - } finally { - initializedResourceLatch.countDown(); - } - } - }); - thread.run(); - thread.join(); + try { + ExecutionBlockSharedResource.this.context = context; + initPlan(planJson); + initCodeGeneration(); + resourceInitSuccess = true; + } catch (Throwable t) { + LOG.error(t); + LOG.error(ExceptionUtils.getStackTrace(t)); + } if (!resourceInitSuccess) { throw new RuntimeException("Resource cannot be initialized"); @@ -91,11 +80,6 @@ public class ExecutionBlockSharedResource { } } - public boolean awaitInitializedResource() throws InterruptedException { - initializedResourceLatch.await(); - return resourceInitSuccess; - } - public LogicalNode getPlan() { return this.plan; } http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index aaff69c..2cc8f0c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -18,6 +18,7 @@ package org.apache.tajo.worker; +import com.google.common.collect.Sets; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.*; import org.apache.tajo.master.event.ContainerAllocationEvent; import org.apache.tajo.master.event.ContainerAllocatorEventType; @@ -44,14 +46,13 @@ import org.apache.tajo.master.rm.Worker; import org.apache.tajo.master.rm.WorkerResource; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.util.ApplicationIdUtils; import org.apache.tajo.util.HAServiceUtil; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; +import java.net.InetSocketAddress; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -144,6 +145,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { launchTaskRunners(launchEvent); } else if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_CLEANUP) { stopContainers(event.getContainers()); + stopExecutionBlock(event.getExecutionBlockId(), event.getContainers()); } } } @@ -158,6 +160,37 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { } } + public void stopExecutionBlock(final ExecutionBlockId executionBlockId, Collection<Container> containers) { + Set<NodeId> workers = Sets.newHashSet(); + for (Container container : containers){ + workers.add(container.getNodeId()); + } + + for (final NodeId worker : workers) { + executorService.submit(new Runnable() { + @Override + public void run() { + stopExecutionBlock(executionBlockId, worker); + } + }); + } + } + + private void stopExecutionBlock(ExecutionBlockId executionBlockId, NodeId worker) { + NettyClientBase tajoWorkerRpc = null; + try { + InetSocketAddress addr = new InetSocketAddress(worker.getHost(), worker.getPort()); + tajoWorkerRpc = RpcConnectionPool.getPool(tajoConf).getConnection(addr, TajoWorkerProtocol.class, true); + TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); + + tajoWorkerRpcClient.stopExecutionBlock(null, executionBlockId.getProto(), NullCallback.get()); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + } finally { + RpcConnectionPool.getPool(tajoConf).releaseConnection(tajoWorkerRpc); + } + } + protected static class LaunchRunner implements Runnable { private final ContainerProxy proxy; private final ContainerId id; http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 584c60e..a8d661b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -27,15 +27,12 @@ import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.shell.PathData; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.util.RackResolver; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryId; import org.apache.tajo.TajoConstants; -import org.apache.tajo.TajoProtos; import org.apache.tajo.catalog.CatalogClient; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.ha.TajoMasterInfo; import org.apache.tajo.master.querymaster.QueryMaster; @@ -45,8 +42,11 @@ import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; -import org.apache.tajo.util.*; import org.apache.tajo.storage.HashShuffleAppenderManager; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.HAServiceUtil; +import org.apache.tajo.util.NetUtils; +import org.apache.tajo.util.StringUtils; import org.apache.tajo.util.metrics.TajoSystemMetrics; import org.apache.tajo.webapp.StaticHttpServer; @@ -57,7 +57,6 @@ import java.lang.management.ThreadMXBean; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -125,6 +124,10 @@ public class TajoWorker extends CompositeService { private HashShuffleAppenderManager hashShuffleAppenderManager; + private AsyncDispatcher dispatcher; + + private LocalDirAllocator lDirAllocator; + public TajoWorker() throws Exception { super(TajoWorker.class.getName()); } @@ -166,7 +169,7 @@ public class TajoWorker extends CompositeService { } @Override - public void init(Configuration conf) { + public void serviceInit(Configuration conf) throws Exception { Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook())); this.systemConf = (TajoConf)conf; @@ -174,6 +177,7 @@ public class TajoWorker extends CompositeService { this.connPool = RpcConnectionPool.getPool(systemConf); this.workerContext = new WorkerContext(); + this.lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); String resourceManagerClassName = systemConf.getVar(ConfVars.RESOURCE_MANAGER_CLASS); @@ -192,6 +196,9 @@ public class TajoWorker extends CompositeService { systemConf.setIntVar(ConfVars.PULLSERVER_PORT, 0); } + this.dispatcher = new AsyncDispatcher(); + addIfService(dispatcher); + // querymaster worker tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort); addService(tajoWorkerClientService); @@ -200,7 +207,7 @@ public class TajoWorker extends CompositeService { addService(queryMasterManagerService); // taskrunner worker - taskRunnerManager = new TaskRunnerManager(workerContext); + taskRunnerManager = new TaskRunnerManager(workerContext, dispatcher); addService(taskRunnerManager); tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort); @@ -240,30 +247,19 @@ public class TajoWorker extends CompositeService { ",yarnContainer=" + yarnContainerMode + ", clientPort=" + clientPort + ", peerRpcPort=" + peerRpcPort + ":" + qmManagerPort + ",httpPort" + httpPort); - super.init(conf); + super.serviceInit(conf); tajoMasterInfo = new TajoMasterInfo(); - if(yarnContainerMode && queryMasterMode) { - tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(cmdArgs[2])); - connectToCatalog(); - - QueryId queryId = TajoIdUtils.parseQueryId(cmdArgs[1]); - queryMasterManagerService.getQueryMaster().reportQueryStatusToQueryMaster( - queryId, TajoProtos.QueryState.QUERY_MASTER_LAUNCHED); - } else if(yarnContainerMode && taskRunnerMode) { //TaskRunner mode - taskRunnerManager.startTask(cmdArgs); + if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { + tajoMasterInfo.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf)); + tajoMasterInfo.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf)); } else { - if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - tajoMasterInfo.setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf)); - tajoMasterInfo.setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf)); - } else { - tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars - .TAJO_MASTER_UMBILICAL_RPC_ADDRESS))); - tajoMasterInfo.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(systemConf.getVar(ConfVars - .RESOURCE_TRACKER_RPC_ADDRESS))); - } - connectToCatalog(); + tajoMasterInfo.setTajoMasterAddress(NetUtils.createSocketAddr(systemConf.getVar(ConfVars + .TAJO_MASTER_UMBILICAL_RPC_ADDRESS))); + tajoMasterInfo.setWorkerResourceTrackerAddr(NetUtils.createSocketAddr(systemConf.getVar(ConfVars + .RESOURCE_TRACKER_RPC_ADDRESS))); } + connectToCatalog(); workerHeartbeatThread = new WorkerHeartbeatService(workerContext); workerHeartbeatThread.init(conf); @@ -309,13 +305,13 @@ public class TajoWorker extends CompositeService { } @Override - public void start() { - super.start(); + public void serviceStart() throws Exception { initWorkerMetrics(); + super.serviceStart(); } @Override - public void stop() { + public void serviceStop() throws Exception { if(stopped.getAndSet(true)) { return; } @@ -349,14 +345,11 @@ public class TajoWorker extends CompositeService { } if(deletionService != null) deletionService.stop(); - super.stop(); + super.serviceStop(); LOG.info("TajoWorker main thread exiting"); } public class WorkerContext { - private ConcurrentHashMap<ExecutionBlockId, ExecutionBlockSharedResource> sharedResourceMap = - new ConcurrentHashMap<ExecutionBlockId, ExecutionBlockSharedResource>(); - public QueryMaster getQueryMaster() { if (queryMasterManagerService == null) { return null; @@ -407,23 +400,8 @@ public class TajoWorker extends CompositeService { } } - public void initSharedResource(QueryContext queryContext, ExecutionBlockId blockId, String planJson) - throws InterruptedException { - - if (!sharedResourceMap.containsKey(blockId)) { - ExecutionBlockSharedResource resource = new ExecutionBlockSharedResource(); - if (sharedResourceMap.putIfAbsent(blockId, resource) == null) { - resource.initialize(queryContext, planJson); - } - } - } - - public ExecutionBlockSharedResource getSharedResource(ExecutionBlockId blockId) { - return sharedResourceMap.get(blockId); - } - - public void releaseSharedResource(ExecutionBlockId blockId) { - sharedResourceMap.remove(blockId).release(); + public LocalDirAllocator getLocalDirAllocator(){ + return lDirAllocator; } protected void cleanup(String strPath) { http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java index fa116c3..472ce1b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java @@ -25,14 +25,18 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; -import org.apache.tajo.*; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.QueryId; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TajoIdProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.rpc.AsyncRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.util.NetUtils; -import org.apache.tajo.util.TajoIdUtils; +import org.apache.tajo.worker.event.TaskRunnerStartEvent; +import org.apache.tajo.worker.event.TaskRunnerStopEvent; import java.net.InetSocketAddress; @@ -112,19 +116,16 @@ public class TajoWorkerManagerService extends CompositeService } @Override - public void executeExecutionBlock(RpcController controller, + public void startExecutionBlock(RpcController controller, TajoWorkerProtocol.RunExecutionBlockRequestProto request, RpcCallback<PrimitiveProtos.BoolProto> done) { workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc(); try { - workerContext.initSharedResource( - new QueryContext(workerContext.getConf(), request.getQueryContext()), - TajoIdUtils.createExecutionBlockId(request.getExecutionBlockId()), request.getPlanJson()); String[] params = new String[7]; params[0] = "standby"; //mode(never used) - params[1] = request.getExecutionBlockId(); + params[1] = request.getExecutionBlockId().toString(); // NodeId has a form of hostname:port. params[2] = request.getNodeId(); params[3] = request.getContainerId(); @@ -133,7 +134,14 @@ public class TajoWorkerManagerService extends CompositeService params[4] = request.getQueryMasterHost(); params[5] = String.valueOf(request.getQueryMasterPort()); params[6] = request.getQueryOutputPath(); - workerContext.getTaskRunnerManager().startTask(params); + + ExecutionBlockId executionBlockId = new ExecutionBlockId(request.getExecutionBlockId()); + workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStartEvent( + params + , executionBlockId, + new QueryContext(workerContext.getConf(), request.getQueryContext()), + request.getPlanJson() + )); done.run(TajoWorker.TRUE_PROTO); } catch (Throwable t) { LOG.error(t.getMessage(), t); @@ -142,6 +150,21 @@ public class TajoWorkerManagerService extends CompositeService } @Override + public void stopExecutionBlock(RpcController controller, + TajoIdProtos.ExecutionBlockIdProto requestProto, + RpcCallback<PrimitiveProtos.BoolProto> done) { + try { + workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStopEvent( + new ExecutionBlockId(requestProto) + )); + done.run(TajoWorker.TRUE_PROTO); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + done.run(TajoWorker.FALSE_PROTO); + } + } + + @Override public void killTaskAttempt(RpcController controller, TajoIdProtos.QueryUnitAttemptIdProto request, RpcCallback<PrimitiveProtos.BoolProto> done) { Task task = workerContext.getTaskRunnerManager().getTaskByQueryUnitAttemptId(new QueryUnitAttemptId(request)); @@ -162,13 +185,10 @@ public class TajoWorkerManagerService extends CompositeService TajoWorkerProtocol.ExecutionBlockListProto ebIds, RpcCallback<PrimitiveProtos.BoolProto> done) { for (TajoIdProtos.ExecutionBlockIdProto executionBlockIdProto : ebIds.getExecutionBlockIdList()) { - String inputDir = TaskRunner.getBaseInputDir(new ExecutionBlockId(executionBlockIdProto)).toString(); + String inputDir = ExecutionBlockContext.getBaseInputDir(new ExecutionBlockId(executionBlockIdProto)).toString(); workerContext.cleanup(inputDir); - String outputDir = TaskRunner.getBaseOutputDir(new ExecutionBlockId(executionBlockIdProto)).toString(); + String outputDir = ExecutionBlockContext.getBaseOutputDir(new ExecutionBlockId(executionBlockIdProto)).toString(); workerContext.cleanup(outputDir); - - // Release shared resources - workerContext.releaseSharedResource(new ExecutionBlockId(executionBlockIdProto)); } done.run(TajoWorker.TRUE_PROTO); } http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index d0665ae..7b4cbe1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -25,8 +25,10 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoProtos; @@ -43,11 +45,10 @@ import org.apache.tajo.engine.planner.logical.*; import org.apache.tajo.engine.planner.physical.PhysicalExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.engine.query.QueryUnitRequest; -import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService; +import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol.*; import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType; import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.fragment.FileFragment; @@ -59,7 +60,7 @@ import java.net.URI; import java.text.NumberFormat; import java.util.*; import java.util.Map.Entry; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ExecutorService; import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; @@ -69,11 +70,9 @@ public class Task { private final TajoConf systemConf; private final QueryContext queryContext; - private final FileSystem localFS; - private TaskRunner.TaskRunnerContext taskRunnerContext; - private final QueryMasterProtocolService.Interface masterProxy; - private final LocalDirAllocator lDirAllocator; + private final ExecutionBlockContext executionBlockContext; private final QueryUnitAttemptId taskId; + private final String taskRunnerId; private final Path taskDir; private final QueryUnitRequest request; @@ -85,7 +84,6 @@ public class Task { private boolean interQuery; private boolean killed = false; private boolean aborted = false; - private final Reporter reporter; private Path inputTableBaseDir; private long startTime; @@ -97,7 +95,6 @@ public class Task { private ShuffleType shuffleType = null; private Schema finalSchema = null; private TupleComparator sortComp = null; - private ClientSocketChannelFactory channelFactory = null; static final String OUTPUT_FILE_PREFIX="part-"; static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY = @@ -132,45 +129,27 @@ public class Task { } }; - public Task(QueryUnitAttemptId taskId, - final TaskRunner.TaskRunnerContext worker, - final QueryMasterProtocolService.Interface masterProxy, + public Task(String taskRunnerId, + Path baseDir, + QueryUnitAttemptId taskId, + final ExecutionBlockContext executionBlockContext, final QueryUnitRequest request) throws IOException { + this.taskRunnerId = taskRunnerId; this.request = request; this.taskId = taskId; - this.systemConf = worker.getConf(); + this.systemConf = executionBlockContext.getConf(); this.queryContext = request.getQueryContext(); - this.taskRunnerContext = worker; - this.masterProxy = masterProxy; - this.localFS = worker.getLocalFS(); - this.lDirAllocator = worker.getLocalDirAllocator(); - this.taskDir = StorageUtil.concatPath(taskRunnerContext.getBaseDir(), + this.executionBlockContext = executionBlockContext; + this.taskDir = StorageUtil.concatPath(baseDir, taskId.getQueryUnitId().getId() + "_" + taskId.getId()); - this.context = new TaskAttemptContext(queryContext, worker.getWorkerContext(), taskId, + this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskId, request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir); this.context.setDataChannel(request.getDataChannel()); this.context.setEnforcer(request.getEnforcer()); this.inputStats = new TableStats(); - this.reporter = new Reporter(taskId, masterProxy); - this.reporter.startCommunicationThread(); - - - // resource intiailization - boolean resourceInitialized = false; - try { - resourceInitialized = context.getSharedResource().awaitInitializedResource(); - } catch (InterruptedException e) { - LOG.error("Failed Resource Initialization", e); - } finally { - if (!resourceInitialized) { - setState(TaskAttemptState.TA_FAILED); - return; - } - } - plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class); LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN); if (scanNode != null) { @@ -234,11 +213,12 @@ public class Task { public void init() throws IOException { if (context.getState() == TaskAttemptState.TA_PENDING) { // initialize a task temporal dir + FileSystem localFS = executionBlockContext.getLocalFS(); localFS.mkdirs(taskDir); if (request.getFetches().size() > 0) { inputTableBaseDir = localFS.makeQualified( - lDirAllocator.getLocalPathForWrite( + executionBlockContext.getLocalDirAllocator().getLocalPathForWrite( getTaskAttemptDir(context.getTaskId()).toString(), systemConf)); localFS.mkdirs(inputTableBaseDir); Path tableDir; @@ -296,8 +276,9 @@ public class Task { } public void fetch() { + ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher(); for (Fetcher f : fetcherRunners) { - taskRunnerContext.getFetchLauncher().submit(new FetchRunner(context, f)); + executorService.submit(new FetchRunner(context, f)); } } @@ -305,25 +286,18 @@ public class Task { killed = true; context.stop(); context.setState(TaskAttemptState.TA_KILLED); - releaseChannelFactory(); } public void abort() { aborted = true; context.stop(); - releaseChannelFactory(); } public void cleanUp() { // remove itself from worker if (context.getState() == TaskAttemptState.TA_SUCCEEDED) { - try { - localFS.delete(context.getWorkDir(), true); - synchronized (taskRunnerContext.getTasks()) { - taskRunnerContext.getTasks().remove(this.getId()); - } - } catch (IOException e) { - LOG.error(e.getMessage(), e); + synchronized (executionBlockContext.getTasks()) { + executionBlockContext.getTasks().remove(this.getId()); } } else { LOG.error("QueryUnitAttemptId: " + context.getTaskId() + " status: " + context.getState()); @@ -332,7 +306,7 @@ public class Task { public TaskStatusProto getReport() { TaskStatusProto.Builder builder = TaskStatusProto.newBuilder(); - builder.setWorkerName(taskRunnerContext.getNodeId()); + builder.setWorkerName(executionBlockContext.getTaskRunner(taskRunnerId).getNodeId().toString()); builder.setId(context.getTaskId().getProto()) .setProgress(context.getProgress()) .setState(context.getState()); @@ -345,6 +319,23 @@ public class Task { return builder.build(); } + public boolean isRunning(){ + return context.getState() == TaskAttemptState.TA_RUNNING; + } + public boolean isProgressChanged() { + return context.isProgressChanged(); + } + + public void updateProgress() { + if(killed || aborted){ + return; + } + + if (executor != null && context.getProgress() < 1.0f) { + context.setExecutorProgress(executor.getProgress()); + } + } + private CatalogProtos.TableStatsProto reloadInputStats() { synchronized(inputStats) { if (this.executor == null) { @@ -419,12 +410,11 @@ public class Task { FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta()); context.updateAssignedFragments(inputTable, frags); } - releaseChannelFactory(); } - public void run() { + public void run() throws Exception { startTime = System.currentTimeMillis(); - Exception error = null; + Throwable error = null; try { context.setState(TaskAttemptState.TA_RUNNING); @@ -433,16 +423,17 @@ public class Task { // complete. waitForFetch(); context.setFetcherProgress(FETCHER_PROGRESS); - context.setProgress(FETCHER_PROGRESS); + context.setProgressChanged(true); + updateProgress(); } - this.executor = taskRunnerContext.getTQueryEngine(). + this.executor = executionBlockContext.getTQueryEngine(). createPlan(context, plan); this.executor.init(); - while(!killed && executor.next() != null) { + while(!killed && !aborted && executor.next() != null) { } - } catch (Exception e) { + } catch (Throwable e) { error = e ; LOG.error(e.getMessage(), e); aborted = true; @@ -452,22 +443,20 @@ public class Task { executor.close(); reloadInputStats(); } catch (IOException e) { - e.printStackTrace(); + LOG.error(e); } this.executor = null; } - context.setProgress(1.0f); - taskRunnerContext.completedTasksNum.incrementAndGet(); + executionBlockContext.completedTasksNum.incrementAndGet(); context.getHashShuffleAppenderManager().finalizeTask(taskId); - + QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getQueryMasterStub(); if (killed || aborted) { context.setExecutorProgress(0.0f); - context.setProgress(0.0f); if(killed) { context.setState(TaskAttemptState.TA_KILLED); - masterProxy.statusUpdate(null, getReport(), NullCallback.get()); - taskRunnerContext.killedTasksNum.incrementAndGet(); + queryMasterStub.statusUpdate(null, getReport(), NullCallback.get()); + executionBlockContext.killedTasksNum.incrementAndGet(); } else { context.setState(TaskAttemptState.TA_FAILED); TaskFatalErrorReport.Builder errorBuilder = @@ -482,47 +471,31 @@ public class Task { errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error)); } - masterProxy.fatalError(null, errorBuilder.build(), NullCallback.get()); - taskRunnerContext.failedTasksNum.incrementAndGet(); - } - - // stopping the status report - try { - reporter.stopCommunicationThread(); - } catch (InterruptedException e) { - LOG.warn(e); + queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get()); + executionBlockContext.failedTasksNum.incrementAndGet(); } - } else { // if successful context.setProgress(1.0f); context.setState(TaskAttemptState.TA_SUCCEEDED); - - // stopping the status report - try { - reporter.stopCommunicationThread(); - } catch (InterruptedException e) { - LOG.warn(e); - } + executionBlockContext.succeededTasksNum.incrementAndGet(); TaskCompletionReport report = getTaskCompletionReport(); - masterProxy.done(null, report, NullCallback.get()); - taskRunnerContext.succeededTasksNum.incrementAndGet(); + queryMasterStub.done(null, report, NullCallback.get()); } finishTime = System.currentTimeMillis(); LOG.info(context.getTaskId() + " completed. " + - "Worker's task counter - total:" + taskRunnerContext.completedTasksNum.intValue() + - ", succeeded: " + taskRunnerContext.succeededTasksNum.intValue() - + ", killed: " + taskRunnerContext.killedTasksNum.intValue() - + ", failed: " + taskRunnerContext.failedTasksNum.intValue()); + "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() + + ", succeeded: " + executionBlockContext.succeededTasksNum.intValue() + + ", killed: " + executionBlockContext.killedTasksNum.intValue() + + ", failed: " + executionBlockContext.failedTasksNum.intValue()); cleanupTask(); } } public void cleanupTask() { - taskRunnerContext.addTaskHistory(getId(), createTaskHistory()); - taskRunnerContext.getTasks().remove(getId()); - taskRunnerContext = null; + executionBlockContext.addTaskHistory(taskRunnerId, getId(), createTaskHistory()); + executionBlockContext.getTasks().remove(getId()); fetcherRunners.clear(); fetcherRunners = null; @@ -532,11 +505,8 @@ public class Task { executor = null; } } catch (IOException e) { - e.printStackTrace(); + LOG.fatal(e.getMessage(), e); } - plan = null; - context = null; - releaseChannelFactory(); } public TaskHistory createTaskHistory() { @@ -577,7 +547,7 @@ public class Task { taskHistory.setFinishedFetchCount(i); } } catch (Exception e) { - e.printStackTrace(); + LOG.warn(e.getMessage(), e); } return taskHistory; @@ -673,7 +643,11 @@ public class Task { @VisibleForTesting public static float adjustFetchProcess(int totalFetcher, int remainFetcher) { - return ((float)(totalFetcher - remainFetcher)) / (float)totalFetcher * FETCHER_PROGRESS; + if (totalFetcher > 0) { + return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS; + } else { + return 0.0f; + } } private synchronized void fetcherFinished(TaskAttemptContext ctx) { @@ -681,24 +655,15 @@ public class Task { if(fetcherSize == 0) { return; } - try { - int numRunningFetcher = (int)(ctx.getFetchLatch().getCount()) - 1; - if (numRunningFetcher == 0) { - context.setProgress(FETCHER_PROGRESS); - } else { - context.setProgress(adjustFetchProcess(fetcherSize, numRunningFetcher)); - } - } finally { - ctx.getFetchLatch().countDown(); - } - } + ctx.getFetchLatch().countDown(); - private void releaseChannelFactory(){ - if(channelFactory != null) { - channelFactory.shutdown(); - channelFactory.releaseExternalResources(); - channelFactory = null; + int remainFetcher = (int) ctx.getFetchLatch().getCount(); + if (remainFetcher == 0) { + context.setFetcherProgress(FETCHER_PROGRESS); + } else { + context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher)); + context.setProgressChanged(true); } } @@ -706,15 +671,9 @@ public class Task { List<FetchImpl> fetches) throws IOException { if (fetches.size() > 0) { - - releaseChannelFactory(); - - - int workerNum = ctx.getConf().getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM); - channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", workerNum); - Path inputDir = lDirAllocator. - getLocalPathToRead( - getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf); + ClientSocketChannelFactory channelFactory = executionBlockContext.getShuffleChannelFactory(); + Path inputDir = executionBlockContext.getLocalDirAllocator(). + getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf); File storeDir; int i = 0; @@ -739,91 +698,9 @@ public class Task { } } - protected class Reporter { - private QueryMasterProtocolService.Interface masterStub; - private Thread pingThread; - private AtomicBoolean stop = new AtomicBoolean(false); - private static final int PROGRESS_INTERVAL = 3000; - private static final int MAX_RETRIES = 3; - private QueryUnitAttemptId taskId; - - public Reporter(QueryUnitAttemptId taskId, QueryMasterProtocolService.Interface masterStub) { - this.taskId = taskId; - this.masterStub = masterStub; - } - - Runnable createReporterThread() { - - return new Runnable() { - int remainingRetries = MAX_RETRIES; - @Override - public void run() { - while (!stop.get() && !context.isStopped()) { - try { - if(executor != null && context.getProgress() < 1.0f) { - float progress = executor.getProgress(); - context.setExecutorProgress(progress); - } - } catch (Throwable t) { - LOG.error("Get progress error: " + t.getMessage(), t); - } - - try { - if (context.isPorgressChanged()) { - masterStub.statusUpdate(null, getReport(), NullCallback.get()); - } else { - masterStub.ping(null, taskId.getProto(), NullCallback.get()); - } - } catch (Throwable t) { - LOG.error(t.getMessage(), t); - remainingRetries -=1; - if (remainingRetries == 0) { - ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0); - LOG.warn("Last retry, exiting "); - throw new RuntimeException(t); - } - } finally { - if (!context.isStopped() && remainingRetries > 0) { - synchronized (pingThread) { - try { - pingThread.wait(PROGRESS_INTERVAL); - } catch (InterruptedException e) { - } - } - } - } - } - } - }; - } - - public void startCommunicationThread() { - if (pingThread == null) { - pingThread = new Thread(createReporterThread()); - pingThread.setName("communication thread"); - pingThread.start(); - } - } - - public void stopCommunicationThread() throws InterruptedException { - if(stop.getAndSet(true)){ - return; - } - - if (pingThread != null) { - // Intent of the lock is to not send an interupt in the middle of an - // umbilical.ping or umbilical.statusUpdate - synchronized(pingThread) { - //Interrupt if sleeping. Otherwise wait for the RPC call to return. - pingThread.notifyAll(); - } - } - } - } - public static Path getTaskAttemptDir(QueryUnitAttemptId quid) { Path workDir = - StorageUtil.concatPath(TaskRunner.getBaseInputDir(quid.getQueryUnitId().getExecutionBlockId()), + StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getQueryUnitId().getExecutionBlockId()), String.valueOf(quid.getQueryUnitId().getId()), String.valueOf(quid.getId())); return workDir; http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index d27fd6d..422ec2b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -83,15 +83,15 @@ public class TaskAttemptContext { private Map<Integer, Long> partitionOutputVolume; private HashShuffleAppenderManager hashShuffleAppenderManager; - public TaskAttemptContext(QueryContext queryContext, final WorkerContext workerContext, + public TaskAttemptContext(QueryContext queryContext, final ExecutionBlockContext executionBlockContext, final QueryUnitAttemptId queryId, final FragmentProto[] fragments, final Path workDir) { this.queryContext = queryContext; - if (workerContext != null) { // For unit tests - this.workerContext = workerContext; - this.sharedResource = workerContext.getSharedResource(queryId.getQueryUnitId().getExecutionBlockId()); + if (executionBlockContext != null) { // For unit tests + this.workerContext = executionBlockContext.getWorkerContext(); + this.sharedResource = executionBlockContext.getSharedResource(); } this.queryId = queryId; @@ -315,22 +315,45 @@ public class TaskAttemptContext { public float getProgress() { return this.progress; } - + public void setProgress(float progress) { float previousProgress = this.progress; - this.progress = progress; - progressChanged.set(previousProgress != progress); + + if (Float.isNaN(progress) || Float.isInfinite(progress)) { + this.progress = 0.0f; + } else { + this.progress = progress; + } + + if (previousProgress != progress) { + setProgressChanged(true); + } } - public boolean isPorgressChanged() { + public boolean isProgressChanged() { return progressChanged.get(); } + + public void setProgressChanged(boolean changed){ + progressChanged.set(changed); + } + public void setExecutorProgress(float executorProgress) { - float adjustProgress = executorProgress * (1 - fetcherProgress); - setProgress(fetcherProgress + adjustProgress); + if(Float.isNaN(executorProgress) || Float.isInfinite(executorProgress)){ + executorProgress = 0.0f; + } + + if (hasFetchPhase()) { + setProgress(fetcherProgress + (executorProgress * 0.5f)); + } else { + setProgress(executorProgress); + } } public void setFetcherProgress(float fetcherProgress) { + if(Float.isNaN(fetcherProgress) || Float.isInfinite(fetcherProgress)){ + fetcherProgress = 0.0f; + } this.fetcherProgress = fetcherProgress; } @@ -375,10 +398,6 @@ public class TaskAttemptContext { return queryContext; } - public WorkerContext getWorkContext() { - return workerContext; - } - public QueryUnitAttemptId getQueryId() { return queryId; } http://git-wip-us.apache.org/repos/asf/tajo/blob/15450e86/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java index 32cd4f5..ea8ed82 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java @@ -18,42 +18,26 @@ package org.apache.tajo.worker; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryId; import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.TajoProtos.TaskAttemptState; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.query.QueryUnitRequestImpl; -import org.apache.tajo.engine.utils.TupleCache; -import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService; -import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.rpc.CallFuture; -import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcConnectionPool; -import org.apache.tajo.storage.StorageUtil; -import org.apache.tajo.util.TajoIdUtils; -import org.apache.tajo.worker.TajoWorker.WorkerContext; -import java.net.InetSocketAddress; -import java.util.Map; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; import static org.apache.tajo.ipc.TajoWorkerProtocol.*; @@ -67,142 +51,78 @@ public class TaskRunner extends AbstractService { private TajoConf systemConf; private volatile boolean stopped = false; + private Path baseDirPath; - private ExecutionBlockId executionBlockId; - private QueryId queryId; private NodeId nodeId; private ContainerId containerId; - // Cluster Management - //private TajoWorkerProtocol.TajoWorkerProtocolService.Interface master; - - // for temporal or intermediate files - private FileSystem localFS; - // for input files - private FileSystem defaultFS; - - private TajoQueryEngine queryEngine; - // for Fetcher private ExecutorService fetchLauncher; - // It keeps all of the query unit attempts while a TaskRunner is running. - private final Map<QueryUnitAttemptId, Task> tasks = new ConcurrentHashMap<QueryUnitAttemptId, Task>(); - - private LocalDirAllocator lDirAllocator; // A thread to receive each assigned query unit and execute the query unit private Thread taskLauncher; // Contains the object references related for TaskRunner - private TaskRunnerContext taskRunnerContext; - // for the doAs block - private UserGroupInformation taskOwner; - - // for the local temporal dir - private String baseDir; - private Path baseDirPath; - - private TaskRunnerManager taskRunnerManager; + private ExecutionBlockContext executionBlockContext; private long finishTime; - private RpcConnectionPool connPool; - - private InetSocketAddress qmMasterAddr; - private TaskRunnerHistory history; - public TaskRunner(TaskRunnerManager taskRunnerManager, TajoConf conf, String[] args) { + public TaskRunner(ExecutionBlockContext executionBlockContext, String[] args) { super(TaskRunner.class.getName()); - this.taskRunnerManager = taskRunnerManager; - this.connPool = RpcConnectionPool.getPool(conf); + ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); + ThreadFactory fetcherFactory = builder.setNameFormat("Fetcher executor #%d").build(); + this.systemConf = executionBlockContext.getConf(); this.fetchLauncher = Executors.newFixedThreadPool( - conf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM)); + systemConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM), fetcherFactory); try { - final ExecutionBlockId executionBlockId = TajoIdUtils.createExecutionBlockId(args[1]); - - LOG.info("Tajo Root Dir: " + conf.getVar(ConfVars.ROOT_DIR)); - LOG.info("Worker Local Dir: " + conf.getVar(ConfVars.WORKER_TEMPORAL_DIR)); - - UserGroupInformation.setConfiguration(conf); - // QueryBlockId from String // NodeId has a form of hostname:port. - NodeId nodeId = ConverterUtils.toNodeId(args[2]); + this.nodeId = ConverterUtils.toNodeId(args[2]); this.containerId = ConverterUtils.toContainerId(args[3]); // QueryMaster's address - String host = args[4]; - int port = Integer.parseInt(args[5]); - this.qmMasterAddr = NetUtils.createSocketAddrForHost(host, port); - - LOG.info("QueryMaster Address:" + qmMasterAddr); - // TODO - 'load credential' should be implemented - // Getting taskOwner - UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.USERNAME)); - //taskOwner.addToken(token); - - // initialize MasterWorkerProtocol as an actual task owner. -// this.client = -// taskOwner.doAs(new PrivilegedExceptionAction<AsyncRpcClient>() { -// @Override -// public AsyncRpcClient run() throws Exception { -// return new AsyncRpcClient(TajoWorkerProtocol.class, masterAddr); -// } -// }); -// this.master = client.getStub(); - - this.executionBlockId = executionBlockId; - this.queryId = executionBlockId.getQueryId(); - this.nodeId = nodeId; - this.taskOwner = taskOwner; - - this.taskRunnerContext = new TaskRunnerContext(); - this.history = new TaskRunnerHistory(containerId, executionBlockId); + //String host = args[4]; + //int port = Integer.parseInt(args[5]); + + this.executionBlockContext = executionBlockContext; + this.history = executionBlockContext.createTaskRunnerHistory(this); this.history.setState(getServiceState()); } catch (Exception e) { LOG.error(e.getMessage(), e); } } - protected void sendExecutionBlockReport(TajoWorkerProtocol.ExecutionBlockReport reporter) throws Exception { - QueryMasterProtocol.QueryMasterProtocolService.Interface qmClientService = null; - NettyClientBase qmClient = null; - try { - qmClient = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true); - qmClientService = qmClient.getStub(); - qmClientService.doneExecutionBlock(null, reporter, NullCallback.get()); - } finally { - connPool.releaseConnection(qmClient); - } + // TODO this is expensive. we should change to unique id + public String getId() { + return getId(getContext().getExecutionBlockId(), containerId); } - public String getId() { - return getId(executionBlockId, containerId); + public NodeId getNodeId(){ + return nodeId; + } + + public ContainerId getContainerId(){ + return containerId; } public static String getId(ExecutionBlockId executionBlockId, ContainerId containerId) { return executionBlockId + "," + containerId; } - public static Path getBaseOutputDir(ExecutionBlockId executionBlockId){ - Path workDir = - StorageUtil.concatPath( - executionBlockId.getQueryId().toString(), - "output", - String.valueOf(executionBlockId.getId())); - return workDir; + public TaskRunnerHistory getHistory(){ + return history; } - public static Path getBaseInputDir(ExecutionBlockId executionBlockId) { - Path workDir = - StorageUtil.concatPath( - executionBlockId.getQueryId().toString(), - "in", - executionBlockId.toString()); - return workDir; + public Path getTaskBaseDir(){ + return baseDirPath; + } + + public ExecutorService getFetchLauncher() { + return fetchLauncher; } @Override @@ -210,27 +130,13 @@ public class TaskRunner extends AbstractService { this.systemConf = (TajoConf)conf; try { - // initialize DFS and LocalFileSystems - defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(conf); - localFS = FileSystem.getLocal(conf); - // the base dir for an output dir - baseDir = getBaseOutputDir(executionBlockId).toString(); - - // initialize LocalDirAllocator - lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); - - baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(baseDir, conf)); - LOG.info("TaskRunner basedir is created (" + baseDir +")"); - - // Setup QueryEngine according to the query plan - // Here, we can setup row-based query engine or columnar query engine. - this.queryEngine = new TajoQueryEngine(systemConf); + baseDirPath = getContext().createBaseDir(); + LOG.info("TaskRunner basedir is created (" + baseDirPath +")"); } catch (Throwable t) { t.printStackTrace(); LOG.error(t); } - super.init(conf); this.history.setState(getServiceState()); } @@ -253,23 +159,10 @@ public class TaskRunner extends AbstractService { // If this flag become true, taskLauncher will be terminated. this.stopped = true; - // If TaskRunner is stopped, all running or pending tasks will be marked as failed. - for (Task task : tasks.values()) { - if (task.getStatus() == TaskAttemptState.TA_PENDING || - task.getStatus() == TaskAttemptState.TA_RUNNING) { - task.setState(TaskAttemptState.TA_FAILED); - task.abort(); - } - } - - tasks.clear(); fetchLauncher.shutdown(); fetchLauncher = null; - this.queryEngine = null; - - TupleCache.getInstance().removeBroadcastCache(executionBlockId); - LOG.info("Stop TaskRunner: " + executionBlockId); + LOG.info("Stop TaskRunner: " + getId()); synchronized (this) { notifyAll(); } @@ -281,71 +174,8 @@ public class TaskRunner extends AbstractService { return finishTime; } - public class TaskRunnerContext { - public AtomicInteger completedTasksNum = new AtomicInteger(); - public AtomicInteger succeededTasksNum = new AtomicInteger(); - public AtomicInteger killedTasksNum = new AtomicInteger(); - public AtomicInteger failedTasksNum = new AtomicInteger(); - - public TajoConf getConf() { - return systemConf; - } - - public String getNodeId() { - return nodeId.toString(); - } - - public FileSystem getLocalFS() { - return localFS; - } - - public FileSystem getDefaultFS() { - return defaultFS; - } - - public LocalDirAllocator getLocalDirAllocator() { - return lDirAllocator; - } - - public TajoQueryEngine getTQueryEngine() { - return queryEngine; - } - - public Map<QueryUnitAttemptId, Task> getTasks() { - return tasks; - } - - public Task getTask(QueryUnitAttemptId taskId) { - return tasks.get(taskId); - } - - public ExecutorService getFetchLauncher() { - return fetchLauncher; - } - - public Path getBaseDir() { - return baseDirPath; - } - - public ExecutionBlockId getExecutionBlockId() { - return executionBlockId; - } - - public void addTaskHistory(QueryUnitAttemptId quAttemptId, TaskHistory taskHistory) { - history.addTaskHistory(quAttemptId, taskHistory); - } - - public TaskRunnerHistory getExcutionBlockHistory(){ - return history; - } - - public WorkerContext getWorkerContext() { - return taskRunnerManager.getWorkerContext(); - } - } - - public TaskRunnerContext getContext() { - return taskRunnerContext; + public ExecutionBlockContext getContext() { + return executionBlockContext; } static void fatalError(QueryMasterProtocolService.Interface qmClientService, @@ -372,17 +202,15 @@ public class TaskRunner extends AbstractService { QueryUnitRequestProto taskRequest = null; while(!stopped) { - NettyClientBase qmClient = null; - QueryMasterProtocolService.Interface qmClientService = null; + QueryMasterProtocolService.Interface qmClientService; try { - qmClient = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true); - qmClientService = qmClient.getStub(); + qmClientService = getContext().getQueryMasterStub(); if (callFuture == null) { callFuture = new CallFuture<QueryUnitRequestProto>(); LOG.info("Request GetTask: " + getId()); GetTaskRequestProto request = GetTaskRequestProto.newBuilder() - .setExecutionBlockId(executionBlockId.getProto()) + .setExecutionBlockId(getExecutionBlockId().getProto()) .setContainerId(((ContainerIdPBImpl) containerId).getProto()) .build(); @@ -414,17 +242,14 @@ public class TaskRunner extends AbstractService { if (taskRequest.getShouldDie()) { LOG.info("Received ShouldDie flag:" + getId()); stop(); - if(taskRunnerManager != null) { - //notify to TaskRunnerManager - taskRunnerManager.stopTask(getId()); - taskRunnerManager= null; - } + //notify to TaskRunnerManager + getContext().stopTaskRunner(getId()); } else { - taskRunnerManager.getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc(); + getContext().getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc(); LOG.info("Accumulated Received Task: " + (++receivedNum)); QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId()); - if (tasks.containsKey(taskAttemptId)) { + if (getContext().getTasks().containsKey(taskAttemptId)) { LOG.error("Duplicate Task Attempt: " + taskAttemptId); fatalError(qmClientService, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId); continue; @@ -433,9 +258,9 @@ public class TaskRunner extends AbstractService { LOG.info("Initializing: " + taskAttemptId); Task task; try { - task = new Task(taskAttemptId, taskRunnerContext, qmClientService, + task = new Task(getId(), getTaskBaseDir(), taskAttemptId, executionBlockContext, new QueryUnitRequestImpl(taskRequest)); - tasks.put(taskAttemptId, task); + getContext().getTasks().put(taskAttemptId, task); task.init(); if (task.hasFetchPhase()) { @@ -453,9 +278,7 @@ public class TaskRunner extends AbstractService { } } } catch (Throwable t) { - t.printStackTrace(); - } finally { - connPool.releaseConnection(qmClient); + LOG.fatal(t.getMessage(), t); } } } @@ -463,12 +286,6 @@ public class TaskRunner extends AbstractService { taskLauncher.start(); } catch (Throwable t) { LOG.fatal("Unhandled exception. Starting shutdown.", t); - } finally { - for (Task t : tasks.values()) { - if (t.getStatus() != TaskAttemptState.TA_SUCCEEDED) { - t.abort(); - } - } } } @@ -480,6 +297,6 @@ public class TaskRunner extends AbstractService { } public ExecutionBlockId getExecutionBlockId() { - return this.executionBlockId; + return getContext().getExecutionBlockId(); } }
