Repository: tez
Updated Branches:
  refs/heads/master 125f8c023 -> 2b42ac8a0
TEZ-3549. TaskAttemptImpl does not initialize TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS correctly (Kuhu Shukla via jeagles)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2b42ac8a
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2b42ac8a
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2b42ac8a

Branch: refs/heads/master
Commit: 2b42ac8a0836b9501e5acc03ecf85d7f3dd4cc92
Parents: 125f8c0
Author: Jonathan Eagles <[email protected]>
Authored: Thu Dec 8 10:52:46 2016 -0600
Committer: Jonathan Eagles <[email protected]>
Committed: Thu Dec 8 10:52:46 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  2 +
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 76 ++++++++++++++++++++
 .../org/apache/tez/test/TestFaultTolerance.java |  3 +-
 4 files changed, 83 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/2b42ac8a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a7299b7..2dd9b0a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3549. TaskAttemptImpl does not initialize TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS correctly TEZ-3552. Shuffle split array when size-based sorting is turned off. TEZ-3537. ArrayIndexOutOfBoundsException with empty environment variables/Port YARN-3768 to Tez TEZ-3271. Provide mapreduce failures.maxpercent equivalent. @@ -156,6 +157,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3549. TaskAttemptImpl does not initialize TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS correctly TEZ-3537. ArrayIndexOutOfBoundsException with empty environment variables/Port YARN-3768 to Tez TEZ-3536. NPE in WebUIService start when host resolution fails. TEZ-3534. 
Differentiate thread names on Fetchers, minor changes to shuffle shutdown code. @@ -665,6 +667,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3549. TaskAttemptImpl does not initialize TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS correctly TEZ-3537. ArrayIndexOutOfBoundsException with empty environment variables/Port YARN-3768 to Tez TEZ-3536. NPE in WebUIService start when host resolution fails. TEZ-3493. DAG submit timeout cannot be set to a month http://git-wip-us.apache.org/repos/asf/tez/blob/2b42ac8a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index e5f3e71..8a81575 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -509,6 +509,8 @@ public class TaskAttemptImpl implements TaskAttempt, this.taskSpec = taskSpec; this.creationCausalTA = schedulingCausalTA; this.creationTime = clock.getTime(); + //set last notified progress time to current time + this.lastNotifyProgressTimestamp = clock.getTime(); this.reportedStatus = new TaskAttemptStatus(this.attemptId); initTaskAttemptStatus(reportedStatus); http://git-wip-us.apache.org/repos/asf/tez/blob/2b42ac8a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index a50ca49..44d8213 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -943,6 +943,82 @@ public class TestTaskAttempt { 
expectedEvenstAfterTerminating), SpeculatorEventTaskAttemptStatusUpdate.class, 2); } + @Test + public void testProgressTimeStampUpdate() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( + appId, 0); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); + TezTaskID taskID = TezTaskID.getInstance(vertexID, 1); + + MockEventHandler eventHandler = spy(new MockEventHandler()); + TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener(); + + Configuration taskConf = new Configuration(); + taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + taskConf.setBoolean("fs.file.impl.disable.cache", true); + taskConf.setLong(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, 75); + + locationHint = TaskLocationHint.createTaskLocationHint( + new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); + Resource resource = Resource.newInstance(1024, 1); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + @SuppressWarnings("deprecation") + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), + new ContainerContextMatcher(), appCtx); + containers.addContainerIfNew(container, 0, 0, 0); + + doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); + doReturn(containers).when(appCtx).getAllContainers(); + + TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); + Clock mockClock = mock(Clock.class); + when(mockClock.getTime()).thenReturn(50l); + TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, 
eventHandler, + taListener, taskConf, mockClock, + mockHeartbeatHandler, appCtx, false, + resource, createFakeContainerContext(), false); + TezTaskAttemptID taskAttemptID = taImpl.getID(); + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + + taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); + // At state STARTING. + taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, + null)); + assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), + TaskAttemptState.RUNNING); + verify(mockHeartbeatHandler).register(taskAttemptID); + + when(mockClock.getTime()).thenReturn(100l); + taImpl.handle(new TaskAttemptEventStatusUpdate( + taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false))); + verify(eventHandler, atLeast(1)).handle(arg.capture()); + if (arg.getValue() instanceof TaskAttemptEventAttemptFailed) { + TaskAttemptEventAttemptFailed fEvent = (TaskAttemptEventAttemptFailed) arg.getValue(); + assertEquals(taImpl.getID(), fEvent.getTaskAttemptID()); + assertEquals(TaskAttemptTerminationCause.NO_PROGRESS, fEvent.getTerminationCause()); + taImpl.handle(fEvent); + fail("Should not fail since the timestamps do not differ by progress interval config"); + } else { + Assert.assertEquals("Task Attempt's internal state should be RUNNING!", + taImpl.getInternalState(), TaskAttemptStateInternal.RUNNING); + } + when(mockClock.getTime()).thenReturn(200l); + taImpl.handle(new TaskAttemptEventStatusUpdate( + taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false))); + verify(eventHandler, atLeast(1)).handle(arg.capture()); + Assert.assertTrue("This should have been an attempt failed event!", arg.getValue() instanceof TaskAttemptEventAttemptFailed); + } + @Test (timeout = 5000) public void testNoProgressFail() throws Exception { ApplicationId appId = ApplicationId.newInstance(1, 2); 
http://git-wip-us.apache.org/repos/asf/tez/blob/2b42ac8a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java index 764ef0f..b2a5d17 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java @@ -777,8 +777,9 @@ public class TestFaultTolerance { @Test (timeout=240000) public void testNoProgress() throws Exception { Configuration testConf = new Configuration(false); - testConf.setInt(TestProcessor.TEZ_FAILING_PROCESSOR_SLEEP_MS, 1000*100); // long sleep testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + testConf.setLong(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_SLEEP_MS, "v1"), 1000*100); // long sleep DAG dag = SimpleTestDAG.createDAG(testConf); Vertex hung = dag.getVertex("v1"); hung.setConf(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, Long.toString(1000));
