Repository: tez Updated Branches: refs/heads/master eeac7a92c -> 996906d27
TEZ-2752. logUnsuccessful completion in Attempt should write original finish time to ATS (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/996906d2 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/996906d2 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/996906d2 Branch: refs/heads/master Commit: 996906d27cdb0a1c4301dc449aa5dc638b5b4363 Parents: eeac7a9 Author: Bikas Saha <[email protected]> Authored: Wed Sep 2 16:34:05 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Wed Sep 2 16:34:05 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 8 ++++ .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 6 ++- .../tez/dag/app/dag/impl/TestTaskAttempt.java | 42 +++++++++++++------- 3 files changed, 40 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/996906d2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a6d3b54..687db4d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,8 @@ Release 0.8.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2752. logUnsuccessful completion in Attempt should write original finish + time to ATS TEZ-2755. Fix findbugs warning in TezClient TEZ-2767. Make TezMxBeanResourceCalculator the default resource calculator. TEZ-2765. Change Xmlwriter to use defaultValue instead of value tag. @@ -160,6 +162,8 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2752. logUnsuccessful completion in Attempt should write original finish + time to ATS TEZ-2755. Fix findbugs warning in TezClient TEZ-2767. Make TezMxBeanResourceCalculator the default resource calculator. TEZ-2602. Throwing EOFException when launching MR job @@ -395,6 +399,8 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2752. logUnsuccessful completion in Attempt should write original finish + time to ATS TEZ-2742. VertexImpl.finished() terminationCause hides member var of the same name TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers @@ -609,6 +615,8 @@ INCOMPATIBLE CHANGES TEZ-2552. CRC errors can cause job to run for very long time in large jobs. ALL CHANGES: + TEZ-2752. logUnsuccessful completion in Attempt should write original finish + time to ATS TEZ-2742. VertexImpl.finished() terminationCause hides member var of the same name TEZ-2732. DefaultSorter throws ArrayIndex exceptions on 2047 Mb size sort buffers http://git-wip-us.apache.org/repos/asf/tez/blob/996906d2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 3f2e3a4..e57c827 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -1048,9 +1048,13 @@ public class TaskAttemptImpl implements TaskAttempt, protected void logJobHistoryAttemptUnsuccesfulCompletion( TaskAttemptState state) { + long finishTime = getFinishTime(); + if (finishTime <= 0) { + finishTime = clock.getTime(); // comes here in case it was terminated before launch + } TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent( attemptId, getVertex().getName(), getLaunchTime(), - clock.getTime(), state, + finishTime, state, terminationCause, StringUtils.join( getDiagnostics(), LINE_SEPARATOR), getCounters(), lastDataEventTime, http://git-wip-us.apache.org/repos/asf/tez/blob/996906d2/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java index 13c9202..101b22f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java @@ -91,6 +91,9 @@ import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded; import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest; import org.apache.tez.dag.app.rm.container.AMContainerMap; import org.apache.tez.dag.app.rm.container.ContainerContextMatcher; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.HistoryEventHandler; +import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; import org.apache.tez.dag.records.TaskAttemptTerminationCause; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -117,6 +120,7 @@ public class TestTaskAttempt { } } + AppContext appCtx; Task mockTask; TaskLocationHint locationHint; @@ -127,7 +131,10 @@ public class TestTaskAttempt { @Before public void setupTest() { + appCtx = mock(AppContext.class); mockTask = mock(Task.class); + HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class); + doReturn(mockHistHandler).when(appCtx).getHistoryHandler(); } @Test(timeout = 5000) @@ -146,7 +153,7 @@ public class TestTaskAttempt { TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), - mock(TaskHeartbeatHandler.class), mock(AppContext.class), + mock(TaskHeartbeatHandler.class), appCtx, false, Resource.newInstance(1024, 1), createFakeContainerContext(), false); TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class); @@ -181,12 +188,12 @@ public class TestTaskAttempt { TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), - mock(TaskHeartbeatHandler.class), mock(AppContext.class), + mock(TaskHeartbeatHandler.class), appCtx, false, Resource.newInstance(1024, 1), createFakeContainerContext(), false); TaskAttemptImpl taImplReScheduled = new MockTaskAttemptImpl(taskID, 1, eventHandler, mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), - mock(TaskHeartbeatHandler.class), mock(AppContext.class), + mock(TaskHeartbeatHandler.class), appCtx, true, Resource.newInstance(1024, 1), createFakeContainerContext(), false); ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); @@ -245,7 +252,7 @@ public class TestTaskAttempt { TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), mock(TaskHeartbeatHandler.class), - mock(AppContext.class), false, Resource.newInstance(1024, + appCtx, false, Resource.newInstance(1024, 1), createFakeContainerContext(), false); TaskAttemptImpl spyTa = spy(taImpl); @@ -295,7 +302,7 @@ public class TestTaskAttempt { new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null); Resource resource = Resource.newInstance(1024, 1); - AppContext mockAppContext = mock(AppContext.class); + AppContext mockAppContext = appCtx; doReturn(new ClusterInfo()).when(mockAppContext).getClusterInfo(); TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, @@ -352,7 +359,6 @@ public class TestTaskAttempt { when(container.getNodeId()).thenReturn(nid); when(container.getNodeHttpAddress()).thenReturn("localhost:0"); - AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); @@ -452,7 +458,6 @@ public class TestTaskAttempt { when(container.getNodeId()).thenReturn(nid); when(container.getNodeHttpAddress()).thenReturn("localhost:0"); - AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); @@ -516,7 +521,6 @@ public class TestTaskAttempt { when(container.getNodeId()).thenReturn(nid); when(container.getNodeHttpAddress()).thenReturn("localhost:0"); - AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); @@ -608,7 +612,6 @@ public class TestTaskAttempt { when(container.getNodeId()).thenReturn(nid); when(container.getNodeHttpAddress()).thenReturn("localhost:0"); - AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); @@ -738,7 +741,6 @@ public class TestTaskAttempt { when(container.getNodeId()).thenReturn(nid); when(container.getNodeHttpAddress()).thenReturn("localhost:0"); - AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); @@ -828,7 +830,6 @@ public class TestTaskAttempt { when(container.getNodeId()).thenReturn(nid); when(container.getNodeHttpAddress()).thenReturn("localhost:0"); - AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); @@ -922,7 +923,6 @@ public class TestTaskAttempt { when(container.getNodeId()).thenReturn(nid); when(container.getNodeHttpAddress()).thenReturn("localhost:0"); - AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); @@ -1024,7 +1024,6 @@ public class TestTaskAttempt { when(container.getNodeId()).thenReturn(nid); when(container.getNodeHttpAddress()).thenReturn("localhost:0"); - AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); @@ -1123,7 +1122,6 @@ public class TestTaskAttempt { when(container.getNodeId()).thenReturn(nid); when(container.getNodeHttpAddress()).thenReturn("localhost:0"); - AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); @@ -1131,6 +1129,8 @@ public class TestTaskAttempt { doReturn(new ClusterInfo()).when(appCtx).getClusterInfo(); doReturn(containers).when(appCtx).getAllContainers(); + HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class); + doReturn(mockHistHandler).when(appCtx).getHistoryHandler(); TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class); MockTaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler, @@ -1152,7 +1152,12 @@ public class TestTaskAttempt { int expectedEventsTillSucceeded = 6; ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class); + ArgumentCaptor<DAGHistoryEvent> histArg = ArgumentCaptor.forClass(DAGHistoryEvent.class); verify(eventHandler, times(expectedEventsTillSucceeded)).handle(arg.capture()); + verify(mockHistHandler, times(2)).handle(histArg.capture()); // start and finish + DAGHistoryEvent histEvent = histArg.getValue(); + TaskAttemptFinishedEvent finishEvent = (TaskAttemptFinishedEvent)histEvent.getHistoryEvent(); + long finishTime = finishEvent.getFinishTime(); verifyEventType(arg.getAllValues(), TaskEventTAUpdate.class, 2); InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, 1); @@ -1183,6 +1188,11 @@ public class TestTaskAttempt { assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, taImpl.getTerminationCause()); // verify unregister is not invoked again verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID); + verify(mockHistHandler, times(3)).handle(histArg.capture()); + histEvent = histArg.getValue(); + finishEvent = (TaskAttemptFinishedEvent)histEvent.getHistoryEvent(); + long newFinishTime = finishEvent.getFinishTime(); + Assert.assertEquals(finishTime, newFinishTime); assertEquals(true, taImpl.inputFailedReported); int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 2; @@ -1266,7 +1276,6 @@ public class TestTaskAttempt { when(container.getNodeId()).thenReturn(nid); when(container.getNodeHttpAddress()).thenReturn("localhost:0"); - AppContext appCtx = mock(AppContext.class); AMContainerMap containers = new AMContainerMap( mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appCtx); @@ -1353,18 +1362,21 @@ public class TestTaskAttempt { @Override protected void logJobHistoryAttemptStarted() { taskAttemptStartedEventLogged++; + super.logJobHistoryAttemptStarted(); } @Override protected void logJobHistoryAttemptFinishedEvent( TaskAttemptStateInternal state) { taskAttemptFinishedEventLogged++; + super.logJobHistoryAttemptFinishedEvent(state); } @Override protected void logJobHistoryAttemptUnsuccesfulCompletion( TaskAttemptState state) { taskAttemptFinishedEventLogged++; + super.logJobHistoryAttemptUnsuccesfulCompletion(state); } @Override
