Repository: tez Updated Branches: refs/heads/branch-0.7 9be8cd47f -> bc56ca315
http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java index f43f52c..9c321ff 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java @@ -215,7 +215,7 @@ public class TestTaskRecovery { long taStartTime = taskStartTime + 100L; TaskState recoveredState = task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, - taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "")); + taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(0, task.getFinishedAttemptsCount()); assertEquals(taskScheduledTime, task.scheduledTime); @@ -286,7 +286,7 @@ public class TestTaskRecovery { restoreFromTaskStartEvent(); TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - 0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters())); + 0L, 0L, TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters(), null)); task.handle(new TaskEventRecoverTask(task.getTaskId())); // wait for the second task attempt is scheduled dispatcher.await(); @@ -307,7 +307,7 @@ public class TestTaskRecovery { restoreFromTaskStartEvent(); TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - 0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters())); + 0L, 0L, TaskAttemptState.FAILED, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters(), null)); task.handle(new TaskEventRecoverTask(task.getTaskId())); // wait for the second task attempt is scheduled dispatcher.await(); @@ -329,7 +329,7 @@ public class TestTaskRecovery { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); try { task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, - 0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters())); + 0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters(), null)); fail("Should fail due to no TaskAttemptStartedEvent but with TaskAttemptFinishedEvent(Succeeded)"); } catch (TezUncheckedException e) { assertTrue(e.getMessage().contains("Could not find task attempt when trying to recover")); @@ -372,7 +372,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters())); + "", new TezCounters(), null)); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -405,7 +405,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.FAILED, null, - "", new TezCounters())); + "", new TezCounters(), null)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -438,7 +438,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.KILLED, null, - "", new TezCounters())); + "", new TezCounters(), null)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -473,7 +473,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters())); + "", new TezCounters(), null)); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -516,7 +516,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters())); + "", new TezCounters(), null)); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -528,7 +528,7 @@ public class TestTaskRecovery { recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.FAILED, null, - "", new TezCounters())); + "", new TezCounters(), null)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -563,7 +563,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters())); + "", new TezCounters(), null)); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -575,7 +575,7 @@ public class TestTaskRecovery { recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.KILLED, null, - "", new TezCounters())); + "", new TezCounters(), null)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -614,7 +614,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters())); + "", new TezCounters(), null)); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -654,7 +654,7 @@ public class TestTaskRecovery { TaskState recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null, - "", new TezCounters())); + "", new TezCounters(), null)); assertEquals(TaskState.SUCCEEDED, recoveredState); assertEquals(1, task.getAttempts().size()); assertEquals(1, task.getFinishedAttemptsCount()); @@ -721,7 +721,7 @@ public class TestTaskRecovery { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); TaskState recoveredState = task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, - taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "")); + taStartTime, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(TaskAttemptStateInternal.NEW, ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState()); @@ -735,7 +735,7 @@ public class TestTaskRecovery { recoveredState = task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, taStartTime, taFinishTime, TaskAttemptState.KILLED, null, - "", new TezCounters())); + "", new TezCounters(), null)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(TaskAttemptStateInternal.NEW, ((TaskAttemptImpl) task.getAttempt(taId)).getInternalState()); @@ -774,9 +774,9 @@ public class TestTaskRecovery { for (int i = 0; i < maxFailedAttempts; ++i) { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, - mock(ContainerId.class), mock(NodeId.class), "", "", "")); + mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0)); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, - 0, TaskAttemptState.KILLED, null, "", null)); + 0, TaskAttemptState.KILLED, null, "", null, null)); } assertEquals(maxFailedAttempts, task.getAttempts().size()); assertEquals(0, task.failedAttempts); @@ -804,9 +804,9 @@ public class TestTaskRecovery { for (int i = 0; i < maxFailedAttempts; ++i) { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, - mock(ContainerId.class), mock(NodeId.class), "", "", "")); + mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0)); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, - 0, TaskAttemptState.FAILED, null, "", null)); + 0, TaskAttemptState.FAILED, null, "", null, null)); } assertEquals(maxFailedAttempts, task.getAttempts().size()); assertEquals(maxFailedAttempts, task.failedAttempts); @@ -834,9 +834,9 @@ public class TestTaskRecovery { for (int i = 0; i < maxFailedAttempts - 1; ++i) { TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId()); task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L, - mock(ContainerId.class), mock(NodeId.class), "", "", "")); + mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0)); task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0, - 0, TaskAttemptState.FAILED, null, "", null)); + 0, TaskAttemptState.FAILED, null, "", null, null)); } assertEquals(maxFailedAttempts - 1, task.getAttempts().size()); assertEquals(maxFailedAttempts - 1, task.failedAttempts); @@ -844,7 +844,7 @@ public class TestTaskRecovery { TezTaskAttemptID newTaskAttemptId = getNewTaskAttemptID(task.getTaskId()); TaskState recoveredState = task.restoreFromEvent(new TaskAttemptStartedEvent(newTaskAttemptId, - vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", "")); + vertexName, 0, mock(ContainerId.class), mock(NodeId.class), "", "", "", 0, null, 0)); assertEquals(TaskState.RUNNING, recoveredState); assertEquals(TaskAttemptStateInternal.NEW, http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 67bad47..1ae9289 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -110,6 +110,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.ContainerHeartbeatHandler; +import org.apache.tez.dag.app.MockClock; import org.apache.tez.dag.app.TaskAttemptEventInfo; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; @@ -2271,6 +2272,8 @@ public class TestVertexImpl { }}) .when(execService).submit((Callable<Void>) any()); + MockClock clock = new MockClock(); + doReturn(execService).when(appContext).getExecService(); doReturn(conf).when(appContext).getAMConf(); doReturn(new Credentials()).when(dag).getCredentials(); @@ -2281,7 +2284,8 @@ public class TestVertexImpl { doReturn(Resource.newInstance(102400, 60)).when(taskScheduler).getTotalResources(); doReturn(historyEventHandler).when(appContext).getHistoryHandler(); doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler(); - + doReturn(clock).when(appContext).getClock(); + vertexGroups = Maps.newHashMap(); for (PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) { vertexGroups.put(groupInfo.getGroupName(), new VertexGroupInfo(groupInfo)); @@ -2997,6 +3001,11 @@ public class TestVertexImpl { dispatcher.await(); Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); Assert.assertEquals(2, v.getCompletedTasks()); + Assert.assertTrue(v.initTimeRequested > 0); + Assert.assertTrue(v.initedTime > 0); + Assert.assertTrue(v.startTimeRequested > 0); + Assert.assertTrue(v.startedTime > 0); + Assert.assertTrue(v.finishTime > 0); } @Test(timeout = 5000) @@ -3307,10 +3316,25 @@ public class TestVertexImpl { initAllVertices(VertexState.INITED); VertexImpl v6 = vertices.get("vertex6"); + VertexImpl v3 = vertices.get("vertex3"); startVertex(vertices.get("vertex1")); + dispatcher.await(); + Assert.assertEquals(VertexState.INITED, v3.getState()); + long v3StartTimeRequested = v3.startTimeRequested; + Assert.assertEquals(1, v3.numStartedSourceVertices); + Assert.assertTrue(v3StartTimeRequested > 0); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } startVertex(vertices.get("vertex2")); dispatcher.await(); + // start request from second source vertex overrides the value from the first source vertex + Assert.assertEquals(VertexState.RUNNING, v3.getState()); + Assert.assertEquals(2, v3.numStartedSourceVertices); + Assert.assertTrue(v3.startTimeRequested > v3StartTimeRequested); LOG.info("Verifying v6 state " + v6.getState()); Assert.assertEquals(VertexState.RUNNING, v6.getState()); Assert.assertEquals(3, v6.getDistanceFromRoot()); @@ -3692,10 +3716,15 @@ public class TestVertexImpl { // v3 still initializing with source vertex started. So should start running // once num tasks is defined Assert.assertEquals(VertexState.INITIALIZING, v3.getState()); + Assert.assertTrue(v3.numStartedSourceVertices > 0); + long v3StartTimeRequested = v3.startTimeRequested; + Assert.assertTrue(v3StartTimeRequested > 0); v3.reconfigureVertex(numTasks, null, null); dispatcher.await(); Assert.assertEquals(numTasks, v3.getTotalTasks()); Assert.assertEquals(VertexState.RUNNING, v3.getState()); + // the start time requested should remain at its original value + Assert.assertEquals(v3StartTimeRequested, v3.startTimeRequested); } @Test(timeout = 5000) http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java index fafbba6..f9a1c5e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java @@ -108,11 +108,14 @@ public class TestAMContainer { assertNull(wc.amContainer.getCurrentTaskAttempt()); // Assign task. + long currTime = wc.appContext.getClock().getTime(); wc.assignTaskAttempt(wc.taskAttemptID); wc.verifyState(AMContainerState.LAUNCHING); wc.verifyNoOutgoingEvents(); assertEquals(wc.taskAttemptID, wc.amContainer.getCurrentTaskAttempt()); - + assertTrue(wc.amContainer.getCurrentTaskAttemptAllocationTime() > 0); + assertTrue(wc.amContainer.getCurrentTaskAttemptAllocationTime() >= currTime); + // Container Launched wc.containerLaunched(); wc.verifyState(AMContainerState.RUNNING); http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java index 3760644..5c8c90e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java @@ -48,6 +48,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.app.dag.VertexState; +import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo; import org.apache.tez.dag.app.dag.impl.VertexStats; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; @@ -479,7 +480,10 @@ public class TestHistoryEventsProtoConversion { "vertex1", 10009l, ContainerId.newInstance( ApplicationAttemptId.newInstance( ApplicationId.newInstance(0, 1), 1), 1001), NodeId.newInstance( - "host1", 19999), "inProgress", "Completed", "nodeHttpAddress"); + "host1", 19999), "inProgress", "Completed", "nodeHttpAddress", 1024, + TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( + TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 0), 1024 + ); TaskAttemptStartedEvent deserializedEvent = (TaskAttemptStartedEvent) testProtoConversion(event); Assert.assertEquals(event.getTaskAttemptID(), @@ -490,6 +494,12 @@ public class TestHistoryEventsProtoConversion { deserializedEvent.getNodeId()); Assert.assertEquals(event.getStartTime(), deserializedEvent.getStartTime()); + Assert.assertEquals(event.getCreationTime(), + deserializedEvent.getCreationTime()); + Assert.assertEquals(event.getAllocationTime(), + deserializedEvent.getAllocationTime()); + Assert.assertEquals(event.getCreationCausalTA(), + deserializedEvent.getCreationCausalTA()); logEvents(event, deserializedEvent); } @@ -499,7 +509,7 @@ public class TestHistoryEventsProtoConversion { TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1), "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, - null, null, null); + null, null, null, null); TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent) testProtoConversion(event); Assert.assertEquals(event.getTaskAttemptID(), @@ -515,11 +525,17 @@ public class TestHistoryEventsProtoConversion { logEvents(event, deserializedEvent); } { + TezTaskAttemptID taId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( + TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 0), 0); + long timestamp = 1024L; + List<DataEventDependencyInfo> events = Lists.newArrayList(); + events.add(new DataEventDependencyInfo(timestamp, taId)); + events.add(new DataEventDependencyInfo(timestamp, taId)); TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent( TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1), "vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED, - TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters()); + TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters(), events); TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent) testProtoConversion(event); Assert.assertEquals(event.getTaskAttemptID(), @@ -534,6 +550,9 @@ public class TestHistoryEventsProtoConversion { deserializedEvent.getCounters()); Assert.assertEquals(event.getTaskAttemptError(), deserializedEvent.getTaskAttemptError()); + Assert.assertEquals(events.size(), event.getDataEvents().size()); + Assert.assertEquals(events.get(0).getTimestamp(), event.getDataEvents().get(0).getTimestamp()); + Assert.assertEquals(events.get(0).getTaskAttemptId(), event.getDataEvents().get(0).getTaskAttemptId()); logEvents(event, deserializedEvent); } } @@ -582,9 +601,10 @@ public class TestHistoryEventsProtoConversion { } catch (RuntimeException e) { // Expected } + long eventTime = 1024; List<TezEvent> events = Arrays.asList(new TezEvent(DataMovementEvent.create(1, null), - new EventMetaData(EventProducerConsumerType.SYSTEM, "foo", "bar", null))); + new EventMetaData(EventProducerConsumerType.SYSTEM, "foo", "bar", null), eventTime)); event = new VertexRecoverableEventsGeneratedEvent( TezVertexID.getInstance( TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), events); @@ -595,6 +615,8 @@ public class TestHistoryEventsProtoConversion { deserializedEvent.getTezEvents().size()); Assert.assertEquals(event.getTezEvents().get(0).getEventType(), deserializedEvent.getTezEvents().get(0).getEventType()); + Assert.assertEquals(event.getTezEvents().get(0).getEventReceivedTime(), + deserializedEvent.getTezEvents().get(0).getEventReceivedTime()); logEvents(event, deserializedEvent); } http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java index 3ab204a..711e4bb 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java @@ -160,11 +160,11 @@ public class TestHistoryEventJsonConversion { break; case TASK_ATTEMPT_STARTED: event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId, - nodeId, null, null, "nodeHttpAddress"); + nodeId, null, null, "nodeHttpAddress", 0, null, 0); break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), - random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null); + random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null, null); break; case CONTAINER_LAUNCHED: event = new ContainerLaunchedEvent(containerId, random.nextInt(), http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index 53fc66b..4685a61 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -435,7 +435,10 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics()); atsEntity.addOtherInfo(ATSConstants.COUNTERS, DAGUtils.convertCountersToATSMap(event.getCounters())); - + if (event.getDataEvents() != null && !event.getDataEvents().isEmpty()) { + atsEntity.addOtherInfo(ATSConstants.LAST_DATA_EVENTS, + DAGUtils.convertDataEventDependecyInfoToATS(event.getDataEvents())); + } return atsEntity; } @@ -470,6 +473,12 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress()); atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString()); atsEntity.addOtherInfo(ATSConstants.STATUS, TaskAttemptState.RUNNING.name()); + atsEntity.addOtherInfo(ATSConstants.CREATION_TIME, event.getCreationTime()); + atsEntity.addOtherInfo(ATSConstants.ALLOCATION_TIME, event.getAllocationTime()); + if (event.getCreationCausalTA() != null) { + atsEntity.addOtherInfo(ATSConstants.CREATION_CAUSAL_ATTEMPT, + event.getCreationCausalTA().toString()); + } return atsEntity; } http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index 7e99ac3..2849c10 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -19,6 +19,7 @@ package org.apache.tez.dag.history.logging.ats; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; @@ -45,6 +46,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.app.dag.VertexState; +import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo; import org.apache.tez.dag.app.dag.impl.VertexStats; import org.apache.tez.dag.app.web.AMWebController; import org.apache.tez.dag.history.HistoryEvent; @@ -85,6 +87,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.google.common.collect.Lists; + public class TestHistoryEventTimelineConversion { private ApplicationAttemptId applicationAttemptId; @@ -166,11 +170,11 @@ public class TestHistoryEventTimelineConversion { break; case TASK_ATTEMPT_STARTED: event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", random.nextInt(), containerId, - nodeId, null, null, "nodeHttpAddress"); + nodeId, null, null, "nodeHttpAddress", 0, null, 0); break; case TASK_ATTEMPT_FINISHED: event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(), - random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null); + random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null, null); break; case CONTAINER_LAUNCHED: event = new ContainerLaunchedEvent(containerId, random.nextInt(), @@ -445,6 +449,7 @@ public class TestHistoryEventTimelineConversion { timelineEntity.getOtherInfo().get(ATSConstants.USER)); } + @SuppressWarnings("unchecked") @Test(timeout = 5000) public void testConvertTaskAttemptFinishedEvent() { String vertexName = "testVertex"; @@ -456,9 +461,13 @@ public class TestHistoryEventTimelineConversion { .values()[random.nextInt(TaskAttemptTerminationCause.values().length)]; String diagnostics = "random diagnostics message"; TezCounters counters = new TezCounters(); + long lastDataEventTime = finishTime - 1; + List<DataEventDependencyInfo> events = Lists.newArrayList(); + events.add(new DataEventDependencyInfo(lastDataEventTime, tezTaskAttemptID)); + events.add(new DataEventDependencyInfo(lastDataEventTime, tezTaskAttemptID)); TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName, - startTime, finishTime, state, error, diagnostics, counters); + startTime, finishTime, state, error, diagnostics, counters, events); TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId()); Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType()); @@ -481,12 +490,17 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals(finishTime, evt.getTimestamp()); final Map<String, Object> otherInfo = timelineEntity.getOtherInfo(); - Assert.assertEquals(6, otherInfo.size()); + Assert.assertEquals(7, otherInfo.size()); Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME)); Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN)); Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS)); Assert.assertEquals(error.name(), otherInfo.get(ATSConstants.TASK_ATTEMPT_ERROR_ENUM)); Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS)); + Map<String, Object> obj1 = (Map<String, Object>)otherInfo.get(ATSConstants.LAST_DATA_EVENTS); + List<Object> obj2 = (List<Object>) obj1.get(ATSConstants.LAST_DATA_EVENTS); + Assert.assertEquals(2, obj2.size()); + Map<String, Object> obj3 = (Map<String, Object>) obj2.get(0); + Assert.assertEquals(events.get(0).getTimestamp(), obj3.get(ATSConstants.TIMESTAMP)); Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS)); } @@ -730,8 +744,11 @@ public class TestHistoryEventTimelineConversion { @Test(timeout = 5000) public void testConvertTaskAttemptStartedEvent() { long startTime = random.nextLong(); + long creationTime = 1024; + long allocationTime = 1024; TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(tezTaskAttemptID, "v1", - startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress"); + startTime, containerId, nodeId, "inProgressURL", "logsURL", "nodeHttpAddress", + creationTime, tezTaskAttemptID, allocationTime); TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event); Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType()); @@ -776,6 +793,10 @@ public class TestHistoryEventTimelineConversion { timelineEntity.getOtherInfo().get(ATSConstants.NODE_HTTP_ADDRESS)); Assert.assertTrue(TaskAttemptState.RUNNING.name() .equals(timelineEntity.getOtherInfo().get(ATSConstants.STATUS))); + Assert.assertEquals(tezTaskAttemptID.toString(), + timelineEntity.getOtherInfo().get(ATSConstants.CREATION_CAUSAL_ATTEMPT)); + Assert.assertEquals(creationTime, timelineEntity.getOtherInfo().get(ATSConstants.CREATION_TIME)); + Assert.assertEquals(allocationTime, timelineEntity.getOtherInfo().get(ATSConstants.ALLOCATION_TIME)); } @Test(timeout = 5000) http://git-wip-us.apache.org/repos/asf/tez/blob/bc56ca31/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java index 974e190..b44b7d4 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java @@ -55,12 +55,19 @@ public class TezEvent implements Writable { private EventMetaData sourceInfo; private EventMetaData destinationInfo; + + private long eventReceivedTime; public TezEvent() { } public TezEvent(Event event, EventMetaData sourceInfo) { + this(event, sourceInfo, System.currentTimeMillis()); + } + + public TezEvent(Event event, EventMetaData sourceInfo, long time) { this.event = event; + this.eventReceivedTime = time; this.setSourceInfo(sourceInfo); if (event instanceof DataMovementEvent) { eventType = EventType.DATA_MOVEMENT_EVENT; @@ -91,6 +98,14 @@ public class TezEvent implements Writable { public Event getEvent() { return event; } + + public void setEventReceivedTime(long eventReceivedTime) { // TODO save + this.eventReceivedTime = eventReceivedTime; + } + + public long getEventReceivedTime() { + return eventReceivedTime; + } public EventMetaData getSourceInfo() { return sourceInfo; @@ -119,6 +134,7 @@ public class TezEvent implements Writable { } out.writeBoolean(true); out.writeInt(eventType.ordinal()); + out.writeLong(eventReceivedTime); if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) { // TODO NEWTEZ convert to PB TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event; @@ -188,6 +204,7 @@ public class TezEvent implements Writable { return; } eventType = EventType.values()[in.readInt()]; + eventReceivedTime = in.readLong(); if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) { // TODO NEWTEZ convert to PB event = new TaskStatusUpdateEvent();
