Updated Branches: refs/heads/TEZ-1 3bb954bd9 -> 82ee45e8d
TEZ-27. Create tests for DAG Scheduler (bikas) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/82ee45e8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/82ee45e8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/82ee45e8 Branch: refs/heads/TEZ-1 Commit: 82ee45e8d3d7f0168b49c534384217305ab81a6d Parents: 3bb954b Author: Bikas Saha <[email protected]> Authored: Tue May 28 19:20:30 2013 -0700 Committer: Bikas Saha <[email protected]> Committed: Tue May 28 19:20:30 2013 -0700 ---------------------------------------------------------------------- .../org/apache/tez/dag/app/dag/TaskAttempt.java | 5 + .../dag/app/dag/event/DAGEventSchedulerUpdate.java | 2 +- .../org/apache/tez/dag/app/dag/impl/DAGImpl.java | 30 ++- .../tez/dag/app/dag/impl/DAGSchedulerMRR.java | 87 +++---- .../dag/app/dag/impl/DAGSchedulerNaturalOrder.java | 2 +- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 18 ++- .../tez/dag/app/rm/TaskSchedulerEventHandler.java | 2 +- .../tez/dag/app/dag/impl/TestDAGScheduler.java | 188 +++++++++++++++ 8 files changed, 277 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/82ee45e8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java index 712f968..2628f0c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java @@ -27,13 +27,18 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.oldrecords.TaskAttemptReport; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezVertexID; /** * Read only view of TaskAttempt. */ public interface TaskAttempt { TezTaskAttemptID getID(); + TezVertexID getVertexID(); + TezDAGID getDAGID(); + TaskAttemptReport getReport(); List<String> getDiagnostics(); TezCounters getCounters(); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/82ee45e8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java index ff11e99..99b872e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java @@ -30,7 +30,7 @@ public class DAGEventSchedulerUpdate extends DAGEvent { private final UpdateType updateType; public DAGEventSchedulerUpdate(UpdateType updateType, TaskAttempt attempt) { - super(attempt.getID().getTaskID().getVertexID().getDAGId(), + super(attempt.getDAGID(), DAGEventType.DAG_SCHEDULER_UPDATE); this.attempt = attempt; this.updateType = updateType; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/82ee45e8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index cc6699d..3681c85 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -869,9 +869,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, for (Vertex v : dag.vertices.values()) { parseVertexEdges(dag, edgePlans, v); } - - dag.dagScheduler = new DAGSchedulerNaturalOrder(dag, dag.eventHandler); - //dag.dagScheduler = new DAGSchedulerMRR(dag, dag.eventHandler); + + assignDAGScheduler(dag); // TODO Metrics //dag.metrics.endPreparingJob(dag); @@ -887,6 +886,31 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, return dag.finished(DAGState.FAILED); } } + + private void assignDAGScheduler(DAGImpl dag) { + boolean isMRR = true; + for(Vertex vertex : dag.vertices.values()) { + Map<Vertex, EdgeProperty> outVertices = vertex.getOutputVertices(); + if(outVertices == null || outVertices.isEmpty()) { + continue; + } + if(outVertices.size() > 1 || + outVertices.values().iterator().next().getConnectionPattern() != + EdgeProperty.ConnectionPattern.BIPARTITE) { + // more than 1 output OR single output is not bipartite + isMRR = false; + break; + } + } + + if(isMRR) { + LOG.info("Using MRR dag scheduler"); + dag.dagScheduler = new DAGSchedulerMRR(dag, dag.eventHandler); + } else { + LOG.info("Using Natural order dag scheduler"); + dag.dagScheduler = new DAGSchedulerNaturalOrder(dag, dag.eventHandler); + } + } private VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) { TezVertexID vertexId = TezBuilderUtils.newVertexID(dag.getID(), vId); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/82ee45e8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java index 12fdb10..d404155 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java @@ -48,7 +48,7 @@ public class DAGSchedulerMRR implements DAGScheduler { @Override public void vertexCompleted(Vertex vertex) { - if(currentPartitioner!= null) { + if(currentPartitioner != null) { if(vertex != currentPartitioner) { String message = vertex.getVertexId() + " finished. Expecting " + currentPartitioner + " to finish."; @@ -57,68 +57,59 @@ public class DAGSchedulerMRR implements DAGScheduler { } LOG.info("Current partitioner " + currentPartitioner.getVertexId() + " is completed. " - + (currentShuffler!=null?currentShuffler.getVertexId():"null") - + " is new partitioner"); + + (currentShuffler!=null ? + currentShuffler.getVertexId() + " is new partitioner": + "No current shuffler to replace the partitioner")); currentPartitioner = currentShuffler; - currentShuffler = null; - } else { - if(vertex != currentShuffler) { - String message = vertex.getVertexId() + " finished. Expecting " - + currentShuffler.getVertexId() + " to finish"; - LOG.fatal(message); - throw new TezException(message); - } + currentShuffler = null; } } @Override public void scheduleTask(DAGEventSchedulerUpdate event) { TaskAttempt attempt = event.getAttempt(); - Vertex vertex = dag.getVertex(attempt.getID().getTaskID().getVertexID()); + Vertex vertex = dag.getVertex(attempt.getVertexID()); int vertexDistanceFromRoot = vertex.getDistanceFromRoot(); - if(vertexDistanceFromRoot == 0) { + boolean reOrderPriority = false; + + if(currentPartitioner == null) { + // no partitioner. so set it. currentPartitioner = vertex; - LOG.info(vertex.getVertexId() + " is first partitioner"); + currentShufflerDepth = vertexDistanceFromRoot; + LOG.info(vertex.getVertexId() + " is new partitioner at depth " + + vertexDistanceFromRoot); + } else if (currentShuffler == null && + vertexDistanceFromRoot > currentShufflerDepth) { + // vertex not a partitioner. no shuffler set. has more depth than current + // shuffler. this must be the new shuffler. + currentShuffler = vertex; + currentShufflerDepth = vertexDistanceFromRoot; + LOG.info(vertex.getVertexId() + " is new shuffler at depth " + + currentShufflerDepth); } - if(vertexDistanceFromRoot > currentShufflerDepth) { - if(currentShuffler == null) { - currentShuffler = vertex; - currentShufflerDepth = vertexDistanceFromRoot; - LOG.info(currentShuffler.getVertexId() + " is new shuffler at depth " + - currentShufflerDepth); - } else { - if(currentShufflerDepth+1 == vertexDistanceFromRoot && - currentPartitioner == null - ) { - currentPartitioner = currentShuffler; - currentShuffler = vertex; - currentShufflerDepth = vertexDistanceFromRoot; - LOG.info("Shuffler " + currentPartitioner.getVertexId() + - " becomes partitioner as new shuffler " + - currentShuffler.getVertexId() + " has started at depth " + - currentShufflerDepth); - } else { - String message = vertex.getVertexId() - + " has scheduled tasks at depth " + vertexDistanceFromRoot - + " greater than depth " + currentShufflerDepth - + " of current shuffler " + currentShuffler.getVertexId() - + ". Unexpected."; - LOG.fatal(message); - throw new TezException(message); - } - } + + if(currentShuffler == vertex) { + // current shuffler vertex. assign special priority + reOrderPriority = true; } + + // sanity check + if(currentPartitioner != vertex && currentShuffler != vertex) { + String message = vertex.getVertexId() + " is neither the " + + " current partitioner: " + currentPartitioner.getVertexId() + + " nor the current shuffler: " + currentShuffler.getVertexId(); + LOG.fatal(message); + throw new TezException(message); + } // natural priority. Handles failures and retries. int priority = (vertexDistanceFromRoot + 1) * 3; - if(currentShuffler == vertex) { - if(currentPartitioner != null) { - // special priority for current reducers while current partitioners are - // still running. Schedule at priority one higher than natural priority - // of previous vertex. - priority -= 4; // this == (partitionerDepth+1)*3 - 1 - } + if(reOrderPriority) { + // special priority for current reducers while current partitioners are + // still running. Schedule at priority one higher than natural priority + // of previous vertex. + priority -= 4; // this == (partitionerDepth+1)*3 - 1 } else { if(attempt.getIsRescheduled()) { // higher priority for retries of failed attempts. Only makes sense in http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/82ee45e8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java index 607e5af..9e8729e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java @@ -50,7 +50,7 @@ public class DAGSchedulerNaturalOrder implements DAGScheduler { @Override public void scheduleTask(DAGEventSchedulerUpdate event) { TaskAttempt attempt = event.getAttempt(); - Vertex vertex = dag.getVertex(attempt.getID().getTaskID().getVertexID()); + Vertex vertex = dag.getVertex(attempt.getVertexID()); int vertexDistanceFromRoot = vertex.getDistanceFromRoot(); // natural priority. Handles failures and retries. http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/82ee45e8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 5d4242d..5225ecf 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -90,8 +90,10 @@ import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; +import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.dag.utils.TezBuilderUtils; import org.apache.tez.engine.common.security.JobTokenIdentifier; @@ -290,6 +292,16 @@ public class TaskAttemptImpl implements TaskAttempt, return attemptId; } + @Override + public TezVertexID getVertexID() { + return attemptId.getTaskID().getVertexID(); + } + + @Override + public TezDAGID getDAGID() { + return getVertexID().getDAGId(); + } + TezTaskContext createRemoteTask() { Vertex vertex = getTask().getVertex(); DAG dag = vertex.getDAG(); @@ -601,7 +613,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptImpl ta) { DAGEventCounterUpdate jce = new DAGEventCounterUpdate( - ta.getID().getTaskID().getVertexID().getDAGId() + ta.getDAGID() ); jce.addCounterUpdate(DAGCounter.TOTAL_LAUNCHED_TASKS, 1); return jce; @@ -611,7 +623,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptImpl ta) { DAGEventCounterUpdate jce = new DAGEventCounterUpdate( - ta.getID().getTaskID().getVertexID().getDAGId() + ta.getDAGID() ); long slotMillis = computeSlotMillis(ta); @@ -624,7 +636,7 @@ public class TaskAttemptImpl implements TaskAttempt, TaskAttemptStateInternal taState) { DAGEventCounterUpdate jce = new DAGEventCounterUpdate( - taskAttempt.getID().getTaskID().getVertexID().getDAGId()); + taskAttempt.getDAGID()); long slotMillisIncrement = computeSlotMillis(taskAttempt); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/82ee45e8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index c61cbcc..8b98c68 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -461,7 +461,7 @@ public class TaskSchedulerEventHandler extends AbstractService sendEvent(new AMContainerEventLaunchRequest( containerId, - taskAttempt.getID().getTaskID().getVertexID(), + taskAttempt.getVertexID(), event.getJobToken(), // TODO getConf from AMSchedulerEventTALaunchRequest event.getCredentials(), false, event.getConf(), http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/82ee45e8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java new file mode 100644 index 0000000..988a116 --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.dag.impl; + +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.dag.app.dag.DAGScheduler; +import org.apache.tez.dag.app.dag.TaskAttempt; +import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate; +import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezVertexID; +import org.junit.Assert; +import org.junit.Test; + +import static org.mockito.Mockito.*; + +public class TestDAGScheduler { + + class MockEventHandler implements EventHandler<TaskAttemptEventSchedule> { + TaskAttemptEventSchedule event; + @Override + public void handle(TaskAttemptEventSchedule event) { + this.event = event; + } + + } + + MockEventHandler mockEventHandler = new MockEventHandler(); + + //@Test(timeout=10000) + public void testDAGSchedulerNaturalOrder() { + DAG mockDag = mock(DAG.class); + Vertex mockVertex = mock(Vertex.class); + TaskAttempt mockAttempt = mock(TaskAttempt.class); + when(mockDag.getVertex((TezVertexID) any())).thenReturn(mockVertex); + when(mockVertex.getDistanceFromRoot()).thenReturn(0).thenReturn(1) + .thenReturn(2); + when(mockAttempt.getIsRescheduled()).thenReturn(false); + + DAGEventSchedulerUpdate event = new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt); + + DAGScheduler scheduler = new DAGSchedulerNaturalOrder(mockDag, + mockEventHandler); + scheduler.scheduleTask(event); + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 2); + scheduler.scheduleTask(event); + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 4); + scheduler.scheduleTask(event); + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 6); + + when(mockAttempt.getIsRescheduled()).thenReturn(true); + scheduler.scheduleTask(event); + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 5); + } + + @Test//(timeout=10000) + public void testDAGSchedulerMRR() { + DAG mockDag = mock(DAG.class); + TezDAGID dagId = new TezDAGID("1", 1, 1); + + Vertex mockVertex1 = mock(Vertex.class); + TezVertexID mockVertexId1 = new TezVertexID(dagId, 1); + when(mockVertex1.getVertexId()).thenReturn(mockVertexId1); + when(mockVertex1.getDistanceFromRoot()).thenReturn(0); + TaskAttempt mockAttempt1 = mock(TaskAttempt.class); + when(mockAttempt1.getVertexID()).thenReturn(mockVertexId1); + when(mockAttempt1.getIsRescheduled()).thenReturn(false); + when(mockDag.getVertex(mockVertexId1)).thenReturn(mockVertex1); + + Vertex mockVertex2 = mock(Vertex.class); + TezVertexID mockVertexId2 = new TezVertexID(dagId, 2); + when(mockVertex2.getVertexId()).thenReturn(mockVertexId2); + when(mockVertex2.getDistanceFromRoot()).thenReturn(1); + TaskAttempt mockAttempt2 = mock(TaskAttempt.class); + when(mockAttempt2.getVertexID()).thenReturn(mockVertexId2); + when(mockAttempt2.getIsRescheduled()).thenReturn(false); + when(mockDag.getVertex(mockVertexId2)).thenReturn(mockVertex2); + TaskAttempt mockAttempt2f = mock(TaskAttempt.class); + when(mockAttempt2f.getVertexID()).thenReturn(mockVertexId2); + when(mockAttempt2f.getIsRescheduled()).thenReturn(true); + + Vertex mockVertex3 = mock(Vertex.class); + TezVertexID mockVertexId3 = new TezVertexID(dagId, 3); + when(mockVertex3.getVertexId()).thenReturn(mockVertexId3); + when(mockVertex3.getDistanceFromRoot()).thenReturn(2); + TaskAttempt mockAttempt3 = mock(TaskAttempt.class); + when(mockAttempt3.getVertexID()).thenReturn(mockVertexId3); + when(mockAttempt3.getIsRescheduled()).thenReturn(false); + when(mockDag.getVertex(mockVertexId3)).thenReturn(mockVertex3); + + DAGEventSchedulerUpdate mockEvent1 = mock(DAGEventSchedulerUpdate.class); + when(mockEvent1.getAttempt()).thenReturn(mockAttempt1); + DAGEventSchedulerUpdate mockEvent2 = mock(DAGEventSchedulerUpdate.class); + when(mockEvent2.getAttempt()).thenReturn(mockAttempt2); + DAGEventSchedulerUpdate mockEvent2f = mock(DAGEventSchedulerUpdate.class); + when(mockEvent2f.getAttempt()).thenReturn(mockAttempt2f); + DAGEventSchedulerUpdate mockEvent3 = mock(DAGEventSchedulerUpdate.class); + when(mockEvent3.getAttempt()).thenReturn(mockAttempt3); + DAGScheduler scheduler = new DAGSchedulerMRR(mockDag, mockEventHandler); + + // M starts. M completes. R1 starts. R1 completes. R2 starts. R2 completes + scheduler.scheduleTask(mockEvent1); // M starts + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 3); + scheduler.scheduleTask(mockEvent1); // M runs another + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 3); + scheduler.vertexCompleted(mockVertex1); // M completes + scheduler.scheduleTask(mockEvent2); // R1 starts + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 6); + scheduler.scheduleTask(mockEvent2); // R1 runs another + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 6); + scheduler.scheduleTask(mockEvent2f); // R1 runs retry. Retry priority + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 4); + scheduler.vertexCompleted(mockVertex2); // R1 completes + scheduler.scheduleTask(mockEvent3); // R2 starts + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 9); + scheduler.scheduleTask(mockEvent3); // R2 runs another + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 9); + scheduler.vertexCompleted(mockVertex3); // R2 completes + + // M starts. R1 starts. M completes. R2 starts. R1 completes. R2 completes + scheduler.scheduleTask(mockEvent1); // M starts + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 3); + scheduler.scheduleTask(mockEvent2); // R1 starts. Reordered priority + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 2); + scheduler.scheduleTask(mockEvent1); // M runs another + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 3); + scheduler.scheduleTask(mockEvent2); // R1 runs another. Reordered priority + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 2); + scheduler.scheduleTask(mockEvent2f); // R1 runs retry. Reordered priority + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 2); + scheduler.vertexCompleted(mockVertex1); // M completes + scheduler.scheduleTask(mockEvent3); // R2 starts. Reordered priority + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 5); + scheduler.scheduleTask(mockEvent2); // R1 runs another. Normal priority + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 6); + scheduler.scheduleTask(mockEvent2f); // R1 runs retry. Retry priority + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 4); + scheduler.scheduleTask(mockEvent3); // R2 runs another. Reordered priority + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 5); + scheduler.vertexCompleted(mockVertex2); // R1 completes + scheduler.vertexCompleted(mockVertex3); // R2 completes + + // M starts. M completes. R1 starts. R2 starts. R1 completes. R2 completes + scheduler.scheduleTask(mockEvent1); // M starts + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 3); + scheduler.vertexCompleted(mockVertex1); // M completes + scheduler.scheduleTask(mockEvent2); // R1 starts + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 6); + scheduler.scheduleTask(mockEvent3); // R2 starts. Reordered priority + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 5); + scheduler.scheduleTask(mockEvent2); // R1 runs another + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 6); + scheduler.vertexCompleted(mockVertex2); // R1 completes + scheduler.vertexCompleted(mockVertex3); // R2 completes + + // M starts. R1 starts. M completes. R1 completes. R2 starts. R2 completes + scheduler.scheduleTask(mockEvent1); // M starts + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 3); + scheduler.scheduleTask(mockEvent2); // R1 starts. Reordered priority + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 2); + scheduler.vertexCompleted(mockVertex1); // M completes + scheduler.scheduleTask(mockEvent2); // R1 starts. Normal priority + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 6); + scheduler.vertexCompleted(mockVertex2); // R1 completes + scheduler.scheduleTask(mockEvent3); // R2 starts + Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 9); + scheduler.vertexCompleted(mockVertex3); // R2 completes + } +}
