TEZ-2943. Change shuffle vertex manager to use per-vertex data for auto-reduce
and slow start (bikas)
(cherry picked from commit 0dd68ad1c92eca98a98b8bd4a0c54b8656573e8d)
Conflicts:
CHANGES.txt
tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1db5d187
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1db5d187
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1db5d187
Branch: refs/heads/branch-0.7
Commit: 1db5d18788a32bebc8bad3e1a125f27db7fbefc4
Parents: 4ff7930
Author: Bikas Saha <[email protected]>
Authored: Wed Dec 9 17:00:17 2015 -0800
Committer: Bikas Saha <[email protected]>
Committed: Fri Dec 11 18:21:58 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../vertexmanager/ShuffleVertexManager.java | 150 ++++++++------
.../vertexmanager/TestShuffleVertexManager.java | 193 ++++++++++++-------
3 files changed, 214 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/1db5d187/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fd2749e..aced775 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,8 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES
+ TEZ-2943. Change shuffle vertex manager to use per vertex data for auto
+ reduce and slow start
TEZ-2956. Handle auto-reduce parallelism when the
totalNumBipartiteSourceTasks is 0
TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts
http://git-wip-us.apache.org/repos/asf/tez/blob/1db5d187/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 9fb5d1e..01dc5a0 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -154,12 +154,21 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
EdgeProperty edgeProperty;
boolean vertexIsConfigured;
BitSet finishedTaskSet;
+ int numTasks;
+ int numVMEventsReceived;
+ long outputSize;
SourceVertexInfo(EdgeProperty edgeProperty) {
this.edgeProperty = edgeProperty;
- if (edgeProperty.getDataMovementType() ==
DataMovementType.SCATTER_GATHER) {
- finishedTaskSet = new BitSet();
- }
+ finishedTaskSet = new BitSet();
+ }
+
+ int getNumTasks() {
+ return numTasks;
+ }
+
+ int getNumCompletedTasks() {
+ return finishedTaskSet.cardinality();
}
}
@@ -453,6 +462,7 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
for(Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) {
srcVertexInfo.put(entry.getKey(), new
SourceVertexInfo(entry.getValue()));
// TODO what if derived class has already called this
+ // register for status update from all source vertices
getContext().registerForVertexStateUpdates(entry.getKey(),
EnumSet.of(VertexState.CONFIGURED));
if (entry.getValue().getDataMovementType() ==
DataMovementType.SCATTER_GATHER) {
@@ -469,7 +479,6 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
// track the tasks in this vertex
updatePendingTasks();
- updateSourceTaskCount();
LOG.info("OnVertexStarted vertex: " + getContext().getVertexName() +
" with " + totalNumBipartiteSourceTasks + " source tasks and " +
@@ -489,19 +498,20 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier
attempt) {
String srcVertexName =
attempt.getTaskIdentifier().getVertexIdentifier().getName();
int srcTaskId = attempt.getTaskIdentifier().getIdentifier();
- updateSourceTaskCount();
SourceVertexInfo srcInfo = srcVertexInfo.get(srcVertexName);
-
- if (srcInfo.edgeProperty.getDataMovementType() ==
DataMovementType.SCATTER_GATHER) {
- //handle duplicate events for bipartite sources
- BitSet completedSourceTasks = srcInfo.finishedTaskSet;
- if (completedSourceTasks != null) {
- // duplicate notifications tracking
- if (!completedSourceTasks.get(srcTaskId)) {
- completedSourceTasks.set(srcTaskId);
- // source task has completed
- ++numBipartiteSourceTasksCompleted;
- }
+ if (srcInfo.vertexIsConfigured) {
+ Preconditions.checkState(srcTaskId < srcInfo.numTasks,
+ "Received completion for srcTaskId " + srcTaskId + " but Vertex: " +
srcVertexName +
+ " has only " + srcInfo.numTasks + " tasks");
+ }
+ //handle duplicate events and count task completions from all source
vertices
+ BitSet completedSourceTasks = srcInfo.finishedTaskSet;
+ // duplicate notifications tracking
+ if (!completedSourceTasks.get(srcTaskId)) {
+ completedSourceTasks.set(srcTaskId);
+ // source task has completed
+ if (srcInfo.edgeProperty.getDataMovementType() ==
DataMovementType.SCATTER_GATHER) {
+ numBipartiteSourceTasksCompleted++;
}
}
schedulePendingTasks();
@@ -516,9 +526,14 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
LOG.info("Ignoring vertex manager event from: " + producerTask);
return;
}
-
+
+ String vName = producerTask.getVertexIdentifier().getName();
+ SourceVertexInfo srcInfo = srcVertexInfo.get(vName);
+ Preconditions.checkState(srcInfo != null, "Unknown vmEvent from " +
producerTask);
+
numVertexManagerEventsReceived++;
+ long sourceTaskOutputSize = 0;
if (vmEvent.getUserPayload() != null) {
// save output size
VertexManagerEventPayloadProto proto;
@@ -527,15 +542,21 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
} catch (InvalidProtocolBufferException e) {
throw new TezUncheckedException(e);
}
- long sourceTaskOutputSize = proto.getOutputSize();
+ sourceTaskOutputSize = proto.getOutputSize();
+
+ srcInfo.numVMEventsReceived++;
+ srcInfo.outputSize += sourceTaskOutputSize;
completedSourceTasksOutputSize += sourceTaskOutputSize;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received info of output size: " + sourceTaskOutputSize
- + " numInfoReceived: " + numVertexManagerEventsReceived
- + " total output size: " + completedSourceTasksOutputSize);
- }
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("For attempt: " + vmEvent.getProducerAttemptIdentifier()
+ + " received info of output size: " + sourceTaskOutputSize
+ + " vertex numEventsReceived: " + srcInfo.numVMEventsReceived
+ + " vertex output size: " + srcInfo.outputSize
+ + " total numEventsReceived: " + numVertexManagerEventsReceived
+ + " total output size: " + completedSourceTasksOutputSize);
+ }
}
void updatePendingTasks() {
@@ -554,22 +575,12 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
});
}
- void updateSourceTaskCount() {
- // track source vertices
- int numSrcTasks = 0;
- Iterable<Map.Entry<String, SourceVertexInfo>> bipartiteItr =
getBipartiteInfo();
- for(Map.Entry<String, SourceVertexInfo> entry : bipartiteItr) {
- numSrcTasks += getContext().getVertexNumTasks(entry.getKey());
- }
- totalNumBipartiteSourceTasks = numSrcTasks;
- }
-
/**
* Compute optimal parallelism needed for the job
* @return true (if parallelism is determined), false otherwise
*/
@VisibleForTesting
- boolean determineParallelismAndApply() {
+ boolean determineParallelismAndApply(float
minSourceVertexCompletedTaskFraction) {
if(numVertexManagerEventsReceived == 0) {
if (totalNumBipartiteSourceTasks > 0) {
return true;
@@ -585,21 +596,28 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
*/
boolean canDetermineParallelismLater = (completedSourceTasksOutputSize <
desiredTaskInputDataSize)
- && (numBipartiteSourceTasksCompleted < (totalNumBipartiteSourceTasks *
slowStartMaxSrcCompletionFraction));
+ && (minSourceVertexCompletedTaskFraction <
slowStartMaxSrcCompletionFraction);
if (canDetermineParallelismLater) {
LOG.info("Defer scheduling tasks; vertex=" + getContext().getVertexName()
+ ", totalNumBipartiteSourceTasks=" + totalNumBipartiteSourceTasks
+ ", completedSourceTasksOutputSize=" +
completedSourceTasksOutputSize
+ ", numVertexManagerEventsReceived=" +
numVertexManagerEventsReceived
- + ", numBipartiteSourceTasksCompleted=" +
numBipartiteSourceTasksCompleted + ", maxThreshold="
- + (totalNumBipartiteSourceTasks *
slowStartMaxSrcCompletionFraction));
+ + ", numBipartiteSourceTasksCompleted=" +
numBipartiteSourceTasksCompleted
+ + ", minSourceVertexCompletedTaskFraction=" +
minSourceVertexCompletedTaskFraction);
return false;
}
+ // Change this to use per partition stats for more accuracy TEZ-2962.
+ // Instead of aggregating overall size and then dividing equally -
coalesce partitions until
+ // desired per partition size is achieved.
long expectedTotalSourceTasksOutputSize = 0;
- if (numVertexManagerEventsReceived > 0 && totalNumBipartiteSourceTasks > 0
) {
- expectedTotalSourceTasksOutputSize =
- (totalNumBipartiteSourceTasks * completedSourceTasksOutputSize) /
numVertexManagerEventsReceived;
+ for (Map.Entry<String, SourceVertexInfo> vInfo : getBipartiteInfo()) {
+ SourceVertexInfo srcInfo = vInfo.getValue();
+ if (srcInfo.numTasks > 0 && srcInfo.numVMEventsReceived > 0) {
+ // this assumes that 1 vmEvent is received per completed task -
TEZ-2961
+ expectedTotalSourceTasksOutputSize +=
+ (srcInfo.numTasks * srcInfo.outputSize) /
srcInfo.numVMEventsReceived;
+ }
}
int desiredTaskParallelism =
@@ -667,8 +685,8 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
}
return true;
}
-
- void schedulePendingTasks(int numTasksToSchedule) {
+
+ void schedulePendingTasks(int numTasksToSchedule, float
minSourceVertexCompletedTaskFraction) {
// determine parallelism before scheduling the first time
// this is the latest we can wait before determining parallelism.
// currently this depends on task completion and so this is the best time
@@ -677,7 +695,7 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
// calculating parallelism or change parallelism while tasks are already
// running then we can create other parameters to trigger this calculation.
if(enableAutoParallelism && !parallelismDetermined) {
- parallelismDetermined = determineParallelismAndApply();
+ parallelismDetermined =
determineParallelismAndApply(minSourceVertexCompletedTaskFraction);
if (!parallelismDetermined) {
//try to determine parallelism later when more info is available.
return;
@@ -707,10 +725,9 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
*/
boolean canScheduleTasks() {
for(Map.Entry<String, SourceVertexInfo> entry : srcVertexInfo.entrySet()) {
- String sourceVertex = entry.getKey();
- int numSourceTasks = getContext().getVertexNumTasks(sourceVertex);
- if (numSourceTasks > 0 && !entry.getValue().vertexIsConfigured) {
- // vertex not configured
+ // need to check for vertex configured because until that we dont know
if numTasks==0 is valid
+ if (!entry.getValue().vertexIsConfigured) { // isConfigured
+ // vertex not scheduled tasks
if (LOG.isDebugEnabled()) {
LOG.debug("Waiting for vertex: " + entry.getKey() + " in vertex: "
+ getContext().getVertexName());
@@ -744,29 +761,38 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
LOG.info("All source tasks assigned. " +
"Ramping up " + numPendingTasks +
" remaining tasks for vertex: " + getContext().getVertexName());
- schedulePendingTasks(numPendingTasks);
+ schedulePendingTasks(numPendingTasks, 1);
return;
}
- float completedSourceTaskFraction = 0f;
- if (totalNumBipartiteSourceTasks != 0) { // support for 0 source tasks
- completedSourceTaskFraction = (float) numBipartiteSourceTasksCompleted /
totalNumBipartiteSourceTasks;
- } else {
- completedSourceTaskFraction = 1;
+ float minSourceVertexCompletedTaskFraction = 1f;
+ String minCompletedVertexName = "";
+ for (Map.Entry<String, SourceVertexInfo> vInfo : getBipartiteInfo()) {
+ SourceVertexInfo srcInfo = vInfo.getValue();
+ // canScheduleTasks check has already verified all sources are configured
+ Preconditions.checkState(srcInfo.vertexIsConfigured, "Vertex: " +
vInfo.getKey());
+ if (srcInfo.numTasks > 0) {
+ int numCompletedTasks = srcInfo.getNumCompletedTasks();
+ float completedFraction = (float) numCompletedTasks / srcInfo.numTasks;
+ if (minSourceVertexCompletedTaskFraction > completedFraction) {
+ minSourceVertexCompletedTaskFraction = completedFraction;
+ minCompletedVertexName = vInfo.getKey();
+ }
+ }
}
// start scheduling when source tasks completed fraction is more than min.
// linearly increase the number of scheduled tasks such that all tasks are
// scheduled when source tasks completed fraction reaches max
- float tasksFractionToSchedule = 1;
+ float tasksFractionToSchedule = 1;
float percentRange = slowStartMaxSrcCompletionFraction -
slowStartMinSrcCompletionFraction;
if (percentRange > 0) {
tasksFractionToSchedule =
- (completedSourceTaskFraction - slowStartMinSrcCompletionFraction)/
+ (minSourceVertexCompletedTaskFraction -
slowStartMinSrcCompletionFraction)/
percentRange;
} else {
// min and max are equal. schedule 100% on reaching min
- if(completedSourceTaskFraction < slowStartMinSrcCompletionFraction) {
+ if(minSourceVertexCompletedTaskFraction <
slowStartMinSrcCompletionFraction) {
tasksFractionToSchedule = 0;
}
}
@@ -784,10 +810,11 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
getContext().getVertexName() + " with totalTasks: " +
totalTasksToSchedule + ". " + numBipartiteSourceTasksCompleted +
" source tasks completed out of " +
totalNumBipartiteSourceTasks +
- ". SourceTaskCompletedFraction: " + completedSourceTaskFraction
+
+ ". MinSourceTaskCompletedFraction: " +
minSourceVertexCompletedTaskFraction +
+ " in Vertex: " + minCompletedVertexName +
" min: " + slowStartMinSrcCompletionFraction +
" max: " + slowStartMaxSrcCompletionFraction);
- schedulePendingTasks(numTasksToSchedule);
+ schedulePendingTasks(numTasksToSchedule,
minSourceVertexCompletedTaskFraction);
}
}
@@ -857,8 +884,13 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
SourceVertexInfo vInfo = srcVertexInfo.get(stateUpdate.getVertexName());
Preconditions.checkState(vInfo.vertexIsConfigured == false);
vInfo.vertexIsConfigured = true;
+ vInfo.numTasks =
getContext().getVertexNumTasks(stateUpdate.getVertexName());
+ if (vInfo.edgeProperty.getDataMovementType() ==
DataMovementType.SCATTER_GATHER) {
+ totalNumBipartiteSourceTasks += vInfo.numTasks;
+ }
LOG.info("Received configured notification : " +
stateUpdate.getVertexState() + " for vertex: "
- + stateUpdate.getVertexName() + " in vertex: " +
getContext().getVertexName());
+ + stateUpdate.getVertexName() + " in vertex: " +
getContext().getVertexName() +
+ " numBipartiteSourceTasks: " + totalNumBipartiteSourceTasks);
schedulePendingTasks();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/1db5d187/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git
a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index f3f3444..9a9ff27 100644
---
a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -236,10 +236,12 @@ public class TestShuffleVertexManager {
// check waiting for notification before scheduling
Assert.assertFalse(manager.pendingTasks.isEmpty());
- // source vertices have 0 tasks. so only 1 notification needed. triggers
scheduling
+ // source vertices have 0 tasks. triggers scheduling
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.isEmpty());
- verify(mockContext, times(1)).reconfigureVertex(anyInt(), any
+ verify(mockContext, times(1)).reconfigureVertex(eq(1), any
(VertexLocationHint.class), anyMap());
verify(mockContext, times(1)).doneReconfiguringVertex(); // reconfig done
Assert.assertTrue(scheduledTasks.size() == 1); // all tasks scheduled and
parallelism changed
@@ -252,13 +254,15 @@ public class TestShuffleVertexManager {
verify(mockContext, times(3)).vertexReconfigurationPlanned();
// source vertices have 0 tasks. so only 1 notification needed. does not
trigger scheduling
// normally this event will not come before onVertexStarted() is called
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
verify(mockContext, times(1)).doneReconfiguringVertex(); // no change.
will trigger after start
Assert.assertTrue(scheduledTasks.size() == 0); // no tasks scheduled
// trigger start and processing of pending notification events
manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.bipartiteSources == 2);
- verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
+ verify(mockContext, times(2)).reconfigureVertex(eq(1), any
(VertexLocationHint.class), anyMap());
verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done
Assert.assertTrue(manager.pendingTasks.isEmpty());
@@ -269,30 +273,31 @@ public class TestShuffleVertexManager {
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
- VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, "Vertex");
+ VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L,
mockSrcVertexId1);
// parallelism not change due to large data size
manager = createManager(conf, mockContext, 0.1f, 0.1f);
verify(mockContext, times(4)).vertexReconfigurationPlanned(); // Tez
notified of reconfig
manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.pendingTasks.size() == 4); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
manager.onVertexManagerEventReceived(vmEvent);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
(VertexLocationHint.class), anyMap());
verify(mockContext, times(2)).doneReconfiguringVertex();
// trigger scheduling
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
verify(mockContext, times(2)).reconfigureVertex(anyInt(), any
(VertexLocationHint.class), anyMap());
verify(mockContext, times(3)).doneReconfiguringVertex(); // reconfig done
Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
Assert.assertEquals(4, scheduledTasks.size());
// TODO TEZ-1714 locking verify(mockContext,
times(2)).vertexManagerDone(); // notified after scheduling all tasks
- Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
+ Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize);
/**
@@ -304,41 +309,42 @@ public class TestShuffleVertexManager {
//min/max fraction of 0.01/0.75 would ensure that we hit
determineParallelism code path on receiving first event itself.
manager = createManager(conf, mockContext, 0.01f, 0.75f);
manager.onVertexStarted(emptyCompletions);
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
//First task in src1 completed with small payload
- vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
+ vmEvent = getVertexManagerEvent(null, 1L, mockSrcVertexId1);
manager.onVertexManagerEventReceived(vmEvent); //small payload
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
- Assert.assertTrue(manager.determineParallelismAndApply() == false);
+ Assert.assertTrue(manager.determineParallelismAndApply(0f) == false);
Assert.assertEquals(4, manager.pendingTasks.size());
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
Assert.assertEquals(1L, manager.completedSourceTasksOutputSize);
- //Second task in src1 completed with small payload
- vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
+ //First task in src2 completed with small payload
+ vmEvent = getVertexManagerEvent(null, 1L, mockSrcVertexId2);
manager.onVertexManagerEventReceived(vmEvent); //small payload
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
//Still overall data gathered has not reached threshold; So, ensure
parallelism can be determined later
- Assert.assertTrue(manager.determineParallelismAndApply() == false);
+ Assert.assertTrue(manager.determineParallelismAndApply(0.25f) == false);
Assert.assertEquals(4, manager.pendingTasks.size());
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
- Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
+ Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
Assert.assertEquals(2, manager.numVertexManagerEventsReceived);
Assert.assertEquals(2L, manager.completedSourceTasksOutputSize);
//First task in src2 completed (with larger payload) to trigger
determining parallelism
- vmEvent = getVertexManagerEvent(null, 1200L, "Vertex");
+ vmEvent = getVertexManagerEvent(null, 1200L, mockSrcVertexId2);
manager.onVertexManagerEventReceived(vmEvent);
- Assert.assertTrue(manager.determineParallelismAndApply()); //ensure
parallelism is determined
+ Assert.assertTrue(manager.determineParallelismAndApply(0.25f)); //ensure
parallelism is determined
+ verify(mockContext, times(3)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
verify(mockContext, times(1)).reconfigureVertex(eq(2),
any(VertexLocationHint.class), anyMap());
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
- manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
Assert.assertEquals(1, manager.pendingTasks.size());
Assert.assertEquals(1, scheduledTasks.size());
@@ -351,10 +357,11 @@ public class TestShuffleVertexManager {
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(40);
scheduledTasks.clear();
- vmEvent = getVertexManagerEvent(null, 100L, "Vertex");
//min/max fraction of 0.0/0.2
manager = createManager(conf, mockContext, 0.0f, 0.2f);
+ // initial invocation count == 3
+ verify(mockContext, times(3)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
@@ -363,21 +370,25 @@ public class TestShuffleVertexManager {
Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
//send 7 events with payload size as 100
- for(int i=0;i<7;i++) {
- manager.onVertexManagerEventReceived(vmEvent); //small payload
+ for(int i=0;i<8;i++) {
+ //small payload - create new event each time or it will be ignored (from
same task)
+ manager.onVertexManagerEventReceived(getVertexManagerEvent(null, 100L,
mockSrcVertexId1));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, i));
//should not change parallelism
- verify(mockContext, times(0)).reconfigureVertex(eq(4),
any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(3)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
+ }
+ for(int i=0;i<3;i++) {
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, i));
+ verify(mockContext, times(3)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
}
- //send 8th event with payload size as 100
- manager.onVertexManagerEventReceived(vmEvent);
-
//ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks.
Setting this for test
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
-
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8));
//Since max threshold (40 * 0.2 = 8) is met, vertex manager should
determine parallelism
- verify(mockContext, times(1)).reconfigureVertex(eq(4),
any(VertexLocationHint.class), anyMap());
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8));
+ // parallelism updated
+ verify(mockContext, times(4)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
+ // check exact update value - 8 events with 100 each => 20 -> 2000 => 2
tasks (with 1000 per task)
+ verify(mockContext, times(2)).reconfigureVertex(eq(2),
any(VertexLocationHint.class), anyMap());
//reset context for next test
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
@@ -399,7 +410,7 @@ public class TestShuffleVertexManager {
Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
- vmEvent = getVertexManagerEvent(null, 500L, "Vertex");
+ vmEvent = getVertexManagerEvent(null, 500L, mockSrcVertexId1);
manager.onVertexManagerEventReceived(vmEvent);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
Assert.assertEquals(4, manager.pendingTasks.size());
@@ -413,14 +424,15 @@ public class TestShuffleVertexManager {
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);
- vmEvent = getVertexManagerEvent(null, 500L, "Vertex");
+ vmEvent = getVertexManagerEvent(null, 500L, mockSrcVertexId2);
manager.onVertexManagerEventReceived(vmEvent);
//ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks.
Setting this for test
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
// managedVertex tasks reduced
- verify(mockContext, times(2)).reconfigureVertex(eq(2),
any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(5)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(3)).reconfigureVertex(eq(2),
any(VertexLocationHint.class), anyMap());
Assert.assertEquals(2, newEdgeManagers.size());
// TODO improve tests for parallelism
Assert.assertEquals(0, manager.pendingTasks.size()); // all tasks scheduled
@@ -433,7 +445,7 @@ public class TestShuffleVertexManager {
// more completions dont cause recalculation of parallelism
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
- verify(mockContext, times(2)).reconfigureVertex(eq(2),
any(VertexLocationHint.class), anyMap());
+ verify(mockContext, times(5)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class), anyMap());
Assert.assertEquals(2, newEdgeManagers.size());
EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next();
@@ -502,7 +514,8 @@ public class TestShuffleVertexManager {
when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
-
+ when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1);
+
// fail if there is no bipartite src vertex
mockInputVertices.put(mockSrcVertexId3, eProp3);
try {
@@ -537,6 +550,9 @@ public class TestShuffleVertexManager {
// source vertices have 0 tasks. immediate start of all managed tasks
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0);
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0);
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.pendingTasks.isEmpty());
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
@@ -572,8 +588,9 @@ public class TestShuffleVertexManager {
}
// source vertex have some tasks. min > default and max undefined
- when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(20);
- when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
+ int numTasks = 20;
+ when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(numTasks);
+ when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(numTasks);
scheduledTasks.clear();
manager = createManager(conf, mockContext, 0.8f, null);
@@ -581,18 +598,17 @@ public class TestShuffleVertexManager {
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
+
Assert.assertEquals(3, manager.pendingTasks.size());
- Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks);
+ Assert.assertEquals(numTasks*2, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
-
- float completedTasksThreshold = 0.8f *
manager.totalNumBipartiteSourceTasks;
- int completedTasks = 0;
+ float completedTasksThreshold = 0.8f * numTasks;
// Finish all tasks before exceeding the threshold
for (String mockSrcVertex : new String[] { mockSrcVertexId1,
mockSrcVertexId2 }) {
for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) {
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i));
- ++completedTasks;
- if ((completedTasks + 1) >= completedTasksThreshold) {
+ // complete 0th tasks outside the loop
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i+1));
+ if ((i + 2) >= completedTasksThreshold) {
// stop before completing more than min/max source tasks
break;
}
@@ -603,7 +619,10 @@ public class TestShuffleVertexManager {
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
// Cross the threshold min/max threshold to schedule all tasks
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2,
completedTasks));
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+ Assert.assertEquals(3, manager.pendingTasks.size());
+ Assert.assertEquals(0, scheduledTasks.size());
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
Assert.assertEquals(0, manager.pendingTasks.size());
Assert.assertEquals(manager.totalTasksToSchedule, scheduledTasks.size());
// all tasks scheduled
@@ -614,13 +633,13 @@ public class TestShuffleVertexManager {
// source vertex have some tasks. min, max == 0
manager = createManager(conf, mockContext, 0.f, 0.f);
manager.onVertexStarted(emptyCompletions);
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
Assert.assertTrue(manager.totalTasksToSchedule == 3);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
// all source vertices need to be configured
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
Assert.assertTrue(manager.pendingTasks.isEmpty());
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
@@ -637,10 +656,15 @@ public class TestShuffleVertexManager {
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
+ // task completion on only 1 SG edge does nothing
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+ Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
Assert.assertTrue(manager.pendingTasks.isEmpty());
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
// min, max > 0 and min == max == absolute max 1.0
manager = createManager(conf, mockContext, 1.0f, 1.0f);
@@ -696,6 +720,10 @@ public class TestShuffleVertexManager {
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+ // reset vertices for next test
+ when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(4);
+ when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(4);
+
// min, max > and min < max
manager = createManager(conf, mockContext, 0.25f, 0.75f);
manager.onVertexStarted(emptyCompletions);
@@ -703,26 +731,30 @@ public class TestShuffleVertexManager {
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
- Assert.assertTrue(manager.pendingTasks.size() == 2);
- Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+ Assert.assertTrue(manager.pendingTasks.size() == 3);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
// completion of same task again should not get counted
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
- Assert.assertTrue(manager.pendingTasks.size() == 2);
- Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+ Assert.assertTrue(manager.pendingTasks.size() == 3);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+ Assert.assertTrue(manager.pendingTasks.size() == 2);
+ Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
Assert.assertTrue(manager.pendingTasks.size() == 0);
Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
scheduledTasks.clear();
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1,
1)); // we are done. no action
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1,
3)); // we are done. no action
Assert.assertTrue(manager.pendingTasks.size() == 0);
Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 7);
// min, max > and min < max
manager = createManager(conf, mockContext, 0.25f, 1.0f);
@@ -731,20 +763,24 @@ public class TestShuffleVertexManager {
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3,
VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 8);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
Assert.assertTrue(manager.pendingTasks.size() == 2);
Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 2));
Assert.assertTrue(manager.pendingTasks.size() == 1);
Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
-
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 6);
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 3));
+
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 3));
Assert.assertTrue(manager.pendingTasks.size() == 0);
Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled
- Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
+ Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 8);
}
@@ -798,7 +834,7 @@ public class TestShuffleVertexManager {
when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(3);
when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3);
- VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, "Vertex");
+ VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, r1);
// check initialization
manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
@@ -821,11 +857,11 @@ public class TestShuffleVertexManager {
manager.onVertexManagerEventReceived(vmEvent);
Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(9, manager.totalNumBipartiteSourceTasks);
+ Assert.assertEquals(6, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
//Send events for all tasks of m3.
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
@@ -833,23 +869,34 @@ public class TestShuffleVertexManager {
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 2));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
- //Send an event for m2. But still we need to wait for at least 1 event
from r1.
+ //Send events for m2. But still we need to wait for at least 1 event from
r1.
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 1));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
+
+ // we need to wait for at least 1 event from r1 to make sure all vertices
cross min threshold
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
+ Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 6);
//Ensure that setVertexParallelism is not called for R2.
verify(mockContext_R2, times(0)).reconfigureVertex(anyInt(),
any(VertexLocationHint.class),
anyMap());
+
+ //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks.
Setting this for test
+
when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(1);
+
// complete configuration of r1 triggers the scheduling
manager.onVertexStateUpdated(new VertexStateUpdate(r1,
VertexState.CONFIGURED));
+ Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
verify(mockContext_R2, times(1)).reconfigureVertex(eq(1),
any(VertexLocationHint.class),
anyMap());
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
- Assert.assertTrue(scheduledTasks.size() == 3);
+ Assert.assertTrue(scheduledTasks.size() == 1);
//try with zero task vertices
scheduledTasks.clear();
@@ -863,14 +910,13 @@ public class TestShuffleVertexManager {
manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
manager.onVertexStarted(emptyCompletions);
Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
- Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
- Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
- Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
-
// Only need completed configuration notification from m3
manager.onVertexStateUpdated(new VertexStateUpdate(m3,
VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(m2,
VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(r1,
VertexState.CONFIGURED));
+ Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 3);
@@ -989,7 +1035,7 @@ public class TestShuffleVertexManager {
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
- //Send an event for m2.
+ //Send an event for m3.
manager.onVertexStateUpdated(new VertexStateUpdate(m3,
VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 3);
@@ -1047,8 +1093,9 @@ public class TestShuffleVertexManager {
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 0);
- // event from m3 triggers scheduling. no need for m2 since it has 0 tasks
+ // event from m3 triggers scheduling
manager.onVertexStateUpdated(new VertexStateUpdate(m3,
VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(m2,
VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 3);
@@ -1065,6 +1112,8 @@ public class TestShuffleVertexManager {
//Send 1 events for tasks of r1.
manager.onVertexStateUpdated(new VertexStateUpdate(r1,
VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(m3,
VertexState.CONFIGURED));
+ manager.onVertexStateUpdated(new VertexStateUpdate(m2,
VertexState.CONFIGURED));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 3);