TAJO-1262: Rename the prefix 'SubQuery' to 'Stage'. Closes #314
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/3c833e2a Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/3c833e2a Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/3c833e2a Branch: refs/heads/index_support Commit: 3c833e2a8c3ff7ff8a2e1b4497afb390098856bf Parents: cf66a39 Author: Hyunsik Choi <[email protected]> Authored: Mon Dec 22 16:43:39 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Mon Dec 22 16:43:39 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + tajo-client/src/main/proto/ClientProtos.proto | 4 +- .../tajo/master/DefaultTaskScheduler.java | 36 +- .../apache/tajo/master/LazyTaskScheduler.java | 30 +- .../tajo/master/TaskSchedulerFactory.java | 12 +- .../tajo/master/event/QueryCompletedEvent.java | 8 +- .../tajo/master/event/QueryEventType.java | 4 +- .../tajo/master/event/QuerySubQueryEvent.java | 35 - .../tajo/master/event/StageCompletedEvent.java | 42 + .../event/StageContainerAllocationEvent.java | 38 + .../event/StageDiagnosticsUpdateEvent.java | 34 + .../apache/tajo/master/event/StageEvent.java | 35 + .../tajo/master/event/StageEventType.java | 43 + .../tajo/master/event/StageTaskEvent.java | 43 + .../master/event/SubQueryCompletedEvent.java | 42 - .../event/SubQueryContainerAllocationEvent.java | 38 - .../event/SubQueryDiagnosticsUpdateEvent.java | 34 - .../apache/tajo/master/event/SubQueryEvent.java | 35 - .../tajo/master/event/SubQueryEventType.java | 43 - .../tajo/master/event/SubQueryTaskEvent.java | 43 - .../apache/tajo/master/event/TaskEventType.java | 4 +- .../apache/tajo/master/querymaster/Query.java | 164 +-- .../querymaster/QueryMasterManagerService.java | 4 +- .../master/querymaster/QueryMasterTask.java | 28 +- .../tajo/master/querymaster/Repartitioner.java | 172 +-- .../apache/tajo/master/querymaster/Stage.java | 1342 +++++++++++++++++ .../tajo/master/querymaster/StageState.java | 30 + .../tajo/master/querymaster/SubQuery.java | 1343 ------------------ .../tajo/master/querymaster/SubQueryState.java | 30 - .../apache/tajo/master/querymaster/Task.java | 10 +- .../tajo/master/querymaster/TaskAttempt.java | 6 +- .../main/java/org/apache/tajo/util/JSPUtil.java | 36 +- .../apache/tajo/util/history/HistoryReader.java | 2 +- .../apache/tajo/util/history/HistoryWriter.java | 12 +- .../apache/tajo/util/history/QueryHistory.java | 23 +- .../apache/tajo/util/history/StageHistory.java | 270 ++++ .../tajo/util/history/SubQueryHistory.java | 270 ---- .../tajo/worker/TajoResourceAllocator.java | 14 +- .../main/java/org/apache/tajo/worker/Task.java | 20 +- .../apache/tajo/worker/TaskAttemptContext.java | 2 +- .../resources/webapps/admin/querydetail.jsp | 32 +- .../main/resources/webapps/admin/querytasks.jsp | 36 +- .../resources/webapps/worker/querydetail.jsp | 30 +- .../main/resources/webapps/worker/queryplan.jsp | 52 +- .../resources/webapps/worker/querytasks.jsp | 18 +- .../src/main/resources/webapps/worker/task.jsp | 10 +- .../org/apache/tajo/TajoTestingCluster.java | 8 +- .../org/apache/tajo/TestQueryIdFactory.java | 8 +- .../org/apache/tajo/client/TestTajoClient.java | 12 +- .../tajo/engine/query/TestGroupByQuery.java | 16 +- .../tajo/engine/query/TestTablePartitions.java | 2 +- .../tajo/engine/query/TestUnionQuery.java | 6 +- .../tajo/master/querymaster/TestKillQuery.java | 8 +- .../querymaster/TestTaskStatusUpdate.java | 18 +- .../util/history/TestHistoryWriterReader.java | 26 +- tajo-dist/pom.xml | 2 +- .../tajo/pullserver/PullServerAuxService.java | 16 +- .../tajo/pullserver/TajoPullServerService.java | 16 +- .../apache/tajo/storage/FileStorageManager.java | 6 +- 59 files changed, 2335 insertions(+), 2370 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 245918e..29b0c0b 100644 --- a/CHANGES +++ b/CHANGES @@ -240,6 +240,8 @@ Release 0.9.1 - unreleased SUB TASKS + TAJO-1262: Rename the prefix 'SubQuery' to 'Stage'. (hyunsik) + TAJO-324: Rename the prefix 'QueryUnit' to Task. (hyunsik) TAJO-1151: Implement the ByteBuffer-based De/Serializer. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-client/src/main/proto/ClientProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto index 51db763..a741268 100644 --- a/tajo-client/src/main/proto/ClientProtos.proto +++ b/tajo-client/src/main/proto/ClientProtos.proto @@ -253,7 +253,7 @@ message QueryInfoProto { optional int32 queryMasterInfoPort = 11; } -message SubQueryHistoryProto { +message StageHistoryProto { required string executionBlockId =1; required string state = 2; optional int64 startTime = 3; @@ -283,7 +283,7 @@ message QueryHistoryProto { optional string logicalPlan = 4; optional string distributedPlan = 5; repeated KeyValueProto sessionVariables = 6; - repeated SubQueryHistoryProto subQueryHistories = 7; + repeated StageHistoryProto stageHistories = 7; } message GetQueryHistoryResponse { http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java index d9d496e..dd6233c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java @@ -40,7 +40,7 @@ import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptSched import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; import org.apache.tajo.master.querymaster.Task; import org.apache.tajo.master.querymaster.TaskAttempt; -import org.apache.tajo.master.querymaster.SubQuery; +import org.apache.tajo.master.querymaster.Stage; import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.storage.DataLocation; import org.apache.tajo.storage.fragment.FileFragment; @@ -60,7 +60,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class); private final TaskSchedulerContext context; - private SubQuery subQuery; + private Stage stage; private Thread schedulingThread; private AtomicBoolean stopEventHandling = new AtomicBoolean(false); @@ -71,10 +71,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { private int nextTaskId = 0; private int scheduledObjectNum = 0; - public DefaultTaskScheduler(TaskSchedulerContext context, SubQuery subQuery) { + public DefaultTaskScheduler(TaskSchedulerContext context, Stage stage) { super(DefaultTaskScheduler.class.getName()); this.context = context; - this.subQuery = subQuery; + this.stage = stage; } @Override @@ -117,8 +117,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { private static final TaskAttemptId NULL_ATTEMPT_ID; public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq; static { - ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0); - NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullSubQuery, 0), 0); + ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0); + NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, 0), 0); TajoWorkerProtocol.TaskRequestProto.Builder builder = TajoWorkerProtocol.TaskRequestProto.newBuilder(); @@ -192,13 +192,13 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event; if (context.isLeafQuery()) { TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext(); - Task task = SubQuery.newEmptyTask(context, taskContext, subQuery, nextTaskId++); + Task task = Stage.newEmptyTask(context, taskContext, stage, nextTaskId++); task.addFragment(castEvent.getLeftFragment(), true); scheduledObjectNum++; if (castEvent.hasRightFragments()) { task.addFragments(castEvent.getRightFragments()); } - subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); + stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); } else { fragmentsForNonLeafTask = new FileFragment[2]; fragmentsForNonLeafTask[0] = castEvent.getLeftFragment(); @@ -217,7 +217,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { FetchScheduleEvent castEvent = (FetchScheduleEvent) event; Map<String, List<FetchImpl>> fetches = castEvent.getFetches(); TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext(); - Task task = SubQuery.newEmptyTask(context, taskScheduleContext, subQuery, nextTaskId++); + Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++); scheduledObjectNum++; for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) { task.addFetches(eachFetch.getKey(), eachFetch.getValue()); @@ -229,7 +229,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { if (broadcastFragmentsForNonLeafTask != null && broadcastFragmentsForNonLeafTask.length > 0) { task.addFragments(Arrays.asList(broadcastFragmentsForNonLeafTask)); } - subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); + stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); } else if (event instanceof TaskAttemptToSchedulerEvent) { TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event; if (context.isLeafQuery()) { @@ -239,7 +239,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } } } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) { - // when a subquery is killed, unassigned query unit attmpts are canceled from the scheduler. + // when a stage is killed, unassigned query unit attmpts are canceled from the scheduler. // This event is triggered by TaskAttempt. TaskAttemptToSchedulerEvent castedEvent = (TaskAttemptToSchedulerEvent) event; scheduledRequests.leafTasks.remove(castedEvent.getTaskAttempt().getId()); @@ -832,7 +832,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } if (attemptId != null) { - Task task = subQuery.getTask(attemptId.getTaskId()); + Task task = stage.getTask(attemptId.getTaskId()); TaskRequest taskAssign = new TaskRequestImpl( attemptId, new ArrayList<FragmentProto>(task.getAllFragments()), @@ -840,8 +840,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { false, task.getLogicalPlan().toJson(), context.getMasterContext().getQueryContext(), - subQuery.getDataChannel(), subQuery.getBlock().getEnforcer()); - if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) { + stage.getDataChannel(), stage.getBlock().getEnforcer()); + if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) { taskAssign.setInterQuery(); } @@ -888,7 +888,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { LOG.debug("Assigned based on * match"); Task task; - task = subQuery.getTask(attemptId.getTaskId()); + task = stage.getTask(attemptId.getTaskId()); TaskRequest taskAssign = new TaskRequestImpl( attemptId, Lists.newArrayList(task.getAllFragments()), @@ -896,9 +896,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { false, task.getLogicalPlan().toJson(), context.getMasterContext().getQueryContext(), - subQuery.getDataChannel(), - subQuery.getBlock().getEnforcer()); - if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) { + stage.getDataChannel(), + stage.getBlock().getEnforcer()); + if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) { taskAssign.setInterQuery(); } for(Map.Entry<String, Set<FetchImpl>> entry: task.getFetchMap().entrySet()) { http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java index 0ab19db..32af17b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java @@ -37,7 +37,7 @@ import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptSched import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; import org.apache.tajo.master.querymaster.Task; import org.apache.tajo.master.querymaster.TaskAttempt; -import org.apache.tajo.master.querymaster.SubQuery; +import org.apache.tajo.master.querymaster.Stage; import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.fragment.FileFragment; @@ -57,7 +57,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { private static final Log LOG = LogFactory.getLog(LazyTaskScheduler.class); private final TaskSchedulerContext context; - private final SubQuery subQuery; + private final Stage stage; private Thread schedulingThread; private volatile boolean stopEventHandling; @@ -77,10 +77,10 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { private int nextTaskId = 0; private int containerNum; - public LazyTaskScheduler(TaskSchedulerContext context, SubQuery subQuery) { + public LazyTaskScheduler(TaskSchedulerContext context, Stage stage) { super(LazyTaskScheduler.class.getName()); this.context = context; - this.subQuery = subQuery; + this.stage = stage; } @Override @@ -101,8 +101,8 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { @Override public void start() { - containerNum = subQuery.getContext().getResourceAllocator().calculateNumRequestContainers( - subQuery.getContext().getQueryMasterContext().getWorkerContext(), + containerNum = stage.getContext().getResourceAllocator().calculateNumRequestContainers( + stage.getContext().getQueryMasterContext().getWorkerContext(), context.getEstimatedTaskNum(), 512); LOG.info("Start TaskScheduler"); @@ -129,8 +129,8 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { private static final TaskAttemptId NULL_ATTEMPT_ID; public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq; static { - ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0); - NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullSubQuery, 0), 0); + ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0); + NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, 0), 0); TajoWorkerProtocol.TaskRequestProto.Builder builder = TajoWorkerProtocol.TaskRequestProto.newBuilder(); @@ -362,7 +362,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { String host = container.getTaskHostName(); TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext(container.containerID, host, taskRequest.getCallback()); - Task task = SubQuery.newEmptyTask(context, taskContext, subQuery, nextTaskId++); + Task task = Stage.newEmptyTask(context, taskContext, stage, nextTaskId++); FragmentPair fragmentPair; List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>(); @@ -371,7 +371,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { long taskSize = adjustTaskSize(); LOG.info("Adjusted task size: " + taskSize); - TajoConf conf = subQuery.getContext().getConf(); + TajoConf conf = stage.getContext().getConf(); // host local, disk local String normalized = NetUtils.normalizeHost(host); Integer diskId = hostDiskBalancerMap.get(normalized).getDiskId(container.containerID); @@ -450,7 +450,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { LOG.info("host: " + host + " disk id: " + diskId + " fragment num: " + fragmentPairs.size()); task.setFragment(fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()])); - subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); + stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); } } @@ -469,9 +469,9 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { taskRequest.getContainerId()); TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext(container.containerID, container.getTaskHostName(), taskRequest.getCallback()); - Task task = SubQuery.newEmptyTask(context, taskScheduleContext, subQuery, nextTaskId++); + Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++); task.setFragment(scheduledFragments.getAllFragments()); - subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); + stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); } } } @@ -485,8 +485,8 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { false, taskAttempt.getTask().getLogicalPlan().toJson(), context.getMasterContext().getQueryContext(), - subQuery.getDataChannel(), subQuery.getBlock().getEnforcer()); - if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) { + stage.getDataChannel(), stage.getBlock().getEnforcer()); + if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) { taskAssign.setInterQuery(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java index 520ecd3..e5291e9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java @@ -20,7 +20,7 @@ package org.apache.tajo.master; import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; -import org.apache.tajo.master.querymaster.SubQuery; +import org.apache.tajo.master.querymaster.Stage; import java.io.IOException; import java.lang.reflect.Constructor; @@ -29,7 +29,7 @@ import java.util.Map; public class TaskSchedulerFactory { private static Class<? extends AbstractTaskScheduler> CACHED_ALGORITHM_CLASS; private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap(); - private static final Class<?>[] DEFAULT_PARAMS = { TaskSchedulerContext.class, SubQuery.class }; + private static final Class<?>[] DEFAULT_PARAMS = { TaskSchedulerContext.class, Stage.class }; public static Class<? extends AbstractTaskScheduler> getTaskSchedulerClass(Configuration conf) throws IOException { @@ -46,7 +46,7 @@ public class TaskSchedulerFactory { } public static <T extends AbstractTaskScheduler> T get(Class<T> clazz, TaskSchedulerContext context, - SubQuery subQuery) { + Stage stage) { T result; try { Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz); @@ -55,15 +55,15 @@ public class TaskSchedulerFactory { constructor.setAccessible(true); CONSTRUCTOR_CACHE.put(clazz, constructor); } - result = constructor.newInstance(new Object[]{context, subQuery}); + result = constructor.newInstance(new Object[]{context, stage}); } catch (Exception e) { throw new RuntimeException(e); } return result; } - public static AbstractTaskScheduler get(Configuration conf, TaskSchedulerContext context, SubQuery subQuery) + public static AbstractTaskScheduler get(Configuration conf, TaskSchedulerContext context, Stage stage) throws IOException { - return get(getTaskSchedulerClass(conf), context, subQuery); + return get(getTaskSchedulerClass(conf), context, stage); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java index dc75a1d..e5a9a32 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java @@ -19,14 +19,14 @@ package org.apache.tajo.master.event; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.master.querymaster.SubQueryState; +import org.apache.tajo.master.querymaster.StageState; public class QueryCompletedEvent extends QueryEvent { private final ExecutionBlockId executionBlockId; - private final SubQueryState finalState; + private final StageState finalState; public QueryCompletedEvent(final ExecutionBlockId executionBlockId, - SubQueryState finalState) { + StageState finalState) { super(executionBlockId.getQueryId(), QueryEventType.QUERY_COMPLETED); this.executionBlockId = executionBlockId; this.finalState = finalState; @@ -36,7 +36,7 @@ public class QueryCompletedEvent extends QueryEvent { return executionBlockId; } - public SubQueryState getState() { + public StageState getState() { return finalState; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryEventType.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryEventType.java index edc0cd8..e38a3c4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryEventType.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryEventType.java @@ -24,8 +24,8 @@ public enum QueryEventType { START, KILL, - // Producer: SubQuery - SUBQUERY_COMPLETED, + // Producer: Stage + STAGE_COMPLETED, // Producer: Query QUERY_COMPLETED, http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java deleted file mode 100644 index ae36a69..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.event; - -import org.apache.tajo.ExecutionBlockId; - -public class QuerySubQueryEvent extends QueryEvent { - private ExecutionBlockId executionBlockId; - - public QuerySubQueryEvent(final ExecutionBlockId id, - final QueryEventType queryEvent) { - super(id.getQueryId(), queryEvent); - this.executionBlockId = id; - } - - public ExecutionBlockId getExecutionBlockId() { - return this.executionBlockId; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java new file mode 100644 index 0000000..2d16fbe --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.event; + +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.master.querymaster.StageState; + +public class StageCompletedEvent extends QueryEvent { + private final ExecutionBlockId executionBlockId; + private final StageState finalState; + + public StageCompletedEvent(final ExecutionBlockId executionBlockId, + StageState finalState) { + super(executionBlockId.getQueryId(), QueryEventType.STAGE_COMPLETED); + this.executionBlockId = executionBlockId; + this.finalState = finalState; + } + + public ExecutionBlockId getExecutionBlockId() { + return executionBlockId; + } + + public StageState getState() { + return finalState; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java new file mode 100644 index 0000000..0d29e44 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.event; + +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.master.container.TajoContainer; + +import java.util.List; + +public class StageContainerAllocationEvent extends StageEvent { + private List<TajoContainer> allocatedContainer; + + public StageContainerAllocationEvent(final ExecutionBlockId id, + List<TajoContainer> allocatedContainer) { + super(id, StageEventType.SQ_CONTAINER_ALLOCATED); + this.allocatedContainer = allocatedContainer; + } + + public List<TajoContainer> getAllocatedContainer() { + return this.allocatedContainer; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/StageDiagnosticsUpdateEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageDiagnosticsUpdateEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageDiagnosticsUpdateEvent.java new file mode 100644 index 0000000..39afc92 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageDiagnosticsUpdateEvent.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.event; + +import org.apache.tajo.ExecutionBlockId; + +public class StageDiagnosticsUpdateEvent extends StageEvent { + private final String msg; + + public StageDiagnosticsUpdateEvent(final ExecutionBlockId id, String diagnostic) { + super(id, StageEventType.SQ_DIAGNOSTIC_UPDATE); + this.msg = diagnostic; + } + + public String getDiagnosticUpdate() { + return msg; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEvent.java new file mode 100644 index 0000000..6fc4746 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEvent.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.event; + +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.tajo.ExecutionBlockId; + +public class StageEvent extends AbstractEvent<StageEventType> { + private final ExecutionBlockId id; + + public StageEvent(ExecutionBlockId id, StageEventType stageEventType) { + super(stageEventType); + this.id = id; + } + + public ExecutionBlockId getStageId() { + return id; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java new file mode 100644 index 0000000..fa808d4 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.event; + +/** + * Event Types handled by Stage + */ +public enum StageEventType { + + // Producer: Query + SQ_INIT, + SQ_START, + SQ_CONTAINER_ALLOCATED, + SQ_KILL, + SQ_LAUNCH, + + // Producer: Task + SQ_TASK_COMPLETED, + SQ_FAILED, + + // Producer: Completed + SQ_STAGE_COMPLETED, + + // Producer: Any component + SQ_DIAGNOSTIC_UPDATE, + SQ_INTERNAL_ERROR +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskEvent.java new file mode 100644 index 0000000..4377881 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskEvent.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.event; + +import org.apache.tajo.TaskId; +import org.apache.tajo.master.TaskState; + +/** + * Event Class: From Task to Stage + */ +public class StageTaskEvent extends StageEvent { + private TaskId taskId; + private TaskState state; + public StageTaskEvent(TaskId taskId, TaskState state) { + super(taskId.getExecutionBlockId(), StageEventType.SQ_TASK_COMPLETED); + this.taskId = taskId; + this.state = state; + } + + public TaskId getTaskId() { + return this.taskId; + } + + public TaskState getState() { + return state; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java deleted file mode 100644 index 6389798..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.event; - -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.master.querymaster.SubQueryState; - -public class SubQueryCompletedEvent extends QueryEvent { - private final ExecutionBlockId executionBlockId; - private final SubQueryState finalState; - - public SubQueryCompletedEvent(final ExecutionBlockId executionBlockId, - SubQueryState finalState) { - super(executionBlockId.getQueryId(), QueryEventType.SUBQUERY_COMPLETED); - this.executionBlockId = executionBlockId; - this.finalState = finalState; - } - - public ExecutionBlockId getExecutionBlockId() { - return executionBlockId; - } - - public SubQueryState getState() { - return finalState; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java deleted file mode 100644 index e617d53..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.event; - -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.master.container.TajoContainer; - -import java.util.List; - -public class SubQueryContainerAllocationEvent extends SubQueryEvent { - private List<TajoContainer> allocatedContainer; - - public SubQueryContainerAllocationEvent(final ExecutionBlockId id, - List<TajoContainer> allocatedContainer) { - super(id, SubQueryEventType.SQ_CONTAINER_ALLOCATED); - this.allocatedContainer = allocatedContainer; - } - - public List<TajoContainer> getAllocatedContainer() { - return this.allocatedContainer; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryDiagnosticsUpdateEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryDiagnosticsUpdateEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryDiagnosticsUpdateEvent.java deleted file mode 100644 index 0810e81..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryDiagnosticsUpdateEvent.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.event; - -import org.apache.tajo.ExecutionBlockId; - -public class SubQueryDiagnosticsUpdateEvent extends SubQueryEvent { - private final String msg; - - public SubQueryDiagnosticsUpdateEvent(final ExecutionBlockId id, String diagnostic) { - super(id, SubQueryEventType.SQ_DIAGNOSTIC_UPDATE); - this.msg = diagnostic; - } - - public String getDiagnosticUpdate() { - return msg; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java deleted file mode 100644 index 2b3d598..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.event; - -import org.apache.hadoop.yarn.event.AbstractEvent; -import org.apache.tajo.ExecutionBlockId; - -public class SubQueryEvent extends AbstractEvent<SubQueryEventType> { - private final ExecutionBlockId id; - - public SubQueryEvent(ExecutionBlockId id, SubQueryEventType subQueryEventType) { - super(subQueryEventType); - this.id = id; - } - - public ExecutionBlockId getSubQueryId() { - return id; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java deleted file mode 100644 index 79b6e2e..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.event; - -/** - * Event Types handled by SubQuery - */ -public enum SubQueryEventType { - - // Producer: Query - SQ_INIT, - SQ_START, - SQ_CONTAINER_ALLOCATED, - SQ_KILL, - SQ_LAUNCH, - - // Producer: Task - SQ_TASK_COMPLETED, - SQ_FAILED, - - // Producer: Completed - SQ_SUBQUERY_COMPLETED, - - // Producer: Any component - SQ_DIAGNOSTIC_UPDATE, - SQ_INTERNAL_ERROR -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java deleted file mode 100644 index 816bc48..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.event; - -import org.apache.tajo.TaskId; -import org.apache.tajo.master.TaskState; - -/** - * Event Class: From Task to SubQuery - */ -public class SubQueryTaskEvent extends SubQueryEvent { - private TaskId taskId; - private TaskState state; - public SubQueryTaskEvent(TaskId taskId, TaskState state) { - super(taskId.getExecutionBlockId(), SubQueryEventType.SQ_TASK_COMPLETED); - this.taskId = taskId; - this.state = state; - } - - public TaskId getTaskId() { - return this.taskId; - } - - public TaskState getState() { - return state; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEventType.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEventType.java index 9448863..0f26821 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEventType.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEventType.java @@ -23,10 +23,10 @@ package org.apache.tajo.master.event; */ public enum TaskEventType { - //Producer:Client, SubQuery + //Producer:Client, Stage T_KILL, - //Producer:SubQuery + //Producer:Stage T_SCHEDULE, //Producer:TaskAttempt http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index 918cc82..a626df1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -50,7 +50,7 @@ import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.history.QueryHistory; -import org.apache.tajo.util.history.SubQueryHistory; +import org.apache.tajo.util.history.StageHistory; import java.io.IOException; import java.util.*; @@ -65,7 +65,7 @@ public class Query implements EventHandler<QueryEvent> { private final TajoConf systemConf; private final Clock clock; private String queryStr; - private Map<ExecutionBlockId, SubQuery> subqueries; + private Map<ExecutionBlockId, Stage> stages; private final EventHandler eventHandler; private final MasterPlan plan; QueryMasterTask.QueryMasterTaskContext context; @@ -77,11 +77,11 @@ public class Query implements EventHandler<QueryEvent> { private long startTime; private long finishTime; private TableDesc resultDesc; - private int completedSubQueryCount = 0; - private int successedSubQueryCount = 0; - private int killedSubQueryCount = 0; - private int failedSubQueryCount = 0; - private int erroredSubQueryCount = 0; + private int completedStagesCount = 0; + private int successedStagesCount = 0; + private int killedStagesCount = 0; + private int failedStagesCount = 0; + private int erroredStagesCount = 0; private final List<String> diagnostics = new ArrayList<String>(); // Internal Variables @@ -96,7 +96,7 @@ public class Query implements EventHandler<QueryEvent> { // Transition Handler private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition(); - private static final SubQueryCompletedTransition SUBQUERY_COMPLETED_TRANSITION = new SubQueryCompletedTransition(); + private static final StageCompletedTransition STAGE_COMPLETED_TRANSITION = new StageCompletedTransition(); private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition(); protected static final StateMachineFactory @@ -120,8 +120,8 @@ public class Query implements EventHandler<QueryEvent> { // Transitions from RUNNING state .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING, - QueryEventType.SUBQUERY_COMPLETED, - SUBQUERY_COMPLETED_TRANSITION) + QueryEventType.STAGE_COMPLETED, + STAGE_COMPLETED_TRANSITION) .addTransition(QueryState.QUERY_RUNNING, EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED, QueryState.QUERY_ERROR), @@ -132,7 +132,7 @@ public class Query implements EventHandler<QueryEvent> { DIAGNOSTIC_UPDATE_TRANSITION) .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_KILL_WAIT, QueryEventType.KILL, - new KillSubQueriesTransition()) + new KillAllStagesTransition()) .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR, QueryEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) @@ -143,8 +143,8 @@ public class Query implements EventHandler<QueryEvent> { DIAGNOSTIC_UPDATE_TRANSITION) // ignore-able transitions .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED, - QueryEventType.SUBQUERY_COMPLETED, - SUBQUERY_COMPLETED_TRANSITION) + QueryEventType.STAGE_COMPLETED, + STAGE_COMPLETED_TRANSITION) .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED, QueryEventType.KILL) .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_ERROR, @@ -153,8 +153,8 @@ public class Query implements EventHandler<QueryEvent> { // Transitions from KILL_WAIT state .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT, - QueryEventType.SUBQUERY_COMPLETED, - SUBQUERY_COMPLETED_TRANSITION) + QueryEventType.STAGE_COMPLETED, + STAGE_COMPLETED_TRANSITION) .addTransition(QueryState.QUERY_KILL_WAIT, EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED, QueryState.QUERY_ERROR), @@ -191,7 +191,7 @@ public class Query implements EventHandler<QueryEvent> { INTERNAL_ERROR_TRANSITION) // Ignore-able transitions .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR, - EnumSet.of(QueryEventType.KILL, QueryEventType.SUBQUERY_COMPLETED)) + EnumSet.of(QueryEventType.KILL, QueryEventType.STAGE_COMPLETED)) .installTopology(); @@ -206,7 +206,7 @@ public class Query implements EventHandler<QueryEvent> { this.clock = context.getClock(); this.appSubmitTime = appSubmitTime; this.queryStr = queryStr; - this.subqueries = Maps.newConcurrentMap(); + this.stages = Maps.newConcurrentMap(); this.eventHandler = eventHandler; this.plan = plan; this.cursor = new ExecutionBlockCursor(plan, true); @@ -237,15 +237,15 @@ public class Query implements EventHandler<QueryEvent> { return 1.0f; } else { int idx = 0; - List<SubQuery> tempSubQueries = new ArrayList<SubQuery>(); - synchronized(subqueries) { - tempSubQueries.addAll(subqueries.values()); + List<Stage> tempStages = new ArrayList<Stage>(); + synchronized(stages) { + tempStages.addAll(stages.values()); } - float [] subProgresses = new float[tempSubQueries.size()]; - for (SubQuery subquery: tempSubQueries) { - if (subquery.getState() != SubQueryState.NEW) { - subProgresses[idx] = subquery.getProgress(); + float [] subProgresses = new float[tempStages.size()]; + for (Stage stage: tempStages) { + if (stage.getState() != StageState.NEW) { + subProgresses[idx] = stage.getProgress(); } else { subProgresses[idx] = 0.0f; } @@ -285,17 +285,17 @@ public class Query implements EventHandler<QueryEvent> { public QueryHistory getQueryHistory() { QueryHistory queryHistory = makeQueryHistory(); - queryHistory.setSubQueryHistories(makeSubQueryHistories()); + queryHistory.setStageHistories(makeStageHistories()); return queryHistory; } - private List<SubQueryHistory> makeSubQueryHistories() { - List<SubQueryHistory> subQueryHistories = new ArrayList<SubQueryHistory>(); - for(SubQuery eachSubQuery: getSubQueries()) { - subQueryHistories.add(eachSubQuery.getSubQueryHistory()); + private List<StageHistory> makeStageHistories() { + List<StageHistory> stageHistories = new ArrayList<StageHistory>(); + for(Stage eachStage : getStages()) { + stageHistories.add(eachStage.getStageHistory()); } - return subQueryHistories; + return stageHistories; } private QueryHistory makeQueryHistory() { @@ -348,20 +348,20 @@ public class Query implements EventHandler<QueryEvent> { return stateMachine; } - public void addSubQuery(SubQuery subquery) { - subqueries.put(subquery.getId(), subquery); + public void addStage(Stage stage) { + stages.put(stage.getId(), stage); } public QueryId getId() { return this.id; } - public SubQuery getSubQuery(ExecutionBlockId id) { - return this.subqueries.get(id); + public Stage getStage(ExecutionBlockId id) { + return this.stages.get(id); } - public Collection<SubQuery> getSubQueries() { - return this.subqueries.values(); + public Collection<Stage> getStages() { + return this.stages.values(); } public QueryState getSynchronizedState() { @@ -389,13 +389,13 @@ public class Query implements EventHandler<QueryEvent> { public void transition(Query query, QueryEvent queryEvent) { query.setStartTime(); - SubQuery subQuery = new SubQuery(query.context, query.getPlan(), + Stage stage = new Stage(query.context, query.getPlan(), query.getExecutionBlockCursor().nextBlock()); - subQuery.setPriority(query.priority--); - query.addSubQuery(subQuery); + stage.setPriority(query.priority--); + query.addStage(stage); - subQuery.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INIT)); - LOG.debug("Schedule unit plan: \n" + subQuery.getBlock().getPlan()); + stage.handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT)); + LOG.debug("Schedule unit plan: \n" + stage.getBlock().getPlan()); } } @@ -403,20 +403,20 @@ public class Query implements EventHandler<QueryEvent> { @Override public QueryState transition(Query query, QueryEvent queryEvent) { - QueryCompletedEvent subQueryEvent = (QueryCompletedEvent) queryEvent; + QueryCompletedEvent stageEvent = (QueryCompletedEvent) queryEvent; QueryState finalState; - if (subQueryEvent.getState() == SubQueryState.SUCCEEDED) { - finalState = finalizeQuery(query, subQueryEvent); - } else if (subQueryEvent.getState() == SubQueryState.FAILED) { + if (stageEvent.getState() == StageState.SUCCEEDED) { + finalState = finalizeQuery(query, stageEvent); + } else if (stageEvent.getState() == StageState.FAILED) { finalState = QueryState.QUERY_FAILED; - } else if (subQueryEvent.getState() == SubQueryState.KILLED) { + } else if (stageEvent.getState() == StageState.KILLED) { finalState = QueryState.QUERY_KILLED; } else { finalState = QueryState.QUERY_ERROR; } if (finalState != QueryState.QUERY_SUCCEEDED) { - SubQuery lastStage = query.getSubQuery(subQueryEvent.getExecutionBlockId()); + Stage lastStage = query.getStage(stageEvent.getExecutionBlockId()); if (lastStage != null && lastStage.getTableMeta() != null) { StoreType storeType = lastStage.getTableMeta().getStoreType(); if (storeType != null) { @@ -436,7 +436,7 @@ public class Query implements EventHandler<QueryEvent> { } private QueryState finalizeQuery(Query query, QueryCompletedEvent event) { - SubQuery lastStage = query.getSubQuery(event.getExecutionBlockId()); + Stage lastStage = query.getStage(event.getExecutionBlockId()); StoreType storeType = lastStage.getTableMeta().getStoreType(); try { LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot(); @@ -490,7 +490,7 @@ public class Query implements EventHandler<QueryEvent> { @Override public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) { - SubQuery lastStage = query.getSubQuery(finalExecBlockId); + Stage lastStage = query.getStage(finalExecBlockId); NodeType type = lastStage.getBlock().getPlan().getType(); return type != NodeType.CREATE_TABLE && type != NodeType.INSERT; } @@ -499,7 +499,7 @@ public class Query implements EventHandler<QueryEvent> { public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception { - SubQuery lastStage = query.getSubQuery(finalExecBlockId); + Stage lastStage = query.getStage(finalExecBlockId); TableMeta meta = lastStage.getTableMeta(); String nullChar = queryContext.get(SessionVars.NULL_CHAR); @@ -526,7 +526,7 @@ public class Query implements EventHandler<QueryEvent> { @Override public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) { - SubQuery lastStage = query.getSubQuery(finalExecBlockId); + Stage lastStage = query.getStage(finalExecBlockId); return lastStage.getBlock().getPlan().getType() == NodeType.CREATE_TABLE; } @@ -534,7 +534,7 @@ public class Query implements EventHandler<QueryEvent> { public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception { CatalogService catalog = context.getWorkerContext().getCatalog(); - SubQuery lastStage = query.getSubQuery(finalExecBlockId); + Stage lastStage = query.getStage(finalExecBlockId); TableStats stats = lastStage.getResultStats(); CreateTableNode createTableNode = (CreateTableNode) lastStage.getBlock().getPlan(); @@ -565,7 +565,7 @@ public class Query implements EventHandler<QueryEvent> { @Override public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) { - SubQuery lastStage = query.getSubQuery(finalExecBlockId); + Stage lastStage = query.getStage(finalExecBlockId); return lastStage.getBlock().getPlan().getType() == NodeType.INSERT; } @@ -575,7 +575,7 @@ public class Query implements EventHandler<QueryEvent> { throws Exception { CatalogService catalog = context.getWorkerContext().getCatalog(); - SubQuery lastStage = query.getSubQuery(finalExecBlockId); + Stage lastStage = query.getStage(finalExecBlockId); TableMeta meta = lastStage.getTableMeta(); TableStats stats = lastStage.getResultStats(); @@ -613,7 +613,7 @@ public class Query implements EventHandler<QueryEvent> { return directorySummary.getLength(); } - public static class SubQueryCompletedTransition implements SingleArcTransition<Query, QueryEvent> { + public static class StageCompletedTransition implements SingleArcTransition<Query, QueryEvent> { private boolean hasNext(Query query) { ExecutionBlockCursor cursor = query.getExecutionBlockCursor(); @@ -624,43 +624,43 @@ public class Query implements EventHandler<QueryEvent> { private void executeNextBlock(Query query) { ExecutionBlockCursor cursor = query.getExecutionBlockCursor(); ExecutionBlock nextBlock = cursor.nextBlock(); - SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock); - nextSubQuery.setPriority(query.priority--); - query.addSubQuery(nextSubQuery); - nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(), SubQueryEventType.SQ_INIT)); + Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock); + nextStage.setPriority(query.priority--); + query.addStage(nextStage); + nextStage.handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT)); - LOG.info("Scheduling SubQuery:" + nextSubQuery.getId()); + LOG.info("Scheduling Stage:" + nextStage.getId()); if(LOG.isDebugEnabled()) { - LOG.debug("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority()); - LOG.debug("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan()); + LOG.debug("Scheduling Stage's Priority: " + nextStage.getPriority()); + LOG.debug("Scheduling Stage's Plan: \n" + nextStage.getBlock().getPlan()); } } @Override public void transition(Query query, QueryEvent event) { try { - query.completedSubQueryCount++; - SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event; - - if (castEvent.getState() == SubQueryState.SUCCEEDED) { - query.successedSubQueryCount++; - } else if (castEvent.getState() == SubQueryState.KILLED) { - query.killedSubQueryCount++; - } else if (castEvent.getState() == SubQueryState.FAILED) { - query.failedSubQueryCount++; - } else if (castEvent.getState() == SubQueryState.ERROR) { - query.erroredSubQueryCount++; + query.completedStagesCount++; + StageCompletedEvent castEvent = (StageCompletedEvent) event; + + if (castEvent.getState() == StageState.SUCCEEDED) { + query.successedStagesCount++; + } else if (castEvent.getState() == StageState.KILLED) { + query.killedStagesCount++; + } else if (castEvent.getState() == StageState.FAILED) { + query.failedStagesCount++; + } else if (castEvent.getState() == StageState.ERROR) { + query.erroredStagesCount++; } else { - LOG.error(String.format("Invalid SubQuery (%s) State %s at %s", + LOG.error(String.format("Invalid Stage (%s) State %s at %s", castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getSynchronizedState().name())); query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR)); } - // if a subquery is succeeded and a query is running - if (castEvent.getState() == SubQueryState.SUCCEEDED && // latest subquery succeeded + // if a stage is succeeded and a query is running + if (castEvent.getState() == StageState.SUCCEEDED && // latest stage succeeded query.getSynchronizedState() == QueryState.QUERY_RUNNING && // current state is not in KILL_WAIT, FAILED, or ERROR. - hasNext(query)) { // there remains at least one subquery. - query.getSubQuery(castEvent.getExecutionBlockId()).waitingIntermediateReport(); + hasNext(query)) { // there remains at least one stage. + query.getStage(castEvent.getExecutionBlockId()).waitingIntermediateReport(); executeNextBlock(query); } else { // if a query is completed due to finished, kill, failure, or error query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState())); @@ -687,12 +687,12 @@ public class Query implements EventHandler<QueryEvent> { } } - private static class KillSubQueriesTransition implements SingleArcTransition<Query, QueryEvent> { + private static class KillAllStagesTransition implements SingleArcTransition<Query, QueryEvent> { @Override public void transition(Query query, QueryEvent event) { - synchronized (query.subqueries) { - for (SubQuery subquery : query.subqueries.values()) { - query.eventHandler.handle(new SubQueryEvent(subquery.getId(), SubQueryEventType.SQ_KILL)); + synchronized (query.stages) { + for (Stage stage : query.stages.values()) { + query.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL)); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java index e7e2bc0..c2e1009 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java @@ -150,7 +150,7 @@ public class QueryMasterManagerService extends CompositeService if (queryMasterTask == null) { queryMasterTask = queryMaster.getQueryMasterTask(queryId, true); } - SubQuery sq = queryMasterTask.getQuery().getSubQuery(attemptId.getTaskId().getExecutionBlockId()); + Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId()); Task task = sq.getTask(attemptId.getTaskId()); TaskAttempt attempt = task.getAttempt(attemptId.getId()); @@ -221,7 +221,7 @@ public class QueryMasterManagerService extends CompositeService QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId())); if (queryMasterTask != null) { ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId()); - queryMasterTask.getQuery().getSubQuery(ebId).receiveExecutionBlockReport(request); + queryMasterTask.getQuery().getStage(ebId).receiveExecutionBlockReport(request); } done.run(TajoWorker.TRUE_PROTO); } http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index c96b86e..e3d3d79 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -160,7 +160,7 @@ public class QueryMasterTask extends CompositeService { dispatcher = new TajoAsyncDispatcher(queryId.toString()); addService(dispatcher); - dispatcher.register(SubQueryEventType.class, new SubQueryEventDispatcher()); + dispatcher.register(StageEventType.class, new StageEventDispatcher()); dispatcher.register(TaskEventType.class, new TaskEventDispatcher()); dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher()); dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler()); @@ -255,7 +255,7 @@ public class QueryMasterTask extends CompositeService { public void handleTaskRequestEvent(TaskRequestEvent event) { ExecutionBlockId id = event.getExecutionBlockId(); - query.getSubQuery(id).handleTaskRequestEvent(event); + query.getStage(id).handleTaskRequestEvent(event); } public void handleTaskFailed(TajoWorkerProtocol.TaskFatalErrorReport report) { @@ -274,13 +274,13 @@ public class QueryMasterTask extends CompositeService { } } - private class SubQueryEventDispatcher implements EventHandler<SubQueryEvent> { - public void handle(SubQueryEvent event) { - ExecutionBlockId id = event.getSubQueryId(); + private class StageEventDispatcher implements EventHandler<StageEvent> { + public void handle(StageEvent event) { + ExecutionBlockId id = event.getStageId(); if(LOG.isDebugEnabled()) { - LOG.debug("SubQueryEventDispatcher:" + id + "," + event.getType()); + LOG.debug("StageEventDispatcher:" + id + "," + event.getType()); } - query.getSubQuery(id).handle(event); + query.getStage(id).handle(event); } } @@ -291,7 +291,7 @@ public class QueryMasterTask extends CompositeService { if(LOG.isDebugEnabled()) { LOG.debug("TaskEventDispatcher>" + taskId + "," + event.getType()); } - Task task = query.getSubQuery(taskId.getExecutionBlockId()). + Task task = query.getStage(taskId.getExecutionBlockId()). getTask(taskId); task.handle(event); } @@ -301,8 +301,8 @@ public class QueryMasterTask extends CompositeService { implements EventHandler<TaskAttemptEvent> { public void handle(TaskAttemptEvent event) { TaskAttemptId attemptId = event.getTaskAttemptId(); - SubQuery subQuery = query.getSubQuery(attemptId.getTaskId().getExecutionBlockId()); - Task task = subQuery.getTask(attemptId.getTaskId()); + Stage stage = query.getStage(attemptId.getTaskId().getExecutionBlockId()); + Task task = stage.getTask(attemptId.getTaskId()); TaskAttempt attempt = task.getAttempt(attemptId); attempt.handle(event); } @@ -311,8 +311,8 @@ public class QueryMasterTask extends CompositeService { private class TaskSchedulerDispatcher implements EventHandler<TaskSchedulerEvent> { public void handle(TaskSchedulerEvent event) { - SubQuery subQuery = query.getSubQuery(event.getExecutionBlockId()); - subQuery.getTaskScheduler().handle(event); + Stage stage = query.getStage(event.getExecutionBlockId()); + stage.getTaskScheduler().handle(event); } } @@ -627,8 +627,8 @@ public class QueryMasterTask extends CompositeService { return dispatcher; } - public SubQuery getSubQuery(ExecutionBlockId id) { - return query.getSubQuery(id); + public Stage getStage(ExecutionBlockId id) { + return query.getStage(id); } public Map<String, TableDesc> getTableDescMap() {
