Repository: tez
Updated Branches:
  refs/heads/master e0ee28ae6 -> f355a050c


http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
index 1afedb9..b586de6 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java
@@ -50,8 +50,8 @@ public class TestCartesianProductEdgeManagerPartitioned {
    */
   @Test(timeout = 5000)
   public void testTwoWay() throws Exception {
-    CartesianProductEdgeManagerConfig emConfig =
-      new CartesianProductEdgeManagerConfig(true, new String[]{"v0","v1"}, new 
int[]{3,4}, null, null, null);
+    CartesianProductEdgeManagerConfig emConfig = new 
CartesianProductEdgeManagerConfig(true,
+      new String[]{"v0","v1"}, new int[]{3,4}, null, 0, 0, null);
     when(mockContext.getDestinationVertexNumTasks()).thenReturn(12);
     testTwoWayV0(emConfig);
     testTwoWayV1(emConfig);
@@ -144,9 +144,8 @@ public class TestCartesianProductEdgeManagerPartitioned {
     CartesianProductFilterDescriptor filterDescriptor =
       new CartesianProductFilterDescriptor(TestFilter.class.getName())
         .setUserPayload(UserPayload.create(buffer));
-    CartesianProductEdgeManagerConfig emConfig =
-      new CartesianProductEdgeManagerConfig(true, new String[]{"v0","v1"}, new 
int[]{3,4}, null, null,
-        filterDescriptor);
+    CartesianProductEdgeManagerConfig emConfig = new 
CartesianProductEdgeManagerConfig(true,
+      new String[]{"v0","v1"}, new int[]{3,4}, null, 0, 0, filterDescriptor);
     when(mockContext.getDestinationVertexNumTasks()).thenReturn(3);
     testTwoWayV0WithFilter(emConfig);
     testTwoWayV1WithFilter(emConfig);
@@ -205,8 +204,8 @@ public class TestCartesianProductEdgeManagerPartitioned {
    */
   @Test(timeout = 5000)
   public void testThreeWay() throws Exception {
-    CartesianProductEdgeManagerConfig emConfig =
-      new CartesianProductEdgeManagerConfig(true, new 
String[]{"v0","v1","v2"}, new int[]{4,3,2}, null, null, null);
+    CartesianProductEdgeManagerConfig emConfig = new 
CartesianProductEdgeManagerConfig(true,
+      new String[]{"v0","v1","v2"}, new int[]{4,3,2}, null, 0, 0, null);
     when(mockContext.getDestinationVertexNumTasks()).thenReturn(24);
     testThreeWayV0(emConfig);
     testThreeWayV1(emConfig);

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
index db781f3..1ce9c8b 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java
@@ -23,6 +23,10 @@ import 
org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetad
 import org.junit.Before;
 import org.junit.Test;
 
+import static 
org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForDest;
+import static 
org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForInputError;
+import static 
org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForRouting;
+import static 
org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForSrc;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -40,186 +44,128 @@ public class TestCartesianProductEdgeManagerUnpartitioned 
{
     edgeManager = new CartesianProductEdgeManagerUnpartitioned(mockContext);
   }
 
-  /**
-   * Vertex v0 has 2 tasks
-   * Vertex v1 has 3 tasks
-   */
-  @Test(timeout = 5000)
-  public void testTwoWay() throws Exception {
-    CartesianProductEdgeManagerConfig emConfig =
-      new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"}, 
null,
-        new int[]{2,3}, new int[]{2,3}, null);
-    testTwoWayV0(emConfig);
-    testTwoWayV1(emConfig);
-  }
-
-  private void testTwoWayV0(CartesianProductEdgeManagerConfig config) throws 
Exception {
-    when(mockContext.getSourceVertexName()).thenReturn("v0");
-    when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
-    edgeManager.initialize(config);
-
-    CompositeEventRouteMetadata compositeRoutingData =
-      edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNull(compositeRoutingData);
+  static class TestData {
+    int srcId, destId, inputId;
+    Object expected;
 
-    compositeRoutingData = 
edgeManager.routeCompositeDataMovementEventToDestination(1, 3);
-    assertNotNull(compositeRoutingData);
-    assertEquals(1, compositeRoutingData.getCount());
-    assertEquals(0, compositeRoutingData.getTarget());
-    assertEquals(0, compositeRoutingData.getSource());
+    public TestData(int srcId, int destId, int inputId, Object expected) {
+      this.srcId = srcId;
+      this.destId = destId;
+      this.inputId = inputId;
+      this.expected = expected;
+    }
 
-    EventRouteMetadata routingData = 
edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
-    assertNull(routingData);
+    public static TestData dataForRouting(int srcId, int destId, Object 
expected) {
+      return new TestData(srcId, destId, -1, expected);
+    }
 
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 
3);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
+    public static TestData dataForInputError(int destId, int inputId, Object 
expected) {
+      return new TestData(-1, destId, inputId, expected);
+    }
 
-    assertEquals(0, edgeManager.routeInputErrorEventToSource(1, 0));
+    public static TestData dataForSrc(int srcId, Object expected) {
+      return new TestData(srcId, -1, -1, expected);
+    }
 
-    assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1));
-    assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(1));
-    assertEquals(3, edgeManager.getNumDestinationConsumerTasks(1));
+    public static TestData dataForDest(int destId, Object expected) {
+      return new TestData(-1, destId, -1, expected);
+    }
   }
 
-  private void testTwoWayV1(CartesianProductEdgeManagerConfig config) throws 
Exception {
-    when(mockContext.getSourceVertexName()).thenReturn("v1");
-    when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
-    edgeManager.initialize(config);
-
-    CompositeEventRouteMetadata compositeRoutingData =
-      edgeManager.routeCompositeDataMovementEventToDestination(1, 2);
-    assertNull(compositeRoutingData);
-
-    compositeRoutingData = 
edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNotNull(compositeRoutingData);
-    assertEquals(1, compositeRoutingData.getCount());
-    assertEquals(0, compositeRoutingData.getTarget());
-    assertEquals(0, compositeRoutingData.getSource());
-
-    EventRouteMetadata routingData = 
edgeManager.routeInputSourceTaskFailedEventToDestination(1, 2);
-    assertNull(routingData);
-
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 
1);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-
-    assertEquals(1, edgeManager.routeInputErrorEventToSource(1, 0));
-
-    assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1));
-    assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(1));
-    assertEquals(2, edgeManager.getNumDestinationConsumerTasks(1));
+  private void testEdgeManager(CartesianProductEdgeManagerConfig conf, String 
vName, int numTask,
+                               String groupName, TestData cDMEInvalid, 
TestData cDMEValid,
+                               TestData srcFailInvalid, TestData srcFailValid,
+                               TestData inputError, TestData numDestInput,
+                               TestData numSrcOutputTest, TestData 
numConsumerTest)
+    throws Exception {
+    when(mockContext.getSourceVertexName()).thenReturn(vName);
+    when(mockContext.getSourceVertexNumTasks()).thenReturn(numTask);
+    when(mockContext.getVertexGroupName()).thenReturn(groupName);
+    edgeManager.initialize(conf);
+
+    CompositeEventRouteMetadata cDME =
+      
edgeManager.routeCompositeDataMovementEventToDestination(cDMEInvalid.srcId,
+        cDMEInvalid.destId);
+    assertNull(cDME);
+
+    cDME = 
edgeManager.routeCompositeDataMovementEventToDestination(cDMEValid.srcId,
+      cDMEValid.destId);
+    assertNotNull(cDME);
+    CompositeEventRouteMetadata expectedCDME = 
(CompositeEventRouteMetadata)(cDMEValid.expected);
+    assertEquals(expectedCDME.getCount(), cDME.getCount());
+    assertEquals(expectedCDME.getTarget(), cDME.getTarget());
+    assertEquals(expectedCDME.getSource(), cDME.getSource());
+
+    EventRouteMetadata dme =
+      
edgeManager.routeInputSourceTaskFailedEventToDestination(srcFailInvalid.srcId,
+        srcFailInvalid.destId);
+    assertNull(dme);
+
+    dme = 
edgeManager.routeInputSourceTaskFailedEventToDestination(srcFailValid.srcId,
+      srcFailValid.destId);
+    assertNotNull(dme);
+    EventRouteMetadata expectedDME = 
(EventRouteMetadata)(srcFailValid.expected);
+    assertEquals(expectedDME.getNumEvents(), dme.getNumEvents());
+    assertArrayEquals(expectedDME.getTargetIndices(), dme.getTargetIndices());
+
+    assertEquals(inputError.expected,
+      edgeManager.routeInputErrorEventToSource(inputError.destId, 
inputError.inputId));
+
+    assertEquals(numDestInput.expected,
+      edgeManager.getNumDestinationTaskPhysicalInputs(numDestInput.destId));
+    assertEquals(numSrcOutputTest.expected,
+      edgeManager.getNumSourceTaskPhysicalOutputs(numSrcOutputTest.srcId));
+    assertEquals(numConsumerTest.expected,
+      edgeManager.getNumDestinationConsumerTasks(numConsumerTest.srcId));
   }
 
   /**
    * Vertex v0 has 2 tasks
    * Vertex v1 has 3 tasks
-   * Vertex v2 has 4 tasks
    */
   @Test(timeout = 5000)
