Repository: tez Updated Branches: refs/heads/branch-0.8 6bd8a7212 -> 261fe92b2
TEZ-3696. Jobs can hang when both concurrency and speculation are enabled. Contributed by Eric Badger. (cherry picked from commit 68fe023389b69689c10815c5131b4775d273b5b5) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/261fe92b Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/261fe92b Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/261fe92b Branch: refs/heads/branch-0.8 Commit: 261fe92b26997a6631f7ec8b69f6b458ab89ed59 Parents: 6bd8a72 Author: Siddharth Seth <[email protected]> Authored: Thu May 4 11:35:05 2017 -0700 Committer: Jason Lowe <[email protected]> Committed: Thu May 4 17:12:05 2017 -0500 ---------------------------------------------------------------------- .../apache/tez/dag/app/dag/DAGScheduler.java | 27 +++++---- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 4 +- .../tez/dag/app/dag/impl/TestDAGScheduler.java | 60 +++++++++++++++++++- .../tez/dag/app/dag/impl/TestTaskImpl.java | 42 ++++++++++++++ 4 files changed, 116 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/261fe92b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java index 87a6261..3055cd3 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java @@ -18,21 +18,21 @@ package org.apache.tez.dag.app.dag; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Map; -import java.util.Queue; import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate; +import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezVertexID; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - public abstract class DAGScheduler { private static class VertexInfo { int concurrencyLimit; int concurrency; - Queue<DAGEventSchedulerUpdate> pendingAttempts = Lists.newLinkedList(); - + Map<TezTaskAttemptID, DAGEventSchedulerUpdate> pendingAttempts = new LinkedHashMap<>(); + VertexInfo(int limit) { this.concurrencyLimit = limit; } @@ -42,7 +42,7 @@ public abstract class DAGScheduler { public void addVertexConcurrencyLimit(TezVertexID vId, int concurrency) { if (vertexInfo == null) { - vertexInfo = Maps.newHashMap(); + vertexInfo = new HashMap<>(); } if (concurrency > 0) { vertexInfo.put(vId, new VertexInfo(concurrency)); @@ -60,7 +60,7 @@ public abstract class DAGScheduler { private void scheduleTaskWithLimit(DAGEventSchedulerUpdate event, VertexInfo vInfo) { if (vInfo != null) { if (vInfo.concurrency >= vInfo.concurrencyLimit) { - vInfo.pendingAttempts.add(event); + vInfo.pendingAttempts.put(event.getAttempt().getID(), event); return; // already at max concurrency } vInfo.concurrency++; @@ -73,9 +73,14 @@ public abstract class DAGScheduler { if (vertexInfo != null) { VertexInfo vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID()); if (vInfo != null) { - vInfo.concurrency--; - if (!vInfo.pendingAttempts.isEmpty()) { - scheduleTaskWithLimit(vInfo.pendingAttempts.poll(), vInfo); + if(vInfo.pendingAttempts.remove(event.getAttempt().getID()) == null) { + vInfo.concurrency--; + if(!vInfo.pendingAttempts.isEmpty()) { + Iterator<DAGEventSchedulerUpdate> i = vInfo.pendingAttempts.values().iterator(); + DAGEventSchedulerUpdate nextTaskAttempt = i.next(); + i.remove(); + scheduleTaskWithLimit(nextTaskAttempt, vInfo); + } } } } http://git-wip-us.apache.org/repos/asf/tez/blob/261fe92b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index ec7db61..6bb14d5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -852,9 +852,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { private void handleTaskAttemptCompletion(TezTaskAttemptID attemptId, TaskAttemptStateInternal attemptState) { this.sendTaskAttemptCompletionEvent(attemptId, attemptState); - if (getInternalState() != TaskStateInternal.SUCCEEDED) { - sendDAGSchedulerFinishedEvent(attemptId); // not a retro active action - } + sendDAGSchedulerFinishedEvent(attemptId); } private void sendDAGSchedulerFinishedEvent(TezTaskAttemptID taId) { http://git-wip-us.apache.org/repos/asf/tez/blob/261fe92b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java index f2fd933..f38f689 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java @@ -195,7 +195,61 @@ public class TestDAGScheduler { scheduled++; } - - - + + @Test(timeout=5000) + public void testConcurrencyLimitWithKilledNonRunningTask() { + MockEventHandler mockEventHandler = new MockEventHandler(); + DAG mockDag = mock(DAG.class); + when(mockDag.getTotalVertices()).thenReturn(2); + TezVertexID vId0 = TezVertexID.fromString("vertex_1436907267600_195589_1_00"); + TezTaskID tId0 = TezTaskID.getInstance(vId0, 0); + + TaskAttempt mockAttempt; + + Vertex mockVertex = mock(Vertex.class); + when(mockDag.getVertex((TezVertexID) any())).thenReturn(mockVertex); + when(mockVertex.getDistanceFromRoot()).thenReturn(0); + when(mockVertex.getVertexId()).thenReturn(vId0); + + DAGScheduler scheduler = new DAGSchedulerNaturalOrder(mockDag, + mockEventHandler); + + List<TaskAttempt> mockAttempts = Lists.newArrayList(); + int completed = 0; + int requested = 0; + int scheduled = 0; + scheduler.addVertexConcurrencyLimit(vId0, 1); // effective + + // schedule beyond limit and it gets buffered + mockAttempt = mock(TaskAttempt.class); + mockAttempts.add(mockAttempt); + when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, requested++)); + scheduler.scheduleTask(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); + Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled + Assert.assertEquals(mockAttempts.get(scheduled).getID(), + mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order + scheduled++; + + mockAttempt = mock(TaskAttempt.class); + mockAttempts.add(mockAttempt); + when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, requested++)); + scheduler.scheduleTask(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); + Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered + + mockAttempt = mock(TaskAttempt.class); + mockAttempts.add(mockAttempt); + when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, requested++)); + scheduler.scheduleTask(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt)); + Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered + + scheduler.taskCompleted(new DAGEventSchedulerUpdate( + DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, mockAttempts.get(1))); + Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered + Assert.assertEquals(mockAttempts.get(0).getID(), + mockEventHandler.events.get(0).getTaskAttemptID()); // matches order + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/261fe92b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index fb2f543..3375047 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -32,6 +32,9 @@ import java.util.List; import java.util.Map; import org.apache.tez.common.TezAbstractEvent; +import org.apache.tez.dag.app.dag.DAGScheduler; +import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate; +import org.apache.tez.dag.app.dag.event.DAGEventType; import org.apache.tez.dag.app.dag.event.TaskEvent; import org.apache.tez.dag.app.dag.event.TaskEventTAFailed; import org.apache.tez.dag.app.dag.event.TaskEventTAKilled; @@ -845,6 +848,45 @@ public class TestTaskImpl { } @Test(timeout = 20000) + public void testKilledAttemptUpdatesDAGScheduler() { + TezTaskID taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt(); + launchTaskAttempt(firstAttempt.getID()); + updateAttemptState(firstAttempt, TaskAttemptState.RUNNING); + + // Add a speculative task attempt + mockTask.handle(createTaskTAAddSpecAttempt(firstAttempt.getID())); + MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt(); + launchTaskAttempt(specAttempt.getID()); + updateAttemptState(specAttempt, TaskAttemptState.RUNNING); + assertEquals(2, mockTask.getAttemptList().size()); + + // Have the first task succeed + eventHandler.events.clear(); + mockTask.handle(createTaskTASucceededEvent(firstAttempt.getID())); + verifyOutgoingEvents(eventHandler.events, DAGEventType.DAG_SCHEDULER_UPDATE, + VertexEventType.V_TASK_COMPLETED, VertexEventType.V_TASK_ATTEMPT_COMPLETED); + + // The task should now have succeeded and sent kill to other attempt + assertTaskSucceededState(); + verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId), + eq(firstAttempt.getID().getId())); + @SuppressWarnings("rawtypes") + Event event = eventHandler.events.get(eventHandler.events.size()-1); + assertEquals(TaskAttemptEventType.TA_KILL_REQUEST, event.getType()); + assertEquals(specAttempt.getID(), + ((TaskAttemptEventKillRequest) event).getTaskAttemptID()); + + eventHandler.events.clear(); + // Emulate the spec attempt being killed + mockTask.handle(createTaskTAKilledEvent(specAttempt.getID())); + assertTaskSucceededState(); + verifyOutgoingEvents(eventHandler.events, DAGEventType.DAG_SCHEDULER_UPDATE, + VertexEventType.V_TASK_ATTEMPT_COMPLETED); + } + + @Test(timeout = 20000) public void testSpeculatedThenRetroactiveFailure() { TezTaskID taskId = getNewTaskID(); scheduleTaskAttempt(taskId);
