Repository: tez
Updated Branches:
  refs/heads/master e0ca8c9a9 -> 09102e517
TEZ-3938. Task attempts failing due to not making progress (Kuhu Shukla via jeagles)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/09102e51
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/09102e51
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/09102e51

Branch: refs/heads/master
Commit: 09102e517b2afa20add2de4cc018b8abbb66feb3
Parents: e0ca8c9
Author: Jonathan Eagles <[email protected]>
Authored: Tue Jun 5 16:24:58 2018 -0500
Committer: Jonathan Eagles <[email protected]>
Committed: Tue Jun 5 16:24:58 2018 -0500

----------------------------------------------------------------------
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  5 +-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 61 ++++++++++++++++++++
 2 files changed, 63 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/09102e51/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index c43bd98..6ad41f8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -575,8 +575,6 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.taskSpec = taskSpec;
     this.creationCausalTA = schedulingCausalTA;
     this.creationTime = clock.getTime();
-    //set last notified progress time to current time
-    this.lastNotifyProgressTimestamp = clock.getTime();
 
     this.reportedStatus = new TaskAttemptStatus(this.attemptId);
     initTaskAttemptStatus(reportedStatus);
@@ -1434,6 +1432,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       ta.nodeHttpAddress = StringInterner.weakIntern(container.getNodeHttpAddress());
       ta.nodeRackName = StringInterner.weakIntern(RackResolver.resolve(ta.containerNodeId.getHost())
           .getNetworkLocation());
+      ta.lastNotifyProgressTimestamp = ta.clock.getTime();
 
       ta.launchTime = ta.clock.getTime();
 
@@ -1585,7 +1584,7 @@ public class TaskAttemptImpl implements TaskAttempt,
         ta.lastNotifyProgressTimestamp = ta.clock.getTime();
       } else {
         long currTime = ta.clock.getTime();
-        if (ta.hungIntervalMax > 0 &&
+        if (ta.hungIntervalMax > 0 && ta.lastNotifyProgressTimestamp > 0 &&
             currTime - ta.lastNotifyProgressTimestamp > ta.hungIntervalMax) {
           // task is hung
           String diagnostics = "Attempt failed because it appears to make no progress for " +

http://git-wip-us.apache.org/repos/asf/tez/blob/09102e51/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 2bad2ef..503e418 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
+import org.apache.tez.dag.app.MockClock;
+import org.apache.tez.dag.app.rm.AMSchedulerEventTAStateUpdated;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
@@ -1138,6 +1140,65 @@ public class TestTaskAttempt {
     assertEquals(2, taImpl.getCounters().findCounter("group", "counter").getValue());
   }
 
+  @Test (timeout = 60000L)
+  public void testProgressAfterSubmit() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+    taskConf.setLong(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, 50);
+
+    locationHint = TaskLocationHint.createTaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
+        new ContainerContextMatcher(), appCtx);
+    containers.addContainerIfNew(container, 0, 0, 0);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+    MockClock mockClock = new MockClock();
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, mockClock,
+        mockHeartbeatHandler, appCtx, false,
+        resource, createFakeContainerContext(), false);
+    TezTaskAttemptID taskAttemptID = taImpl.getID();
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    mockClock.incrementTime(20L);
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+    mockClock.incrementTime(55L);
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
+    verify(eventHandler, atLeast(1)).handle(arg.capture());
+    if (arg.getValue() instanceof TaskAttemptEvent) {
+      taImpl.handle((TaskAttemptEvent) arg.getValue());
+    }
+    Assert.assertEquals("Task Attempt's internal state should be SUBMITTED!",
+        taImpl.getInternalState(), TaskAttemptStateInternal.SUBMITTED);
+  }
+
   @Test (timeout = 5000)
   public void testNoProgressFail() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