-  public void testThreeWay() throws Exception {
-    CartesianProductEdgeManagerConfig emConfig =
-      new CartesianProductEdgeManagerConfig(false, new 
String[]{"v0","v1","v2"}, null,
-        new int[]{2,3,4}, new int[]{2,3,4}, null);
-    testThreeWayV0(emConfig);
-    testThreeWayV1(emConfig);
-    testThreeWayV2(emConfig);
-  }
-
-  private void testThreeWayV0(CartesianProductEdgeManagerConfig config) throws 
Exception {
-    when(mockContext.getSourceVertexName()).thenReturn("v0");
-    when(mockContext.getSourceVertexNumTasks()).thenReturn(2);
-    edgeManager.initialize(config);
-
-    CompositeEventRouteMetadata compositeRoutingData = 
edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNull(compositeRoutingData);
-
-    compositeRoutingData = 
edgeManager.routeCompositeDataMovementEventToDestination(1, 12);
-    assertNotNull(compositeRoutingData);
-    assertEquals(1, compositeRoutingData.getCount());
-    assertEquals(0, compositeRoutingData.getTarget());
-    assertEquals(0, compositeRoutingData.getSource());
-
-    EventRouteMetadata routingData = 
edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
-    assertNull(routingData);
-
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 
12);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-
-    assertEquals(0, edgeManager.routeInputErrorEventToSource(1, 0));
-
-    assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1));
-    assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(1));
-    assertEquals(12, edgeManager.getNumDestinationConsumerTasks(1));
-  }
-
-  private void testThreeWayV1(CartesianProductEdgeManagerConfig config) throws 
Exception {
-    when(mockContext.getSourceVertexName()).thenReturn("v1");
-    when(mockContext.getSourceVertexNumTasks()).thenReturn(3);
-    edgeManager.initialize(config);
-
-    CompositeEventRouteMetadata compositeRoutingData = 
edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNull(compositeRoutingData);
-
-    compositeRoutingData = 
edgeManager.routeCompositeDataMovementEventToDestination(1, 16);
-    assertNotNull(compositeRoutingData);
-    assertEquals(1, compositeRoutingData.getCount());
-    assertEquals(0, compositeRoutingData.getTarget());
-    assertEquals(0, compositeRoutingData.getSource());
-
-    EventRouteMetadata routingData = 
edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
-    assertNull(routingData);
-
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 
16);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-
-    assertEquals(0, edgeManager.routeInputErrorEventToSource(1, 0));
-
-    assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1));
-    assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(1));
-    assertEquals(8, edgeManager.getNumDestinationConsumerTasks(1));
+  public void testTwoWayAllVertex() throws Exception {
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new 
String[]{"v0","v1"}, null,
+      new int[]{2,3}, 2, 0, null), "v0", 2, null,
+      dataForRouting(1, 1, null), dataForRouting(1, 3, 
CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(1, 1, null), dataForRouting(1, 3, 
EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), 
dataForSrc(1, 3));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new 
String[]{"v0","v1"}, null,
+        new int[]{2,3}, 3, 0, null), "v1", 3, null,
+      dataForRouting(1, 2, null), dataForRouting(1, 1, 
CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(1, 2, null), dataForRouting(1, 1, 
EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(1,0,1), dataForDest(1, 1), dataForSrc(1, 1), 
dataForSrc(1, 2));
   }
 
-  private void testThreeWayV2(CartesianProductEdgeManagerConfig config) throws 
Exception {
-    when(mockContext.getSourceVertexName()).thenReturn("v2");
-    when(mockContext.getSourceVertexNumTasks()).thenReturn(4);
-    edgeManager.initialize(config);
-
-    CompositeEventRouteMetadata compositeRoutingData = 
edgeManager.routeCompositeDataMovementEventToDestination(1, 0);
-    assertNull(compositeRoutingData);
-
-    compositeRoutingData = 
edgeManager.routeCompositeDataMovementEventToDestination(1, 13);
-    assertNotNull(compositeRoutingData);
-    assertEquals(1, compositeRoutingData.getCount());
-    assertEquals(0, compositeRoutingData.getTarget());
-    assertEquals(0, compositeRoutingData.getSource());
-
-    EventRouteMetadata routingData = 
edgeManager.routeInputSourceTaskFailedEventToDestination(1, 0);
-    assertNull(routingData);
-
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 
13);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-
-    assertEquals(1, edgeManager.routeInputErrorEventToSource(1, 0));
-
-    assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1));
-    assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(1));
-    assertEquals(6, edgeManager.getNumDestinationConsumerTasks(1));
+  /**
+   * Vertex v0 has 2 tasks
+   * Vertex v1 has 3 tasks
+   * Vertex v2 has 4 tasks
+   */
+  @Test(timeout = 5000)
+  public void testThreeWayAllVertex() throws Exception {
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new 
String[]{"v0","v1","v2"},
+      null, new int[]{2,3,4}, 2, 0, null), "v0", 2, null,
+      dataForRouting(1, 1, null), dataForRouting(1, 12, 
CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(1, 1, null), dataForRouting(1, 12, 
EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), 
dataForSrc(1, 12));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new 
String[]{"v0","v1","v2"},
+        null, new int[]{2,3,4}, 3, 0, null), "v1", 3, null,
+      dataForRouting(1, 1, null), dataForRouting(1, 16, 
CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(1, 1, null), dataForRouting(1, 16, 
EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), 
dataForSrc(1, 8));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new 
String[]{"v0","v1","v2"},
+        null, new int[]{2,3,4}, 4, 0, null), "v2", 4, null,
+      dataForRouting(1, 0, null), dataForRouting(1, 13, 
CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(1, 0, null), dataForRouting(1, 13, 
EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(1, 0, 1), dataForDest(1, 1), dataForSrc(1, 1), 
dataForSrc(1, 6));
   }
 
   @Test(timeout = 5000)
   public void testZeroSrcTask() {
     CartesianProductEdgeManagerConfig emConfig =
       new CartesianProductEdgeManagerConfig(false, new String[]{"v0", "v1"}, 
null,
-        new int[]{2,0}, new int[]{2,0}, null);
+        new int[]{2,0}, 0,0, null);
     testZeroSrcTaskV0(emConfig);
     testZeroSrcTaskV1(emConfig);
   }
