Repository: tez
Updated Branches:
refs/heads/branch-0.7 2917f4571 -> 0ba1e97db
TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks (jlowe)
(cherry picked from commit 7221d386a4fbc1f32aae1854bd25defb4c6d557a)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0ba1e97d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0ba1e97d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0ba1e97d
Branch: refs/heads/branch-0.7
Commit: 0ba1e97dbf7c5348ca5142d38da2fa82feb3be97
Parents: 2917f45
Author: Jason Lowe <[email protected]>
Authored: Thu Apr 28 13:59:41 2016 +0000
Committer: Jason Lowe <[email protected]>
Committed: Thu Apr 28 13:59:41 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../vertexmanager/ShuffleVertexManager.java | 7 +--
.../vertexmanager/TestShuffleVertexManager.java | 52 ++++++++++++++++++++
3 files changed, 55 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/0ba1e97d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 129e3cd..36d3d55 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
TEZ-2972. Avoid task rescheduling when a node turns unhealthy
ALL CHANGES:
+ TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
TEZ-3213. Uncaught exception during vertex recovery leads to invalid state
transition loop.
TEZ-3224. User payload is not initialized before creating vertex manager
plugin.
TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor
initialization relative to Inputs/Outputs
http://git-wip-us.apache.org/repos/asf/tez/blob/0ba1e97d/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index b11ed3f..3b2b669 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -909,10 +909,6 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
// vertex not started yet
return;
}
- int numPendingTasks = pendingTasks.size();
- if (numPendingTasks == 0) {
- return;
- }
if (!sourceVerticesScheduled && !canScheduleTasks()) {
if (LOG.isDebugEnabled()) {
@@ -922,7 +918,8 @@ public class ShuffleVertexManager extends
VertexManagerPlugin {
return;
}
- if (numBipartiteSourceTasksCompleted == totalNumBipartiteSourceTasks &&
numPendingTasks > 0) {
+ int numPendingTasks = pendingTasks.size();
+ if (numBipartiteSourceTasksCompleted == totalNumBipartiteSourceTasks) {
LOG.info("All source tasks assigned. " +
"Ramping up " + numPendingTasks +
" remaining tasks for vertex: " + getContext().getVertexName());
http://git-wip-us.apache.org/repos/asf/tez/blob/0ba1e97d/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git
a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 9c21aed..df1f080 100644
---
a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++
b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -1281,6 +1281,58 @@ public class TestShuffleVertexManager {
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 3);
}
+
+ @Test
+ public void testZeroTasksSendsConfigured() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(
+ ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
+ true);
+
conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
1000L);
+ ShuffleVertexManager manager = null;
+
+ HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String,
EdgeProperty>();
+ String r1 = "R1";
+ EdgeProperty eProp1 = EdgeProperty.create(
+ EdgeProperty.DataMovementType.SCATTER_GATHER,
+ EdgeProperty.DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("out"),
+ InputDescriptor.create("in"));
+
+ final String mockManagedVertexId = "R2";
+ mockInputVertices.put(r1, eProp1);
+
+ final VertexManagerPluginContext mockContext =
mock(VertexManagerPluginContext.class);
+
when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
+ when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(0);
+
+ VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, r1);
+ // check initialization
+ manager = createManager(conf, mockContext, 0.001f, 0.001f);
+
+ final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
+ doAnswer(new Answer() {
+ public Object answer(InvocationOnMock invocation) {
+ Object[] args = invocation.getArguments();
+ scheduledTasks.clear();
+ List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+ for (ScheduleTaskRequest task : tasks) {
+ scheduledTasks.add(task.getTaskIndex());
+ }
+ return null;
+ }}).when(mockContext).scheduleTasks(anyList());
+
+ manager.onVertexStarted(emptyCompletions);
+ manager.onVertexStateUpdated(new VertexStateUpdate(r1,
VertexState.CONFIGURED));
+ Assert.assertEquals(1, manager.bipartiteSources);
+ Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+ Assert.assertEquals(0, manager.totalNumBipartiteSourceTasks);
+ Assert.assertEquals(0, manager.pendingTasks.size()); // no tasks scheduled
+ Assert.assertEquals(0, scheduledTasks.size());
+ verify(mockContext).doneReconfiguringVertex();
+ }
public static TaskAttemptIdentifier createTaskAttemptIdentifier(String
vName, int tId) {
VertexIdentifier mockVertex = mock(VertexIdentifier.class);