Repository: tez
Updated Branches:
  refs/heads/branch-0.7 c65a9c5e4 -> ccfc16ba8


TEZ-2752. logUnsuccessful completion in Attempt should write original finish 
time to ATS (bikas)
(cherry picked from commit 996906d27cdb0a1c4301dc449aa5dc638b5b4363)

Conflicts:
        CHANGES.txt
        tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ccfc16ba
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ccfc16ba
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ccfc16ba

Branch: refs/heads/branch-0.7
Commit: ccfc16ba88c592e0a581fa8f988c214f84b68312
Parents: c65a9c5
Author: Bikas Saha <[email protected]>
Authored: Wed Sep 2 16:34:05 2015 -0700
Committer: Bikas Saha <[email protected]>
Committed: Wed Sep 2 17:58:40 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     | 15 +++----
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  6 ++-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 42 +++++++++++++-------
 3 files changed, 40 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ccfc16ba/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0c68990..5e3a183 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,8 +6,10 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
-  TEZ-2767. Make TezMxBeanResourceCalculator the default resource calculator.
+  TEZ-2752. logUnsuccessful completion in Attempt should write original finish
+  time to ATS
   TEZ-2755. Fix findbugs warning in TezClient
+  TEZ-2767. Make TezMxBeanResourceCalculator the default resource calculator.
   TEZ-2602. Throwing EOFException when launching MR job
   TEZ-2575. Handle KeyValue pairs size which do not fit in a single block in 
PipelinedSorter
   TEZ-2198. Fix sorter spill counts
@@ -241,13 +243,12 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
-<<<<<<< HEAD
-=======
+  TEZ-2752. logUnsuccessful completion in Attempt should write original finish
+  time to ATS
   TEZ-2742. VertexImpl.finished() terminationCause hides member var of the
   same name
   TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort 
buffers
   TEZ-2290. Scale memory for Default Sorter down to a max of 2047 MB if 
configured higher.
->>>>>>> e5a79fd... TEZ-2742. VertexImpl.finished() terminationCause hides 
member var of the same name (bikas)
   TEZ-2734. Add a test to verify the filename generated by OnDiskMerge.
   TEZ-2687. ATS History shutdown happens before the min-held containers are 
released
   TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the 
default max counters
@@ -458,16 +459,16 @@ INCOMPATIBLE CHANGES
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 
 ALL CHANGES:
-<<<<<<< HEAD
-=======
+  TEZ-2752. logUnsuccessful completion in Attempt should write original finish
+  time to ATS
   TEZ-2742. VertexImpl.finished() terminationCause hides member var of the
   same name
   TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort 
buffers
   TEZ-2290. Scale memory for Default Sorter down to a max of 2047 MB if 
configured higher.
->>>>>>> e5a79fd... TEZ-2742. VertexImpl.finished() terminationCause hides 
member var of the same name (bikas)
   TEZ-2734. Add a test to verify the filename generated by OnDiskMerge.
   TEZ-2687. ATS History shutdown happens before the min-held containers are 
released
   TEZ-2629. LimitExceededException in Tez client when DAG has exceeds the 
default max counters
+  TEZ-2630. TezChild receives IP address instead of FQDN.
   TEZ-2635. Limit number of attempts being downloaded in unordered fetch.
   TEZ-2636. MRInput and MultiMRInput should work for cases when there are 0 
physical inputs.
   TEZ-2600. When used with HDFS federation(viewfs) ,tez will throw a error

http://git-wip-us.apache.org/repos/asf/tez/blob/ccfc16ba/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 0cc7dc3..5792af0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -993,9 +993,13 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   protected void logJobHistoryAttemptUnsuccesfulCompletion(
       TaskAttemptState state) {
+    long finishTime = getFinishTime();
+    if (finishTime <= 0) {
+      finishTime = clock.getTime(); // comes here in case it was terminated before launch
+    }
     TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
         attemptId, getVertex().getName(), getLaunchTime(),
-        clock.getTime(), state,
+        finishTime, state,
         terminationCause,
         StringUtils.join(
             getDiagnostics(), LINE_SEPARATOR), getCounters());

http://git-wip-us.apache.org/repos/asf/tez/blob/ccfc16ba/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index d21f715..5d05fa3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -90,6 +90,9 @@ import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
 import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -116,6 +119,7 @@ public class TestTaskAttempt {
     }
   }
   
+  AppContext appCtx;
   Task mockTask;
   TaskLocationHint locationHint;
 
@@ -126,7 +130,10 @@ public class TestTaskAttempt {
   
   @Before
   public void setupTest() {
+    appCtx = mock(AppContext.class);
     mockTask = mock(Task.class);
+    HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
+    doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
   }
 
   @Test(timeout = 5000)
@@ -145,7 +152,7 @@ public class TestTaskAttempt {
         TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         mock(TaskAttemptListener.class), new Configuration(), new 
SystemClock(),
-        mock(TaskHeartbeatHandler.class), mock(AppContext.class),
+        mock(TaskHeartbeatHandler.class), appCtx,
         false, Resource.newInstance(1024, 1), createFakeContainerContext(), 
false);
 
     TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class);