@@ -240,67 +186,103 @@ public class 
TestCartesianProductEdgeManagerUnpartitioned {
   }
 
   /**
-   * Vertex v0 has 20 tasks 10 groups
-   * Vertex v1 has 10 tasks 1 group
+   * Vertex v0 has 10 tasks 2 groups
+   * Vertex v1 has 30 tasks 3 groups
    */
   @Test(timeout = 5000)
-  public void testTwoWayAutoGrouping() throws Exception {
-    CartesianProductEdgeManagerConfig emConfig =
-      new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"}, 
null,
-        new int[]{20, 10}, new int[]{10,1}, null);
-    testTwoWayAutoGroupingV0(emConfig);
-    testTwoWayAutoGroupingV1(emConfig);
+  public void testTwoWayAllVertexAutoGrouping() throws Exception {
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new 
String[]{"v0","v1"},
+        null, new int[]{2,3}, 2, 0, null), "v0", 10, null,
+      dataForRouting(6, 1, null), dataForRouting(1, 0, 
CompositeEventRouteMetadata.create(1, 1, 0)),
+      dataForRouting(6, 1, null), dataForRouting(1, 0, 
EventRouteMetadata.create(1, new int[]{1})),
+      dataForInputError(1, 1, 1), dataForDest(1, 5), dataForSrc(1, 1), 
dataForSrc(1, 3));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new 
String[]{"v0","v1"},
+        null, new int[]{2,3}, 3, 0, null), "v1", 30, null,
+      dataForRouting(6, 1, null), dataForRouting(11, 1, 
CompositeEventRouteMetadata.create(1, 1, 0)),
+      dataForRouting(6, 1, null), dataForRouting(11, 1, 
EventRouteMetadata.create(1, new int[]{1})),
+      dataForInputError(1, 1, 11), dataForDest(1, 10), dataForSrc(1, 1), 
dataForSrc(1, 2));
   }
 
-  private void testTwoWayAutoGroupingV0(CartesianProductEdgeManagerConfig 
config) throws Exception {
-    when(mockContext.getSourceVertexName()).thenReturn("v0");
-    when(mockContext.getSourceVertexNumTasks()).thenReturn(20);
-    edgeManager.initialize(config);
-
-    CompositeEventRouteMetadata compositeRoutingData = 
edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNull(compositeRoutingData);
-
-    compositeRoutingData = 
edgeManager.routeCompositeDataMovementEventToDestination(1, 0);
-    assertNotNull(compositeRoutingData);
-    assertEquals(1, compositeRoutingData.getCount());
-    assertEquals(1, compositeRoutingData.getTarget());
-    assertEquals(0, compositeRoutingData.getSource());
-
-    EventRouteMetadata routingData = 
edgeManager.routeInputSourceTaskFailedEventToDestination(2, 2);
-    assertNull(routingData);
-
-    routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(2, 
1);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{0}, routingData.getTargetIndices());
-
-    assertEquals(7, edgeManager.routeInputErrorEventToSource(3, 1));
-
-    assertEquals(2, edgeManager.getNumDestinationTaskPhysicalInputs(4));
-    assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(5));
-    assertEquals(1, edgeManager.getNumDestinationConsumerTasks(6));
+  /**
+   * v0 with group g0 {v1, v2}
+   * Vertex v0 has 2 tasks
+   * Vertex v1 has 1 task
+   * Vertex v2 has 2 tasks
+   */
+  @Test(timeout = 5000)
+  public void testTwoWayVertexWithVertexGroup() throws Exception {
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new 
String[]{"v0","g0"},
+        null, new int[]{2,3}, 2, 0, null), "v0", 2, null,
+      dataForRouting(1, 1, null), dataForRouting(1, 3, 
CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(1, 1, null), dataForRouting(1, 3, 
EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), 
dataForSrc(1, 3));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new 
String[]{"v0","g0"},
+        null, new int[]{2,3}, 1, 0, null), "v1", 1, "g0",
+      dataForRouting(0, 1, null), dataForRouting(0, 3, 
CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(0, 1, null), dataForRouting(0, 3, 
EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(3, 0, 0), dataForDest(0, 1), dataForSrc(0, 1), 
dataForSrc(0, 2));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new 
String[]{"v0","g0"},
+        null, new int[]{2,3}, 2, 1, null), "v2", 2, "g0",
+      dataForRouting(1, 1, null), dataForRouting(0, 1, 
CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(1, 1, null), dataForRouting(0, 1, 
EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), 
dataForSrc(1, 2));
   }
 
-  private void testTwoWayAutoGroupingV1(CartesianProductEdgeManagerConfig 
config) throws Exception {
-    when(mockContext.getSourceVertexName()).thenReturn("v1");
-    when(mockContext.getSourceVertexNumTasks()).thenReturn(10);
-    edgeManager.initialize(config);
-
-    CompositeEventRouteMetadata compositeRoutingData = 
edgeManager.routeCompositeDataMovementEventToDestination(1, 1);
-    assertNotNull(compositeRoutingData);
-    assertEquals(1, compositeRoutingData.getCount());
-    assertEquals(1, compositeRoutingData.getTarget());
-    assertEquals(0, compositeRoutingData.getSource());
-
-    EventRouteMetadata routingData = 
edgeManager.routeInputSourceTaskFailedEventToDestination(2, 3);
-    assertNotNull(routingData);
-    assertEquals(1, routingData.getNumEvents());
-    assertArrayEquals(new int[]{2}, routingData.getTargetIndices());
+  /**
+   * group g0 {v0, v1} with group g1 {v2, v3}
+   *
+   * Vertex v0 has 1 task
+   * Vertex v1 has 2 tasks
+   * Vertex v2 has 3 tasks
+   * Vertex v3 has 4 tasks
+   */
+  @Test(timeout = 5000)
+  public void testTwoWayAllVertexGroup() throws Exception {
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new 
String[]{"g0","g1"},
+        null, new int[]{3,7}, 1, 0, null), "v0", 1, "g0",
+      dataForRouting(0, 7, null), dataForRouting(0, 1, 
CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(0, 7, null), dataForRouting(0, 1, 
EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(0, 1), 
dataForSrc(0, 7));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new 
String[]{"g0","g1"},
+        null, new int[]{3,7}, 2, 1, null), "v1", 2, "g0",
+      dataForRouting(0, 1, null), dataForRouting(1, 15, 
CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(0, 1, null), dataForRouting(1, 15, 
EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(7, 0, 0), dataForDest(7, 1), dataForSrc(1, 1), 
dataForSrc(1, 7));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new 
String[]{"g0","g1"},
+        null, new int[]{3,7}, 3, 0, null), "v2", 3, "g1",
+      dataForRouting(1, 0, null), dataForRouting(1, 1, 
CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(1, 0, null), dataForRouting(1, 1, 
EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(1, 0, 1), dataForDest(1, 1), dataForSrc(1, 1), 
dataForSrc(1, 3));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new 
String[]{"g0","g1"},
+        null, new int[]{3,7}, 4, 3, null), "v3", 4, "g1",
+      dataForRouting(0, 1, null), dataForRouting(1, 4, 
CompositeEventRouteMetadata.create(1, 0, 0)),
+      dataForRouting(0, 1, null), dataForRouting(1, 4, 
EventRouteMetadata.create(1, new int[]{0})),
+      dataForInputError(4, 0, 1), dataForDest(4, 1), dataForSrc(1, 1), 
dataForSrc(1, 3));
+  }
 
