Repository: tez
Updated Branches:
refs/heads/branch-0.8 4694a9efd -> efbe13481
TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks (jlowe)
(cherry picked from commit 7221d386a4fbc1f32aae1854bd25defb4c6d557a)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/efbe1348
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/efbe1348
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/efbe1348
Branch: refs/heads/branch-0.8
Commit: efbe134814dd583f10190ad02a618e85230a465a
Parents: 4694a9e
Author: Jason Lowe <[email protected]>
Authored: Thu Apr 28 13:57:58 2016 +0000
Committer: Jason Lowe <[email protected]>
Committed: Thu Apr 28 13:57:58 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../vertexmanager/ShuffleVertexManager.java | 7 +--
.../vertexmanager/TestShuffleVertexManager.java | 52 ++++++++++++++++++++
3 files changed, 56 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/efbe1348/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b6c5520..3c80f7e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.4: Unreleased
INCOMPATIBLE CHANGES

ALL CHANGES:
+ TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts.
TEZ-3224. User payload is not initialized before creating vertex manager plugin.
TEZ-3226. Tez UI 2: All DAGs UX improvements.
@@ -443,6 +444,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.

ALL CHANGES:
+ TEZ-3203. DAG hangs when one of the upstream vertices has zero tasks
TEZ-3224. User payload is not initialized before creating vertex manager plugin.
TEZ-3165. Allow Inputs/Outputs to be initialized serially, control processor initialization relative to Inputs/Outputs
TEZ-3202. Reduce the memory need for jobs with high number of segments
http://git-wip-us.apache.org/repos/asf/tez/blob/efbe1348/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index 47fc60f..aee8b6f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -909,10 +909,6 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
// vertex not started yet
return;
}
- int numPendingTasks = pendingTasks.size();
- if (numPendingTasks == 0) {
- return;
- }

if (!sourceVerticesScheduled && !canScheduleTasks()) {
if (LOG.isDebugEnabled()) {
@@ -922,7 +918,8 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
return;
}

- if (numBipartiteSourceTasksCompleted == totalNumBipartiteSourceTasks && numPendingTasks > 0) {
+ int numPendingTasks = pendingTasks.size();
+ if (numBipartiteSourceTasksCompleted == totalNumBipartiteSourceTasks) {
LOG.info("All source tasks assigned. " +
"Ramping up " + numPendingTasks +
" remaining tasks for vertex: " + getContext().getVertexName());
http://git-wip-us.apache.org/repos/asf/tez/blob/efbe1348/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 9c21aed..df1f080 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -1281,6 +1281,58 @@ public class TestShuffleVertexManager {
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 3);
}
+
+ @Test
+ public void testZeroTasksSendsConfigured() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(
+ ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
+ true);
+ conf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, 1000L);
+ ShuffleVertexManager manager = null;
+
+ HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
+ String r1 = "R1";
+ EdgeProperty eProp1 = EdgeProperty.create(
+ EdgeProperty.DataMovementType.SCATTER_GATHER,
+ EdgeProperty.DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("out"),
+ InputDescriptor.create("in"));
+
+ final String mockManagedVertexId = "R2";
+ mockInputVertices.put(r1, eProp1);
+
+ final VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+ when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
+ when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(0);
+
+ VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, r1);
+ // check initialization
+ manager = createManager(conf, mockContext, 0.001f, 0.001f);
+
+ final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
+ doAnswer(new Answer() {
+ public Object answer(InvocationOnMock invocation) {
+ Object[] args = invocation.getArguments();
+ scheduledTasks.clear();
+ List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+ for (ScheduleTaskRequest task : tasks) {
+ scheduledTasks.add(task.getTaskIndex());
+ }
+ return null;
+ }}).when(mockContext).scheduleTasks(anyList());
+
+ manager.onVertexStarted(emptyCompletions);
+ manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
+ Assert.assertEquals(1, manager.bipartiteSources);
+ Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+ Assert.assertEquals(0, manager.totalNumBipartiteSourceTasks);
+ Assert.assertEquals(0, manager.pendingTasks.size()); // no tasks scheduled
+ Assert.assertEquals(0, scheduledTasks.size());
+ verify(mockContext).doneReconfiguringVertex();
+ }

public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId) {
VertexIdentifier mockVertex = mock(VertexIdentifier.class);