Repository: tez Updated Branches: refs/heads/master 5af06047f -> 0dd68ad1c
TEZ-2943. Change shuffle vertex manager to use per vertex data for auto reduce and slow start (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0dd68ad1 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0dd68ad1 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0dd68ad1 Branch: refs/heads/master Commit: 0dd68ad1c92eca98a98b8bd4a0c54b8656573e8d Parents: 5af0604 Author: Bikas Saha <[email protected]> Authored: Wed Dec 9 17:00:17 2015 -0800 Committer: Bikas Saha <[email protected]> Committed: Wed Dec 9 17:00:17 2015 -0800 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../vertexmanager/ShuffleVertexManager.java | 142 +++++++------ .../vertexmanager/TestShuffleVertexManager.java | 199 +++++++++++-------- 3 files changed, 209 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/0dd68ad1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9031566..cfa93bf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,8 @@ INCOMPATIBLE CHANGES TEZ-2949. Allow duplicate dag names within session for Tez. ALL CHANGES: + TEZ-2943. Change shuffle vertex manager to use per vertex data for auto + reduce and slow start TEZ-2346. TEZ-UI: Lazy load other info / counter data TEZ-2975. Bump up apache commons dependency. TEZ-2970. Re-localization in TezChild does not use correct UGI. http://git-wip-us.apache.org/repos/asf/tez/blob/0dd68ad1/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index f10c89a..c88c7a2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -168,12 +168,21 @@ public class ShuffleVertexManager extends VertexManagerPlugin { EdgeProperty edgeProperty; boolean vertexIsConfigured; BitSet finishedTaskSet; + int numTasks; + int numVMEventsReceived; + long outputSize; SourceVertexInfo(EdgeProperty edgeProperty) { this.edgeProperty = edgeProperty; - if (edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) { - finishedTaskSet = new BitSet(); - } + finishedTaskSet = new BitSet(); + } + + int getNumTasks() { + return numTasks; + } + + int getNumCompletedTasks() { + return finishedTaskSet.cardinality(); } } @@ -482,6 +491,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { for(Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) { srcVertexInfo.put(entry.getKey(), new SourceVertexInfo(entry.getValue())); // TODO what if derived class has already called this + // register for status update from all source vertices getContext().registerForVertexStateUpdates(entry.getKey(), EnumSet.of(VertexState.CONFIGURED)); if (entry.getValue().getDataMovementType() == DataMovementType.SCATTER_GATHER) { @@ -498,7 +508,6 @@ public class ShuffleVertexManager extends VertexManagerPlugin { // track the tasks in this vertex updatePendingTasks(); - updateSourceTaskCount(); LOG.info("OnVertexStarted vertex: " + getContext().getVertexName() + " with " + totalNumBipartiteSourceTasks + " source tasks and " + @@ -519,19 +528,20 @@ public class ShuffleVertexManager extends VertexManagerPlugin { public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) { String srcVertexName = attempt.getTaskIdentifier().getVertexIdentifier().getName(); int srcTaskId = attempt.getTaskIdentifier().getIdentifier(); - updateSourceTaskCount(); SourceVertexInfo srcInfo = srcVertexInfo.get(srcVertexName); - - if (srcInfo.edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) { - //handle duplicate events for bipartite sources - BitSet completedSourceTasks = srcInfo.finishedTaskSet; - if (completedSourceTasks != null) { - // duplicate notifications tracking - if (!completedSourceTasks.get(srcTaskId)) { - completedSourceTasks.set(srcTaskId); - // source task has completed - ++numBipartiteSourceTasksCompleted; - } + if (srcInfo.vertexIsConfigured) { + Preconditions.checkState(srcTaskId < srcInfo.numTasks, + "Received completion for srcTaskId " + srcTaskId + " but Vertex: " + srcVertexName + + " has only " + srcInfo.numTasks + " tasks"); + } + //handle duplicate events and count task completions from all source vertices + BitSet completedSourceTasks = srcInfo.finishedTaskSet; + // duplicate notifications tracking + if (!completedSourceTasks.get(srcTaskId)) { + completedSourceTasks.set(srcTaskId); + // source task has completed + if (srcInfo.edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) { + numBipartiteSourceTasksCompleted++; } } schedulePendingTasks(); @@ -564,7 +574,11 @@ public class ShuffleVertexManager extends VertexManagerPlugin { LOG.info("Ignoring vertex manager event from: " + producerTask); return; } - + + String vName = producerTask.getVertexIdentifier().getName(); + SourceVertexInfo srcInfo = srcVertexInfo.get(vName); + Preconditions.checkState(srcInfo != null, "Unknown vmEvent from " + producerTask); + numVertexManagerEventsReceived++; long sourceTaskOutputSize = 0; @@ -591,12 +605,17 @@ public class ShuffleVertexManager extends VertexManagerPlugin { throw new TezUncheckedException(e); } } + srcInfo.numVMEventsReceived++; + srcInfo.outputSize += sourceTaskOutputSize; completedSourceTasksOutputSize += sourceTaskOutputSize; } - + if (LOG.isDebugEnabled()) { - LOG.debug("Received info of output size: " + sourceTaskOutputSize - + " numInfoReceived: " + numVertexManagerEventsReceived + LOG.debug("For attempt: " + vmEvent.getProducerAttemptIdentifier() + + " received info of output size: " + sourceTaskOutputSize + + " vertex numEventsReceived: " + srcInfo.numVMEventsReceived + + " vertex output size: " + srcInfo.outputSize + + " total numEventsReceived: " + numVertexManagerEventsReceived + " total output size: " + completedSourceTasksOutputSize); } } @@ -613,7 +632,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } totalTasksToSchedule = pendingTasks.size(); if (stats == null) { - stats = new long[totalTasksToSchedule]; + stats = new long[totalTasksToSchedule]; // TODO lost previous data } } @@ -625,22 +644,12 @@ public class ShuffleVertexManager extends VertexManagerPlugin { }); } - void updateSourceTaskCount() { - // track source vertices - int numSrcTasks = 0; - Iterable<Map.Entry<String, SourceVertexInfo>> bipartiteItr = getBipartiteInfo(); - for(Map.Entry<String, SourceVertexInfo> entry : bipartiteItr) { - numSrcTasks += getContext().getVertexNumTasks(entry.getKey()); - } - totalNumBipartiteSourceTasks = numSrcTasks; - } - /** * Compute optimal parallelism needed for the job * @return true (if parallelism is determined), false otherwise */ @VisibleForTesting - boolean determineParallelismAndApply() { + boolean determineParallelismAndApply(float minSourceVertexCompletedTaskFraction) { if(numVertexManagerEventsReceived == 0) { if (totalNumBipartiteSourceTasks > 0) { return true; @@ -656,21 +665,28 @@ public class ShuffleVertexManager extends VertexManagerPlugin { */ boolean canDetermineParallelismLater = (completedSourceTasksOutputSize < desiredTaskInputDataSize) - && (numBipartiteSourceTasksCompleted < (totalNumBipartiteSourceTasks * slowStartMaxSrcCompletionFraction)); + && (minSourceVertexCompletedTaskFraction < slowStartMaxSrcCompletionFraction); if (canDetermineParallelismLater) { LOG.info("Defer scheduling tasks; vertex=" + getContext().getVertexName() + ", totalNumBipartiteSourceTasks=" + totalNumBipartiteSourceTasks + ", completedSourceTasksOutputSize=" + completedSourceTasksOutputSize + ", numVertexManagerEventsReceived=" + numVertexManagerEventsReceived - + ", numBipartiteSourceTasksCompleted=" + numBipartiteSourceTasksCompleted + ", maxThreshold=" - + (totalNumBipartiteSourceTasks * slowStartMaxSrcCompletionFraction)); + + ", numBipartiteSourceTasksCompleted=" + numBipartiteSourceTasksCompleted + + ", minSourceVertexCompletedTaskFraction=" + minSourceVertexCompletedTaskFraction); return false; } + // Change this to use per partition stats for more accuracy TEZ-2962. + // Instead of aggregating overall size and then dividing equally - coalesce partitions until + // desired per partition size is achieved. long expectedTotalSourceTasksOutputSize = 0; - if (numVertexManagerEventsReceived > 0 && totalNumBipartiteSourceTasks > 0 ) { - expectedTotalSourceTasksOutputSize = - (totalNumBipartiteSourceTasks * completedSourceTasksOutputSize) / numVertexManagerEventsReceived; + for (Map.Entry<String, SourceVertexInfo> vInfo : getBipartiteInfo()) { + SourceVertexInfo srcInfo = vInfo.getValue(); + if (srcInfo.numTasks > 0 && srcInfo.numVMEventsReceived > 0) { + // this assumes that 1 vmEvent is received per completed task - TEZ-2961 + expectedTotalSourceTasksOutputSize += + (srcInfo.numTasks * srcInfo.outputSize) / srcInfo.numVMEventsReceived; + } } int desiredTaskParallelism = @@ -755,7 +771,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } } - void schedulePendingTasks(int numTasksToSchedule) { + void schedulePendingTasks(int numTasksToSchedule, float minSourceVertexCompletedTaskFraction) { // determine parallelism before scheduling the first time // this is the latest we can wait before determining parallelism. // currently this depends on task completion and so this is the best time @@ -764,7 +780,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { // calculating parallelism or change parallelism while tasks are already // running then we can create other parameters to trigger this calculation. if(enableAutoParallelism && !parallelismDetermined) { - parallelismDetermined = determineParallelismAndApply(); + parallelismDetermined = determineParallelismAndApply(minSourceVertexCompletedTaskFraction); if (!parallelismDetermined) { //try to determine parallelism later when more info is available. return; @@ -851,10 +867,9 @@ public class ShuffleVertexManager extends VertexManagerPlugin { */ boolean canScheduleTasks() { for(Map.Entry<String, SourceVertexInfo> entry : srcVertexInfo.entrySet()) { - String sourceVertex = entry.getKey(); - int numSourceTasks = getContext().getVertexNumTasks(sourceVertex); - if (numSourceTasks > 0 && !entry.getValue().vertexIsConfigured) { - // vertex not configured + // need to check for vertex configured because until that we dont know if numTasks==0 is valid + if (!entry.getValue().vertexIsConfigured) { // isConfigured + // vertex not scheduled tasks if (LOG.isDebugEnabled()) { LOG.debug("Waiting for vertex: " + entry.getKey() + " in vertex: " + getContext().getVertexName()); @@ -888,29 +903,38 @@ public class ShuffleVertexManager extends VertexManagerPlugin { LOG.info("All source tasks assigned. " + "Ramping up " + numPendingTasks + " remaining tasks for vertex: " + getContext().getVertexName()); - schedulePendingTasks(numPendingTasks); + schedulePendingTasks(numPendingTasks, 1); return; } - float completedSourceTaskFraction = 0f; - if (totalNumBipartiteSourceTasks != 0) { // support for 0 source tasks - completedSourceTaskFraction = (float) numBipartiteSourceTasksCompleted / totalNumBipartiteSourceTasks; - } else { - completedSourceTaskFraction = 1; + float minSourceVertexCompletedTaskFraction = 1f; + String minCompletedVertexName = ""; + for (Map.Entry<String, SourceVertexInfo> vInfo : getBipartiteInfo()) { + SourceVertexInfo srcInfo = vInfo.getValue(); + // canScheduleTasks check has already verified all sources are configured + Preconditions.checkState(srcInfo.vertexIsConfigured, "Vertex: " + vInfo.getKey()); + if (srcInfo.numTasks > 0) { + int numCompletedTasks = srcInfo.getNumCompletedTasks(); + float completedFraction = (float) numCompletedTasks / srcInfo.numTasks; + if (minSourceVertexCompletedTaskFraction > completedFraction) { + minSourceVertexCompletedTaskFraction = completedFraction; + minCompletedVertexName = vInfo.getKey(); + } + } } // start scheduling when source tasks completed fraction is more than min. // linearly increase the number of scheduled tasks such that all tasks are // scheduled when source tasks completed fraction reaches max - float tasksFractionToSchedule = 1; + float tasksFractionToSchedule = 1; float percentRange = slowStartMaxSrcCompletionFraction - slowStartMinSrcCompletionFraction; if (percentRange > 0) { tasksFractionToSchedule = - (completedSourceTaskFraction - slowStartMinSrcCompletionFraction)/ + (minSourceVertexCompletedTaskFraction - slowStartMinSrcCompletionFraction)/ percentRange; } else { // min and max are equal. schedule 100% on reaching min - if(completedSourceTaskFraction < slowStartMinSrcCompletionFraction) { + if(minSourceVertexCompletedTaskFraction < slowStartMinSrcCompletionFraction) { tasksFractionToSchedule = 0; } } @@ -928,10 +952,11 @@ public class ShuffleVertexManager extends VertexManagerPlugin { getContext().getVertexName() + " with totalTasks: " + totalTasksToSchedule + ". " + numBipartiteSourceTasksCompleted + " source tasks completed out of " + totalNumBipartiteSourceTasks + - ". SourceTaskCompletedFraction: " + completedSourceTaskFraction + + ". MinSourceTaskCompletedFraction: " + minSourceVertexCompletedTaskFraction + + " in Vertex: " + minCompletedVertexName + " min: " + slowStartMinSrcCompletionFraction + " max: " + slowStartMaxSrcCompletionFraction); - schedulePendingTasks(numTasksToSchedule); + schedulePendingTasks(numTasksToSchedule, minSourceVertexCompletedTaskFraction); } } @@ -1002,8 +1027,13 @@ public class ShuffleVertexManager extends VertexManagerPlugin { SourceVertexInfo vInfo = srcVertexInfo.get(stateUpdate.getVertexName()); Preconditions.checkState(vInfo.vertexIsConfigured == false); vInfo.vertexIsConfigured = true; + vInfo.numTasks = getContext().getVertexNumTasks(stateUpdate.getVertexName()); + if (vInfo.edgeProperty.getDataMovementType() == DataMovementType.SCATTER_GATHER) { + totalNumBipartiteSourceTasks += vInfo.numTasks; + } LOG.info("Received configured notification : " + stateUpdate.getVertexState() + " for vertex: " - + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName()); + + stateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName() + + " numBipartiteSourceTasks: " + totalNumBipartiteSourceTasks); schedulePendingTasks(); } http://git-wip-us.apache.org/repos/asf/tez/blob/0dd68ad1/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index 862e4df..9d53ebc 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -242,10 +242,12 @@ public class TestShuffleVertexManager { // check waiting for notification before scheduling Assert.assertFalse(manager.pendingTasks.isEmpty()); - // source vertices have 0 tasks. so only 1 notification needed. triggers scheduling + // source vertices have 0 tasks. triggers scheduling + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertTrue(manager.pendingTasks.isEmpty()); - verify(mockContext, times(1)).reconfigureVertex(anyInt(), any + verify(mockContext, times(1)).reconfigureVertex(eq(1), any (VertexLocationHint.class), anyMap()); verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and parallelism changed @@ -258,13 +260,15 @@ public class TestShuffleVertexManager { verify(mockContext, times(3)).vertexReconfigurationPlanned(); // source vertices have 0 tasks. so only 1 notification needed. does not trigger scheduling // normally this event will not come before onVertexStarted() is called + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); verify(mockContext, times(1)).doneReconfiguringVertex(); // no change. will trigger after start Assert.assertTrue(scheduledTasks.size() == 0); // no tasks scheduled // trigger start and processing of pending notification events manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.bipartiteSources == 2); - verify(mockContext, times(2)).reconfigureVertex(anyInt(), any + verify(mockContext, times(2)).reconfigureVertex(eq(1), any (VertexLocationHint.class), anyMap()); verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done Assert.assertTrue(manager.pendingTasks.isEmpty()); @@ -275,30 +279,31 @@ public class TestShuffleVertexManager { when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2); when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); - VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, "Vertex"); + VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, mockSrcVertexId1); // parallelism not change due to large data size manager = createManager(conf, mockContext, 0.1f, 0.1f); verify(mockContext, times(4)).vertexReconfigurationPlanned(); // Tez notified of reconfig manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.pendingTasks.size() == 4); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); manager.onVertexManagerEventReceived(vmEvent); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); verify(mockContext, times(2)).reconfigureVertex(anyInt(), any (VertexLocationHint.class), anyMap()); verify(mockContext, times(2)).doneReconfiguringVertex(); // trigger scheduling manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); + Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); verify(mockContext, times(2)).reconfigureVertex(anyInt(), any (VertexLocationHint.class), anyMap()); verify(mockContext, times(3)).doneReconfiguringVertex(); // reconfig done Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled Assert.assertEquals(4, scheduledTasks.size()); // TODO TEZ-1714 locking verify(mockContext, times(2)).vertexManagerDone(); // notified after scheduling all tasks - Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted); + Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted); Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize); /** @@ -313,11 +318,10 @@ public class TestShuffleVertexManager { manager = createManager(conf, mockContext, 0.01f, 0.75f); manager.onVertexStarted(emptyCompletions); Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled - Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks); Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); TezTaskAttemptID taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0"); - vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", "vertex", taId1)); + vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId1)); manager.onVertexManagerEventReceived(vmEvent); Assert.assertEquals(1, manager.numVertexManagerEventsReceived); @@ -329,7 +333,7 @@ public class TestShuffleVertexManager { // sending again from a different version of the same task has not impact TezTaskAttemptID taId2 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_1"); - vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", "vertex", taId2)); + vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId2)); manager.onVertexManagerEventReceived(vmEvent); Assert.assertEquals(1, manager.numVertexManagerEventsReceived); @@ -348,41 +352,42 @@ public class TestShuffleVertexManager { //min/max fraction of 0.01/0.75 would ensure that we hit determineParallelism code path on receiving first event itself. manager = createManager(conf, mockContext, 0.01f, 0.75f); manager.onVertexStarted(emptyCompletions); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks); Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); //First task in src1 completed with small payload - vmEvent = getVertexManagerEvent(null, 1L, "Vertex"); + vmEvent = getVertexManagerEvent(null, 1L, mockSrcVertexId1); manager.onVertexManagerEventReceived(vmEvent); //small payload manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); - Assert.assertTrue(manager.determineParallelismAndApply() == false); + Assert.assertTrue(manager.determineParallelismAndApply(0f) == false); Assert.assertEquals(4, manager.pendingTasks.size()); Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted); Assert.assertEquals(1, manager.numVertexManagerEventsReceived); Assert.assertEquals(1L, manager.completedSourceTasksOutputSize); - //Second task in src1 completed with small payload - vmEvent = getVertexManagerEvent(null, 1L, "Vertex"); + //First task in src2 completed with small payload + vmEvent = getVertexManagerEvent(null, 1L, mockSrcVertexId2); manager.onVertexManagerEventReceived(vmEvent); //small payload - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); //Still overall data gathered has not reached threshold; So, ensure parallelism can be determined later - Assert.assertTrue(manager.determineParallelismAndApply() == false); + Assert.assertTrue(manager.determineParallelismAndApply(0.25f) == false); Assert.assertEquals(4, manager.pendingTasks.size()); Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled - Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted); + Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted); Assert.assertEquals(2, manager.numVertexManagerEventsReceived); Assert.assertEquals(2L, manager.completedSourceTasksOutputSize); //First task in src2 completed (with larger payload) to trigger determining parallelism - vmEvent = getVertexManagerEvent(null, 1200L, "Vertex"); + vmEvent = getVertexManagerEvent(null, 1200L, mockSrcVertexId2); manager.onVertexManagerEventReceived(vmEvent); - Assert.assertTrue(manager.determineParallelismAndApply()); //ensure parallelism is determined + Assert.assertTrue(manager.determineParallelismAndApply(0.25f)); //ensure parallelism is determined + verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); verify(mockContext, times(1)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); - manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); Assert.assertEquals(1, manager.pendingTasks.size()); Assert.assertEquals(1, scheduledTasks.size()); @@ -395,10 +400,11 @@ public class TestShuffleVertexManager { when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20); when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(40); scheduledTasks.clear(); - vmEvent = getVertexManagerEvent(null, 100L, "Vertex"); //min/max fraction of 0.0/0.2 manager = createManager(conf, mockContext, 0.0f, 0.2f); + // initial invocation count == 3 + verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); manager.onVertexStarted(emptyCompletions); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); @@ -407,22 +413,25 @@ public class TestShuffleVertexManager { Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks); Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); //send 7 events with payload size as 100 - for(int i=0;i<7;i++) { - manager.onVertexManagerEventReceived(vmEvent); //small payload + for(int i=0;i<8;i++) { + //small payload - create new event each time or it will be ignored (from same task) + manager.onVertexManagerEventReceived(getVertexManagerEvent(null, 100L, mockSrcVertexId1)); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, i)); //should not change parallelism - verify(mockContext, times(0)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap()); + verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); + } + for(int i=0;i<3;i++) { + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, i)); + verify(mockContext, times(3)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); } - //send 8th event with payload size as 100 - manager.onVertexManagerEventReceived(vmEvent); - //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); - - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8)); //Since max threshold (40 * 0.2 = 8) is met, vertex manager should determine parallelism - verify(mockContext, times(1)).reconfigureVertex(eq(4), any(VertexLocationHint.class), - anyMap()); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8)); + // parallelism updated + verify(mockContext, times(4)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); + // check exact update value - 8 events with 100 each => 20 -> 2000 => 2 tasks (with 1000 per task) + verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); //reset context for next test when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2); @@ -444,7 +453,7 @@ public class TestShuffleVertexManager { Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks); Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); - vmEvent = getVertexManagerEvent(null, 500L, "Vertex"); + vmEvent = getVertexManagerEvent(null, 500L, mockSrcVertexId1); manager.onVertexManagerEventReceived(vmEvent); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); Assert.assertEquals(4, manager.pendingTasks.size()); @@ -458,14 +467,15 @@ public class TestShuffleVertexManager { Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted); Assert.assertEquals(500L, manager.completedSourceTasksOutputSize); - vmEvent = getVertexManagerEvent(null, 500L, "Vertex"); + vmEvent = getVertexManagerEvent(null, 500L, mockSrcVertexId2); manager.onVertexManagerEventReceived(vmEvent); //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); // managedVertex tasks reduced - verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); + verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); + verify(mockContext, times(3)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); Assert.assertEquals(2, newEdgeManagers.size()); // TODO improve tests for parallelism Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled @@ -478,7 +488,7 @@ public class TestShuffleVertexManager { // more completions dont cause recalculation of parallelism manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); - verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap()); + verify(mockContext, times(5)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); Assert.assertEquals(2, newEdgeManagers.size()); EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next(); @@ -548,7 +558,8 @@ public class TestShuffleVertexManager { when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices); when(mockContext.getVertexName()).thenReturn(mockManagedVertexId); when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3); - + when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1); + // fail if there is no bipartite src vertex mockInputVertices.put(mockSrcVertexId3, eProp3); try { @@ -583,6 +594,9 @@ public class TestShuffleVertexManager { // source vertices have 0 tasks. immediate start of all managed tasks when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0); when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); manager.onVertexStarted(emptyCompletions); Assert.assertTrue(manager.pendingTasks.isEmpty()); Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled @@ -618,8 +632,9 @@ public class TestShuffleVertexManager { } // source vertex have some tasks. min > default and max undefined - when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(20); - when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20); + int numTasks = 20; + when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(numTasks); + when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(numTasks); scheduledTasks.clear(); manager = createManager(conf, mockContext, 0.8f, null); @@ -627,18 +642,17 @@ public class TestShuffleVertexManager { manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); + Assert.assertEquals(3, manager.pendingTasks.size()); - Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks); + Assert.assertEquals(numTasks*2, manager.totalNumBipartiteSourceTasks); Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); - - float completedTasksThreshold = 0.8f * manager.totalNumBipartiteSourceTasks; - int completedTasks = 0; + float completedTasksThreshold = 0.8f * numTasks; // Finish all tasks before exceeding the threshold for (String mockSrcVertex : new String[] { mockSrcVertexId1, mockSrcVertexId2 }) { for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) { - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i)); - ++completedTasks; - if ((completedTasks + 1) >= completedTasksThreshold) { + // complete 0th tasks outside the loop + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i+1)); + if ((i + 2) >= completedTasksThreshold) { // stop before completing more than min/max source tasks break; } @@ -649,7 +663,10 @@ public class TestShuffleVertexManager { Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled // Cross the threshold min/max threshold to schedule all tasks - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, completedTasks)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); + Assert.assertEquals(3, manager.pendingTasks.size()); + Assert.assertEquals(0, scheduledTasks.size()); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); Assert.assertEquals(0, manager.pendingTasks.size()); Assert.assertEquals(manager.totalTasksToSchedule, scheduledTasks.size()); // all tasks scheduled @@ -660,13 +677,13 @@ public class TestShuffleVertexManager { // source vertex have some tasks. min, max == 0 manager = createManager(conf, mockContext, 0.0f, 0.0f); manager.onVertexStarted(emptyCompletions); - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); Assert.assertTrue(manager.totalTasksToSchedule == 3); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0); // all source vertices need to be configured manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); + Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); Assert.assertTrue(manager.pendingTasks.isEmpty()); Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled @@ -683,10 +700,15 @@ public class TestShuffleVertexManager { Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0); + // task completion on only 1 SG edge does nothing + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); + Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled + Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); + Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); Assert.assertTrue(manager.pendingTasks.isEmpty()); Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1); + Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); // min, max > 0 and min == max == absolute max 1.0 manager = createManager(conf, mockContext, 1.0f, 1.0f); @@ -742,6 +764,10 @@ public class TestShuffleVertexManager { Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); + // reset vertices for next test + when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(4); + when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(4); + // min, max > and min < max manager = createManager(conf, mockContext, 0.25f, 0.75f); manager.onVertexStarted(emptyCompletions); @@ -749,26 +775,30 @@ public class TestShuffleVertexManager { manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); + Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); - Assert.assertTrue(manager.pendingTasks.size() == 2); - Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled + Assert.assertTrue(manager.pendingTasks.size() == 3); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); // completion of same task again should not get counted manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); - Assert.assertTrue(manager.pendingTasks.size() == 2); - Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled + Assert.assertTrue(manager.pendingTasks.size() == 3); Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); + Assert.assertTrue(manager.pendingTasks.size() == 2); + Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled + Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2)); Assert.assertTrue(manager.pendingTasks.size() == 0); Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3); + Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6); scheduledTasks.clear(); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); // we are done. no action + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3)); // we are done. no action Assert.assertTrue(manager.pendingTasks.size() == 0); Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); + Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 7); // min, max > and min < max manager = createManager(conf, mockContext, 0.25f, 1.0f); @@ -777,20 +807,24 @@ public class TestShuffleVertexManager { manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED)); manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4); + Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0)); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); Assert.assertTrue(manager.pendingTasks.size() == 2); Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0)); + Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2)); Assert.assertTrue(manager.pendingTasks.size() == 1); Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3); - manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); + Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 3)); Assert.assertTrue(manager.pendingTasks.size() == 0); Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled - Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4); + Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 8); } @@ -844,7 +878,7 @@ public class TestShuffleVertexManager { when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(3); when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3); - VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, "Vertex"); + VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, r1); // check initialization manager = createManager(conf, mockContext_R2, 0.001f, 0.001f); @@ -867,11 +901,11 @@ public class TestShuffleVertexManager { manager.onVertexManagerEventReceived(vmEvent); Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled - Assert.assertEquals(9, manager.totalNumBipartiteSourceTasks); + Assert.assertEquals(6, manager.totalNumBipartiteSourceTasks); Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9); + Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6); //Send events for all tasks of m3. manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0)); @@ -879,22 +913,29 @@ public class TestShuffleVertexManager { manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 2)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9); + Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6); - //Send an event for m2. But still we need to wait for at least 1 event from r1. + //Send events for m2. But still we need to wait for at least 1 event from r1. manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0)); + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 1)); Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9); + Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6); + + // we need to wait for at least 1 event from r1 to make sure all vertices cross min threshold + manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0)); + Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled + Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6); //Ensure that setVertexParallelism is not called for R2. verify(mockContext_R2, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap()); //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test - when(mockContext_R2.getVertexNumTasks("R2")).thenReturn(1); + when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(1); // complete configuration of r1 triggers the scheduling manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); + Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9); verify(mockContext_R2, times(1)).reconfigureVertex(eq(1), any(VertexLocationHint.class), anyMap()); @@ -913,14 +954,13 @@ public class TestShuffleVertexManager { manager = createManager(conf, mockContext_R2, 0.001f, 0.001f); manager.onVertexStarted(emptyCompletions); Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled - Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks); Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted); - Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled - Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3); - // Only need completed configuration notification from m3 manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); + Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0)); Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled Assert.assertTrue(scheduledTasks.size() == 3); @@ -1032,12 +1072,12 @@ public class TestShuffleVertexManager { //Tasks should be scheduled in task 2, 0, 1 order long[] sizes = new long[]{(100 * 1000l * 1000l), (0l), (5000 * 1000l * 1000l)}; - VertexManagerEvent vmEvent = getVertexManagerEvent(sizes, 1060000000, "R2"); + VertexManagerEvent vmEvent = getVertexManagerEvent(sizes, 1060000000, r1); manager.onVertexManagerEventReceived(vmEvent); //send VM event //stats from another vertex (more of empty stats) sizes = new long[]{(0l), (0l), (0l)}; - vmEvent = getVertexManagerEvent(sizes, 1060000000, "R2"); + vmEvent = getVertexManagerEvent(sizes, 1060000000, r1); manager.onVertexManagerEventReceived(vmEvent); //send VM event //Send an event for m2. @@ -1139,7 +1179,7 @@ public class TestShuffleVertexManager { Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3); - //Send an event for m2. + //Send an event for m3. manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED)); Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled Assert.assertTrue(scheduledTasks.size() == 3); @@ -1197,8 +1237,9 @@ public class TestShuffleVertexManager { Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled Assert.assertTrue(scheduledTasks.size() == 0); - // event from m3 triggers scheduling. no need for m2 since it has 0 tasks + // event from m3 triggers scheduling manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED)); Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled Assert.assertTrue(scheduledTasks.size() == 3); @@ -1215,6 +1256,8 @@ public class TestShuffleVertexManager { //Send 1 events for tasks of r1. manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED)); + manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED)); manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0)); Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled Assert.assertTrue(scheduledTasks.size() == 3);