-    assertEquals(5, edgeManager.routeInputErrorEventToSource(4, 5));
 
-    assertEquals(10, edgeManager.getNumDestinationTaskPhysicalInputs(6));
-    assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(7));
-    assertEquals(10, edgeManager.getNumDestinationConsumerTasks(8));
+  /**
+   * v0 with group g0 {v1, v2}
+   * Vertex v0 has 10 tasks, 2 groups
+   * Vertex v1 has 10 tasks, 1 group
+   * Vertex v2 has 20 tasks, 2 groups
+   */
+  @Test(timeout = 5000)
+  public void testTwoWayWithVertexGroupAutoGrouping() throws Exception {
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new 
String[]{"v0","g0"},
+        null, new int[]{2,3}, 2, 0, null), "v0", 10, null,
+      dataForRouting(0, 4, null), dataForRouting(2, 1, 
CompositeEventRouteMetadata.create(1, 2, 0)),
+      dataForRouting(0, 4, null), dataForRouting(2, 1, 
EventRouteMetadata.create(1, new int[]{2})),
+      dataForInputError(1, 3, 3), dataForDest(1, 5), dataForSrc(1, 1), 
dataForSrc(1, 3));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new 
String[]{"v0","g0"},
+        null, new int[]{2,3}, 1, 0, null), "v1", 10, "g0",
+      dataForRouting(1, 1, null), dataForRouting(2, 3, 
CompositeEventRouteMetadata.create(1, 2, 0)),
+      dataForRouting(1, 1, null), dataForRouting(2, 3, 
EventRouteMetadata.create(1, new int[]{2})),
+      dataForInputError(3, 1, 1), dataForDest(0, 10), dataForSrc(1, 1), 
dataForSrc(1, 2));
+    testEdgeManager(new CartesianProductEdgeManagerConfig(false, new 
String[]{"v0","g0"},
+        null, new int[]{2,3}, 2, 1, null), "v2", 20, "g0",
+      dataForRouting(11, 1, null), dataForRouting(12, 2, 
CompositeEventRouteMetadata.create(1, 2, 0)),
+      dataForRouting(11, 1, null), dataForRouting(12, 2, 
EventRouteMetadata.create(1, new int[]{2})),
+      dataForInputError(2, 2, 12), dataForDest(1, 10), dataForSrc(1, 1), 
dataForSrc(1, 2));
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
index f3a5851..5144e69 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java
@@ -29,6 +29,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -136,6 +137,24 @@ public class TestCartesianProductVertexManager {
   }
 
   @Test(timeout = 5000)
+  public void testCheckDAGConfigConsistentWithVertexGroup() throws Exception {
+    // positive case
+    edgePropertyMap.put("v2", cpEdge);
+    config = new CartesianProductConfig(new int[]{2, 3}, new String[]{"v0", 
"g0"}, null);
+    Map<String, List<String>> vertexGroups = new HashMap<>();
+    vertexGroups.put("g0", Arrays.asList("v1", "v2"));
+    when(context.getInputVertexGroups()).thenReturn(vertexGroups);
+    vertexManager.initialize();
+
+    // vertex group is in cartesian product config, but one member doesn't 
have cp edge
+    edgePropertyMap.put("v2", broadcastEdge);
+    try {
+      vertexManager.initialize();
+      assertTrue(false);
+    } catch (Exception ignored) {}
+  }
+
+  @Test(timeout = 5000)
   public void testOtherEdgeType() throws Exception {
     // forbid other custom edge
     edgePropertyMap.put("v2", customEdge);

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java
index bf369d9..5c6ffa7 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java
@@ -39,15 +39,15 @@ public class TestCartesianProductVertexManagerConfig {
     CartesianProductVertexManagerConfig vmConf =
       
CartesianProductVertexManagerConfig.fromUserPayload(config.toUserPayload(conf));
     
assertEquals(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING_DEFAULT,
-      vmConf.isEnableAutoGrouping());
+      vmConf.enableAutoGrouping);
     
assertEquals(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP_DEFAULT,
-      vmConf.getDesiredBytesPerGroup());
+      vmConf.desiredBytesPerChunk);
 
     // auto group set in proto
     
conf.setBoolean(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING,
 true);
     
conf.setLong(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP,
 1000);
     vmConf = 
CartesianProductVertexManagerConfig.fromUserPayload(config.toUserPayload(conf));
-    assertEquals(true, vmConf.isEnableAutoGrouping());
-    assertEquals(1000, vmConf.getDesiredBytesPerGroup());
+    assertEquals(true, vmConf.enableAutoGrouping);
+    assertEquals(1000, vmConf.desiredBytesPerChunk);
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
----------------------------------------------------------------------
diff --git 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
index d2ce378..f95daa7 100644
--- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
+++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java
@@ -17,9 +17,9 @@
  */
 package org.apache.tez.runtime.library.cartesianproduct;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
@@ -37,11 +37,10 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
-import org.mockito.Matchers;
 import org.mockito.MockitoAnnotations;
 
 import java.util.ArrayList;
-import java.util.Formatter;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -49,8 +48,6 @@ import java.util.Map;
 import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.BROADCAST;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyMapOf;
