TAJO-1615: Implement TaskManager. (jinho) Closes #595
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/36da0dac Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/36da0dac Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/36da0dac Branch: refs/heads/index_support Commit: 36da0dac7d90fa12b3d9cf11fa5b561a586e60ff Parents: dfcf41d Author: Jinho Kim <[email protected]> Authored: Mon Jun 8 16:52:05 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Mon Jun 8 16:52:05 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../physical/HashShuffleFileWriteExec.java | 2 +- .../engine/planner/physical/PhysicalExec.java | 2 +- .../tajo/master/rm/TajoResourceTracker.java | 4 +- .../tajo/util/TajoUncaughtExceptionHandler.java | 70 ++ .../apache/tajo/util/history/HistoryWriter.java | 2 +- .../tajo/worker/ExecutionBlockContext.java | 83 +- .../org/apache/tajo/worker/LegacyTaskImpl.java | 844 +++++++++++++++++++ .../apache/tajo/worker/NodeResourceManager.java | 45 +- .../apache/tajo/worker/NodeStatusUpdater.java | 34 +- .../java/org/apache/tajo/worker/TajoWorker.java | 47 +- .../tajo/worker/TajoWorkerManagerService.java | 9 +- .../main/java/org/apache/tajo/worker/Task.java | 834 +----------------- .../apache/tajo/worker/TaskAttemptContext.java | 61 +- .../org/apache/tajo/worker/TaskContainer.java | 85 ++ .../org/apache/tajo/worker/TaskExecutor.java | 194 +++++ .../java/org/apache/tajo/worker/TaskImpl.java | 838 ++++++++++++++++++ .../org/apache/tajo/worker/TaskManager.java | 180 ++++ .../java/org/apache/tajo/worker/TaskRunner.java | 10 +- .../apache/tajo/worker/TaskRunnerHistory.java | 1 + .../apache/tajo/worker/TaskRunnerManager.java | 12 +- .../worker/event/ExecutionBlockStartEvent.java | 35 + .../worker/event/ExecutionBlockStopEvent.java | 37 + .../worker/event/NodeResourceAllocateEvent.java | 2 +- .../event/NodeResourceDeallocateEvent.java | 2 +- .../tajo/worker/event/NodeResourceEvent.java | 35 + 
.../worker/event/NodeResourceManagerEvent.java | 34 - .../tajo/worker/event/NodeStatusEvent.java | 11 +- .../tajo/worker/event/TaskExecutorEvent.java | 44 + .../tajo/worker/event/TaskManagerEvent.java | 43 + .../tajo/worker/event/TaskRunnerEvent.java | 1 + .../tajo/worker/event/TaskRunnerStartEvent.java | 44 +- .../tajo/worker/event/TaskRunnerStopEvent.java | 1 + .../tajo/worker/event/TaskStartEvent.java | 44 + .../src/main/proto/TajoWorkerProtocol.proto | 1 + .../apache/tajo/querymaster/TestKillQuery.java | 135 ++- .../apache/tajo/worker/MockExecutionBlock.java | 42 + .../tajo/worker/MockNodeResourceManager.java | 96 +++ .../tajo/worker/MockNodeStatusUpdater.java | 4 +- .../apache/tajo/worker/MockTaskExecutor.java | 141 ++++ .../org/apache/tajo/worker/MockTaskManager.java | 59 ++ .../apache/tajo/worker/MockWorkerContext.java | 129 +++ .../org/apache/tajo/worker/TestFetcher.java | 14 +- .../tajo/worker/TestNodeResourceManager.java | 135 +-- .../tajo/worker/TestNodeStatusUpdater.java | 54 +- .../apache/tajo/worker/TestTaskExecutor.java | 330 ++++++++ .../org/apache/tajo/worker/TestTaskManager.java | 185 ++++ 47 files changed, 3904 insertions(+), 1113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 44a8939..066e086 100644 --- a/CHANGES +++ b/CHANGES @@ -333,6 +333,8 @@ Release 0.11.0 - unreleased SUB TASKS + TAJO-1615: Implement TaskManager. (jinho) + TAJO-1599: Implement NodeResourceManager and Status updater. (jinho) TAJO-1613: Rename StorageManager to Tablespace. 
(hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java index f1e2fe5..1a92a7a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java @@ -86,7 +86,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { HashShuffleAppender appender = appenderMap.get(partId); if (appender == null) { appender = hashShuffleAppenderManager.getAppender(context.getConf(), - context.getQueryId().getTaskId().getExecutionBlockId(), partId, meta, outSchema); + context.getTaskId().getTaskId().getExecutionBlockId(), partId, meta, outSchema); appenderMap.put(partId, appender); } return appender; http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java index de14c9a..87a19a9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java @@ -79,7 +79,7 @@ public abstract class PhysicalExec implements SchemaObject { } protected Path getExecutorTmpDir() { - return new Path(context.getQueryId().getTaskId().getExecutionBlockId().getQueryId().toString(), + return new 
Path(context.getTaskId().getTaskId().getExecutionBlockId().getQueryId().toString(), UUID.randomUUID().toString()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java index af28886..2a18de7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java @@ -188,7 +188,9 @@ public class TajoResourceTracker extends AbstractService implements TajoResource public void nodeHeartbeat(RpcController controller, TajoResourceTrackerProtocol.NodeHeartbeatRequestProto request, RpcCallback<TajoResourceTrackerProtocol.NodeHeartbeatResponseProto> done) { //TODO implement with ResourceManager for scheduler - throw new RuntimeException(new ServiceException(new NotImplementedException().getMessage())); + TajoResourceTrackerProtocol.NodeHeartbeatResponseProto.Builder + response = TajoResourceTrackerProtocol.NodeHeartbeatResponseProto.newBuilder(); + done.run(response.setCommand(TajoResourceTrackerProtocol.ResponseCommand.NORMAL).build()); } private Worker createWorkerResource(NodeHeartbeat request) { http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/util/TajoUncaughtExceptionHandler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/TajoUncaughtExceptionHandler.java b/tajo-core/src/main/java/org/apache/tajo/util/TajoUncaughtExceptionHandler.java new file mode 100644 index 0000000..c424154 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/util/TajoUncaughtExceptionHandler.java @@ -0,0 +1,70 @@ +/** +* Licensed to the Apache 
Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tajo.util; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.ShutdownHookManager; + +import java.lang.Thread.UncaughtExceptionHandler; + +/** + * This class is intended to be installed by calling + * {@link Thread#setDefaultUncaughtExceptionHandler(UncaughtExceptionHandler)} + * in the main entry point. It is intended to try and cleanly shut down + * programs using the Yarn Event framework. + * + * Note: right now it only stops the program (via {@code ExitUtil.halt(-1)}) when an + * OutOfMemoryError is caught; the terminate call for other Errors is commented out. + * Any other Error, and every Exception, is just logged. If a JVM shutdown is already + * in progress, the throwable is logged and otherwise ignored.
+ * + * This implementation is copied from YarnUncaughtExceptionHandler. + */ +public class TajoUncaughtExceptionHandler implements UncaughtExceptionHandler { + private static final Log LOG = LogFactory.getLog(TajoUncaughtExceptionHandler.class); + + @Override + public void uncaughtException(Thread t, Throwable e) { + if(ShutdownHookManager.get().isShutdownInProgress()) { + LOG.error("Thread " + t + " threw an Throwable, but we are shutting " + + "down, so ignoring this", e); + } else if(e instanceof Error) { + try { + LOG.fatal("Thread " + t + " threw an Error.", e); + } catch (Throwable err) { + //A logging failure must not keep us from reaching the OOM check below. + } + + if(e instanceof OutOfMemoryError) { + //After catching an OOM java says it is undefined behavior, so don't + //even try to clean up or we can get stuck on shutdown. + try { + System.err.println("Halting due to Out Of Memory Error..."); + } catch (Throwable err) { + //Again, we don't want a logging failure to prevent the halt below. + } + ExitUtil.halt(-1); + } else { + //ExitUtil.terminate(-1); NOTE(review): disabled, so non-OOM Errors are only logged and the process keeps running. + } + } else { + LOG.error("Thread " + t + " threw an Exception.", e); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java index e8ba304..daced3e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java @@ -217,7 +217,7 @@ public class HistoryWriter extends AbstractService { public void run() { LOG.info("HistoryWriter_" + processName + " started."); SimpleDateFormat df = new 
SimpleDateFormat("yyyyMMddHH"); - while (!stopped.get()) { + while (!stopped.get() && !Thread.interrupted()) {
List<WriterFuture<WriterHolder>> histories = Lists.newArrayList(); try { http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index 0cc3304..9e4a60f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -30,6 +30,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.TajoProtos; import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.TaskId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.QueryMasterProtocol; @@ -45,7 +46,6 @@ import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -61,10 +61,10 @@ public class ExecutionBlockContext { private static final Log LOG = LogFactory.getLog(ExecutionBlockContext.class); private TaskRunnerManager manager; - public AtomicInteger completedTasksNum = new AtomicInteger(); - public AtomicInteger succeededTasksNum = new AtomicInteger(); - public AtomicInteger killedTasksNum = new AtomicInteger(); - public AtomicInteger failedTasksNum = new AtomicInteger(); + protected AtomicInteger completedTasksNum = new AtomicInteger(); + protected AtomicInteger succeededTasksNum = new AtomicInteger(); + protected AtomicInteger killedTasksNum = new AtomicInteger(); + protected AtomicInteger failedTasksNum = new AtomicInteger(); private FileSystem localFS; // for input files @@ -95,17 +95,18 @@ public class 
ExecutionBlockContext { // It keeps all of the query unit attempts while a TaskRunner is running. private final ConcurrentMap<TaskAttemptId, Task> tasks = Maps.newConcurrentMap(); + @Deprecated private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap(); - public ExecutionBlockContext(TajoConf conf, TajoWorker.WorkerContext workerContext, - TaskRunnerManager manager, QueryContext queryContext, String plan, - ExecutionBlockId executionBlockId, WorkerConnectionInfo queryMaster, - PlanProto.ShuffleType shuffleType) throws Throwable { + private final Map<TaskId, TaskHistory> taskHistories = Maps.newTreeMap(); + + public ExecutionBlockContext(TajoWorker.WorkerContext workerContext, + TaskRunnerManager manager, RunExecutionBlockRequestProto request) throws IOException { this.manager = manager; - this.executionBlockId = executionBlockId; + this.executionBlockId = new ExecutionBlockId(request.getExecutionBlockId()); this.connManager = RpcClientManager.getInstance(); - this.queryMaster = queryMaster; - this.systemConf = conf; + this.queryMaster = new WorkerConnectionInfo(request.getQueryMaster()); + this.systemConf = workerContext.getConf(); this.reporter = new Reporter(); this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf); this.localFS = FileSystem.getLocal(systemConf); @@ -113,11 +114,11 @@ public class ExecutionBlockContext { // Setup QueryEngine according to the query plan // Here, we can setup row-based query engine or columnar query engine. 
this.queryEngine = new TajoQueryEngine(systemConf); - this.queryContext = queryContext; - this.plan = plan; + this.queryContext = new QueryContext(workerContext.getConf(), request.getQueryContext()); + this.plan = request.getPlanJson(); this.resource = new ExecutionBlockSharedResource(); this.workerContext = workerContext; - this.shuffleType = shuffleType; + this.shuffleType = request.getShuffleType(); } public void init() throws Throwable { @@ -131,7 +132,8 @@ public class ExecutionBlockContext { UserGroupInformation.setConfiguration(systemConf); // TODO - 'load credential' should be implemented // Getting taskOwner - UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(systemConf.getVar(TajoConf.ConfVars.USERNAME)); + UserGroupInformation + taskOwner = UserGroupInformation.createRemoteUser(systemConf.getVar(TajoConf.ConfVars.USERNAME)); // initialize DFS and LocalFileSystems this.taskOwner = taskOwner; @@ -144,7 +146,7 @@ public class ExecutionBlockContext { try { getStub().killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get()); } catch (Throwable t) { - //ignore + LOG.error(t); } throw e; } @@ -183,9 +185,9 @@ public class ExecutionBlockContext { // If ExecutionBlock is stopped, all running or pending tasks will be marked as failed. 
for (Task task : tasks.values()) { - if (task.getStatus() == TajoProtos.TaskAttemptState.TA_PENDING || - task.getStatus() == TajoProtos.TaskAttemptState.TA_RUNNING) { - task.setState(TajoProtos.TaskAttemptState.TA_FAILED); + if (task.getTaskContext().getState() == TajoProtos.TaskAttemptState.TA_PENDING || + task.getTaskContext().getState() == TajoProtos.TaskAttemptState.TA_RUNNING) { + try{ task.abort(); } catch (Throwable e){ @@ -194,7 +196,7 @@ public class ExecutionBlockContext { } } tasks.clear(); - + taskHistories.clear(); resource.release(); RpcClientManager.cleanup(client); } @@ -253,18 +255,40 @@ public class ExecutionBlockContext { return tasks.get(taskAttemptId); } + @Deprecated public void stopTaskRunner(String id){ manager.stopTaskRunner(id); } + @Deprecated public TaskRunner getTaskRunner(String taskRunnerId){ return manager.getTaskRunner(taskRunnerId); } + @Deprecated public void addTaskHistory(String taskRunnerId, TaskAttemptId quAttemptId, TaskHistory taskHistory) { histories.get(taskRunnerId).addTaskHistory(quAttemptId, taskHistory); } + public void addTaskHistory(TaskId taskId, TaskHistory taskHistory) { + taskHistories.put(taskId, taskHistory); + } + + public Map<TaskId, TaskHistory> getTaskHistories() { + return taskHistories; + } + + public void fatalError(TaskAttemptId taskAttemptId, String message) { + if (message == null) { + message = "No error message"; + } + TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder() + .setId(taskAttemptId.getProto()) + .setErrorMessage(message); + + getStub().fatalError(null, builder.build(), NullCallback.get()); + } + public TaskRunnerHistory createTaskRunnerHistory(TaskRunner runner){ histories.putIfAbsent(runner.getId(), new TaskRunnerHistory(runner.getContainerId(), executionBlockId)); return histories.get(runner.getId()); @@ -355,7 +379,6 @@ public class ExecutionBlockContext { protected class Reporter { private Thread reporterThread; - private AtomicBoolean reporterStop = new 
AtomicBoolean(); private static final int PROGRESS_INTERVAL = 1000; private static final int MAX_RETRIES = 10; @@ -374,7 +397,7 @@ public class ExecutionBlockContext { int remainingRetries = MAX_RETRIES; @Override public void run() { - while (!reporterStop.get() && !Thread.interrupted()) { + while (!isStopped() && !Thread.interrupted()) { try { Interface masterStub = getStub(); @@ -384,13 +407,11 @@ public class ExecutionBlockContext { } else { for (Task task : new ArrayList<Task>(tasks.values())){ - if (task.isRunning() && task.isProgressChanged()) { - task.updateProgress(); + if (task.getTaskContext().getState() == + TajoProtos.TaskAttemptState.TA_RUNNING && task.isProgressChanged()) { masterStub.statusUpdate(null, task.getReport(), NullCallback.get()); - task.getContext().setProgressChanged(false); - } else { - task.updateProgress(); } + task.updateProgress(); } } } catch (Throwable t) { @@ -402,7 +423,7 @@ public class ExecutionBlockContext { throw new RuntimeException(t); } } finally { - if (remainingRetries > 0 && !reporterStop.get()) { + if (remainingRetries > 0 && !isStopped()) { synchronized (reporterThread) { try { reporterThread.wait(PROGRESS_INTERVAL); @@ -417,10 +438,6 @@ public class ExecutionBlockContext { } public void stop() throws InterruptedException { - if (reporterStop.getAndSet(true)) { - return; - } - if (reporterThread != null) { // Intent of the lock is to not send an interupt in the middle of an // umbilical.ping or umbilical.statusUpdate http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java new file mode 100644 index 0000000..0721ef1 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java @@ -0,0 +1,844 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.netty.handler.codec.http.QueryStringDecoder; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.TajoProtos.TaskAttemptState; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.engine.planner.physical.PhysicalExec; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.engine.query.TaskRequest; +import 
org.apache.tajo.ipc.QueryMasterProtocol; +import org.apache.tajo.ipc.TajoWorkerProtocol.*; +import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.plan.function.python.TajoScriptEngine; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.serder.LogicalNodeDeserializer; +import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.pullserver.TajoPullServerService; +import org.apache.tajo.pullserver.retriever.FileChunk; +import org.apache.tajo.rpc.NullCallback; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.NetUtils; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.URI; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ExecutorService; + +import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; +import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; + +@Deprecated +public class LegacyTaskImpl implements Task { + private static final Log LOG = LogFactory.getLog(LegacyTaskImpl.class); + private static final float FETCHER_PROGRESS = 0.5f; + + private final TajoConf systemConf; + private final QueryContext queryContext; + private final ExecutionBlockContext executionBlockContext; + private final String taskRunnerId; + + private final Path taskDir; + private final TaskRequest request; + private TaskAttemptContext context; + private List<Fetcher> fetcherRunners; + private LogicalNode plan; + private final Map<String, TableDesc> descs = Maps.newHashMap(); + private PhysicalExec executor; + private boolean interQuery; + private Path inputTableBaseDir; + + private long startTime; + private long finishTime; + + private final TableStats inputStats; + private List<FileChunk> localChunks; + + // TODO - to be refactored + private ShuffleType shuffleType = null; + private Schema 
finalSchema = null; + private TupleComparator sortComp = null; + + public LegacyTaskImpl(String taskRunnerId, + Path baseDir, + TaskAttemptId taskId, + final ExecutionBlockContext executionBlockContext, + final TaskRequest request) throws IOException { + this(taskRunnerId, baseDir, taskId, executionBlockContext.getConf(), executionBlockContext, request); + } + + public LegacyTaskImpl(String taskRunnerId, + Path baseDir, + TaskAttemptId taskId, + TajoConf conf, + final ExecutionBlockContext executionBlockContext, + final TaskRequest request) throws IOException { + this.taskRunnerId = taskRunnerId; + this.request = request; + + this.systemConf = conf; + this.queryContext = request.getQueryContext(systemConf); + this.executionBlockContext = executionBlockContext; + this.taskDir = StorageUtil.concatPath(baseDir, + taskId.getTaskId().getId() + "_" + taskId.getId()); + + this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskId, + request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir); + this.context.setDataChannel(request.getDataChannel()); + this.context.setEnforcer(request.getEnforcer()); + this.context.setState(TaskAttemptState.TA_PENDING); + this.inputStats = new TableStats(); + this.fetcherRunners = Lists.newArrayList(); + } + + public void initPlan() throws IOException { + plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan()); + LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN); + if (scanNode != null) { + for (LogicalNode node : scanNode) { + ScanNode scan = (ScanNode) node; + descs.put(scan.getCanonicalName(), scan.getTableDesc()); + } + } + + LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN); + if (partitionScanNode != null) { + for (LogicalNode node : partitionScanNode) { + PartitionedTableScanNode scan = (PartitionedTableScanNode) node; + descs.put(scan.getCanonicalName(), 
scan.getTableDesc()); + } + } + + interQuery = request.getProto().getInterQuery(); + if (interQuery) { + context.setInterQuery(); + this.shuffleType = context.getDataChannel().getShuffleType(); + + if (shuffleType == ShuffleType.RANGE_SHUFFLE) { + SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT); + this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()); + this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys()); + } + } else { + Path outFilePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(systemConf)) + .getAppenderFilePath(getId(), queryContext.getStagingDir()); + LOG.info("Output File Path: " + outFilePath); + context.setOutputPath(outFilePath); + } + + this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>()); + LOG.info("=================================="); + LOG.info("* Stage " + request.getId() + " is initialized"); + LOG.info("* InterQuery: " + interQuery + + (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") + + ", Fragments (num: " + request.getFragments().size() + ")" + + ", Fetches (total:" + request.getFetches().size() + ") :"); + + if(LOG.isDebugEnabled()) { + for (FetchImpl f : request.getFetches()) { + LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs()); + } + } + LOG.info("* Local task dir: " + taskDir); + if(LOG.isDebugEnabled()) { + LOG.debug("* plan:\n"); + LOG.debug(plan.toString()); + } + LOG.info("=================================="); + } + + private void startScriptExecutors() throws IOException { + for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) { + executor.start(systemConf); + } + } + + private void stopScriptExecutors() { + for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) { + executor.shutdown(); + } + } + + @Override + public void init() throws IOException { + initPlan(); + startScriptExecutors(); + + if (context.getState() == 
TaskAttemptState.TA_PENDING) { + // initialize a task temporal dir + FileSystem localFS = executionBlockContext.getLocalFS(); + localFS.mkdirs(taskDir); + + if (request.getFetches().size() > 0) { + inputTableBaseDir = localFS.makeQualified( + executionBlockContext.getLocalDirAllocator().getLocalPathForWrite( + getTaskAttemptDir(context.getTaskId()).toString(), systemConf)); + localFS.mkdirs(inputTableBaseDir); + Path tableDir; + for (String inputTable : context.getInputTables()) { + tableDir = new Path(inputTableBaseDir, inputTable); + if (!localFS.exists(tableDir)) { + LOG.info("the directory is created " + tableDir.toUri()); + localFS.mkdirs(tableDir); + } + } + } + // for localizing the intermediate data + fetcherRunners.addAll(getFetchRunners(context, request.getFetches())); + } + } + + private TaskAttemptId getId() { + return context.getTaskId(); + } + + public String toString() { + return "queryId: " + this.getId() + " status: " + context.getState(); + } + + @Override + public boolean isStopped() { + return context.isStopped(); + } + + @Override + public TaskAttemptContext getTaskContext() { + return context; + } + + @Override + public ExecutionBlockContext getExecutionBlockContext() { + return executionBlockContext; + } + + @Override + public boolean hasFetchPhase() { + return fetcherRunners.size() > 0; + } + + @Override + public void fetch() { + ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher(); + for (Fetcher f : fetcherRunners) { + executorService.submit(new FetchRunner(context, f)); + } + } + + @Override + public void kill() { + stopScriptExecutors(); + context.setState(TaskAttemptState.TA_KILLED); + context.stop(); + } + + @Override + public void abort() { + stopScriptExecutors(); + context.setState(TajoProtos.TaskAttemptState.TA_FAILED); + context.stop(); + } + + @Override + public TaskStatusProto getReport() { + TaskStatusProto.Builder builder = TaskStatusProto.newBuilder(); + 
builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort()); + builder.setId(context.getTaskId().getProto()) + .setProgress(context.getProgress()) + .setState(context.getState()); + + builder.setInputStats(reloadInputStats()); + + if (context.getResultStats() != null) { + builder.setResultStats(context.getResultStats().getProto()); + } + return builder.build(); + } + + @Override + public boolean isProgressChanged() { + return context.isProgressChanged(); + } + + @Override + public void updateProgress() { + if(context != null && context.isStopped()){ + return; + } + + if (executor != null && context.getProgress() < 1.0f) { + context.setExecutorProgress(executor.getProgress()); + } + } + + private CatalogProtos.TableStatsProto reloadInputStats() { + synchronized(inputStats) { + if (this.executor == null) { + return inputStats.getProto(); + } + + TableStats executorInputStats = this.executor.getInputStats(); + + if (executorInputStats != null) { + inputStats.setValues(executorInputStats); + } + return inputStats.getProto(); + } + } + + private TaskCompletionReport getTaskCompletionReport() { + TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder(); + builder.setId(context.getTaskId().getProto()); + + builder.setInputStats(reloadInputStats()); + + if (context.hasResultStats()) { + builder.setResultStats(context.getResultStats().getProto()); + } else { + builder.setResultStats(new TableStats().getProto()); + } + + Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs(); + if (it.hasNext()) { + do { + Entry<Integer, String> entry = it.next(); + ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder(); + part.setPartId(entry.getKey()); + + // Set output volume + if (context.getPartitionOutputVolume() != null) { + for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet()) { + if (entry.getKey().equals(e.getKey())) { + part.setVolume(e.getValue().longValue()); + break; + 
} + } + } + + builder.addShuffleFileOutputs(part.build()); + } while (it.hasNext()); + } + + return builder.build(); + } + + private void waitForFetch() throws InterruptedException, IOException { + context.getFetchLatch().await(); + LOG.info(context.getTaskId() + " All fetches are done!"); + Collection<String> inputs = Lists.newArrayList(context.getInputTables()); + + // Get all broadcasted tables + Set<String> broadcastTableNames = new HashSet<String>(); + List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST); + if (broadcasts != null) { + for (EnforceProperty eachBroadcast : broadcasts) { + broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName()); + } + } + + // localize the fetched data and skip the broadcast table + for (String inputTable: inputs) { + if (broadcastTableNames.contains(inputTable)) { + continue; + } + File tableDir = new File(context.getFetchIn(), inputTable); + FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta()); + context.updateAssignedFragments(inputTable, frags); + } + } + + @Override + public void run() throws Exception { + startTime = System.currentTimeMillis(); + Throwable error = null; + try { + if(!context.isStopped()) { + context.setState(TaskAttemptState.TA_RUNNING); + if (context.hasFetchPhase()) { + // If the fetch is still in progress, the query unit must wait for + // complete. + waitForFetch(); + context.setFetcherProgress(FETCHER_PROGRESS); + updateProgress(); + } + + this.executor = executionBlockContext.getTQueryEngine(). 
+ createPlan(context, plan); + this.executor.init(); + + while(!context.isStopped() && executor.next() != null) { + } + } + } catch (Throwable e) { + error = e ; + LOG.error(e.getMessage(), e); + stopScriptExecutors(); + context.stop(); + } finally { + if (executor != null) { + try { + executor.close(); + reloadInputStats(); + } catch (IOException e) { + LOG.error(e, e); + } + this.executor = null; + } + + executionBlockContext.completedTasksNum.incrementAndGet(); + context.getHashShuffleAppenderManager().finalizeTask(getId()); + + QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub(); + if (context.isStopped()) { + context.setExecutorProgress(0.0f); + + if (context.getState() == TaskAttemptState.TA_KILLED) { + queryMasterStub.statusUpdate(null, getReport(), NullCallback.get()); + executionBlockContext.killedTasksNum.incrementAndGet(); + } else { + context.setState(TaskAttemptState.TA_FAILED); + TaskFatalErrorReport.Builder errorBuilder = + TaskFatalErrorReport.newBuilder() + .setId(getId().getProto()); + if (error != null) { + if (error.getMessage() == null) { + errorBuilder.setErrorMessage(error.getClass().getCanonicalName()); + } else { + errorBuilder.setErrorMessage(error.getMessage()); + } + errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error)); + } + + queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get()); + executionBlockContext.failedTasksNum.incrementAndGet(); + } + } else { + // if successful + context.stop(); + context.setProgress(1.0f); + context.setState(TaskAttemptState.TA_SUCCEEDED); + executionBlockContext.succeededTasksNum.incrementAndGet(); + + TaskCompletionReport report = getTaskCompletionReport(); + queryMasterStub.done(null, report, NullCallback.get()); + } + finishTime = System.currentTimeMillis(); + LOG.info(context.getTaskId() + " completed. 
" + + "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() + + ", succeeded: " + executionBlockContext.succeededTasksNum.intValue() + + ", killed: " + executionBlockContext.killedTasksNum.intValue() + + ", failed: " + executionBlockContext.failedTasksNum.intValue()); + } + } + + @Override + public void cleanup() { + TaskHistory taskHistory = createTaskHistory(); + executionBlockContext.addTaskHistory(taskRunnerId, getId(), taskHistory); + executionBlockContext.getTasks().remove(getId()); + + fetcherRunners.clear(); + fetcherRunners = null; + try { + if(executor != null) { + executor.close(); + executor = null; + } + } catch (IOException e) { + LOG.fatal(e.getMessage(), e); + } + + executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory); + stopScriptExecutors(); + } + + public TaskHistory createTaskHistory() { + TaskHistory taskHistory = null; + try { + taskHistory = new TaskHistory(context.getTaskId(), context.getState(), context.getProgress(), + startTime, finishTime, reloadInputStats()); + + if (context.getOutputPath() != null) { + taskHistory.setOutputPath(context.getOutputPath().toString()); + } + + if (context.getWorkDir() != null) { + taskHistory.setWorkingPath(context.getWorkDir().toString()); + } + + if (context.getResultStats() != null) { + taskHistory.setOutputStats(context.getResultStats().getProto()); + } + + if (hasFetchPhase()) { + taskHistory.setTotalFetchCount(fetcherRunners.size()); + int i = 0; + FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder(); + for (Fetcher fetcher : fetcherRunners) { + // TODO store the fetcher histories + if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) { + builder.setStartTime(fetcher.getStartTime()); + builder.setFinishTime(fetcher.getFinishTime()); + builder.setFileLength(fetcher.getFileLen()); + builder.setMessageReceivedCount(fetcher.getMessageReceiveCount()); + builder.setState(fetcher.getState()); + + 
taskHistory.addFetcherHistory(builder.build()); + } + if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++; + } + taskHistory.setFinishedFetchCount(i); + } + } catch (Exception e) { + LOG.warn(e.getMessage(), e); + } + + return taskHistory; + } + + public int hashCode() { + return context.hashCode(); + } + + public boolean equals(Object obj) { + if (obj instanceof LegacyTaskImpl) { + LegacyTaskImpl other = (LegacyTaskImpl) obj; + return this.context.equals(other.context); + } + return false; + } + + private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta) + throws IOException { + Configuration c = new Configuration(systemConf); + c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///"); + FileSystem fs = FileSystem.get(c); + Path tablePath = new Path(file.getAbsolutePath()); + + List<FileFragment> listTablets = new ArrayList<FileFragment>(); + FileFragment tablet; + + FileStatus[] fileLists = fs.listStatus(tablePath); + for (FileStatus f : fileLists) { + if (f.getLen() == 0) { + continue; + } + tablet = new FileFragment(name, f.getPath(), 0l, f.getLen()); + listTablets.add(tablet); + } + + // Special treatment for locally pseudo fetched chunks + synchronized (localChunks) { + for (FileChunk chunk : localChunks) { + if (name.equals(chunk.getEbId())) { + tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length()); + listTablets.add(tablet); + LOG.info("One local chunk is added to listTablets"); + } + } + } + + FileFragment[] tablets = new FileFragment[listTablets.size()]; + listTablets.toArray(tablets); + + return tablets; + } + + private class FetchRunner implements Runnable { + private final TaskAttemptContext ctx; + private final Fetcher fetcher; + private int maxRetryNum; + + public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) { + this.ctx = ctx; + this.fetcher = fetcher; + this.maxRetryNum = 
systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM); + } + + @Override + public void run() { + int retryNum = 0; + int retryWaitTime = 1000; //sec + + try { // for releasing fetch latch + while(!context.isStopped() && retryNum < maxRetryNum) { + if (retryNum > 0) { + try { + Thread.sleep(retryWaitTime); + retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2); // max 10 seconds + } catch (InterruptedException e) { + LOG.error(e); + } + LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")"); + } + try { + FileChunk fetched = fetcher.get(); + if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null + && fetched.getFile() != null) { + if (fetched.fromRemote() == false) { + localChunks.add(fetched); + LOG.info("Add a new FileChunk to local chunk list"); + } + break; + } + } catch (Throwable e) { + LOG.error("Fetch failed: " + fetcher.getURI(), e); + } + retryNum++; + } + } finally { + if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){ + fetcherFinished(ctx); + } else { + if (retryNum == maxRetryNum) { + LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")"); + } + stopScriptExecutors(); + context.stop(); // retry task + ctx.getFetchLatch().countDown(); + } + } + } + } + + @VisibleForTesting + public static float adjustFetchProcess(int totalFetcher, int remainFetcher) { + if (totalFetcher > 0) { + return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS; + } else { + return 0.0f; + } + } + + private synchronized void fetcherFinished(TaskAttemptContext ctx) { + int fetcherSize = fetcherRunners.size(); + if(fetcherSize == 0) { + return; + } + + ctx.getFetchLatch().countDown(); + + int remainFetcher = (int) ctx.getFetchLatch().getCount(); + if (remainFetcher == 0) { + context.setFetcherProgress(FETCHER_PROGRESS); + } else { + context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher)); + } + } + + 
private List<Fetcher> getFetchRunners(TaskAttemptContext ctx, + List<FetchImpl> fetches) throws IOException { + + if (fetches.size() > 0) { + Path inputDir = executionBlockContext.getLocalDirAllocator(). + getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf); + + int i = 0; + File storeDir; + File defaultStoreFile; + FileChunk storeChunk = null; + List<Fetcher> runnerList = Lists.newArrayList(); + + for (FetchImpl f : fetches) { + storeDir = new File(inputDir.toString(), f.getName()); + if (!storeDir.exists()) { + storeDir.mkdirs(); + } + + for (URI uri : f.getURIs()) { + defaultStoreFile = new File(storeDir, "in_" + i); + InetAddress address = InetAddress.getByName(uri.getHost()); + + WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo(); + if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) { + boolean hasError = false; + try { + LOG.info("Try to get local file chunk at local host"); + storeChunk = getLocalStoredFileChunk(uri, systemConf); + } catch (Throwable t) { + hasError = true; + } + + // When a range request is out of range, storeChunk will be NULL. This case is normal state. + // So, we should skip and don't need to create storeChunk. + if (storeChunk == null && !hasError) { + continue; + } + + if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1 + && hasError == false) { + storeChunk.setFromRemote(false); + } else { + storeChunk = new FileChunk(defaultStoreFile, 0, -1); + storeChunk.setFromRemote(true); + } + } else { + storeChunk = new FileChunk(defaultStoreFile, 0, -1); + storeChunk.setFromRemote(true); + } + + // If we decide that intermediate data should be really fetched from a remote host, storeChunk + // represents a complete file. 
Otherwise, storeChunk may represent a complete file or only a part of it + storeChunk.setEbId(f.getName()); + Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk); + LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString()); + runnerList.add(fetcher); + i++; + } + } + ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString())); + return runnerList; + } else { + return Lists.newArrayList(); + } + } + + private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException { + // Parse the URI + LOG.info("getLocalStoredFileChunk starts"); + final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters(); + final List<String> types = params.get("type"); + final List<String> qids = params.get("qid"); + final List<String> taskIdList = params.get("ta"); + final List<String> stageIds = params.get("sid"); + final List<String> partIds = params.get("p"); + final List<String> offsetList = params.get("offset"); + final List<String> lengthList = params.get("length"); + + if (types == null || stageIds == null || qids == null || partIds == null) { + LOG.error("Invalid URI - Required queryId, type, stage Id, and part id"); + return null; + } + + if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) { + LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id"); + return null; + } + + String queryId = qids.get(0); + String shuffleType = types.get(0); + String sid = stageIds.get(0); + String partId = partIds.get(0); + + if (shuffleType.equals("r") && taskIdList == null) { + LOG.error("Invalid URI - For range shuffle, taskId is required"); + return null; + } + List<String> taskIds = splitMaps(taskIdList); + + FileChunk chunk = null; + long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L; + long length = (lengthList != null && !lengthList.isEmpty()) ? 
Long.parseLong(lengthList.get(0)) : -1L; + + LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId + + ", taskIds=" + taskIdList); + + // The working directory of Tajo worker for each query, including stage + String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/"; + + // If the stage requires a range shuffle + if (shuffleType.equals("r")) { + String ta = taskIds.get(0); + if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) { + LOG.warn("Range shuffle - file not exist"); + return null; + } + Path path = executionBlockContext.getLocalFS().makeQualified( + executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf)); + String startKey = params.get("start").get(0); + String endKey = params.get("end").get(0); + boolean last = params.get("final") != null; + + try { + chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last); + } catch (Throwable t) { + LOG.error("getFileChunks() throws exception"); + return null; + } + + // If the stage requires a hash shuffle or a scattered hash shuffle + } else if (shuffleType.equals("h") || shuffleType.equals("s")) { + int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf); + String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId; + if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) { + LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath); + return null; + } + Path path = executionBlockContext.getLocalFS().makeQualified( + executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf)); + File file = new File(path.toUri()); + long startPos = (offset >= 0 && length >= 0) ? offset : 0; + long readLen = (offset >= 0 && length >= 0) ? 
length : file.length(); + + if (startPos >= file.length()) { + LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]"); + return null; + } + chunk = new FileChunk(file, startPos, readLen); + + } else { + LOG.error("Unknown shuffle type"); + return null; + } + + return chunk; + } + + private List<String> splitMaps(List<String> mapq) { + if (null == mapq) { + return null; + } + final List<String> ret = new ArrayList<String>(); + for (String s : mapq) { + Collections.addAll(ret, s.split(",")); + } + return ret; + } + + public static Path getTaskAttemptDir(TaskAttemptId quid) { + Path workDir = + StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()), + String.valueOf(quid.getTaskId().getId()), + String.valueOf(quid.getId())); + return workDir; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java index 20eec6b..e763d13 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java @@ -30,27 +30,23 @@ import org.apache.tajo.resource.NodeResources; import org.apache.tajo.storage.DiskUtil; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.worker.event.NodeResourceAllocateEvent; -import org.apache.tajo.worker.event.NodeResourceDeallocateEvent; -import org.apache.tajo.worker.event.NodeResourceManagerEvent; -import org.apache.tajo.worker.event.NodeStatusEvent; - -import java.util.concurrent.atomic.AtomicInteger; +import org.apache.tajo.worker.event.*; import static org.apache.tajo.ipc.TajoWorkerProtocol.*; -public class 
NodeResourceManager extends AbstractService implements EventHandler<NodeResourceManagerEvent> { +public class NodeResourceManager extends AbstractService implements EventHandler<NodeResourceEvent> { private static final Log LOG = LogFactory.getLog(NodeResourceManager.class); private final Dispatcher dispatcher; + private final EventHandler taskEventHandler; private NodeResource totalResource; private NodeResource availableResource; - private AtomicInteger allocatedSize; private TajoConf tajoConf; - public NodeResourceManager(Dispatcher dispatcher){ + public NodeResourceManager(Dispatcher dispatcher, EventHandler taskEventHandler) { super(NodeResourceManager.class.getName()); this.dispatcher = dispatcher; + this.taskEventHandler = taskEventHandler; } @Override @@ -61,14 +57,14 @@ public class NodeResourceManager extends AbstractService implements EventHandler this.tajoConf = (TajoConf)conf; this.totalResource = createWorkerResource(tajoConf); this.availableResource = NodeResources.clone(totalResource); - this.dispatcher.register(NodeResourceManagerEvent.EventType.class, this); - this.allocatedSize = new AtomicInteger(); + this.dispatcher.register(NodeResourceEvent.EventType.class, this); + super.serviceInit(conf); LOG.info("Initialized NodeResourceManager for " + totalResource); } @Override - public void handle(NodeResourceManagerEvent event) { + public void handle(NodeResourceEvent event) { if (event instanceof NodeResourceAllocateEvent) { NodeResourceAllocateEvent allocateEvent = (NodeResourceAllocateEvent) event; @@ -76,22 +72,27 @@ public class NodeResourceManager extends AbstractService implements EventHandler for (TaskAllocationRequestProto request : allocateEvent.getRequest().getTaskRequestList()) { NodeResource resource = new NodeResource(request.getResource()); if (allocate(resource)) { - allocatedSize.incrementAndGet(); - //TODO send task event to taskExecutor + if(allocateEvent.getRequest().hasExecutionBlockRequest()){ + //send ExecutionBlock start event 
to TaskManager + startExecutionBlock(allocateEvent.getRequest().getExecutionBlockRequest()); + } + + //send task start event to TaskExecutor + startTask(request.getTaskRequest(), resource); } else { + // reject the exceeded requests response.addCancellationTask(request); } } allocateEvent.getCallback().run(response.build()); } else if (event instanceof NodeResourceDeallocateEvent) { - allocatedSize.decrementAndGet(); NodeResourceDeallocateEvent deallocateEvent = (NodeResourceDeallocateEvent) event; release(deallocateEvent.getResource()); // send current resource to ResourceTracker getDispatcher().getEventHandler().handle( - new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE, getAvailableResource())); + new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE)); } } @@ -107,10 +108,6 @@ public class NodeResourceManager extends AbstractService implements EventHandler return availableResource; } - public int getAllocatedSize() { - return allocatedSize.get(); - } - private boolean allocate(NodeResource resource) { //TODO consider the jvm free memory if (NodeResources.fitsIn(resource, availableResource)) { @@ -120,6 +117,14 @@ public class NodeResourceManager extends AbstractService implements EventHandler return false; } + protected void startExecutionBlock(RunExecutionBlockRequestProto request) { + taskEventHandler.handle(new ExecutionBlockStartEvent(request)); + } + + protected void startTask(TaskRequestProto request, NodeResource resource) { + taskEventHandler.handle(new TaskStartEvent(request, resource)); + } + private void release(NodeResource resource) { NodeResources.addTo(availableResource, resource); } http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java index 84ac419..d13cd50 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java @@ -57,16 +57,16 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N private volatile boolean isStopped; private volatile long heartBeatInterval; private BlockingQueue<NodeStatusEvent> heartBeatRequestQueue; - private final WorkerConnectionInfo connectionInfo; + private final TajoWorker.WorkerContext workerContext; private final NodeResourceManager nodeResourceManager; private AsyncRpcClient rmClient; private ServiceTracker serviceTracker; private TajoResourceTrackerProtocolService.Interface resourceTracker; private int queueingLimit; - public NodeStatusUpdater(WorkerConnectionInfo connectionInfo, NodeResourceManager resourceManager) { + public NodeStatusUpdater(TajoWorker.WorkerContext workerContext, NodeResourceManager resourceManager) { super(NodeStatusUpdater.class.getSimpleName()); - this.connectionInfo = connectionInfo; + this.workerContext = workerContext; this.nodeResourceManager = resourceManager; } @@ -99,7 +99,8 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N this.isStopped = true; synchronized (updaterThread) { - updaterThread.notifyAll(); + updaterThread.interrupt(); + updaterThread.join(); } super.serviceStop(); LOG.info("NodeStatusUpdater stopped."); @@ -107,14 +108,7 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N @Override public void handle(NodeStatusEvent event) { - switch (event.getType()) { - case REPORT_RESOURCE: - heartBeatRequestQueue.add(event); //batch report to ResourceTracker - break; - case FLUSH_REPORTS: - heartBeatRequestQueue.add(event); //flush report to ResourceTracker - break; - } + heartBeatRequestQueue.add(event); } public int getQueueSize() { @@ -128,13 +122,13 @@ public class 
NodeStatusUpdater extends AbstractService implements EventHandler<N private NodeHeartbeatRequestProto createResourceReport(NodeResource resource) { NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder(); requestProto.setAvailableResource(resource.getProto()); - requestProto.setWorkerId(connectionInfo.getId()); + requestProto.setWorkerId(workerContext.getConnectionInfo().getId()); return requestProto.build(); } private NodeHeartbeatRequestProto createHeartBeatReport() { NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder(); - requestProto.setWorkerId(connectionInfo.getId()); + requestProto.setWorkerId(workerContext.getConnectionInfo().getId()); return requestProto.build(); } @@ -142,8 +136,8 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder(); requestProto.setTotalResource(nodeResourceManager.getTotalResource().getProto()); requestProto.setAvailableResource(nodeResourceManager.getAvailableResource().getProto()); - requestProto.setWorkerId(connectionInfo.getId()); - requestProto.setConnectionInfo(connectionInfo.getProto()); + requestProto.setWorkerId(workerContext.getConnectionInfo().getId()); + requestProto.setConnectionInfo(workerContext.getConnectionInfo().getProto()); //TODO set node status to requestProto.setStatus() return requestProto.build(); @@ -231,8 +225,8 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N } if (!events.isEmpty()) { - // send last available resource; - lastResponse = sendHeartbeat(createResourceReport(events.get(events.size() - 1).getResource())); + // send current available resource; + lastResponse = sendHeartbeat(createResourceReport(nodeResourceManager.getAvailableResource())); } else { // send ping; lastResponse = sendHeartbeat(createHeartBeatReport()); @@ -250,10 +244,10 @@ public class NodeStatusUpdater extends 
AbstractService implements EventHandler<N } } catch (NoSuchMethodException nsme) { LOG.fatal(nsme.getMessage(), nsme); - Runtime.getRuntime().halt(1); + Runtime.getRuntime().halt(-1); } catch (ClassNotFoundException cnfe) { LOG.fatal(cnfe.getMessage(), cnfe); - Runtime.getRuntime().halt(1); + Runtime.getRuntime().halt(-1); } catch (Exception e) { LOG.error(e.getMessage(), e); if (!isStopped) { http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 4f07ca6..fbd070e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -163,7 +163,7 @@ public class TajoWorker extends CompositeService { serviceTracker = ServiceTrackerFactory.get(systemConf); - this.workerContext = new WorkerContext(); + this.workerContext = new TajoWorkerContext(); this.lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); String resourceManagerClassName = systemConf.getVar(ConfVars.RESOURCE_MANAGER_CLASS); @@ -386,7 +386,45 @@ public class TajoWorker extends CompositeService { LOG.info("TajoWorker main thread exiting"); } - public class WorkerContext { + public interface WorkerContext { + QueryMaster getQueryMaster(); + + TajoConf getConf(); + + ServiceTracker getServiceTracker(); + + QueryMasterManagerService getQueryMasterManagerService(); + + TaskRunnerManager getTaskRunnerManager(); + + CatalogService getCatalog(); + + WorkerConnectionInfo getConnectionInfo(); + + String getWorkerName(); + + LocalDirAllocator getLocalDirAllocator(); + + ClusterResourceSummary getClusterResource(); + + TajoSystemMetrics getWorkerSystemMetrics(); + + HashShuffleAppenderManager getHashShuffleAppenderManager(); + + 
HistoryWriter getTaskHistoryWriter(); + + HistoryReader getHistoryReader(); + + void cleanup(String strPath); + + void cleanupTemporalDirectories(); + + void setClusterResource(ClusterResourceSummary clusterResource); + + void setNumClusterNodes(int numClusterNodes); + } + + class TajoWorkerContext implements WorkerContext { public QueryMaster getQueryMaster() { if (queryMasterManagerService == null) { return null; @@ -430,7 +468,7 @@ public class TajoWorker extends CompositeService { return lDirAllocator; } - protected void cleanup(String strPath) { + public void cleanup(String strPath) { if (deletionService == null) return; LocalDirAllocator lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); @@ -446,7 +484,7 @@ public class TajoWorker extends CompositeService { } } - protected void cleanupTemporalDirectories() { + public void cleanupTemporalDirectories() { if (deletionService == null) return; LocalDirAllocator lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); @@ -627,6 +665,7 @@ public class TajoWorker extends CompositeService { } public static void main(String[] args) throws Exception { + Thread.setDefaultUncaughtExceptionHandler(new TajoUncaughtExceptionHandler()); StringUtils.startupShutdownMessage(TajoWorker.class, args, LOG); TajoConf tajoConf = new TajoConf(); http://git-wip-us.apache.org/repos/asf/tajo/blob/36da0dac/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java index bbf8564..de8afe8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java @@ -116,14 +116,7 @@ public class TajoWorkerManagerService extends CompositeService 
workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc(); try { - workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStartEvent( - new WorkerConnectionInfo(request.getQueryMaster()) - , new ExecutionBlockId(request.getExecutionBlockId()) - , request.getContainerId() - , new QueryContext(workerContext.getConf(), request.getQueryContext()), - request.getPlanJson(), - request.getShuffleType() - )); + workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStartEvent(request)); done.run(TajoWorker.TRUE_PROTO); } catch (Throwable t) { LOG.error(t.getMessage(), t);
