Repository: tez Updated Branches: refs/heads/master cc1d89cba -> 7b45e9a14
http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java index b164a6d..d59439e 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.*; import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -34,9 +33,10 @@ import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.VertexManagerPluginContext; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; -import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint; +import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -44,13 +44,13 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.MockitoAnnotations; -import com.google.common.collect.Maps; +import com.google.common.collect.Lists; @SuppressWarnings("unchecked") public class TestInputReadyVertexManager { @Captor - ArgumentCaptor<List<TaskWithLocationHint>> requestCaptor; + ArgumentCaptor<List<ScheduleTaskRequest>> requestCaptor; @Before public void init() { @@ -77,23 +77,23 @@ public class TestInputReadyVertexManager { when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2); when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3); mockInputVertices.put(mockSrcVertexId1, eProp1); - - Map<String, List<Integer>> initialCompletions = Maps.newHashMap(); - initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0)); - + InputReadyVertexManager manager = new InputReadyVertexManager(mockContext); manager.initialize(); verify(mockContext, times(1)).vertexReconfigurationPlanned(); // source vertex configured manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); verify(mockContext, times(1)).doneReconfiguringVertex(); - verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture()); + verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture()); // then own vertex started - manager.onVertexStarted(initialCompletions); - manager.onSourceTaskCompleted(mockSrcVertexId1, 1); - verify(mockContext, times(0)).scheduleVertexTasks(anyList()); - manager.onSourceTaskCompleted(mockSrcVertexId1, 2); - verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture()); + manager.onVertexStarted(Collections.singletonList( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0))); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1)); + verify(mockContext, times(0)).scheduleTasks(anyList()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2)); + verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture()); Assert.assertEquals(2, requestCaptor.getValue().size()); } @@ -118,36 +118,36 @@ public class TestInputReadyVertexManager { when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3); mockInputVertices.put(mockSrcVertexId1, eProp1); - Map<String, List<Integer>> initialCompletions = Maps.newHashMap(); - initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0)); - InputReadyVertexManager manager = new InputReadyVertexManager(mockContext); manager.initialize(); verify(mockContext, times(1)).vertexReconfigurationPlanned(); // source vertex configured manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); verify(mockContext, times(1)).doneReconfiguringVertex(); - verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture()); - manager.onVertexStarted(initialCompletions); - verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture()); + verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture()); + manager.onVertexStarted(Collections.singletonList( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0))); + verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture()); Assert.assertEquals(1, requestCaptor.getValue().size()); - Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue()); + Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex()); Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getVertexName()); Assert.assertEquals(0, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); - manager.onSourceTaskCompleted(mockSrcVertexId1, 1); - verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1)); + verify(mockContext, times(2)).scheduleTasks(requestCaptor.capture()); Assert.assertEquals(1, requestCaptor.getValue().size()); - Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue()); + Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex()); Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getVertexName()); Assert.assertEquals(1, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); - manager.onSourceTaskCompleted(mockSrcVertexId1, 2); - verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2)); + verify(mockContext, times(3)).scheduleTasks(requestCaptor.capture()); Assert.assertEquals(1, requestCaptor.getValue().size()); - Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue()); + Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex()); Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getVertexName()); Assert.assertEquals(2, requestCaptor.getValue().get(0) @@ -175,28 +175,28 @@ public class TestInputReadyVertexManager { when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3); mockInputVertices.put(mockSrcVertexId1, eProp1); - Map<String, List<Integer>> initialCompletions = Maps.newHashMap(); - initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0)); - InputReadyVertexManager manager = new InputReadyVertexManager(mockContext); manager.initialize(); verify(mockContext, times(1)).vertexReconfigurationPlanned(); - verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture()); + verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture()); // ok to have source task complete before anything else - manager.onSourceTaskCompleted(mockSrcVertexId1, 1); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1)); // first own vertex started - manager.onVertexStarted(initialCompletions); + manager.onVertexStarted(Collections.singletonList( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0))); // no scheduling as we are not configured yet - verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture()); + verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture()); // then source vertex configured. now we start manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); verify(mockContext, times(1)).doneReconfiguringVertex(); - verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture()); - manager.onSourceTaskCompleted(mockSrcVertexId1, 2); - verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture()); + verify(mockContext, times(2)).scheduleTasks(requestCaptor.capture()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2)); + verify(mockContext, times(3)).scheduleTasks(requestCaptor.capture()); Assert.assertEquals(1, requestCaptor.getValue().size()); - Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue()); + Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex()); Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getVertexName()); Assert.assertEquals(2, requestCaptor.getValue().get(0) @@ -247,7 +247,7 @@ public class TestInputReadyVertexManager { mockInputVertices.put(mockSrcVertexId2, eProp2); mockInputVertices.put(mockSrcVertexId3, eProp3); - Map<String, List<Integer>> initialCompletions = Maps.newHashMap(); + List<TaskAttemptIdentifier> initialCompletions = Lists.newArrayList(); // 1-1 sources do not match managed tasks. setParallelism called to make them match when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); @@ -280,8 +280,8 @@ public class TestInputReadyVertexManager { when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3); - initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0)); - initialCompletions.put(mockSrcVertexId2, Collections.singletonList(0)); + initialCompletions.add(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0)); + initialCompletions.add(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0)); manager = new InputReadyVertexManager(mockContext); manager.initialize(); verify(mockContext, times(3)).vertexReconfigurationPlanned(); @@ -293,44 +293,53 @@ public class TestInputReadyVertexManager { verify(mockContext, times(2)).doneReconfiguringVertex(); manager.onVertexStarted(initialCompletions); // all 1-1 0's done but not scheduled because v1 is not done - manager.onSourceTaskCompleted(mockSrcVertexId3, 0); - manager.onSourceTaskCompleted(mockSrcVertexId1, 1); - manager.onSourceTaskCompleted(mockSrcVertexId1, 1); // duplicate - manager.onSourceTaskCompleted(mockSrcVertexId2, 1); - verify(mockContext, times(0)).scheduleVertexTasks(anyList()); - manager.onSourceTaskCompleted(mockSrcVertexId1, 2); // v1 done - verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 0)); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1)); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1)); // duplicate + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 1)); + verify(mockContext, times(0)).scheduleTasks(anyList()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2)); // v1 done + verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture()); Assert.assertEquals(1, requestCaptor.getValue().size()); - Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue()); + Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex()); Assert.assertEquals(mockSrcVertexId3, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getVertexName()); Assert.assertEquals(0, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion // 1-1 completion triggers since other 1-1 is done - manager.onSourceTaskCompleted(mockSrcVertexId3, 1); - verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 1)); + verify(mockContext, times(2)).scheduleTasks(requestCaptor.capture()); Assert.assertEquals(1, requestCaptor.getValue().size()); - Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue()); + Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex()); Assert.assertEquals(mockSrcVertexId3, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getVertexName()); Assert.assertEquals(1, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion // 1-1 completion does not trigger since other 1-1 is not done - manager.onSourceTaskCompleted(mockSrcVertexId3, 2); - verify(mockContext, times(2)).scheduleVertexTasks(anyList()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 2)); + verify(mockContext, times(2)).scheduleTasks(anyList()); // 1-1 completion trigger start - manager.onSourceTaskCompleted(mockSrcVertexId2, 2); - verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 2)); + verify(mockContext, times(3)).scheduleTasks(requestCaptor.capture()); Assert.assertEquals(1, requestCaptor.getValue().size()); - Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue()); + Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex()); Assert.assertEquals(mockSrcVertexId2, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getVertexName()); Assert.assertEquals(2, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion // no more starts - manager.onSourceTaskCompleted(mockSrcVertexId3, 2); - verify(mockContext, times(3)).scheduleVertexTasks(anyList()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 2)); + verify(mockContext, times(3)).scheduleTasks(anyList()); } } http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index 3a3c71b..18d4bc1 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -38,10 +38,17 @@ import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.VertexManagerPluginContext; -import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint; +import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest; import org.apache.tez.dag.api.VertexManagerPluginDescriptor; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.records.TaskAttemptIdentifierImpl; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.TaskIdentifier; +import org.apache.tez.runtime.api.VertexIdentifier; import org.apache.tez.runtime.api.VertexStatistics; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; @@ -75,6 +82,10 @@ import static org.mockito.Mockito.when; @SuppressWarnings({ "unchecked", "rawtypes" }) public class TestShuffleVertexManager { + TezVertexID vertexId = TezVertexID.fromString("vertex_1436907267600_195589_1_00"); + int taskId = 0; + List<TaskAttemptIdentifier> emptyCompletions = null; + @Test(timeout = 5000) public void testShuffleVertexManagerAutoParallelism() throws Exception { Configuration conf = new Configuration(); @@ -163,12 +174,12 @@ public class TestShuffleVertexManager { public Object answer(InvocationOnMock invocation) { Object[] args = invocation.getArguments(); scheduledTasks.clear(); - List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0]; - for (TaskWithLocationHint task : tasks) { + List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0]; + for (ScheduleTaskRequest task : tasks) { scheduledTasks.add(task.getTaskIndex()); } return null; - }}).when(mockContext).scheduleVertexTasks(anyList()); + }}).when(mockContext).scheduleTasks(anyList()); final Map<String, EdgeManagerPlugin> newEdgeManagers = new HashMap<String, EdgeManagerPlugin>(); @@ -224,7 +235,7 @@ public class TestShuffleVertexManager { when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0); when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); verify(mockContext, times(2)).vertexReconfigurationPlanned(); Assert.assertTrue(manager.bipartiteSources == 2); @@ -247,7 +258,7 @@ public class TestShuffleVertexManager { verify(mockContext, times(1)).doneReconfiguringVertex(); // no change. will trigger after start Assert.assertTrue(scheduledTasks.size() == 0); // no tasks scheduled // trigger start and processing of pending notification events - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.bipartiteSources == 2); verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done Assert.assertTrue(manager.pendingTasks.isEmpty()); @@ -257,20 +268,18 @@ public class TestShuffleVertexManager { when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2); when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2); - ByteBuffer payload = - VertexManagerEventPayloadProto.newBuilder().setOutputSize(5000L).build().toByteString().asReadOnlyByteBuffer(); - VertexManagerEvent vmEvent = VertexManagerEvent.create("Vertex", payload); + VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, "Vertex"); // parallelism not change due to large data size manager = createManager(conf, mockContext, 0.1f, 0.1f); verify(mockContext, times(4)).vertexReconfigurationPlanned(); // Tez notified of reconfig - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.pendingTasks.size() == 4); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); manager.onVertexManagerEventReceived(vmEvent); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); verify(mockContext, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); verify(mockContext, times(2)).doneReconfiguringVertex(); // trigger scheduling @@ -290,49 +299,54 @@ public class TestShuffleVertexManager { //{5,9,12,18} in bitmap long[] sizes = new long[]{(0l), (1000l * 1000l), (1010 * 1000l * 1000l), (50 * 1000l * 1000l)}; - RoaringBitmap partitionStats = ShuffleUtils.getPartitionStatsForPhysicalOutput(sizes); - DataOutputBuffer dout = new DataOutputBuffer(); - partitionStats.serialize(dout); - ByteString - partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData()); - payload = - VertexManagerEventPayloadProto.newBuilder().setOutputSize(1L) - .setPartitionStats(partitionStatsBytes).build().toByteString().asReadOnlyByteBuffer(); - vmEvent = VertexManagerEvent.create("Vertex", payload); + vmEvent = getVertexManagerEvent(sizes, 1L, "Vertex"); manager = createManager(conf, mockContext, 0.01f, 0.75f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks); Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); + TezTaskAttemptID taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0"); + vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", "vertex", taId1)); + manager.onVertexManagerEventReceived(vmEvent); + Assert.assertEquals(1, manager.numVertexManagerEventsReceived); + + Assert.assertEquals(4, manager.stats.length); + Assert.assertEquals(0, manager.stats[0]); //0 MB bucket + Assert.assertEquals(1, manager.stats[1]); //1 MB bucket + Assert.assertEquals(100, manager.stats[2]); //100 MB bucket + Assert.assertEquals(10, manager.stats[3]); //10 MB bucket + + // sending again from a different version of the same task has not impact + TezTaskAttemptID taId2 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_1"); + vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", "vertex", taId2)); manager.onVertexManagerEventReceived(vmEvent); + Assert.assertEquals(1, manager.numVertexManagerEventsReceived); - Assert.assertEquals(manager.stats.length, 4); - Assert.assertEquals(manager.stats[0], 0); //0 MB bucket - Assert.assertEquals(manager.stats[1], 1); //1 MB bucket - Assert.assertEquals(manager.stats[2], 100); //100 MB bucket - Assert.assertEquals(manager.stats[3], 10); //10 MB bucket + Assert.assertEquals(4, manager.stats.length); + Assert.assertEquals(0, manager.stats[0]); //0 MB bucket + Assert.assertEquals(1, manager.stats[1]); //1 MB bucket + Assert.assertEquals(100, manager.stats[2]); //100 MB bucket + Assert.assertEquals(10, manager.stats[3]); //10 MB bucket /** * Test for TEZ-978 * Delay determining parallelism until enough data has been received. */ scheduledTasks.clear(); - payload = - VertexManagerEventPayloadProto.newBuilder().setOutputSize(1L).build().toByteString().asReadOnlyByteBuffer(); - vmEvent = VertexManagerEvent.create("Vertex", payload); //min/max fraction of 0.01/0.75 would ensure that we hit determineParallelism code path on receiving first event itself. manager = createManager(conf, mockContext, 0.01f, 0.75f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks); Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); //First task in src1 completed with small payload + vmEvent = getVertexManagerEvent(null, 1L, "Vertex"); manager.onVertexManagerEventReceived(vmEvent); //small payload - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); Assert.assertTrue(manager.determineParallelismAndApply() == false); Assert.assertEquals(4, manager.pendingTasks.size()); Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled @@ -341,8 +355,9 @@ public class TestShuffleVertexManager { Assert.assertEquals(1L, manager.completedSourceTasksOutputSize); //Second task in src1 completed with small payload + vmEvent = getVertexManagerEvent(null, 1L, "Vertex"); manager.onVertexManagerEventReceived(vmEvent); //small payload - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); //Still overall data gathered has not reached threshold; So, ensure parallelism can be determined later Assert.assertTrue(manager.determineParallelismAndApply() == false); Assert.assertEquals(4, manager.pendingTasks.size()); @@ -352,17 +367,14 @@ public class TestShuffleVertexManager { Assert.assertEquals(2L, manager.completedSourceTasksOutputSize); //First task in src2 completed (with larger payload) to trigger determining parallelism - payload = - VertexManagerEventPayloadProto.newBuilder().setOutputSize(1200L).build().toByteString() - .asReadOnlyByteBuffer(); - vmEvent = VertexManagerEvent.create("Vertex", payload); + vmEvent = getVertexManagerEvent(null, 1200L, "Vertex"); manager.onVertexManagerEventReceived(vmEvent); Assert.assertTrue(manager.determineParallelismAndApply()); //ensure parallelism is determined verify(mockContext, times(1)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); Assert.assertEquals(1, manager.pendingTasks.size()); Assert.assertEquals(1, scheduledTasks.size()); Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted); @@ -374,14 +386,11 @@ public class TestShuffleVertexManager { when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20); when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(40); scheduledTasks.clear(); - payload = - VertexManagerEventPayloadProto.newBuilder().setOutputSize(100L).build().toByteString() - .asReadOnlyByteBuffer(); - vmEvent = VertexManagerEvent.create("Vertex", payload); + vmEvent = getVertexManagerEvent(null, 100L, "Vertex"); //min/max fraction of 0.0/0.2 manager = createManager(conf, mockContext, 0.0f, 0.2f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); @@ -391,7 +400,7 @@ public class TestShuffleVertexManager { //send 7 events with payload size as 100 for(int i=0;i<7;i++) { manager.onVertexManagerEventReceived(vmEvent); //small payload - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(i)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, i)); //should not change parallelism verify(mockContext, times(0)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap()); } @@ -401,7 +410,7 @@ public class TestShuffleVertexManager { //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(8)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8)); //Since max threshold (40 * 0.2 = 8) is met, vertex manager should determine parallelism verify(mockContext, times(1)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap()); @@ -413,41 +422,39 @@ public class TestShuffleVertexManager { // parallelism changed due to small data size scheduledTasks.clear(); - payload = - VertexManagerEventPayloadProto.newBuilder().setOutputSize(500L).build().toByteString().asReadOnlyByteBuffer(); - vmEvent = VertexManagerEvent.create("Vertex", payload); manager = createManager(conf, mockContext, 0.5f, 0.5f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks); // task completion from non-bipartite stage does nothing - manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0)); Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks); Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); + vmEvent = getVertexManagerEvent(null, 500L, "Vertex"); manager.onVertexManagerEventReceived(vmEvent); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); Assert.assertEquals(4, manager.pendingTasks.size()); Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted); Assert.assertEquals(1, manager.numVertexManagerEventsReceived); Assert.assertEquals(500L, manager.completedSourceTasksOutputSize); // ignore duplicate completion - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); Assert.assertEquals(4, manager.pendingTasks.size()); Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted); Assert.assertEquals(500L, manager.completedSourceTasksOutputSize); - + vmEvent = getVertexManagerEvent(null, 500L, "Vertex"); manager.onVertexManagerEventReceived(vmEvent); //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); // managedVertex tasks reduced verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); Assert.assertEquals(2, newEdgeManagers.size()); @@ -461,7 +468,7 @@ public class TestShuffleVertexManager { Assert.assertEquals(1000L, manager.completedSourceTasksOutputSize); // more completions dont cause recalculation of parallelism - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); Assert.assertEquals(2, newEdgeManagers.size()); @@ -537,7 +544,7 @@ public class TestShuffleVertexManager { mockInputVertices.put(mockSrcVertexId3, eProp3); try { manager = createManager(conf, mockContext, 0.1f, 0.1f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertFalse(true); } catch (TezUncheckedException e) { Assert.assertTrue(e.getMessage().contains( @@ -549,7 +556,7 @@ public class TestShuffleVertexManager { // check initialization manager = createManager(conf, mockContext, 0.1f, 0.1f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.bipartiteSources == 2); final HashSet<Integer> scheduledTasks = new HashSet<Integer>(); @@ -557,17 +564,17 @@ public class TestShuffleVertexManager { public Object answer(InvocationOnMock invocation) { Object[] args = invocation.getArguments(); scheduledTasks.clear(); - List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0]; - for (TaskWithLocationHint task : tasks) { + List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0]; + for (ScheduleTaskRequest task : tasks) { scheduledTasks.add(task.getTaskIndex()); } return null; - }}).when(mockContext).scheduleVertexTasks(anyList()); + }}).when(mockContext).scheduleTasks(anyList()); // source vertices have 0 tasks. immediate start of all managed tasks when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0); when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.pendingTasks.isEmpty()); Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled @@ -594,7 +601,7 @@ public class TestShuffleVertexManager { // source vertex have some tasks. min, max == 0 manager = createManager(conf, mockContext, 0, 0); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); Assert.assertTrue(manager.totalTasksToSchedule == 3); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0); @@ -607,122 +614,122 @@ public class TestShuffleVertexManager { // min, max > 0 and min == max manager = createManager(conf, mockContext, 0.25f, 0.25f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); // task completion from non-bipartite stage does nothing - manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); Assert.assertTrue(manager.pendingTasks.isEmpty()); Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1); // min, max > 0 and min == max == absolute max 1.0 manager = createManager(conf, mockContext, 1.0f, 1.0f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); // task completion from non-bipartite stage does nothing - manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); Assert.assertTrue(manager.pendingTasks.size() == 3); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); Assert.assertTrue(manager.pendingTasks.isEmpty()); Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); // min, max > 0 and min == max manager = createManager(conf, mockContext, 1.0f, 1.0f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); // task completion from non-bipartite stage does nothing - manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); Assert.assertTrue(manager.pendingTasks.size() == 3); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); Assert.assertTrue(manager.pendingTasks.isEmpty()); Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); // min, max > and min < max manager = createManager(conf, mockContext, 0.25f, 0.75f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); Assert.assertTrue(manager.pendingTasks.size() == 2); Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); // completion of same task again should not get counted - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); Assert.assertTrue(manager.pendingTasks.size() == 2); Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); Assert.assertTrue(manager.pendingTasks.size() == 0); Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3); scheduledTasks.clear(); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1)); // we are done. no action + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); // we are done. no action Assert.assertTrue(manager.pendingTasks.size() == 0); Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); // min, max > and min < max manager = createManager(conf, mockContext, 0.25f, 1.0f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); Assert.assertTrue(manager.pendingTasks.size() == 2); Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); Assert.assertTrue(manager.pendingTasks.size() == 1); Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); Assert.assertTrue(manager.pendingTasks.size() == 0); Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); @@ -732,9 +739,10 @@ public class TestShuffleVertexManager { /** * Tasks should be scheduled only when all source vertices are configured completely + * @throws IOException */ @Test(timeout = 5000) - public void test_Tez1649_with_scatter_gather_edges() { + public void test_Tez1649_with_scatter_gather_edges() throws IOException { Configuration conf = new Configuration(); conf.setBoolean( ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, @@ -778,10 +786,7 @@ public class TestShuffleVertexManager { when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(3); when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3); - ByteBuffer payload = - VertexManagerEventPayloadProto.newBuilder().setOutputSize(50L).build().toByteString().asReadOnlyByteBuffer(); - VertexManagerEvent vmEvent = VertexManagerEvent.create("Vertex", payload); - + VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, "Vertex"); // check initialization manager = createManager(conf, mockContext_R2, 0.001f, 0.001f); @@ -790,14 +795,14 @@ public class TestShuffleVertexManager { public Object answer(InvocationOnMock invocation) { Object[] args = invocation.getArguments(); scheduledTasks.clear(); - List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0]; - for (TaskWithLocationHint task : tasks) { + List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0]; + for (ScheduleTaskRequest task : tasks) { scheduledTasks.add(task.getTaskIndex()); } return null; - }}).when(mockContext_R2).scheduleVertexTasks(anyList()); + }}).when(mockContext_R2).scheduleTasks(anyList()); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.bipartiteSources == 3); manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED)); @@ -811,15 +816,15 @@ public class TestShuffleVertexManager { Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9); //Send events for all tasks of m3. - manager.onSourceTaskCompleted(m3, new Integer(0)); - manager.onSourceTaskCompleted(m3, new Integer(1)); - manager.onSourceTaskCompleted(m3, new Integer(2)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 2)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9); //Send an event for m2. But still we need to wait for at least 1 event from r1. - manager.onSourceTaskCompleted(m2, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9); @@ -848,7 +853,7 @@ public class TestShuffleVertexManager { when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3); manager = createManager(conf, mockContext_R2, 0.001f, 0.001f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks); Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); @@ -858,7 +863,7 @@ public class TestShuffleVertexManager { // Only need completed configuration notification from m3 manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED)); - manager.onSourceTaskCompleted(m3, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0)); Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled Assert.assertTrue(scheduledTasks.size() == 3); } @@ -885,7 +890,10 @@ public class TestShuffleVertexManager { .build().toByteString() .asReadOnlyByteBuffer(); } + TaskAttemptIdentifierImpl taId = new TaskAttemptIdentifierImpl("dag", vertexName, + TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, taskId++), 0)); VertexManagerEvent vmEvent = VertexManagerEvent.create(vertexName, payload); + vmEvent.setProducerAttemptIdentifier(taId); return vmEvent; } @@ -940,16 +948,16 @@ public class TestShuffleVertexManager { public Object answer(InvocationOnMock invocation) { Object[] args = invocation.getArguments(); scheduledTasks.clear(); - List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0]; - for (TaskWithLocationHint task : tasks) { + List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0]; + for (ScheduleTaskRequest task : tasks) { scheduledTasks.add(task.getTaskIndex()); } return null; - }}).when(mockContext).scheduleVertexTasks(anyList()); + }}).when(mockContext).scheduleTasks(anyList()); // check initialization manager = createManager(conf, mockContext, 0.001f, 0.001f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.bipartiteSources == 1); manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); @@ -960,7 +968,7 @@ public class TestShuffleVertexManager { Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); //Send an event for r1. - manager.onSourceTaskCompleted(r1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3); @@ -975,13 +983,13 @@ public class TestShuffleVertexManager { manager.onVertexManagerEventReceived(vmEvent); //send VM event //Send an event for m2. - manager.onSourceTaskCompleted(m2, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3); //Send an event for m3. manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED)); - manager.onSourceTaskCompleted(m3, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0)); Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled Assert.assertTrue(scheduledTasks.size() == 3); @@ -1043,16 +1051,16 @@ public class TestShuffleVertexManager { public Object answer(InvocationOnMock invocation) { Object[] args = invocation.getArguments(); scheduledTasks.clear(); - List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0]; - for (TaskWithLocationHint task : tasks) { + List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0]; + for (ScheduleTaskRequest task : tasks) { scheduledTasks.add(task.getTaskIndex()); } return null; - }}).when(mockContext).scheduleVertexTasks(anyList()); + }}).when(mockContext).scheduleTasks(anyList()); // check initialization manager = createManager(conf, mockContext, 0.001f, 0.001f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.bipartiteSources == 1); manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); @@ -1063,13 +1071,13 @@ public class TestShuffleVertexManager { Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); //Send events for 2 tasks of r1. - manager.onSourceTaskCompleted(r1, new Integer(0)); - manager.onSourceTaskCompleted(r1, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3); //Send an event for m2. - manager.onSourceTaskCompleted(m2, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3); @@ -1082,7 +1090,7 @@ public class TestShuffleVertexManager { //Still, wait for a configuration to be completed from other edges scheduledTasks.clear(); manager = createManager(conf, mockContext, 0.001f, 0.001f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); @@ -1094,9 +1102,9 @@ public class TestShuffleVertexManager { Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3); - manager.onSourceTaskCompleted(r1, new Integer(0)); - manager.onSourceTaskCompleted(r1, new Integer(1)); - manager.onSourceTaskCompleted(r1, new Integer(2)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 2)); //Tasks from non-scatter edges of m2 and m3 are not complete. Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED)); @@ -1109,7 +1117,7 @@ public class TestShuffleVertexManager { //try with a zero task vertex (with non-scatter-gather edges) scheduledTasks.clear(); manager = createManager(conf, mockContext, 0.001f, 0.001f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3); @@ -1118,7 +1126,7 @@ public class TestShuffleVertexManager { when(mockContext.getVertexNumTasks(m3)).thenReturn(3); //broadcast manager = createManager(conf, mockContext, 0.001f, 0.001f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled @@ -1126,8 +1134,8 @@ public class TestShuffleVertexManager { Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); //Send 2 events for tasks of r1. - manager.onSourceTaskCompleted(r1, new Integer(0)); - manager.onSourceTaskCompleted(r1, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(scheduledTasks.size() == 0); @@ -1139,7 +1147,7 @@ public class TestShuffleVertexManager { //try with all zero task vertices in non-SG edges scheduledTasks.clear(); manager = createManager(conf, mockContext, 0.001f, 0.001f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3); @@ -1149,10 +1157,22 @@ public class TestShuffleVertexManager { //Send 1 events for tasks of r1. manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); - manager.onSourceTaskCompleted(r1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0)); Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled Assert.assertTrue(scheduledTasks.size() == 3); } + + public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId) { + VertexIdentifier mockVertex = mock(VertexIdentifier.class); + when(mockVertex.getName()).thenReturn(vName); + TaskIdentifier mockTask = mock(TaskIdentifier.class); + when(mockTask.getIdentifier()).thenReturn(tId); + when(mockTask.getVertexIdentifier()).thenReturn(mockVertex); + TaskAttemptIdentifier mockAttempt = mock(TaskAttemptIdentifier.class); + when(mockAttempt.getIdentifier()).thenReturn(0); + when(mockAttempt.getTaskIdentifier()).thenReturn(mockTask); + return mockAttempt; + } private ShuffleVertexManager createManager(Configuration conf, VertexManagerPluginContext context, float min, float max) { http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java index 1d17b23..74efee2 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; -import java.util.Map; import java.util.Random; import org.slf4j.Logger; @@ -56,7 +55,7 @@ import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexManagerPlugin; import org.apache.tez.dag.api.VertexManagerPluginContext; -import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint; +import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest; import org.apache.tez.dag.api.VertexManagerPluginDescriptor; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; @@ -71,6 +70,7 @@ import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.ProcessorContext; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.library.processor.SimpleProcessor; import org.junit.After; @@ -522,8 +522,8 @@ public class TestAMRecovery { } @Override - public void onSourceTaskCompleted(String srcVertexName, Integer taskId) { - super.onSourceTaskCompleted(srcVertexName, taskId); + public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) { + super.onSourceTaskCompleted(attempt); completedTaskNum ++; if (getContext().getDAGAttemptNumber() == 1) { if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) { @@ -531,7 +531,8 @@ public class TestAMRecovery { System.exit(-1); } } else { - if (completedTaskNum == getContext().getVertexNumTasks(srcVertexName)) { + if (completedTaskNum == getContext(). + getVertexNumTasks(attempt.getTaskIdentifier().getVertexIdentifier().getName())) { System.exit(-1); } } @@ -561,8 +562,8 @@ public class TestAMRecovery { } @Override - public void onSourceTaskCompleted(String srcVertexName, Integer taskId) { - super.onSourceTaskCompleted(srcVertexName, taskId); + public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) { + super.onSourceTaskCompleted(attempt); completedTaskNum ++; if (getContext().getDAGAttemptNumber() == 1) { if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) { @@ -570,7 +571,8 @@ public class TestAMRecovery { System.exit(-1); } } else { - if (completedTaskNum == getContext().getVertexNumTasks(srcVertexName)) { + if (completedTaskNum == getContext(). + getVertexNumTasks(attempt.getTaskIdentifier().getVertexIdentifier().getName())) { System.exit(-1); } } @@ -601,8 +603,8 @@ public class TestAMRecovery { } @Override - public void onSourceTaskCompleted(String srcVertexName, Integer taskId) { - super.onSourceTaskCompleted(srcVertexName, taskId); + public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) { + super.onSourceTaskCompleted(attempt); completedTaskNum ++; if (getContext().getDAGAttemptNumber() == 1) { if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) { @@ -610,7 +612,8 @@ public class TestAMRecovery { System.exit(-1); } } else { - if (completedTaskNum == getContext().getVertexNumTasks(srcVertexName)) { + if (completedTaskNum == getContext(). + getVertexNumTasks(attempt.getTaskIdentifier().getVertexIdentifier().getName())) { System.exit(-1); } } @@ -642,26 +645,26 @@ public class TestAMRecovery { } @Override - public void onVertexStarted(Map<String, List<Integer>> completions) + public void onVertexStarted(List<TaskAttemptIdentifier> completions) throws Exception { if (getContext().getDAGAttemptNumber() == 1) { // only schedule one task if it is partiallyFinished case if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) { - getContext().scheduleVertexTasks(Lists.newArrayList(new TaskWithLocationHint(0, null))); + getContext().scheduleTasks(Lists.newArrayList(ScheduleTaskRequest.create(0, null))); return ; } } // schedule all tasks when it is not partiallyFinished int taskNum = getContext().getVertexNumTasks(getContext().getVertexName()); - List<TaskWithLocationHint> taskWithLocationHints = new ArrayList<TaskWithLocationHint>(); + List<ScheduleTaskRequest> taskWithLocationHints = new ArrayList<ScheduleTaskRequest>(); for (int i=0;i<taskNum;++i) { - taskWithLocationHints.add(new TaskWithLocationHint(i, null)); + taskWithLocationHints.add(ScheduleTaskRequest.create(i, null)); } - getContext().scheduleVertexTasks(taskWithLocationHints); + getContext().scheduleTasks(taskWithLocationHints); } @Override - public void onSourceTaskCompleted(String srcVertexName, Integer taskId) + public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception { } @@ -703,9 +706,9 @@ public class TestAMRecovery { } @Override - public void onSourceTaskCompleted(String srcVertexName, Integer taskId) { + public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) { int curAttempt = getContext().getDAGAttemptNumber(); - super.onSourceTaskCompleted(srcVertexName, taskId); + super.onSourceTaskCompleted(attempt); int failOnAttempt = conf.getInt(FAIL_ON_ATTEMPT, 1); LOG.info("failOnAttempt:" + failOnAttempt); LOG.info("curAttempt:" + curAttempt); http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java index 49bb9f5..b8b46cb 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java @@ -79,6 +79,7 @@ import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; import org.apache.tez.runtime.api.Writer; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputDataInformationEvent; @@ -686,7 +687,7 @@ public class TestExceptionPropagation { } @Override - public void onVertexStarted(Map<String, List<Integer>> completions) { + public void onVertexStarted(List<TaskAttemptIdentifier> completions) { if (this.exLocation == ExceptionLocation.VM_ON_VERTEX_STARTED) { throw new RuntimeException(this.exLocation.name()); } @@ -729,11 +730,11 @@ public class TestExceptionPropagation { } @Override - public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) { + public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) { if (this.exLocation == ExceptionLocation.VM_ON_SOURCETASK_COMPLETED) { throw new RuntimeException(this.exLocation.name()); } - super.onSourceTaskCompleted(srcVertexName, attemptId); + super.onSourceTaskCompleted(attempt); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/7b45e9a1/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java index 7244d8d..5c6f855 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java +++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java @@ -40,7 +40,7 @@ import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexManagerPlugin; import org.apache.tez.dag.api.VertexManagerPluginContext; import org.apache.tez.dag.api.VertexManagerPluginDescriptor; -import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint; +import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest; import org.apache.tez.dag.api.client.VertexStatus.State; import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.AbstractLogicalOutput; @@ -49,6 +49,7 @@ import org.apache.tez.runtime.api.MemoryUpdateCallback; import org.apache.tez.runtime.api.OutputCommitter; import org.apache.tez.runtime.api.OutputCommitterContext; import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.InputInitializer; @@ -57,15 +58,12 @@ import org.apache.tez.runtime.api.Writer; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.events.InputInitializerEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; -import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.test.TestInput; import org.apache.tez.test.TestOutput; import org.apache.tez.test.TestProcessor; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicInteger; public class MultiAttemptDAG { @@ -102,14 +100,11 @@ public class MultiAttemptDAG { } @Override - public void onVertexStarted(Map<String, List<Integer>> completions) { + public void onVertexStarted(List<TaskAttemptIdentifier> completions) { if (completions != null) { - for (Entry<String, List<Integer>> entry : completions.entrySet()) { - LOG.info("Received completion events on vertexStarted" - + ", vertex=" + entry.getKey() - + ", completions=" + entry.getValue().size()); - numCompletions.addAndGet(entry.getValue().size()); - } + LOG.info("Received completion events on vertexStarted" + + ", completions=" + completions.size()); + numCompletions.addAndGet(completions.size()); } maybeScheduleTasks(); } @@ -129,20 +124,20 @@ public class MultiAttemptDAG { } else if (successAttemptId == getContext().getDAGAttemptNumber()) { LOG.info("Scheduling tasks for vertex=" + getContext().getVertexName()); int numTasks = getContext().getVertexNumTasks(getContext().getVertexName()); - List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasks); + List<ScheduleTaskRequest> scheduledTasks = Lists.newArrayListWithCapacity(numTasks); for (int i=0; i<numTasks; ++i) { - scheduledTasks.add(new TaskWithLocationHint(new Integer(i), null)); + scheduledTasks.add(ScheduleTaskRequest.create(i, null)); } - getContext().scheduleVertexTasks(scheduledTasks); + getContext().scheduleTasks(scheduledTasks); } } } @Override - public void onSourceTaskCompleted(String srcVertexName, Integer taskId) { + public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) { LOG.info("Received completion events for source task" - + ", vertex=" + srcVertexName - + ", taskIdx=" + taskId); + + ", vertex=" + attempt.getTaskIdentifier().getVertexIdentifier().getName() + + ", taskIdx=" + attempt.getTaskIdentifier().getIdentifier()); numCompletions.incrementAndGet(); maybeScheduleTasks(); }