@@ -63,150 +60,385 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class TestCartesianProductVertexManagerUnpartitioned {
+  private static long desiredBytesPerGroup = 1000;
   @Captor
   private ArgumentCaptor<Map<String, EdgeProperty>> edgePropertiesCaptor;
   @Captor
-  private ArgumentCaptor<List<ScheduleTaskRequest>> scheduleTaskRequestCaptor;
+  private ArgumentCaptor<List<ScheduleTaskRequest>> scheduleRequestCaptor;
+  @Captor
+  private ArgumentCaptor<Integer> parallelismCaptor;
   private CartesianProductVertexManagerUnpartitioned vertexManager;
-  private VertexManagerPluginContext context;
-  private List<TaskAttemptIdentifier> allCompletions;
+  private VertexManagerPluginContext ctx;
 
   @Before
-  public void setup() throws Exception {
+  public void setup() {
     MockitoAnnotations.initMocks(this);
-    context = mock(VertexManagerPluginContext.class);
-    vertexManager = new CartesianProductVertexManagerUnpartitioned(context);
+    ctx = mock(VertexManagerPluginContext.class);
+    vertexManager = new CartesianProductVertexManagerUnpartitioned(ctx);
+  }
 
-    Map<String, EdgeProperty> edgePropertyMap = new HashMap<>();
-    edgePropertyMap.put("v0", 
EdgeProperty.create(EdgeManagerPluginDescriptor.create(
-        CartesianProductEdgeManager.class.getName()), null, null, null, null));
-    edgePropertyMap.put("v1", 
EdgeProperty.create(EdgeManagerPluginDescriptor.create(
-      CartesianProductEdgeManager.class.getName()), null, null, null, null));
+  /**
+   * v0 and v1 are two cartesian product sources
+   */
+  private void setupDAGVertexOnly(boolean doGrouping) throws Exception {
+    when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(2));
+    setSrcParallelism(ctx, doGrouping ? 10 : 1, 2, 3);
+
+    CartesianProductVertexManagerConfig config = new 
CartesianProductVertexManagerConfig(
+      false, new String[]{"v0","v1"}, null, 0, 0, doGrouping, 
desiredBytesPerGroup, null);
+    vertexManager.initialize(config);
+  }
+
+  /**
+   * v0 and v1 are two cartesian product sources; v2 is broadcast source; 
without auto grouping
+   */
+  private void setupDAGVertexOnlyWithBroadcast() throws Exception {
+    Map<String, EdgeProperty> edgePropertyMap = getEdgePropertyMap(2);
     edgePropertyMap.put("v2", EdgeProperty.create(BROADCAST, null, null, null, 
null));
-    when(context.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap);
-    when(context.getVertexNumTasks(eq("v0"))).thenReturn(2);
-    when(context.getVertexNumTasks(eq("v1"))).thenReturn(3);
-    when(context.getVertexNumTasks(eq("v2"))).thenReturn(5);
+    when(ctx.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap);
+    setSrcParallelism(ctx, 2, 3, 5);
 
     CartesianProductVertexManagerConfig config =
       new CartesianProductVertexManagerConfig(
         false, new String[]{"v0","v1"}, null, 0, 0, false, 0, null);
     vertexManager.initialize(config);
-    allCompletions = new ArrayList<>();
-    allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v0",
-      
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-        TezDAGID.getInstance("0", 0, 0), 0), 0), 0)));
-    allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v0",
-      
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-        TezDAGID.getInstance("0", 0, 0), 0), 1), 0)));
-    allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v1",
-      
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-        TezDAGID.getInstance("0", 0, 0), 1), 0), 0)));
-    allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v1",
-      
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-        TezDAGID.getInstance("0", 0, 0), 1), 1), 0)));
-    allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v1",
-      
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-        TezDAGID.getInstance("0", 0, 0), 1), 2), 0)));
-    allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v2",
+  }
+
+  /**
+   * v0 and g0 are two sources; g0 is vertex group of v1 and v2
+   */
+  private void setupDAGVertexGroup(boolean doGrouping) throws Exception {
+    when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(3));
+    setSrcParallelism(ctx, doGrouping ? 10: 1, 2, 3, 4);
+
+    Map<String, List<String>> vertexGroupMap = new HashMap<>();
+    vertexGroupMap.put("g0", Arrays.asList("v1", "v2"));
+    when(ctx.getInputVertexGroups()).thenReturn(vertexGroupMap);
+
+    CartesianProductVertexManagerConfig config = new 
CartesianProductVertexManagerConfig(
+      false, new String[]{"v0","g0"}, null, 0, 0, doGrouping, 
desiredBytesPerGroup, null);
+    vertexManager.initialize(config);
+  }
+
+  /**
+   * g0 and g1 are two sources; g0 is vertex group of v0 and v1; g1 is vertex 
group of v2 and v3
+   */
+  private void setupDAGVertexGroupOnly(boolean doGrouping) throws Exception {
+    when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(4));
+    setSrcParallelism(ctx, doGrouping ? 10 : 1, 2, 3, 4, 5);
+
+    Map<String, List<String>> vertexGroupMap = new HashMap<>();
+    vertexGroupMap.put("g0", Arrays.asList("v0", "v1"));
+    vertexGroupMap.put("g1", Arrays.asList("v2", "v3"));
+    when(ctx.getInputVertexGroups()).thenReturn(vertexGroupMap);
+
+    CartesianProductVertexManagerConfig config = new 
CartesianProductVertexManagerConfig(
+      false, new String[]{"g0","g1"}, null, 0, 0, doGrouping, 
desiredBytesPerGroup, null);
+    vertexManager.initialize(config);
+  }
+
+  private Map<String, EdgeProperty> getEdgePropertyMap(int numSrcV) {
+    Map<String, EdgeProperty> edgePropertyMap = new HashMap<>();
+    for (int i = 0; i < numSrcV; i++) {
+      edgePropertyMap.put("v"+i, 
EdgeProperty.create(EdgeManagerPluginDescriptor.create(
+        CartesianProductEdgeManager.class.getName()), null, null, null, null));
+    }
+    return edgePropertyMap;
+  }
+
+  private void setSrcParallelism(VertexManagerPluginContext ctx, int 
multiplier, int... numTasks) {
+    int i = 0;
+    for (int numTask : numTasks) {
+      when(ctx.getVertexNumTasks(eq("v"+i))).thenReturn(numTask * multiplier);
+      i++;
+    }
+  }
+
+  private TaskAttemptIdentifier getTaId(String vertexName, int taskId) {
+    return new TaskAttemptIdentifierImpl("dag", vertexName,
       
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-        TezDAGID.getInstance("0", 0, 0), 3), 0), 0)));
+        TezDAGID.getInstance("0", 0, 0), 0), taskId), 0));
+  }
+
+  private VertexManagerEvent getVMEevnt(long outputSize, String vName, int 
taskId) {
+
+    VertexManagerEventPayloadProto.Builder builder = 
VertexManagerEventPayloadProto.newBuilder();
+    builder.setOutputSize(outputSize);
+    VertexManagerEvent vmEvent =
+      VertexManagerEvent.create("cp vertex", 
builder.build().toByteString().asReadOnlyByteBuffer());
+    vmEvent.setProducerAttemptIdentifier(getTaId(vName, taskId));
+    return vmEvent;
+  }
+
+  private void verifyEdgeProperties(EdgeProperty edgeProperty, String[] 
sources,
+                                    int[] numChunksPerSrc, int numChunk, int 
chunkIdOffset)
+    throws InvalidProtocolBufferException {
+    CartesianProductEdgeManagerConfig conf = 
CartesianProductEdgeManagerConfig.fromUserPayload(
+      edgeProperty.getEdgeManagerDescriptor().getUserPayload());
+    assertArrayEquals(sources, conf.getSourceVertices().toArray());
+    assertArrayEquals(numChunksPerSrc, conf.numChunksPerSrc);
+    assertEquals(numChunk, conf.numChunk);
+    assertEquals(chunkIdOffset, conf.chunkIdOffset);
+  }
+
+  private void verifyScheduleRequest(int expectedTimes, int... expectedTid) {
+    verify(ctx, 
times(expectedTimes)).scheduleTasks(scheduleRequestCaptor.capture());
+    if (expectedTimes > 0) {
+      List<ScheduleTaskRequest> requests = scheduleRequestCaptor.getValue();
+      int i = 0;
+      for (int tid : expectedTid) {
+        assertEquals(tid, requests.get(i).getTaskIndex());
+        i++;
+      }
+    }
   }
 
   @Test(timeout = 5000)
