Repository: tez Updated Branches: refs/heads/master e0ee28ae6 -> f355a050c
http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java index 1afedb9..b586de6 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerPartitioned.java @@ -50,8 +50,8 @@ public class TestCartesianProductEdgeManagerPartitioned { */ @Test(timeout = 5000) public void testTwoWay() throws Exception { - CartesianProductEdgeManagerConfig emConfig = - new CartesianProductEdgeManagerConfig(true, new String[]{"v0","v1"}, new int[]{3,4}, null, null, null); + CartesianProductEdgeManagerConfig emConfig = new CartesianProductEdgeManagerConfig(true, + new String[]{"v0","v1"}, new int[]{3,4}, null, 0, 0, null); when(mockContext.getDestinationVertexNumTasks()).thenReturn(12); testTwoWayV0(emConfig); testTwoWayV1(emConfig); @@ -144,9 +144,8 @@ public class TestCartesianProductEdgeManagerPartitioned { CartesianProductFilterDescriptor filterDescriptor = new CartesianProductFilterDescriptor(TestFilter.class.getName()) .setUserPayload(UserPayload.create(buffer)); - CartesianProductEdgeManagerConfig emConfig = - new CartesianProductEdgeManagerConfig(true, new String[]{"v0","v1"}, new int[]{3,4}, null, null, - filterDescriptor); + CartesianProductEdgeManagerConfig emConfig = new CartesianProductEdgeManagerConfig(true, + new String[]{"v0","v1"}, new int[]{3,4}, null, 0, 0, filterDescriptor); when(mockContext.getDestinationVertexNumTasks()).thenReturn(3); testTwoWayV0WithFilter(emConfig); testTwoWayV1WithFilter(emConfig); @@ -205,8 +204,8 @@ public class TestCartesianProductEdgeManagerPartitioned { */ @Test(timeout = 5000) public void testThreeWay() throws Exception { - CartesianProductEdgeManagerConfig emConfig = - new CartesianProductEdgeManagerConfig(true, new String[]{"v0","v1","v2"}, new int[]{4,3,2}, null, null, null); + CartesianProductEdgeManagerConfig emConfig = new CartesianProductEdgeManagerConfig(true, + new String[]{"v0","v1","v2"}, new int[]{4,3,2}, null, 0, 0, null); when(mockContext.getDestinationVertexNumTasks()).thenReturn(24); testThreeWayV0(emConfig); testThreeWayV1(emConfig); http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java index db781f3..1ce9c8b 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductEdgeManagerUnpartitioned.java @@ -23,6 +23,10 @@ import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetad import org.junit.Before; import org.junit.Test; +import static org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForDest; +import static org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForInputError; +import static org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForRouting; +import static org.apache.tez.runtime.library.cartesianproduct.TestCartesianProductEdgeManagerUnpartitioned.TestData.dataForSrc; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -40,186 +44,128 @@ public class TestCartesianProductEdgeManagerUnpartitioned { edgeManager = new CartesianProductEdgeManagerUnpartitioned(mockContext); } - /** - * Vertex v0 has 2 tasks - * Vertex v1 has 3 tasks - */ - @Test(timeout = 5000) - public void testTwoWay() throws Exception { - CartesianProductEdgeManagerConfig emConfig = - new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"}, null, - new int[]{2,3}, new int[]{2,3}, null); - testTwoWayV0(emConfig); - testTwoWayV1(emConfig); - } - - private void testTwoWayV0(CartesianProductEdgeManagerConfig config) throws Exception { - when(mockContext.getSourceVertexName()).thenReturn("v0"); - when(mockContext.getSourceVertexNumTasks()).thenReturn(2); - edgeManager.initialize(config); - - CompositeEventRouteMetadata compositeRoutingData = - edgeManager.routeCompositeDataMovementEventToDestination(1, 1); - assertNull(compositeRoutingData); + static class TestData { + int srcId, destId, inputId; + Object expected; - compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 3); - assertNotNull(compositeRoutingData); - assertEquals(1, compositeRoutingData.getCount()); - assertEquals(0, compositeRoutingData.getTarget()); - assertEquals(0, compositeRoutingData.getSource()); + public TestData(int srcId, int destId, int inputId, Object expected) { + this.srcId = srcId; + this.destId = destId; + this.inputId = inputId; + this.expected = expected; + } - EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); - assertNull(routingData); + public static TestData dataForRouting(int srcId, int destId, Object expected) { + return new TestData(srcId, destId, -1, expected); + } - routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 3); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getTargetIndices()); + public static TestData dataForInputError(int destId, int inputId, Object expected) { + return new TestData(-1, destId, inputId, expected); + } - assertEquals(0, edgeManager.routeInputErrorEventToSource(1, 0)); + public static TestData dataForSrc(int srcId, Object expected) { + return new TestData(srcId, -1, -1, expected); + } - assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1)); - assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(1)); - assertEquals(3, edgeManager.getNumDestinationConsumerTasks(1)); + public static TestData dataForDest(int destId, Object expected) { + return new TestData(-1, destId, -1, expected); + } } - private void testTwoWayV1(CartesianProductEdgeManagerConfig config) throws Exception { - when(mockContext.getSourceVertexName()).thenReturn("v1"); - when(mockContext.getSourceVertexNumTasks()).thenReturn(3); - edgeManager.initialize(config); - - CompositeEventRouteMetadata compositeRoutingData = - edgeManager.routeCompositeDataMovementEventToDestination(1, 2); - assertNull(compositeRoutingData); - - compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); - assertNotNull(compositeRoutingData); - assertEquals(1, compositeRoutingData.getCount()); - assertEquals(0, compositeRoutingData.getTarget()); - assertEquals(0, compositeRoutingData.getSource()); - - EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 2); - assertNull(routingData); - - routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getTargetIndices()); - - assertEquals(1, edgeManager.routeInputErrorEventToSource(1, 0)); - - assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1)); - assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(1)); - assertEquals(2, edgeManager.getNumDestinationConsumerTasks(1)); + private void testEdgeManager(CartesianProductEdgeManagerConfig conf, String vName, int numTask, + String groupName, TestData cDMEInvalid, TestData cDMEValid, + TestData srcFailInvalid, TestData srcFailValid, + TestData inputError, TestData numDestInput, + TestData numSrcOutputTest, TestData numConsumerTest) + throws Exception { + when(mockContext.getSourceVertexName()).thenReturn(vName); + when(mockContext.getSourceVertexNumTasks()).thenReturn(numTask); + when(mockContext.getVertexGroupName()).thenReturn(groupName); + edgeManager.initialize(conf); + + CompositeEventRouteMetadata cDME = + edgeManager.routeCompositeDataMovementEventToDestination(cDMEInvalid.srcId, + cDMEInvalid.destId); + assertNull(cDME); + + cDME = edgeManager.routeCompositeDataMovementEventToDestination(cDMEValid.srcId, + cDMEValid.destId); + assertNotNull(cDME); + CompositeEventRouteMetadata expectedCDME = (CompositeEventRouteMetadata)(cDMEValid.expected); + assertEquals(expectedCDME.getCount(), cDME.getCount()); + assertEquals(expectedCDME.getTarget(), cDME.getTarget()); + assertEquals(expectedCDME.getSource(), cDME.getSource()); + + EventRouteMetadata dme = + edgeManager.routeInputSourceTaskFailedEventToDestination(srcFailInvalid.srcId, + srcFailInvalid.destId); + assertNull(dme); + + dme = edgeManager.routeInputSourceTaskFailedEventToDestination(srcFailValid.srcId, + srcFailValid.destId); + assertNotNull(dme); + EventRouteMetadata expectedDME = (EventRouteMetadata)(srcFailValid.expected); + assertEquals(expectedDME.getNumEvents(), dme.getNumEvents()); + assertArrayEquals(expectedDME.getTargetIndices(), dme.getTargetIndices()); + + assertEquals(inputError.expected, + edgeManager.routeInputErrorEventToSource(inputError.destId, inputError.inputId)); + + assertEquals(numDestInput.expected, + edgeManager.getNumDestinationTaskPhysicalInputs(numDestInput.destId)); + assertEquals(numSrcOutputTest.expected, + edgeManager.getNumSourceTaskPhysicalOutputs(numSrcOutputTest.srcId)); + assertEquals(numConsumerTest.expected, + edgeManager.getNumDestinationConsumerTasks(numConsumerTest.srcId)); } /** * Vertex v0 has 2 tasks * Vertex v1 has 3 tasks - * Vertex v2 has 4 tasks */ @Test(timeout = 5000) - public void testThreeWay() throws Exception { - CartesianProductEdgeManagerConfig emConfig = - new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1","v2"}, null, - new int[]{2,3,4}, new int[]{2,3,4}, null); - testThreeWayV0(emConfig); - testThreeWayV1(emConfig); - testThreeWayV2(emConfig); - } - - private void testThreeWayV0(CartesianProductEdgeManagerConfig config) throws Exception { - when(mockContext.getSourceVertexName()).thenReturn("v0"); - when(mockContext.getSourceVertexNumTasks()).thenReturn(2); - edgeManager.initialize(config); - - CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); - assertNull(compositeRoutingData); - - compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 12); - assertNotNull(compositeRoutingData); - assertEquals(1, compositeRoutingData.getCount()); - assertEquals(0, compositeRoutingData.getTarget()); - assertEquals(0, compositeRoutingData.getSource()); - - EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); - assertNull(routingData); - - routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 12); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getTargetIndices()); - - assertEquals(0, edgeManager.routeInputErrorEventToSource(1, 0)); - - assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1)); - assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(1)); - assertEquals(12, edgeManager.getNumDestinationConsumerTasks(1)); - } - - private void testThreeWayV1(CartesianProductEdgeManagerConfig config) throws Exception { - when(mockContext.getSourceVertexName()).thenReturn("v1"); - when(mockContext.getSourceVertexNumTasks()).thenReturn(3); - edgeManager.initialize(config); - - CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); - assertNull(compositeRoutingData); - - compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 16); - assertNotNull(compositeRoutingData); - assertEquals(1, compositeRoutingData.getCount()); - assertEquals(0, compositeRoutingData.getTarget()); - assertEquals(0, compositeRoutingData.getSource()); - - EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1); - assertNull(routingData); - - routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 16); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getTargetIndices()); - - assertEquals(0, edgeManager.routeInputErrorEventToSource(1, 0)); - - assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1)); - assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(1)); - assertEquals(8, edgeManager.getNumDestinationConsumerTasks(1)); + public void testTwoWayAllVertex() throws Exception { + testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"}, null, + new int[]{2,3}, 2, 0, null), "v0", 2, null, + dataForRouting(1, 1, null), dataForRouting(1, 3, CompositeEventRouteMetadata.create(1, 0, 0)), + dataForRouting(1, 1, null), dataForRouting(1, 3, EventRouteMetadata.create(1, new int[]{0})), + dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 3)); + testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"}, null, + new int[]{2,3}, 3, 0, null), "v1", 3, null, + dataForRouting(1, 2, null), dataForRouting(1, 1, CompositeEventRouteMetadata.create(1, 0, 0)), + dataForRouting(1, 2, null), dataForRouting(1, 1, EventRouteMetadata.create(1, new int[]{0})), + dataForInputError(1,0,1), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 2)); } - private void testThreeWayV2(CartesianProductEdgeManagerConfig config) throws Exception { - when(mockContext.getSourceVertexName()).thenReturn("v2"); - when(mockContext.getSourceVertexNumTasks()).thenReturn(4); - edgeManager.initialize(config); - - CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 0); - assertNull(compositeRoutingData); - - compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 13); - assertNotNull(compositeRoutingData); - assertEquals(1, compositeRoutingData.getCount()); - assertEquals(0, compositeRoutingData.getTarget()); - assertEquals(0, compositeRoutingData.getSource()); - - EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 0); - assertNull(routingData); - - routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 13); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getTargetIndices()); - - assertEquals(1, edgeManager.routeInputErrorEventToSource(1, 0)); - - assertEquals(1, edgeManager.getNumDestinationTaskPhysicalInputs(1)); - assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(1)); - assertEquals(6, edgeManager.getNumDestinationConsumerTasks(1)); + /** + * Vertex v0 has 2 tasks + * Vertex v1 has 3 tasks + * Vertex v2 has 4 tasks + */ + @Test(timeout = 5000) + public void testThreeWayAllVertex() throws Exception { + testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1","v2"}, + null, new int[]{2,3,4}, 2, 0, null), "v0", 2, null, + dataForRouting(1, 1, null), dataForRouting(1, 12, CompositeEventRouteMetadata.create(1, 0, 0)), + dataForRouting(1, 1, null), dataForRouting(1, 12, EventRouteMetadata.create(1, new int[]{0})), + dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 12)); + testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1","v2"}, + null, new int[]{2,3,4}, 3, 0, null), "v1", 3, null, + dataForRouting(1, 1, null), dataForRouting(1, 16, CompositeEventRouteMetadata.create(1, 0, 0)), + dataForRouting(1, 1, null), dataForRouting(1, 16, EventRouteMetadata.create(1, new int[]{0})), + dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 8)); + testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1","v2"}, + null, new int[]{2,3,4}, 4, 0, null), "v2", 4, null, + dataForRouting(1, 0, null), dataForRouting(1, 13, CompositeEventRouteMetadata.create(1, 0, 0)), + dataForRouting(1, 0, null), dataForRouting(1, 13, EventRouteMetadata.create(1, new int[]{0})), + dataForInputError(1, 0, 1), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 6)); } @Test(timeout = 5000) public void testZeroSrcTask() { CartesianProductEdgeManagerConfig emConfig = new CartesianProductEdgeManagerConfig(false, new String[]{"v0", "v1"}, null, - new int[]{2,0}, new int[]{2,0}, null); + new int[]{2,0}, 0,0, null); testZeroSrcTaskV0(emConfig); testZeroSrcTaskV1(emConfig); } @@ -240,67 +186,103 @@ public class TestCartesianProductEdgeManagerUnpartitioned { } /** - * Vertex v0 has 20 tasks 10 groups - * Vertex v1 has 10 tasks 1 group + * Vertex v0 has 10 tasks 2 groups + * Vertex v1 has 30 tasks 3 group */ @Test(timeout = 5000) - public void testTwoWayAutoGrouping() throws Exception { - CartesianProductEdgeManagerConfig emConfig = - new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"}, null, - new int[]{20, 10}, new int[]{10,1}, null); - testTwoWayAutoGroupingV0(emConfig); - testTwoWayAutoGroupingV1(emConfig); + public void testTwoWayAllVertexAutoGrouping() throws Exception { + testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"}, + null, new int[]{2,3}, 2, 0, null), "v0", 10, null, + dataForRouting(6, 1, null), dataForRouting(1, 0, CompositeEventRouteMetadata.create(1, 1, 0)), + dataForRouting(6, 1, null), dataForRouting(1, 0, EventRouteMetadata.create(1, new int[]{1})), + dataForInputError(1, 1, 1), dataForDest(1, 5), dataForSrc(1, 1), dataForSrc(1, 3)); + testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","v1"}, + null, new int[]{2,3}, 3, 0, null), "v1", 30, null, + dataForRouting(6, 1, null), dataForRouting(11, 1, CompositeEventRouteMetadata.create(1, 1, 0)), + dataForRouting(6, 1, null), dataForRouting(11, 1, EventRouteMetadata.create(1, new int[]{1})), + dataForInputError(1, 1, 11), dataForDest(1, 10), dataForSrc(1, 1), dataForSrc(1, 2)); } - private void testTwoWayAutoGroupingV0(CartesianProductEdgeManagerConfig config) throws Exception { - when(mockContext.getSourceVertexName()).thenReturn("v0"); - when(mockContext.getSourceVertexNumTasks()).thenReturn(20); - edgeManager.initialize(config); - - CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); - assertNull(compositeRoutingData); - - compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 0); - assertNotNull(compositeRoutingData); - assertEquals(1, compositeRoutingData.getCount()); - assertEquals(1, compositeRoutingData.getTarget()); - assertEquals(0, compositeRoutingData.getSource()); - - EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(2, 2); - assertNull(routingData); - - routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(2, 1); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{0}, routingData.getTargetIndices()); - - assertEquals(7, edgeManager.routeInputErrorEventToSource(3, 1)); - - assertEquals(2, edgeManager.getNumDestinationTaskPhysicalInputs(4)); - assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(5)); - assertEquals(1, edgeManager.getNumDestinationConsumerTasks(6)); + /** + * v0 with group g0 {v1, v2} + * Vertex v0 has 2 tasks + * Vertex v1 has 1 tasks + * Vertex v2 has 2 tasks + */ + @Test(timeout = 5000) + public void testTwoWayVertexWithVertexGroup() throws Exception { + testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"}, + null, new int[]{2,3}, 2, 0, null), "v0", 2, null, + dataForRouting(1, 1, null), dataForRouting(1, 3, CompositeEventRouteMetadata.create(1, 0, 0)), + dataForRouting(1, 1, null), dataForRouting(1, 3, EventRouteMetadata.create(1, new int[]{0})), + dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 3)); + testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"}, + null, new int[]{2,3}, 1, 0, null), "v1", 1, "g0", + dataForRouting(0, 1, null), dataForRouting(0, 3, CompositeEventRouteMetadata.create(1, 0, 0)), + dataForRouting(0, 1, null), dataForRouting(0, 3, EventRouteMetadata.create(1, new int[]{0})), + dataForInputError(3, 0, 0), dataForDest(0, 1), dataForSrc(0, 1), dataForSrc(0, 2)); + testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"}, + null, new int[]{2,3}, 2, 1, null), "v2", 2, "g0", + dataForRouting(1, 1, null), dataForRouting(0, 1, CompositeEventRouteMetadata.create(1, 0, 0)), + dataForRouting(1, 1, null), dataForRouting(0, 1, EventRouteMetadata.create(1, new int[]{0})), + dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 2)); } - private void testTwoWayAutoGroupingV1(CartesianProductEdgeManagerConfig config) throws Exception { - when(mockContext.getSourceVertexName()).thenReturn("v1"); - when(mockContext.getSourceVertexNumTasks()).thenReturn(10); - edgeManager.initialize(config); - - CompositeEventRouteMetadata compositeRoutingData = edgeManager.routeCompositeDataMovementEventToDestination(1, 1); - assertNotNull(compositeRoutingData); - assertEquals(1, compositeRoutingData.getCount()); - assertEquals(1, compositeRoutingData.getTarget()); - assertEquals(0, compositeRoutingData.getSource()); - - EventRouteMetadata routingData = edgeManager.routeInputSourceTaskFailedEventToDestination(2, 3); - assertNotNull(routingData); - assertEquals(1, routingData.getNumEvents()); - assertArrayEquals(new int[]{2}, routingData.getTargetIndices()); + /** + * group g0 {v1, v2} with group g1 {v3, v4} + * + * Vertex v0 has 1 tasks + * Vertex v1 has 2 tasks + * Vertex v2 has 3 tasks + * Vertex v3 has 4 tasks + */ + @Test(timeout = 5000) + public void testTwoWayAllVertexGroup() throws Exception { + testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"g0","g1"}, + null, new int[]{3,7}, 1, 0, null), "v0", 1, "g0", + dataForRouting(0, 7, null), dataForRouting(0, 1, CompositeEventRouteMetadata.create(1, 0, 0)), + dataForRouting(0, 7, null), dataForRouting(0, 1, EventRouteMetadata.create(1, new int[]{0})), + dataForInputError(1, 0, 0), dataForDest(1, 1), dataForSrc(0, 1), dataForSrc(0, 7)); + testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"g0","g1"}, + null, new int[]{3,7}, 2, 1, null), "v1", 2, "g0", + dataForRouting(0, 1, null), dataForRouting(1, 15, CompositeEventRouteMetadata.create(1, 0, 0)), + dataForRouting(0, 1, null), dataForRouting(1, 15, EventRouteMetadata.create(1, new int[]{0})), + dataForInputError(7, 0, 0), dataForDest(7, 1), dataForSrc(1, 1), dataForSrc(1, 7)); + testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"g0","g1"}, + null, new int[]{3,7}, 3, 0, null), "v2", 3, "g1", + dataForRouting(1, 0, null), dataForRouting(1, 1, CompositeEventRouteMetadata.create(1, 0, 0)), + dataForRouting(1, 0, null), dataForRouting(1, 1, EventRouteMetadata.create(1, new int[]{0})), + dataForInputError(1, 0, 1), dataForDest(1, 1), dataForSrc(1, 1), dataForSrc(1, 3)); + testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"g0","g1"}, + null, new int[]{3,7}, 4, 3, null), "v3", 4, "g1", + dataForRouting(0, 1, null), dataForRouting(1, 4, CompositeEventRouteMetadata.create(1, 0, 0)), + dataForRouting(0, 1, null), dataForRouting(1, 4, EventRouteMetadata.create(1, new int[]{0})), + dataForInputError(4, 0, 1), dataForDest(4, 1), dataForSrc(1, 1), dataForSrc(1, 3)); + } - assertEquals(5, edgeManager.routeInputErrorEventToSource(4, 5)); - assertEquals(10, edgeManager.getNumDestinationTaskPhysicalInputs(6)); - assertEquals(1, edgeManager.getNumSourceTaskPhysicalOutputs(7)); - assertEquals(10, edgeManager.getNumDestinationConsumerTasks(8)); + /** + * v0 with group g0 {v1, v2} + * Vertex v0 has 10 tasks, 2 groups + * Vertex v1 has 10 tasks, 1 group + * Vertex v2 has 20 tasks, 2 groups + */ + @Test(timeout = 5000) + public void testTwoWayWithVertexGroupAutoGrouping() throws Exception { + testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"}, + null, new int[]{2,3}, 2, 0, null), "v0", 10, null, + dataForRouting(0, 4, null), dataForRouting(2, 1, CompositeEventRouteMetadata.create(1, 2, 0)), + dataForRouting(0, 4, null), dataForRouting(2, 1, EventRouteMetadata.create(1, new int[]{2})), + dataForInputError(1, 3, 3), dataForDest(1, 5), dataForSrc(1, 1), dataForSrc(1, 3)); + testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"}, + null, new int[]{2,3}, 1, 0, null), "v1", 10, "g0", + dataForRouting(1, 1, null), dataForRouting(2, 3, CompositeEventRouteMetadata.create(1, 2, 0)), + dataForRouting(1, 1, null), dataForRouting(2, 3, EventRouteMetadata.create(1, new int[]{2})), + dataForInputError(3, 1, 1), dataForDest(0, 10), dataForSrc(1, 1), dataForSrc(1, 2)); + testEdgeManager(new CartesianProductEdgeManagerConfig(false, new String[]{"v0","g0"}, + null, new int[]{2,3}, 2, 1, null), "v2", 20, "g0", + dataForRouting(11, 1, null), dataForRouting(12, 2, CompositeEventRouteMetadata.create(1, 2, 0)), + dataForRouting(11, 1, null), dataForRouting(12, 2, EventRouteMetadata.create(1, new int[]{2})), + dataForInputError(2, 2, 12), dataForDest(1, 10), dataForSrc(1, 1), dataForSrc(1, 2)); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java index f3a5851..5144e69 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManager.java @@ -29,6 +29,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -136,6 +137,24 @@ public class TestCartesianProductVertexManager { } @Test(timeout = 5000) + public void testCheckDAGConfigConsistentWithVertexGroup() throws Exception { + // positive case + edgePropertyMap.put("v2", cpEdge); + config = new CartesianProductConfig(new int[]{2, 3}, new String[]{"v0", "g0"}, null); + Map<String, List<String>> vertexGroups = new HashMap<>(); + vertexGroups.put("g0", Arrays.asList("v1", "v2")); + when(context.getInputVertexGroups()).thenReturn(vertexGroups); + vertexManager.initialize(); + + // vertex group is in cartesian product config, but one member doesn't have cp edge + edgePropertyMap.put("v2", broadcastEdge); + try { + vertexManager.initialize(); + assertTrue(false); + } catch (Exception ignored) {} + } + + @Test(timeout = 5000) public void testOtherEdgeType() throws Exception { // forbid other custom edge edgePropertyMap.put("v2", customEdge); http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java index bf369d9..5c6ffa7 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerConfig.java @@ -39,15 +39,15 @@ public class TestCartesianProductVertexManagerConfig { CartesianProductVertexManagerConfig vmConf = CartesianProductVertexManagerConfig.fromUserPayload(config.toUserPayload(conf)); assertEquals(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING_DEFAULT, - vmConf.isEnableAutoGrouping()); + vmConf.enableAutoGrouping); assertEquals(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP_DEFAULT, - vmConf.getDesiredBytesPerGroup()); + vmConf.desiredBytesPerChunk); // auto group set in proto conf.setBoolean(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_AUTO_GROUPING, true); conf.setLong(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_DESIRED_BYTES_PER_GROUP, 1000); vmConf = CartesianProductVertexManagerConfig.fromUserPayload(config.toUserPayload(conf)); - assertEquals(true, vmConf.isEnableAutoGrouping()); - assertEquals(1000, vmConf.getDesiredBytesPerGroup()); + assertEquals(true, vmConf.enableAutoGrouping); + assertEquals(1000, vmConf.desiredBytesPerChunk); } } http://git-wip-us.apache.org/repos/asf/tez/blob/f355a050/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java index d2ce378..f95daa7 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java @@ -17,9 +17,9 @@ */ package org.apache.tez.runtime.library.cartesianproduct; +import com.google.protobuf.InvalidProtocolBufferException; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import org.apache.tez.dag.api.EdgeProperty; -import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.VertexManagerPluginContext; import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest; @@ -37,11 +37,10 @@ import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Captor; -import org.mockito.Matchers; import org.mockito.MockitoAnnotations; import java.util.ArrayList; -import java.util.Formatter; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -49,8 +48,6 @@ import java.util.Map; import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.BROADCAST; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyMapOf; @@ -63,150 +60,385 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class TestCartesianProductVertexManagerUnpartitioned { + private static long desiredBytesPerGroup = 1000; @Captor private ArgumentCaptor<Map<String, EdgeProperty>> edgePropertiesCaptor; @Captor - private ArgumentCaptor<List<ScheduleTaskRequest>> scheduleTaskRequestCaptor; + private ArgumentCaptor<List<ScheduleTaskRequest>> scheduleRequestCaptor; + @Captor + private ArgumentCaptor<Integer> parallelismCaptor; private CartesianProductVertexManagerUnpartitioned vertexManager; - private VertexManagerPluginContext context; - private List<TaskAttemptIdentifier> allCompletions; + private VertexManagerPluginContext ctx; @Before - public void setup() throws Exception { + public void setup() { MockitoAnnotations.initMocks(this); - context = mock(VertexManagerPluginContext.class); - vertexManager = new CartesianProductVertexManagerUnpartitioned(context); + ctx = mock(VertexManagerPluginContext.class); + vertexManager = new CartesianProductVertexManagerUnpartitioned(ctx); + } - Map<String, EdgeProperty> edgePropertyMap = new HashMap<>(); - edgePropertyMap.put("v0", EdgeProperty.create(EdgeManagerPluginDescriptor.create( - CartesianProductEdgeManager.class.getName()), null, null, null, null)); - edgePropertyMap.put("v1", EdgeProperty.create(EdgeManagerPluginDescriptor.create( - CartesianProductEdgeManager.class.getName()), null, null, null, null)); + /** + * v0 and v1 are two cartesian product sources + */ + private void setupDAGVertexOnly(boolean doGrouping) throws Exception { + when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(2)); + setSrcParallelism(ctx, doGrouping ? 10 : 1, 2, 3); + + CartesianProductVertexManagerConfig config = new CartesianProductVertexManagerConfig( + false, new String[]{"v0","v1"}, null, 0, 0, doGrouping, desiredBytesPerGroup, null); + vertexManager.initialize(config); + } + + /** + * v0 and v1 are two cartesian product sources; v2 is broadcast source; without auto grouping + */ + private void setupDAGVertexOnlyWithBroadcast() throws Exception { + Map<String, EdgeProperty> edgePropertyMap = getEdgePropertyMap(2); edgePropertyMap.put("v2", EdgeProperty.create(BROADCAST, null, null, null, null)); - when(context.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap); - when(context.getVertexNumTasks(eq("v0"))).thenReturn(2); - when(context.getVertexNumTasks(eq("v1"))).thenReturn(3); - when(context.getVertexNumTasks(eq("v2"))).thenReturn(5); + when(ctx.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap); + setSrcParallelism(ctx, 2, 3, 5); CartesianProductVertexManagerConfig config = new CartesianProductVertexManagerConfig( false, new String[]{"v0","v1"}, null, 0, 0, false, 0, null); vertexManager.initialize(config); - allCompletions = new ArrayList<>(); - allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v0", - TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( - TezDAGID.getInstance("0", 0, 0), 0), 0), 0))); - allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v0", - TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( - TezDAGID.getInstance("0", 0, 0), 0), 1), 0))); - allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v1", - TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( - TezDAGID.getInstance("0", 0, 0), 1), 0), 0))); - allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v1", - TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( - TezDAGID.getInstance("0", 0, 0), 1), 1), 0))); - allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v1", - TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( - TezDAGID.getInstance("0", 0, 0), 1), 2), 0))); - allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v2", + } + + /** + * v0 and g0 are two sources; g0 is vertex group of v1 and v2 + */ + private void setupDAGVertexGroup(boolean doGrouping) throws Exception { + when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(3)); + setSrcParallelism(ctx, doGrouping ? 10: 1, 2, 3, 4); + + Map<String, List<String>> vertexGroupMap = new HashMap<>(); + vertexGroupMap.put("g0", Arrays.asList("v1", "v2")); + when(ctx.getInputVertexGroups()).thenReturn(vertexGroupMap); + + CartesianProductVertexManagerConfig config = new CartesianProductVertexManagerConfig( + false, new String[]{"v0","g0"}, null, 0, 0, doGrouping, desiredBytesPerGroup, null); + vertexManager.initialize(config); + } + + /** + * g0 and g1 are two sources; g0 is vertex group of v0 and v1; g1 is vertex group of v2 and v3 + */ + private void setupDAGVertexGroupOnly(boolean doGrouping) throws Exception { + when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(4)); + setSrcParallelism(ctx, doGrouping ? 10 : 1, 2, 3, 4, 5); + + Map<String, List<String>> vertexGroupMap = new HashMap<>(); + vertexGroupMap.put("g0", Arrays.asList("v0", "v1")); + vertexGroupMap.put("g1", Arrays.asList("v2", "v3")); + when(ctx.getInputVertexGroups()).thenReturn(vertexGroupMap); + + CartesianProductVertexManagerConfig config = new CartesianProductVertexManagerConfig( + false, new String[]{"g0","g1"}, null, 0, 0, doGrouping, desiredBytesPerGroup, null); + vertexManager.initialize(config); + } + + private Map<String, EdgeProperty> getEdgePropertyMap(int numSrcV) { + Map<String, EdgeProperty> edgePropertyMap = new HashMap<>(); + for (int i = 0; i < numSrcV; i++) { + edgePropertyMap.put("v"+i, EdgeProperty.create(EdgeManagerPluginDescriptor.create( + CartesianProductEdgeManager.class.getName()), null, null, null, null)); + } + return edgePropertyMap; + } + + private void setSrcParallelism(VertexManagerPluginContext ctx, int multiplier, int... numTasks) { + int i = 0; + for (int numTask : numTasks) { + when(ctx.getVertexNumTasks(eq("v"+i))).thenReturn(numTask * multiplier); + i++; + } + } + + private TaskAttemptIdentifier getTaId(String vertexName, int taskId) { + return new TaskAttemptIdentifierImpl("dag", vertexName, TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( - TezDAGID.getInstance("0", 0, 0), 3), 0), 0))); + TezDAGID.getInstance("0", 0, 0), 0), taskId), 0)); + } + + private VertexManagerEvent getVMEevnt(long outputSize, String vName, int taskId) { + + VertexManagerEventPayloadProto.Builder builder = VertexManagerEventPayloadProto.newBuilder(); + builder.setOutputSize(outputSize); + VertexManagerEvent vmEvent = + VertexManagerEvent.create("cp vertex", builder.build().toByteString().asReadOnlyByteBuffer()); + vmEvent.setProducerAttemptIdentifier(getTaId(vName, taskId)); + return vmEvent; + } + + private void verifyEdgeProperties(EdgeProperty edgeProperty, String[] sources, + int[] numChunksPerSrc, int numChunk, int chunkIdOffset) + throws InvalidProtocolBufferException { + CartesianProductEdgeManagerConfig conf = CartesianProductEdgeManagerConfig.fromUserPayload( + edgeProperty.getEdgeManagerDescriptor().getUserPayload()); + assertArrayEquals(sources, conf.getSourceVertices().toArray()); + assertArrayEquals(numChunksPerSrc, conf.numChunksPerSrc); + assertEquals(numChunk, conf.numChunk); + assertEquals(chunkIdOffset, conf.chunkIdOffset); + } + + private void verifyScheduleRequest(int expectedTimes, int... expectedTid) { + verify(ctx, times(expectedTimes)).scheduleTasks(scheduleRequestCaptor.capture()); + if (expectedTimes > 0) { + List<ScheduleTaskRequest> requests = scheduleRequestCaptor.getValue(); + int i = 0; + for (int tid : expectedTid) { + assertEquals(tid, requests.get(i).getTaskIndex()); + i++; + } + } } @Test(timeout = 5000) - public void testReconfigureVertex() throws Exception { - ArgumentCaptor<Integer> parallelismCaptor = ArgumentCaptor.forClass(Integer.class); + public void testDAGVertexOnly() throws Exception { + setupDAGVertexOnly(false); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); - verify(context, never()).reconfigureVertex( + verify(ctx, never()).reconfigureVertex( anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class)); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); - verify(context, times(1)).reconfigureVertex(parallelismCaptor.capture(), + verify(ctx, times(1)).reconfigureVertex(parallelismCaptor.capture(), isNull(VertexLocationHint.class), edgePropertiesCaptor.capture()); - assertEquals(6, (int)parallelismCaptor.getValue()); + assertEquals(6, (int) parallelismCaptor.getValue()); Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue(); - assertFalse(edgeProperties.containsKey("v2")); - for (EdgeProperty edgeProperty : edgeProperties.values()) { - UserPayload payload = edgeProperty.getEdgeManagerDescriptor().getUserPayload(); - CartesianProductEdgeManagerConfig newConfig = - CartesianProductEdgeManagerConfig.fromUserPayload(payload); - assertArrayEquals(new int[]{2,3}, newConfig.getNumTasks()); - assertArrayEquals(new int[]{2,3}, newConfig.getNumGroups()); + verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "v1"}, new int[]{2, 3}, 2, 0); + verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "v1"}, new int[]{2, 3}, 3, 0); + + vertexManager.onVertexStarted(null); + verifyScheduleRequest(0); + vertexManager.onSourceTaskCompleted(getTaId("v0", 0)); + verify(ctx, never()).scheduleTasks(scheduleRequestCaptor.capture()); + vertexManager.onSourceTaskCompleted(getTaId("v1", 0)); + verify(ctx, times(1)).scheduleTasks(scheduleRequestCaptor.capture()); + verifyScheduleRequest(1, 0); + + vertexManager.onSourceTaskCompleted(getTaId("v1", 1)); + verify(ctx, times(2)).scheduleTasks(scheduleRequestCaptor.capture()); + verifyScheduleRequest(2, 1); + } + + @Test(timeout = 5000) + public void testDAGVertexOnlyWithGrouping() throws Exception { + setupDAGVertexOnly(true); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); + + vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v0", 0)); + verify(ctx, never()).reconfigureVertex( + anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class)); + + vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v1", 0)); + verify(ctx, never()).reconfigureVertex( + anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class)); + + vertexManager.onVertexManagerEventReceived(getVMEevnt(0, "v0", 1)); + for (int i = 1; i < 30; i++) { + vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v1", i)); + } + verify(ctx, times(1)).reconfigureVertex( + anyInt(), any(VertexLocationHint.class), edgePropertiesCaptor.capture()); + Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue(); + verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "v1"}, new int[]{10, 1}, 10, 0); + verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "v1"}, new int[]{10, 1}, 1, 0); + + vertexManager.onVertexStarted(null); + verifyScheduleRequest(0); + vertexManager.onSourceTaskCompleted(getTaId("v0", 0)); + vertexManager.onSourceTaskCompleted(getTaId("v0", 1)); + for (int i = 0; i < 29; i++) { + vertexManager.onSourceTaskCompleted(getTaId("v1", i)); } + verifyScheduleRequest(0); + vertexManager.onSourceTaskCompleted(getTaId("v1", 29)); + verifyScheduleRequest(1, 0); } @Test(timeout = 5000) - public void testOnSourceTaskComplete() throws Exception { + public void testDAGVertexGroup() throws Exception { + setupDAGVertexGroup(false); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); + verify(ctx, never()).reconfigureVertex( + anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class)); + + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.CONFIGURED)); + verify(ctx, times(1)).reconfigureVertex(parallelismCaptor.capture(), + isNull(VertexLocationHint.class), edgePropertiesCaptor.capture()); + assertEquals(14, (int) parallelismCaptor.getValue()); + Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue(); + + verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "g0"}, new int[]{2, 7}, 2, 0); + verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "g0"}, new int[]{2, 7}, 3, 0); + verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"v0", "g0"}, new int[]{2, 7}, 4, 3); + vertexManager.onVertexStarted(null); - verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any()); - vertexManager.onSourceTaskCompleted(allCompletions.get(0)); - verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any()); - vertexManager.onSourceTaskCompleted(allCompletions.get(2)); - // cannot start schedule because broadcast vertex isn't in RUNNING state - verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any()); + verifyScheduleRequest(0); + vertexManager.onSourceTaskCompleted(getTaId("v0",1)); + vertexManager.onSourceTaskCompleted(getTaId("v1",2)); + verifyScheduleRequest(1, 9); + vertexManager.onSourceTaskCompleted(getTaId("v2", 0)); + verifyScheduleRequest(2, 10); + } - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.RUNNING)); - verify(context, times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture()); - List<ScheduleTaskRequest> requests = scheduleTaskRequestCaptor.getValue(); - assertNotNull(requests); - assertEquals(1, requests.size()); - assertEquals(0, requests.get(0).getTaskIndex()); - - // v2 completion shouldn't matter - vertexManager.onSourceTaskCompleted(allCompletions.get(5)); - verify(context, times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture()); - - vertexManager.onSourceTaskCompleted(allCompletions.get(3)); - verify(context, times(2)).scheduleTasks(scheduleTaskRequestCaptor.capture()); - requests = scheduleTaskRequestCaptor.getValue(); - assertNotNull(requests); - assertEquals(1, requests.size()); - assertEquals(1, requests.get(0).getTaskIndex()); + @Test(timeout = 5000) + public void testDAGVertexGroupWithGrouping() throws Exception { + setupDAGVertexGroup(true); + + for (int i = 0; i < 3; i++) { + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v" + i, VertexState.CONFIGURED)); + } + + vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v0", 0)); + vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v1", 0)); + verify(ctx, never()).reconfigureVertex( + anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class)); + + vertexManager.onVertexManagerEventReceived(getVMEevnt(0, "v0", 1)); + for (int i = 0; i < 40; i++) { + vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v2", i)); + } + + verify(ctx, times(1)).reconfigureVertex( + anyInt(), any(VertexLocationHint.class), edgePropertiesCaptor.capture()); + Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue(); + verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "g0"}, new int[]{10, 31}, 10, 0); + verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "g0"}, new int[]{10, 31}, 30, 0); + verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"v0", "g0"}, new int[]{10, 31}, 1, 30); + + vertexManager.onVertexStarted(null); + verifyScheduleRequest(0); + vertexManager.onSourceTaskCompleted(getTaId("v0", 0)); + vertexManager.onSourceTaskCompleted(getTaId("v1", 10)); + vertexManager.onSourceTaskCompleted(getTaId("v2", 0)); + verifyScheduleRequest(0); + vertexManager.onSourceTaskCompleted(getTaId("v0", 1)); + verifyScheduleRequest(1, 10); + for (int i = 1; i < 40; i++) { + vertexManager.onSourceTaskCompleted(getTaId("v2", i)); + } + verifyScheduleRequest(2, 30); } - private void testOnVertexStartHelper(boolean broadcastRunning) throws Exception { + @Test(timeout = 5000) + public void testDAGVertexGroupOnly() throws Exception { + setupDAGVertexGroupOnly(false); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.CONFIGURED)); + verify(ctx, never()).reconfigureVertex( + anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class)); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); - if (broadcastRunning) { - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.RUNNING)); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v3", VertexState.CONFIGURED)); + verify(ctx, times(1)).reconfigureVertex(parallelismCaptor.capture(), + isNull(VertexLocationHint.class), edgePropertiesCaptor.capture()); + assertEquals(45, (int) parallelismCaptor.getValue()); + Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue(); + + verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"g0", "g1"}, new int[]{5, 9}, 2, 0); + verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"g0", "g1"}, new int[]{5, 9}, 3, 2); + verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"g0", "g1"}, new int[]{5, 9}, 4, 0); + verifyEdgeProperties(edgeProperties.get("v3"), new String[]{"g0", "g1"}, new int[]{5, 9}, 5, 4); + + vertexManager.onVertexStarted(null); + verifyScheduleRequest(0); + vertexManager.onSourceTaskCompleted(getTaId("v0", 1)); + vertexManager.onSourceTaskCompleted(getTaId("v2", 3)); + verifyScheduleRequest(1, 12); + vertexManager.onSourceTaskCompleted(getTaId("v1", 2)); + verifyScheduleRequest(2, 39); + vertexManager.onSourceTaskCompleted(getTaId("v3", 0)); + verifyScheduleRequest(3, 13, 40); + } + + @Test(timeout = 5000) + public void testDAGVertexGroupOnlyWithGrouping() throws Exception { + setupDAGVertexGroupOnly(true); + + for (int i = 0; i < 4; i++) { + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v" + i, VertexState.CONFIGURED)); } - List<TaskAttemptIdentifier> completions = new ArrayList<>(); - completions.add(allCompletions.get(0)); - completions.add(allCompletions.get(2)); - completions.add(allCompletions.get(5)); - vertexManager.onVertexStarted(completions); + vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v0", 0)); + vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v2", 0)); + verify(ctx, never()).reconfigureVertex( + anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class)); - if (!broadcastRunning) { - verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any()); - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.RUNNING)); + vertexManager.onVertexManagerEventReceived(getVMEevnt(0, "v0", 1)); + for (int i = 0; i < 5; i++) { + vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup/5, "v1", i)); + } + for (int i = 0; i < 50; i++) { + vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v3", i)); } - verify(context, times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture()); - List<ScheduleTaskRequest> requests = scheduleTaskRequestCaptor.getValue(); - assertNotNull(requests); - assertEquals(1, requests.size()); - assertEquals(0, requests.get(0).getTaskIndex()); + verify(ctx, times(1)).reconfigureVertex( + anyInt(), any(VertexLocationHint.class), edgePropertiesCaptor.capture()); + Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue(); + verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"g0", "g1"}, new int[]{16, 41}, 10, 0); + verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"g0", "g1"}, new int[]{16, 41}, 6, 10); + verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"g0", "g1"}, new int[]{16, 41}, 40, 0); + verifyEdgeProperties(edgeProperties.get("v3"), new String[]{"g0", "g1"}, new int[]{16, 41}, 1, 40); + + vertexManager.onVertexStarted(null); + verifyScheduleRequest(0); + vertexManager.onSourceTaskCompleted(getTaId("v0", 1)); + vertexManager.onSourceTaskCompleted(getTaId("v2", 20)); + verifyScheduleRequest(0); + vertexManager.onSourceTaskCompleted(getTaId("v0", 0)); + verifyScheduleRequest(1, 20); + vertexManager.onSourceTaskCompleted(getTaId("v3", 0)); + verifyScheduleRequest(1); + for (int i = 1; i < 50; i++) { + vertexManager.onSourceTaskCompleted(getTaId("v3", i)); + } + verifyScheduleRequest(2, 40); + vertexManager.onSourceTaskCompleted(getTaId("v1", 5)); + verifyScheduleRequest(2); + for (int i = 6; i < 10; i++) { + vertexManager.onSourceTaskCompleted(getTaId("v1", i)); + } + verifyScheduleRequest(3, 471, 491); } @Test(timeout = 5000) - public void testOnVertexStartWithBroadcastRunning() throws Exception { - testOnVertexStartHelper(true); + public void testSchedulingVertexOnlyWithBroadcast() throws Exception { + setupDAGVertexOnlyWithBroadcast(); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); + vertexManager.onVertexStarted(null); + + verifyScheduleRequest(0); + vertexManager.onSourceTaskCompleted(getTaId("v0", 0)); + vertexManager.onSourceTaskCompleted(getTaId("v1", 1)); + verifyScheduleRequest(0); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.RUNNING)); + verifyScheduleRequest(1); + verify(ctx, times(1)).scheduleTasks(scheduleRequestCaptor.capture()); } @Test(timeout = 5000) - public void testOnVertexStartWithoutBroadcastRunning() throws Exception { - testOnVertexStartHelper(false); + public void testOnVertexStart() throws Exception { + setupDAGVertexOnly(false); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); + vertexManager.onVertexStarted(Arrays.asList(getTaId("v0", 0), getTaId("v1", 0))); + verifyScheduleRequest(1, 0); } @Test(timeout = 5000) public void testZeroSrcTask() throws Exception { - context = mock(VertexManagerPluginContext.class); - vertexManager = new CartesianProductVertexManagerUnpartitioned(context); - when(context.getVertexNumTasks(eq("v0"))).thenReturn(2); - when(context.getVertexNumTasks(eq("v1"))).thenReturn(0); + ctx = mock(VertexManagerPluginContext.class); + vertexManager = new CartesianProductVertexManagerUnpartitioned(ctx); + when(ctx.getVertexNumTasks(eq("v0"))).thenReturn(2); + when(ctx.getVertexNumTasks(eq("v1"))).thenReturn(0); CartesianProductVertexManagerConfig config = new CartesianProductVertexManagerConfig( @@ -216,144 +448,13 @@ public class TestCartesianProductVertexManagerUnpartitioned { CartesianProductEdgeManager.class.getName()), null, null, null, null)); edgePropertyMap.put("v1", EdgeProperty.create(EdgeManagerPluginDescriptor.create( CartesianProductEdgeManager.class.getName()), null, null, null, null)); - when(context.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap); + when(ctx.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap); vertexManager.initialize(config); - allCompletions = new ArrayList<>(); - allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v0", - TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( - TezDAGID.getInstance("0", 0, 0), 0), 0), 0))); - allCompletions.add(new TaskAttemptIdentifierImpl("dag", "v0", - TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( - TezDAGID.getInstance("0", 0, 0), 0), 1), 0))); - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); vertexManager.onVertexStarted(new ArrayList<TaskAttemptIdentifier>()); - vertexManager.onSourceTaskCompleted(allCompletions.get(0)); - vertexManager.onSourceTaskCompleted(allCompletions.get(1)); - } - - @Test(timeout = 5000) - public void testAutoGrouping() throws Exception { - testAutoGroupingHelper(false); - testAutoGroupingHelper(true); - } - - private void testAutoGroupingHelper(boolean enableAutoGrouping) throws Exception { - int numTaskV0 = 20; - int numTaskV1 = 10; - long desiredBytesPerGroup = 1000; - long outputBytesPerTaskV0 = 500; - long outputBytesPerTaskV1 = 10; - int expectedNumGroupV0 = 10; - int expectedNumGroupV1 = 1; - ArgumentCaptor<Integer> parallelismCaptor = ArgumentCaptor.forClass(Integer.class); - CartesianProductVertexManagerConfig config = new CartesianProductVertexManagerConfig( - false, new String[]{"v0","v1"}, null, 0, 0, enableAutoGrouping, desiredBytesPerGroup, null); - Map<String, EdgeProperty> edgePropertyMap = new HashMap<>(); - EdgeProperty edgeProperty = EdgeProperty.create(EdgeManagerPluginDescriptor.create( - CartesianProductEdgeManager.class.getName()), null, null, null, null); - edgePropertyMap.put("v0", edgeProperty); - edgePropertyMap.put("v1", edgeProperty); - edgePropertyMap.put("v2", EdgeProperty.create(BROADCAST, null, null, null, null)); - when(context.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap); - when(context.getVertexNumTasks(eq("v0"))).thenReturn(2); - when(context.getVertexNumTasks(eq("v1"))).thenReturn(3); - - context = mock(VertexManagerPluginContext.class); - vertexManager = new CartesianProductVertexManagerUnpartitioned(context); - when(context.getVertexNumTasks(eq("v0"))).thenReturn(numTaskV0); - when(context.getVertexNumTasks(eq("v1"))).thenReturn(numTaskV1); - when(context.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap); - - vertexManager.initialize(config); - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.RUNNING)); - if (!enableAutoGrouping) { - // auto grouping disabled, shouldn't auto group - verify(context, times(1)).reconfigureVertex(parallelismCaptor.capture(), - isNull(VertexLocationHint.class), edgePropertiesCaptor.capture()); - assertEquals(numTaskV0 * numTaskV1, parallelismCaptor.getValue().intValue()); - return; - } - - // not enough input size, shouldn't auto group - verify(context, never()).reconfigureVertex(anyInt(), any(VertexLocationHint.class), - anyMapOf(String.class, EdgeProperty.class)); - - // only v0 reach threshold or finish all task, shouldn't auto group - VertexManagerEventPayloadProto.Builder builder = VertexManagerEventPayloadProto.newBuilder(); - builder.setOutputSize(outputBytesPerTaskV0); - VertexManagerEventPayloadProto proto = builder.build(); - VertexManagerEvent vmEvent = - VertexManagerEvent.create("cp vertex", proto.toByteString().asReadOnlyByteBuffer()); - - for (int i = 0; i < desiredBytesPerGroup/outputBytesPerTaskV0; i++) { - vmEvent.setProducerAttemptIdentifier( - new TaskAttemptIdentifierImpl("dag", "v0", TezTaskAttemptID.fromString( - String.format("attempt_1441301219877_0109_1_00_%06d_0", i)))); - vertexManager.onVertexManagerEventReceived(vmEvent); - } - verify(context, never()).reconfigureVertex(anyInt(), any(VertexLocationHint.class), - anyMapOf(String.class, EdgeProperty.class)); - - // vmEvent from broadcast vertex shouldn't matter - vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", "v2", - TezTaskAttemptID.fromString("attempt_1441301219877_0109_1_00_000000_0"))); - vertexManager.onVertexManagerEventReceived(vmEvent); - - // v1 finish all tasks but still doesn't reach threshold, auto group anyway - proto = builder.setOutputSize(outputBytesPerTaskV1).build(); - vmEvent = VertexManagerEvent.create("cp vertex", proto.toByteString().asReadOnlyByteBuffer()); - for (int i = 0; i < numTaskV1; i++) { - verify(context, never()).reconfigureVertex(anyInt(), any(VertexLocationHint.class), - anyMapOf(String.class, EdgeProperty.class)); - vmEvent.setProducerAttemptIdentifier( - new TaskAttemptIdentifierImpl("dag", "v1", TezTaskAttemptID.fromString( - String.format("attempt_1441301219877_0109_1_01_%06d_0", i)))); - vertexManager.onVertexManagerEventReceived(vmEvent); - } - verify(context, times(1)).reconfigureVertex(parallelismCaptor.capture(), - isNull(VertexLocationHint.class), edgePropertiesCaptor.capture()); - Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue(); - for (EdgeProperty property : edgeProperties.values()) { - UserPayload payload = property.getEdgeManagerDescriptor().getUserPayload(); - CartesianProductEdgeManagerConfig newConfig = - CartesianProductEdgeManagerConfig.fromUserPayload(payload); - assertArrayEquals(new int[]{numTaskV0, numTaskV1}, newConfig.getNumTasks()); - assertArrayEquals(new int[]{expectedNumGroupV0,expectedNumGroupV1}, newConfig.getNumGroups()); - } - - assertEquals(expectedNumGroupV0 * expectedNumGroupV1, parallelismCaptor.getValue().intValue()); - for (EdgeProperty property : edgePropertiesCaptor.getValue().values()) { - CartesianProductEdgeManagerConfig emConfig = - CartesianProductEdgeManagerConfig.fromUserPayload( - property.getEdgeManagerDescriptor().getUserPayload()); - assertArrayEquals(new int[] {numTaskV0, numTaskV1}, emConfig.getNumTasks()); - assertArrayEquals(new int[] {expectedNumGroupV0, expectedNumGroupV1}, emConfig.getNumGroups()); - } - - vertexManager.onVertexStarted(null); - // v0 t0 finish, shouldn't schedule - vertexManager.onSourceTaskCompleted(allCompletions.get(0)); - verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any()); - - // v1 all task finish, shouldn't schedule - for (int i = 0; i < numTaskV1; i++) { - vertexManager.onSourceTaskCompleted(new TaskAttemptIdentifierImpl("dag", "v1", - TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( - TezDAGID.getInstance("0", 0, 0), 1), i), 0))); - verify(context, never()).scheduleTasks(Matchers.<List<ScheduleTaskRequest>>any()); - } - - // v0 t1 finish, should schedule - vertexManager.onSourceTaskCompleted(allCompletions.get(1)); - verify(context, times(1)).scheduleTasks(scheduleTaskRequestCaptor.capture()); - List<ScheduleTaskRequest> requests = scheduleTaskRequestCaptor.getValue(); - assertNotNull(requests); - assertEquals(1, requests.size()); - assertEquals(0, requests.get(0).getTaskIndex()); + vertexManager.onSourceTaskCompleted(getTaId("v0", 0)); + vertexManager.onSourceTaskCompleted(getTaId("v0", 1)); } } \ No newline at end of file
