http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java index b164a6d..d59439e 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java @@ -23,7 +23,6 @@ import static org.mockito.Mockito.*; import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Map; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -34,9 +33,10 @@ import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.VertexManagerPluginContext; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; -import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint; +import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -44,13 +44,13 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.MockitoAnnotations; -import com.google.common.collect.Maps; +import com.google.common.collect.Lists; @SuppressWarnings("unchecked") public class TestInputReadyVertexManager { @Captor - ArgumentCaptor<List<TaskWithLocationHint>> requestCaptor; + ArgumentCaptor<List<ScheduleTaskRequest>> requestCaptor; @Before public void init() { @@ -77,23 +77,23 @@ public class TestInputReadyVertexManager { when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2); when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3); mockInputVertices.put(mockSrcVertexId1, eProp1); - - Map<String, List<Integer>> initialCompletions = Maps.newHashMap(); - initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0)); - + InputReadyVertexManager manager = new InputReadyVertexManager(mockContext); manager.initialize(); verify(mockContext, times(1)).vertexReconfigurationPlanned(); // source vertex configured manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); verify(mockContext, times(1)).doneReconfiguringVertex(); - verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture()); + verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture()); // then own vertex started - manager.onVertexStarted(initialCompletions); - manager.onSourceTaskCompleted(mockSrcVertexId1, 1); - verify(mockContext, times(0)).scheduleVertexTasks(anyList()); - manager.onSourceTaskCompleted(mockSrcVertexId1, 2); - verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture()); + manager.onVertexStarted(Collections.singletonList( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0))); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1)); + verify(mockContext, times(0)).scheduleTasks(anyList()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2)); + verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture()); Assert.assertEquals(2, requestCaptor.getValue().size()); } @@ -118,36 +118,36 @@ public class TestInputReadyVertexManager { when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3); mockInputVertices.put(mockSrcVertexId1, eProp1); - Map<String, List<Integer>> initialCompletions = Maps.newHashMap(); - initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0)); - InputReadyVertexManager manager = new InputReadyVertexManager(mockContext); manager.initialize(); verify(mockContext, times(1)).vertexReconfigurationPlanned(); // source vertex configured manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); verify(mockContext, times(1)).doneReconfiguringVertex(); - verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture()); - manager.onVertexStarted(initialCompletions); - verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture()); + verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture()); + manager.onVertexStarted(Collections.singletonList( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0))); + verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture()); Assert.assertEquals(1, requestCaptor.getValue().size()); - Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue()); + Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex()); Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getVertexName()); Assert.assertEquals(0, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); - manager.onSourceTaskCompleted(mockSrcVertexId1, 1); - verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1)); + verify(mockContext, times(2)).scheduleTasks(requestCaptor.capture()); Assert.assertEquals(1, requestCaptor.getValue().size()); - Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue()); + Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex()); Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getVertexName()); Assert.assertEquals(1, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); - manager.onSourceTaskCompleted(mockSrcVertexId1, 2); - verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2)); + verify(mockContext, times(3)).scheduleTasks(requestCaptor.capture()); Assert.assertEquals(1, requestCaptor.getValue().size()); - Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue()); + Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex()); Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getVertexName()); Assert.assertEquals(2, requestCaptor.getValue().get(0) @@ -175,28 +175,28 @@ public class TestInputReadyVertexManager { when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3); mockInputVertices.put(mockSrcVertexId1, eProp1); - Map<String, List<Integer>> initialCompletions = Maps.newHashMap(); - initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0)); - InputReadyVertexManager manager = new InputReadyVertexManager(mockContext); manager.initialize(); verify(mockContext, times(1)).vertexReconfigurationPlanned(); - verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture()); + verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture()); // ok to have source task complete before anything else - manager.onSourceTaskCompleted(mockSrcVertexId1, 1); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1)); // first own vertex started - manager.onVertexStarted(initialCompletions); + manager.onVertexStarted(Collections.singletonList( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0))); // no scheduling as we are not configured yet - verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture()); + verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture()); // then source vertex configured. now we start manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); verify(mockContext, times(1)).doneReconfiguringVertex(); - verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture()); - manager.onSourceTaskCompleted(mockSrcVertexId1, 2); - verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture()); + verify(mockContext, times(2)).scheduleTasks(requestCaptor.capture()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2)); + verify(mockContext, times(3)).scheduleTasks(requestCaptor.capture()); Assert.assertEquals(1, requestCaptor.getValue().size()); - Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue()); + Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex()); Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getVertexName()); Assert.assertEquals(2, requestCaptor.getValue().get(0) @@ -247,7 +247,7 @@ public class TestInputReadyVertexManager { mockInputVertices.put(mockSrcVertexId2, eProp2); mockInputVertices.put(mockSrcVertexId3, eProp3); - Map<String, List<Integer>> initialCompletions = Maps.newHashMap(); + List<TaskAttemptIdentifier> initialCompletions = Lists.newArrayList(); // 1-1 sources do not match managed tasks. setParallelism called to make them match when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); @@ -280,8 +280,8 @@ public class TestInputReadyVertexManager { when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3); - initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0)); - initialCompletions.put(mockSrcVertexId2, Collections.singletonList(0)); + initialCompletions.add(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0)); + initialCompletions.add(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0)); manager = new InputReadyVertexManager(mockContext); manager.initialize(); verify(mockContext, times(3)).vertexReconfigurationPlanned(); @@ -293,44 +293,53 @@ public class TestInputReadyVertexManager { verify(mockContext, times(2)).doneReconfiguringVertex(); manager.onVertexStarted(initialCompletions); // all 1-1 0's done but not scheduled because v1 is not done - manager.onSourceTaskCompleted(mockSrcVertexId3, 0); - manager.onSourceTaskCompleted(mockSrcVertexId1, 1); - manager.onSourceTaskCompleted(mockSrcVertexId1, 1); // duplicate - manager.onSourceTaskCompleted(mockSrcVertexId2, 1); - verify(mockContext, times(0)).scheduleVertexTasks(anyList()); - manager.onSourceTaskCompleted(mockSrcVertexId1, 2); // v1 done - verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 0)); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1)); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1)); // duplicate + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 1)); + verify(mockContext, times(0)).scheduleTasks(anyList()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2)); // v1 done + verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture()); Assert.assertEquals(1, requestCaptor.getValue().size()); - Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue()); + Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex()); Assert.assertEquals(mockSrcVertexId3, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getVertexName()); Assert.assertEquals(0, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion // 1-1 completion triggers since other 1-1 is done - manager.onSourceTaskCompleted(mockSrcVertexId3, 1); - verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 1)); + verify(mockContext, times(2)).scheduleTasks(requestCaptor.capture()); Assert.assertEquals(1, requestCaptor.getValue().size()); - Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue()); + Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex()); Assert.assertEquals(mockSrcVertexId3, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getVertexName()); Assert.assertEquals(1, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion // 1-1 completion does not trigger since other 1-1 is not done - manager.onSourceTaskCompleted(mockSrcVertexId3, 2); - verify(mockContext, times(2)).scheduleVertexTasks(anyList()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 2)); + verify(mockContext, times(2)).scheduleTasks(anyList()); // 1-1 completion trigger start - manager.onSourceTaskCompleted(mockSrcVertexId2, 2); - verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 2)); + verify(mockContext, times(3)).scheduleTasks(requestCaptor.capture()); Assert.assertEquals(1, requestCaptor.getValue().size()); - Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue()); + Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex()); Assert.assertEquals(mockSrcVertexId2, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getVertexName()); Assert.assertEquals(2, requestCaptor.getValue().get(0) .getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion // no more starts - manager.onSourceTaskCompleted(mockSrcVertexId3, 2); - verify(mockContext, times(3)).scheduleVertexTasks(anyList()); + manager.onSourceTaskCompleted( + TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 2)); + verify(mockContext, times(3)).scheduleTasks(anyList()); } }
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index d71eba2..df08060 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -34,10 +34,18 @@ import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.VertexManagerPluginContext; -import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint; +import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest; import org.apache.tez.dag.api.VertexManagerPluginDescriptor; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.records.TaskAttemptIdentifierImpl; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; +import org.apache.tez.runtime.api.TaskIdentifier; +import org.apache.tez.runtime.api.VertexIdentifier; +import org.apache.tez.runtime.api.VertexStatistics; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto; @@ -68,6 +76,10 @@ import static org.mockito.Mockito.when; @SuppressWarnings({ "unchecked", "rawtypes" }) public class TestShuffleVertexManager { + TezVertexID vertexId = TezVertexID.fromString("vertex_1436907267600_195589_1_00"); + int taskId = 0; + List<TaskAttemptIdentifier> emptyCompletions = null; + @Test(timeout = 5000) public void testShuffleVertexManagerAutoParallelism() throws Exception { Configuration conf = new Configuration(); @@ -156,12 +168,12 @@ public class TestShuffleVertexManager { public Object answer(InvocationOnMock invocation) { Object[] args = invocation.getArguments(); scheduledTasks.clear(); - List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0]; - for (TaskWithLocationHint task : tasks) { + List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0]; + for (ScheduleTaskRequest task : tasks) { scheduledTasks.add(task.getTaskIndex()); } return null; - }}).when(mockContext).scheduleVertexTasks(anyList()); + }}).when(mockContext).scheduleTasks(anyList()); final Map<String, EdgeManagerPlugin> newEdgeManagers = new HashMap<String, EdgeManagerPlugin>(); @@ -217,7 +229,7 @@ public class TestShuffleVertexManager { when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0); when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); verify(mockContext, times(2)).vertexReconfigurationPlanned(); Assert.assertTrue(manager.bipartiteSources == 2); @@ -240,7 +252,7 @@ public class TestShuffleVertexManager { verify(mockContext, times(1)).doneReconfiguringVertex(); // no change. will trigger after start Assert.assertTrue(scheduledTasks.size() == 0); // no tasks scheduled // trigger start and processing of pending notification events - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.bipartiteSources == 2); verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done Assert.assertTrue(manager.pendingTasks.isEmpty()); @@ -250,20 +262,18 @@ public class TestShuffleVertexManager { when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2); when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2); - ByteBuffer payload = - VertexManagerEventPayloadProto.newBuilder().setOutputSize(5000L).build().toByteString().asReadOnlyByteBuffer(); - VertexManagerEvent vmEvent = VertexManagerEvent.create("Vertex", payload); + VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, "Vertex"); // parallelism not change due to large data size manager = createManager(conf, mockContext, 0.1f, 0.1f); verify(mockContext, times(4)).vertexReconfigurationPlanned(); // Tez notified of reconfig - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.pendingTasks.size() == 4); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); manager.onVertexManagerEventReceived(vmEvent); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); verify(mockContext, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); verify(mockContext, times(2)).doneReconfiguringVertex(); // trigger scheduling @@ -281,20 +291,18 @@ public class TestShuffleVertexManager { * Delay determining parallelism until enough data has been received. */ scheduledTasks.clear(); - payload = - VertexManagerEventPayloadProto.newBuilder().setOutputSize(1L).build().toByteString().asReadOnlyByteBuffer(); - vmEvent = VertexManagerEvent.create("Vertex", payload); //min/max fraction of 0.01/0.75 would ensure that we hit determineParallelism code path on receiving first event itself. manager = createManager(conf, mockContext, 0.01f, 0.75f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks); Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); //First task in src1 completed with small payload + vmEvent = getVertexManagerEvent(null, 1L, "Vertex"); manager.onVertexManagerEventReceived(vmEvent); //small payload - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); Assert.assertTrue(manager.determineParallelismAndApply() == false); Assert.assertEquals(4, manager.pendingTasks.size()); Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled @@ -303,8 +311,9 @@ public class TestShuffleVertexManager { Assert.assertEquals(1L, manager.completedSourceTasksOutputSize); //Second task in src1 completed with small payload + vmEvent = getVertexManagerEvent(null, 1L, "Vertex"); manager.onVertexManagerEventReceived(vmEvent); //small payload - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); //Still overall data gathered has not reached threshold; So, ensure parallelism can be determined later Assert.assertTrue(manager.determineParallelismAndApply() == false); Assert.assertEquals(4, manager.pendingTasks.size()); @@ -314,17 +323,14 @@ public class TestShuffleVertexManager { Assert.assertEquals(2L, manager.completedSourceTasksOutputSize); //First task in src2 completed (with larger payload) to trigger determining parallelism - payload = - VertexManagerEventPayloadProto.newBuilder().setOutputSize(1200L).build().toByteString() - .asReadOnlyByteBuffer(); - vmEvent = VertexManagerEvent.create("Vertex", payload); + vmEvent = getVertexManagerEvent(null, 1200L, "Vertex"); manager.onVertexManagerEventReceived(vmEvent); Assert.assertTrue(manager.determineParallelismAndApply()); //ensure parallelism is determined verify(mockContext, times(1)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); Assert.assertEquals(1, manager.pendingTasks.size()); Assert.assertEquals(1, scheduledTasks.size()); Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted); @@ -336,14 +342,11 @@ public class TestShuffleVertexManager { when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20); when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(40); scheduledTasks.clear(); - payload = - VertexManagerEventPayloadProto.newBuilder().setOutputSize(100L).build().toByteString() - .asReadOnlyByteBuffer(); - vmEvent = VertexManagerEvent.create("Vertex", payload); + vmEvent = getVertexManagerEvent(null, 100L, "Vertex"); //min/max fraction of 0.0/0.2 manager = createManager(conf, mockContext, 0.0f, 0.2f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); @@ -353,13 +356,17 @@ public class TestShuffleVertexManager { //send 7 events with payload size as 100 for(int i=0;i<7;i++) { manager.onVertexManagerEventReceived(vmEvent); //small payload - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(i)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, i)); //should not change parallelism verify(mockContext, times(0)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap()); } //send 8th event with payload size as 100 manager.onVertexManagerEventReceived(vmEvent); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(8)); + + //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test + when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); + + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8)); //Since max threshold (40 * 0.2 = 8) is met, vertex manager should determine parallelism verify(mockContext, times(1)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap()); @@ -370,38 +377,39 @@ public class TestShuffleVertexManager { // parallelism changed due to small data size scheduledTasks.clear(); - payload = - VertexManagerEventPayloadProto.newBuilder().setOutputSize(500L).build().toByteString().asReadOnlyByteBuffer(); - vmEvent = VertexManagerEvent.create("Vertex", payload); manager = createManager(conf, mockContext, 0.5f, 0.5f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks); // task completion from non-bipartite stage does nothing - manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0)); Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks); Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); + vmEvent = getVertexManagerEvent(null, 500L, "Vertex"); manager.onVertexManagerEventReceived(vmEvent); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); Assert.assertEquals(4, manager.pendingTasks.size()); Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted); Assert.assertEquals(1, manager.numVertexManagerEventsReceived); Assert.assertEquals(500L, manager.completedSourceTasksOutputSize); // ignore duplicate completion - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); Assert.assertEquals(4, manager.pendingTasks.size()); Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted); Assert.assertEquals(500L, manager.completedSourceTasksOutputSize); - + vmEvent = getVertexManagerEvent(null, 500L, "Vertex"); manager.onVertexManagerEventReceived(vmEvent); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1)); + //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test + when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2); + + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); // managedVertex tasks reduced verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); Assert.assertEquals(2, newEdgeManagers.size()); @@ -415,7 +423,7 @@ public class TestShuffleVertexManager { Assert.assertEquals(1000L, manager.completedSourceTasksOutputSize); // more completions dont cause recalculation of parallelism - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); Assert.assertEquals(2, newEdgeManagers.size()); @@ -490,7 +498,7 @@ public class TestShuffleVertexManager { mockInputVertices.put(mockSrcVertexId3, eProp3); try { manager = createManager(conf, mockContext, 0.1f, 0.1f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertFalse(true); } catch (TezUncheckedException e) { Assert.assertTrue(e.getMessage().contains( @@ -502,7 +510,7 @@ public class TestShuffleVertexManager { // check initialization manager = createManager(conf, mockContext, 0.1f, 0.1f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.bipartiteSources == 2); final HashSet<Integer> scheduledTasks = new HashSet<Integer>(); @@ -510,17 +518,17 @@ public class TestShuffleVertexManager { public Object answer(InvocationOnMock invocation) { Object[] args = invocation.getArguments(); scheduledTasks.clear(); - List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0]; - for (TaskWithLocationHint task : tasks) { + List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0]; + for (ScheduleTaskRequest task : tasks) { scheduledTasks.add(task.getTaskIndex()); } return null; - }}).when(mockContext).scheduleVertexTasks(anyList()); + }}).when(mockContext).scheduleTasks(anyList()); // source vertices have 0 tasks. immediate start of all managed tasks when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0); when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.pendingTasks.isEmpty()); Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled @@ -560,7 +568,7 @@ public class TestShuffleVertexManager { scheduledTasks.clear(); manager = createManager(conf, mockContext, 0.8f, null); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); @@ -573,7 +581,7 @@ public class TestShuffleVertexManager { // Finish all tasks before exceeding the threshold for (String mockSrcVertex : new String[] { mockSrcVertexId1, mockSrcVertexId2 }) { for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) { - manager.onSourceTaskCompleted(mockSrcVertex, i); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i)); ++completedTasks; if ((completedTasks + 1) >= completedTasksThreshold) { // stop before completing more than min/max source tasks @@ -586,7 +594,7 @@ public class TestShuffleVertexManager { Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled // Cross the threshold min/max threshold to schedule all tasks - manager.onSourceTaskCompleted(mockSrcVertexId2, completedTasks); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, completedTasks)); Assert.assertEquals(0, manager.pendingTasks.size()); Assert.assertEquals(manager.totalTasksToSchedule, scheduledTasks.size()); // all tasks scheduled @@ -595,8 +603,8 @@ public class TestShuffleVertexManager { when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2); // source vertex have some tasks. min, max == 0 - manager = createManager(conf, mockContext, 0.0f, 0.0f); - manager.onVertexStarted(null); + manager = createManager(conf, mockContext, 0.f, 0.f); + manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); Assert.assertTrue(manager.totalTasksToSchedule == 3); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0); @@ -609,122 +617,122 @@ public class TestShuffleVertexManager { // min, max > 0 and min == max manager = createManager(conf, mockContext, 0.25f, 0.25f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); // task completion from non-bipartite stage does nothing - manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); Assert.assertTrue(manager.pendingTasks.isEmpty()); Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1); // min, max > 0 and min == max == absolute max 1.0 manager = createManager(conf, mockContext, 1.0f, 1.0f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); // task completion from non-bipartite stage does nothing - manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); Assert.assertTrue(manager.pendingTasks.size() == 3); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); Assert.assertTrue(manager.pendingTasks.isEmpty()); Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); // min, max > 0 and min == max manager = createManager(conf, mockContext, 1.0f, 1.0f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); // task completion from non-bipartite stage does nothing - manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); Assert.assertTrue(manager.pendingTasks.size() == 3); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); Assert.assertTrue(manager.pendingTasks.isEmpty()); Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); // min, max > and min < max manager = createManager(conf, mockContext, 0.25f, 0.75f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); Assert.assertTrue(manager.pendingTasks.size() == 2); Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); // completion of same task again should not get counted - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); Assert.assertTrue(manager.pendingTasks.size() == 2); Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); Assert.assertTrue(manager.pendingTasks.size() == 0); Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3); scheduledTasks.clear(); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1)); // we are done. no action + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); // we are done. no action Assert.assertTrue(manager.pendingTasks.size() == 0); Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); // min, max > and min < max manager = createManager(conf, mockContext, 0.25f, 1.0f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0)); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); Assert.assertTrue(manager.pendingTasks.size() == 2); Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); - manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); Assert.assertTrue(manager.pendingTasks.size() == 1); Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3); - manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); Assert.assertTrue(manager.pendingTasks.size() == 0); Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); @@ -734,9 +742,10 @@ public class TestShuffleVertexManager { /** * Tasks should be scheduled only when all source vertices are configured completely + * @throws IOException */ @Test(timeout = 5000) - public void test_Tez1649_with_scatter_gather_edges() { + public void test_Tez1649_with_scatter_gather_edges() throws IOException { Configuration conf = new Configuration(); conf.setBoolean( ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, @@ -780,10 +789,7 @@ public class TestShuffleVertexManager { when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(3); when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3); - ByteBuffer payload = - VertexManagerEventPayloadProto.newBuilder().setOutputSize(50L).build().toByteString().asReadOnlyByteBuffer(); - VertexManagerEvent vmEvent = VertexManagerEvent.create("Vertex", payload); - + VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, "Vertex"); // check initialization manager = createManager(conf, mockContext_R2, 0.001f, 0.001f); @@ -792,14 +798,14 @@ public class TestShuffleVertexManager { public Object answer(InvocationOnMock invocation) { Object[] args = invocation.getArguments(); scheduledTasks.clear(); - List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0]; - for (TaskWithLocationHint task : tasks) { + List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0]; + for (ScheduleTaskRequest task : tasks) { scheduledTasks.add(task.getTaskIndex()); } return null; - }}).when(mockContext_R2).scheduleVertexTasks(anyList()); + }}).when(mockContext_R2).scheduleTasks(anyList()); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.bipartiteSources == 3); manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED)); @@ -813,15 +819,15 @@ public class TestShuffleVertexManager { Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9); //Send events for all tasks of m3. - manager.onSourceTaskCompleted(m3, new Integer(0)); - manager.onSourceTaskCompleted(m3, new Integer(1)); - manager.onSourceTaskCompleted(m3, new Integer(2)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 2)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9); //Send an event for m2. But still we need to wait for at least 1 event from r1. - manager.onSourceTaskCompleted(m2, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9); @@ -846,7 +852,7 @@ public class TestShuffleVertexManager { when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3); manager = createManager(conf, mockContext_R2, 0.001f, 0.001f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks); Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); @@ -856,11 +862,42 @@ public class TestShuffleVertexManager { // Only need completed configuration notification from m3 manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED)); - manager.onSourceTaskCompleted(m3, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0)); Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled Assert.assertTrue(scheduledTasks.size() == 3); } + VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, String vertexName) + throws IOException { + ByteBuffer payload = null; + if (sizes != null) { + /* + RoaringBitmap partitionStats = ShuffleUtils.getPartitionStatsForPhysicalOutput(sizes); + DataOutputBuffer dout = new DataOutputBuffer(); + partitionStats.serialize(dout); + ByteString + partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData()); + payload = + VertexManagerEventPayloadProto.newBuilder() + .setOutputSize(totalSize) + .setPartitionStats(partitionStatsBytes) + .build().toByteString() + .asReadOnlyByteBuffer(); + */ + } else { + payload = + VertexManagerEventPayloadProto.newBuilder() + .setOutputSize(totalSize) + .build().toByteString() + .asReadOnlyByteBuffer(); + } + TaskAttemptIdentifierImpl taId = new TaskAttemptIdentifierImpl("dag", vertexName, + TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, taskId++), 0)); + VertexManagerEvent vmEvent = VertexManagerEvent.create(vertexName, payload); + vmEvent.setProducerAttemptIdentifier(taId); + return vmEvent; + } + @Test(timeout = 5000) public void test_Tez1649_with_mixed_edges() { Configuration conf = new Configuration(); @@ -913,16 +950,16 @@ public class TestShuffleVertexManager { public Object answer(InvocationOnMock invocation) { Object[] args = invocation.getArguments(); scheduledTasks.clear(); - List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0]; - for (TaskWithLocationHint task : tasks) { + List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0]; + for (ScheduleTaskRequest task : tasks) { scheduledTasks.add(task.getTaskIndex()); } return null; - }}).when(mockContext).scheduleVertexTasks(anyList()); + }}).when(mockContext).scheduleTasks(anyList()); // check initialization manager = createManager(conf, mockContext, 0.001f, 0.001f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.bipartiteSources == 1); manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); @@ -933,13 +970,13 @@ public class TestShuffleVertexManager { Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); //Send events for 2 tasks of r1. - manager.onSourceTaskCompleted(r1, new Integer(0)); - manager.onSourceTaskCompleted(r1, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3); //Send an event for m2. - manager.onSourceTaskCompleted(m2, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3); @@ -952,7 +989,7 @@ public class TestShuffleVertexManager { //Still, wait for a configuration to be completed from other edges scheduledTasks.clear(); manager = createManager(conf, mockContext, 0.001f, 0.001f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); @@ -964,9 +1001,9 @@ public class TestShuffleVertexManager { Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3); - manager.onSourceTaskCompleted(r1, new Integer(0)); - manager.onSourceTaskCompleted(r1, new Integer(1)); - manager.onSourceTaskCompleted(r1, new Integer(2)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 2)); //Tasks from non-scatter edges of m2 and m3 are not complete. Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED)); @@ -979,7 +1016,7 @@ public class TestShuffleVertexManager { //try with a zero task vertex (with non-scatter-gather edges) scheduledTasks.clear(); manager = createManager(conf, mockContext, 0.001f, 0.001f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3); @@ -988,7 +1025,7 @@ public class TestShuffleVertexManager { when(mockContext.getVertexNumTasks(m3)).thenReturn(3); //broadcast manager = createManager(conf, mockContext, 0.001f, 0.001f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled @@ -996,8 +1033,8 @@ public class TestShuffleVertexManager { Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); //Send 2 events for tasks of r1. - manager.onSourceTaskCompleted(r1, new Integer(0)); - manager.onSourceTaskCompleted(r1, new Integer(1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(scheduledTasks.size() == 0); @@ -1009,7 +1046,7 @@ public class TestShuffleVertexManager { //try with all zero task vertices in non-SG edges scheduledTasks.clear(); manager = createManager(conf, mockContext, 0.001f, 0.001f); - manager.onVertexStarted(null); + manager.onVertexStarted(emptyCompletions); when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3); @@ -1019,10 +1056,22 @@ public class TestShuffleVertexManager { //Send 1 events for tasks of r1. manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); - manager.onSourceTaskCompleted(r1, new Integer(0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0)); Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled Assert.assertTrue(scheduledTasks.size() == 3); } + + public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId) { + VertexIdentifier mockVertex = mock(VertexIdentifier.class); + when(mockVertex.getName()).thenReturn(vName); + TaskIdentifier mockTask = mock(TaskIdentifier.class); + when(mockTask.getIdentifier()).thenReturn(tId); + when(mockTask.getVertexIdentifier()).thenReturn(mockVertex); + TaskAttemptIdentifier mockAttempt = mock(TaskAttemptIdentifier.class); + when(mockAttempt.getIdentifier()).thenReturn(0); + when(mockAttempt.getTaskIdentifier()).thenReturn(mockTask); + return mockAttempt; + } private ShuffleVertexManager createManager(Configuration conf, VertexManagerPluginContext context, Float min, Float max) { http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java index 7d7069e..04b0a03 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java @@ -24,7 +24,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; -import java.util.Map; import java.util.Random; import org.slf4j.Logger; @@ -56,7 +55,7 @@ import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexManagerPlugin; import org.apache.tez.dag.api.VertexManagerPluginContext; -import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint; +import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest; import org.apache.tez.dag.api.VertexManagerPluginDescriptor; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; @@ -71,6 +70,7 @@ import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.ProcessorContext; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.library.processor.SimpleProcessor; import org.junit.After; @@ -523,8 +523,8 @@ public class TestAMRecovery { } @Override - public void onSourceTaskCompleted(String srcVertexName, Integer taskId) { - super.onSourceTaskCompleted(srcVertexName, taskId); + public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) { + super.onSourceTaskCompleted(attempt); completedTaskNum ++; if (getContext().getDAGAttemptNumber() == 1) { if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) { @@ -532,7 +532,8 @@ public class TestAMRecovery { System.exit(-1); } } else { - if (completedTaskNum == getContext().getVertexNumTasks(srcVertexName)) { + if (completedTaskNum == getContext(). + getVertexNumTasks(attempt.getTaskIdentifier().getVertexIdentifier().getName())) { System.exit(-1); } } @@ -562,8 +563,8 @@ public class TestAMRecovery { } @Override - public void onSourceTaskCompleted(String srcVertexName, Integer taskId) { - super.onSourceTaskCompleted(srcVertexName, taskId); + public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) { + super.onSourceTaskCompleted(attempt); completedTaskNum ++; if (getContext().getDAGAttemptNumber() == 1) { if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) { @@ -571,7 +572,8 @@ public class TestAMRecovery { System.exit(-1); } } else { - if (completedTaskNum == getContext().getVertexNumTasks(srcVertexName)) { + if (completedTaskNum == getContext(). + getVertexNumTasks(attempt.getTaskIdentifier().getVertexIdentifier().getName())) { System.exit(-1); } } @@ -602,8 +604,8 @@ public class TestAMRecovery { } @Override - public void onSourceTaskCompleted(String srcVertexName, Integer taskId) { - super.onSourceTaskCompleted(srcVertexName, taskId); + public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) { + super.onSourceTaskCompleted(attempt); completedTaskNum ++; if (getContext().getDAGAttemptNumber() == 1) { if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) { @@ -611,7 +613,8 @@ public class TestAMRecovery { System.exit(-1); } } else { - if (completedTaskNum == getContext().getVertexNumTasks(srcVertexName)) { + if (completedTaskNum == getContext(). + getVertexNumTasks(attempt.getTaskIdentifier().getVertexIdentifier().getName())) { System.exit(-1); } } @@ -643,26 +646,26 @@ public class TestAMRecovery { } @Override - public void onVertexStarted(Map<String, List<Integer>> completions) + public void onVertexStarted(List<TaskAttemptIdentifier> completions) throws Exception { if (getContext().getDAGAttemptNumber() == 1) { // only schedule one task if it is partiallyFinished case if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) { - getContext().scheduleVertexTasks(Lists.newArrayList(new TaskWithLocationHint(0, null))); + getContext().scheduleTasks(Lists.newArrayList(ScheduleTaskRequest.create(0, null))); return ; } } // schedule all tasks when it is not partiallyFinished int taskNum = getContext().getVertexNumTasks(getContext().getVertexName()); - List<TaskWithLocationHint> taskWithLocationHints = new ArrayList<TaskWithLocationHint>(); + List<ScheduleTaskRequest> taskWithLocationHints = new ArrayList<ScheduleTaskRequest>(); for (int i=0;i<taskNum;++i) { - taskWithLocationHints.add(new TaskWithLocationHint(i, null)); + taskWithLocationHints.add(ScheduleTaskRequest.create(i, null)); } - getContext().scheduleVertexTasks(taskWithLocationHints); + getContext().scheduleTasks(taskWithLocationHints); } @Override - public void onSourceTaskCompleted(String srcVertexName, Integer taskId) + public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) throws Exception { } @@ -704,9 +707,9 @@ public class TestAMRecovery { } @Override - public void onSourceTaskCompleted(String srcVertexName, Integer taskId) { + public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) { int curAttempt = getContext().getDAGAttemptNumber(); - super.onSourceTaskCompleted(srcVertexName, taskId); + super.onSourceTaskCompleted(attempt); int failOnAttempt = conf.getInt(FAIL_ON_ATTEMPT, 1); LOG.info("failOnAttempt:" + failOnAttempt); LOG.info("curAttempt:" + curAttempt); http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java index caf0822..7d88fdf 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java @@ -79,6 +79,7 @@ import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; import org.apache.tez.runtime.api.Writer; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputDataInformationEvent; @@ -696,7 +697,7 @@ public class TestExceptionPropagation { } @Override - public void onVertexStarted(Map<String, List<Integer>> completions) { + public void onVertexStarted(List<TaskAttemptIdentifier> completions) { if (this.exLocation == ExceptionLocation.VM_ON_VERTEX_STARTED) { throw new RuntimeException(this.exLocation.name()); } @@ -739,11 +740,11 @@ public class TestExceptionPropagation { } @Override - public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) { + public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) { if (this.exLocation == ExceptionLocation.VM_ON_SOURCETASK_COMPLETED) { throw new RuntimeException(this.exLocation.name()); } - super.onSourceTaskCompleted(srcVertexName, attemptId); + super.onSourceTaskCompleted(attempt); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java index 7244d8d..5c6f855 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java +++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java @@ -40,7 +40,7 @@ import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexManagerPlugin; import org.apache.tez.dag.api.VertexManagerPluginContext; import org.apache.tez.dag.api.VertexManagerPluginDescriptor; -import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint; +import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest; import org.apache.tez.dag.api.client.VertexStatus.State; import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.AbstractLogicalOutput; @@ -49,6 +49,7 @@ import org.apache.tez.runtime.api.MemoryUpdateCallback; import org.apache.tez.runtime.api.OutputCommitter; import org.apache.tez.runtime.api.OutputCommitterContext; import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.api.TaskAttemptIdentifier; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.api.InputInitializer; @@ -57,15 +58,12 @@ import org.apache.tez.runtime.api.Writer; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.events.InputInitializerEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; -import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.test.TestInput; import org.apache.tez.test.TestOutput; import org.apache.tez.test.TestProcessor; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicInteger; public class MultiAttemptDAG { @@ -102,14 +100,11 @@ public class MultiAttemptDAG { } @Override - public void onVertexStarted(Map<String, List<Integer>> completions) { + public void onVertexStarted(List<TaskAttemptIdentifier> completions) { if (completions != null) { - for (Entry<String, List<Integer>> entry : completions.entrySet()) { - LOG.info("Received completion events on vertexStarted" - + ", vertex=" + entry.getKey() - + ", completions=" + entry.getValue().size()); - numCompletions.addAndGet(entry.getValue().size()); - } + LOG.info("Received completion events on vertexStarted" + + ", completions=" + completions.size()); + numCompletions.addAndGet(completions.size()); } maybeScheduleTasks(); } @@ -129,20 +124,20 @@ public class MultiAttemptDAG { } else if (successAttemptId == getContext().getDAGAttemptNumber()) { LOG.info("Scheduling tasks for vertex=" + getContext().getVertexName()); int numTasks = getContext().getVertexNumTasks(getContext().getVertexName()); - List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasks); + List<ScheduleTaskRequest> scheduledTasks = Lists.newArrayListWithCapacity(numTasks); for (int i=0; i<numTasks; ++i) { - scheduledTasks.add(new TaskWithLocationHint(new Integer(i), null)); + scheduledTasks.add(ScheduleTaskRequest.create(i, null)); } - getContext().scheduleVertexTasks(scheduledTasks); + getContext().scheduleTasks(scheduledTasks); } } } @Override - public void onSourceTaskCompleted(String srcVertexName, Integer taskId) { + public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) { LOG.info("Received completion events for source task" - + ", vertex=" + srcVertexName - + ", taskIdx=" + taskId); + + ", vertex=" + attempt.getTaskIdentifier().getVertexIdentifier().getName() + + ", taskIdx=" + attempt.getTaskIdentifier().getIdentifier()); numCompletions.incrementAndGet(); maybeScheduleTasks(); }
