Repository: tez Updated Branches: refs/heads/master dec7c1b52 -> a55fe80bf
http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java deleted file mode 100644 index f95daa7..0000000 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestCartesianProductVertexManagerUnpartitioned.java +++ /dev/null @@ -1,460 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.tez.runtime.library.cartesianproduct; - -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; -import org.apache.tez.dag.api.EdgeProperty; -import org.apache.tez.dag.api.VertexLocationHint; -import org.apache.tez.dag.api.VertexManagerPluginContext; -import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest; -import org.apache.tez.dag.api.event.VertexState; -import org.apache.tez.dag.api.event.VertexStateUpdate; -import org.apache.tez.dag.records.TaskAttemptIdentifierImpl; -import org.apache.tez.dag.records.TezDAGID; -import org.apache.tez.dag.records.TezTaskAttemptID; -import org.apache.tez.dag.records.TezTaskID; -import org.apache.tez.dag.records.TezVertexID; -import org.apache.tez.runtime.api.TaskAttemptIdentifier; -import org.apache.tez.runtime.api.events.VertexManagerEvent; -import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.MockitoAnnotations; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.BROADCAST; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyMapOf; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.isNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class TestCartesianProductVertexManagerUnpartitioned { - private static long desiredBytesPerGroup = 
1000; - @Captor - private ArgumentCaptor<Map<String, EdgeProperty>> edgePropertiesCaptor; - @Captor - private ArgumentCaptor<List<ScheduleTaskRequest>> scheduleRequestCaptor; - @Captor - private ArgumentCaptor<Integer> parallelismCaptor; - private CartesianProductVertexManagerUnpartitioned vertexManager; - private VertexManagerPluginContext ctx; - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - ctx = mock(VertexManagerPluginContext.class); - vertexManager = new CartesianProductVertexManagerUnpartitioned(ctx); - } - - /** - * v0 and v1 are two cartesian product sources - */ - private void setupDAGVertexOnly(boolean doGrouping) throws Exception { - when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(2)); - setSrcParallelism(ctx, doGrouping ? 10 : 1, 2, 3); - - CartesianProductVertexManagerConfig config = new CartesianProductVertexManagerConfig( - false, new String[]{"v0","v1"}, null, 0, 0, doGrouping, desiredBytesPerGroup, null); - vertexManager.initialize(config); - } - - /** - * v0 and v1 are two cartesian product sources; v2 is broadcast source; without auto grouping - */ - private void setupDAGVertexOnlyWithBroadcast() throws Exception { - Map<String, EdgeProperty> edgePropertyMap = getEdgePropertyMap(2); - edgePropertyMap.put("v2", EdgeProperty.create(BROADCAST, null, null, null, null)); - when(ctx.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap); - setSrcParallelism(ctx, 2, 3, 5); - - CartesianProductVertexManagerConfig config = - new CartesianProductVertexManagerConfig( - false, new String[]{"v0","v1"}, null, 0, 0, false, 0, null); - vertexManager.initialize(config); - } - - /** - * v0 and g0 are two sources; g0 is vertex group of v1 and v2 - */ - private void setupDAGVertexGroup(boolean doGrouping) throws Exception { - when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(3)); - setSrcParallelism(ctx, doGrouping ? 
10: 1, 2, 3, 4); - - Map<String, List<String>> vertexGroupMap = new HashMap<>(); - vertexGroupMap.put("g0", Arrays.asList("v1", "v2")); - when(ctx.getInputVertexGroups()).thenReturn(vertexGroupMap); - - CartesianProductVertexManagerConfig config = new CartesianProductVertexManagerConfig( - false, new String[]{"v0","g0"}, null, 0, 0, doGrouping, desiredBytesPerGroup, null); - vertexManager.initialize(config); - } - - /** - * g0 and g1 are two sources; g0 is vertex group of v0 and v1; g1 is vertex group of v2 and v3 - */ - private void setupDAGVertexGroupOnly(boolean doGrouping) throws Exception { - when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(4)); - setSrcParallelism(ctx, doGrouping ? 10 : 1, 2, 3, 4, 5); - - Map<String, List<String>> vertexGroupMap = new HashMap<>(); - vertexGroupMap.put("g0", Arrays.asList("v0", "v1")); - vertexGroupMap.put("g1", Arrays.asList("v2", "v3")); - when(ctx.getInputVertexGroups()).thenReturn(vertexGroupMap); - - CartesianProductVertexManagerConfig config = new CartesianProductVertexManagerConfig( - false, new String[]{"g0","g1"}, null, 0, 0, doGrouping, desiredBytesPerGroup, null); - vertexManager.initialize(config); - } - - private Map<String, EdgeProperty> getEdgePropertyMap(int numSrcV) { - Map<String, EdgeProperty> edgePropertyMap = new HashMap<>(); - for (int i = 0; i < numSrcV; i++) { - edgePropertyMap.put("v"+i, EdgeProperty.create(EdgeManagerPluginDescriptor.create( - CartesianProductEdgeManager.class.getName()), null, null, null, null)); - } - return edgePropertyMap; - } - - private void setSrcParallelism(VertexManagerPluginContext ctx, int multiplier, int... 
numTasks) { - int i = 0; - for (int numTask : numTasks) { - when(ctx.getVertexNumTasks(eq("v"+i))).thenReturn(numTask * multiplier); - i++; - } - } - - private TaskAttemptIdentifier getTaId(String vertexName, int taskId) { - return new TaskAttemptIdentifierImpl("dag", vertexName, - TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( - TezDAGID.getInstance("0", 0, 0), 0), taskId), 0)); - } - - private VertexManagerEvent getVMEevnt(long outputSize, String vName, int taskId) { - - VertexManagerEventPayloadProto.Builder builder = VertexManagerEventPayloadProto.newBuilder(); - builder.setOutputSize(outputSize); - VertexManagerEvent vmEvent = - VertexManagerEvent.create("cp vertex", builder.build().toByteString().asReadOnlyByteBuffer()); - vmEvent.setProducerAttemptIdentifier(getTaId(vName, taskId)); - return vmEvent; - } - - private void verifyEdgeProperties(EdgeProperty edgeProperty, String[] sources, - int[] numChunksPerSrc, int numChunk, int chunkIdOffset) - throws InvalidProtocolBufferException { - CartesianProductEdgeManagerConfig conf = CartesianProductEdgeManagerConfig.fromUserPayload( - edgeProperty.getEdgeManagerDescriptor().getUserPayload()); - assertArrayEquals(sources, conf.getSourceVertices().toArray()); - assertArrayEquals(numChunksPerSrc, conf.numChunksPerSrc); - assertEquals(numChunk, conf.numChunk); - assertEquals(chunkIdOffset, conf.chunkIdOffset); - } - - private void verifyScheduleRequest(int expectedTimes, int... 
expectedTid) { - verify(ctx, times(expectedTimes)).scheduleTasks(scheduleRequestCaptor.capture()); - if (expectedTimes > 0) { - List<ScheduleTaskRequest> requests = scheduleRequestCaptor.getValue(); - int i = 0; - for (int tid : expectedTid) { - assertEquals(tid, requests.get(i).getTaskIndex()); - i++; - } - } - } - - @Test(timeout = 5000) - public void testDAGVertexOnly() throws Exception { - setupDAGVertexOnly(false); - - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); - verify(ctx, never()).reconfigureVertex( - anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class)); - - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); - verify(ctx, times(1)).reconfigureVertex(parallelismCaptor.capture(), - isNull(VertexLocationHint.class), edgePropertiesCaptor.capture()); - assertEquals(6, (int) parallelismCaptor.getValue()); - Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue(); - verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "v1"}, new int[]{2, 3}, 2, 0); - verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "v1"}, new int[]{2, 3}, 3, 0); - - vertexManager.onVertexStarted(null); - verifyScheduleRequest(0); - vertexManager.onSourceTaskCompleted(getTaId("v0", 0)); - verify(ctx, never()).scheduleTasks(scheduleRequestCaptor.capture()); - vertexManager.onSourceTaskCompleted(getTaId("v1", 0)); - verify(ctx, times(1)).scheduleTasks(scheduleRequestCaptor.capture()); - verifyScheduleRequest(1, 0); - - vertexManager.onSourceTaskCompleted(getTaId("v1", 1)); - verify(ctx, times(2)).scheduleTasks(scheduleRequestCaptor.capture()); - verifyScheduleRequest(2, 1); - } - - @Test(timeout = 5000) - public void testDAGVertexOnlyWithGrouping() throws Exception { - setupDAGVertexOnly(true); - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); - vertexManager.onVertexStateUpdated(new 
VertexStateUpdate("v1", VertexState.CONFIGURED)); - - vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v0", 0)); - verify(ctx, never()).reconfigureVertex( - anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class)); - - vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v1", 0)); - verify(ctx, never()).reconfigureVertex( - anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class)); - - vertexManager.onVertexManagerEventReceived(getVMEevnt(0, "v0", 1)); - for (int i = 1; i < 30; i++) { - vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v1", i)); - } - verify(ctx, times(1)).reconfigureVertex( - anyInt(), any(VertexLocationHint.class), edgePropertiesCaptor.capture()); - Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue(); - verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "v1"}, new int[]{10, 1}, 10, 0); - verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "v1"}, new int[]{10, 1}, 1, 0); - - vertexManager.onVertexStarted(null); - verifyScheduleRequest(0); - vertexManager.onSourceTaskCompleted(getTaId("v0", 0)); - vertexManager.onSourceTaskCompleted(getTaId("v0", 1)); - for (int i = 0; i < 29; i++) { - vertexManager.onSourceTaskCompleted(getTaId("v1", i)); - } - verifyScheduleRequest(0); - vertexManager.onSourceTaskCompleted(getTaId("v1", 29)); - verifyScheduleRequest(1, 0); - } - - @Test(timeout = 5000) - public void testDAGVertexGroup() throws Exception { - setupDAGVertexGroup(false); - - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); - verify(ctx, never()).reconfigureVertex( - anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class)); - - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.CONFIGURED)); - verify(ctx, 
times(1)).reconfigureVertex(parallelismCaptor.capture(), - isNull(VertexLocationHint.class), edgePropertiesCaptor.capture()); - assertEquals(14, (int) parallelismCaptor.getValue()); - Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue(); - - verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "g0"}, new int[]{2, 7}, 2, 0); - verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "g0"}, new int[]{2, 7}, 3, 0); - verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"v0", "g0"}, new int[]{2, 7}, 4, 3); - - vertexManager.onVertexStarted(null); - verifyScheduleRequest(0); - vertexManager.onSourceTaskCompleted(getTaId("v0",1)); - vertexManager.onSourceTaskCompleted(getTaId("v1",2)); - verifyScheduleRequest(1, 9); - vertexManager.onSourceTaskCompleted(getTaId("v2", 0)); - verifyScheduleRequest(2, 10); - } - - @Test(timeout = 5000) - public void testDAGVertexGroupWithGrouping() throws Exception { - setupDAGVertexGroup(true); - - for (int i = 0; i < 3; i++) { - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v" + i, VertexState.CONFIGURED)); - } - - vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v0", 0)); - vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v1", 0)); - verify(ctx, never()).reconfigureVertex( - anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class)); - - vertexManager.onVertexManagerEventReceived(getVMEevnt(0, "v0", 1)); - for (int i = 0; i < 40; i++) { - vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v2", i)); - } - - verify(ctx, times(1)).reconfigureVertex( - anyInt(), any(VertexLocationHint.class), edgePropertiesCaptor.capture()); - Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue(); - verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "g0"}, new int[]{10, 31}, 10, 0); - verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "g0"}, new int[]{10, 
31}, 30, 0); - verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"v0", "g0"}, new int[]{10, 31}, 1, 30); - - vertexManager.onVertexStarted(null); - verifyScheduleRequest(0); - vertexManager.onSourceTaskCompleted(getTaId("v0", 0)); - vertexManager.onSourceTaskCompleted(getTaId("v1", 10)); - vertexManager.onSourceTaskCompleted(getTaId("v2", 0)); - verifyScheduleRequest(0); - vertexManager.onSourceTaskCompleted(getTaId("v0", 1)); - verifyScheduleRequest(1, 10); - for (int i = 1; i < 40; i++) { - vertexManager.onSourceTaskCompleted(getTaId("v2", i)); - } - verifyScheduleRequest(2, 30); - } - - @Test(timeout = 5000) - public void testDAGVertexGroupOnly() throws Exception { - setupDAGVertexGroupOnly(false); - - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.CONFIGURED)); - verify(ctx, never()).reconfigureVertex( - anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class)); - - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v3", VertexState.CONFIGURED)); - verify(ctx, times(1)).reconfigureVertex(parallelismCaptor.capture(), - isNull(VertexLocationHint.class), edgePropertiesCaptor.capture()); - assertEquals(45, (int) parallelismCaptor.getValue()); - Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue(); - - verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"g0", "g1"}, new int[]{5, 9}, 2, 0); - verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"g0", "g1"}, new int[]{5, 9}, 3, 2); - verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"g0", "g1"}, new int[]{5, 9}, 4, 0); - verifyEdgeProperties(edgeProperties.get("v3"), new String[]{"g0", "g1"}, new int[]{5, 9}, 5, 4); - - vertexManager.onVertexStarted(null); - verifyScheduleRequest(0); - 
vertexManager.onSourceTaskCompleted(getTaId("v0", 1)); - vertexManager.onSourceTaskCompleted(getTaId("v2", 3)); - verifyScheduleRequest(1, 12); - vertexManager.onSourceTaskCompleted(getTaId("v1", 2)); - verifyScheduleRequest(2, 39); - vertexManager.onSourceTaskCompleted(getTaId("v3", 0)); - verifyScheduleRequest(3, 13, 40); - } - - @Test(timeout = 5000) - public void testDAGVertexGroupOnlyWithGrouping() throws Exception { - setupDAGVertexGroupOnly(true); - - for (int i = 0; i < 4; i++) { - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v" + i, VertexState.CONFIGURED)); - } - - vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v0", 0)); - vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup, "v2", 0)); - verify(ctx, never()).reconfigureVertex( - anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class)); - - vertexManager.onVertexManagerEventReceived(getVMEevnt(0, "v0", 1)); - for (int i = 0; i < 5; i++) { - vertexManager.onVertexManagerEventReceived(getVMEevnt(desiredBytesPerGroup/5, "v1", i)); - } - for (int i = 0; i < 50; i++) { - vertexManager.onVertexManagerEventReceived(getVMEevnt(1, "v3", i)); - } - - verify(ctx, times(1)).reconfigureVertex( - anyInt(), any(VertexLocationHint.class), edgePropertiesCaptor.capture()); - Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue(); - verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"g0", "g1"}, new int[]{16, 41}, 10, 0); - verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"g0", "g1"}, new int[]{16, 41}, 6, 10); - verifyEdgeProperties(edgeProperties.get("v2"), new String[]{"g0", "g1"}, new int[]{16, 41}, 40, 0); - verifyEdgeProperties(edgeProperties.get("v3"), new String[]{"g0", "g1"}, new int[]{16, 41}, 1, 40); - - vertexManager.onVertexStarted(null); - verifyScheduleRequest(0); - vertexManager.onSourceTaskCompleted(getTaId("v0", 1)); - vertexManager.onSourceTaskCompleted(getTaId("v2", 20)); - 
verifyScheduleRequest(0); - vertexManager.onSourceTaskCompleted(getTaId("v0", 0)); - verifyScheduleRequest(1, 20); - vertexManager.onSourceTaskCompleted(getTaId("v3", 0)); - verifyScheduleRequest(1); - for (int i = 1; i < 50; i++) { - vertexManager.onSourceTaskCompleted(getTaId("v3", i)); - } - verifyScheduleRequest(2, 40); - vertexManager.onSourceTaskCompleted(getTaId("v1", 5)); - verifyScheduleRequest(2); - for (int i = 6; i < 10; i++) { - vertexManager.onSourceTaskCompleted(getTaId("v1", i)); - } - verifyScheduleRequest(3, 471, 491); - } - - @Test(timeout = 5000) - public void testSchedulingVertexOnlyWithBroadcast() throws Exception { - setupDAGVertexOnlyWithBroadcast(); - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); - vertexManager.onVertexStarted(null); - - verifyScheduleRequest(0); - vertexManager.onSourceTaskCompleted(getTaId("v0", 0)); - vertexManager.onSourceTaskCompleted(getTaId("v1", 1)); - verifyScheduleRequest(0); - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.RUNNING)); - verifyScheduleRequest(1); - verify(ctx, times(1)).scheduleTasks(scheduleRequestCaptor.capture()); - } - - @Test(timeout = 5000) - public void testOnVertexStart() throws Exception { - setupDAGVertexOnly(false); - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); - - vertexManager.onVertexStarted(Arrays.asList(getTaId("v0", 0), getTaId("v1", 0))); - verifyScheduleRequest(1, 0); - } - - @Test(timeout = 5000) - public void testZeroSrcTask() throws Exception { - ctx = mock(VertexManagerPluginContext.class); - vertexManager = new CartesianProductVertexManagerUnpartitioned(ctx); - when(ctx.getVertexNumTasks(eq("v0"))).thenReturn(2); - when(ctx.getVertexNumTasks(eq("v1"))).thenReturn(0); - - 
CartesianProductVertexManagerConfig config = - new CartesianProductVertexManagerConfig( - false, new String[]{"v0","v1"}, null, 0, 0, false, 0, null); - Map<String, EdgeProperty> edgePropertyMap = new HashMap<>(); - edgePropertyMap.put("v0", EdgeProperty.create(EdgeManagerPluginDescriptor.create( - CartesianProductEdgeManager.class.getName()), null, null, null, null)); - edgePropertyMap.put("v1", EdgeProperty.create(EdgeManagerPluginDescriptor.create( - CartesianProductEdgeManager.class.getName()), null, null, null, null)); - when(ctx.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap); - - vertexManager.initialize(config); - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); - vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); - vertexManager.onVertexStarted(new ArrayList<TaskAttemptIdentifier>()); - vertexManager.onSourceTaskCompleted(getTaId("v0", 0)); - vertexManager.onSourceTaskCompleted(getTaId("v0", 1)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductEdgeManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductEdgeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductEdgeManager.java new file mode 100644 index 0000000..ac7262e --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductEdgeManager.java @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.runtime.library.cartesianproduct; + +import org.apache.tez.dag.api.EdgeManagerPluginContext; +import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.EventRouteMetadata; +import org.apache.tez.dag.api.EdgeManagerPluginOnDemand.CompositeEventRouteMetadata; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestFairCartesianProductEdgeManager { + private EdgeManagerPluginContext mockContext; + private FairCartesianProductEdgeManager edgeManager; + + @Before + public void setup() { + mockContext = mock(EdgeManagerPluginContext.class); + edgeManager = new FairCartesianProductEdgeManager(mockContext); + } + + static class TestData { + int srcId, destId, inputId; + Object expected; + + public TestData(int srcId, int destId, int inputId, Object expected) { + this.srcId = srcId; + this.destId = destId; + this.inputId = inputId; + this.expected = expected; + } + } + + private TestData dataForRouting(int srcId, int destId, Object expected) { + return new TestData(srcId, destId, -1, expected); + } 
+ + private TestData dataForInputError(int destId, int inputId, Object expected) { + return new TestData(-1, destId, inputId, expected); + } + + private TestData dataForSrc(int srcId, Object expected) { + return new TestData(srcId, -1, -1, expected); + } + + private TestData dataForDest(int destId, Object expected) { + return new TestData(-1, destId, -1, expected); + } + + private void testEdgeManager(CartesianProductConfigProto conf, String vName, int numTask, + String groupName, TestData cDMEInvalid, TestData cDMEValid, + TestData srcFailInvalid, TestData srcFailValid, + TestData inputError, TestData numDestInput, + TestData numSrcOutputTest, TestData numConsumerTest) + throws Exception { + when(mockContext.getSourceVertexName()).thenReturn(vName); + when(mockContext.getSourceVertexNumTasks()).thenReturn(numTask); + when(mockContext.getVertexGroupName()).thenReturn(groupName); + edgeManager.initialize(conf); + + CompositeEventRouteMetadata cDME; + + if (cDMEInvalid != null) { + cDME = edgeManager.routeCompositeDataMovementEventToDestination(cDMEInvalid.srcId, + cDMEInvalid.destId); + assertNull(cDME); + } + + cDME = edgeManager.routeCompositeDataMovementEventToDestination(cDMEValid.srcId, + cDMEValid.destId); + assertNotNull(cDME); + CompositeEventRouteMetadata expectedCDME = (CompositeEventRouteMetadata)(cDMEValid.expected); + assertEquals(expectedCDME.getCount(), cDME.getCount()); + assertEquals(expectedCDME.getTarget(), cDME.getTarget()); + assertEquals(expectedCDME.getSource(), cDME.getSource()); + + EventRouteMetadata dme; + if (srcFailInvalid != null) { + dme = edgeManager.routeInputSourceTaskFailedEventToDestination(srcFailInvalid.srcId, + srcFailInvalid.destId); + assertNull(dme); + } + + dme = edgeManager.routeInputSourceTaskFailedEventToDestination(srcFailValid.srcId, + srcFailValid.destId); + assertNotNull(dme); + EventRouteMetadata expectedDME = (EventRouteMetadata)(srcFailValid.expected); + assertEquals(expectedDME.getNumEvents(), 
dme.getNumEvents()); + assertArrayEquals(expectedDME.getTargetIndices(), dme.getTargetIndices()); + + assertEquals(inputError.expected, + edgeManager.routeInputErrorEventToSource(inputError.destId, inputError.inputId)); + + assertEquals(numDestInput.expected, + edgeManager.getNumDestinationTaskPhysicalInputs(numDestInput.destId)); + assertEquals(numSrcOutputTest.expected, + edgeManager.getNumSourceTaskPhysicalOutputs(numSrcOutputTest.srcId)); + assertEquals(numConsumerTest.expected, + edgeManager.getNumDestinationConsumerTasks(numConsumerTest.srcId)); + } + + /** + * Vertex v0 has 2 tasks, 2 chunks + * Vertex v1 has 30 tasks, 3 chunks + */ + @Test(timeout = 5000) + public void testTwoWayAllVertex() throws Exception { + CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); + builder.setIsPartitioned(false).addSources("v0").addSources("v1") + .addNumChunks(2).addNumChunks(3).setMaxParallelism(10).setNumPartitionsForFairCase(10); + CartesianProductConfigProto config = builder.build(); + testEdgeManager(config, "v0", 2, null, dataForRouting(1, 1, null), + dataForRouting(1, 3, CompositeEventRouteMetadata.create(10, 0, 0)), + dataForRouting(1, 1, null), + dataForRouting(1, 3, EventRouteMetadata.create(10, new int[]{0,1,2,3,4,5,6,7,8,9})), + dataForInputError(1, 0, 0), dataForDest(1, 10), dataForSrc(1, 10), dataForSrc(1, 3)); + testEdgeManager(config, "v1", 30, null, dataForRouting(1, 2, null), + dataForRouting(1, 0, CompositeEventRouteMetadata.create(10, 10, 0)), + dataForRouting(1, 2, null), + dataForRouting(1, 0, EventRouteMetadata.create(10, new int[]{10,11,12,13,14,15,16,17,18,19})), + dataForInputError(1,0,10), dataForDest(1, 100), dataForSrc(1, 10), dataForSrc(1, 2)); + } + + /** + * Vertex v0 has 2 tasks, 2 chunks + * Vertex v1 has 30 tasks, 3 chunks + * Vertex v2 has 1 tasks, 4 chunks + */ + @Test(timeout = 5000) + public void testThreeWayAllVertex() throws Exception { + CartesianProductConfigProto.Builder builder = 
CartesianProductConfigProto.newBuilder(); + builder.setIsPartitioned(false).addSources("v0").addSources("v1").addSources("v2") + .addNumChunks(2).addNumChunks(3).addNumChunks(4) + .setMaxParallelism(12).setNumPartitionsForFairCase(12); + CartesianProductConfigProto config = builder.build(); + testEdgeManager(config, "v0", 2, null, dataForRouting(1, 1, null), + dataForRouting(1, 12, CompositeEventRouteMetadata.create(12, 0, 0)), + dataForRouting(1, 1, null), + dataForRouting(1, 12, EventRouteMetadata.create(12, new int[]{0,1,2,3,4,5,6,7,8,9,10,11})), + dataForInputError(1, 0, 0), dataForDest(1, 12), dataForSrc(1, 12), dataForSrc(1, 12)); + testEdgeManager(config, "v1", 30, null, dataForRouting(1, 4, null), + dataForRouting(1, 13, CompositeEventRouteMetadata.create(12, 12, 0)), + dataForRouting(1, 4, null), + dataForRouting(1, 13, + EventRouteMetadata.create(12, new int[]{12,13,14,15,16,17,18,19,20,21,22,23})), + dataForInputError(1, 0, 0), dataForDest(1, 120), dataForSrc(1, 12), dataForSrc(1, 8)); + testEdgeManager(config, "v2", 1, null, + null, dataForRouting(0, 13, CompositeEventRouteMetadata.create(3, 0, 3)), + null, dataForRouting(0, 13, EventRouteMetadata.create(3, new int[]{0,1,2})), + dataForInputError(1, 0, 0), dataForDest(1, 3), dataForSrc(0, 12), dataForSrc(0, 24)); + } + + /** + * v0 with group g0 {v1, v2} + * Vertex v0 has 2 chunks + * Vertex v1 has 10 tasks + * Vertex v2 has 20 tasks + * Group g0 has 3 chunks + */ + @Test(timeout = 5000) + public void testTwoWayVertexWithVertexGroup() throws Exception { + CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); + builder.setIsPartitioned(false).addSources("v0").addSources("g0") + .addNumChunks(2).addNumChunks(3).setPositionInGroup(10).setNumPartitionsForFairCase(10) + .addNumTaskPerVertexInGroup(10).addNumTaskPerVertexInGroup(20).setPositionInGroup(0); + testEdgeManager(builder.build(), "v1", 10, "g0", dataForRouting(0, 4, null), + dataForRouting(0, 3, 
CompositeEventRouteMetadata.create(10, 0, 0)), + dataForRouting(0, 4, null), + dataForRouting(0, 3, EventRouteMetadata.create(10, new int[]{0,1,2,3,4,5,6,7,8,9})), + dataForInputError(3, 0, 0), dataForDest(2, 34), dataForSrc(0, 10), dataForSrc(0, 2)); + builder.setPositionInGroup(1); + testEdgeManager(builder.build(), "v2", 20, "g0", dataForRouting(1, 1, null), + dataForRouting(6, 1, CompositeEventRouteMetadata.create(4, 33, 6)), + dataForRouting(1, 1, null), + dataForRouting(6, 1, EventRouteMetadata.create(4, new int[]{33,34,35,36})), + dataForInputError(1, 33, 6), dataForDest(0, 66), dataForSrc(1, 10), dataForSrc(6, 4)); + } + + /** + * group g0 {v0, v1} with group g1 {v3, v4} + * + * Vertex v0 has 2 tasks + * Vertex v1 has 5 tasks + * Group g0 has 2 chunks + * Group g1 has 3 chunks + */ + @Test(timeout = 5000) + public void testTwoWayAllVertexGroup() throws Exception { + CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); + builder.setIsPartitioned(false).addSources("g0").addSources("g1") + .addNumChunks(2).addNumChunks(3).setMaxParallelism(10).setNumPartitionsForFairCase(10) + .addNumTaskPerVertexInGroup(2).addNumTaskPerVertexInGroup(5).setPositionInGroup(0); + testEdgeManager(builder.build(), "v0", 2, "g0", dataForRouting(1, 1, null), + dataForRouting(0, 1, CompositeEventRouteMetadata.create(10, 0, 0)), + dataForRouting(1, 1, null), + dataForRouting(0, 1, EventRouteMetadata.create(10, new int[]{0,1,2,3,4,5,6,7,8,9})), + dataForInputError(1, 0, 0), dataForDest(1, 10), dataForSrc(1, 10), dataForSrc(1, 3)); + builder.setPositionInGroup(1); + testEdgeManager(builder.build(), "v1", 5, "g0", dataForRouting(3, 1, null), + dataForRouting(1, 1, CompositeEventRouteMetadata.create(10, 20, 0)), + dataForRouting(3, 1, null), + dataForRouting(1, 1, EventRouteMetadata.create(10, new int[]{20,21,22,23,24,25,26,27,28,29})), + dataForInputError(1, 15, 0), dataForDest(1, 25), dataForSrc(1, 10), dataForSrc(1, 3)); + } + + @Test(timeout = 5000) 
+ public void testNumPartition() throws Exception { + when(mockContext.getSourceVertexName()).thenReturn("source"); + when(mockContext.getSourceVertexNumTasks()).thenReturn(10); + when(mockContext.getVertexGroupName()).thenReturn(null); + + CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); + builder.setIsPartitioned(false).addSources("v0").addSources("v1").setMaxParallelism(100); + + edgeManager.initialize(builder.build()); + assertEquals(10, edgeManager.getNumSourceTaskPhysicalOutputs(0)); + + builder.setNumPartitionsForFairCase(20); + edgeManager = new FairCartesianProductEdgeManager(mockContext); + edgeManager.initialize(builder.build()); + assertEquals(20, edgeManager.getNumSourceTaskPhysicalOutputs(0)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java new file mode 100644 index 0000000..01d7f0b --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.java @@ -0,0 +1,500 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.runtime.library.cartesianproduct; + +import com.google.common.primitives.Ints; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.VertexLocationHint; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest; +import org.apache.tez.dag.api.event.VertexState; +import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.records.TaskAttemptIdentifierImpl; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload.CartesianProductConfigProto; +import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.MockitoAnnotations; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.tez.dag.api.EdgeProperty.DataMovementType.BROADCAST; +import 
static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyMapOf; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class TestFairCartesianProductVertexManager { + @Captor + private ArgumentCaptor<Map<String, EdgeProperty>> edgePropertiesCaptor; + @Captor + private ArgumentCaptor<List<ScheduleTaskRequest>> scheduleRequestCaptor; + private FairCartesianProductVertexManager vertexManager; + private VertexManagerPluginContext ctx; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + ctx = mock(VertexManagerPluginContext.class); + vertexManager = new FairCartesianProductVertexManager(ctx); + } + + /** + * v0 and v1 are two cartesian product sources + */ + private void setupDAGVertexOnly(int maxParallelism, long minOpsPerWorker, + int srcParallelismMultiplier) throws Exception { + when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(2)); + setSrcParallelism(ctx, srcParallelismMultiplier, 2, 3); + + CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); + builder.setIsPartitioned(false).addSources("v0").addSources("v1") + .setMaxParallelism(maxParallelism).setMinOpsPerWorker(minOpsPerWorker) + .setNumPartitionsForFairCase(maxParallelism); + vertexManager.initialize(builder.build()); + } + + /** + * v0 and v1 are two cartesian product sources; v2 is broadcast source + */ + private void setupDAGVertexOnlyWithBroadcast(int maxParallelism, long minWorkloadPerWorker, + int srcParallelismMultiplier) throws Exception { + Map<String, EdgeProperty> edgePropertyMap = getEdgePropertyMap(2); + 
edgePropertyMap.put("v2", EdgeProperty.create(BROADCAST, null, null, null, null)); + when(ctx.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap); + setSrcParallelism(ctx, srcParallelismMultiplier, 2, 3, 5); + + CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); + builder.setIsPartitioned(false).addSources("v0").addSources("v1") + .setMaxParallelism(maxParallelism).setMinOpsPerWorker(minWorkloadPerWorker) + .setNumPartitionsForFairCase(maxParallelism); + vertexManager.initialize(builder.build()); + } + + /** + * v0 and g0 are two sources; g0 is vertex group of v1 and v2 + */ + private void setupDAGVertexGroup(int maxParallelism, long minWorkloadPerWorker, + int srcParallelismMultiplier) throws Exception { + when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(3)); + setSrcParallelism(ctx, srcParallelismMultiplier, 2, 3, 4); + + Map<String, List<String>> vertexGroupMap = new HashMap<>(); + vertexGroupMap.put("g0", Arrays.asList("v1", "v2")); + when(ctx.getInputVertexGroups()).thenReturn(vertexGroupMap); + + CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); + builder.setIsPartitioned(false).addSources("v0").addSources("g0") + .setNumPartitionsForFairCase(maxParallelism).setMaxParallelism(maxParallelism) + .setMinOpsPerWorker(minWorkloadPerWorker); + vertexManager.initialize(builder.build()); + } + + /** + * g0 and g1 are two sources; g0 is vertex group of v0 and v1; g1 is vertex group of v2 and v3 + */ + private void setupDAGVertexGroupOnly(int maxParallelism, long minWorkloadPerWorker, + int srcParallelismMultiplier) throws Exception { + when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(4)); + setSrcParallelism(ctx, srcParallelismMultiplier, 2, 3, 4, 5); + + Map<String, List<String>> vertexGroupMap = new HashMap<>(); + vertexGroupMap.put("g0", Arrays.asList("v0", "v1")); + vertexGroupMap.put("g1", Arrays.asList("v2", "v3")); + 
when(ctx.getInputVertexGroups()).thenReturn(vertexGroupMap); + + CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); + builder.setIsPartitioned(false).addSources("g0").addSources("g1") + .setNumPartitionsForFairCase(maxParallelism) + .setMaxParallelism(maxParallelism).setMinOpsPerWorker(minWorkloadPerWorker); + vertexManager.initialize(builder.build()); + } + + private Map<String, EdgeProperty> getEdgePropertyMap(int numSrcV) { + Map<String, EdgeProperty> edgePropertyMap = new HashMap<>(); + for (int i = 0; i < numSrcV; i++) { + edgePropertyMap.put("v"+i, EdgeProperty.create(EdgeManagerPluginDescriptor.create( + CartesianProductEdgeManager.class.getName()), null, null, null, null)); + } + return edgePropertyMap; + } + + private void setSrcParallelism(VertexManagerPluginContext ctx, int multiplier, int... numTasks) { + int i = 0; + for (int numTask : numTasks) { + when(ctx.getVertexNumTasks(eq("v"+i))).thenReturn(numTask * multiplier); + i++; + } + } + + private TaskAttemptIdentifier getTaId(String vertexName, int taskId) { + return new TaskAttemptIdentifierImpl("dag", vertexName, + TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( + TezDAGID.getInstance("0", 0, 0), 0), taskId), 0)); + } + + private VertexManagerEvent getVMEvent(long numRecord, String vName, int taskId) { + + VertexManagerEventPayloadProto.Builder builder = VertexManagerEventPayloadProto.newBuilder(); + builder.setNumRecord(numRecord); + VertexManagerEvent vmEvent = + VertexManagerEvent.create("cp vertex", builder.build().toByteString().asReadOnlyByteBuffer()); + vmEvent.setProducerAttemptIdentifier(getTaId(vName, taskId)); + return vmEvent; + } + + private void verifyEdgeProperties(EdgeProperty edgeProperty, String[] sources, + int[] numChunksPerSrc, int maxParallelism) + throws InvalidProtocolBufferException { + CartesianProductConfigProto config = CartesianProductConfigProto.parseFrom(ByteString.copyFrom( + 
edgeProperty.getEdgeManagerDescriptor().getUserPayload().getPayload())); + assertArrayEquals(sources, config.getSourcesList().toArray()); + assertArrayEquals(numChunksPerSrc, Ints.toArray(config.getNumChunksList())); + assertEquals(maxParallelism, config.getMaxParallelism()); + } + + private void verifyVertexGroupInfo(EdgeProperty edgeProperty, int positionInGroup, + int... numTaskPerVertexInGroup) + throws InvalidProtocolBufferException { + CartesianProductConfigProto config = CartesianProductConfigProto.parseFrom(ByteString.copyFrom( + edgeProperty.getEdgeManagerDescriptor().getUserPayload().getPayload())); + assertEquals(positionInGroup, config.getPositionInGroup()); + int i = 0; + for (int numTask : numTaskPerVertexInGroup) { + assertEquals(numTask, config.getNumTaskPerVertexInGroup(i)); + i++; + } + } + + private void verifyScheduleRequest(int expectedTimes, int... expectedTid) { + verify(ctx, times(expectedTimes)).scheduleTasks(scheduleRequestCaptor.capture()); + if (expectedTimes > 0) { + List<ScheduleTaskRequest> requests = scheduleRequestCaptor.getValue(); + int i = 0; + for (int tid : expectedTid) { + assertEquals(tid, requests.get(i).getTaskIndex()); + i++; + } + } + } + + @Test(timeout = 5000) + public void testDAGVertexOnlyGroupByMaxParallelism() throws Exception { + setupDAGVertexOnly(30, 1, 1); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); + + vertexManager.onVertexManagerEventReceived(getVMEvent(250, "v0", 0)); + verify(ctx, never()).reconfigureVertex( + anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class)); + + vertexManager.onVertexManagerEventReceived(getVMEvent(200, "v1", 0)); + verify(ctx, times(1)).reconfigureVertex( + eq(30), any(VertexLocationHint.class), edgePropertiesCaptor.capture()); + Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue(); + 
verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "v1"}, new int[]{5, 6}, 30); + verifyVertexGroupInfo(edgeProperties.get("v0"), 0); + verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "v1"}, new int[]{5, 6}, 30); + verifyVertexGroupInfo(edgeProperties.get("v1"), 0); + + vertexManager.onVertexStarted(null); + verifyScheduleRequest(0); + vertexManager.onSourceTaskCompleted(getTaId("v0", 0)); + vertexManager.onSourceTaskCompleted(getTaId("v1", 0)); + verifyScheduleRequest(1, 0, 6, 1, 7); + vertexManager.onSourceTaskCompleted(getTaId("v0", 1)); + verifyScheduleRequest(2, 12, 13, 18, 19, 24, 25); + } + + @Test(timeout = 5000) + public void testDAGVertexOnlyGroupByMinOpsPerWorker() throws Exception { + setupDAGVertexOnly(100, 10000, 10); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); + + for (int i = 0; i < 20; i++) { + vertexManager.onVertexManagerEventReceived(getVMEvent(20, "v0", i)); + } + + for (int i = 0; i < 30; i++) { + vertexManager.onVertexManagerEventReceived(getVMEvent(10, "v1", i)); + } + + verify(ctx, times(1)).reconfigureVertex( + eq(12), any(VertexLocationHint.class), edgePropertiesCaptor.capture()); + Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue(); + verifyEdgeProperties(edgeProperties.get("v0"), new String[]{"v0", "v1"}, new int[]{4, 3}, 100); + verifyEdgeProperties(edgeProperties.get("v1"), new String[]{"v0", "v1"}, new int[]{4, 3}, 100); + + vertexManager.onVertexStarted(null); + verifyScheduleRequest(0); + + for (int i = 0; i < 5; i++) { + vertexManager.onSourceTaskCompleted(getTaId("v0", i)); + } + for (int i = 0; i < 10; i++) { + vertexManager.onSourceTaskCompleted(getTaId("v1", 10 + i)); + } + verifyScheduleRequest(1, 1); + } + + @Test(timeout = 5000) + public void testDAGVertexGroup() throws Exception { + setupDAGVertexGroup(100, 1, 1); + + for (int i = 
0; i < 3; i++) { + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v" + i, VertexState.CONFIGURED)); + } + + vertexManager.onVertexManagerEventReceived(getVMEvent(100, "v0", 0)); + vertexManager.onVertexManagerEventReceived(getVMEvent(10, "v1", 0)); + vertexManager.onVertexManagerEventReceived(getVMEvent(5, "v2", 0)); + vertexManager.onVertexManagerEventReceived(getVMEvent(5, "v2", 1)); + verify(ctx, times(1)).reconfigureVertex( + eq(100), any(VertexLocationHint.class), edgePropertiesCaptor.capture()); + Map<String, EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue(); + for (int i = 0; i < 3; i++) { + verifyEdgeProperties(edgeProperties.get("v" + i), new String[]{"v0", "g0"}, + new int[]{20, 5}, 100); + } + + vertexManager.onVertexStarted(null); + vertexManager.onSourceTaskCompleted(getTaId("v0", 0)); + vertexManager.onSourceTaskCompleted(getTaId("v1", 0)); + verifyScheduleRequest(0); + vertexManager.onSourceTaskCompleted(getTaId("v2", 0)); + verifyScheduleRequest(1, 0, 5, 10, 15, 20, 25, 30, 35, 40, 45); + vertexManager.onSourceTaskCompleted(getTaId("v1", 1)); + verifyScheduleRequest(1); + vertexManager.onSourceTaskCompleted(getTaId("v2", 1)); + verifyScheduleRequest(2, 1, 6, 11, 16, 21, 26, 31, 36, 41, 46); + } + + @Test(timeout = 5000) + public void testDAGVertexGroupOnly() throws Exception { + setupDAGVertexGroupOnly(100, 1, 1); + + for (int i = 0; i < 4; i++) { + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v" + i, VertexState.CONFIGURED)); + } + + vertexManager.onVertexManagerEventReceived(getVMEvent(20, "v0", 0)); + vertexManager.onVertexManagerEventReceived(getVMEvent(20, "v1", 0)); + vertexManager.onVertexManagerEventReceived(getVMEvent(5, "v2", 0)); + vertexManager.onVertexManagerEventReceived(getVMEvent(5, "v2", 1)); + vertexManager.onVertexManagerEventReceived(getVMEvent(16, "v3", 0)); + + verify(ctx, times(1)).reconfigureVertex( + eq(100), any(VertexLocationHint.class), edgePropertiesCaptor.capture()); + Map<String, 
EdgeProperty> edgeProperties = edgePropertiesCaptor.getValue(); + for (int i = 0; i < 4; i++) { + verifyEdgeProperties(edgeProperties.get("v" + i), new String[]{"g0", "g1"}, + new int[]{10, 10}, 100); + } + verifyVertexGroupInfo(edgeProperties.get("v0"), 0); + verifyVertexGroupInfo(edgeProperties.get("v1"), 1, 2); + verifyVertexGroupInfo(edgeProperties.get("v2"), 0); + verifyVertexGroupInfo(edgeProperties.get("v3"), 1, 4); + + vertexManager.onVertexStarted(null); + vertexManager.onSourceTaskCompleted(getTaId("v0", 0)); + vertexManager.onSourceTaskCompleted(getTaId("v1", 0)); + vertexManager.onSourceTaskCompleted(getTaId("v2", 1)); + verifyScheduleRequest(0); + vertexManager.onSourceTaskCompleted(getTaId("v3", 1)); + verifyScheduleRequest(1, 3, 13, 23); + } + + @Test(timeout = 5000) + public void testSchedulingVertexOnlyWithBroadcast() throws Exception { + setupDAGVertexOnlyWithBroadcast(30, 1, 1); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); + + vertexManager.onVertexManagerEventReceived(getVMEvent(250, "v0", 0)); + vertexManager.onVertexManagerEventReceived(getVMEvent(200, "v1", 0)); + verify(ctx, times(1)).reconfigureVertex( + eq(30), any(VertexLocationHint.class), edgePropertiesCaptor.capture()); + assertFalse(edgePropertiesCaptor.getValue().containsKey("v2")); + + vertexManager.onVertexStarted(null); + vertexManager.onSourceTaskCompleted(getTaId("v0", 0)); + vertexManager.onSourceTaskCompleted(getTaId("v1", 0)); + verifyScheduleRequest(0); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.RUNNING)); + verifyScheduleRequest(1, 0, 1, 6, 7); + } + + + @Test(timeout = 5000) + public void testOnVertexStart() throws Exception { + setupDAGVertexOnly(6, 1, 1); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); + vertexManager.onVertexStateUpdated(new 
VertexStateUpdate("v1", VertexState.CONFIGURED)); + vertexManager.onVertexManagerEventReceived(getVMEvent(100, "v0", 0)); + vertexManager.onVertexManagerEventReceived(getVMEvent(100, "v1", 0)); + + verifyScheduleRequest(0); + + vertexManager.onVertexStarted(Arrays.asList(getTaId("v0", 0), getTaId("v1", 0))); + verifyScheduleRequest(1, 0); + } + + @Test(timeout = 5000) + public void testZeroSrcTask() throws Exception { + ctx = mock(VertexManagerPluginContext.class); + vertexManager = new FairCartesianProductVertexManager(ctx); + when(ctx.getVertexNumTasks("v0")).thenReturn(2); + when(ctx.getVertexNumTasks("v1")).thenReturn(0); + + CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); + builder.setIsPartitioned(false).addSources("v0").addSources("v1") + .addNumChunks(2).addNumChunks(3).setMaxParallelism(6); + CartesianProductConfigProto config = builder.build(); + + Map<String, EdgeProperty> edgePropertyMap = new HashMap<>(); + edgePropertyMap.put("v0", EdgeProperty.create(EdgeManagerPluginDescriptor.create( + CartesianProductEdgeManager.class.getName()), null, null, null, null)); + edgePropertyMap.put("v1", EdgeProperty.create(EdgeManagerPluginDescriptor.create( + CartesianProductEdgeManager.class.getName()), null, null, null, null)); + when(ctx.getInputVertexEdgeProperties()).thenReturn(edgePropertyMap); + + vertexManager.initialize(config); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); + vertexManager.onVertexStarted(new ArrayList<TaskAttemptIdentifier>()); + vertexManager.onSourceTaskCompleted(getTaId("v0", 0)); + vertexManager.onSourceTaskCompleted(getTaId("v0", 1)); + } + + private void setupGroupingFractionTest() throws Exception { + when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(2)); + setSrcParallelism(ctx, 10, 2, 3); + + CartesianProductConfigProto.Builder builder = 
CartesianProductConfigProto.newBuilder(); + builder.setIsPartitioned(false).addSources("v0").addSources("v1") + .setMaxParallelism(30).setMinOpsPerWorker(1) + .setNumPartitionsForFairCase(30).setGroupingFraction(0.5f); + vertexManager.initialize(builder.build()); + + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); + } + @Test(timeout = 5000) + public void testGroupingFraction() throws Exception { + setupGroupingFractionTest(); + vertexManager.onVertexManagerEventReceived(getVMEvent(10000, "v0", 0)); + vertexManager.onVertexManagerEventReceived(getVMEvent(10000, "v1", 0)); + for (int i = 0; i < 10; i++) { + vertexManager.onSourceTaskCompleted(getTaId("v0", i)); + } + for (int i = 0; i < 14; i++) { + vertexManager.onSourceTaskCompleted(getTaId("v1", i)); + } + verify(ctx, never()).reconfigureVertex( + anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class)); + + vertexManager.onSourceTaskCompleted(getTaId("v1", 14)); + verify(ctx, times(1)).reconfigureVertex( + eq(24), any(VertexLocationHint.class), edgePropertiesCaptor.capture()); + } + + @Test(timeout = 5000) + public void testGroupFractionWithZeroStats() throws Exception { + setupGroupingFractionTest(); + + for (int i = 0; i < 10; i++) { + vertexManager.onSourceTaskCompleted(getTaId("v0", i)); + } + for (int i = 0; i < 15; i++) { + vertexManager.onSourceTaskCompleted(getTaId("v1", i)); + } + verify(ctx, never()).reconfigureVertex( + anyInt(), any(VertexLocationHint.class), anyMapOf(String.class, EdgeProperty.class)); + } + + @Test(timeout = 5000) + public void testGroupingFractionWithZeroOutput() throws Exception { + setupGroupingFractionTest(); + + for (int i = 0; i < 20; i++) { + vertexManager.onSourceTaskCompleted(getTaId("v0", i)); + } + for (int i = 0; i < 30; i++) { + vertexManager.onSourceTaskCompleted(getTaId("v1", i)); + } + verify(ctx, 
times(1)).reconfigureVertex( + eq(0), any(VertexLocationHint.class), edgePropertiesCaptor.capture()); + } + + @Test(timeout = 5000) + public void testZeroSrcOutput() throws Exception { + setupDAGVertexOnly(10, 1, 1); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); + vertexManager.onVertexManagerEventReceived(getVMEvent(0, "v0", 0)); + vertexManager.onVertexManagerEventReceived(getVMEvent(0, "v0", 1)); + vertexManager.onVertexManagerEventReceived(getVMEvent(0, "v1", 0)); + vertexManager.onVertexManagerEventReceived(getVMEvent(0, "v1", 1)); + vertexManager.onVertexManagerEventReceived(getVMEvent(0, "v1", 2)); + verify(ctx, times(1)).reconfigureVertex( + eq(0), any(VertexLocationHint.class), edgePropertiesCaptor.capture()); + } + + @Test(timeout = 5000) + public void testDisableGrouping() throws Exception { + when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(2)); + setSrcParallelism(ctx, 1, 2, 3); + + CartesianProductConfigProto.Builder builder = CartesianProductConfigProto.newBuilder(); + builder.setIsPartitioned(false).addSources("v0").addSources("v1") + .setMaxParallelism(30).setMinOpsPerWorker(1).setEnableGrouping(false); + vertexManager.initialize(builder.build()); + + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED)); + vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED)); + + vertexManager.onVertexManagerEventReceived(getVMEvent(250, "v0", 0)); + vertexManager.onVertexManagerEventReceived(getVMEvent(200, "v1", 0)); + verify(ctx, times(1)).reconfigureVertex( + eq(6), any(VertexLocationHint.class), edgePropertiesCaptor.capture()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestGrouper.java 
---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestGrouper.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestGrouper.java index 481bd7e..08b2efa 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestGrouper.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/cartesianproduct/TestGrouper.java @@ -30,12 +30,12 @@ public class TestGrouper { @Test(timeout = 5000) public void testEvenlyGrouping() { grouper.init(4, 2); - assertEquals(0, grouper.getFirstTaskInGroup(0)); - assertEquals(2, grouper.getFirstTaskInGroup(1)); - assertEquals(2, grouper.getNumTasksInGroup(0)); - assertEquals(2, grouper.getNumTasksInGroup(1)); - assertEquals(1, grouper.getLastTaskInGroup(0)); - assertEquals(3, grouper.getLastTaskInGroup(1)); + assertEquals(0, grouper.getFirstItemInGroup(0)); + assertEquals(2, grouper.getFirstItemInGroup(1)); + assertEquals(2, grouper.getNumItemsInGroup(0)); + assertEquals(2, grouper.getNumItemsInGroup(1)); + assertEquals(1, grouper.getLastItemInGroup(0)); + assertEquals(3, grouper.getLastItemInGroup(1)); assertEquals(0, grouper.getGroupId(1)); assertEquals(1, grouper.getGroupId(2)); assertTrue(grouper.isInGroup(2, 1)); @@ -45,12 +45,12 @@ public class TestGrouper { @Test(timeout = 5000) public void testUnevenlyGrouping() { grouper.init(5, 2); - assertEquals(0, grouper.getFirstTaskInGroup(0)); - assertEquals(2, grouper.getFirstTaskInGroup(1)); - assertEquals(2, grouper.getNumTasksInGroup(0)); - assertEquals(3, grouper.getNumTasksInGroup(1)); - assertEquals(1, grouper.getLastTaskInGroup(0)); - assertEquals(4, grouper.getLastTaskInGroup(1)); + assertEquals(0, grouper.getFirstItemInGroup(0)); + assertEquals(2, grouper.getFirstItemInGroup(1)); + assertEquals(2, grouper.getNumItemsInGroup(0)); + assertEquals(3, grouper.getNumItemsInGroup(1)); 
+ assertEquals(1, grouper.getLastItemInGroup(0)); + assertEquals(4, grouper.getLastItemInGroup(1)); assertEquals(0, grouper.getGroupId(1)); assertEquals(1, grouper.getGroupId(3)); assertTrue(grouper.isInGroup(3, 1)); @@ -60,9 +60,9 @@ public class TestGrouper { @Test(timeout = 5000) public void testSingleGroup() { grouper.init(4, 1); - assertEquals(0, grouper.getFirstTaskInGroup(0)); - assertEquals(4, grouper.getNumTasksInGroup(0)); - assertEquals(3, grouper.getLastTaskInGroup(0)); + assertEquals(0, grouper.getFirstItemInGroup(0)); + assertEquals(4, grouper.getNumItemsInGroup(0)); + assertEquals(3, grouper.getLastItemInGroup(0)); assertEquals(0, grouper.getGroupId(0)); assertEquals(0, grouper.getGroupId(3)); assertTrue(grouper.isInGroup(3, 0)); @@ -71,9 +71,9 @@ public class TestGrouper { @Test(timeout = 5000) public void testNoGrouping() { grouper.init(2, 2); - assertEquals(0, grouper.getFirstTaskInGroup(0)); - assertEquals(1, grouper.getNumTasksInGroup(0)); - assertEquals(0, grouper.getLastTaskInGroup(0)); + assertEquals(0, grouper.getFirstItemInGroup(0)); + assertEquals(1, grouper.getNumItemsInGroup(0)); + assertEquals(0, grouper.getLastItemInGroup(0)); assertEquals(0, grouper.getGroupId(0)); assertTrue(grouper.isInGroup(0, 0)); } http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/CartesianProduct.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/CartesianProduct.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/CartesianProduct.java new file mode 100644 index 0000000..6096f96 --- /dev/null +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/CartesianProduct.java @@ -0,0 +1,385 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.mapreduce.examples; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.DataSinkDescriptor; +import org.apache.tez.dag.api.DataSourceDescriptor; +import org.apache.tez.dag.api.Edge; +import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.InputInitializerDescriptor; +import org.apache.tez.dag.api.OutputCommitterDescriptor; +import org.apache.tez.dag.api.OutputDescriptor; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.api.client.VertexStatus; +import org.apache.tez.examples.TezExampleBase; +import org.apache.tez.mapreduce.processor.SimpleMRProcessor; +import org.apache.tez.runtime.api.AbstractLogicalInput; +import 
org.apache.tez.runtime.api.AbstractLogicalOutput; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.InputContext; +import org.apache.tez.runtime.api.InputInitializer; +import org.apache.tez.runtime.api.InputInitializerContext; +import org.apache.tez.runtime.api.OutputCommitter; +import org.apache.tez.runtime.api.OutputCommitterContext; +import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.api.ProcessorContext; +import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.api.Writer; +import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent; +import org.apache.tez.runtime.api.events.InputDataInformationEvent; +import org.apache.tez.runtime.api.events.InputInitializerEvent; +import org.apache.tez.runtime.library.api.KeyValueReader; +import org.apache.tez.runtime.library.api.KeyValueWriter; +import org.apache.tez.runtime.library.cartesianproduct.CartesianProductConfig; +import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager; +import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager; +import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig; +import org.apache.tez.runtime.library.partitioner.RoundRobinPartitioner; +import org.apache.tez.runtime.library.processor.SimpleProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * This job has three vertices: two Tokenizers and one JoinProcessor. Each Tokenizer handles one + * input directory and generates tokens. CustomPartitioner separates tokens into 2 partitions + * according to the parity of token's first char. Then JoinProcessor does cartesian product of + * partitioned token sets. 
+ */ +public class CartesianProduct extends TezExampleBase { + private static final int srcParallelism = 1; + private static final int numRecordPerSrc = 10; + private static final String INPUT = "Input1"; + private static final String OUTPUT = "Output"; + private static final String VERTEX1 = "Vertex1"; + private static final String VERTEX2 = "Vertex2"; + private static final String VERTEX3 = "Vertex3"; + private static final Logger LOG = LoggerFactory.getLogger(CartesianProduct.class); + private static final String[] sourceVertices = new String[] {VERTEX1, VERTEX2}; + + public static class TokenProcessor extends SimpleProcessor { + public TokenProcessor(ProcessorContext context) { + super(context); + } + + @Override + public void run() throws Exception { + Preconditions.checkArgument(getInputs().size() == 1); + Preconditions.checkArgument(getOutputs().size() == 1); + KeyValueReader kvReader = (KeyValueReader) getInputs().get(INPUT).getReader(); + KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(VERTEX3).getWriter(); + while (kvReader.next()) { + Object key = kvReader.getCurrentKey(); + Object value = kvReader.getCurrentValue(); + kvWriter.write(new Text((String)key), new IntWritable(1)); + } + } + } + + public static class JoinProcessor extends SimpleMRProcessor { + public JoinProcessor(ProcessorContext context) { + super(context); + } + + @Override + public void run() throws Exception { + KeyValueWriter kvWriter = (KeyValueWriter) getOutputs().get(OUTPUT).getWriter(); + KeyValueReader kvReader1 = (KeyValueReader) getInputs().get(VERTEX1).getReader(); + KeyValueReader kvReader2 = (KeyValueReader) getInputs().get(VERTEX2).getReader(); + Set<Object> leftSet = new HashSet<>(); + Set<Object> rightSet = new HashSet<>(); + + while (kvReader1.next()) { + Text key = (Text)(kvReader1.getCurrentKey()); + leftSet.add(new Text(key)); + } + while (kvReader2.next()) { + Text key = (Text)(kvReader2.getCurrentKey()); + rightSet.add(new Text(key)); + } + + for (Object l 
: leftSet) { + for (Object r : rightSet) { + kvWriter.write(l, r); + } + } + } + } + + public static class FakeInputInitializer extends InputInitializer { + + /** + * Constructor an instance of the InputInitializer. Classes extending this to create a + * InputInitializer, must provide the same constructor so that Tez can create an instance of + * the class at runtime. + * + * @param initializerContext initializer context which can be used to access the payload, vertex + * properties, etc + */ + public FakeInputInitializer(InputInitializerContext initializerContext) { + super(initializerContext); + } + + @Override + public List<Event> initialize() throws Exception { + List<Event> list = new ArrayList<>(); + list.add(InputConfigureVertexTasksEvent.create(srcParallelism, null, null)); + for (int i = 0; i < srcParallelism; i++) { + list.add(InputDataInformationEvent.createWithObjectPayload(i, null)); + } + return list; + } + + @Override + public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception { + + } + } + + public static class FakeInput extends AbstractLogicalInput { + + /** + * Constructor an instance of the LogicalInput. Classes extending this one to create a + * LogicalInput, must provide the same constructor so that Tez can create an instance of the + * class at runtime. + * + * @param inputContext the {@link InputContext} which provides + * the Input with context information within the running task. 
+ * @param numPhysicalInputs the number of physical inputs that the logical input will + */ + public FakeInput(InputContext inputContext, int numPhysicalInputs) { + super(inputContext, numPhysicalInputs); + } + + @Override + public List<Event> initialize() throws Exception { + getContext().requestInitialMemory(0, null); + getContext().inputIsReady(); + return null; + } + + @Override + public void handleEvents(List<Event> inputEvents) throws Exception { + + } + + @Override + public List<Event> close() throws Exception { + return null; + } + + @Override + public void start() throws Exception { + + } + + @Override + public Reader getReader() throws Exception { + return new KeyValueReader() { + String[] keys = new String[numRecordPerSrc]; + + int i = -1; + + @Override + public boolean next() throws IOException { + if (i == -1) { + for (int j = 0; j < numRecordPerSrc; j++) { + keys[j] = ""+j; + } + } + i++; + return i < keys.length; + } + + @Override + public Object getCurrentKey() throws IOException { + return keys[i]; + } + + @Override + public Object getCurrentValue() throws IOException { + return keys[i]; + } + }; + } + } + + public static class FakeOutputCommitter extends OutputCommitter { + + /** + * Constructor an instance of the OutputCommitter. Classes extending this to create a + * OutputCommitter, must provide the same constructor so that Tez can create an instance of + * the class at runtime. 
+ * + * @param committerContext committer context which can be used to access the payload, vertex + * properties, etc + */ + public FakeOutputCommitter(OutputCommitterContext committerContext) { + super(committerContext); + } + + @Override + public void initialize() throws Exception { + + } + + @Override + public void setupOutput() throws Exception { + + } + + @Override + public void commitOutput() throws Exception { + + } + + @Override + public void abortOutput(VertexStatus.State finalState) throws Exception { + + } + } + + public static class FakeOutput extends AbstractLogicalOutput { + + /** + * Constructor an instance of the LogicalOutput. Classes extending this one to create a + * LogicalOutput, must provide the same constructor so that Tez can create an instance of the + * class at runtime. + * + * @param outputContext the {@link OutputContext} which + * provides + * the Output with context information within the running task. + * @param numPhysicalOutputs the number of physical outputs that the logical output will + */ + public FakeOutput(OutputContext outputContext, int numPhysicalOutputs) { + super(outputContext, numPhysicalOutputs); + } + + @Override + public List<Event> initialize() throws Exception { + getContext().requestInitialMemory(0, null); + return null; + } + + @Override + public void handleEvents(List<Event> outputEvents) { + + } + + @Override + public List<Event> close() throws Exception { + return null; + } + + @Override + public void start() throws Exception { + + } + + @Override + public Writer getWriter() throws Exception { + return new KeyValueWriter() { + @Override + public void write(Object key, Object value) throws IOException { + System.out.println(key + " XXX " + value); + } + }; + } + } + + private DAG createDAG(TezConfiguration tezConf) throws IOException { + InputDescriptor inputDescriptor = InputDescriptor.create(FakeInput.class.getName()); + InputInitializerDescriptor inputInitializerDescriptor = + 
InputInitializerDescriptor.create(FakeInputInitializer.class.getName()); + DataSourceDescriptor dataSourceDescriptor = + DataSourceDescriptor.create(inputDescriptor, inputInitializerDescriptor, null); + + Vertex v1 = Vertex.create(VERTEX1, ProcessorDescriptor.create(TokenProcessor.class.getName())); + v1.addDataSource(INPUT, dataSourceDescriptor); + Vertex v2 = Vertex.create(VERTEX2, ProcessorDescriptor.create(TokenProcessor.class.getName())); + v2.addDataSource(INPUT, dataSourceDescriptor); + + OutputDescriptor outputDescriptor = OutputDescriptor.create(FakeOutput.class.getName()); + OutputCommitterDescriptor outputCommitterDescriptor = + OutputCommitterDescriptor.create(FakeOutputCommitter.class.getName()); + DataSinkDescriptor dataSinkDescriptor = + DataSinkDescriptor.create(outputDescriptor, outputCommitterDescriptor, null); + + CartesianProductConfig cartesianProductConfig = + new CartesianProductConfig(Arrays.asList(sourceVertices)); + UserPayload userPayload = cartesianProductConfig.toUserPayload(tezConf); + + Vertex v3 = Vertex.create(VERTEX3, ProcessorDescriptor.create(JoinProcessor.class.getName())); + v3.addDataSink(OUTPUT, dataSinkDescriptor); + v3.setVertexManagerPlugin( + VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName()) + .setUserPayload(userPayload)); + + EdgeManagerPluginDescriptor edgeManagerDescriptor = + EdgeManagerPluginDescriptor.create(CartesianProductEdgeManager.class.getName()); + edgeManagerDescriptor.setUserPayload(userPayload); + UnorderedPartitionedKVEdgeConfig edgeConf = + UnorderedPartitionedKVEdgeConfig.newBuilder(Text.class.getName(), IntWritable.class.getName(), + RoundRobinPartitioner.class.getName()).build(); + EdgeProperty edgeProperty = edgeConf.createDefaultCustomEdgeProperty(edgeManagerDescriptor); + + return DAG.create("CrossProduct").addVertex(v1).addVertex(v2).addVertex(v3) + .addEdge(Edge.create(v1, v3, edgeProperty)).addEdge(Edge.create(v2, v3, edgeProperty)); + } + + @Override + 
protected void printUsage() {} + + @Override + protected int validateArgs(String[] otherArgs) { + return 0; + } + + @Override + protected int runJob(String[] args, TezConfiguration tezConf, + TezClient tezClient) throws Exception { + DAG dag = createDAG(tezConf); + return runDag(dag, isCountersLog(), LOG); + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new CartesianProduct(), args); + System.exit(res); + } +} + + http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java index 90d2825..cdbdf13 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java @@ -81,6 +81,8 @@ public class ExampleDriver { "Filters lines by the specified word using OneToOne edge"); pgd.addClass("multiplecommitsExample", MultipleCommitsExample.class, "Job with multiple commits in both vertex group and vertex"); + pgd.addClass("cartesianproduct", CartesianProduct.class, + "Cartesian Product Example"); exitCode = pgd.run(argv); } catch(Throwable e){ http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java index af1cb6f..e2fc53f 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java @@ -858,8 +858,11 @@ public class 
TestFaultTolerance { String[] sourceVertices = {"v1", "v2"}; CartesianProductConfig cartesianProductConfig = new CartesianProductConfig(Arrays.asList(sourceVertices)); + TezConfiguration tezConf = new TezConfiguration(); + tezConf.setInt(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_NUM_PARTITIONS, 1); + tezConf.setBoolean(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_ENABLE_GROUPING, false); UserPayload cartesianProductPayload = - cartesianProductConfig.toUserPayload(new TezConfiguration()); + cartesianProductConfig.toUserPayload(tezConf); v3.setVertexManagerPlugin( VertexManagerPluginDescriptor.create(CartesianProductVertexManager.class.getName()) http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java index 69dea7e..8b292ab 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java @@ -22,6 +22,8 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; +import org.apache.tez.runtime.api.events.VertexManagerEvent; +import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.tez.dag.api.OutputDescriptor; @@ -86,6 +88,15 @@ public class TestOutput extends AbstractLogicalOutput { DataMovementEvent event = DataMovementEvent.create(i, result); events.add(event); } + + ShuffleUserPayloads.VertexManagerEventPayloadProto.Builder vmBuilder = + ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder().setNumRecord(1); + + VertexManagerEvent vmEvent = VertexManagerEvent.create( + getContext().getDestinationVertexName(), + vmBuilder.build().toByteString().asReadOnlyByteBuffer()); + + events.add(vmEvent); 
return events; } http://git-wip-us.apache.org/repos/asf/tez/blob/a55fe80b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index be9b0bf..2dfc76d 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -54,6 +54,8 @@ import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatus; +import org.apache.tez.mapreduce.examples.CartesianProduct; +import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig; import org.apache.tez.runtime.library.partitioner.HashPartitioner; import org.slf4j.Logger; @@ -1333,4 +1335,15 @@ public class TestTezJobs { tezClient.stop(); } } + + @Test(timeout = 60000) + public void testCartesianProduct() throws Exception { + LOG.info("Running CartesianProduct Test"); + CartesianProduct job = new CartesianProduct(); + + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + tezConf.setInt(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MAX_PARALLELISM, 10); + tezConf.setInt(CartesianProductVertexManager.TEZ_CARTESIAN_PRODUCT_MIN_OPS_PER_WORKER, 25); + Assert.assertEquals("CartesianProduct failed", job.run(tezConf, null, null), 0); + } }