-  public void testReconfigureVertex() throws Exception {
-    ArgumentCaptor<Integer> parallelismCaptor = 
ArgumentCaptor.forClass(Integer.class);
+  public void testDAGVertexOnly() throws Exception {
+    setupDAGVertexOnly(false);
+
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", 
VertexState.CONFIGURED));
-    verify(context, never()).reconfigureVertex(
+    verify(ctx, never()).reconfigureVertex(
       anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, 
EdgeProperty.class));
+
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", 
VertexState.CONFIGURED));
-    verify(context, times(1)).reconfigureVertex(parallelismCaptor.capture(),
+    verify(ctx, times(1)).reconfigureVertex(parallelismCaptor.capture(),
       isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
-    assertEquals(6, (int)parallelismCaptor.getValue());
+    assertEquals(6, (int) parallelismCaptor.getValue());
     Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
-    assertFalse(edgeProperties.containsKey("v2"));
-    for (EdgeProperty edgeProperty : edgeProperties.values()) {
-      UserPayload payload = 
edgeProperty.getEdgeManagerDescriptor().getUserPayload();
-      CartesianProductEdgeManagerConfig newConfig =
-        CartesianProductEdgeManagerConfig.fromUserPayload(payload);
-      assertArrayEquals(new int[]{2,3}, newConfig.getNumTasks());
-      assertArrayEquals(new int[]{2,3}, newConfig.getNumGroups());
+    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "v1"}, 
new int[]{2, 3}, 2, 0);
+    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "v1"}, 
new int[]{2, 3}, 3, 0);
+
+    vertexManager.onVertexStarted(null);
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
+    verify(ctx, never()).scheduleTasks(scheduleRequestCaptor.capture());
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 0));
+    verify(ctx, times(1)).scheduleTasks(scheduleRequestCaptor.capture());
+    verifyScheduleRequest(1, 0);
+
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 1));
+    verify(ctx, times(2)).scheduleTasks(scheduleRequestCaptor.capture());
+    verifyScheduleRequest(2, 1);
+  }
+
+  @Test(timeout = 5000)
+  public void testDAGVertexOnlyWithGrouping() throws Exception {
+    setupDAGVertexOnly(true);
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", 
VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", 
VertexState.CONFIGURED));
+
+    
vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, 
"v0", 0));
+    verify(ctx, never()).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, 
EdgeProperty.class));
+
+    vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v1", 0));
+    verify(ctx, never()).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, 
EdgeProperty.class));
+
+    vertexManager.onVertexManagerEventReceived(getVMEevnt(0, "v0", 1));
+    for (int i = 1; i < 30; i++) {
+      vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v1", i));
+    }
+    verify(ctx, times(1)).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
+    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "v1"}, 
new int[]{10, 1}, 10, 0);
+    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "v1"}, 
new int[]{10, 1}, 1, 0);
+
+    vertexManager.onVertexStarted(null);
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
+    for (int i = 0; i < 29; i++) {
+      vertexManager.onSourceTaskCompleted(getTaId("v1", i));
     }
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 29));
+    verifyScheduleRequest(1, 0);
   }
 
   @Test(timeout = 5000)
-  public void testOnSourceTaskComplete() throws Exception {
+  public void testDAGVertexGroup() throws Exception {
+    setupDAGVertexGroup(false);
+
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", 
VertexState.CONFIGURED));
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", 
VertexState.CONFIGURED));
+    verify(ctx, never()).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, 
EdgeProperty.class));
+
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", 
VertexState.CONFIGURED));
+    verify(ctx, times(1)).reconfigureVertex(parallelismCaptor.capture(),
+      isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    assertEquals(14, (int) parallelismCaptor.getValue());
+    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
+
+    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "g0"}, 
new int[]{2, 7}, 2, 0);
+    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "g0"}, 
new int[]{2, 7}, 3, 0);
+    verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"v0", "g0"}, 
new int[]{2, 7}, 4, 3);
+
     vertexManager.onVertexStarted(null);
-    verify(context, 
never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
-    vertexManager.onSourceTaskCompleted(allCompletions.get(0));
-    verify(context, 
never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
-    vertexManager.onSourceTaskCompleted(allCompletions.get(2));
-    // cannot start schedule because broadcast vertex isn't in RUNNING state
-    verify(context, 
never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0",1));
+    vertexManager.onSourceTaskCompleted(getTaId("v1",2));
+    verifyScheduleRequest(1, 9);
+    vertexManager.onSourceTaskCompleted(getTaId("v2", 0));
+    verifyScheduleRequest(2, 10);
+  }
 
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", 
VertexState.RUNNING));
-    verify(context, 
times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture());
-    List<ScheduleTaskRequest> requests = scheduleTaskRequestCaptor.getValue();
-    assertNotNull(requests);
-    assertEquals(1, requests.size());
-    assertEquals(0, requests.get(0).getTaskIndex());
-
-    // v2 completion shouldn't matter
-    vertexManager.onSourceTaskCompleted(allCompletions.get(5));
-    verify(context, 
times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture());
-
-    vertexManager.onSourceTaskCompleted(allCompletions.get(3));
-    verify(context, 
times(2)).scheduleTasks(scheduleTaskRequestCaptor.capture());
-    requests = scheduleTaskRequestCaptor.getValue();
-    assertNotNull(requests);
-    assertEquals(1, requests.size());
-    assertEquals(1, requests.get(0).getTaskIndex());
+  @Test(timeout = 5000)
+  public void testDAGVertexGroupWithGrouping() throws Exception {
+    setupDAGVertexGroup(true);
+
+    for (int i = 0; i < 3; i++) {
+      vertexManager.onVertexStateUpdated(new VertexStateUpdate("v" + i, 
VertexState.CONFIGURED));
+    }
+
+    
vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, 
"v0", 0));
+    
vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, 
"v1", 0));
+    verify(ctx, never()).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, 
EdgeProperty.class));
+
+    vertexManager.onVertexManagerEventReceived(getVMEevnt(0, "v0", 1));
+    for (int i = 0; i < 40; i++) {
+      vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v2", i));
+    }
+
+    verify(ctx, times(1)).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
+    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "g0"}, 
new int[]{10, 31}, 10, 0);
+    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "g0"}, 
new int[]{10, 31}, 30, 0);
+    verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"v0", "g0"}, 
new int[]{10, 31}, 1, 30);
+
+    vertexManager.onVertexStarted(null);
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 10));
+    vertexManager.onSourceTaskCompleted(getTaId("v2", 0));
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
+    verifyScheduleRequest(1, 10);
+    for (int i = 1; i < 40; i++) {
+      vertexManager.onSourceTaskCompleted(getTaId("v2", i));
+    }
+    verifyScheduleRequest(2, 30);
   }
 
