http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 7f05fa4..d4cc6e7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -23,8 +23,6 @@ import com.google.common.collect.Lists; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.event.Event; @@ -55,8 +53,8 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto; import org.apache.tajo.master.*; import org.apache.tajo.master.TaskRunnerGroupEvent.EventType; import org.apache.tajo.master.event.*; -import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext; -import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry; +import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; +import org.apache.tajo.master.querymaster.Task.IntermediateEntry; import org.apache.tajo.master.container.TajoContainer; import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.storage.FileStorageManager; @@ -66,7 +64,7 @@ import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.KeyValueSet; -import org.apache.tajo.util.history.QueryUnitHistory; +import 
org.apache.tajo.util.history.TaskHistory; import org.apache.tajo.util.history.SubQueryHistory; import org.apache.tajo.worker.FetchImpl; @@ -105,7 +103,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { private long startTime; private long finishTime; - volatile Map<QueryUnitId, QueryUnit> tasks = new ConcurrentHashMap<QueryUnitId, QueryUnit>(); + volatile Map<TaskId, Task> tasks = new ConcurrentHashMap<TaskId, Task>(); volatile Map<TajoContainerId, TajoContainer> containers = new ConcurrentHashMap<TajoContainerId, TajoContainer>(); @@ -355,22 +353,22 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } public float getProgress() { - List<QueryUnit> tempTasks = null; + List<Task> tempTasks = null; readLock.lock(); try { if (getState() == SubQueryState.NEW) { return 0.0f; } else { - tempTasks = new ArrayList<QueryUnit>(tasks.values()); + tempTasks = new ArrayList<Task>(tasks.values()); } } finally { readLock.unlock(); } float totalProgress = 0.0f; - for (QueryUnit eachQueryUnit: tempTasks) { - if (eachQueryUnit.getLastAttempt() != null) { - totalProgress += eachQueryUnit.getLastAttempt().getProgress(); + for (Task eachTask : tempTasks) { + if (eachTask.getLastAttempt() != null) { + totalProgress += eachTask.getLastAttempt().getProgress(); } } @@ -393,7 +391,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { return block; } - public void addTask(QueryUnit task) { + public void addTask(Task task) { tasks.put(task.getId(), task); } @@ -401,7 +399,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { if (finalSubQueryHistory != null) { if (finalSubQueryHistory.getFinishTime() == 0) { finalSubQueryHistory = makeSubQueryHistory(); - finalSubQueryHistory.setQueryUnits(makeQueryUnitHistories()); + finalSubQueryHistory.setTasks(makeTaskHistories()); } return finalSubQueryHistory; } else { @@ -409,14 +407,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } } - private List<QueryUnitHistory> 
makeQueryUnitHistories() { - List<QueryUnitHistory> queryUnitHistories = new ArrayList<QueryUnitHistory>(); + private List<TaskHistory> makeTaskHistories() { + List<TaskHistory> taskHistories = new ArrayList<TaskHistory>(); - for(QueryUnit eachQueryUnit: getQueryUnits()) { - queryUnitHistories.add(eachQueryUnit.getQueryUnitHistory()); + for(Task eachTask : getTasks()) { + taskHistories.add(eachTask.getTaskHistory()); } - return queryUnitHistories; + return taskHistories; } private SubQueryHistory makeSubQueryHistory() { @@ -440,16 +438,16 @@ public class SubQuery implements EventHandler<SubQueryEvent> { long totalWriteBytes = 0; long totalWriteRows = 0; int numShuffles = 0; - for(QueryUnit eachQueryUnit: getQueryUnits()) { - numShuffles = eachQueryUnit.getShuffleOutpuNum(); - if (eachQueryUnit.getLastAttempt() != null) { - TableStats inputStats = eachQueryUnit.getLastAttempt().getInputStats(); + for(Task eachTask : getTasks()) { + numShuffles = eachTask.getShuffleOutpuNum(); + if (eachTask.getLastAttempt() != null) { + TableStats inputStats = eachTask.getLastAttempt().getInputStats(); if (inputStats != null) { totalInputBytes += inputStats.getNumBytes(); totalReadBytes += inputStats.getReadBytes(); totalReadRows += inputStats.getNumRows(); } - TableStats outputStats = eachQueryUnit.getLastAttempt().getResultStats(); + TableStats outputStats = eachTask.getLastAttempt().getResultStats(); if (outputStats != null) { totalWriteBytes += outputStats.getNumBytes(); totalWriteRows += outputStats.getNumRows(); @@ -511,11 +509,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> { return block.getId(); } - public QueryUnit[] getQueryUnits() { - return tasks.values().toArray(new QueryUnit[tasks.size()]); + public Task[] getTasks() { + return tasks.values().toArray(new Task[tasks.size()]); } - public QueryUnit getQueryUnit(QueryUnitId qid) { + public Task getTask(TaskId qid) { return tasks.get(qid); } @@ -635,7 +633,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> { private TableStats[] computeStatFromTasks() { List<TableStats> inputStatsList = Lists.newArrayList(); List<TableStats> resultStatsList = Lists.newArrayList(); - for (QueryUnit unit : getQueryUnits()) { + for (Task unit : getTasks()) { resultStatsList.add(unit.getStats()); if (unit.getLastAttempt().getInputStats() != null) { inputStatsList.add(unit.getLastAttempt().getInputStats()); @@ -1100,13 +1098,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> { subQuery.getId(), fetches)); } - public static QueryUnit newEmptyQueryUnit(TaskSchedulerContext schedulerContext, - QueryUnitAttemptScheduleContext queryUnitContext, - SubQuery subQuery, int taskId) { + public static Task newEmptyTask(TaskSchedulerContext schedulerContext, + TaskAttemptScheduleContext taskContext, + SubQuery subQuery, int taskId) { ExecutionBlock execBlock = subQuery.getBlock(); - QueryUnit unit = new QueryUnit(schedulerContext.getMasterContext().getConf(), - queryUnitContext, - QueryIdFactory.newQueryUnitId(schedulerContext.getBlockId(), taskId), + Task unit = new Task(schedulerContext.getMasterContext().getConf(), + taskContext, + QueryIdFactory.newTaskId(schedulerContext.getBlockId(), taskId), schedulerContext.isLeafQuery(), subQuery.eventHandler); unit.setLogicalPlan(execBlock.getPlan()); subQuery.addTask(unit); @@ -1176,7 +1174,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { public void transition(SubQuery subQuery, SubQueryEvent event) { SubQueryTaskEvent taskEvent = (SubQueryTaskEvent) event; - QueryUnit task = subQuery.getQueryUnit(taskEvent.getTaskId()); + Task task = subQuery.getTask(taskEvent.getTaskId()); if (task == null) { // task failed LOG.error(String.format("Task %s is absent", taskEvent.getTaskId())); @@ -1217,8 +1215,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> { subQuery.getTaskScheduler().stop(); } - for (QueryUnit queryUnit : subQuery.getQueryUnits()) { - subQuery.eventHandler.handle(new 
TaskEvent(queryUnit.getId(), TaskEventType.T_KILL)); + for (Task task : subQuery.getTasks()) { + subQuery.eventHandler.handle(new TaskEvent(task.getId(), TaskEventType.T_KILL)); } } } @@ -1239,7 +1237,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { } this.finalSubQueryHistory = makeSubQueryHistory(); - this.finalSubQueryHistory.setQueryUnits(makeQueryUnitHistories()); + this.finalSubQueryHistory.setTasks(makeTaskHistories()); } public List<IntermediateEntry> getHashShuffleIntermediateEntries() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java new file mode 100644 index 0000000..7de3933 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java @@ -0,0 +1,907 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.master.querymaster; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.state.*; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.TaskId; +import org.apache.tajo.TajoProtos.TaskAttemptState; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.ipc.TajoWorkerProtocol.FailureIntermediateProto; +import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto; +import org.apache.tajo.master.FragmentPair; +import org.apache.tajo.master.TaskState; +import org.apache.tajo.master.event.*; +import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.storage.DataLocation; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.util.Pair; +import org.apache.tajo.util.TajoIdUtils; +import org.apache.tajo.util.history.TaskHistory; +import org.apache.tajo.worker.FetchImpl; + +import java.net.URI; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; +import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput; + +public class Task implements EventHandler<TaskEvent> { + /** Class Logger */ + 
private static final Log LOG = LogFactory.getLog(Task.class);

/** Configuration handed to fragment conversion and to attempt schedule events. */
private final Configuration systemConf;
private TaskId taskId;
private EventHandler eventHandler;
private StoreTableNode store = null;
/** Logical plan of this task; stays null until {@link #setLogicalPlan(LogicalNode)} is called. */
private LogicalNode plan = null;
/** Scan nodes collected from the plan by {@link #setLogicalPlan(LogicalNode)}. */
private List<ScanNode> scan;

/** Input fragments, grouped by the table name reported by each fragment. */
private Map<String, Set<FragmentProto>> fragMap;
/** Fetches, grouped by table id. */
private Map<String, Set<FetchImpl>> fetchMap;

/** Number of {@link #addFragment(Fragment, boolean)} calls, duplicates included. */
private int totalFragmentNum;

private List<ShuffleFileOutput> shuffleFileOutputs;
private TableStats stats;
private final boolean isLeafTask;
private List<IntermediateEntry> intermediateData;

private Map<TaskAttemptId, TaskAttempt> attempts;
private final int maxAttempts = 3;
// Plain int instead of a boxed Integer: the counter is never null and is only incremented
// and returned as an int, so boxing bought nothing.
private int nextAttempt = -1;
private TaskAttemptId lastAttemptId;

private TaskAttemptId successfulAttempt;
private String succeededHost;
private int succeededHostPort;
private int succeededPullServerPort;

private int failedAttempts;
private int finishedAttempts; // finish are total of success, failed and killed

private long launchTime;
private long finishTime;

private List<DataLocation> dataLocations = Lists.newArrayList();

// Must be declared before stateMachineFactory: static initializers run in textual order and
// the factory below references this instance.
private static final AttemptKilledTransition ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();

/** Snapshot produced by {@code makeTaskHistory()}; see {@link #getTaskHistory()}. */
private TaskHistory finalTaskHistory;

/**
 * Transition table shared by all tasks. The initial state is {@code TaskState.NEW}; each
 * {@code addTransition} call registers (current state, next state(s), event type[s], optional hook).
 */
protected static final StateMachineFactory
    <Task, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
    new StateMachineFactory <Task, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)

    // Transitions from NEW state
    .addTransition(TaskState.NEW, TaskState.SCHEDULED,
        TaskEventType.T_SCHEDULE,
        new InitialScheduleTransition())
    .addTransition(TaskState.NEW, TaskState.KILLED,
        TaskEventType.T_KILL,
        new KillNewTaskTransition())

    // Transitions from SCHEDULED state
    .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
        TaskEventType.T_ATTEMPT_LAUNCHED,
        new AttemptLaunchedTransition())
    .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT,
        TaskEventType.T_KILL,
        new KillTaskTransition())

    // Transitions from RUNNING state
    .addTransition(TaskState.RUNNING, TaskState.RUNNING,
        TaskEventType.T_ATTEMPT_LAUNCHED)
    .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
        TaskEventType.T_ATTEMPT_SUCCEEDED,
        new AttemptSucceededTransition())
    .addTransition(TaskState.RUNNING, TaskState.KILL_WAIT,
        TaskEventType.T_KILL,
        new KillTaskTransition())
    .addTransition(TaskState.RUNNING,
        EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
        TaskEventType.T_ATTEMPT_FAILED,
        new AttemptFailedOrRetryTransition())

    // Transitions from KILL_WAIT state
    .addTransition(TaskState.KILL_WAIT, TaskState.KILLED,
        TaskEventType.T_ATTEMPT_KILLED,
        ATTEMPT_KILLED_TRANSITION)
    .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT,
        TaskEventType.T_ATTEMPT_LAUNCHED,
        new KillTaskTransition())
    .addTransition(TaskState.KILL_WAIT, TaskState.FAILED,
        TaskEventType.T_ATTEMPT_FAILED,
        new AttemptFailedTransition())
    .addTransition(TaskState.KILL_WAIT, TaskState.KILLED,
        TaskEventType.T_ATTEMPT_SUCCEEDED,
        ATTEMPT_KILLED_TRANSITION)
    // Ignore-able transitions.
    .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT,
        EnumSet.of(
            TaskEventType.T_KILL,
            TaskEventType.T_SCHEDULE))

    // Transitions from SUCCEEDED state
    // Ignore-able transitions
    .addTransition(TaskState.SUCCEEDED, TaskState.SUCCEEDED,
        EnumSet.of(TaskEventType.T_KILL,
            TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED))

    // Transitions from FAILED state
    // Ignore-able transitions
    .addTransition(TaskState.FAILED, TaskState.FAILED,
        EnumSet.of(TaskEventType.T_KILL,
            TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED))

    // Transitions from KILLED state
    .addTransition(TaskState.KILLED, TaskState.KILLED, TaskEventType.T_ATTEMPT_KILLED, new KillTaskTransition())
    // Ignore-able transitions
    .addTransition(TaskState.KILLED, TaskState.KILLED,
        EnumSet.of(
            TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED))

    .installTopology();

private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine;


private final Lock readLock;
private final Lock writeLock;
private TaskAttemptScheduleContext scheduleContext;

/**
 * Creates a task in the {@code NEW} state with no fragments, fetches or attempts.
 *
 * @param conf            configuration kept as {@code systemConf}
 * @param scheduleContext context passed to every {@code TaskAttempt} this task creates
 * @param id              identifier of this task
 * @param isLeafTask      value later returned by {@link #isLeafTask()}
 * @param eventHandler    handler that receives the events emitted by this task's transitions
 */
public Task(Configuration conf, TaskAttemptScheduleContext scheduleContext,
            TaskId id, boolean isLeafTask, EventHandler eventHandler) {
  this.systemConf = conf;
  this.taskId = id;
  this.eventHandler = eventHandler;
  this.isLeafTask = isLeafTask;
  scan = new ArrayList<ScanNode>();
  fetchMap = Maps.newHashMap();
  fragMap = Maps.newHashMap();
  shuffleFileOutputs = new ArrayList<ShuffleFileOutput>();
  // Immutable empty map for now; addAndScheduleAttempt() swaps in larger maps as attempts arrive.
  attempts = Collections.emptyMap();
  lastAttemptId = null;
  nextAttempt = -1;
  failedAttempts = 0;

  ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  this.readLock = readWriteLock.readLock();
  this.writeLock = readWriteLock.writeLock();
  this.scheduleContext = scheduleContext;

  stateMachine = stateMachineFactory.make(this);
  totalFragmentNum = 0;
}

/** @return the {@code isLeafTask} flag given to the constructor */
public boolean isLeafTask() {
  return this.isLeafTask;
}

/** @return the current state of the state machine, read under the read lock */
public TaskState getState() {
  readLock.lock();
  try {
    return stateMachine.getCurrentState();
  } finally {
    readLock.unlock();
  }
}

/**
 * @return the state of the last attempt, or {@code TaskAttemptState.TA_ASSIGNED} when
 *         {@code getLastAttempt()} returns null
 */
public TaskAttemptState getLastAttemptStatus() {
  TaskAttempt lastAttempt = getLastAttempt();
  if (lastAttempt != null) {
    return lastAttempt.getState();
  } else {
    return TaskAttemptState.TA_ASSIGNED;
  }
}

/**
 * Returns the history of this task. When no final history has been stored a fresh snapshot is
 * built on every call. A stored history whose finish time is still 0 is rebuilt and stored again.
 *
 * @return the task history, never null
 */
public TaskHistory getTaskHistory() {
  if (finalTaskHistory != null) {
    if (finalTaskHistory.getFinishTime() == 0) {
      finalTaskHistory = makeTaskHistory();
    }
    return finalTaskHistory;
  } else {
    return makeTaskHistory();
  }
}

/**
 * Builds a history snapshot from the last attempt, the recorded host and times, the first
 * shuffle output, all fragments, all fetch URIs and all data locations.
 */
private TaskHistory makeTaskHistory() {
  TaskHistory taskHistory = new TaskHistory();

  TaskAttempt lastAttempt = getLastAttempt();
  if (lastAttempt != null) {
    taskHistory.setId(lastAttempt.getId().toString());
    taskHistory.setState(lastAttempt.getState().toString());
    taskHistory.setProgress(lastAttempt.getProgress());
  }
  // NOTE(review): succeededHost is null until an attempt was launched, so this renders
  // "null:0" for such tasks; left unchanged because consumers of the string are not visible here.
  taskHistory.setHostAndPort(succeededHost + ":" + succeededHostPort);
  taskHistory.setRetryCount(this.getRetryCount());
  taskHistory.setLaunchTime(launchTime);
  taskHistory.setFinishTime(finishTime);

  taskHistory.setNumShuffles(getShuffleOutpuNum());
  if (!getShuffleFileOutputs().isEmpty()) {
    // Only the first shuffle output is recorded. The local is named so that it no longer
    // shadows the shuffleFileOutputs field.
    ShuffleFileOutput firstShuffleOutput = getShuffleFileOutputs().get(0);
    if (taskHistory.getNumShuffles() > 0) {
      taskHistory.setShuffleKey("" + firstShuffleOutput.getPartId());
      taskHistory.setShuffleFileName(firstShuffleOutput.getFileName());
    }
  }

  List<String> fragmentList = new ArrayList<String>();
  for (FragmentProto eachFragment : getAllFragments()) {
    try {
      Fragment fragment = FragmentConvertor.convert(systemConf, eachFragment);
      fragmentList.add(fragment.toString());
    } catch (Exception e) {
      // Best effort: a fragment that cannot be converted is reported inside the history
      // instead of failing the snapshot. Pass the exception so the stack trace is logged too.
      LOG.error(e.getMessage(), e);
      fragmentList.add("ERROR: " + eachFragment.getStoreType() + "," + eachFragment.getId() + ": " + e.getMessage());
    }
  }
  taskHistory.setFragments(fragmentList.toArray(new String[]{}));

  // One {tableId, uri} pair per URI of every fetch.
  List<String[]> fetchList = new ArrayList<String[]>();
  for (Map.Entry<String, Set<FetchImpl>> e : getFetchMap().entrySet()) {
    for (FetchImpl f : e.getValue()) {
      for (URI uri : f.getSimpleURIs()){
        fetchList.add(new String[] {e.getKey(), uri.toString()});
      }
    }
  }

  taskHistory.setFetchs(fetchList.toArray(new String[][]{}));

  List<String> dataLocationList = new ArrayList<String>();
  for(DataLocation eachLocation: getDataLocations()) {
    dataLocationList.add(eachLocation.toString());
  }

  taskHistory.setDataLocations(dataLocationList.toArray(new String[]{}));
  return taskHistory;
}

/**
 * Stores the plan and collects every {@code ScanNode} reachable from it into {@code scan}.
 * The plan is walked depth-first with an explicit stack; children of unary and binary nodes
 * and the sub query of a {@code TableSubQueryNode} are followed.
 *
 * @param plan root of the logical plan
 */
public void setLogicalPlan(LogicalNode plan) {
  this.plan = plan;

  LogicalNode node = plan;
  ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
  s.add(node);
  while (!s.isEmpty()) {
    node = s.remove(s.size()-1);
    if (node instanceof UnaryNode) {
      UnaryNode unary = (UnaryNode) node;
      s.add(unary.getChild());
    } else if (node instanceof BinaryNode) {
      BinaryNode binary = (BinaryNode) node;
      s.add(binary.getLeftChild());
      s.add(binary.getRightChild());
    } else if (node instanceof ScanNode) {
      scan.add((ScanNode)node);
    } else if (node instanceof TableSubQueryNode) {
      s.add(((TableSubQueryNode) node).getSubQuery());
    }
  }
}

/**
 * Records one {@code DataLocation} per host of the fragment. Disk ids are only available for
 * {@code FileFragment}s; every other host gets -1.
 */
private void addDataLocation(Fragment fragment) {
  String[] hosts = fragment.getHosts();
  int[] diskIds = null;
  if (fragment instanceof FileFragment) {
    diskIds = ((FileFragment)fragment).getDiskIds();
  }
  for (int i = 0; i < hosts.length; i++) {
    // Fall back to -1 (the value already used when no disk ids exist) if the disk id array
    // is shorter than the host array, instead of failing with an index error.
    int diskId = (diskIds != null && i < diskIds.length) ? diskIds[i] : -1;
    dataLocations.add(new DataLocation(hosts[i], diskId));
  }
}

/**
 * Adds the fragment's proto to the set kept for its table name.
 *
 * @param fragment        fragment to add
 * @param useDataLocation when true, the fragment's hosts are also recorded as data locations
 */
public void addFragment(Fragment fragment, boolean useDataLocation) {
  Set<FragmentProto> fragmentProtos = fragMap.get(fragment.getTableName());
  if (fragmentProtos == null) {
    fragmentProtos = new HashSet<FragmentProto>();
    fragMap.put(fragment.getTableName(), fragmentProtos);
  }
  fragmentProtos.add(fragment.getProto());
  if (useDataLocation) {
    addDataLocation(fragment);
  }
  totalFragmentNum++;
}

/** Adds every fragment of the collection without recording data locations. */
public void addFragments(Collection<Fragment> fragments) {
  for (Fragment eachFragment: fragments) {
    addFragment(eachFragment, false);
  }
}

/**
 * Adds the left fragment of every pair and, when present, the right one, recording data
 * locations for both.
 */
public void setFragment(FragmentPair[] fragmentPairs) {
  for (FragmentPair eachFragmentPair : fragmentPairs) {
    this.addFragment(eachFragmentPair.getLeftFragment(), true);
    if (eachFragmentPair.getRightFragment() != null) {
      this.addFragment(eachFragmentPair.getRightFragment(), true);
    }
  }
}

/** @return the internal, mutable list of data locations (not a copy) */
public List<DataLocation> getDataLocations() {
  return dataLocations;
}

/** @return the host recorded by the attempt-launched or attempt-succeeded transition, or null */
public String getSucceededHost() {
  return succeededHost;
}

/**
 * Adds the fetches to the set kept for the table id, creating the set when it is missing.
 *
 * @param tableId table id the fetches belong to
 * @param fetches fetches to add
 */
public void addFetches(String tableId, Collection<FetchImpl> fetches) {
  Set<FetchImpl> fetchSet = fetchMap.get(tableId);
  if (fetchSet == null) {
    fetchSet = Sets.newHashSet();
  }
  fetchSet.addAll(fetches);
  fetchMap.put(tableId, fetchSet);
}

/** Replaces all fetches; the value sets of the given map are stored as-is, not copied. */
public void setFetches(Map<String, Set<FetchImpl>> fetches) {
  this.fetchMap.clear();
  this.fetchMap.putAll(fetches);
}

/** @return a new set holding the fragment protos of all tables */
public Collection<FragmentProto> getAllFragments() {
  Set<FragmentProto> fragmentProtos = new HashSet<FragmentProto>();
  for (Set<FragmentProto> eachFragmentSet : fragMap.values()) {
    fragmentProtos.addAll(eachFragmentSet);
  }
  return fragmentProtos;
}

/** @return the plan set by {@link #setLogicalPlan(LogicalNode)}, or null when none was set */
public LogicalNode getLogicalPlan() {
  return this.plan;
}

/** @return the identifier of this task */
public TaskId getId() {
  return taskId;
}

/** @return the fetch set stored for the table id, or null when there is none */
public Collection<FetchImpl> getFetchHosts(String tableId) {
  return fetchMap.get(tableId);
}

+ public Collection<Set<FetchImpl>> getFetches() { + return fetchMap.values(); + } + + public Map<String, Set<FetchImpl>> getFetchMap() { + return fetchMap; + } + + public Collection<FetchImpl> getFetch(ScanNode scan) { + return this.fetchMap.get(scan.getTableName()); + } + + public ScanNode[] getScanNodes() { + return this.scan.toArray(new ScanNode[scan.size()]); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append(plan.getType() + " \n"); + for (Entry<String, Set<FragmentProto>> e : fragMap.entrySet()) { + builder.append(e.getKey()).append(" : "); + for (FragmentProto fragment : e.getValue()) { + builder.append(fragment).append(", "); + } + } + for (Entry<String, Set<FetchImpl>> e : fetchMap.entrySet()) { + builder.append(e.getKey()).append(" : "); + for (FetchImpl t : e.getValue()) { + for (URI uri : t.getURIs()){ + builder.append(uri).append(" "); + } + } + } + + return builder.toString(); + } + + public void setStats(TableStats stats) { + this.stats = stats; + } + + public void setShuffleFileOutputs(List<ShuffleFileOutput> partitions) { + this.shuffleFileOutputs = Collections.unmodifiableList(partitions); + } + + public TableStats getStats() { + return this.stats; + } + + public List<ShuffleFileOutput> getShuffleFileOutputs() { + return this.shuffleFileOutputs; + } + + public int getShuffleOutpuNum() { + return this.shuffleFileOutputs.size(); + } + + public TaskAttempt newAttempt() { + TaskAttempt attempt = new TaskAttempt(scheduleContext, + QueryIdFactory.newTaskAttemptId(this.getId(), ++nextAttempt), + this, eventHandler); + lastAttemptId = attempt.getId(); + return attempt; + } + + public TaskAttempt getAttempt(TaskAttemptId attemptId) { + return attempts.get(attemptId); + } + + public TaskAttempt getAttempt(int attempt) { + return this.attempts.get(QueryIdFactory.newTaskAttemptId(this.getId(), attempt)); + } + + public TaskAttempt getLastAttempt() { + return getAttempt(this.lastAttemptId); + } + + 
public TaskAttempt getSuccessfulAttempt() { + readLock.lock(); + try { + if (null == successfulAttempt) { + return null; + } + return attempts.get(successfulAttempt); + } finally { + readLock.unlock(); + } + } + + public int getRetryCount () { + return this.nextAttempt; + } + + public int getTotalFragmentNum() { + return totalFragmentNum; + } + + private static class InitialScheduleTransition implements + SingleArcTransition<Task, TaskEvent> { + + @Override + public void transition(Task task, TaskEvent taskEvent) { + task.addAndScheduleAttempt(); + } + } + + public long getLaunchTime() { + return launchTime; + } + + public long getFinishTime() { + return finishTime; + } + + @VisibleForTesting + public void setLaunchTime(long launchTime) { + this.launchTime = launchTime; + } + + @VisibleForTesting + public void setFinishTime(long finishTime) { + this.finishTime = finishTime; + } + + public long getRunningTime() { + if(finishTime > 0) { + return finishTime - launchTime; + } else { + return System.currentTimeMillis() - launchTime; + } + } + + // This is always called in the Write Lock + private void addAndScheduleAttempt() { + // Create new task attempt + TaskAttempt attempt = newAttempt(); + if (LOG.isDebugEnabled()) { + LOG.debug("Created attempt " + attempt.getId()); + } + switch (attempts.size()) { + case 0: + attempts = Collections.singletonMap(attempt.getId(), attempt); + break; + + case 1: + Map<TaskAttemptId, TaskAttempt> newAttempts + = new LinkedHashMap<TaskAttemptId, TaskAttempt>(3); + newAttempts.putAll(attempts); + attempts = newAttempts; + attempts.put(attempt.getId(), attempt); + break; + + default: + attempts.put(attempt.getId(), attempt); + break; + } + + if (failedAttempts > 0) { + eventHandler.handle(new TaskAttemptScheduleEvent(systemConf, attempt.getId(), + TaskAttemptEventType.TA_RESCHEDULE)); + } else { + eventHandler.handle(new TaskAttemptScheduleEvent(systemConf, attempt.getId(), + TaskAttemptEventType.TA_SCHEDULE)); + } + } + + private void 
finishTask() { + this.finishTime = System.currentTimeMillis(); + finalTaskHistory = makeTaskHistory(); + } + + private static class KillNewTaskTransition implements SingleArcTransition<Task, TaskEvent> { + + @Override + public void transition(Task task, TaskEvent taskEvent) { + task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.KILLED)); + } + } + + private static class KillTaskTransition implements SingleArcTransition<Task, TaskEvent> { + + @Override + public void transition(Task task, TaskEvent taskEvent) { + task.finishTask(); + task.eventHandler.handle(new TaskAttemptEvent(task.lastAttemptId, TaskAttemptEventType.TA_KILL)); + } + } + + private static class AttemptKilledTransition implements SingleArcTransition<Task, TaskEvent>{ + + @Override + public void transition(Task task, TaskEvent event) { + task.finishTask(); + task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.KILLED)); + } + } + + private static class AttemptSucceededTransition + implements SingleArcTransition<Task, TaskEvent>{ + + @Override + public void transition(Task task, + TaskEvent event) { + TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event; + TaskAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId()); + + task.successfulAttempt = attemptEvent.getTaskAttemptId(); + task.succeededHost = attempt.getWorkerConnectionInfo().getHost(); + task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort(); + task.succeededPullServerPort = attempt.getWorkerConnectionInfo().getPullServerPort(); + + task.finishTask(); + task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(), TaskState.SUCCEEDED)); + } + } + + private static class AttemptLaunchedTransition implements SingleArcTransition<Task, TaskEvent> { + @Override + public void transition(Task task, + TaskEvent event) { + TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event; + TaskAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId()); + task.launchTime 
= System.currentTimeMillis(); + task.succeededHost = attempt.getWorkerConnectionInfo().getHost(); + task.succeededHostPort = attempt.getWorkerConnectionInfo().getPeerRpcPort(); + } + } + + private static class AttemptFailedTransition implements SingleArcTransition<Task, TaskEvent> { + @Override + public void transition(Task task, TaskEvent event) { + TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event; + LOG.info("============================================================="); + LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<"); + LOG.info("============================================================="); + task.failedAttempts++; + task.finishedAttempts++; + + task.finishTask(); + task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.FAILED)); + } + } + + private static class AttemptFailedOrRetryTransition implements + MultipleArcTransition<Task, TaskEvent, TaskState> { + + @Override + public TaskState transition(Task task, TaskEvent taskEvent) { + TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent; + task.failedAttempts++; + task.finishedAttempts++; + boolean retry = task.failedAttempts < task.maxAttempts; + + LOG.info("===================================================================================="); + LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + ", " + + "retry:" + retry + ", attempts:" + task.failedAttempts + " <<<"); + LOG.info("===================================================================================="); + + if (retry) { + if (task.successfulAttempt == null) { + task.addAndScheduleAttempt(); + } + } else { + task.finishTask(); + task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.FAILED)); + return TaskState.FAILED; + } + + return task.getState(); + } + } + + @Override + public void handle(TaskEvent event) { + if (LOG.isDebugEnabled()) { + LOG.debug("Processing " + event.getTaskId() + " of type " + + event.getType()); + } + + try { + 
writeLock.lock(); + TaskState oldState = getState(); + try { + stateMachine.doTransition(event.getType(), event); + } catch (InvalidStateTransitonException e) { + LOG.error("Can't handle this event at current state" + + ", eventType:" + event.getType().name() + + ", oldState:" + oldState.name() + + ", nextState:" + getState().name() + , e); + eventHandler.handle(new QueryEvent(TajoIdUtils.parseQueryId(getId().toString()), + QueryEventType.INTERNAL_ERROR)); + } + + //notify the eventhandler of state change + if (LOG.isDebugEnabled()) { + if (oldState != getState()) { + LOG.debug(taskId + " Task Transitioned from " + oldState + " to " + + getState()); + } + } + } + + finally { + writeLock.unlock(); + } + } + + public void setIntermediateData(Collection<IntermediateEntry> partitions) { + this.intermediateData = new ArrayList<IntermediateEntry>(partitions); + } + + public List<IntermediateEntry> getIntermediateData() { + return this.intermediateData; + } + + public static class PullHost implements Cloneable { + String host; + int port; + int hashCode; + + public PullHost(String pullServerAddr, int pullServerPort){ + this.host = pullServerAddr; + this.port = pullServerPort; + this.hashCode = Objects.hashCode(host, port); + } + public String getHost() { + return host; + } + + public int getPort() { + return this.port; + } + + public String getPullAddress() { + return host + ":" + port; + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof PullHost) { + PullHost other = (PullHost) obj; + return host.equals(other.host) && port == other.port; + } + + return false; + } + + @Override + public PullHost clone() throws CloneNotSupportedException { + PullHost newPullHost = (PullHost) super.clone(); + newPullHost.host = host; + newPullHost.port = port; + newPullHost.hashCode = Objects.hashCode(newPullHost.host, newPullHost.port); + return newPullHost; + } + + @Override + public String toString() 
{ + return host + ":" + port; + } + } + + public static class IntermediateEntry { + ExecutionBlockId ebId; + int taskId; + int attemptId; + int partId; + PullHost host; + long volume; + List<Pair<Long, Integer>> pages; + List<Pair<Long, Pair<Integer, Integer>>> failureRowNums; + + public IntermediateEntry(IntermediateEntryProto proto) { + this.ebId = new ExecutionBlockId(proto.getEbId()); + this.taskId = proto.getTaskId(); + this.attemptId = proto.getAttemptId(); + this.partId = proto.getPartId(); + + String[] pullHost = proto.getHost().split(":"); + this.host = new PullHost(pullHost[0], Integer.parseInt(pullHost[1])); + this.volume = proto.getVolume(); + + failureRowNums = new ArrayList<Pair<Long, Pair<Integer, Integer>>>(); + for (FailureIntermediateProto eachFailure: proto.getFailuresList()) { + + failureRowNums.add(new Pair(eachFailure.getPagePos(), + new Pair(eachFailure.getStartRowNum(), eachFailure.getEndRowNum()))); + } + + pages = new ArrayList<Pair<Long, Integer>>(); + for (IntermediateEntryProto.PageProto eachPage: proto.getPagesList()) { + pages.add(new Pair(eachPage.getPos(), eachPage.getLength())); + } + } + + public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host) { + this.taskId = taskId; + this.attemptId = attemptId; + this.partId = partId; + this.host = host; + } + + public IntermediateEntry(int taskId, int attemptId, int partId, PullHost host, long volume) { + this.taskId = taskId; + this.attemptId = attemptId; + this.partId = partId; + this.host = host; + this.volume = volume; + } + + public ExecutionBlockId getEbId() { + return ebId; + } + + public void setEbId(ExecutionBlockId ebId) { + this.ebId = ebId; + } + + public int getTaskId() { + return this.taskId; + } + + public int getAttemptId() { + return this.attemptId; + } + + public int getPartId() { + return this.partId; + } + + public PullHost getPullHost() { + return this.host; + } + + public long getVolume() { + return this.volume; + } + + public long setVolume(long 
volume) { + return this.volume = volume; + } + + public List<Pair<Long, Integer>> getPages() { + return pages; + } + + public void setPages(List<Pair<Long, Integer>> pages) { + this.pages = pages; + } + + public List<Pair<Long, Pair<Integer, Integer>>> getFailureRowNums() { + return failureRowNums; + } + + @Override + public int hashCode() { + return Objects.hashCode(ebId, taskId, partId, attemptId, host); + } + + public List<Pair<Long, Long>> split(long firstSplitVolume, long splitVolume) { + List<Pair<Long, Long>> splits = new ArrayList<Pair<Long, Long>>(); + + if (pages == null || pages.isEmpty()) { + return splits; + } + int pageSize = pages.size(); + + long currentOffset = -1; + long currentBytes = 0; + + long realSplitVolume = firstSplitVolume > 0 ? firstSplitVolume : splitVolume; + for (int i = 0; i < pageSize; i++) { + Pair<Long, Integer> eachPage = pages.get(i); + if (currentOffset == -1) { + currentOffset = eachPage.getFirst(); + } + if (currentBytes > 0 && currentBytes + eachPage.getSecond() >= realSplitVolume) { + splits.add(new Pair(currentOffset, currentBytes)); + currentOffset = eachPage.getFirst(); + currentBytes = 0; + realSplitVolume = splitVolume; + } + + currentBytes += eachPage.getSecond(); + } + + //add last + if (currentBytes > 0) { + splits.add(new Pair(currentOffset, currentBytes)); + } + return splits; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java new file mode 100644 index 0000000..63c6dbb --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/TaskAttempt.java @@ -0,0 +1,443 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.querymaster; + +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.state.*; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.TajoProtos.TaskAttemptState; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.master.event.*; +import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; +import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; +import org.apache.tajo.master.querymaster.Task.IntermediateEntry; +import org.apache.tajo.master.querymaster.Task.PullHost; +import org.apache.tajo.master.container.TajoContainerId; + +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static 
org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleFileOutput; + +public class TaskAttempt implements EventHandler<TaskAttemptEvent> { + + private static final Log LOG = LogFactory.getLog(TaskAttempt.class); + + private final static int EXPIRE_TIME = 15000; + + private final TaskAttemptId id; + private final Task task; + final EventHandler eventHandler; + + private TajoContainerId containerId; + private WorkerConnectionInfo workerConnectionInfo; + private int expire; + + private final Lock readLock; + private final Lock writeLock; + + private final List<String> diagnostics = new ArrayList<String>(); + + private final TaskAttemptScheduleContext scheduleContext; + + private float progress; + private CatalogProtos.TableStatsProto inputStats; + private CatalogProtos.TableStatsProto resultStats; + + protected static final StateMachineFactory + <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> + stateMachineFactory = new StateMachineFactory + <TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> + (TaskAttemptState.TA_NEW) + + // Transitions from TA_NEW state + .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED, + TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition()) + .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED, + TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition()) + .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_KILLED, + TaskAttemptEventType.TA_KILL, + new TaskKilledCompleteTransition()) + + // Transitions from TA_UNASSIGNED state + .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED, + TaskAttemptEventType.TA_ASSIGNED, + new LaunchTransition()) + .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_KILL_WAIT, + TaskAttemptEventType.TA_KILL, + new KillUnassignedTaskTransition()) + + // Transitions from TA_ASSIGNED state + .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED, + 
TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition()) + .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT, + TaskAttemptEventType.TA_KILL, + new KillTaskTransition()) + .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILLED, + TaskAttemptEventType.TA_KILL, + new KillTaskTransition()) + .addTransition(TaskAttemptState.TA_ASSIGNED, + EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED), + TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition()) + .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED, + TaskAttemptEventType.TA_DONE, new SucceededTransition()) + .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED, + TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition()) + + // Transitions from TA_RUNNING state + .addTransition(TaskAttemptState.TA_RUNNING, + EnumSet.of(TaskAttemptState.TA_RUNNING), + TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition()) + .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILL_WAIT, + TaskAttemptEventType.TA_KILL, + new KillTaskTransition()) + .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED, + TaskAttemptEventType.TA_DONE, new SucceededTransition()) + .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED, + TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition()) + + .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED, + TaskAttemptEventType.TA_LOCAL_KILLED, + new TaskKilledCompleteTransition()) + .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT, + TaskAttemptEventType.TA_ASSIGNED, + new KillTaskTransition()) + .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED, + TaskAttemptEventType.TA_SCHEDULE_CANCELED, + new TaskKilledCompleteTransition()) + .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED, + TaskAttemptEventType.TA_DONE, + new 
TaskKilledCompleteTransition()) + .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_FAILED, + TaskAttemptEventType.TA_FATAL_ERROR) + .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT, + EnumSet.of( + TaskAttemptEventType.TA_KILL, + TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, + TaskAttemptEventType.TA_UPDATE)) + + // Transitions from TA_SUCCEEDED state + .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED, + TaskAttemptEventType.TA_UPDATE) + .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED, + TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition()) + .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED, + TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition()) + // Ignore-able transitions + .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED, + TaskAttemptEventType.TA_KILL) + + // Transitions from TA_KILLED state + .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED, + TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE) + // Ignore-able transitions + .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED, + EnumSet.of( + TaskAttemptEventType.TA_UPDATE)) + .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED, + EnumSet.of( + TaskAttemptEventType.TA_LOCAL_KILLED, + TaskAttemptEventType.TA_KILL, + TaskAttemptEventType.TA_ASSIGNED, + TaskAttemptEventType.TA_DONE), + new TaskKilledCompleteTransition()) + .installTopology(); + + private final StateMachine<TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent> + stateMachine; + + + public TaskAttempt(final TaskAttemptScheduleContext scheduleContext, + final TaskAttemptId id, final Task task, + final EventHandler eventHandler) { + this.scheduleContext = scheduleContext; + this.id = id; + this.expire = TaskAttempt.EXPIRE_TIME; + this.task = task; + this.eventHandler = eventHandler; + + ReadWriteLock readWriteLock = new 
ReentrantReadWriteLock(); + this.readLock = readWriteLock.readLock(); + this.writeLock = readWriteLock.writeLock(); + + stateMachine = stateMachineFactory.make(this); + } + + public TaskAttemptState getState() { + readLock.lock(); + try { + return stateMachine.getCurrentState(); + } finally { + readLock.unlock(); + } + } + + public TaskAttemptId getId() { + return this.id; + } + + public boolean isLeafTask() { + return this.task.isLeafTask(); + } + + public Task getTask() { + return this.task; + } + + public WorkerConnectionInfo getWorkerConnectionInfo() { + return this.workerConnectionInfo; + } + + public void setContainerId(TajoContainerId containerId) { + this.containerId = containerId; + } + + public synchronized void setExpireTime(int expire) { + this.expire = expire; + } + + public synchronized void updateExpireTime(int period) { + this.setExpireTime(this.expire - period); + } + + public synchronized void resetExpireTime() { + this.setExpireTime(TaskAttempt.EXPIRE_TIME); + } + + public int getLeftTime() { + return this.expire; + } + + public float getProgress() { + return progress; + } + + public TableStats getInputStats() { + if (inputStats == null) { + return null; + } + + return new TableStats(inputStats); + } + + public TableStats getResultStats() { + if (resultStats == null) { + return null; + } + return new TableStats(resultStats); + } + + private void fillTaskStatistics(TaskCompletionReport report) { + this.progress = 1.0f; + + List<IntermediateEntry> partitions = new ArrayList<IntermediateEntry>(); + + if (report.getShuffleFileOutputsCount() > 0) { + this.getTask().setShuffleFileOutputs(report.getShuffleFileOutputsList()); + + PullHost host = new PullHost(getWorkerConnectionInfo().getHost(), getWorkerConnectionInfo().getPullServerPort()); + for (ShuffleFileOutput p : report.getShuffleFileOutputsList()) { + IntermediateEntry entry = new IntermediateEntry(getId().getTaskId().getId(), + getId().getId(), p.getPartId(), host, p.getVolume()); + 
partitions.add(entry); + } + } + this.getTask().setIntermediateData(partitions); + + if (report.hasInputStats()) { + this.inputStats = report.getInputStats(); + } + if (report.hasResultStats()) { + this.resultStats = report.getResultStats(); + this.getTask().setStats(new TableStats(resultStats)); + } + } + + private static class TaskAttemptScheduleTransition implements + SingleArcTransition<TaskAttempt, TaskAttemptEvent> { + + @Override + public void transition(TaskAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) { + taskAttempt.eventHandler.handle(new TaskAttemptToSchedulerEvent( + EventType.T_SCHEDULE, taskAttempt.getTask().getId().getExecutionBlockId(), + taskAttempt.scheduleContext, taskAttempt)); + } + } + + private static class KillUnassignedTaskTransition implements + SingleArcTransition<TaskAttempt, TaskAttemptEvent> { + + @Override + public void transition(TaskAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) { + taskAttempt.eventHandler.handle(new TaskAttemptToSchedulerEvent( + EventType.T_SCHEDULE_CANCEL, taskAttempt.getTask().getId().getExecutionBlockId(), + taskAttempt.scheduleContext, taskAttempt)); + } + } + + private static class LaunchTransition + implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> { + + @Override + public void transition(TaskAttempt taskAttempt, + TaskAttemptEvent event) { + TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event; + taskAttempt.containerId = castEvent.getContainerId(); + taskAttempt.workerConnectionInfo = castEvent.getWorkerConnectionInfo(); + taskAttempt.eventHandler.handle( + new TaskTAttemptEvent(taskAttempt.getId(), + TaskEventType.T_ATTEMPT_LAUNCHED)); + } + } + + private static class TaskKilledCompleteTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> { + + @Override + public void transition(TaskAttempt taskAttempt, + TaskAttemptEvent event) { + taskAttempt.getTask().handle(new TaskEvent(taskAttempt.getId().getTaskId(), + 
TaskEventType.T_ATTEMPT_KILLED)); + LOG.info(taskAttempt.getId() + " Received TA_KILLED Status from LocalTask"); + } + } + + private static class StatusUpdateTransition + implements MultipleArcTransition<TaskAttempt, TaskAttemptEvent, TaskAttemptState> { + + @Override + public TaskAttemptState transition(TaskAttempt taskAttempt, + TaskAttemptEvent event) { + TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event; + + taskAttempt.progress = updateEvent.getStatus().getProgress(); + taskAttempt.inputStats = updateEvent.getStatus().getInputStats(); + taskAttempt.resultStats = updateEvent.getStatus().getResultStats(); + + return TaskAttemptState.TA_RUNNING; + } + } + + private void addDiagnosticInfo(String diag) { + if (diag != null && !diag.equals("")) { + diagnostics.add(diag); + } + } + + private static class AlreadyAssignedTransition + implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{ + + @Override + public void transition(TaskAttempt taskAttempt, + TaskAttemptEvent taskAttemptEvent) { + } + } + + private static class AlreadyDoneTransition + implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{ + + @Override + public void transition(TaskAttempt taskAttempt, + TaskAttemptEvent taskAttemptEvent) { + } + } + + private static class SucceededTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent> { + @Override + public void transition(TaskAttempt taskAttempt, + TaskAttemptEvent event) { + TaskCompletionReport report = ((TaskCompletionEvent)event).getReport(); + + try { + taskAttempt.fillTaskStatistics(report); + taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED)); + } catch (Throwable t) { + taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage())); + taskAttempt.addDiagnosticInfo(ExceptionUtils.getStackTrace(t)); + } + } + } + + private static class KillTaskTransition implements SingleArcTransition<TaskAttempt, 
TaskAttemptEvent> { + + @Override + public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) { + taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(), taskAttempt.containerId, + LocalTaskEventType.KILL)); + } + } + + private static class FailedTransition implements SingleArcTransition<TaskAttempt, TaskAttemptEvent>{ + @Override + public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) { + TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event; + taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED)); + taskAttempt.addDiagnosticInfo(errorEvent.errorMessage()); + LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost() + + " >> " + errorEvent.errorMessage()); + } + } + + @Override + public void handle(TaskAttemptEvent event) { + if (LOG.isDebugEnabled()) { + LOG.debug("Processing " + event.getTaskAttemptId() + " of type " + event.getType()); + } + try { + writeLock.lock(); + TaskAttemptState oldState = getState(); + try { + stateMachine.doTransition(event.getType(), event); + } catch (InvalidStateTransitonException e) { + LOG.error("Can't handle this event at current state of " + event.getTaskAttemptId() + ")" + + ", eventType:" + event.getType().name() + + ", oldState:" + oldState.name() + + ", nextState:" + getState().name() + , e); + eventHandler.handle( + new SubQueryDiagnosticsUpdateEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(), + "Can't handle this event at current state of " + event.getTaskAttemptId() + ")")); + eventHandler.handle( + new SubQueryEvent(event.getTaskAttemptId().getTaskId().getExecutionBlockId(), + SubQueryEventType.SQ_INTERNAL_ERROR)); + } + + //notify the eventhandler of state change + if (LOG.isDebugEnabled()) { + if (oldState != getState()) { + LOG.debug(id + " TaskAttempt Transitioned from " + oldState + " to " + + getState()); + } + } + } + + finally { + writeLock.unlock(); + } + } 
+} http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java index 6d3597d..0573197 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java @@ -27,9 +27,9 @@ import org.apache.tajo.master.TajoMaster.MasterContext; import org.apache.tajo.master.ha.HAService; import org.apache.tajo.master.querymaster.QueryInProgress; import org.apache.tajo.master.querymaster.QueryMasterTask; -import org.apache.tajo.master.querymaster.QueryUnit; +import org.apache.tajo.master.querymaster.Task; import org.apache.tajo.master.querymaster.SubQuery; -import org.apache.tajo.util.history.QueryUnitHistory; +import org.apache.tajo.util.history.TaskHistory; import org.apache.tajo.util.history.SubQueryHistory; import org.apache.tajo.worker.TaskRunnerHistory; import org.apache.tajo.worker.TaskRunner; @@ -42,28 +42,28 @@ import static org.apache.tajo.conf.TajoConf.ConfVars; public class JSPUtil { static DecimalFormat decimalF = new DecimalFormat("###.0"); - public static void sortQueryUnitArray(QueryUnit[] queryUnits, String sortField, String sortOrder) { + public static void sortTaskArray(Task[] tasks, String sortField, String sortOrder) { if(sortField == null || sortField.isEmpty()) { sortField = "id"; } - Arrays.sort(queryUnits, new QueryUnitComparator(sortField, "asc".equals(sortOrder))); + Arrays.sort(tasks, new TaskComparator(sortField, "asc".equals(sortOrder))); } - public static void sortQueryUnit(List<QueryUnit> queryUnits, String sortField, String sortOrder) { + public static void sortTasks(List<Task> tasks, String sortField, String sortOrder) { if(sortField == null || sortField.isEmpty()) { sortField = "id"; } - Collections.sort(queryUnits, new 
QueryUnitComparator(sortField, "asc".equals(sortOrder))); + Collections.sort(tasks, new TaskComparator(sortField, "asc".equals(sortOrder))); } - public static void sortQueryUnitHistory(List<QueryUnitHistory> queryUnits, String sortField, String sortOrder) { + public static void sortTaskHistory(List<TaskHistory> tasks, String sortField, String sortOrder) { if(sortField == null || sortField.isEmpty()) { sortField = "id"; } - Collections.sort(queryUnits, new QueryUnitHistoryComparator(sortField, "asc".equals(sortOrder))); + Collections.sort(tasks, new TaskHistoryComparator(sortField, "asc".equals(sortOrder))); } public static void sortTaskRunner(List<TaskRunner> taskRunners) { @@ -204,95 +204,95 @@ public class JSPUtil { return activeLabel; } - static class QueryUnitComparator implements Comparator<QueryUnit> { + static class TaskComparator implements Comparator<Task> { private String sortField; private boolean asc; - public QueryUnitComparator(String sortField, boolean asc) { + public TaskComparator(String sortField, boolean asc) { this.sortField = sortField; this.asc = asc; } @Override - public int compare(QueryUnit queryUnit, QueryUnit queryUnit2) { + public int compare(Task task, Task task2) { if(asc) { if("id".equals(sortField)) { - return queryUnit.getId().compareTo(queryUnit2.getId()); + return task.getId().compareTo(task2.getId()); } else if("host".equals(sortField)) { - String host1 = queryUnit.getSucceededHost() == null ? "-" : queryUnit.getSucceededHost(); - String host2 = queryUnit2.getSucceededHost() == null ? "-" : queryUnit2.getSucceededHost(); + String host1 = task.getSucceededHost() == null ? "-" : task.getSucceededHost(); + String host2 = task2.getSucceededHost() == null ? 
"-" : task2.getSucceededHost(); return host1.compareTo(host2); } else if("runTime".equals(sortField)) { - return compareLong(queryUnit.getRunningTime(), queryUnit2.getRunningTime()); + return compareLong(task.getRunningTime(), task2.getRunningTime()); } else if("startTime".equals(sortField)) { - return compareLong(queryUnit.getLaunchTime(), queryUnit2.getLaunchTime()); + return compareLong(task.getLaunchTime(), task2.getLaunchTime()); } else { - return queryUnit.getId().compareTo(queryUnit2.getId()); + return task.getId().compareTo(task2.getId()); } } else { if("id".equals(sortField)) { - return queryUnit2.getId().compareTo(queryUnit.getId()); + return task2.getId().compareTo(task.getId()); } else if("host".equals(sortField)) { - String host1 = queryUnit.getSucceededHost() == null ? "-" : queryUnit.getSucceededHost(); - String host2 = queryUnit2.getSucceededHost() == null ? "-" : queryUnit2.getSucceededHost(); + String host1 = task.getSucceededHost() == null ? "-" : task.getSucceededHost(); + String host2 = task2.getSucceededHost() == null ? 
"-" : task2.getSucceededHost(); return host2.compareTo(host1); } else if("runTime".equals(sortField)) { - if(queryUnit2.getLaunchTime() == 0) { + if(task2.getLaunchTime() == 0) { return -1; - } else if(queryUnit.getLaunchTime() == 0) { + } else if(task.getLaunchTime() == 0) { return 1; } - return compareLong(queryUnit2.getRunningTime(), queryUnit.getRunningTime()); + return compareLong(task2.getRunningTime(), task.getRunningTime()); } else if("startTime".equals(sortField)) { - return compareLong(queryUnit2.getLaunchTime(), queryUnit.getLaunchTime()); + return compareLong(task2.getLaunchTime(), task.getLaunchTime()); } else { - return queryUnit2.getId().compareTo(queryUnit.getId()); + return task2.getId().compareTo(task.getId()); } } } } - static class QueryUnitHistoryComparator implements Comparator<QueryUnitHistory> { + static class TaskHistoryComparator implements Comparator<TaskHistory> { private String sortField; private boolean asc; - public QueryUnitHistoryComparator(String sortField, boolean asc) { + public TaskHistoryComparator(String sortField, boolean asc) { this.sortField = sortField; this.asc = asc; } @Override - public int compare(QueryUnitHistory queryUnit, QueryUnitHistory queryUnit2) { + public int compare(TaskHistory task1, TaskHistory task2) { if(asc) { if("id".equals(sortField)) { - return queryUnit.getId().compareTo(queryUnit2.getId()); + return task1.getId().compareTo(task2.getId()); } else if("host".equals(sortField)) { - String host1 = queryUnit.getHostAndPort() == null ? "-" : queryUnit.getHostAndPort(); - String host2 = queryUnit2.getHostAndPort() == null ? "-" : queryUnit2.getHostAndPort(); + String host1 = task1.getHostAndPort() == null ? "-" : task1.getHostAndPort(); + String host2 = task2.getHostAndPort() == null ? 
"-" : task2.getHostAndPort(); return host1.compareTo(host2); } else if("runTime".equals(sortField)) { - return compareLong(queryUnit.getRunningTime(), queryUnit2.getRunningTime()); + return compareLong(task1.getRunningTime(), task2.getRunningTime()); } else if("startTime".equals(sortField)) { - return compareLong(queryUnit.getLaunchTime(), queryUnit2.getLaunchTime()); + return compareLong(task1.getLaunchTime(), task2.getLaunchTime()); } else { - return queryUnit.getId().compareTo(queryUnit2.getId()); + return task1.getId().compareTo(task2.getId()); } } else { if("id".equals(sortField)) { - return queryUnit2.getId().compareTo(queryUnit.getId()); + return task2.getId().compareTo(task1.getId()); } else if("host".equals(sortField)) { - String host1 = queryUnit.getHostAndPort() == null ? "-" : queryUnit.getHostAndPort(); - String host2 = queryUnit2.getHostAndPort() == null ? "-" : queryUnit2.getHostAndPort(); + String host1 = task1.getHostAndPort() == null ? "-" : task1.getHostAndPort(); + String host2 = task2.getHostAndPort() == null ? 
"-" : task2.getHostAndPort(); return host2.compareTo(host1); } else if("runTime".equals(sortField)) { - if(queryUnit2.getLaunchTime() == 0) { + if(task2.getLaunchTime() == 0) { return -1; - } else if(queryUnit.getLaunchTime() == 0) { + } else if(task1.getLaunchTime() == 0) { return 1; } - return compareLong(queryUnit2.getRunningTime(), queryUnit.getRunningTime()); + return compareLong(task2.getRunningTime(), task1.getRunningTime()); } else if("startTime".equals(sortField)) { - return compareLong(queryUnit2.getLaunchTime(), queryUnit.getLaunchTime()); + return compareLong(task2.getLaunchTime(), task1.getLaunchTime()); } else { - return queryUnit2.getId().compareTo(queryUnit.getId()); + return task2.getId().compareTo(task1.getId()); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java index 21bc725..9fb427f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java @@ -24,11 +24,10 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.TajoWorkerProtocol.TaskHistoryProto; import org.apache.tajo.master.querymaster.QueryInfo; -import org.apache.tajo.worker.TaskHistory; import java.io.EOFException; import java.io.IOException; @@ -187,22 +186,21 @@ public class HistoryReader { } } - public List<QueryUnitHistory> getQueryUnitHistory(String queryId, String ebId) throws IOException { + public List<TaskHistory> 
getTaskHistory(String queryId, String ebId) throws IOException { Path queryHistoryFile = getQueryHistoryFilePath(queryId, 0); if (queryHistoryFile == null) { - return new ArrayList<QueryUnitHistory>(); + return new ArrayList<TaskHistory>(); } Path detailFile = new Path(queryHistoryFile.getParent(), ebId + HistoryWriter.HISTORY_FILE_POSTFIX); FileSystem fs = HistoryWriter.getNonCrcFileSystem(detailFile, tajoConf); if (!fs.exists(detailFile)) { - return new ArrayList<QueryUnitHistory>(); + return new ArrayList<TaskHistory>(); } FileStatus fileStatus = fs.getFileStatus(detailFile); if (fileStatus.getLen() > 100 * 1024 * 1024) { // 100MB - throw new IOException("QueryUnitHistory file is too big: " + - detailFile + ", " + fileStatus.getLen() + " bytes"); + throw new IOException("TaskHistory file is too big: " + detailFile + ", " + fileStatus.getLen() + " bytes"); } FSDataInputStream in = null; @@ -212,7 +210,7 @@ public class HistoryReader { in.readFully(buf, 0, buf.length); - return SubQueryHistory.fromJsonQueryUnits(new String(buf)); + return SubQueryHistory.fromJsonTasks(new String(buf)); } finally { if (in != null) { in.close(); @@ -220,7 +218,7 @@ public class HistoryReader { } } - public TaskHistory getTaskHistory(String queryUnitAttemptId, long startTime) throws IOException { + public org.apache.tajo.worker.TaskHistory getTaskHistory(String taskAttemptId, long startTime) throws IOException { FileSystem fs = HistoryWriter.getNonCrcFileSystem(taskHistoryParentPath, tajoConf); SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH"); @@ -276,9 +274,9 @@ public class HistoryReader { builder.clear(); TaskHistoryProto taskHistoryProto = builder.mergeFrom(buf).build(); - QueryUnitAttemptId attemptId = new QueryUnitAttemptId(taskHistoryProto.getQueryUnitAttemptId()); - if (attemptId.toString().equals(queryUnitAttemptId)) { - return new TaskHistory(taskHistoryProto); + TaskAttemptId attemptId = new TaskAttemptId(taskHistoryProto.getTaskAttemptId()); + if 
(attemptId.toString().equals(taskAttemptId)) { + return new org.apache.tajo.worker.TaskHistory(taskHistoryProto); } } } catch (EOFException e) { http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java index 17c9366..7e30f9c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java @@ -238,7 +238,7 @@ public class HistoryWriter extends AbstractService { writeTaskHistory((TaskHistory) eachHistory); } catch (Exception e) { LOG.error("Error while saving task history: " + - ((TaskHistory) eachHistory).getQueryUnitAttemptId() + ":" + e.getMessage(), e); + ((TaskHistory) eachHistory).getTaskAttemptId() + ":" + e.getMessage(), e); } break; case QUERY: @@ -301,7 +301,7 @@ public class HistoryWriter extends AbstractService { out = null; try { out = fs.create(path); - out.write(subQueryHistory.toQueryUnitsJson().getBytes()); + out.write(subQueryHistory.toTasksJson().getBytes()); LOG.info("Saving query unit: " + path); } finally { if (out != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/util/history/QueryUnitHistory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/QueryUnitHistory.java b/tajo-core/src/main/java/org/apache/tajo/util/history/QueryUnitHistory.java deleted file mode 100644 index 556a971..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/QueryUnitHistory.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.util.history; - -import com.google.gson.annotations.Expose; -import org.apache.tajo.engine.json.CoreGsonHelper; -import org.apache.tajo.json.GsonObject; - -public class QueryUnitHistory implements GsonObject { - @Expose private String id; - @Expose private String hostAndPort; - @Expose private int httpPort; - @Expose private String state; - @Expose private float progress; - @Expose private long launchTime; - @Expose private long finishTime; - @Expose private int retryCount; - - @Expose private int numShuffles; - @Expose private String shuffleKey; - @Expose private String shuffleFileName; - - @Expose private String[] dataLocations; - @Expose private String[] fragments; - @Expose private String[][] fetchs; - - public String getId() { - return id; - } - - public void setId(String id) { - this.id = id; - } - - public String getHostAndPort() { - return hostAndPort; - } - - public void setHostAndPort(String hostAndPort) { - this.hostAndPort = hostAndPort; - } - - public int getHttpPort() { - return httpPort; - } - - public void setHttpPort(int httpPort) { - this.httpPort = httpPort; - } - - public int getRetryCount() { - return retryCount; - } - - public void setRetryCount(int retryCount) { - this.retryCount = retryCount; - } - - public 
String getState() { - return state; - } - - public void setState(String state) { - this.state = state; - } - - public float getProgress() { - return progress; - } - - public void setProgress(float progress) { - this.progress = progress; - } - - public long getLaunchTime() { - return launchTime; - } - - public void setLaunchTime(long launchTime) { - this.launchTime = launchTime; - } - - public long getFinishTime() { - return finishTime; - } - - public void setFinishTime(long finishTime) { - this.finishTime = finishTime; - } - - public int getNumShuffles() { - return numShuffles; - } - - public void setNumShuffles(int numShuffles) { - this.numShuffles = numShuffles; - } - - public String getShuffleKey() { - return shuffleKey; - } - - public void setShuffleKey(String shuffleKey) { - this.shuffleKey = shuffleKey; - } - - public String getShuffleFileName() { - return shuffleFileName; - } - - public void setShuffleFileName(String shuffleFileName) { - this.shuffleFileName = shuffleFileName; - } - - public String[] getDataLocations() { - return dataLocations; - } - - public void setDataLocations(String[] dataLocations) { - this.dataLocations = dataLocations; - } - - public String[] getFragments() { - return fragments; - } - - public void setFragments(String[] fragments) { - this.fragments = fragments; - } - - public String[][] getFetchs() { - return fetchs; - } - - public void setFetchs(String[][] fetchs) { - this.fetchs = fetchs; - } - - @Override - public String toJson() { - return CoreGsonHelper.toJson(this, QueryUnitHistory.class); - } - - public long getRunningTime() { - if(finishTime > 0) { - return finishTime - launchTime; - } else { - return 0; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java 
b/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java index b3ac4d2..0afdf5a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java @@ -67,7 +67,7 @@ public class SubQueryHistory implements GsonObject { @Expose private int rackLocalAssigned; - private List<QueryUnitHistory> queryUnits; + private List<TaskHistory> tasks; public String getExecutionBlockId() { return executionBlockId; @@ -213,12 +213,12 @@ public class SubQueryHistory implements GsonObject { this.killedObjectCount = killedObjectCount; } - public List<QueryUnitHistory> getQueryUnits() { - return queryUnits; + public List<TaskHistory> getTasks() { + return tasks; } - public void setQueryUnits(List<QueryUnitHistory> queryUnits) { - this.queryUnits = queryUnits; + public void setTasks(List<TaskHistory> tasks) { + this.tasks = tasks; } @Override @@ -226,19 +226,19 @@ public class SubQueryHistory implements GsonObject { return CoreGsonHelper.toJson(this, SubQueryHistory.class); } - public String toQueryUnitsJson() { - if (queryUnits == null) { + public String toTasksJson() { + if (tasks == null) { return ""; } - return CoreGsonHelper.getInstance().toJson(queryUnits, new TypeToken<List<QueryUnitHistory>>() { + return CoreGsonHelper.getInstance().toJson(tasks, new TypeToken<List<TaskHistory>>() { }.getType()); } - public static List<QueryUnitHistory> fromJsonQueryUnits(String json) { + public static List<TaskHistory> fromJsonTasks(String json) { if (json == null || json.trim().isEmpty()) { - return new ArrayList<QueryUnitHistory>(); + return new ArrayList<TaskHistory>(); } - return CoreGsonHelper.getInstance().fromJson(json, new TypeToken<List<QueryUnitHistory>>() { + return CoreGsonHelper.getInstance().fromJson(json, new TypeToken<List<TaskHistory>>() { }.getType()); } 
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/util/history/TaskHistory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/TaskHistory.java b/tajo-core/src/main/java/org/apache/tajo/util/history/TaskHistory.java new file mode 100644 index 0000000..35c56e1 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/TaskHistory.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.util.history; + +import com.google.gson.annotations.Expose; +import org.apache.tajo.engine.json.CoreGsonHelper; +import org.apache.tajo.json.GsonObject; + +public class TaskHistory implements GsonObject { + @Expose private String id; + @Expose private String hostAndPort; + @Expose private int httpPort; + @Expose private String state; + @Expose private float progress; + @Expose private long launchTime; + @Expose private long finishTime; + @Expose private int retryCount; + + @Expose private int numShuffles; + @Expose private String shuffleKey; + @Expose private String shuffleFileName; + + @Expose private String[] dataLocations; + @Expose private String[] fragments; + @Expose private String[][] fetchs; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getHostAndPort() { + return hostAndPort; + } + + public void setHostAndPort(String hostAndPort) { + this.hostAndPort = hostAndPort; + } + + public int getHttpPort() { + return httpPort; + } + + public void setHttpPort(int httpPort) { + this.httpPort = httpPort; + } + + public int getRetryCount() { + return retryCount; + } + + public void setRetryCount(int retryCount) { + this.retryCount = retryCount; + } + + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + } + + public float getProgress() { + return progress; + } + + public void setProgress(float progress) { + this.progress = progress; + } + + public long getLaunchTime() { + return launchTime; + } + + public void setLaunchTime(long launchTime) { + this.launchTime = launchTime; + } + + public long getFinishTime() { + return finishTime; + } + + public void setFinishTime(long finishTime) { + this.finishTime = finishTime; + } + + public int getNumShuffles() { + return numShuffles; + } + + public void setNumShuffles(int numShuffles) { + this.numShuffles = numShuffles; + } + + public String getShuffleKey() { + return 
shuffleKey; + } + + public void setShuffleKey(String shuffleKey) { + this.shuffleKey = shuffleKey; + } + + public String getShuffleFileName() { + return shuffleFileName; + } + + public void setShuffleFileName(String shuffleFileName) { + this.shuffleFileName = shuffleFileName; + } + + public String[] getDataLocations() { + return dataLocations; + } + + public void setDataLocations(String[] dataLocations) { + this.dataLocations = dataLocations; + } + + public String[] getFragments() { + return fragments; + } + + public void setFragments(String[] fragments) { + this.fragments = fragments; + } + + public String[][] getFetchs() { + return fetchs; + } + + public void setFetchs(String[][] fetchs) { + this.fetchs = fetchs; + } + + @Override + public String toJson() { + return CoreGsonHelper.toJson(this, TaskHistory.class); + } + + public long getRunningTime() { + if(finishTime > 0) { + return finishTime - launchTime; + } else { + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index f18723f..dd3ee68 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; @@ -92,7 +92,7 @@ public class ExecutionBlockContext { private AtomicBoolean stop = 
new AtomicBoolean(); // It keeps all of the query unit attempts while a TaskRunner is running. - private final ConcurrentMap<QueryUnitAttemptId, Task> tasks = Maps.newConcurrentMap(); + private final ConcurrentMap<TaskAttemptId, Task> tasks = Maps.newConcurrentMap(); private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap(); @@ -242,12 +242,12 @@ public class ExecutionBlockContext { return executionBlockId; } - public Map<QueryUnitAttemptId, Task> getTasks() { + public Map<TaskAttemptId, Task> getTasks() { return tasks; } - public Task getTask(QueryUnitAttemptId queryUnitAttemptId){ - return tasks.get(queryUnitAttemptId); + public Task getTask(TaskAttemptId taskAttemptId){ + return tasks.get(taskAttemptId); } public void stopTaskRunner(String id){ @@ -258,7 +258,7 @@ public class ExecutionBlockContext { return manager.getTaskRunner(taskRunnerId); } - public void addTaskHistory(String taskRunnerId, QueryUnitAttemptId quAttemptId, TaskHistory taskHistory) { + public void addTaskHistory(String taskRunnerId, TaskAttemptId quAttemptId, TaskHistory taskHistory) { histories.get(taskRunnerId).addTaskHistory(quAttemptId, taskHistory); }
