Repository: tez
Updated Branches:
  refs/heads/branch-0.7 228279f9a -> e849f0981


TEZ-3549. TaskAttemptImpl does not initialize 
TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS correctly (Kuhu Shukla via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e849f098
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e849f098
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e849f098

Branch: refs/heads/branch-0.7
Commit: e849f0981440fc34e9894bab0578ad7fb15fe28f
Parents: 228279f
Author: Jonathan Eagles <[email protected]>
Authored: Thu Dec 8 11:28:43 2016 -0600
Committer: Jonathan Eagles <[email protected]>
Committed: Thu Dec 8 11:28:43 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  2 +
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 78 ++++++++++++++++++++
 .../org/apache/tez/test/TestFaultTolerance.java |  3 +-
 4 files changed, 83 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e849f098/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1c42dbd..c652351 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3549. TaskAttemptImpl does not initialize 
TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS correctly
   TEZ-3537. ArrayIndexOutOfBoundsException with empty environment 
variables/Port YARN-3768 to Tez
   TEZ-3507. Task logs link when editing url from one task to another.
   TEZ-3536. NPE in WebUIService start when host resolution fails.

http://git-wip-us.apache.org/repos/asf/tez/blob/e849f098/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index a2da34a..7f62888 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -496,6 +496,8 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.taskSpec = taskSpec;
     this.creationCausalTA = schedulingCausalTA;
     this.creationTime = clock.getTime();
+    //set last notified progress time to current time
+    this.lastNotifyProgressTimestamp = clock.getTime();
 
     this.reportedStatus = new TaskAttemptStatus(this.attemptId);
     initTaskAttemptStatus(reportedStatus);

http://git-wip-us.apache.org/repos/asf/tez/blob/e849f098/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 97108d4..262a976 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -775,6 +775,84 @@ public class TestTaskAttempt {
             expectedEvenstAfterTerminating), 
SpeculatorEventTaskAttemptStatusUpdate.class, 2);
   }
 
+  @Test
+  public void testProgressTimeStampUpdate() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+    taskConf.setLong(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, 75);
+
+    locationHint = TaskLocationHint.createTaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        new ContainerContextMatcher(), appCtx);
+    containers.addContainerIfNew(container);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskHeartbeatHandler mockHeartbeatHandler = 
mock(TaskHeartbeatHandler.class);
+    Clock mockClock = mock(Clock.class);
+    when(mockClock.getTime()).thenReturn(50l);
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, mockClock,
+        mockHeartbeatHandler, appCtx, false,
+        resource, createFakeContainerContext(), false);
+    TezTaskAttemptID taskAttemptID = taImpl.getID();
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+        null));
+    assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+    verify(mockHeartbeatHandler).register(taskAttemptID);
+
+    when(mockClock.getTime()).thenReturn(100l);
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
+    verify(eventHandler, atLeast(1)).handle(arg.capture());
+    if (arg.getValue() instanceof TaskAttemptEventAttemptFailed) {
+      TaskAttemptEventAttemptFailed fEvent = (TaskAttemptEventAttemptFailed) 
arg.getValue();
+      assertEquals(taImpl.getID(), fEvent.getTaskAttemptID());
+      assertEquals(TaskAttemptTerminationCause.NO_PROGRESS, 
fEvent.getTerminationCause());
+      taImpl.handle(fEvent);
+      fail("Should not fail since the timestamps do not differ by progress 
interval config");
+    } else {
+      Assert.assertEquals("Task Attempt's internal state should be RUNNING!",
+          taImpl.getInternalState(), TaskAttemptStateInternal.RUNNING);
+    }
+    when(mockClock.getTime()).thenReturn(200l);
+    taImpl.handle(new TaskAttemptEventStatusUpdate(
+        taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f, null, false)));
+    verify(eventHandler, atLeast(1)).handle(arg.capture());
+    Assert.assertTrue("This should have been an attempt failed event!", 
arg.getValue() instanceof TaskAttemptEventAttemptFailed);
+  }
+
   @Test (timeout = 5000)
   public void testNoProgressFail() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);

http://git-wip-us.apache.org/repos/asf/tez/blob/e849f098/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
----------------------------------------------------------------------
diff --git 
a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java 
b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index 011e91d..a6f20bc 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -766,8 +766,9 @@ public class TestFaultTolerance {
   @Test (timeout=240000)
   public void testNoProgress() throws Exception {
     Configuration testConf = new Configuration(false);
-    testConf.setInt(TestProcessor.TEZ_FAILING_PROCESSOR_SLEEP_MS, 1000*100); 
// long sleep
     testConf.setInt(SimpleTestDAG.TEZ_SIMPLE_DAG_NUM_TASKS, 1);
+    testConf.setLong(TestProcessor.getVertexConfName(
+        TestProcessor.TEZ_FAILING_PROCESSOR_SLEEP_MS, "v1"), 1000*100); // 
long sleep
     DAG dag = SimpleTestDAG.createDAG(testConf);
     Vertex hung = dag.getVertex("v1");
     hung.setConf(TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS, 
Long.toString(1000));

Reply via email to