-  private void testOnVertexStartHelper(boolean broadcastRunning) throws 
Exception {
+  @Test(timeout = 5000)
+  public void testDAGVertexGroupOnly() throws Exception {
+    setupDAGVertexGroupOnly(false);
+
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", 
VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", 
VertexState.CONFIGURED));
+    verify(ctx, never()).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, 
EdgeProperty.class));
+
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", 
VertexState.CONFIGURED));
-    if (broadcastRunning) {
-      vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", 
VertexState.RUNNING));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v3", 
VertexState.CONFIGURED));
+    verify(ctx, times(1)).reconfigureVertex(parallelismCaptor.capture(),
+      isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    assertEquals(45, (int) parallelismCaptor.getValue());
+    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
+
+    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"g0", "g1"}, 
new int[]{5, 9}, 2, 0);
+    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"g0", "g1"}, 
new int[]{5, 9}, 3, 2);
+    verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"g0", "g1"}, 
new int[]{5, 9}, 4, 0);
+    verifyEdgeProperties(edgeProperties.get("v3"), new String[]{"g0", "g1"}, 
new int[]{5, 9}, 5, 4);
+
+    vertexManager.onVertexStarted(null);
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
+    vertexManager.onSourceTaskCompleted(getTaId("v2", 3));
+    verifyScheduleRequest(1, 12);
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 2));
+    verifyScheduleRequest(2, 39);
+    vertexManager.onSourceTaskCompleted(getTaId("v3", 0));
+    verifyScheduleRequest(3, 13, 40);
+  }
+
+  @Test(timeout = 5000)
+  public void testDAGVertexGroupOnlyWithGrouping() throws Exception {
+    setupDAGVertexGroupOnly(true);
+
+    for (int i = 0; i < 4; i++) {
+      vertexManager.onVertexStateUpdated(new VertexStateUpdate("v" + i, 
VertexState.CONFIGURED));
     }
 
-    List<TaskAttemptIdentifier> completions = new ArrayList<>();
-    completions.add(allCompletions.get(0));
-    completions.add(allCompletions.get(2));
-    completions.add(allCompletions.get(5));
-    vertexManager.onVertexStarted(completions);
+    
vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, 
"v0", 0));
+    
vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, 
"v2", 0));
+    verify(ctx, never()).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, 
EdgeProperty.class));
 
-    if (!broadcastRunning) {
-      verify(context, 
never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
-      vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", 
VertexState.RUNNING));
+    vertexManager.onVertexManagerEventReceived(getVMEevnt(0, "v0", 1));
+    for (int i = 0; i < 5; i++) {
+      
vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup/5, 
"v1", i));
+    }
+    for (int i = 0; i < 50; i++) {
+      vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v3", i));
     }
 
-    verify(context, 
times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture());
-    List<ScheduleTaskRequest> requests = scheduleTaskRequestCaptor.getValue();
-    assertNotNull(requests);
-    assertEquals(1, requests.size());
-    assertEquals(0, requests.get(0).getTaskIndex());
+    verify(ctx, times(1)).reconfigureVertex(
+      anyInt(), any(VertexLocationHint.class), edgePropertiesCaptor.capture());
+    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
+    verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"g0", "g1"}, 
new int[]{16, 41}, 10, 0);
+    verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"g0", "g1"}, 
new int[]{16, 41}, 6, 10);
+    verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"g0", "g1"}, 
new int[]{16, 41}, 40, 0);
+    verifyEdgeProperties(edgeProperties.get("v3"), new String[]{"g0", "g1"}, 
new int[]{16, 41}, 1, 40);
+
+    vertexManager.onVertexStarted(null);
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
+    vertexManager.onSourceTaskCompleted(getTaId("v2", 20));
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
+    verifyScheduleRequest(1, 20);
+    vertexManager.onSourceTaskCompleted(getTaId("v3", 0));
+    verifyScheduleRequest(1);
+    for (int i = 1; i < 50; i++) {
+      vertexManager.onSourceTaskCompleted(getTaId("v3", i));
+    }
+    verifyScheduleRequest(2, 40);
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 5));
+    verifyScheduleRequest(2);
+    for (int i = 6; i < 10; i++) {
+      vertexManager.onSourceTaskCompleted(getTaId("v1", i));
+    }
+    verifyScheduleRequest(3, 471, 491);
   }
 
   @Test(timeout = 5000)
-  public void testOnVertexStartWithBroadcastRunning() throws Exception {
-    testOnVertexStartHelper(true);
+  public void testSchedulingVertexOnlyWithBroadcast() throws Exception {
+    setupDAGVertexOnlyWithBroadcast();
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", 
VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", 
VertexState.CONFIGURED));
+    vertexManager.onVertexStarted(null);
+
+    verifyScheduleRequest(0);
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
+    vertexManager.onSourceTaskCompleted(getTaId("v1", 1));
+    verifyScheduleRequest(0);
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", 
VertexState.RUNNING));
+    verifyScheduleRequest(1);
+    verify(ctx, times(1)).scheduleTasks(scheduleRequestCaptor.capture());
   }
 
   @Test(timeout = 5000)
-  public void testOnVertexStartWithoutBroadcastRunning() throws Exception {
-    testOnVertexStartHelper(false);
+  public void testOnVertexStart() throws Exception {
+    setupDAGVertexOnly(false);
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", 
VertexState.CONFIGURED));
+    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", 
VertexState.CONFIGURED));
 
+    vertexManager.onVertexStarted(Arrays.asList(getTaId("v0", 0), 
getTaId("v1", 0)));
+    verifyScheduleRequest(1, 0);
   }
 
   @Test(timeout = 5000)
   public void testZeroSrcTask() throws Exception {
-    context = mock(VertexManagerPluginContext.class);
-    vertexManager = new CartesianProductVertexManagerUnpartitioned(context);
-    when(context.getVertexNumTasks(eq("v0"))).thenReturn(2);
-    when(context.getVertexNumTasks(eq("v1"))).thenReturn(0);
+    ctx = mock(VertexManagerPluginContext.class);
+    vertexManager = new CartesianProductVertexManagerUnpartitioned(ctx);
+    when(ctx.getVertexNumTasks(eq("v0"))).thenReturn(2);
+    when(ctx.getVertexNumTasks(eq("v1"))).thenReturn(0);
 
     CartesianProductVertexManagerConfig config =
       new CartesianProductVertexManagerConfig(
@@ -216,144 +448,13 @@ public class TestCartesianProductVertexManagerUnpartitioned {
       CartesianProductEdgeManager.class.getName()), null, null, null, null));
     edgePropertyMap.put("v1", EdgeProperty.create(EdgeManagerPluginDescriptor.create(
       CartesianProductEdgeManager.class.getName()), null, null, null, null));
-    when(context.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap);
+    when(ctx.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap);
 
     vertexManager.initialize(config);
-    allCompletions = new ArrayList<>();
-    allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v0",
-      TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-        TezDAGID.getInstance("0", 0, 0), 0), 0), 0)));
-    allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v0",
-      TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-        TezDAGID.getInstance("0", 0, 0), 0), 1), 0)));
-
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
     vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
     vertexManager.onVertexStarted(new ArrayList<TaskAttemptIdentifier>());
