Repository: tez
Updated Branches:
  refs/heads/master 9c1d8ceed -> e41699119
TEZ-3173. Update Tez AM REST APIs for more information for each vertex. (Zhiyuan Yang via hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e4169911 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e4169911 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e4169911 Branch: refs/heads/master Commit: e416991192d43ea750c3fa78c5cced0e9e6cadc1 Parents: 9c1d8ce Author: Hitesh Shah <[email protected]> Authored: Wed Mar 30 16:50:45 2016 -0700 Committer: Hitesh Shah <[email protected]> Committed: Wed Mar 30 16:50:45 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/dag/app/dag/Task.java | 2 + .../java/org/apache/tez/dag/app/dag/Vertex.java | 8 +++ .../apache/tez/dag/app/dag/impl/TaskImpl.java | 24 ++++++-- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 62 ++++++++++++++++++- .../tez/dag/app/dag/impl/VertexStats.java | 4 ++ .../apache/tez/dag/app/web/AMWebController.java | 6 ++ .../tez/dag/app/dag/impl/TestTaskImpl.java | 17 ++++++ .../tez/dag/app/dag/impl/TestVertexImpl.java | 63 ++++++++++++++++++++ .../tez/dag/app/web/TestAMWebController.java | 17 ++++++ 10 files changed, 199 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/e4169911/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 830d7fa..70901e0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES TEZ-3029. Add an onError method to service plugin contexts. ALL CHANGES: + TEZ-3173. Update Tez AM REST APIs for more information for each vertex. TEZ-3108. Add support for external services to local mode. TEZ-3189. Pre-warm dags should not be counted in submitted dags count by DAGAppMaster. TEZ-2967. 
Vertex start time should be that of first task start time in UI http://git-wip-us.apache.org/repos/asf/tez/blob/e4169911/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java index 04f0e5b..d1b9b2a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java @@ -71,4 +71,6 @@ public interface Task { public TaskLocationHint getTaskLocationHint(); long getFirstAttemptStartTime(); + + long getFinishTime(); } http://git-wip-us.apache.org/repos/asf/tez/blob/e4169911/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index 54f2ffa..25fbf3a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -189,4 +189,12 @@ public interface Vertex extends Comparable<Vertex> { public int getTaskSchedulerIdentifier(); public int getContainerLauncherIdentifier(); public int getTaskCommunicatorIdentifier(); + + public long getInitTime(); + public long getStartTime(); + public long getFinishTime(); + + void reportTaskStartTime(long taskStartTime); + public long getFirstTaskStartTime(); + public long getLastTaskFinishTime(); } http://git-wip-us.apache.org/repos/asf/tez/blob/e4169911/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 32869a5..9217e84 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -328,6 +328,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { private final boolean leafVertex; + @VisibleForTesting + long finishTime = -1L; + @Override public TaskState getState() { readLock.lock(); @@ -427,7 +430,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { try { report.setTaskId(taskId); report.setStartTime(getLaunchTime()); - report.setFinishTime(getFinishTime()); + report.setFinishTime(getLastTaskAttemptFinishTime()); report.setTaskState(getState()); report.setProgress(getProgress()); return report; @@ -546,6 +549,16 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { } } + @Override + public long getFinishTime() { + readLock.lock(); + try { + return finishTime; + } finally { + readLock.unlock(); + } + } + @VisibleForTesting public TaskStateInternal getInternalState() { readLock.lock(); @@ -587,7 +600,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { //this is always called in read/write lock //TODO Verify behaviour is Task is killed (no finished attempt) - private long getFinishTime() { + private long getLastTaskAttemptFinishTime() { if (!isFinished()) { return 0; } @@ -866,8 +879,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { protected void logJobHistoryTaskFinishedEvent() { // FIXME need to handle getting finish time as this function // is called from within a transition + this.finishTime = clock.getTime(); TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId, - getVertex().getName(), getLaunchTime(), clock.getTime(), + getVertex().getName(), getLaunchTime(), this.finishTime, successfulAttempt, TaskState.SUCCEEDED, "", getCounters(), failedAttempts); this.appContext.getHistoryHandler().handle( @@ -875,8 +889,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { } protected void 
logJobHistoryTaskFailedEvent(TaskState finalState) { + this.finishTime = clock.getTime(); TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId, - getVertex().getName(), getLaunchTime(), clock.getTime(), null, + getVertex().getName(), getLaunchTime(), this.finishTime, null, finalState, StringUtils.join(getDiagnostics(), LINE_SEPARATOR), getCounters(), failedAttempts); @@ -944,6 +959,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { } else { task.scheduledTime = task.clock.getTime(); task.logJobHistoryTaskStartedEvent(); + task.vertex.reportTaskStartTime(task.getLaunchTime()); } // No matter whether it is in recovery or normal execution, always schedule new task attempt. // TaskAttempt will continue the recovery if necessary and send task attempt status http://git-wip-us.apache.org/repos/asf/tez/blob/e4169911/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 8b81be7..ea202f7 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -672,6 +672,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl long startedTime; // Time when entering state STARTED @VisibleForTesting long finishTime; + long firstTaskStartTime = -1L; + Object firstTaskStartTimeLock = new Object(); private float progress; private final TezVertexID vertexId; //runtime assigned id. 
@@ -1208,6 +1210,65 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } } + @Override + public long getInitTime() { + readLock.lock(); + try { + return initedTime; + } finally { + readLock.unlock(); + } + } + + @Override + public long getStartTime() { + readLock.lock(); + try { + return startedTime; + } finally { + readLock.unlock(); + } + } + + @Override + public long getFinishTime() { + readLock.lock(); + try { + return finishTime; + } finally { + readLock.unlock(); + } + } + + @Override + public void reportTaskStartTime(long taskStartTime) { + synchronized (firstTaskStartTimeLock) { + if (firstTaskStartTime < 0 || taskStartTime < firstTaskStartTime) { + firstTaskStartTime = taskStartTime; + } + } + } + + @Override + public long getFirstTaskStartTime() { + return firstTaskStartTime; + } + + @Override + public long getLastTaskFinishTime() { + readLock.lock(); + try { + if (inTerminalState()) { + mayBeConstructFinalFullCounters(); + return vertexStats.getLastTaskFinishTime(); + } else { + return -1; + } + } finally { + readLock.unlock(); + } + } + boolean inTerminalState() { VertexState state = getInternalState(); if (state == VertexState.ERROR || state == VertexState.FAILED @@ -1859,7 +1920,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl finishTime = clock.getTime(); } - void logJobHistoryVertexInitializedEvent() { if (recoveryData == null || !recoveryData.shouldSkipInit()) { VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId, vertexName, http://git-wip-us.apache.org/repos/asf/tez/blob/e4169911/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexStats.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexStats.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexStats.java index ae844ee..c7bccd1 100644 --- 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexStats.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexStats.java @@ -44,6 +44,10 @@ public class VertexStats { return firstTaskStartTime; } + public void setFirstTaskStartTime(long firstTaskStartTime) { + this.firstTaskStartTime = firstTaskStartTime; + } + public Set<TezTaskID> getFirstTasksToStart() { return Collections.unmodifiableSet(firstTasksToStart); } http://git-wip-us.apache.org/repos/asf/tez/blob/e4169911/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java index c70fdae..54706ec 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java @@ -572,6 +572,12 @@ public class AMWebController extends Controller { vertexInfo.put("status", vertex.getState().toString()); vertexInfo.put("progress", Float.toString(vertex.getCompletedTaskProgress())); + vertexInfo.put("initTime", Long.toString(vertex.getInitTime())); + vertexInfo.put("startTime", Long.toString(vertex.getStartTime())); + vertexInfo.put("finishTime", Long.toString(vertex.getFinishTime())); + vertexInfo.put("firstTaskStartTime", Long.toString(vertex.getFirstTaskStartTime())); + vertexInfo.put("lastTaskFinishTime", Long.toString(vertex.getLastTaskFinishTime())); + ProgressBuilder vertexProgress = vertex.getVertexProgress(); vertexInfo.put("totalTasks", Integer.toString(vertexProgress.getTotalTaskCount())); vertexInfo.put("runningTasks", Integer.toString(vertexProgress.getRunningTaskCount())); http://git-wip-us.apache.org/repos/asf/tez/blob/e4169911/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java 
---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java index 6f11aa0..1ed7ecb 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java @@ -31,6 +31,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import org.apache.tez.dag.history.DAGHistoryEvent; +import org.apache.tez.dag.history.HistoryEvent; +import org.apache.tez.dag.history.HistoryEventHandler; +import org.apache.tez.dag.history.events.TaskFinishedEvent; +import org.mockito.ArgumentCaptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -108,6 +113,7 @@ public class TestTaskImpl { private Container mockContainer; private AMContainer mockAMContainer; private NodeId mockNodeId; + private HistoryEventHandler mockHistoryHandler; private MockTaskImpl mockTask; private TaskSpec mockTaskSpec; @@ -139,10 +145,12 @@ public class TestTaskImpl { mockContainer = mock(Container.class); mockAMContainer = mock(AMContainer.class); mockNodeId = mock(NodeId.class); + mockHistoryHandler = mock(HistoryEventHandler.class); when(mockContainer.getId()).thenReturn(mockContainerId); when(mockContainer.getNodeId()).thenReturn(mockNodeId); when(mockAMContainer.getContainer()).thenReturn(mockContainer); when(appContext.getAllContainers().get(mockContainerId)).thenReturn(mockAMContainer); + when(appContext.getHistoryHandler()).thenReturn(mockHistoryHandler); taskResource = Resource.newInstance(1024, 1); localResources = new HashMap<String, LocalResource>(); environment = new HashMap<String, String>(); @@ -620,6 +628,14 @@ public class TestTaskImpl { verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId), 
eq(mockTask.getLastAttempt().getID().getId())); + ArgumentCaptor<DAGHistoryEvent> argumentCaptor = ArgumentCaptor.forClass(DAGHistoryEvent.class); + verify(mockHistoryHandler).handle(argumentCaptor.capture()); + DAGHistoryEvent dagHistoryEvent = argumentCaptor.getValue(); + HistoryEvent historyEvent = dagHistoryEvent.getHistoryEvent(); + assertTrue(historyEvent instanceof TaskFinishedEvent); + TaskFinishedEvent taskFinishedEvent = (TaskFinishedEvent)historyEvent; + assertEquals(taskFinishedEvent.getFinishTime(), mockTask.getFinishTime()); + eventHandler.events.clear(); // Now fail the attempt after it has succeeded TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class); @@ -904,6 +920,7 @@ public class TestTaskImpl { } protected void logJobHistoryTaskFinishedEvent() { + super.logJobHistoryTaskFinishedEvent(); taskFinishedEventLogged++; } http://git-wip-us.apache.org/repos/asf/tez/blob/e4169911/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 659d099..7a20a37 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -53,6 +53,7 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.counters.Limits; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.app.rm.AMSchedulerEventType; import org.apache.tez.hadoop.shim.DefaultHadoopShim; import org.apache.tez.runtime.api.VertexStatistics; import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; @@ -6728,4 +6729,66 @@ public class TestVertexImpl { } + @Test(timeout = 5000) + public void testFirstTaskStartTime() { + VertexImpl v = 
vertices.get("vertex1"); + Assert.assertEquals(v.getFirstTaskStartTime(), -1); + v.reportTaskStartTime(100); + Assert.assertEquals(v.getFirstTaskStartTime(), 100); + v.reportTaskStartTime(50); + Assert.assertEquals(v.getFirstTaskStartTime(), 50); + v.reportTaskStartTime(200); + Assert.assertEquals(v.getFirstTaskStartTime(), 50); + } + + @Test(timeout = 5000) + public void testLastTaskFinishTime() { + NodeId nid = NodeId.newInstance("127.0.0.1", 0); + ContainerId contId = ContainerId.newContainerId(appAttemptId, 3); + Container container = mock(Container.class); + when(container.getId()).thenReturn(contId); + when(container.getNodeId()).thenReturn(nid); + when(container.getNodeHttpAddress()).thenReturn("localhost:0"); + AMContainerMap containers = new AMContainerMap( + mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class), + new ContainerContextMatcher(), appContext); + containers.addContainerIfNew(container, 0, 0, 0); + doReturn(containers).when(appContext).getAllContainers(); + dispatcher.register(DAGEventType.class, dagEventDispatcher); + + initAllVertices(VertexState.INITED); + + VertexImpl v = vertices.get("vertex2"); + startVertex(v); + + TezTaskID tid0 = TezTaskID.getInstance(v.getVertexId(), 0); + TezTaskID tid1 = TezTaskID.getInstance(v.getVertexId(), 1); + TaskImpl task0 = (TaskImpl) v.getTask(tid0); + TaskImpl task1 = (TaskImpl) v.getTask(tid1); + + TezTaskAttemptID taskAttemptId0 = TezTaskAttemptID.getInstance(task0.getTaskId(), 0); + TezTaskAttemptID taskAttemptId1 = TezTaskAttemptID.getInstance(task1.getTaskId(), 0); + TaskAttemptImpl taskAttempt0 = (TaskAttemptImpl) task0.getAttempt(taskAttemptId0); + TaskAttemptImpl taskAttempt1 = (TaskAttemptImpl) task1.getAttempt(taskAttemptId1); + + Assert.assertEquals(v.getLastTaskFinishTime(), -1); + + taskAttempt0.handle(new TaskAttemptEventSchedule(taskAttemptId0, 0, 0)); + taskAttempt0.handle(new TaskAttemptEventStartedRemotely(taskAttemptId0, contId, null)); + 
taskAttempt0.handle(new TaskAttemptEvent(taskAttemptId0, TaskAttemptEventType.TA_DONE)); + //task0.handle(new TaskEventTAUpdate(taskAttemptId0, TaskEventType.T_ATTEMPT_SUCCEEDED)); + + Assert.assertEquals(v.getLastTaskFinishTime(), -1); + + taskAttempt1.handle(new TaskAttemptEventSchedule(taskAttemptId1, 0, 0)); + taskAttempt1.handle(new TaskAttemptEventStartedRemotely(taskAttemptId1, contId, null)); + taskAttempt1.handle(new TaskAttemptEvent(taskAttemptId1, TaskAttemptEventType.TA_DONE)); + //task1.handle(new TaskEventTAUpdate(taskAttemptId1, TaskEventType.T_ATTEMPT_SUCCEEDED)); + + dispatcher.await(); + + Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); + Assert.assertTrue(v.getLastTaskFinishTime() > 0); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/e4169911/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java index 1b35913..8561c8c 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java @@ -381,6 +381,11 @@ public class TestAMWebController { doReturn(status).when(mockVertex).getState(); doReturn(progress).when(mockVertex).getProgress(); doReturn(pb).when(mockVertex).getVertexProgress(); + doReturn(1L).when(mockVertex).getInitTime(); + doReturn(1L).when(mockVertex).getStartTime(); + doReturn(2L).when(mockVertex).getFinishTime(); + doReturn(1L).when(mockVertex).getFirstTaskStartTime(); + doReturn(2L).when(mockVertex).getLastTaskFinishTime(); TezCounters counters = new TezCounters(); counters.addGroup("g1", "g1"); @@ -417,6 +422,18 @@ public class TestAMWebController { vertex2Result.get("killedTaskAttempts")); Assert.assertEquals(Integer.toString(progress.getFailedTaskAttemptCount()), 
vertex2Result.get("failedTaskAttempts")); + String str0 = Long.toString(mockVertex2.getInitTime()); + String str1 = vertex2Result.get("initTime"); + Assert.assertEquals(Long.toString(mockVertex2.getInitTime()), + vertex2Result.get("initTime")); + Assert.assertEquals(Long.toString(mockVertex2.getStartTime()), + vertex2Result.get("startTime")); + Assert.assertEquals(Long.toString(mockVertex2.getFinishTime()), + vertex2Result.get("finishTime")); + Assert.assertEquals(Long.toString(mockVertex2.getFirstTaskStartTime()), + vertex2Result.get("firstTaskStartTime")); + Assert.assertEquals(Long.toString(mockVertex2.getLastTaskFinishTime()), + vertex2Result.get("lastTaskFinishTime")); } //-- Get Tasks Info Tests -----------------------------------------------------------------------
