Repository: tez
Updated Branches:
  refs/heads/branch-0.7 fa3139be2 -> bedb9d13a
TEZ-3696. Jobs can hang when both concurrency and speculation are enabled. Contributed by Eric Badger Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bedb9d13 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bedb9d13 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bedb9d13 Branch: refs/heads/branch-0.7 Commit: bedb9d13aba28db1c00b603e74d2f001a652e809 Parents: fa3139b Author: Jason Lowe <[email protected]> Authored: Mon May 8 09:27:51 2017 -0500 Committer: Jason Lowe <[email protected]> Committed: Mon May 8 09:27:51 2017 -0500 ---------------------------------------------------------------------- .../apache/tez/dag/app/dag/DAGScheduler.java | 28 +++++---- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 4 +- .../tez/dag/app/dag/impl/TestDAGScheduler.java | 60 +++++++++++++++++++- .../tez/dag/app/dag/impl/TestTaskImpl.java | 43 ++++++++++++++ 4 files changed, 118 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/bedb9d13/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java index 87a6261..929f0c8 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java @@ -18,21 +18,22 @@ package org.apache.tez.dag.app.dag; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Map; -import java.util.Queue; import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate; +import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezVertexID; -import com.google.common.collect.Lists; -import 
com.google.common.collect.Maps; - public abstract class DAGScheduler { private static class VertexInfo { int concurrencyLimit; int concurrency; - Queue<DAGEventSchedulerUpdate> pendingAttempts = Lists.newLinkedList(); - + Map<TezTaskAttemptID, DAGEventSchedulerUpdate> pendingAttempts = + new LinkedHashMap<TezTaskAttemptID, DAGEventSchedulerUpdate>(); + VertexInfo(int limit) { this.concurrencyLimit = limit; } @@ -42,7 +43,7 @@ public abstract class DAGScheduler { public void addVertexConcurrencyLimit(TezVertexID vId, int concurrency) { if (vertexInfo == null) { - vertexInfo = Maps.newHashMap(); + vertexInfo = new HashMap<TezVertexID, VertexInfo>(); } if (concurrency > 0) { vertexInfo.put(vId, new VertexInfo(concurrency)); @@ -60,7 +61,7 @@ public abstract class DAGScheduler { private void scheduleTaskWithLimit(DAGEventSchedulerUpdate event, VertexInfo vInfo) { if (vInfo != null) { if (vInfo.concurrency >= vInfo.concurrencyLimit) { - vInfo.pendingAttempts.add(event); + vInfo.pendingAttempts.put(event.getAttempt().getID(), event); return; // already at max concurrency } vInfo.concurrency++; @@ -73,9 +74,14 @@ public abstract class DAGScheduler { if (vertexInfo != null) { VertexInfo vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID()); if (vInfo != null) { - vInfo.concurrency--; - if (!vInfo.pendingAttempts.isEmpty()) { - scheduleTaskWithLimit(vInfo.pendingAttempts.poll(), vInfo); + if(vInfo.pendingAttempts.remove(event.getAttempt().getID()) == null) { + vInfo.concurrency--; + if(!vInfo.pendingAttempts.isEmpty()) { + Iterator<DAGEventSchedulerUpdate> i = vInfo.pendingAttempts.values().iterator(); + DAGEventSchedulerUpdate nextTaskAttempt = i.next(); + i.remove(); + scheduleTaskWithLimit(nextTaskAttempt, vInfo); + } } } } http://git-wip-us.apache.org/repos/asf/tez/blob/bedb9d13/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 2dfd7f2..8d4106c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -963,9 +963,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { private void handleTaskAttemptCompletion(TezTaskAttemptID attemptId, TaskAttemptStateInternal attemptState) { this.sendTaskAttemptCompletionEvent(attemptId, attemptState); - if (getInternalState() != TaskStateInternal.SUCCEEDED) { - sendDAGSchedulerFinishedEvent(attemptId); // not a retro active action - } + sendDAGSchedulerFinishedEvent(attemptId); } private void sendDAGSchedulerFinishedEvent(TezTaskAttemptID taId) { http://git-wip-us.apache.org/repos/asf/tez/blob/bedb9d13/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java index f2fd933..f38f689 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java @@ -195,7 +195,61 @@ public class TestDAGScheduler { scheduled++; } - - - + + @Test(timeout=5000) + public void testConcurrencyLimitWithKilledNonRunningTask() { + MockEventHandler mockEventHandler = new MockEventHandler(); + DAG mockDag = mock(DAG.class); + when(mockDag.getTotalVertices()).thenReturn(2); + TezVertexID vId0 = TezVertexID.fromString("vertex_1436907267600_195589_1_00"); + TezTaskID tId0 = TezTaskID.getInstance(vId0, 0); + + TaskAttempt mockAttempt; + + Vertex mockVertex = mock(Vertex.class); + when(mockDag.getVertex((TezVertexID) any())).thenReturn(mockVertex); + 
when(mockVertex.getDistanceFromRoot()).thenReturn(0); + when(mockVertex.getVertexId()).thenReturn(vId0); + + DAGScheduler scheduler = new DAGSchedulerNaturalOrder(mockDag, + mockEventHandler); + + List<TaskAttempt> mockAttempts = Lists.newArrayList(); + int completed = 0; + int requested = 0; + int scheduled = 0; + scheduler.addVertexConcurrencyLimit(vId0, 1); // effective + + // schedule beyond limit and it gets buffered + mockAttempt = mock(TaskAttempt.class); + mockAttempts.add(mockAttempt); + when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, requested++)); + scheduler.scheduleTask(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); + Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled + Assert.assertEquals(mockAttempts.get(scheduled).getID(), + mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order + scheduled++; + + mockAttempt = mock(TaskAttempt.class); + mockAttempts.add(mockAttempt); + when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, requested++)); + scheduler.scheduleTask(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); + Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered + + mockAttempt = mock(TaskAttempt.class); + mockAttempts.add(mockAttempt); + when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, requested++)); + scheduler.scheduleTask(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); + Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered + + scheduler.taskCompleted(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, mockAttempts.get(1))); + Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered + Assert.assertEquals(mockAttempts.get(0).getID(), + mockEventHandler.events.get(0).getTaskAttemptID()); // matches order + 
} + } http://git-wip-us.apache.org/repos/asf/tez/blob/bedb9d13/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index b7a6d21..1236ced0 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -31,6 +31,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import org.apache.tez.dag.app.dag.event.DAGEventType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -781,6 +782,48 @@ public class TestTaskImpl { } @Test(timeout = 20000) + public void testKilledAttemptUpdatesDAGScheduler() { + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt(); + launchTaskAttempt(firstAttempt.getID()); + updateAttemptState(firstAttempt, TaskAttemptState.RUNNING); + + // Add a speculative task attempt + mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(), + TaskEventType.T_ADD_SPEC_ATTEMPT)); + MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt(); + launchTaskAttempt(specAttempt.getID()); + updateAttemptState(specAttempt, TaskAttemptState.RUNNING); + assertEquals(2, mockTask.getAttemptList().size()); + + // Have the first task succeed + eventHandler.events.clear(); + mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(), + TaskEventType.T_ATTEMPT_SUCCEEDED)); + verifyOutgoingEvents(eventHandler.events, DAGEventType.DAG_SCHEDULER_UPDATE, + VertexEventType.V_TASK_COMPLETED, VertexEventType.V_TASK_ATTEMPT_COMPLETED); + + // The task should now have succeeded and sent kill to other attempt + assertTaskSucceededState(); + 
verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId), + eq(firstAttempt.getID().getId())); + @SuppressWarnings("rawtypes") + Event event = eventHandler.events.get(eventHandler.events.size()-1); + assertEquals(TaskAttemptEventType.TA_KILL_REQUEST, event.getType()); + assertEquals(specAttempt.getID(), + ((TaskAttemptEventKillRequest) event).getTaskAttemptID()); + + eventHandler.events.clear(); + // Emulate the spec attempt being killed + mockTask.handle(new TaskEventTAUpdate(specAttempt + .getID(), TaskEventType.T_ATTEMPT_KILLED)); + assertTaskSucceededState(); + verifyOutgoingEvents(eventHandler.events, DAGEventType.DAG_SCHEDULER_UPDATE, + VertexEventType.V_TASK_ATTEMPT_COMPLETED); + } + + @Test(timeout = 20000) public void testSpeculatedThenRetroactiveFailure() { TezTaskID taskId = getNewTaskID(); scheduleTaskAttempt(taskId);