@@ -180,12 +187,12 @@ public class TestTaskAttempt {
         TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         mock(TaskAttemptListener.class), new Configuration(), new 
SystemClock(),
-        mock(TaskHeartbeatHandler.class), mock(AppContext.class),
+        mock(TaskHeartbeatHandler.class), appCtx,
         false, Resource.newInstance(1024, 1), createFakeContainerContext(), 
false);
 
     TaskAttemptImpl taImplReScheduled = new MockTaskAttemptImpl(taskID, 1, 
eventHandler,
         mock(TaskAttemptListener.class), new Configuration(), new 
SystemClock(),
-        mock(TaskHeartbeatHandler.class), mock(AppContext.class),
+        mock(TaskHeartbeatHandler.class), appCtx,
         true, Resource.newInstance(1024, 1), createFakeContainerContext(), 
false);
 
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
@@ -244,7 +251,7 @@ public class TestTaskAttempt {
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
         mock(TaskAttemptListener.class), new Configuration(),
         new SystemClock(), mock(TaskHeartbeatHandler.class),
-        mock(AppContext.class), false, Resource.newInstance(1024,
+        appCtx, false, Resource.newInstance(1024,
             1), createFakeContainerContext(), false);
 
     TaskAttemptImpl spyTa = spy(taImpl);
@@ -296,7 +303,7 @@ public class TestTaskAttempt {
         new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
     Resource resource = Resource.newInstance(1024, 1);
 
-    AppContext mockAppContext = mock(AppContext.class);
+    AppContext mockAppContext = appCtx;
     doReturn(new ClusterInfo()).when(mockAppContext).getClusterInfo();
 
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
@@ -353,7 +360,6 @@ public class TestTaskAttempt {
     when(container.getNodeId()).thenReturn(nid);
     when(container.getNodeHttpAddress()).thenReturn("localhost:0");
 
-    AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
@@ -454,7 +460,6 @@ public class TestTaskAttempt {
     when(container.getNodeId()).thenReturn(nid);
     when(container.getNodeHttpAddress()).thenReturn("localhost:0");
 
-    AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
@@ -519,7 +524,6 @@ public class TestTaskAttempt {
     when(container.getNodeId()).thenReturn(nid);
     when(container.getNodeHttpAddress()).thenReturn("localhost:0");
 
-    AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
@@ -612,7 +616,6 @@ public class TestTaskAttempt {
     when(container.getNodeId()).thenReturn(nid);
     when(container.getNodeHttpAddress()).thenReturn("localhost:0");
 
-    AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
@@ -743,7 +746,6 @@ public class TestTaskAttempt {
     when(container.getNodeId()).thenReturn(nid);
     when(container.getNodeHttpAddress()).thenReturn("localhost:0");
 
-    AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
@@ -834,7 +836,6 @@ public class TestTaskAttempt {
     when(container.getNodeId()).thenReturn(nid);
     when(container.getNodeHttpAddress()).thenReturn("localhost:0");
 
-    AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
@@ -929,7 +930,6 @@ public class TestTaskAttempt {
     when(container.getNodeId()).thenReturn(nid);
     when(container.getNodeHttpAddress()).thenReturn("localhost:0");
 
-    AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
@@ -1032,7 +1032,6 @@ public class TestTaskAttempt {
     when(container.getNodeId()).thenReturn(nid);
     when(container.getNodeHttpAddress()).thenReturn("localhost:0");
 
-    AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
@@ -1132,7 +1131,6 @@ public class TestTaskAttempt {
     when(container.getNodeId()).thenReturn(nid);
     when(container.getNodeHttpAddress()).thenReturn("localhost:0");
 
-    AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
@@ -1140,6 +1138,8 @@ public class TestTaskAttempt {
 
     doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
     doReturn(containers).when(appCtx).getAllContainers();
+    HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
+    doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
 
     TaskHeartbeatHandler mockHeartbeatHandler = 
mock(TaskHeartbeatHandler.class);
     MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, 
eventHandler,
@@ -1161,7 +1161,12 @@ public class TestTaskAttempt {
 
     int expectedEventsTillSucceeded = 6;
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    ArgumentCaptor<DAGHistoryEvent> histArg = ArgumentCaptor.forClass(DAGHistoryEvent.class);
     verify(eventHandler, times(expectedEventsTillSucceeded)).handle(arg.capture());
+    verify(mockHistHandler, times(2)).handle(histArg.capture()); // start and finish
+    DAGHistoryEvent histEvent = histArg.getValue();
+    TaskAttemptFinishedEvent finishEvent = (TaskAttemptFinishedEvent)histEvent.getHistoryEvent();
+    long finishTime = finishEvent.getFinishTime();
     verifyEventType(arg.getAllValues(), TaskEventTAUpdate.class, 2);
 
     InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, 1);
@@ -1192,6 +1197,11 @@ public class TestTaskAttempt {
     assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, 
taImpl.getTerminationCause());
     // verify unregister is not invoked again
     verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID);
+    verify(mockHistHandler, times(3)).handle(histArg.capture());
+    histEvent = histArg.getValue();
+    finishEvent = (TaskAttemptFinishedEvent)histEvent.getHistoryEvent();
+    long newFinishTime = finishEvent.getFinishTime();
+    Assert.assertEquals(finishTime, newFinishTime);
 
     assertEquals(true, taImpl.inputFailedReported);
     int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 2;
@@ -1277,7 +1287,6 @@ public class TestTaskAttempt {
     when(container.getNodeId()).thenReturn(nid);
     when(container.getNodeHttpAddress()).thenReturn("localhost:0");
 
-    AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
         new ContainerContextMatcher(), appCtx);
@@ -1364,18 +1373,21 @@ public class TestTaskAttempt {
     @Override
     protected void logJobHistoryAttemptStarted() {
       taskAttemptStartedEventLogged++;
+      super.logJobHistoryAttemptStarted();
     }
 
     @Override
     protected void logJobHistoryAttemptFinishedEvent(
         TaskAttemptStateInternal state) {
       taskAttemptFinishedEventLogged++;
+      super.logJobHistoryAttemptFinishedEvent(state);
     }
 
     @Override
     protected void logJobHistoryAttemptUnsuccesfulCompletion(
         TaskAttemptState state) {
       taskAttemptFinishedEventLogged++;
+      super.logJobHistoryAttemptUnsuccesfulCompletion(state);
     }
     
     @Override

Reply via email to