TEZ-27. Create tests for DAG Scheduler (bikas)

Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/82ee45e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/82ee45e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/82ee45e8

Branch: refs/heads/master
Commit: 82ee45e8d3d7f0168b49c534384217305ab81a6d
Parents: 3bb954b
Author: Bikas Saha <[email protected]>
Authored: Tue May 28 19:20:30 2013 -0700
Committer: Bikas Saha <[email protected]>
Committed: Tue May 28 19:20:30 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/app/dag/TaskAttempt.java    |    5 +
 .../dag/app/dag/event/DAGEventSchedulerUpdate.java |    2 +-
 .../org/apache/tez/dag/app/dag/impl/DAGImpl.java   |   30 ++-
 .../tez/dag/app/dag/impl/DAGSchedulerMRR.java      |   87 +++----
 .../dag/app/dag/impl/DAGSchedulerNaturalOrder.java |    2 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java      |   18 ++-
 .../tez/dag/app/rm/TaskSchedulerEventHandler.java  |    2 +-
 .../tez/dag/app/dag/impl/TestDAGScheduler.java     |  188 +++++++++++++++
 8 files changed, 277 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/82ee45e8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index 712f968..2628f0c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -27,13 +27,18 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezVertexID;
 
 /**
  * Read only view of TaskAttempt.
  */
 public interface TaskAttempt {
   TezTaskAttemptID getID();
+  TezVertexID getVertexID();
+  TezDAGID getDAGID();
+  
   TaskAttemptReport getReport();
   List<String> getDiagnostics();
   TezCounters getCounters();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/82ee45e8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
index ff11e99..99b872e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
@@ -30,7 +30,7 @@ public class DAGEventSchedulerUpdate extends DAGEvent {
   private final UpdateType updateType;
   
   public DAGEventSchedulerUpdate(UpdateType updateType, TaskAttempt attempt) {
-    super(attempt.getID().getTaskID().getVertexID().getDAGId(), 
+    super(attempt.getDAGID(), 
           DAGEventType.DAG_SCHEDULER_UPDATE);
     this.attempt = attempt;
     this.updateType = updateType;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/82ee45e8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index cc6699d..3681c85 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -869,9 +869,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         for (Vertex v : dag.vertices.values()) {
           parseVertexEdges(dag, edgePlans, v);
         }
-
-        dag.dagScheduler = new DAGSchedulerNaturalOrder(dag, dag.eventHandler);
-        //dag.dagScheduler = new DAGSchedulerMRR(dag, dag.eventHandler);
+        
+        assignDAGScheduler(dag);
 
         // TODO Metrics
         //dag.metrics.endPreparingJob(dag);
@@ -887,6 +886,31 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         return dag.finished(DAGState.FAILED);
       }
     }
+    
+    private void assignDAGScheduler(DAGImpl dag) {
+      boolean isMRR = true;
+      for(Vertex vertex : dag.vertices.values()) {
+        Map<Vertex, EdgeProperty> outVertices = vertex.getOutputVertices();
+        if(outVertices == null || outVertices.isEmpty()) {
+          continue;
+        }
+        if(outVertices.size() > 1 || 
+           outVertices.values().iterator().next().getConnectionPattern() != 
+           EdgeProperty.ConnectionPattern.BIPARTITE) {
+          // more than 1 output OR single output is not bipartite
+          isMRR = false;
+          break;
+        }          
+      }
+      
+      if(isMRR) {
+        LOG.info("Using MRR dag scheduler");
+        dag.dagScheduler = new DAGSchedulerMRR(dag, dag.eventHandler);
+      } else {
+        LOG.info("Using Natural order dag scheduler");
+        dag.dagScheduler = new DAGSchedulerNaturalOrder(dag, dag.eventHandler);
+      }
+    }
 
     private VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
       TezVertexID vertexId = TezBuilderUtils.newVertexID(dag.getID(), vId);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/82ee45e8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java
index 12fdb10..d404155 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerMRR.java
@@ -48,7 +48,7 @@ public class DAGSchedulerMRR implements DAGScheduler {
   
   @Override
   public void vertexCompleted(Vertex vertex) {
-    if(currentPartitioner!= null) {
+    if(currentPartitioner != null) {
       if(vertex != currentPartitioner) {
         String message = vertex.getVertexId() + " finished. Expecting "
             + currentPartitioner + " to finish.";
@@ -57,68 +57,59 @@ public class DAGSchedulerMRR implements DAGScheduler {
       }
       LOG.info("Current partitioner " + currentPartitioner.getVertexId()
           + " is completed. " 
-          + (currentShuffler!=null?currentShuffler.getVertexId():"null")
-          + " is new partitioner");
+          + (currentShuffler!=null ? 
+             currentShuffler.getVertexId() + " is new partitioner":
+             "No current shuffler to replace the partitioner"));
       currentPartitioner = currentShuffler;
-      currentShuffler = null;
-    } else {
-      if(vertex != currentShuffler) {
-        String message = vertex.getVertexId() + " finished. Expecting "
-            + currentShuffler.getVertexId() + " to finish";
-        LOG.fatal(message);
-        throw new TezException(message);
-      }      
+      currentShuffler = null;     
     }
   }
   
   @Override
   public void scheduleTask(DAGEventSchedulerUpdate event) {
     TaskAttempt attempt = event.getAttempt();
-    Vertex vertex = dag.getVertex(attempt.getID().getTaskID().getVertexID());
+    Vertex vertex = dag.getVertex(attempt.getVertexID());
     int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
-    if(vertexDistanceFromRoot == 0) {
+    boolean reOrderPriority = false;
+    
+    if(currentPartitioner == null) {
+      // no partitioner. so set it.
       currentPartitioner = vertex;
-      LOG.info(vertex.getVertexId() + " is first partitioner");
+      currentShufflerDepth = vertexDistanceFromRoot;
+      LOG.info(vertex.getVertexId() + " is new partitioner at depth "
+          + vertexDistanceFromRoot);
+    } else if (currentShuffler == null && 
+        vertexDistanceFromRoot > currentShufflerDepth) {
+      // vertex not a partitioner. no shuffler set. has more depth than current
+      // shuffler. this must be the new shuffler.
+      currentShuffler = vertex;
+      currentShufflerDepth = vertexDistanceFromRoot;
+      LOG.info(vertex.getVertexId() + " is new shuffler at depth "
+          + currentShufflerDepth);
     }
-    if(vertexDistanceFromRoot > currentShufflerDepth) {
-      if(currentShuffler == null) {
-        currentShuffler = vertex;
-        currentShufflerDepth = vertexDistanceFromRoot;
-        LOG.info(currentShuffler.getVertexId() + " is new shuffler at depth " + 
-                 currentShufflerDepth);
-      } else {
-        if(currentShufflerDepth+1 == vertexDistanceFromRoot && 
-           currentPartitioner == null
-           ) {
-          currentPartitioner = currentShuffler;
-          currentShuffler = vertex;
-          currentShufflerDepth = vertexDistanceFromRoot;
-          LOG.info("Shuffler " + currentPartitioner.getVertexId() + 
-                   " becomes partitioner as new shuffler " + 
-                   currentShuffler.getVertexId() + " has started at depth " + 
-                   currentShufflerDepth);          
-        } else {
-          String message = vertex.getVertexId()
-              + " has scheduled tasks at depth " + vertexDistanceFromRoot
-              + " greater than depth " + currentShufflerDepth
-              + " of current shuffler " + currentShuffler.getVertexId()
-              + ". Unexpected.";
-          LOG.fatal(message);
-          throw new TezException(message);
-        }
-      }
+    
+    if(currentShuffler == vertex) {
+      // current shuffler vertex. assign special priority
+      reOrderPriority = true;
     }
+    
+    // sanity check
+    if(currentPartitioner != vertex && currentShuffler != vertex) {
+      String message = vertex.getVertexId() + " is neither the "
+          + " current partitioner: " + currentPartitioner.getVertexId()
+          + " nor the current shuffler: " + currentShuffler.getVertexId();
+      LOG.fatal(message);
+      throw new TezException(message);      
+    }    
 
     // natural priority. Handles failures and retries.
     int priority = (vertexDistanceFromRoot + 1) * 3;
     
-    if(currentShuffler == vertex) {
-      if(currentPartitioner != null) {
-        // special priority for current reducers while current partitioners are 
-        // still running. Schedule at priority one higher than natural priority 
-        // of previous vertex.
-        priority -= 4;  // this == (partitionerDepth+1)*3 - 1     
-      }
+    if(reOrderPriority) {
+      // special priority for current reducers while current partitioners are 
+      // still running. Schedule at priority one higher than natural priority 
+      // of previous vertex.
+      priority -= 4;  // this == (partitionerDepth+1)*3 - 1     
     } else {
       if(attempt.getIsRescheduled()) {
         // higher priority for retries of failed attempts. Only makes sense in

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/82ee45e8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
index 607e5af..9e8729e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
@@ -50,7 +50,7 @@ public class DAGSchedulerNaturalOrder implements DAGScheduler {
   @Override
   public void scheduleTask(DAGEventSchedulerUpdate event) {
     TaskAttempt attempt = event.getAttempt();
-    Vertex vertex = dag.getVertex(attempt.getID().getTaskID().getVertexID());
+    Vertex vertex = dag.getVertex(attempt.getVertexID());
     int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
 
     // natural priority. Handles failures and retries.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/82ee45e8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 5d4242d..5225ecf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -90,8 +90,10 @@ import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TezBuilderUtils;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 
@@ -290,6 +292,16 @@ public class TaskAttemptImpl implements TaskAttempt,
     return attemptId;
   }
   
+  @Override
+  public TezVertexID getVertexID() {
+    return attemptId.getTaskID().getVertexID();
+  }
+  
+  @Override
+  public TezDAGID getDAGID() {
+    return getVertexID().getDAGId();
+  }
+  
   TezTaskContext createRemoteTask() {
     Vertex vertex = getTask().getVertex();
     DAG dag = vertex.getDAG();
@@ -601,7 +613,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       TaskAttemptImpl ta) {
     DAGEventCounterUpdate jce = 
         new DAGEventCounterUpdate(
-            ta.getID().getTaskID().getVertexID().getDAGId()
+            ta.getDAGID()
             );
     jce.addCounterUpdate(DAGCounter.TOTAL_LAUNCHED_TASKS, 1);
     return jce;
@@ -611,7 +623,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       TaskAttemptImpl ta) {
     DAGEventCounterUpdate jce = 
         new DAGEventCounterUpdate(            
-            ta.getID().getTaskID().getVertexID().getDAGId()
+            ta.getDAGID()
             );
 
     long slotMillis = computeSlotMillis(ta);
@@ -624,7 +636,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       TaskAttemptStateInternal taState) {
     DAGEventCounterUpdate jce = 
         new DAGEventCounterUpdate(
-            taskAttempt.getID().getTaskID().getVertexID().getDAGId());
+            taskAttempt.getDAGID());
 
     long slotMillisIncrement = computeSlotMillis(taskAttempt);
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/82ee45e8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index c61cbcc..8b98c68 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -461,7 +461,7 @@ public class TaskSchedulerEventHandler extends AbstractService
 
       sendEvent(new AMContainerEventLaunchRequest(
           containerId,
-          taskAttempt.getID().getTaskID().getVertexID(),
+          taskAttempt.getVertexID(),
           event.getJobToken(),
           // TODO getConf from AMSchedulerEventTALaunchRequest
           event.getCredentials(), false, event.getConf(),

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/82ee45e8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
new file mode 100644
index 0000000..988a116
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGScheduler;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+public class TestDAGScheduler {
+
+  class MockEventHandler implements EventHandler<TaskAttemptEventSchedule> {
+    TaskAttemptEventSchedule event;
+    @Override
+    public void handle(TaskAttemptEventSchedule event) {
+      this.event = event;
+    }
+    
+  }
+  
+  MockEventHandler mockEventHandler = new MockEventHandler();
+  
+  //@Test(timeout=10000)
+  public void testDAGSchedulerNaturalOrder() {
+    DAG mockDag = mock(DAG.class);
+    Vertex mockVertex = mock(Vertex.class);
+    TaskAttempt mockAttempt = mock(TaskAttempt.class);
+    when(mockDag.getVertex((TezVertexID) any())).thenReturn(mockVertex);
+    when(mockVertex.getDistanceFromRoot()).thenReturn(0).thenReturn(1)
+        .thenReturn(2);
+    when(mockAttempt.getIsRescheduled()).thenReturn(false);
+    
+    DAGEventSchedulerUpdate event = new DAGEventSchedulerUpdate(
+        DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt);    
+    
+    DAGScheduler scheduler = new DAGSchedulerNaturalOrder(mockDag,
+        mockEventHandler);
+    scheduler.scheduleTask(event);
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 2);
+    scheduler.scheduleTask(event);
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 4);
+    scheduler.scheduleTask(event);
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 6);
+    
+    when(mockAttempt.getIsRescheduled()).thenReturn(true);
+    scheduler.scheduleTask(event);
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 5);
+  }
+  
+  @Test//(timeout=10000)
+  public void testDAGSchedulerMRR() {
+    DAG mockDag = mock(DAG.class);
+    TezDAGID dagId = new TezDAGID("1", 1, 1);
+
+    Vertex mockVertex1 = mock(Vertex.class);
+    TezVertexID mockVertexId1 = new TezVertexID(dagId, 1);
+    when(mockVertex1.getVertexId()).thenReturn(mockVertexId1);
+    when(mockVertex1.getDistanceFromRoot()).thenReturn(0);
+    TaskAttempt mockAttempt1 = mock(TaskAttempt.class);
+    when(mockAttempt1.getVertexID()).thenReturn(mockVertexId1);
+    when(mockAttempt1.getIsRescheduled()).thenReturn(false);
+    when(mockDag.getVertex(mockVertexId1)).thenReturn(mockVertex1);
+    
+    Vertex mockVertex2 = mock(Vertex.class);
+    TezVertexID mockVertexId2 = new TezVertexID(dagId, 2);
+    when(mockVertex2.getVertexId()).thenReturn(mockVertexId2);
+    when(mockVertex2.getDistanceFromRoot()).thenReturn(1);
+    TaskAttempt mockAttempt2 = mock(TaskAttempt.class);
+    when(mockAttempt2.getVertexID()).thenReturn(mockVertexId2);
+    when(mockAttempt2.getIsRescheduled()).thenReturn(false);
+    when(mockDag.getVertex(mockVertexId2)).thenReturn(mockVertex2);
+    TaskAttempt mockAttempt2f = mock(TaskAttempt.class);
+    when(mockAttempt2f.getVertexID()).thenReturn(mockVertexId2);
+    when(mockAttempt2f.getIsRescheduled()).thenReturn(true);
+    
+    Vertex mockVertex3 = mock(Vertex.class);
+    TezVertexID mockVertexId3 = new TezVertexID(dagId, 3);
+    when(mockVertex3.getVertexId()).thenReturn(mockVertexId3);
+    when(mockVertex3.getDistanceFromRoot()).thenReturn(2);
+    TaskAttempt mockAttempt3 = mock(TaskAttempt.class);
+    when(mockAttempt3.getVertexID()).thenReturn(mockVertexId3);
+    when(mockAttempt3.getIsRescheduled()).thenReturn(false);
+    when(mockDag.getVertex(mockVertexId3)).thenReturn(mockVertex3);
+
+    DAGEventSchedulerUpdate mockEvent1 = mock(DAGEventSchedulerUpdate.class);
+    when(mockEvent1.getAttempt()).thenReturn(mockAttempt1);
+    DAGEventSchedulerUpdate mockEvent2 = mock(DAGEventSchedulerUpdate.class);
+    when(mockEvent2.getAttempt()).thenReturn(mockAttempt2);
+    DAGEventSchedulerUpdate mockEvent2f = mock(DAGEventSchedulerUpdate.class);
+    when(mockEvent2f.getAttempt()).thenReturn(mockAttempt2f);
+    DAGEventSchedulerUpdate mockEvent3 = mock(DAGEventSchedulerUpdate.class);
+    when(mockEvent3.getAttempt()).thenReturn(mockAttempt3);
+    DAGScheduler scheduler = new DAGSchedulerMRR(mockDag, mockEventHandler);
+    
+    // M starts. M completes. R1 starts. R1 completes. R2 starts. R2 completes
+    scheduler.scheduleTask(mockEvent1); // M starts
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 3);
+    scheduler.scheduleTask(mockEvent1); // M runs another
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 3);
+    scheduler.vertexCompleted(mockVertex1); // M completes
+    scheduler.scheduleTask(mockEvent2); // R1 starts
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 6);
+    scheduler.scheduleTask(mockEvent2); // R1 runs another
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 6);
+    scheduler.scheduleTask(mockEvent2f); // R1 runs retry. Retry priority
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 4);
+    scheduler.vertexCompleted(mockVertex2); // R1 completes
+    scheduler.scheduleTask(mockEvent3); // R2 starts
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 9);
+    scheduler.scheduleTask(mockEvent3); // R2 runs another
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 9);
+    scheduler.vertexCompleted(mockVertex3); // R2 completes  
+    
+    // M starts. R1 starts. M completes. R2 starts. R1 completes. R2 completes
+    scheduler.scheduleTask(mockEvent1); // M starts
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 3);
+    scheduler.scheduleTask(mockEvent2); // R1 starts. Reordered priority
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 2);
+    scheduler.scheduleTask(mockEvent1); // M runs another
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 3);
+    scheduler.scheduleTask(mockEvent2); // R1 runs another. Reordered priority
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 2);
+    scheduler.scheduleTask(mockEvent2f); // R1 runs retry. Reordered priority
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 2);
+    scheduler.vertexCompleted(mockVertex1); // M completes
+    scheduler.scheduleTask(mockEvent3); // R2 starts. Reordered priority
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 5);
+    scheduler.scheduleTask(mockEvent2); // R1 runs another. Normal priority
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 6);
+    scheduler.scheduleTask(mockEvent2f); // R1 runs retry. Retry priority
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 4);
+    scheduler.scheduleTask(mockEvent3); // R2 runs another. Reordered priority
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 5);
+    scheduler.vertexCompleted(mockVertex2); // R1 completes
+    scheduler.vertexCompleted(mockVertex3); // R2 completes      
+    
+    // M starts. M completes. R1 starts. R2 starts. R1 completes. R2 completes
+    scheduler.scheduleTask(mockEvent1); // M starts
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 3);
+    scheduler.vertexCompleted(mockVertex1); // M completes
+    scheduler.scheduleTask(mockEvent2); // R1 starts
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 6);
+    scheduler.scheduleTask(mockEvent3); // R2 starts. Reordered priority
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 5);
+    scheduler.scheduleTask(mockEvent2); // R1 runs another
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 6);
+    scheduler.vertexCompleted(mockVertex2); // R1 completes
+    scheduler.vertexCompleted(mockVertex3); // R2 completes
+    
+    // M starts. R1 starts. M completes. R1 completes. R2 starts. R2 completes
+    scheduler.scheduleTask(mockEvent1); // M starts
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 3);
+    scheduler.scheduleTask(mockEvent2); // R1 starts. Reordered priority
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 2);
+    scheduler.vertexCompleted(mockVertex1); // M completes
+    scheduler.scheduleTask(mockEvent2); // R1 starts. Normal priority
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 6);
+    scheduler.vertexCompleted(mockVertex2); // R1 completes
+    scheduler.scheduleTask(mockEvent3); // R2 starts
+    Assert.assertTrue(mockEventHandler.event.getPriority().getPriority() == 9);
+    scheduler.vertexCompleted(mockVertex3); // R2 completes  
+  }
+}

Reply via email to