TEZ-2914. Ability to limit vertex concurrency (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/34eb75d7 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/34eb75d7 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/34eb75d7 Branch: refs/heads/TEZ-2980 Commit: 34eb75d709bbe6e0417f9b4023f4fa1cec81bd8b Parents: 12bd908 Author: Bikas Saha <[email protected]> Authored: Fri Dec 25 19:39:04 2015 -0800 Committer: Bikas Saha <[email protected]> Committed: Fri Dec 25 19:39:04 2015 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/api/TezConfiguration.java | 12 +- .../apache/tez/dag/app/dag/DAGScheduler.java | 68 +++++++++- .../java/org/apache/tez/dag/app/dag/Vertex.java | 2 +- .../app/dag/event/DAGEventSchedulerUpdate.java | 3 +- .../DAGEventSchedulerUpdateTAAssigned.java | 36 ------ .../apache/tez/dag/app/dag/impl/DAGImpl.java | 17 +-- .../app/dag/impl/DAGSchedulerNaturalOrder.java | 15 +-- .../DAGSchedulerNaturalOrderControlled.java | 15 +-- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 43 ++----- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 6 + .../tez/dag/app/rm/TaskSchedulerManager.java | 2 - .../apache/tez/dag/app/MockDAGAppMaster.java | 13 +- .../tez/dag/app/TestMockDAGAppMaster.java | 50 ++++++++ .../tez/dag/app/dag/impl/TestDAGImpl.java | 4 - .../tez/dag/app/dag/impl/TestDAGScheduler.java | 127 ++++++++++++++++++- .../TestDAGSchedulerNaturalOrderControlled.java | 38 +++--- .../dag/app/rm/TestTaskSchedulerManager.java | 6 +- 18 files changed, 306 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a3b0fa6..25cfd86 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES: + TEZ-2914. Ability to limit vertex concurrency TEZ-3011. Link Vertex Name in Dag Tasks/Task Attempts to Vertex TEZ-3006. Remove unused import in TestHistoryParser. TEZ-2910. Set caller context for tracing ( integrate with HDFS-9184 ). http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index b707857..9f7777f 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -537,7 +537,17 @@ public class TezConfiguration extends Configuration { public static final String TEZ_AM_MAX_APP_ATTEMPTS = TEZ_AM_PREFIX + "max.app.attempts"; public static final int TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT = 2; - + + /** + * Int value. The maximum number of attempts that can run concurrently for a given vertex. + * Setting <=0 implies no limit + */ + @ConfigurationScope(Scope.VERTEX) + @ConfigurationProperty(type="integer") + public static final String TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY = + TEZ_AM_PREFIX + "vertex.max-task-concurrency"; + public static final int TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY_DEFAULT = -1; + /** * Int value. The maximum number of attempts that can fail for a particular task before the task is failed. * This does not count killed attempts. Task failure results in DAG failure. http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java index 2d3b006..87a6261 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java @@ -18,16 +18,70 @@ package org.apache.tez.dag.app.dag; +import java.util.Map; +import java.util.Queue; + import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate; -import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned; +import org.apache.tez.dag.records.TezVertexID; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; -public interface DAGScheduler { +public abstract class DAGScheduler { + private static class VertexInfo { + int concurrencyLimit; + int concurrency; + Queue<DAGEventSchedulerUpdate> pendingAttempts = Lists.newLinkedList(); + + VertexInfo(int limit) { + this.concurrencyLimit = limit; + } + } - public void vertexCompleted(Vertex vertex); + Map<TezVertexID, VertexInfo> vertexInfo = null; - public void scheduleTask(DAGEventSchedulerUpdate event); + public void addVertexConcurrencyLimit(TezVertexID vId, int concurrency) { + if (vertexInfo == null) { + vertexInfo = Maps.newHashMap(); + } + if (concurrency > 0) { + vertexInfo.put(vId, new VertexInfo(concurrency)); + } + } - public void taskScheduled(DAGEventSchedulerUpdateTAAssigned event); - - public void taskSucceeded(DAGEventSchedulerUpdate event); + public void scheduleTask(DAGEventSchedulerUpdate event) { + VertexInfo vInfo = null; + if (vertexInfo != null) { + vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID()); + } + scheduleTaskWithLimit(event, vInfo); + } + + private void scheduleTaskWithLimit(DAGEventSchedulerUpdate event, VertexInfo vInfo) { + if (vInfo != null) { + if (vInfo.concurrency >= vInfo.concurrencyLimit) { + vInfo.pendingAttempts.add(event); + return; // already at max concurrency + } + vInfo.concurrency++; + } + scheduleTaskEx(event); + } + + public void taskCompleted(DAGEventSchedulerUpdate event) { + taskCompletedEx(event); + if (vertexInfo != null) { + VertexInfo vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID()); + if (vInfo != null) { + vInfo.concurrency--; + if (!vInfo.pendingAttempts.isEmpty()) { + scheduleTaskWithLimit(vInfo.pendingAttempts.poll(), vInfo); + } + } + } + } + + public abstract void scheduleTaskEx(DAGEventSchedulerUpdate event); + + public abstract void taskCompletedEx(DAGEventSchedulerUpdate event); } http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index 9fc73a2..54f2ffa 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -85,7 +85,7 @@ public interface Vertex extends Comparable<Vertex> { */ TezCounters getCachedCounters(); - + int getMaxTaskConcurrency(); Map<TezTaskID, Task> getTasks(); Task getTask(TezTaskID taskID); Task getTask(int taskIndex); http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java index a436a8c..eda02b5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java @@ -24,8 +24,7 @@ public class DAGEventSchedulerUpdate extends DAGEvent { public enum UpdateType { TA_SCHEDULE, - TA_SCHEDULED, - TA_SUCCEEDED + TA_COMPLETED } private final TaskAttempt attempt; http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java deleted file mode 100644 index 8e27843..0000000 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java +++ /dev/null @@ -1,36 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.tez.dag.app.dag.event; - -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.tez.dag.app.dag.TaskAttempt; - -public class DAGEventSchedulerUpdateTAAssigned extends DAGEventSchedulerUpdate { - - final Container container; - - public DAGEventSchedulerUpdateTAAssigned(TaskAttempt attempt, Container container) { - super(DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULED, attempt); - this.container = container; - } - - public Container getContainer() { - return container; - } -} http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 3d47450..60f933f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -104,7 +104,6 @@ import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate; import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate; import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent; import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate; -import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned; import org.apache.tez.dag.app.dag.event.DAGEventStartDag; import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted; @@ -1592,6 +1591,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, LOG.info("Using DAG Scheduler: " + dagSchedulerClassName); dag.dagScheduler = ReflectionUtils.createClazzInstance(dagSchedulerClassName, new Class<?>[] { DAG.class, EventHandler.class}, new Object[] {dag, dag.eventHandler}); + for (Vertex v : dag.vertices.values()) { + dag.dagScheduler.addVertexConcurrencyLimit(v.getVertexId(), v.getMaxTaskConcurrency()); + } } private static VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) { @@ -1903,10 +1905,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, Vertex vertex = job.vertices.get(vertexEvent.getVertexId()); job.numCompletedVertices++; if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) { - if (!job.reRunningVertices.contains(vertex.getVertexId())) { - // vertex succeeded for the first time - job.dagScheduler.vertexCompleted(vertex); - } forceTransitionToKillWait = !(job.vertexSucceeded(vertex)); } else if (vertexEvent.getVertexState() == VertexState.FAILED) { @@ -2146,13 +2144,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, case TA_SCHEDULE: dag.dagScheduler.scheduleTask(sEvent); break; - case TA_SCHEDULED: - DAGEventSchedulerUpdateTAAssigned taEvent = - (DAGEventSchedulerUpdateTAAssigned) sEvent; - dag.dagScheduler.taskScheduled(taEvent); - break; - case TA_SUCCEEDED: - dag.dagScheduler.taskSucceeded(sEvent); + case TA_COMPLETED: + dag.dagScheduler.taskCompleted(sEvent); break; default: throw new TezUncheckedException("Unknown DAGEventSchedulerUpdate:" http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java index 8d42227..4246ad0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java @@ -26,11 +26,10 @@ import org.apache.tez.dag.app.dag.DAGScheduler; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate; -import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned; import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; @SuppressWarnings("rawtypes") -public class DAGSchedulerNaturalOrder implements DAGScheduler { +public class DAGSchedulerNaturalOrder extends DAGScheduler { private static final Logger LOG = LoggerFactory.getLogger(DAGSchedulerNaturalOrder.class); @@ -44,11 +43,7 @@ public class DAGSchedulerNaturalOrder implements DAGScheduler { } @Override - public void vertexCompleted(Vertex vertex) { - } - - @Override - public void scheduleTask(DAGEventSchedulerUpdate event) { + public void scheduleTaskEx(DAGEventSchedulerUpdate event) { TaskAttempt attempt = event.getAttempt(); Vertex vertex = dag.getVertex(attempt.getVertexID()); int vertexDistanceFromRoot = vertex.getDistanceFromRoot(); @@ -69,11 +64,7 @@ public class DAGSchedulerNaturalOrder implements DAGScheduler { } @Override - public void taskScheduled(DAGEventSchedulerUpdateTAAssigned event) { - } - - @Override - public void taskSucceeded(DAGEventSchedulerUpdate event) { + public void taskCompletedEx(DAGEventSchedulerUpdate event) { } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java index 2469a2f..0802dce 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java @@ -36,7 +36,6 @@ import org.apache.tez.dag.app.dag.DAGScheduler; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate; -import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned; import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -50,7 +49,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; * - generic slow start mechanism across all vertices - independent of the type of edges. */ @SuppressWarnings("rawtypes") -public class DAGSchedulerNaturalOrderControlled implements DAGScheduler { +public class DAGSchedulerNaturalOrderControlled extends DAGScheduler { private static final Logger LOG = LoggerFactory.getLogger(DAGSchedulerNaturalOrderControlled.class); @@ -72,13 +71,9 @@ public class DAGSchedulerNaturalOrderControlled implements DAGScheduler { this.handler = dispatcher; } - @Override - public void vertexCompleted(Vertex vertex) { - } - // TODO Does ordering matter - it currently depends on the order returned by vertex.getOutput* @Override - public void scheduleTask(DAGEventSchedulerUpdate event) { + public void scheduleTaskEx(DAGEventSchedulerUpdate event) { TaskAttempt attempt = event.getAttempt(); Vertex vertex = dag.getVertex(attempt.getVertexID()); int vertexDistanceFromRoot = vertex.getDistanceFromRoot(); @@ -241,11 +236,7 @@ public class DAGSchedulerNaturalOrderControlled implements DAGScheduler { } @Override - public void taskScheduled(DAGEventSchedulerUpdateTAAssigned event) { - } - - @Override - public void taskSucceeded(DAGEventSchedulerUpdate event) { + public void taskCompletedEx(DAGEventSchedulerUpdate event) { } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 0f76a63..c00d674 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -825,40 +825,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { private void handleTaskAttemptCompletion(TezTaskAttemptID attemptId, TaskAttemptStateInternal attemptState) { this.sendTaskAttemptCompletionEvent(attemptId, attemptState); - } - - // TODO: Recovery - /* - private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) { - TaskFinishedEvent tfe = - new TaskFinishedEvent(task.taskId, - task.successfulAttempt, - task.getFinishTime(task.successfulAttempt), - task.taskId.getTaskType(), - taskState.toString(), - task.getCounters()); - return tfe; - } - - private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskStateInternal taskState, TezTaskAttemptID taId) { - StringBuilder errorSb = new StringBuilder(); - if (diag != null) { - for (String d : diag) { - errorSb.append(", ").append(d); - } + if (getInternalState() != TaskStateInternal.SUCCEEDED) { + sendDAGSchedulerFinishedEvent(attemptId); // not a retro active action } - TaskFailedEvent taskFailedEvent = new TaskFailedEvent( - TypeConverter.fromYarn(task.taskId), - // Hack since getFinishTime needs isFinished to be true and that doesn't happen till after the transition. - task.getFinishTime(taId), - TypeConverter.fromYarn(task.getType()), - errorSb.toString(), - taskState.toString(), - taId == null ? null : TypeConverter.fromYarn(taId)); - return taskFailedEvent; } - */ + private void sendDAGSchedulerFinishedEvent(TezTaskAttemptID taId) { + // send notification to DAG scheduler + eventHandler.handle(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, attempts.get(taId))); + } + private static void unSucceed(TaskImpl task) { task.commitAttempt = null; task.successfulAttempt = null; @@ -1105,10 +1082,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { .getID(), diagnostics, errCause)); } } - // send notification to DAG scheduler - task.eventHandler.handle(new DAGEventSchedulerUpdate( - DAGEventSchedulerUpdate.UpdateType.TA_SUCCEEDED, task.attempts - .get(task.successfulAttempt))); return task.finished(TaskStateInternal.SUCCEEDED); } } http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 93baa0a..065974e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -1184,6 +1184,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl readLock.unlock(); } } + + @Override + public int getMaxTaskConcurrency() { + return vertexConf.getInt(TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY, + TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY_DEFAULT); + } public VertexStats getVertexStats() { http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java index dbf8e38..f688b57 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java @@ -69,7 +69,6 @@ import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError; import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType; -import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned; import org.apache.tez.dag.app.rm.container.AMContainer; import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA; import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted; @@ -569,7 +568,6 @@ public class TaskSchedulerManager extends AbstractService implements sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(), event.getContainerContext(), event.getLauncherId(), event.getTaskCommId())); } - sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container)); sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(), event.getRemoteTaskSpec(), event.getContainerContext().getLocalResources(), event .getContainerContext().getCredentials(), event.getPriority())); http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java index bc7fa98..b322e05 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java @@ -83,7 +83,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; -@SuppressWarnings("unchecked") public class MockDAGAppMaster extends DAGAppMaster { private static final Logger LOG = LoggerFactory.getLogger(MockDAGAppMaster.class); @@ -95,6 +94,7 @@ public class MockDAGAppMaster extends DAGAppMaster { EventsDelegate eventsDelegate; CountersDelegate countersDelegate; StatisticsDelegate statsDelegate; + ContainerDelegate containerDelegate; long launcherSleepTime = 1; boolean doSleep = true; int handlerConcurrency = 1; @@ -115,6 +115,11 @@ public class MockDAGAppMaster extends DAGAppMaster { public static interface EventsDelegate { public void getEvents(TaskSpec taskSpec, List<TezEvent> events, long time); } + + public static interface ContainerDelegate { + public void stop(ContainerStopRequest event); + public void launch(ContainerLaunchRequest event); + } // mock container launcher does not launch real tasks. // Upon, launch of a container is simulates the container asking for tasks @@ -268,6 +273,9 @@ public class MockDAGAppMaster extends DAGAppMaster { void stop(ContainerStopRequest event) { // remove from simulated container list containers.remove(event.getContainerId()); + if (containerDelegate != null) { + containerDelegate.stop(event); + } getContext().containerStopRequested(event.getContainerId()); } @@ -277,6 +285,9 @@ public class MockDAGAppMaster extends DAGAppMaster { event.getContainerLaunchContext()); containers.put(event.getContainerId(), cData); containersToProcess.add(cData); + if (containerDelegate != null) { + containerDelegate.launch(event); + } getContext().containerLaunched(event.getContainerId()); } http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java index b0bc571..d5ee67d 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.SystemUtils; @@ -73,6 +74,7 @@ import org.apache.tez.dag.api.client.VertexStatus; import org.apache.tez.dag.api.client.VertexStatus.State; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.app.MockDAGAppMaster.CountersDelegate; +import org.apache.tez.dag.app.MockDAGAppMaster.ContainerDelegate; import org.apache.tez.dag.app.MockDAGAppMaster.EventsDelegate; import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher; import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData; @@ -100,6 +102,8 @@ import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TaskStatistics; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; +import org.apache.tez.serviceplugins.api.ContainerLaunchRequest; +import org.apache.tez.serviceplugins.api.ContainerStopRequest; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -406,6 +410,52 @@ public class TestMockDAGAppMaster { tezClient.stop(); } + + @Test (timeout = 100000) + public void testConcurrencyLimit() throws Exception { + // the test relies on local mode behavior of launching a new container per task. + // so task concurrency == container concurrency + TezConfiguration tezconf = new TezConfiguration(defaultConf); + + final int concurrencyLimit = 5; + MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, + null, false, false, concurrencyLimit*4, 1000); + + tezClient.start(); + + MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp(); + MockContainerLauncher mockLauncher = mockApp.getContainerLauncher(); + mockLauncher.startScheduling(false); + + final AtomicInteger concurrency = new AtomicInteger(0); + final AtomicBoolean exceededConcurrency = new AtomicBoolean(false); + mockApp.containerDelegate = new ContainerDelegate() { + @Override + public void stop(ContainerStopRequest event) { + concurrency.decrementAndGet(); + } + @Override + public void launch(ContainerLaunchRequest event) { + int maxConc = concurrency.incrementAndGet(); + if (maxConc > concurrencyLimit) { + exceededConcurrency.set(true); + } + System.out.println("Launched: " + maxConc); + } + }; + DAG dag = DAG.create("testConcurrencyLimit"); + Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 20).setConf( + TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY, String.valueOf(concurrencyLimit)); + dag.addVertex(vA); + + mockLauncher.startScheduling(true); + DAGClient dagClient = tezClient.submitDAG(dag); + dagClient.waitForCompletion(); + Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState()); + Assert.assertFalse(exceededConcurrency.get()); + tezClient.stop(); + } + @Test (timeout = 10000) public void testBasicCounters() throws Exception { http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index 1809230..2158368 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -1617,7 +1617,6 @@ public class TestDAGImpl { Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); Assert.assertEquals(1, dag.getSuccessfulVertices()); Assert.assertEquals(1, dag.numCompletedVertices); - verify(dag.dagScheduler, times(1)).vertexCompleted(v); dispatcher.getEventHandler().handle( new VertexEventTaskReschedule(TezTaskID.getInstance(vId, 0))); @@ -1634,9 +1633,6 @@ public class TestDAGImpl { Assert.assertEquals(1, dag.getSuccessfulVertices()); Assert.assertEquals(1, dag.numCompletedVertices); - // re-completion is not notified again - verify(dag.dagScheduler, times(1)).vertexCompleted(v); - } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java index 913f5fa..a28f367 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java @@ -25,27 +25,34 @@ import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate; import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.junit.Assert; import org.junit.Test; +import com.google.common.collect.Lists; + import static org.mockito.Mockito.*; +import java.util.List; + public class TestDAGScheduler { class MockEventHandler implements EventHandler<TaskAttemptEventSchedule> { TaskAttemptEventSchedule event; + List<TaskAttemptEventSchedule> events = Lists.newLinkedList(); @Override public void handle(TaskAttemptEventSchedule event) { this.event = event; + this.events.add(event); } - } - MockEventHandler mockEventHandler = new MockEventHandler(); @Test(timeout=5000) public void testDAGSchedulerNaturalOrder() { + MockEventHandler mockEventHandler = new MockEventHandler(); DAG mockDag = mock(DAG.class); Vertex mockVertex = mock(Vertex.class); TaskAttempt mockAttempt = mock(TaskAttempt.class); @@ -58,15 +65,125 @@ public class TestDAGScheduler { DAGScheduler scheduler = new DAGSchedulerNaturalOrder(mockDag, mockEventHandler); - scheduler.scheduleTask(event); + scheduler.scheduleTaskEx(event); Assert.assertEquals(1, mockEventHandler.event.getPriorityHighLimit()); Assert.assertEquals(3, mockEventHandler.event.getPriorityLowLimit()); - scheduler.scheduleTask(event); + scheduler.scheduleTaskEx(event); Assert.assertEquals(4, mockEventHandler.event.getPriorityHighLimit()); Assert.assertEquals(6, mockEventHandler.event.getPriorityLowLimit()); - scheduler.scheduleTask(event); + scheduler.scheduleTaskEx(event); Assert.assertEquals(7, mockEventHandler.event.getPriorityHighLimit()); Assert.assertEquals(9, mockEventHandler.event.getPriorityLowLimit()); } + @Test(timeout=5000) + public void testConcurrencyLimit() { + MockEventHandler mockEventHandler = new MockEventHandler(); + DAG mockDag = mock(DAG.class); + TezVertexID vId0 = TezVertexID.fromString("vertex_1436907267600_195589_1_00"); + TezVertexID vId1 = TezVertexID.fromString("vertex_1436907267600_195589_1_01"); + TezTaskID tId0 = TezTaskID.getInstance(vId0, 0); + TezTaskID tId1 = TezTaskID.getInstance(vId1, 0); + + TaskAttempt mockAttempt; + + Vertex mockVertex = mock(Vertex.class); + when(mockDag.getVertex((TezVertexID) any())).thenReturn(mockVertex); + when(mockVertex.getDistanceFromRoot()).thenReturn(0); + + DAGScheduler scheduler = new DAGSchedulerNaturalOrder(mockDag, + mockEventHandler); + scheduler.addVertexConcurrencyLimit(vId0, 0); // not effective + + // schedule beyond limit and it gets scheduled + mockAttempt = mock(TaskAttempt.class); + when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, 0)); + scheduler.scheduleTask(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); + Assert.assertEquals(1, mockEventHandler.events.size()); + mockAttempt = mock(TaskAttempt.class); + when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, 1)); + scheduler.scheduleTask(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); + Assert.assertEquals(2, mockEventHandler.events.size()); + mockAttempt = mock(TaskAttempt.class); + when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, 2)); + scheduler.scheduleTask(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); + Assert.assertEquals(3, mockEventHandler.events.size()); + + mockEventHandler.events.clear(); + List<TaskAttempt> mockAttempts = Lists.newArrayList(); + int completed = 0; + int requested = 0; + int scheduled = 0; + scheduler.addVertexConcurrencyLimit(vId1, 2); // effective + + // schedule beyond limit and it gets buffered + mockAttempt = mock(TaskAttempt.class); + mockAttempts.add(mockAttempt); + when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++)); + scheduler.scheduleTask(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); + Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled + Assert.assertEquals(mockAttempts.get(scheduled).getID(), + mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order + scheduled++; + + mockAttempt = mock(TaskAttempt.class); + mockAttempts.add(mockAttempt); + when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++)); + scheduler.scheduleTask(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); + Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled + Assert.assertEquals(mockAttempts.get(scheduled).getID(), + mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order + scheduled++; + + mockAttempt = mock(TaskAttempt.class); + mockAttempts.add(mockAttempt); + when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++)); + scheduler.scheduleTask(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); + Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered + + mockAttempt = mock(TaskAttempt.class); + mockAttempts.add(mockAttempt); + when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++)); + scheduler.scheduleTask(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); + Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered + + scheduler.taskCompleted(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, mockAttempts.get(completed++))); + Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled + Assert.assertEquals(mockAttempts.get(scheduled).getID(), + mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order + scheduled++; + + scheduler.taskCompleted(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, mockAttempts.get(completed++))); + Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled + Assert.assertEquals(mockAttempts.get(scheduled).getID(), + mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order + scheduled++; + + scheduler.taskCompleted(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, mockAttempts.get(completed++))); + Assert.assertEquals(scheduled, mockEventHandler.events.size()); // no extra scheduling + + mockAttempt = mock(TaskAttempt.class); + mockAttempts.add(mockAttempt); + when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++)); + scheduler.scheduleTask(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); + Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled + Assert.assertEquals(mockAttempts.get(scheduled).getID(), + mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order + scheduled++; + + } + + + } http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java index bc86761..63137c7 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java @@ -63,35 +63,35 @@ public class TestDAGSchedulerNaturalOrderControlled { // Schedule all tasks belonging to v0 for (int i = 0; i < vertices[0].getTotalTasks(); i++) { - dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0)); + dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 0)); } verify(eventHandler, times(vertices[0].getTotalTasks())).handle(any(Event.class)); reset(eventHandler); // Schedule 3 tasks belonging to v2 for (int i = 0; i < 3; i++) { - dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0)); + dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0)); } verify(eventHandler, times(3)).handle(any(Event.class)); reset(eventHandler); // Schedule 3 tasks belonging to v3 for (int i = 0; i < 3; i++) { - dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0)); + dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[3].getVertexId(), i, 0)); } verify(eventHandler, times(3)).handle(any(Event.class)); reset(eventHandler); // Schedule remaining tasks belonging to v2 for (int i = 3; i < vertices[2].getTotalTasks(); i++) { - dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0)); + dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0)); } verify(eventHandler, times(vertices[2].getTotalTasks() - 3)).handle(any(Event.class)); reset(eventHandler); // Schedule remaining tasks belonging to v3 for (int i = 3; i < vertices[3].getTotalTasks(); i++) { - dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0)); + dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[3].getVertexId(), i, 0)); } verify(eventHandler, times(vertices[3].getTotalTasks() - 3)).handle(any(Event.class)); reset(eventHandler); @@ -99,7 +99,7 @@ public class TestDAGSchedulerNaturalOrderControlled { // Schedule all tasks belonging to v4 for (int i = 0; i < vertices[4].getTotalTasks(); i++) { - dagScheduler.scheduleTask(createScheduleRequest(vertices[4].getVertexId(), i, 0)); + dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[4].getVertexId(), i, 0)); } verify(eventHandler, times(vertices[4].getTotalTasks())).handle(any(Event.class)); reset(eventHandler); @@ -122,7 +122,7 @@ public class TestDAGSchedulerNaturalOrderControlled { // Schedule all tasks belonging to v0 for (int i = 0; i < vertices[0].getTotalTasks(); i++) { - dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0)); + dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 0)); } verify(eventHandler, times(vertices[0].getTotalTasks())).handle(any(Event.class)); reset(eventHandler); @@ -130,14 +130,14 @@ public class TestDAGSchedulerNaturalOrderControlled { // v2 behaving as if configured with slow-start. // Schedule all tasks belonging to v3. for (int i = 0; i < vertices[3].getTotalTasks(); i++) { - dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0)); + dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[3].getVertexId(), i, 0)); } verify(eventHandler, times(vertices[3].getTotalTasks())).handle(any(Event.class)); reset(eventHandler); // Scheduling all tasks belonging to v4. None should get scheduled. for (int i = 0; i < vertices[4].getTotalTasks(); i++) { - dagScheduler.scheduleTask(createScheduleRequest(vertices[4].getVertexId(), i, 0)); + dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[4].getVertexId(), i, 0)); } verify(eventHandler, never()).handle(any(Event.class)); reset(eventHandler); @@ -145,14 +145,14 @@ public class TestDAGSchedulerNaturalOrderControlled { // v2 now starts scheduling ... // Schedule 3 tasks for v2 initially. for (int i = 0; i < 3; i++) { - dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0)); + dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0)); } verify(eventHandler, times(3)).handle(any(Event.class)); reset(eventHandler); // Schedule remaining tasks belonging to v2 for (int i = 3; i < vertices[2].getTotalTasks(); i++) { - dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0)); + dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0)); } ArgumentCaptor<Event> args = ArgumentCaptor.forClass(Event.class); // All of v2 and v3 should be sent out. @@ -190,7 +190,7 @@ public class TestDAGSchedulerNaturalOrderControlled { // Schedule all tasks belonging to v0 for (int i = 0; i < vertices[0].getTotalTasks(); i++) { - dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0)); + dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 0)); } verify(eventHandler, times(vertices[0].getTotalTasks())).handle(any(Event.class)); reset(eventHandler); @@ -200,14 +200,14 @@ public class TestDAGSchedulerNaturalOrderControlled { // v2 will change parallelism // Schedule all tasks belonging to v3 for (int i = 0; i < vertices[3].getTotalTasks(); i++) { - dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0)); + dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[3].getVertexId(), i, 0)); } verify(eventHandler, times(vertices[3].getTotalTasks())).handle(any(Event.class)); reset(eventHandler); // Schedule all tasks belonging to v4 for (int i = 0; i < vertices[4].getTotalTasks(); i++) { - dagScheduler.scheduleTask(createScheduleRequest(vertices[4].getVertexId(), i, 0)); + dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[4].getVertexId(), i, 0)); } verify(eventHandler, never()).handle(any(Event.class)); reset(eventHandler); @@ -218,7 +218,7 @@ public class TestDAGSchedulerNaturalOrderControlled { // Schedule all tasks belonging to v2 for (int i = 0; i < vertices[2].getTotalTasks(); i++) { - dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0)); + dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0)); } verify(eventHandler, times(vertices[2].getTotalTasks() + vertices[4].getTotalTasks())) .handle(any(Event.class)); @@ -241,7 +241,7 @@ public class TestDAGSchedulerNaturalOrderControlled { // Schedule all but 1 task belonging to v0 for (int i = 0; i < vertices[0].getTotalTasks() - 1; i++) { - dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0)); + dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 0)); } verify(eventHandler, times(vertices[0].getTotalTasks() - 1)).handle(any(Event.class)); reset(eventHandler); @@ -249,7 +249,7 @@ public class TestDAGSchedulerNaturalOrderControlled { // Schedule all tasks belonging to v2 for (int i = 0; i < vertices[2].getTotalTasks(); i++) { - dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0)); + dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0)); } // Nothing should be scheduled verify(eventHandler, never()).handle(any(Event.class)); @@ -257,14 +257,14 @@ public class TestDAGSchedulerNaturalOrderControlled { // Schedule an extra attempt for all but 1 task belonging to v0 for (int i = 0; i < vertices[0].getTotalTasks() - 1; i++) { - dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 1)); + dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 1)); } // Only v0 requests should have gone out verify(eventHandler, times(vertices[0].getTotalTasks() - 1)).handle(any(Event.class)); reset(eventHandler); // Schedule last task of v0, with attempt 1 - dagScheduler.scheduleTask( + dagScheduler.scheduleTaskEx( createScheduleRequest(vertices[0].getVertexId(), vertices[0].getTotalTasks() - 1, 1)); // One v0 request and all of v2 should have gone out verify(eventHandler, times(1 + vertices[2].getTotalTasks())).handle(any(Event.class)); http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java index 4db51b9..8e4e4f0 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java @@ -191,10 +191,10 @@ public class TestTaskSchedulerManager { new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint, priority, containerContext, 0, 0, 0); schedulerHandler.taskAllocated(0, mockTaskAttempt, lr, container); - assertEquals(2, mockEventHandler.events.size()); - assertTrue(mockEventHandler.events.get(1) instanceof AMContainerEventAssignTA); + assertEquals(1, mockEventHandler.events.size()); + assertTrue(mockEventHandler.events.get(0) instanceof AMContainerEventAssignTA); AMContainerEventAssignTA assignEvent = - (AMContainerEventAssignTA) mockEventHandler.events.get(1); + (AMContainerEventAssignTA) mockEventHandler.events.get(0); assertEquals(priority, assignEvent.getPriority()); assertEquals(mockAttemptId, assignEvent.getTaskAttemptId()); }
