Repository: tez
Updated Branches:
  refs/heads/branch-0.7 228279f9a -> e849f0981
TEZ-3549. TaskAttemptImpl does not initialize TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS correctly (Kuhu Shukla via jeagles)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e849f098
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e849f098
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e849f098

Branch: refs/heads/branch-0.7
Commit: e849f0981440fc34e9894bab0578ad7fb15fe28f
Parents: 228279f
Author: Jonathan Eagles <[email protected]>
Authored: Thu Dec 8 11:28:43 2016 -0600
Committer: Jonathan Eagles <[email protected]>
Committed: Thu Dec 8 11:28:43 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt | 1 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 2 +
 .../tez/dag/app/dag/impl/TestTaskAttempt.java | 78 ++++++++++++++++++++
 .../org/apache/tez/test/TestFaultTolerance.java | 3 +-
 4 files changed, 83 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/e849f098/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1c42dbd..c652351 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 ALL CHANGES:
+ TEZ-3549. TaskAttemptImpl does not initialize TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS correctly
  TEZ-3537. ArrayIndexOutOfBoundsException with empty environment variables/Port YARN-3768 to Tez
  TEZ-3507. Task logs link when editing url from one task to another.
  TEZ-3536. NPE in WebUIService start when host resolution fails.
http://git-wip-us.apache.org/repos/asf/tez/blob/e849f098/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index a2da34a..7f62888 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -496,6 +496,8 @@ public class TaskAttemptImpl implements TaskAttempt, this.taskSpec = taskSpec; this.creationCausalTA = schedulingCausalTA; this.creationTime = clock.getTime(); + //set last notified progress time to current time + this.lastNotifyProgressTimestamp = clock.getTime(); this.reportedStatus = new TaskAttemptStatus(this.attemptId); initTaskAttemptStatus(reportedStatus); http://git-wip-us.apache.org/repos/asf/tez/blob/e849f098/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 97108d4..262a976 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -775,6 +775,84 @@ public class TestTaskAttempt { expectedEvenstAfterTerminating), SpeculatorEventTaskAttemptStatusUpdate.class, 2); } + @Test + public void testProgressTimeStampUpdate() throws Exception { + ApplicationId appId = ApplicationId.newInstance(1, 2); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance( + appId, 0); + TezDAGID dagID = TezDAGID.getInstance(appId, 1); + TezVertexID vertexID = TezVertexID.getInstance(dagID, 1); + TezTaskID taskID = 
TezTaskID.getInstance(vertexID, 1); + + MockEventHandler eventHandler = spy(new MockEventHandler()); + TaskAttemptListener taListener = mock(TaskAttemptListener.class); + when(taListener.getAddress()).thenReturn( + new InetSocketAddress("localhost", 0)); + + Configuration taskConf = new Configuration(); + taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class); + taskConf.setBoolean("fs.file.impl.disable.cache", true); + taskConf.setLong(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, 75); + + locationHint = TaskLocationHint.createTaskLocationHint( + new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); + Resource resource = Resource.newInstance(1024, 1); + + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + @SuppressWarnings("deprecation") + ContainerId contId = ContainerId.newInstance(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class), + new ContainerContextMatcher(), appCtx); + containers.addContainerIfNew(container); + + doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); + doReturn(containers).when(appCtx).getAllContainers(); + + TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); + Clock mockClock = mock(Clock.class); + when(mockClock.getTime()).thenReturn(50l); + TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, + taListener, taskConf, mockClock, + mockHeartbeatHandler, appCtx, false, + resource, createFakeContainerContext(), false); + TezTaskAttemptID taskAttemptID = taImpl.getID(); + ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + + taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0)); + // At state STARTING. 
+ taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId, + null)); + assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(), + TaskAttemptState.RUNNING); + verify(mockHeartbeatHandler).register(taskAttemptID); + + when(mockClock.getTime()).thenReturn(100l); + taImpl.handle(new TaskAttemptEventStatusUpdate( + taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false))); + verify(eventHandler, atLeast(1)).handle(arg.capture()); + if (arg.getValue() instanceof TaskAttemptEventAttemptFailed) { + TaskAttemptEventAttemptFailed fEvent = (TaskAttemptEventAttemptFailed) arg.getValue(); + assertEquals(taImpl.getID(), fEvent.getTaskAttemptID()); + assertEquals(TaskAttemptTerminationCause.NO_PROGRESS, fEvent.getTerminationCause()); + taImpl.handle(fEvent); + fail("Should not fail since the timestamps do not differ by progress interval config"); + } else { + Assert.assertEquals("Task Attempt's internal state should be RUNNING!", + taImpl.getInternalState(), TaskAttemptStateInternal.RUNNING); + } + when(mockClock.getTime()).thenReturn(200l); + taImpl.handle(new TaskAttemptEventStatusUpdate( + taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false))); + verify(eventHandler, atLeast(1)).handle(arg.capture()); + Assert.assertTrue("This should have been an attempt failed event!", arg.getValue() instanceof TaskAttemptEventAttemptFailed); + } + @Test (timeout = 5000) public void testNoProgressFail() throws Exception { ApplicationId appId = ApplicationId.newInstance(1, 2); http://git-wip-us.apache.org/repos/asf/tez/blob/e849f098/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java index 011e91d..a6f20bc 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java +++ 
b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java @@ -766,8 +766,9 @@ public class TestFaultTolerance { @Test (timeout=240000) public void testNoProgress() throws Exception { Configuration testConf = new Configuration(false); - testConf.setInt(TestProcessor.TEZ_FAILING_PROCESSOR_SLEEP_MS, 1000*100); // long sleep testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1); + testConf.setLong(TestProcessor.getVertexConfName( + TestProcessor.TEZ_FAILING_PROCESSOR_SLEEP_MS, "v1"), 1000*100); // long sleep DAG dag = SimpleTestDAG.createDAG(testConf); Vertex hung = dag.getVertex("v1"); hung.setConf(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, Long.toString(1000));
