http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java deleted file mode 100644 index 0515e72..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java +++ /dev/null @@ -1,1342 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.querymaster; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.apache.commons.lang.exception.ExceptionUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.event.Event; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.state.*; -import org.apache.hadoop.yarn.util.Records; -import org.apache.tajo.*; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.catalog.statistics.ColumnStats; -import org.apache.tajo.catalog.statistics.StatisticsUtil; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.json.CoreGsonHelper; -import org.apache.tajo.engine.planner.PhysicalPlannerImpl; -import org.apache.tajo.engine.planner.enforce.Enforcer; -import org.apache.tajo.engine.planner.global.DataChannel; -import org.apache.tajo.engine.planner.global.ExecutionBlock; -import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.ipc.TajoMasterProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage; -import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty; -import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto; -import org.apache.tajo.master.*; -import org.apache.tajo.master.TaskRunnerGroupEvent.EventType; -import org.apache.tajo.master.event.*; -import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; -import org.apache.tajo.master.querymaster.Task.IntermediateEntry; -import org.apache.tajo.master.container.TajoContainer; -import org.apache.tajo.master.container.TajoContainerId; -import org.apache.tajo.storage.FileStorageManager; -import org.apache.tajo.plan.util.PlannerUtil; 
-import org.apache.tajo.plan.logical.*; -import org.apache.tajo.storage.StorageManager; -import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.unit.StorageUnit; -import org.apache.tajo.util.KeyValueSet; -import org.apache.tajo.util.history.TaskHistory; -import org.apache.tajo.util.history.StageHistory; -import org.apache.tajo.worker.FetchImpl; - -import java.io.IOException; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static org.apache.tajo.conf.TajoConf.ConfVars; -import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; - - -/** - * Stage plays a role in controlling an ExecutionBlock and is a finite state machine. - */ -public class Stage implements EventHandler<StageEvent> { - - private static final Log LOG = LogFactory.getLog(Stage.class); - - private MasterPlan masterPlan; - private ExecutionBlock block; - private int priority; - private Schema schema; - private TableMeta meta; - private TableStats resultStatistics; - private TableStats inputStatistics; - private EventHandler<Event> eventHandler; - private AbstractTaskScheduler taskScheduler; - private QueryMasterTask.QueryMasterTaskContext context; - private final List<String> diagnostics = new ArrayList<String>(); - private StageState stageState; - - private long startTime; - private long finishTime; - - volatile Map<TaskId, Task> tasks = new ConcurrentHashMap<TaskId, Task>(); - volatile Map<TajoContainerId, TajoContainer> containers = new ConcurrentHashMap<TajoContainerId, - TajoContainer>(); - - private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition(); - private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); - private static final ContainerLaunchTransition CONTAINER_LAUNCH_TRANSITION = new ContainerLaunchTransition(); - private static final TaskCompletedTransition TASK_COMPLETED_TRANSITION = new TaskCompletedTransition(); - private static final AllocatedContainersCancelTransition CONTAINERS_CANCEL_TRANSITION = - new AllocatedContainersCancelTransition(); - private static final StageCompleteTransition STAGE_COMPLETED_TRANSITION = new StageCompleteTransition(); - private StateMachine<StageState, StageEventType, StageEvent> stateMachine; - - protected static final StateMachineFactory<Stage, StageState, - StageEventType, StageEvent> stateMachineFactory = - new StateMachineFactory <Stage, StageState, - StageEventType, StageEvent> (StageState.NEW) - - // Transitions from NEW state - .addTransition(StageState.NEW, - EnumSet.of(StageState.INITED, StageState.ERROR, StageState.SUCCEEDED), - StageEventType.SQ_INIT, - new InitAndRequestContainer()) - .addTransition(StageState.NEW, StageState.NEW, - StageEventType.SQ_DIAGNOSTIC_UPDATE, - DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(StageState.NEW, StageState.KILLED, - StageEventType.SQ_KILL) - .addTransition(StageState.NEW, StageState.ERROR, - StageEventType.SQ_INTERNAL_ERROR, - INTERNAL_ERROR_TRANSITION) - - // Transitions from INITED state - .addTransition(StageState.INITED, StageState.RUNNING, - StageEventType.SQ_CONTAINER_ALLOCATED, - CONTAINER_LAUNCH_TRANSITION) - .addTransition(StageState.INITED, StageState.INITED, - StageEventType.SQ_DIAGNOSTIC_UPDATE, - DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(StageState.INITED, 
StageState.KILL_WAIT, - StageEventType.SQ_KILL, new KillTasksTransition()) - .addTransition(StageState.INITED, StageState.ERROR, - StageEventType.SQ_INTERNAL_ERROR, - INTERNAL_ERROR_TRANSITION) - - // Transitions from RUNNING state - .addTransition(StageState.RUNNING, StageState.RUNNING, - StageEventType.SQ_CONTAINER_ALLOCATED, - CONTAINER_LAUNCH_TRANSITION) - .addTransition(StageState.RUNNING, StageState.RUNNING, - StageEventType.SQ_TASK_COMPLETED, - TASK_COMPLETED_TRANSITION) - .addTransition(StageState.RUNNING, - EnumSet.of(StageState.SUCCEEDED, StageState.FAILED), - StageEventType.SQ_STAGE_COMPLETED, - STAGE_COMPLETED_TRANSITION) - .addTransition(StageState.RUNNING, StageState.RUNNING, - StageEventType.SQ_FAILED, - TASK_COMPLETED_TRANSITION) - .addTransition(StageState.RUNNING, StageState.RUNNING, - StageEventType.SQ_DIAGNOSTIC_UPDATE, - DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(StageState.RUNNING, StageState.KILL_WAIT, - StageEventType.SQ_KILL, - new KillTasksTransition()) - .addTransition(StageState.RUNNING, StageState.ERROR, - StageEventType.SQ_INTERNAL_ERROR, - INTERNAL_ERROR_TRANSITION) - // Ignore-able Transition - .addTransition(StageState.RUNNING, StageState.RUNNING, - StageEventType.SQ_START) - - // Transitions from KILL_WAIT state - .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, - StageEventType.SQ_CONTAINER_ALLOCATED, - CONTAINERS_CANCEL_TRANSITION) - .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, - EnumSet.of(StageEventType.SQ_KILL), new KillTasksTransition()) - .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, - StageEventType.SQ_TASK_COMPLETED, - TASK_COMPLETED_TRANSITION) - .addTransition(StageState.KILL_WAIT, - EnumSet.of(StageState.SUCCEEDED, StageState.FAILED, StageState.KILLED), - StageEventType.SQ_STAGE_COMPLETED, - STAGE_COMPLETED_TRANSITION) - .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, - StageEventType.SQ_DIAGNOSTIC_UPDATE, - DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, - StageEventType.SQ_FAILED, - TASK_COMPLETED_TRANSITION) - .addTransition(StageState.KILL_WAIT, StageState.ERROR, - StageEventType.SQ_INTERNAL_ERROR, - INTERNAL_ERROR_TRANSITION) - - // Transitions from SUCCEEDED state - .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED, - StageEventType.SQ_CONTAINER_ALLOCATED, - CONTAINERS_CANCEL_TRANSITION) - .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED, - StageEventType.SQ_DIAGNOSTIC_UPDATE, - DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(StageState.SUCCEEDED, StageState.ERROR, - StageEventType.SQ_INTERNAL_ERROR, - INTERNAL_ERROR_TRANSITION) - // Ignore-able events - .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED, - EnumSet.of( - StageEventType.SQ_START, - StageEventType.SQ_KILL, - StageEventType.SQ_CONTAINER_ALLOCATED)) - - // Transitions from KILLED state - .addTransition(StageState.KILLED, StageState.KILLED, - StageEventType.SQ_CONTAINER_ALLOCATED, - CONTAINERS_CANCEL_TRANSITION) - .addTransition(StageState.KILLED, StageState.KILLED, - StageEventType.SQ_DIAGNOSTIC_UPDATE, - DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(StageState.KILLED, StageState.ERROR, - StageEventType.SQ_INTERNAL_ERROR, - INTERNAL_ERROR_TRANSITION) - // Ignore-able transitions - .addTransition(StageState.KILLED, StageState.KILLED, - EnumSet.of( - StageEventType.SQ_START, - StageEventType.SQ_KILL, - StageEventType.SQ_CONTAINER_ALLOCATED, - StageEventType.SQ_FAILED)) - - // Transitions from FAILED state - .addTransition(StageState.FAILED, StageState.FAILED, - 
StageEventType.SQ_CONTAINER_ALLOCATED, - CONTAINERS_CANCEL_TRANSITION) - .addTransition(StageState.FAILED, StageState.FAILED, - StageEventType.SQ_DIAGNOSTIC_UPDATE, - DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(StageState.FAILED, StageState.ERROR, - StageEventType.SQ_INTERNAL_ERROR, - INTERNAL_ERROR_TRANSITION) - // Ignore-able transitions - .addTransition(StageState.FAILED, StageState.FAILED, - EnumSet.of( - StageEventType.SQ_START, - StageEventType.SQ_KILL, - StageEventType.SQ_CONTAINER_ALLOCATED, - StageEventType.SQ_FAILED)) - - // Transitions from ERROR state - .addTransition(StageState.ERROR, StageState.ERROR, - StageEventType.SQ_CONTAINER_ALLOCATED, - CONTAINERS_CANCEL_TRANSITION) - .addTransition(StageState.ERROR, StageState.ERROR, - StageEventType.SQ_DIAGNOSTIC_UPDATE, - DIAGNOSTIC_UPDATE_TRANSITION) - // Ignore-able transitions - .addTransition(StageState.ERROR, StageState.ERROR, - EnumSet.of( - StageEventType.SQ_START, - StageEventType.SQ_KILL, - StageEventType.SQ_FAILED, - StageEventType.SQ_INTERNAL_ERROR, - StageEventType.SQ_STAGE_COMPLETED)) - - .installTopology(); - - private final Lock readLock; - private final Lock writeLock; - - private int totalScheduledObjectsCount; - private int succeededObjectCount = 0; - private int completedTaskCount = 0; - private int succeededTaskCount = 0; - private int killedObjectCount = 0; - private int failedObjectCount = 0; - private TaskSchedulerContext schedulerContext; - private List<IntermediateEntry> hashShuffleIntermediateEntries = new ArrayList<IntermediateEntry>(); - private AtomicInteger completeReportReceived = new AtomicInteger(0); - private StageHistory finalStageHistory; - - public Stage(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block) { - this.context = context; - this.masterPlan = masterPlan; - this.block = block; - this.eventHandler = context.getEventHandler(); - - ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - this.readLock = readWriteLock.readLock(); - this.writeLock = readWriteLock.writeLock(); - stateMachine = stateMachineFactory.make(this); - stageState = stateMachine.getCurrentState(); - } - - public static boolean isRunningState(StageState state) { - return state == StageState.INITED || state == StageState.NEW || state == StageState.RUNNING; - } - - public QueryMasterTask.QueryMasterTaskContext getContext() { - return context; - } - - public MasterPlan getMasterPlan() { - return masterPlan; - } - - public DataChannel getDataChannel() { - return masterPlan.getOutgoingChannels(getId()).iterator().next(); - } - - public EventHandler<Event> getEventHandler() { - return eventHandler; - } - - public AbstractTaskScheduler getTaskScheduler() { - return taskScheduler; - } - - public void setStartTime() { - startTime = context.getClock().getTime(); - } - - @SuppressWarnings("UnusedDeclaration") - public long getStartTime() { - return this.startTime; - } - - public void setFinishTime() { - finishTime = context.getClock().getTime(); - } - - @SuppressWarnings("UnusedDeclaration") - public long getFinishTime() { - return this.finishTime; - } - - public float getTaskProgress() { - readLock.lock(); - try { - if (getState() == StageState.NEW) { - return 0; - } else { - return (float)(succeededObjectCount) / (float)totalScheduledObjectsCount; - } - } finally { - readLock.unlock(); - } - } - - public float getProgress() { - List<Task> tempTasks = null; - readLock.lock(); - try { - if (getState() == StageState.NEW) { - return 0.0f; - } else { - tempTasks = new 
ArrayList<Task>(tasks.values()); - } - } finally { - readLock.unlock(); - } - - float totalProgress = 0.0f; - for (Task eachTask : tempTasks) { - if (eachTask.getLastAttempt() != null) { - totalProgress += eachTask.getLastAttempt().getProgress(); - } - } - - if (totalProgress > 0.0f) { - return (float) Math.floor((totalProgress / (float) Math.max(tempTasks.size(), 1)) * 1000.0f) / 1000.0f; - } else { - return 0.0f; - } - } - - public int getSucceededObjectCount() { - return succeededObjectCount; - } - - public int getTotalScheduledObjectsCount() { - return totalScheduledObjectsCount; - } - - public ExecutionBlock getBlock() { - return block; - } - - public void addTask(Task task) { - tasks.put(task.getId(), task); - } - - public StageHistory getStageHistory() { - if (finalStageHistory != null) { - if (finalStageHistory.getFinishTime() == 0) { - finalStageHistory = makeStageHistory(); - finalStageHistory.setTasks(makeTaskHistories()); - } - return finalStageHistory; - } else { - return makeStageHistory(); - } - } - - private List<TaskHistory> makeTaskHistories() { - List<TaskHistory> taskHistories = new ArrayList<TaskHistory>(); - - for(Task eachTask : getTasks()) { - taskHistories.add(eachTask.getTaskHistory()); - } - - return taskHistories; - } - - private StageHistory makeStageHistory() { - StageHistory stageHistory = new StageHistory(); - - stageHistory.setExecutionBlockId(getId().toString()); - stageHistory.setPlan(PlannerUtil.buildExplainString(block.getPlan())); - stageHistory.setState(getState().toString()); - stageHistory.setStartTime(startTime); - stageHistory.setFinishTime(finishTime); - stageHistory.setSucceededObjectCount(succeededObjectCount); - stageHistory.setKilledObjectCount(killedObjectCount); - stageHistory.setFailedObjectCount(failedObjectCount); - stageHistory.setTotalScheduledObjectsCount(totalScheduledObjectsCount); - stageHistory.setHostLocalAssigned(getTaskScheduler().getHostLocalAssigned()); - stageHistory.setRackLocalAssigned(getTaskScheduler().getRackLocalAssigned()); - - long totalInputBytes = 0; - long totalReadBytes = 0; - long totalReadRows = 0; - long totalWriteBytes = 0; - long totalWriteRows = 0; - int numShuffles = 0; - for(Task eachTask : getTasks()) { - numShuffles = eachTask.getShuffleOutpuNum(); - if (eachTask.getLastAttempt() != null) { - TableStats inputStats = eachTask.getLastAttempt().getInputStats(); - if (inputStats != null) { - totalInputBytes += inputStats.getNumBytes(); - totalReadBytes += inputStats.getReadBytes(); - totalReadRows += inputStats.getNumRows(); - } - TableStats outputStats = eachTask.getLastAttempt().getResultStats(); - if (outputStats != null) { - totalWriteBytes += outputStats.getNumBytes(); - totalWriteRows += outputStats.getNumRows(); - } - } - } - - stageHistory.setTotalInputBytes(totalInputBytes); - stageHistory.setTotalReadBytes(totalReadBytes); - stageHistory.setTotalReadRows(totalReadRows); - stageHistory.setTotalWriteBytes(totalWriteBytes); - stageHistory.setTotalWriteRows(totalWriteRows); - stageHistory.setNumShuffles(numShuffles); - stageHistory.setProgress(getProgress()); - return stageHistory; - } - - /** - * It finalizes this stage. It is only invoked when the stage is succeeded. - */ - public void complete() { - cleanup(); - finalizeStats(); - setFinishTime(); - eventHandler.handle(new StageCompletedEvent(getId(), StageState.SUCCEEDED)); - } - - /** - * It finalizes this stage. Unlike {@link Stage#complete()}, - * it is invoked when a stage is abnormally finished. 
- * - * @param finalState The final stage state - */ - public void abort(StageState finalState) { - // TODO - - // - committer.abortStage(...) - // - record Stage Finish Time - // - CleanUp Tasks - // - Record History - cleanup(); - setFinishTime(); - eventHandler.handle(new StageCompletedEvent(getId(), finalState)); - } - - public StateMachine<StageState, StageEventType, StageEvent> getStateMachine() { - return this.stateMachine; - } - - public void setPriority(int priority) { - this.priority = priority; - } - - - public int getPriority() { - return this.priority; - } - - public ExecutionBlockId getId() { - return block.getId(); - } - - public Task[] getTasks() { - return tasks.values().toArray(new Task[tasks.size()]); - } - - public Task getTask(TaskId qid) { - return tasks.get(qid); - } - - public Schema getSchema() { - return schema; - } - - public TableMeta getTableMeta() { - return meta; - } - - public TableStats getResultStats() { - return resultStatistics; - } - - public TableStats getInputStats() { - return inputStatistics; - } - - public List<String> getDiagnostics() { - readLock.lock(); - try { - return diagnostics; - } finally { - readLock.unlock(); - } - } - - protected void addDiagnostic(String diag) { - diagnostics.add(diag); - } - - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(this.getId()); - return sb.toString(); - } - - @Override - public boolean equals(Object o) { - if (o instanceof Stage) { - Stage other = (Stage)o; - return getId().equals(other.getId()); - } - return false; - } - - @Override - public int hashCode() { - return getId().hashCode(); - } - - public int compareTo(Stage other) { - return getId().compareTo(other.getId()); - } - - public StageState getSynchronizedState() { - readLock.lock(); - try { - return stateMachine.getCurrentState(); - } finally { - readLock.unlock(); - } - } - - /* non-blocking call for client API */ - public StageState getState() { - return stageState; - } - - public static TableStats[] computeStatFromUnionBlock(Stage stage) { - TableStats[] stat = new TableStats[]{new TableStats(), new TableStats()}; - long[] avgRows = new long[]{0, 0}; - long[] numBytes = new long[]{0, 0}; - long[] readBytes = new long[]{0, 0}; - long[] numRows = new long[]{0, 0}; - int[] numBlocks = new int[]{0, 0}; - int[] numOutputs = new int[]{0, 0}; - - List<ColumnStats> columnStatses = Lists.newArrayList(); - - MasterPlan masterPlan = stage.getMasterPlan(); - Iterator<ExecutionBlock> it = masterPlan.getChilds(stage.getBlock()).iterator(); - while (it.hasNext()) { - ExecutionBlock block = it.next(); - Stage childStage = stage.context.getStage(block.getId()); - TableStats[] childStatArray = new TableStats[]{ - childStage.getInputStats(), childStage.getResultStats() - }; - for (int i = 0; i < 2; i++) { - if (childStatArray[i] == null) { - continue; - } - avgRows[i] += childStatArray[i].getAvgRows(); - numBlocks[i] += childStatArray[i].getNumBlocks(); - numBytes[i] += childStatArray[i].getNumBytes(); - readBytes[i] += childStatArray[i].getReadBytes(); - numOutputs[i] += childStatArray[i].getNumShuffleOutputs(); - numRows[i] += childStatArray[i].getNumRows(); - } - columnStatses.addAll(childStatArray[1].getColumnStats()); - } - - for (int i = 0; i < 2; i++) { - stat[i].setNumBlocks(numBlocks[i]); - stat[i].setNumBytes(numBytes[i]); - stat[i].setReadBytes(readBytes[i]); - stat[i].setNumShuffleOutputs(numOutputs[i]); - stat[i].setNumRows(numRows[i]); - stat[i].setAvgRows(avgRows[i]); - } - stat[1].setColumnStats(columnStatses); - - 
return stat; - } - - private TableStats[] computeStatFromTasks() { - List<TableStats> inputStatsList = Lists.newArrayList(); - List<TableStats> resultStatsList = Lists.newArrayList(); - for (Task unit : getTasks()) { - resultStatsList.add(unit.getStats()); - if (unit.getLastAttempt().getInputStats() != null) { - inputStatsList.add(unit.getLastAttempt().getInputStats()); - } - } - TableStats inputStats = StatisticsUtil.aggregateTableStat(inputStatsList); - TableStats resultStats = StatisticsUtil.aggregateTableStat(resultStatsList); - return new TableStats[]{inputStats, resultStats}; - } - - private void stopScheduler() { - // If there are launched TaskRunners, send the 'shouldDie' message to all r - // via received task requests. - if (taskScheduler != null) { - taskScheduler.stop(); - } - } - - private void releaseContainers() { - // If there are still live TaskRunners, try to kill the containers. - eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values())); - } - - /** - * It computes all stats and sets the intermediate result. - */ - private void finalizeStats() { - TableStats[] statsArray; - if (block.hasUnion()) { - statsArray = computeStatFromUnionBlock(this); - } else { - statsArray = computeStatFromTasks(); - } - - DataChannel channel = masterPlan.getOutgoingChannels(getId()).get(0); - - // if store plan (i.e., CREATE or INSERT OVERWRITE) - StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan()); - if (storeType == null) { - // get default or store type - storeType = StoreType.CSV; - } - - schema = channel.getSchema(); - meta = CatalogUtil.newTableMeta(storeType, new KeyValueSet()); - inputStatistics = statsArray[0]; - resultStatistics = statsArray[1]; - } - - @Override - public void handle(StageEvent event) { - if (LOG.isDebugEnabled()) { - LOG.debug("Processing " + event.getStageId() + " of type " + event.getType() + ", preState=" - + getSynchronizedState()); - } - - try { - writeLock.lock(); - StageState oldState = getSynchronizedState(); - try { - getStateMachine().doTransition(event.getType(), event); - stageState = getSynchronizedState(); - } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state" - + ", eventType:" + event.getType().name() - + ", oldState:" + oldState.name() - + ", nextState:" + getSynchronizedState().name() - , e); - eventHandler.handle(new StageEvent(getId(), - StageEventType.SQ_INTERNAL_ERROR)); - } - - // notify the eventhandler of state change - if (LOG.isDebugEnabled()) { - if (oldState != getSynchronizedState()) { - LOG.debug(getId() + " Stage Transitioned from " + oldState + " to " - + getSynchronizedState()); - } - } - } finally { - writeLock.unlock(); - } - } - - public void handleTaskRequestEvent(TaskRequestEvent event) { - taskScheduler.handleTaskRequestEvent(event); - } - - private static class InitAndRequestContainer implements MultipleArcTransition<Stage, - StageEvent, StageState> { - - @Override - public StageState transition(final Stage stage, StageEvent stageEvent) { - stage.setStartTime(); - ExecutionBlock execBlock = stage.getBlock(); - StageState state; - - try { - // Union operator does not require actual query processing. It is performed logically. 
- if (execBlock.hasUnion()) { - stage.finalizeStats(); - state = StageState.SUCCEEDED; - } else { - ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock()); - DataChannel channel = stage.getMasterPlan().getChannel(stage.getId(), parent.getId()); - setShuffleIfNecessary(stage, channel); - initTaskScheduler(stage); - // execute pre-processing asyncronously - stage.getContext().getQueryMasterContext().getEventExecutor() - .submit(new Runnable() { - @Override - public void run() { - try { - schedule(stage); - stage.totalScheduledObjectsCount = stage.getTaskScheduler().remainingScheduledObjectNum(); - LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled"); - - if (stage.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks - stage.complete(); - } else { - if(stage.getSynchronizedState() == StageState.INITED) { - stage.taskScheduler.start(); - allocateContainers(stage); - } else { - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL)); - } - } - } catch (Throwable e) { - LOG.error("Stage (" + stage.getId() + ") ERROR: ", e); - stage.setFinishTime(); - stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), e.getMessage())); - stage.eventHandler.handle(new StageCompletedEvent(stage.getId(), StageState.ERROR)); - } - } - } - ); - state = StageState.INITED; - } - } catch (Throwable e) { - LOG.error("Stage (" + stage.getId() + ") ERROR: ", e); - stage.setFinishTime(); - stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), e.getMessage())); - stage.eventHandler.handle(new StageCompletedEvent(stage.getId(), StageState.ERROR)); - return StageState.ERROR; - } - - return state; - } - - private void initTaskScheduler(Stage stage) throws IOException { - TajoConf conf = stage.context.getConf(); - stage.schedulerContext = new TaskSchedulerContext(stage.context, - stage.getMasterPlan().isLeaf(stage.getId()), stage.getId()); - stage.taskScheduler = TaskSchedulerFactory.get(conf, stage.schedulerContext, stage); - stage.taskScheduler.init(conf); - LOG.info(stage.taskScheduler.getName() + " is chosen for the task scheduling for " + stage.getId()); - } - - /** - * If a parent block requires a repartition operation, the method sets proper repartition - * methods and the number of partitions to a given Stage. - */ - private static void setShuffleIfNecessary(Stage stage, DataChannel channel) { - if (channel.getShuffleType() != ShuffleType.NONE_SHUFFLE) { - int numTasks = calculateShuffleOutputNum(stage, channel); - Repartitioner.setShuffleOutputNumForTwoPhase(stage, numTasks, channel); - } - } - - /** - * Getting the total memory of cluster - * - * @param stage - * @return mega bytes - */ - private static int getClusterTotalMemory(Stage stage) { - List<TajoMasterProtocol.WorkerResourceProto> workers = - stage.context.getQueryMasterContext().getQueryMaster().getAllWorker(); - - int totalMem = 0; - for (TajoMasterProtocol.WorkerResourceProto worker : workers) { - totalMem += worker.getMemoryMB(); - } - return totalMem; - } - /** - * Getting the desire number of partitions according to the volume of input data. - * This method is only used to determine the partition key number of hash join or aggregation. 
- * - * @param stage - * @return - */ - public static int calculateShuffleOutputNum(Stage stage, DataChannel channel) { - TajoConf conf = stage.context.getConf(); - MasterPlan masterPlan = stage.getMasterPlan(); - ExecutionBlock parent = masterPlan.getParent(stage.getBlock()); - - LogicalNode grpNode = null; - if (parent != null) { - grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.GROUP_BY); - if (grpNode == null) { - grpNode = PlannerUtil.findMostBottomNode(parent.getPlan(), NodeType.DISTINCT_GROUP_BY); - } - } - - // We assume this execution block the first stage of join if two or more tables are included in this block, - if (parent != null && parent.getScanNodes().length >= 2) { - List<ExecutionBlock> childs = masterPlan.getChilds(parent); - - // for outer - ExecutionBlock outer = childs.get(0); - long outerVolume = getInputVolume(stage.masterPlan, stage.context, outer); - - // for inner - ExecutionBlock inner = childs.get(1); - long innerVolume = getInputVolume(stage.masterPlan, stage.context, inner); - LOG.info(stage.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, " - + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB"); - - long bigger = Math.max(outerVolume, innerVolume); - - int mb = (int) Math.ceil((double) bigger / 1048576); - LOG.info(stage.getId() + ", Bigger Table's volume is approximately " + mb + " MB"); - - int taskNum = (int) Math.ceil((double) mb / masterPlan.getContext().getInt(SessionVars.JOIN_PER_SHUFFLE_SIZE)); - - if (masterPlan.getContext().containsKey(SessionVars.TEST_MIN_TASK_NUM)) { - taskNum = masterPlan.getContext().getInt(SessionVars.TEST_MIN_TASK_NUM); - LOG.warn("!!!!! TESTCASE MODE !!!!!"); - } - - // The shuffle output numbers of join may be inconsistent by execution block order. - // Thus, we need to compare the number with DataChannel output numbers. - // If the number is right, the number and DataChannel output numbers will be consistent. - int outerShuffleOutputNum = 0, innerShuffleOutputNum = 0; - for (DataChannel eachChannel : masterPlan.getOutgoingChannels(outer.getId())) { - outerShuffleOutputNum = Math.max(outerShuffleOutputNum, eachChannel.getShuffleOutputNum()); - } - for (DataChannel eachChannel : masterPlan.getOutgoingChannels(inner.getId())) { - innerShuffleOutputNum = Math.max(innerShuffleOutputNum, eachChannel.getShuffleOutputNum()); - } - if (outerShuffleOutputNum != innerShuffleOutputNum - && taskNum != outerShuffleOutputNum - && taskNum != innerShuffleOutputNum) { - LOG.info(stage.getId() + ", Change determined number of join partitions cause difference of outputNum" + - ", originTaskNum=" + taskNum + ", changedTaskNum=" + Math.max(outerShuffleOutputNum, innerShuffleOutputNum) + - ", outerShuffleOutptNum=" + outerShuffleOutputNum + - ", innerShuffleOutputNum=" + innerShuffleOutputNum); - taskNum = Math.max(outerShuffleOutputNum, innerShuffleOutputNum); - } - - LOG.info(stage.getId() + ", The determined number of join partitions is " + taskNum); - - return taskNum; - // Is this stage the first step of group-by? - } else if (grpNode != null) { - boolean hasGroupColumns = true; - if (grpNode.getType() == NodeType.GROUP_BY) { - hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0; - } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) { - // Find current distinct stage node. 
- DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(stage.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY); - if (distinctNode == null) { - LOG.warn(stage.getId() + ", Can't find current DistinctGroupbyNode"); - distinctNode = (DistinctGroupbyNode)grpNode; - } - hasGroupColumns = distinctNode.getGroupingColumns().length > 0; - - Enforcer enforcer = stage.getBlock().getEnforcer(); - if (enforcer == null) { - LOG.warn(stage.getId() + ", DistinctGroupbyNode's enforcer is null."); - } - EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode); - if (property != null) { - if (property.getDistinct().getIsMultipleAggregation()) { - MultipleAggregationStage multiAggStage = property.getDistinct().getMultipleAggregationStage(); - if (multiAggStage != MultipleAggregationStage.THRID_STAGE) { - hasGroupColumns = true; - } - } - } - } - if (!hasGroupColumns) { - LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1"); - return 1; - } else { - long volume = getInputVolume(stage.masterPlan, stage.context, stage.block); - - int volumeByMB = (int) Math.ceil((double) volume / StorageUnit.MB); - LOG.info(stage.getId() + ", Table's volume is approximately " + volumeByMB + " MB"); - // determine the number of task - int taskNum = (int) Math.ceil((double) volumeByMB / - masterPlan.getContext().getInt(SessionVars.GROUPBY_PER_SHUFFLE_SIZE)); - LOG.info(stage.getId() + ", The determined number of aggregation partitions is " + taskNum); - return taskNum; - } - } else { - LOG.info("============>>>>> Unexpected Case! <<<<<================"); - long volume = getInputVolume(stage.masterPlan, stage.context, stage.block); - - int mb = (int) Math.ceil((double)volume / 1048576); - LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB"); - // determine the number of task per 128MB - int taskNum = (int) Math.ceil((double)mb / 128); - LOG.info(stage.getId() + ", The determined number of partitions is " + taskNum); - return taskNum; - } - } - - private static void schedule(Stage stage) throws IOException { - MasterPlan masterPlan = stage.getMasterPlan(); - ExecutionBlock execBlock = stage.getBlock(); - if (stage.getMasterPlan().isLeaf(execBlock.getId()) && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan - scheduleFragmentsForLeafQuery(stage); - } else if (execBlock.getScanNodes().length > 1) { // Case 2: Join - Repartitioner.scheduleFragmentsForJoinQuery(stage.schedulerContext, stage); - } else { // Case 3: Others (Sort or Aggregation) - int numTasks = getNonLeafTaskNum(stage); - Repartitioner.scheduleFragmentsForNonLeafTasks(stage.schedulerContext, masterPlan, stage, numTasks); - } - } - - /** - * Getting the desire number of tasks according to the volume of input data - * - * @param stage - * @return - */ - public static int getNonLeafTaskNum(Stage stage) { - // Getting intermediate data size - long volume = getInputVolume(stage.getMasterPlan(), stage.context, stage.getBlock()); - - int mb = (int) Math.ceil((double)volume / 1048576); - LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB"); - // determine the number of task per 64MB - int maxTaskNum = Math.max(1, (int) Math.ceil((double)mb / 64)); - LOG.info(stage.getId() + ", The determined number of non-leaf tasks is " + maxTaskNum); - return maxTaskNum; - } - - public static long getInputVolume(MasterPlan masterPlan, QueryMasterTask.QueryMasterTaskContext context, - ExecutionBlock execBlock) { - Map<String, TableDesc> tableMap = 
context.getTableDescMap(); - if (masterPlan.isLeaf(execBlock)) { - ScanNode[] outerScans = execBlock.getScanNodes(); - long maxVolume = 0; - for (ScanNode eachScanNode: outerScans) { - TableStats stat = tableMap.get(eachScanNode.getCanonicalName()).getStats(); - if (stat.getNumBytes() > maxVolume) { - maxVolume = stat.getNumBytes(); - } - } - return maxVolume; - } else { - long aggregatedVolume = 0; - for (ExecutionBlock childBlock : masterPlan.getChilds(execBlock)) { - Stage stage = context.getStage(childBlock.getId()); - if (stage == null || stage.getSynchronizedState() != StageState.SUCCEEDED) { - aggregatedVolume += getInputVolume(masterPlan, context, childBlock); - } else { - aggregatedVolume += stage.getResultStats().getNumBytes(); - } - } - - return aggregatedVolume; - } - } - - public static void allocateContainers(Stage stage) { - ExecutionBlock execBlock = stage.getBlock(); - - //TODO consider disk slot - int requiredMemoryMBPerTask = 512; - - int numRequest = stage.getContext().getResourceAllocator().calculateNumRequestContainers( - stage.getContext().getQueryMasterContext().getWorkerContext(), - stage.schedulerContext.getEstimatedTaskNum(), - requiredMemoryMBPerTask - ); - - final Resource resource = Records.newRecord(Resource.class); - - resource.setMemory(requiredMemoryMBPerTask); - - LOG.info("Request Container for " + stage.getId() + " containers=" + numRequest); - - Priority priority = Records.newRecord(Priority.class); - priority.setPriority(stage.getPriority()); - ContainerAllocationEvent event = - new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ, - stage.getId(), priority, resource, numRequest, - stage.masterPlan.isLeaf(execBlock), 0.0f); - stage.eventHandler.handle(event); - } - - private static void scheduleFragmentsForLeafQuery(Stage stage) throws IOException { - ExecutionBlock execBlock = stage.getBlock(); - ScanNode[] scans = execBlock.getScanNodes(); - Preconditions.checkArgument(scans.length == 1, "Must be Scan Query"); - ScanNode scan = scans[0]; - TableDesc table = stage.context.getTableDescMap().get(scan.getCanonicalName()); - - Collection<Fragment> fragments; - TableMeta meta = table.getMeta(); - - // Depending on scanner node's type, it creates fragments. If scan is for - // a partitioned table, It will creates lots fragments for all partitions. - // Otherwise, it creates at least one fragments for a table, which may - // span a number of blocks or possibly consists of a number of files. - if (scan.getType() == NodeType.PARTITIONS_SCAN) { - // After calling this method, partition paths are removed from the physical plan. 
- FileStorageManager storageManager = - (FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf()); - fragments = Repartitioner.getFragmentsFromPartitionedTable(storageManager, scan, table); - } else { - StorageManager storageManager = - StorageManager.getStorageManager(stage.getContext().getConf(), meta.getStoreType()); - fragments = storageManager.getSplits(scan.getCanonicalName(), table, scan); - } - - Stage.scheduleFragments(stage, fragments); - if (stage.getTaskScheduler() instanceof DefaultTaskScheduler) { - //Leaf task of DefaultTaskScheduler should be fragment size - // EstimatedTaskNum determined number of initial container - stage.schedulerContext.setEstimatedTaskNum(fragments.size()); - } else { - TajoConf conf = stage.context.getConf(); - stage.schedulerContext.setTaskSize(conf.getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024); - int estimatedTaskNum = (int) Math.ceil((double) table.getStats().getNumBytes() / - (double) stage.schedulerContext.getTaskSize()); - stage.schedulerContext.setEstimatedTaskNum(estimatedTaskNum); - } - } - } - - public static void scheduleFragment(Stage stage, Fragment fragment) { - stage.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE, - stage.getId(), fragment)); - } - - - public static void scheduleFragments(Stage stage, Collection<Fragment> fragments) { - for (Fragment eachFragment : fragments) { - scheduleFragment(stage, eachFragment); - } - } - - public static void scheduleFragments(Stage stage, Collection<Fragment> leftFragments, - Collection<Fragment> broadcastFragments) { - for (Fragment eachLeafFragment : leftFragments) { - scheduleFragment(stage, eachLeafFragment, broadcastFragments); - } - } - - public static void scheduleFragment(Stage stage, - Fragment leftFragment, Collection<Fragment> rightFragments) { - stage.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE, - stage.getId(), leftFragment, rightFragments)); - } - - public static void scheduleFetches(Stage stage, Map<String, List<FetchImpl>> fetches) { - stage.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE, - stage.getId(), fetches)); - } - - public static Task newEmptyTask(TaskSchedulerContext schedulerContext, - TaskAttemptScheduleContext taskContext, - Stage stage, int taskId) { - ExecutionBlock execBlock = stage.getBlock(); - Task unit = new Task(schedulerContext.getMasterContext().getConf(), - taskContext, - QueryIdFactory.newTaskId(schedulerContext.getBlockId(), taskId), - schedulerContext.isLeafQuery(), stage.eventHandler); - unit.setLogicalPlan(execBlock.getPlan()); - stage.addTask(unit); - return unit; - } - - private static class ContainerLaunchTransition - implements SingleArcTransition<Stage, StageEvent> { - - @Override - public void transition(Stage stage, StageEvent event) { - try { - StageContainerAllocationEvent allocationEvent = - (StageContainerAllocationEvent) event; - for (TajoContainer container : allocationEvent.getAllocatedContainer()) { - TajoContainerId cId = container.getId(); - if (stage.containers.containsKey(cId)) { - stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), - "Duplicated containers are allocated: " + cId.toString())); - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR)); - } - stage.containers.put(cId, container); - } - LOG.info("Stage (" + stage.getId() + ") has " + stage.containers.size() + " containers!"); - stage.eventHandler.handle( - new 
LaunchTaskRunnersEvent(stage.getId(), allocationEvent.getAllocatedContainer(), - stage.getContext().getQueryContext(), - CoreGsonHelper.toJson(stage.getBlock().getPlan(), LogicalNode.class)) - ); - - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_START)); - } catch (Throwable t) { - stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), - ExceptionUtils.getStackTrace(t))); - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR)); - } - } - } - - /** - * It is used in KILL_WAIT state against Contained Allocated event. - * It just returns allocated containers to resource manager. - */ - private static class AllocatedContainersCancelTransition implements SingleArcTransition<Stage, StageEvent> { - @Override - public void transition(Stage stage, StageEvent event) { - try { - StageContainerAllocationEvent allocationEvent = - (StageContainerAllocationEvent) event; - stage.eventHandler.handle( - new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, - stage.getId(), allocationEvent.getAllocatedContainer())); - LOG.info(String.format("[%s] %d allocated containers are canceled", - stage.getId().toString(), - allocationEvent.getAllocatedContainer().size())); - } catch (Throwable t) { - stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), - ExceptionUtils.getStackTrace(t))); - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR)); - } - } - } - - private static class TaskCompletedTransition implements SingleArcTransition<Stage, StageEvent> { - - @Override - public void transition(Stage stage, - StageEvent event) { - StageTaskEvent taskEvent = (StageTaskEvent) event; - Task task = stage.getTask(taskEvent.getTaskId()); - - if (task == null) { // task failed - LOG.error(String.format("Task %s is absent", taskEvent.getTaskId())); - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED)); - } else { - stage.completedTaskCount++; - - if (taskEvent.getState() == TaskState.SUCCEEDED) { - stage.succeededObjectCount++; - } else if (task.getState() == TaskState.KILLED) { - stage.killedObjectCount++; - } else if (task.getState() == TaskState.FAILED) { - stage.failedObjectCount++; - // if at least one task is failed, try to kill all tasks. 
- stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL)); - } - - LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)", - stage.getId(), - stage.getTotalScheduledObjectsCount(), - stage.succeededObjectCount, - stage.killedObjectCount, - stage.failedObjectCount)); - - if (stage.totalScheduledObjectsCount == - stage.succeededObjectCount + stage.killedObjectCount + stage.failedObjectCount) { - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED)); - } - } - } - } - - private static class KillTasksTransition implements SingleArcTransition<Stage, StageEvent> { - - @Override - public void transition(Stage stage, StageEvent stageEvent) { - if(stage.getTaskScheduler() != null){ - stage.getTaskScheduler().stop(); - } - - for (Task task : stage.getTasks()) { - stage.eventHandler.handle(new TaskEvent(task.getId(), TaskEventType.T_KILL)); - } - } - } - - private void cleanup() { - stopScheduler(); - releaseContainers(); - - if (!getContext().getQueryContext().getBool(SessionVars.DEBUG_ENABLED)) { - List<ExecutionBlock> childs = getMasterPlan().getChilds(getId()); - List<TajoIdProtos.ExecutionBlockIdProto> ebIds = Lists.newArrayList(); - - for (ExecutionBlock executionBlock : childs) { - ebIds.add(executionBlock.getId().getProto()); - } - - getContext().getQueryMasterContext().getQueryMaster().cleanupExecutionBlock(ebIds); - } - - this.finalStageHistory = makeStageHistory(); - this.finalStageHistory.setTasks(makeTaskHistories()); - } - - public List<IntermediateEntry> getHashShuffleIntermediateEntries() { - return hashShuffleIntermediateEntries; - } - - protected void waitingIntermediateReport() { - LOG.info(getId() + ", waiting IntermediateReport: expectedTaskNum=" + completeReportReceived.get()); - synchronized(completeReportReceived) { - long startTime = System.currentTimeMillis(); - while (true) { - if (completeReportReceived.get() >= tasks.size()) { - LOG.info(getId() + ", completed waiting IntermediateReport"); - return; - } else { - try { - completeReportReceived.wait(10 * 1000); - } catch (InterruptedException e) { - } - long elapsedTime = System.currentTimeMillis() - startTime; - if (elapsedTime >= 120 * 1000) { - LOG.error(getId() + ", Timeout while receiving intermediate reports: " + elapsedTime + " ms"); - abort(StageState.FAILED); - return; - } - } - } - } - } - - public void receiveExecutionBlockReport(TajoWorkerProtocol.ExecutionBlockReport report) { - LOG.info(getId() + ", receiveExecutionBlockReport:" + report.getSucceededTasks()); - if (!report.getReportSuccess()) { - LOG.error(getId() + ", ExecutionBlock final report fail cause:" + report.getReportErrorMessage()); - abort(StageState.FAILED); - return; - } - if (report.getIntermediateEntriesCount() > 0) { - synchronized (hashShuffleIntermediateEntries) { - for (IntermediateEntryProto eachInterm: report.getIntermediateEntriesList()) { - hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm)); - } - } - } - synchronized(completeReportReceived) { - completeReportReceived.addAndGet(report.getSucceededTasks()); - completeReportReceived.notifyAll(); - } - } - - private static class StageCompleteTransition implements MultipleArcTransition<Stage, StageEvent, StageState> { - - @Override - public StageState transition(Stage stage, StageEvent stageEvent) { - // TODO - Commit Stage - // TODO - records succeeded, failed, killed completed task - // TODO - records metrics - try { - LOG.info(String.format("Stage completed - %s 
(total=%d, success=%d, killed=%d)", - stage.getId().toString(), - stage.getTotalScheduledObjectsCount(), - stage.getSucceededObjectCount(), - stage.killedObjectCount)); - - if (stage.killedObjectCount > 0 || stage.failedObjectCount > 0) { - if (stage.failedObjectCount > 0) { - stage.abort(StageState.FAILED); - return StageState.FAILED; - } else if (stage.killedObjectCount > 0) { - stage.abort(StageState.KILLED); - return StageState.KILLED; - } else { - LOG.error("Invalid State " + stage.getSynchronizedState() + " State"); - stage.abort(StageState.ERROR); - return StageState.ERROR; - } - } else { - stage.complete(); - return StageState.SUCCEEDED; - } - } catch (Throwable t) { - LOG.error(t.getMessage(), t); - stage.abort(StageState.ERROR); - return StageState.ERROR; - } - } - } - - private static class DiagnosticsUpdateTransition implements SingleArcTransition<Stage, StageEvent> { - @Override - public void transition(Stage stage, StageEvent event) { - stage.addDiagnostic(((StageDiagnosticsUpdateEvent) event).getDiagnosticUpdate()); - } - } - - private static class InternalErrorTransition implements SingleArcTransition<Stage, StageEvent> { - @Override - public void transition(Stage stage, StageEvent stageEvent) { - stage.abort(StageState.ERROR); - } - } -}
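
The partition-count heuristics in Stage.calculateShuffleOutputNum() and Stage.getNonLeafTaskNum() above boil down to simple volume arithmetic: a join gets one partition per SessionVars.JOIN_PER_SHUFFLE_SIZE megabytes of the larger input relation, an aggregation gets one partition per SessionVars.GROUPBY_PER_SHUFFLE_SIZE megabytes (or a single partition when there are no grouping columns), and other non-leaf stages are sized at roughly one task per 64 MB of intermediate data. The standalone sketch below restates that arithmetic outside of the Stage class; the class name, method names, and the per-shuffle sizes passed in main() are illustrative assumptions, not Tajo API.

// A minimal, standalone sketch (not Tajo API) of the volume-based heuristics used by
// Stage.calculateShuffleOutputNum() and Stage.getNonLeafTaskNum() above. In Tajo the
// per-shuffle sizes come from SessionVars.JOIN_PER_SHUFFLE_SIZE and
// SessionVars.GROUPBY_PER_SHUFFLE_SIZE; here they are plain parameters.
public class ShufflePartitionHeuristics {

  private static final long MB = 1048576L;

  /** Join: one partition per joinPerShuffleSizeMb of the larger input relation. */
  static int joinPartitions(long outerVolumeBytes, long innerVolumeBytes, int joinPerShuffleSizeMb) {
    long bigger = Math.max(outerVolumeBytes, innerVolumeBytes);
    int mb = (int) Math.ceil((double) bigger / MB);
    return (int) Math.ceil((double) mb / joinPerShuffleSizeMb);
  }

  /** Aggregation: one partition per groupbyPerShuffleSizeMb of input, or a single
   *  partition when there are no grouping columns. */
  static int groupByPartitions(long inputVolumeBytes, boolean hasGroupingColumns, int groupbyPerShuffleSizeMb) {
    if (!hasGroupingColumns) {
      return 1;
    }
    int mb = (int) Math.ceil((double) inputVolumeBytes / MB);
    return (int) Math.ceil((double) mb / groupbyPerShuffleSizeMb);
  }

  /** Non-leaf stages (sort, aggregation over intermediates): roughly one task per 64 MB. */
  static int nonLeafTasks(long inputVolumeBytes) {
    int mb = (int) Math.ceil((double) inputVolumeBytes / MB);
    return Math.max(1, (int) Math.ceil((double) mb / 64));
  }

  public static void main(String[] args) {
    System.out.println(joinPartitions(10240L * MB, 2048L * MB, 128)); // 10 GB vs 2 GB join -> 80
    System.out.println(groupByPartitions(5120L * MB, true, 256));     // 5 GB group-by input -> 20
    System.out.println(nonLeafTasks(1024L * MB));                     // 1 GB intermediate -> 16
  }
}
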
http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java deleted file mode 100644 index 82a06fe..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.querymaster; - -public enum StageState { - NEW, - INITED, - RUNNING, - SUCCEEDED, - FAILED, - KILL_WAIT, - KILLED, - ERROR -} http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java deleted file mode 100644 index 5475791..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java +++ /dev/null @@ -1,907 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.master.querymaster; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Objects; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.state.*; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.TaskAttemptId; -import org.apache.tajo.TaskId; -import org.apache.tajo.TajoProtos.TaskAttemptState; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto; -import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto; -import org.apache.tajo.master.FragmentPair; -import org.apache.tajo.master.TaskState; -import org.apache.tajo.master.event.*; -import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; -import org.apache.tajo.plan.logical.*; -import org.apache.tajo.storage.DataLocation; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.storage.fragment.FragmentConvertor; -import org.apache.tajo.util.Pair; -import org.apache.tajo.util.TajoIdUtils; -import org.apache.tajo.util.history.TaskHistory; -import org.apache.tajo.worker.FetchImpl; - -import java.net.URI; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; -import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput; - -public class Task implements EventHandler<TaskEvent> { - /** Class Logger */ - private static final Log LOG = LogFactory.getLog(Task.class); - - private final Configuration systemConf; - private TaskId taskId; - private EventHandler eventHandler; - private StoreTableNode store = null; - private LogicalNode plan = null; - private List<ScanNode> scan; - - private Map<String, Set<FragmentProto>> fragMap; - private Map<String, Set<FetchImpl>> fetchMap; - - private int totalFragmentNum; - - private List<ShuffleFileOutput> shuffleFileOutputs; - private TableStats stats; - private final boolean isLeafTask; - private List<IntermediateEntry> intermediateData; - - private Map<TaskAttemptId, TaskAttempt> attempts; - private final int maxAttempts = 3; - private Integer nextAttempt = -1; - private TaskAttemptId lastAttemptId; - - private TaskAttemptId successfulAttempt; - private String succeededHost; - private int succeededHostPort; - private int succeededPullServerPort; - - private int failedAttempts; - private int finishedAttempts; // finish are total of success, failed and killed - - private long launchTime; - private long finishTime; - - private List<DataLocation> dataLocations = Lists.newArrayList(); - - private static final AttemptKilledTransition ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition(); - - private TaskHistory finalTaskHistory; - - protected static final StateMachineFactory - <Task, TaskState, TaskEventType, TaskEvent> stateMachineFactory = - new StateMachineFactory <Task, TaskState, TaskEventType, TaskEvent>(TaskState.NEW) - - // Transitions from NEW state - .addTransition(TaskState.NEW, 
TaskState.SCHEDULED, - TaskEventType.T_SCHEDULE, - new InitialScheduleTransition()) - .addTransition(TaskState.NEW, TaskState.KILLED, - TaskEventType.T_KILL, - new KillNewTaskTransition()) - - // Transitions from SCHEDULED state - .addTransition(TaskState.SCHEDULED, TaskState.RUNNING, - TaskEventType.T_ATTEMPT_LAUNCHED, - new AttemptLaunchedTransition()) - .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT, - TaskEventType.T_KILL, - new KillTaskTransition()) - - // Transitions from RUNNING state - .addTransition(TaskState.RUNNING, TaskState.RUNNING, - TaskEventType.T_ATTEMPT_LAUNCHED) - .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED, - TaskEventType.T_ATTEMPT_SUCCEEDED, - new AttemptSucceededTransition()) - .addTransition(TaskState.RUNNING, TaskState.KILL_WAIT, - TaskEventType.T_KILL, - new KillTaskTransition()) - .addTransition(TaskState.RUNNING, - EnumSet.of(TaskState.RUNNING, TaskState.FAILED), - TaskEventType.T_ATTEMPT_FAILED, - new AttemptFailedOrRetryTransition()) - - // Transitions from KILL_WAIT state - .addTransition(TaskState.KILL_WAIT, TaskState.KILLED, - TaskEventType.T_ATTEMPT_KILLED, - ATTEMPT_KILLED_TRANSITION) - .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT, - TaskEventType.T_ATTEMPT_LAUNCHED, - new KillTaskTransition()) - .addTransition(TaskState.KILL_WAIT, TaskState.FAILED, - TaskEventType.T_ATTEMPT_FAILED, - new AttemptFailedTransition()) - .addTransition(TaskState.KILL_WAIT, TaskState.KILLED, - TaskEventType.T_ATTEMPT_SUCCEEDED, - ATTEMPT_KILLED_TRANSITION) - // Ignore-able transitions. - .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT, - EnumSet.of( - TaskEventType.T_KILL, - TaskEventType.T_SCHEDULE)) - - // Transitions from SUCCEEDED state - // Ignore-able transitions - .addTransition(TaskState.SUCCEEDED, TaskState.SUCCEEDED, - EnumSet.of(TaskEventType.T_KILL, - TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED)) - - // Transitions from FAILED state - // Ignore-able transitions - .addTransition(TaskState.FAILED, TaskState.FAILED, - EnumSet.of(TaskEventType.T_KILL, - TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED)) - - // Transitions from KILLED state - .addTransition(TaskState.KILLED, TaskState.KILLED, TaskEventType.T_ATTEMPT_KILLED, new KillTaskTransition()) - // Ignore-able transitions - .addTransition(TaskState.KILLED, TaskState.KILLED, - EnumSet.of( - TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED)) - - .installTopology(); - - private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine; - - - private final Lock readLock; - private final Lock writeLock; - private TaskAttemptScheduleContext scheduleContext; - - public Task(Configuration conf, TaskAttemptScheduleContext scheduleContext, - TaskId id, boolean isLeafTask, EventHandler eventHandler) { - this.systemConf = conf; - this.taskId = id; - this.eventHandler = eventHandler; - this.isLeafTask = isLeafTask; - scan = new ArrayList<ScanNode>(); - fetchMap = Maps.newHashMap(); - fragMap = Maps.newHashMap(); - shuffleFileOutputs = new ArrayList<ShuffleFileOutput>(); - attempts = Collections.emptyMap(); - lastAttemptId = null; - nextAttempt = -1; - failedAttempts = 0; - - ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - this.readLock = readWriteLock.readLock(); - this.writeLock = readWriteLock.writeLock(); - this.scheduleContext = scheduleContext; - - stateMachine = stateMachineFactory.make(this); - totalFragmentNum = 0; - 
} - - public boolean isLeafTask() { - return this.isLeafTask; - } - - public TaskState getState() { - readLock.lock(); - try { - return stateMachine.getCurrentState(); - } finally { - readLock.unlock(); - } - } - - public TaskAttemptState getLastAttemptStatus() { - TaskAttempt lastAttempt = getLastAttempt(); - if (lastAttempt != null) { - return lastAttempt.getState(); - } else { - return TaskAttemptState.TA_ASSIGNED; - } - } - - public TaskHistory getTaskHistory() { - if (finalTaskHistory != null) { - if (finalTaskHistory.getFinishTime() == 0) { - finalTaskHistory = makeTaskHistory(); - } - return finalTaskHistory; - } else { - return makeTaskHistory(); - } - } - - private TaskHistory makeTaskHistory() { - TaskHistory taskHistory = new TaskHistory(); - - TaskAttempt lastAttempt = getLastAttempt(); - if (lastAttempt != null) { - taskHistory.setId(lastAttempt.getId().toString()); - taskHistory.setState(lastAttempt.getState().toString()); - taskHistory.setProgress(lastAttempt.getProgress()); - } - taskHistory.setHostAndPort(succeededHost + ":" + succeededHostPort); - taskHistory.setRetryCount(this.getRetryCount()); - taskHistory.setLaunchTime(launchTime); - taskHistory.setFinishTime(finishTime); - - taskHistory.setNumShuffles(getShuffleOutpuNum()); - if (!getShuffleFileOutputs().isEmpty()) { - ShuffleFileOutput shuffleFileOutputs = getShuffleFileOutputs().get(0); - if (taskHistory.getNumShuffles() > 0) { - taskHistory.setShuffleKey("" + shuffleFileOutputs.getPartId()); - taskHistory.setShuffleFileName(shuffleFileOutputs.getFileName()); - } - } - - List<String> fragmentList = new ArrayList<String>(); - for (FragmentProto eachFragment : getAllFragments()) { - try { - Fragment fragment = FragmentConvertor.convert(systemConf, eachFragment); - fragmentList.add(fragment.toString()); - } catch (Exception e) { - LOG.error(e.getMessage()); - fragmentList.add("ERROR: " + eachFragment.getStoreType() + "," + eachFragment.getId() + ": " + e.getMessage()); - } - } - taskHistory.setFragments(fragmentList.toArray(new String[]{})); - - List<String[]> fetchList = new ArrayList<String[]>(); - for (Map.Entry<String, Set<FetchImpl>> e : getFetchMap().entrySet()) { - for (FetchImpl f : e.getValue()) { - for (URI uri : f.getSimpleURIs()){ - fetchList.add(new String[] {e.getKey(), uri.toString()}); - } - } - } - - taskHistory.setFetchs(fetchList.toArray(new String[][]{})); - - List<String> dataLocationList = new ArrayList<String>(); - for(DataLocation eachLocation: getDataLocations()) { - dataLocationList.add(eachLocation.toString()); - } - - taskHistory.setDataLocations(dataLocationList.toArray(new String[]{})); - return taskHistory; - } - - public void setLogicalPlan(LogicalNode plan) { - this.plan = plan; - - LogicalNode node = plan; - ArrayList<LogicalNode> s = new ArrayList<LogicalNode>(); - s.add(node); - while (!s.isEmpty()) { - node = s.remove(s.size()-1); - if (node instanceof UnaryNode) { - UnaryNode unary = (UnaryNode) node; - s.add(s.size(), unary.getChild()); - } else if (node instanceof BinaryNode) { - BinaryNode binary = (BinaryNode) node; - s.add(s.size(), binary.getLeftChild()); - s.add(s.size(), binary.getRightChild()); - } else if (node instanceof ScanNode) { - scan.add((ScanNode)node); - } else if (node instanceof TableSubQueryNode) { - s.add(((TableSubQueryNode) node).getSubQuery()); - } - } - } - - private void addDataLocation(Fragment fragment) { - String[] hosts = fragment.getHosts(); - int[] diskIds = null; - if (fragment instanceof FileFragment) { - diskIds = 
((FileFragment)fragment).getDiskIds(); - } - for (int i = 0; i < hosts.length; i++) { - dataLocations.add(new DataLocation(hosts[i], diskIds == null ? -1 : diskIds[i])); - } - } - - public void addFragment(Fragment fragment, boolean useDataLocation) { - Set<FragmentProto> fragmentProtos; - if (fragMap.containsKey(fragment.getTableName())) { - fragmentProtos = fragMap.get(fragment.getTableName()); - } else { - fragmentProtos = new HashSet<FragmentProto>(); - fragMap.put(fragment.getTableName(), fragmentProtos); - } - fragmentProtos.add(fragment.getProto()); - if (useDataLocation) { - addDataLocation(fragment); - } - totalFragmentNum++; - } - - public void addFragments(Collection<Fragment> fragments) { - for (Fragment eachFragment: fragments) { - addFragment(eachFragment, false); - } - } - - public void setFragment(FragmentPair[] fragmentPairs) { - for (FragmentPair eachFragmentPair : fragmentPairs) { - this.addFragment(eachFragmentPair.getLeftFragment(), true); - if (eachFragmentPair.getRightFragment() != null) { - this.addFragment(eachFragmentPair.getRightFragment(), true); - } - } - } - - public List<DataLocation> getDataLocations() { - return dataLocations; - } - - public String getSucceededHost() { - return succeededHost; - } - - public void addFetches(String tableId, Collection<FetchImpl> fetches) { - Set<FetchImpl> fetchSet; - if (fetchMap.containsKey(tableId)) { - fetchSet = fetchMap.get(tableId); - } else { - fetchSet = Sets.newHashSet(); - } - fetchSet.addAll(fetches); - fetchMap.put(tableId, fetchSet); - } - - public void setFetches(Map<String, Set<FetchImpl>> fetches) { - this.fetchMap.clear(); - this.fetchMap.putAll(fetches); - } - - public Collection<FragmentProto> getAllFragments() { - Set<FragmentProto> fragmentProtos = new HashSet<FragmentProto>(); - for (Set<FragmentProto> eachFragmentSet : fragMap.values()) { - fragmentProtos.addAll(eachFragmentSet); - } - return fragmentProtos; - } - - public LogicalNode getLogicalPlan() { - return this.plan; - } - - public TaskId getId() { - return taskId; - } - - public Collection<FetchImpl> getFetchHosts(String tableId) { - return fetchMap.get(tableId); - } - - public Collection<Set<FetchImpl>> getFetches() { - return fetchMap.values(); - } - - public Map<String, Set<FetchImpl>> getFetchMap() { - return fetchMap; - } - - public Collection<FetchImpl> getFetch(ScanNode scan) { - return this.fetchMap.get(scan.getTableName()); - } - - public ScanNode[] getScanNodes() { - return this.scan.toArray(new ScanNode[scan.size()]); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append(plan.getType() + " \n"); - for (Entry<String, Set<FragmentProto>> e : fragMap.entrySet()) { - builder.append(e.getKey()).append(" : "); - for (FragmentProto fragment : e.getValue()) { - builder.append(fragment).append(", "); - } - } - for (Entry<String, Set<FetchImpl>> e : fetchMap.entrySet()) { - builder.append(e.getKey()).append(" : "); - for (FetchImpl t : e.getValue()) { - for (URI uri : t.getURIs()){ - builder.append(uri).append(" "); - } - } - } - - return builder.toString(); - } - - public void setStats(TableStats stats) { - this.stats = stats; - } - - public void setShuffleFileOutputs(List<ShuffleFileOutput> partitions) { - this.shuffleFileOutputs = Collections.unmodifiableList(partitions); - } - - public TableStats getStats() { - return this.stats; - } - - public List<ShuffleFileOutput> getShuffleFileOutputs() { - return this.shuffleFileOutputs; - } - - public int getShuffleOutpuNum() { - return 
this.shuffleFileOutputs.size(); - } - - public TaskAttempt newAttempt() { - TaskAttempt attempt = new TaskAttempt(scheduleContext, - QueryIdFactory.newTaskAttemptId(this.getId(), ++nextAttempt), - this, eventHandler); - lastAttemptId = attempt.getId(); - return attempt; - } - - public TaskAttempt getAttempt(TaskAttemptId attemptId) { - return attempts.get(attemptId); - } - - public TaskAttempt getAttempt(int attempt) { - return this.attempts.get(QueryIdFactory.newTaskAttemptId(this.getId(), attempt)); - } - - public TaskAttempt getLastAttempt() { - return getAttempt(this.lastAttemptId); - } - - public TaskAttempt getSuccessfulAttempt() { - readLock.lock(); - try { - if (null == successfulAttempt) { - return null; - } - return attempts.get(successfulAttempt); - } finally { - readLock.unlock(); - } - } - - public int getRetryCount () { - return this.nextAttempt; - } - - public int getTotalFragmentNum() { - return totalFragmentNum; - } - - private static class InitialScheduleTransition implements - SingleArcTransition<Task, TaskEvent> { - - @Override - public void transition(Task task, TaskEvent taskEvent) { - task.addAndScheduleAttempt(); - } - } - - public long getLaunchTime() { - return launchTime; - } - - public long getFinishTime() { - return finishTime; - } - - @VisibleForTesting - public void setLaunchTime(long launchTime) { - this.launchTime = launchTime; - } - - @VisibleForTesting - public void setFinishTime(long finishTime) { - this.finishTime = finishTime; - } - - public long getRunningTime() { - if(finishTime > 0) { - return finishTime - launchTime; - } else { - return System.currentTimeMillis() - launchTime; - } - } - - // This is always called in the Write Lock - private void addAndScheduleAttempt() { - // Create new task attempt - TaskAttempt attempt = newAttempt(); - if (LOG.isDebugEnabled()) { - LOG.debug("Created attempt " + attempt.getId()); - } - switch (attempts.size()) { - case 0: - attempts = Collections.singletonMap(attempt.getId(), attempt); - break; - - case 1: - Map<TaskAttemptId, TaskAttempt> newAttempts - = new LinkedHashMap<TaskAttemptId, TaskAttempt>(3); - newAttempts.putAll(attempts); - attempts = newAttempts; - attempts.put(attempt.getId(), attempt); - break; - - default: - attempts.put(attempt.getId(), attempt); - break; - } - - if (failedAttempts > 0) { - eventHandler.handle(new TaskAttemptScheduleEvent(systemConf, attempt.getId(), - TaskAttemptEventType.TA_RESCHEDULE)); - } else { - eventHandler.handle(new TaskAttemptScheduleEvent(systemConf, attempt.getId(), - TaskAttemptEventType.TA_SCHEDULE)); - } - } - - private void finishTask() { - this.finishTime = System.currentTimeMillis(); - finalTaskHistory = makeTaskHistory(); - } - - private static class KillNewTaskTransition implements SingleArcTransition<Task, TaskEvent> { - - @Override - public void transition(Task task, TaskEvent taskEvent) { - task.eventHandler.handle(new StageTaskEvent(task.getId(), TaskState.KILLED)); - } - } - - private static class KillTaskTransition implements SingleArcTransition<Task, TaskEvent> { - - @Override - public void transition(Task task, TaskEvent taskEvent) { - task.finishTask(); - task.eventHandler.handle(new TaskAttemptEvent(task.lastAttemptId, TaskAttemptEventType.TA_KILL)); - } - } - - private static class AttemptKilledTransition implements SingleArcTransition<Task, TaskEvent>{ - - @Override - public void transition(Task task, TaskEvent event) { - task.finishTask(); - task.eventHandler.handle(new StageTaskEvent(task.getId(), TaskState.KILLED)); - } - } - - private 
static class AttemptSucceededTransition - implements SingleArcTransition<Task, TaskEvent>{ - - @Override - public void transition(Task task, - TaskEvent event) { - TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event; - TaskAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId()); - - task.successfulAttempt = attemptEvent.getTaskAttemptId(); - task.succeededHost = attempt.getWorkerConnectionInfo().getHost(); - task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort(); - task.succeededPullServerPort = attempt.getWorkerConnectionInfo().getPullServerPort(); - - task.finishTask(); - task.eventHandler.handle(new StageTaskEvent(event.getTaskId(), TaskState.SUCCEEDED)); - } - } - - private static class AttemptLaunchedTransition implements SingleArcTransition<Task, TaskEvent> { - @Override - public void transition(Task task, - TaskEvent event) { - TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event; - TaskAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId()); - task.launchTime = System.currentTimeMillis(); - task.succeededHost = attempt.getWorkerConnectionInfo().getHost(); - task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort(); - } - } - - private static class AttemptFailedTransition implements SingleArcTransition<Task, TaskEvent> { - @Override - public void transition(Task task, TaskEvent event) { - TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event; - LOG.info("============================================================="); - LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<"); - LOG.info("============================================================="); - task.failedAttempts++; - task.finishedAttempts++; - - task.finishTask(); - task.eventHandler.handle(new StageTaskEvent(task.getId(), TaskState.FAILED)); - } - } - - private static class AttemptFailedOrRetryTransition implements - MultipleArcTransition<Task, TaskEvent, TaskState> { - - @Override - public TaskState transition(Task task, TaskEvent taskEvent) { - TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent; - task.failedAttempts++; - task.finishedAttempts++; - boolean retry = task.failedAttempts < task.maxAttempts; - - LOG.info("===================================================================================="); - LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + ", " + - "retry:" + retry + ", attempts:" + task.failedAttempts + " <<<"); - LOG.info("===================================================================================="); - - if (retry) { - if (task.successfulAttempt == null) { - task.addAndScheduleAttempt(); - } - } else { - task.finishTask(); - task.eventHandler.handle(new StageTaskEvent(task.getId(), TaskState.FAILED)); - return TaskState.FAILED; - } - - return task.getState(); - } - } - - @Override - public void handle(TaskEvent event) { - if (LOG.isDebugEnabled()) { - LOG.debug("Processing " + event.getTaskId() + " of type " - + event.getType()); - } - - try { - writeLock.lock(); - TaskState oldState = getState(); - try { - stateMachine.doTransition(event.getType(), event); - } catch (InvalidStateTransitonException e) { - LOG.error("Can't handle this event at current state" - + ", eventType:" + event.getType().name() - + ", oldState:" + oldState.name() - + ", nextState:" + getState().name() - , e); - eventHandler.handle(new QueryEvent(TajoIdUtils.parseQueryId(getId().toString()), - QueryEventType.INTERNAL_ERROR)); - } - - //notify the eventhandler of state change - if 
(LOG.isDebugEnabled()) { - if (oldState != getState()) { - LOG.debug(taskId + " Task Transitioned from " + oldState + " to " - + getState()); - } - } - } - - finally { - writeLock.unlock(); - } - } - - public void setIntermediateData(Collection<IntermediateEntry> partitions) { - this.intermediateData = new ArrayList<IntermediateEntry>(partitions); - } - - public List<IntermediateEntry> getIntermediateData() { - return this.intermediateData; - } - - public static class PullHost implements Cloneable { - String host; - int port; - int hashCode; - - public PullHost(String pullServerAddr, int pullServerPort){ - this.host = pullServerAddr; - this.port = pullServerPort; - this.hashCode = Objects.hashCode(host, port); - } - public String getHost() { - return host; - } - - public int getPort() { - return this.port; - } - - public String getPullAddress() { - return host + ":" + port; - } - - @Override - public int hashCode() { - return hashCode; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof PullHost) { - PullHost other = (PullHost) obj; - return host.equals(other.host) && port == other.port; - } - - return false; - } - - @Override - public PullHost clone() throws CloneNotSupportedException { - PullHost newPullHost = (PullHost) super.clone(); - newPullHost.host = host; - newPullHost.port = port; - newPullHost.hashCode = Objects.hashCode(newPullHost.host, newPullHost.port); - return newPullHost; - } - - @Override - public String toString() { - return host + ":" + port; - } - } - - public static class IntermediateEntry { - ExecutionBlockId ebId; - int taskId; - int attemptId; - int partId; - PullHost host; - long volume; - List<Pair<Long, Integer>> pages; - List<Pair<Long, Pair<Integer, Integer>>> failureRowNums; - - public IntermediateEntry(IntermediateEntryProto proto) { - this.ebId = new ExecutionBlockId(proto.getEbId()); - this.taskId = proto.getTaskId(); - this.attemptId = proto.getAttemptId(); - this.partId = proto.getPartId(); - - String[] pullHost = proto.getHost().split(":"); - this.host = new PullHost(pullHost[0], Integer.parseInt(pullHost[1])); - this.volume = proto.getVolume(); - - failureRowNums = new ArrayList<Pair<Long, Pair<Integer, Integer>>>(); - for (FailureIntermediateProto eachFailure: proto.getFailuresList()) { - - failureRowNums.add(new Pair(eachFailure.getPagePos(), - new Pair(eachFailure.getStartRowNum(), eachFailure.getEndRowNum()))); - } - - pages = new ArrayList<Pair<Long, Integer>>(); - for (IntermediateEntryProto.PageProto eachPage: proto.getPagesList()) { - pages.add(new Pair(eachPage.getPos(), eachPage.getLength())); - } - } - - public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) { - this.taskId = taskId; - this.attemptId = attemptId; - this.partId = partId; - this.host = host; - } - - public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host, long volume) { - this.taskId = taskId; - this.attemptId = attemptId; - this.partId = partId; - this.host = host; - this.volume = volume; - } - - public ExecutionBlockId getEbId() { - return ebId; - } - - public void setEbId(ExecutionBlockId ebId) { - this.ebId = ebId; - } - - public int getTaskId() { - return this.taskId; - } - - public int getAttemptId() { - return this.attemptId; - } - - public int getPartId() { - return this.partId; - } - - public PullHost getPullHost() { - return this.host; - } - - public long getVolume() { - return this.volume; - } - - public long setVolume(long volume) { - return this.volume = volume; - } - - public List<Pair<Long, 
Integer>> getPages() { - return pages; - } - - public void setPages(List<Pair<Long, Integer>> pages) { - this.pages = pages; - } - - public List<Pair<Long, Pair<Integer, Integer>>> getFailureRowNums() { - return failureRowNums; - } - - @Override - public int hashCode() { - return Objects.hashCode(ebId, taskId, partId, attemptId, host); - } - - public List<Pair<Long, Long>> split(long firstSplitVolume, long splitVolume) { - List<Pair<Long, Long>> splits = new ArrayList<Pair<Long, Long>>(); - - if (pages == null || pages.isEmpty()) { - return splits; - } - int pageSize = pages.size(); - - long currentOffset = -1; - long currentBytes = 0; - - long realSplitVolume = firstSplitVolume > 0 ? firstSplitVolume : splitVolume; - for (int i = 0; i < pageSize; i++) { - Pair<Long, Integer> eachPage = pages.get(i); - if (currentOffset == -1) { - currentOffset = eachPage.getFirst(); - } - if (currentBytes > 0 && currentBytes + eachPage.getSecond() >= realSplitVolume) { - splits.add(new Pair(currentOffset, currentBytes)); - currentOffset = eachPage.getFirst(); - currentBytes = 0; - realSplitVolume = splitVolume; - } - - currentBytes += eachPage.getSecond(); - } - - //add last - if (currentBytes > 0) { - splits.add(new Pair(currentOffset, currentBytes)); - } - return splits; - } - } -}
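
For reference, the page-grouping behaviour of IntermediateEntry.split() above can be read in isolation: consecutive (offset, length) pages are coalesced into (offset, bytes) ranges of roughly splitVolume bytes, with an optionally different budget for the first range. Below is a minimal standalone sketch of just that logic; the Page class and the long[] return shape are illustrative stand-ins for Tajo's Pair types, not part of this patch.

import java.util.ArrayList;
import java.util.List;

// Minimal, self-contained sketch (not Tajo code) of the page-grouping logic in
// IntermediateEntry.split(): pages are accumulated until adding the next page would
// reach the current byte budget, at which point the accumulated range is emitted.
public class SplitSketch {

  static final class Page {
    final long offset;   // position of the page in the intermediate output
    final int length;    // page length in bytes
    Page(long offset, int length) { this.offset = offset; this.length = length; }
  }

  /** Returns (offset, bytes) ranges as long[2] entries, mirroring List<Pair<Long, Long>>. */
  static List<long[]> split(List<Page> pages, long firstSplitVolume, long splitVolume) {
    List<long[]> splits = new ArrayList<>();
    if (pages == null || pages.isEmpty()) {
      return splits;
    }

    long currentOffset = -1;
    long currentBytes = 0;
    long budget = firstSplitVolume > 0 ? firstSplitVolume : splitVolume;

    for (Page page : pages) {
      if (currentOffset == -1) {
        currentOffset = page.offset;                          // start of a new range
      }
      if (currentBytes > 0 && currentBytes + page.length >= budget) {
        splits.add(new long[]{currentOffset, currentBytes});  // close the current range
        currentOffset = page.offset;
        currentBytes = 0;
        budget = splitVolume;                                 // only the first range uses firstSplitVolume
      }
      currentBytes += page.length;
    }
    if (currentBytes > 0) {
      splits.add(new long[]{currentOffset, currentBytes});    // last, possibly short, range
    }
    return splits;
  }

  public static void main(String[] args) {
    List<Page> pages = new ArrayList<>();
    for (int i = 0; i < 6; i++) {
      pages.add(new Page(i * 100L, 100));                     // six 100-byte pages
    }
    for (long[] s : split(pages, 150, 250)) {
      System.out.println("offset=" + s[0] + " bytes=" + s[1]);
    }
  }
}

With six 100-byte pages, firstSplitVolume=150 and splitVolume=250, this prints ranges of 100, 200, 200 and 100 bytes: a range is closed as soon as adding the next page would reach its budget, so emitted ranges can be smaller than the budget itself.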

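The retry behaviour encoded by addAndScheduleAttempt() and AttemptFailedOrRetryTransition above reduces to a simple budget check: a failed attempt is replaced until failedAttempts reaches maxAttempts, and only the very first attempt is announced with TA_SCHEDULE rather than TA_RESCHEDULE. The following sketch isolates that decision; the class and enum names here are illustrative, since in the real Task the decision runs inside the YARN state machine shown earlier.

// Minimal, self-contained sketch (not Tajo code) of the attempt retry decision made by
// AttemptFailedOrRetryTransition together with addAndScheduleAttempt().
public class RetryDecisionSketch {

  enum ScheduleEvent { TA_SCHEDULE, TA_RESCHEDULE }
  enum Outcome { RETRYING, FAILED }

  private final int maxAttempts;
  private int failedAttempts = 0;

  RetryDecisionSketch(int maxAttempts) {
    this.maxAttempts = maxAttempts;
  }

  /** Mirrors addAndScheduleAttempt(): only the first attempt is a plain TA_SCHEDULE. */
  ScheduleEvent eventForNextAttempt() {
    return failedAttempts > 0 ? ScheduleEvent.TA_RESCHEDULE : ScheduleEvent.TA_SCHEDULE;
  }

  /**
   * Mirrors AttemptFailedOrRetryTransition: a replacement attempt is scheduled while the
   * failure count stays under maxAttempts (the real transition also skips rescheduling when
   * a successful attempt already exists); otherwise the task is marked FAILED.
   */
  Outcome onAttemptFailed() {
    failedAttempts++;
    return failedAttempts < maxAttempts ? Outcome.RETRYING : Outcome.FAILED;
  }

  public static void main(String[] args) {
    RetryDecisionSketch task = new RetryDecisionSketch(3);
    for (int attempt = 1; attempt <= 3; attempt++) {
      System.out.println("attempt " + attempt + ": scheduled via " + task.eventForNextAttempt()
          + ", after failure -> " + task.onAttemptFailed());
    }
    // Prints RETRYING for the first two failures and FAILED on the third.
  }
}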