-    vertexManager.onSourceTaskCompleted(allCompletions.get(0));
-    vertexManager.onSourceTaskCompleted(allCompletions.get(1));
-  }
-
-  @Test(timeout = 5000)
-  public void testAutoGrouping() throws Exception {
-    testAutoGroupingHelper(false);
-    testAutoGroupingHelper(true);
-  }
-
-  private void testAutoGroupingHelper(boolean enableAutoGrouping) throws Exception {
-    int numTaskV0 = 20;
-    int numTaskV1 = 10;
-    long desiredBytesPerGroup = 1000;
-    long outputBytesPerTaskV0 = 500;
-    long outputBytesPerTaskV1 = 10;
-    int expectedNumGroupV0 = 10;
-    int expectedNumGroupV1 = 1;
-    ArgumentCaptor<Integer> parallelismCaptor = ArgumentCaptor.forClass(Integer.class);
-    CartesianProductVertexManagerConfig config = new CartesianProductVertexManagerConfig(
-      false, new String[]{"v0","v1"}, null, 0, 0, enableAutoGrouping, desiredBytesPerGroup, null);
-    Map<String, EdgeProperty> edgePropertyMap = new HashMap<>();
-    EdgeProperty edgeProperty = EdgeProperty.create(EdgeManagerPluginDescriptor.create(
-      CartesianProductEdgeManager.class.getName()), null, null, null, null);
-    edgePropertyMap.put("v0", edgeProperty);
-    edgePropertyMap.put("v1", edgeProperty);
-    edgePropertyMap.put("v2", EdgeProperty.create(BROADCAST, null, null, null, null));
-    when(context.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap);
-    when(context.getVertexNumTasks(eq("v0"))).thenReturn(2);
-    when(context.getVertexNumTasks(eq("v1"))).thenReturn(3);
-
-    context = mock(VertexManagerPluginContext.class);
-    vertexManager = new CartesianProductVertexManagerUnpartitioned(context);
-    when(context.getVertexNumTasks(eq("v0"))).thenReturn(numTaskV0);
-    when(context.getVertexNumTasks(eq("v1"))).thenReturn(numTaskV1);
-    when(context.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap);
-
-    vertexManager.initialize(config);
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
-    vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.RUNNING));
-    if (!enableAutoGrouping) {
-      // auto grouping disabled, shouldn't auto group
-      verify(context, times(1)).reconfigureVertex(parallelismCaptor.capture(),
-        isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
-      assertEquals(numTaskV0 * numTaskV1, parallelismCaptor.getValue().intValue());
-      return;
-    }
-
-    // not enough input size, shouldn't auto group
-    verify(context, never()).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
-      anyMapOf(String.class, EdgeProperty.class));
-
-    // only v0 reach threshold or finish all task, shouldn't auto group
-    VertexManagerEventPayloadProto.Builder builder = VertexManagerEventPayloadProto.newBuilder();
-    builder.setOutputSize(outputBytesPerTaskV0);
-    VertexManagerEventPayloadProto proto = builder.build();
-    VertexManagerEvent vmEvent =
-      VertexManagerEvent.create("cp vertex", proto.toByteString().asReadOnlyByteBuffer());
-
-    for (int i = 0; i < desiredBytesPerGroup/outputBytesPerTaskV0; i++) {
-      vmEvent.setProducerAttemptIdentifier(
-        new TaskAttemptIdentifierImpl("dag", "v0", TezTaskAttemptID.fromString(
-          String.format("attempt_1441301219877_0109_1_00_%06d_0", i))));
-      vertexManager.onVertexManagerEventReceived(vmEvent);
-    }
-    verify(context, never()).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
-      anyMapOf(String.class, EdgeProperty.class));
-
-    // vmEvent from broadcast vertex shouldn't matter
-    vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", "v2",
-        TezTaskAttemptID.fromString("attempt_1441301219877_0109_1_00_000000_0")));
-    vertexManager.onVertexManagerEventReceived(vmEvent);
-
-    // v1 finish all tasks but still doesn't reach threshold, auto group anyway
-    proto = builder.setOutputSize(outputBytesPerTaskV1).build();
-    vmEvent = VertexManagerEvent.create("cp vertex", proto.toByteString().asReadOnlyByteBuffer());
-    for (int i = 0; i < numTaskV1; i++) {
-      verify(context, never()).reconfigureVertex(anyInt(), any(VertexLocationHint.class),
-        anyMapOf(String.class, EdgeProperty.class));
-      vmEvent.setProducerAttemptIdentifier(
-        new TaskAttemptIdentifierImpl("dag", "v1", TezTaskAttemptID.fromString(
-          String.format("attempt_1441301219877_0109_1_01_%06d_0", i))));
-      vertexManager.onVertexManagerEventReceived(vmEvent);
-    }
-    verify(context, times(1)).reconfigureVertex(parallelismCaptor.capture(),
-      isNull(VertexLocationHint.class), edgePropertiesCaptor.capture());
-    Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue();
-    for (EdgeProperty property : edgeProperties.values()) {
-      UserPayload payload = property.getEdgeManagerDescriptor().getUserPayload();
-      CartesianProductEdgeManagerConfig newConfig =
-        CartesianProductEdgeManagerConfig.fromUserPayload(payload);
-      assertArrayEquals(new int[]{numTaskV0, numTaskV1}, newConfig.getNumTasks());
-      assertArrayEquals(new int[]{expectedNumGroupV0,expectedNumGroupV1}, newConfig.getNumGroups());
-    }
-
-    assertEquals(expectedNumGroupV0 * expectedNumGroupV1, parallelismCaptor.getValue().intValue());
-    for (EdgeProperty property : edgePropertiesCaptor.getValue().values()) {
-      CartesianProductEdgeManagerConfig emConfig =
-        CartesianProductEdgeManagerConfig.fromUserPayload(
-          property.getEdgeManagerDescriptor().getUserPayload());
-      assertArrayEquals(new int[] {numTaskV0, numTaskV1}, emConfig.getNumTasks());
-      assertArrayEquals(new int[] {expectedNumGroupV0, expectedNumGroupV1}, emConfig.getNumGroups());
-    }
-
-    vertexManager.onVertexStarted(null);
-    // v0 t0 finish, shouldn't schedule
-    vertexManager.onSourceTaskCompleted(allCompletions.get(0));
-    verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
-
-    // v1 all task finish, shouldn't schedule
-    for (int i = 0; i < numTaskV1; i++) {
-      vertexManager.onSourceTaskCompleted(new TaskAttemptIdentifierImpl("dag", "v1",
-        TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
-          TezDAGID.getInstance("0", 0, 0), 1), i), 0)));
-      verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any());
-    }
-
-    // v0 t1 finish, should schedule
-    vertexManager.onSourceTaskCompleted(allCompletions.get(1));
-    verify(context, times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture());
-    List<ScheduleTaskRequest> requests = scheduleTaskRequestCaptor.getValue();
-    assertNotNull(requests);
-    assertEquals(1, requests.size());
-    assertEquals(0, requests.get(0).getTaskIndex());
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
+    vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
   }
 }
\ No newline at end of file

Reply via email to