Repository: tez Updated Branches: refs/heads/master 7b45e9a14 -> 6970fb01b
TEZ-2650. Timing details on Vertex state changes (bikas) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6970fb01 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6970fb01 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6970fb01 Branch: refs/heads/master Commit: 6970fb01bc7bd8a838dc55c5e38cbad41b1c3140 Parents: 7b45e9a Author: Bikas Saha <[email protected]> Authored: Wed Aug 5 12:35:30 2015 -0700 Committer: Bikas Saha <[email protected]> Committed: Wed Aug 5 12:35:30 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/app/dag/impl/VertexImpl.java | 10 ++- .../tez/dag/app/dag/impl/TestVertexImpl.java | 25 ++++++ .../tez/history/parser/datamodel/BaseInfo.java | 8 +- .../history/parser/datamodel/BaseParser.java | 6 +- .../tez/history/parser/datamodel/DagInfo.java | 24 +++--- .../parser/datamodel/TaskAttemptInfo.java | 26 +++---- .../tez/history/parser/datamodel/TaskInfo.java | 32 ++++---- .../history/parser/datamodel/VertexInfo.java | 82 +++++++++++--------- .../apache/tez/history/TestATSFileParser.java | 55 +++++++------ .../analyzer/plugins/SlowestVertexAnalyzer.java | 8 +- 11 files changed, 162 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/6970fb01/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 708dee5..4bc08d9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -12,6 +12,7 @@ INCOMPATIBLE CHANGES TEZ-2647. Add input causality dependency for attempts TEZ-2633. Allow VertexManagerPlugins to receive and report based on attempts instead of tasks + TEZ-2650. Timing details on Vertex state changes ALL CHANGES: TEZ-2684. ShuffleVertexManager.parsePartitionStats throws IllegalStateException: Stats should be initialized. http://git-wip-us.apache.org/repos/asf/tez/blob/6970fb01/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index accfa62..1fcfe7e 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -3471,6 +3471,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl vertex.distanceFromRoot = distanceFromRoot; } vertex.numStartedSourceVertices++; + vertex.startTimeRequested = vertex.clock.getTime(); LOG.info("Source vertex started: " + startEvent.getSourceVertexId() + " for vertex: " + vertex.logIdentifier + " numStartedSources: " + vertex.numStartedSourceVertices + " numSources: " + vertex.sourceVertices.size()); @@ -3529,12 +3530,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl Preconditions.checkState( (vertex.sourceVertices == null || vertex.sourceVertices.isEmpty()), "Vertex: " + vertex.logIdentifier + " got invalid start event"); - vertex.startTimeRequested = vertex.clock.getTime(); vertex.startSignalPending = true; + vertex.startTimeRequested = vertex.clock.getTime(); } } - + public static class StartTransition implements MultipleArcTransition<VertexImpl, VertexEvent, VertexState> { @@ -3542,7 +3543,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl public VertexState transition(VertexImpl vertex, VertexEvent event) { Preconditions.checkState(vertex.getState() == VertexState.INITED, "Unexpected state " + vertex.getState() + " for " + vertex.logIdentifier); - vertex.startTimeRequested = vertex.clock.getTime(); + // if the start signal is pending this event is a fake start event to trigger this transition + if (!vertex.startSignalPending) { + vertex.startTimeRequested = vertex.clock.getTime(); + } return vertex.startVertex(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/6970fb01/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index cfc297e..8864e9f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -3008,6 +3008,11 @@ public class TestVertexImpl { dispatcher.await(); Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); Assert.assertEquals(2, v.getCompletedTasks()); + Assert.assertTrue(v.initTimeRequested > 0); + Assert.assertTrue(v.initedTime > 0); + Assert.assertTrue(v.startTimeRequested > 0); + Assert.assertTrue(v.startedTime > 0); + Assert.assertTrue(v.finishTime > 0); } @Test(timeout = 5000) @@ -3316,10 +3321,25 @@ public class TestVertexImpl { initAllVertices(VertexState.INITED); VertexImpl v6 = vertices.get("vertex6"); + VertexImpl v3 = vertices.get("vertex3"); startVertex(vertices.get("vertex1")); + dispatcher.await(); + Assert.assertEquals(VertexState.INITED, v3.getState()); + long v3StartTimeRequested = v3.startTimeRequested; + Assert.assertEquals(1, v3.numStartedSourceVertices); + Assert.assertTrue(v3StartTimeRequested > 0); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } startVertex(vertices.get("vertex2")); dispatcher.await(); + // start request from second source vertex overrides the value from the first source vertex + Assert.assertEquals(VertexState.RUNNING, v3.getState()); + Assert.assertEquals(2, v3.numStartedSourceVertices); + Assert.assertTrue(v3.startTimeRequested > v3StartTimeRequested); LOG.info("Verifying v6 state " + v6.getState()); Assert.assertEquals(VertexState.RUNNING, v6.getState()); Assert.assertEquals(3, v6.getDistanceFromRoot()); @@ -3701,10 +3721,15 @@ public class TestVertexImpl { // v3 still initializing with source vertex started. So should start running // once num tasks is defined Assert.assertEquals(VertexState.INITIALIZING, v3.getState()); + Assert.assertTrue(v3.numStartedSourceVertices > 0); + long v3StartTimeRequested = v3.startTimeRequested; + Assert.assertTrue(v3StartTimeRequested > 0); v3.reconfigureVertex(numTasks, null, null); dispatcher.await(); Assert.assertEquals(numTasks, v3.getTotalTasks()); Assert.assertEquals(VertexState.RUNNING, v3.getState()); + // the start time requested should remain at its original value + Assert.assertEquals(v3StartTimeRequested, v3.startTimeRequested); } @Test(timeout = 5000) http://git-wip-us.apache.org/repos/asf/tez/blob/6970fb01/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java index 8bd6bfb..3f9666a 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java @@ -61,28 +61,28 @@ public abstract class BaseInfo { * * @return long */ - public abstract long getStartTime(); + public abstract long getStartTimeInterval(); /** * Get finish time w.r.t DAG * * @return long */ - public abstract long getFinishTime(); + public abstract long getFinishTimeInterval(); /** * Get absolute start time * * @return long */ - public abstract long getAbsStartTime(); + public abstract long getStartTime(); /** * Get absolute finish time * * @return long */ - public abstract long getAbsFinishTime(); + public abstract long getFinishTime(); public abstract String getDiagnostics(); http://git-wip-us.apache.org/repos/asf/tez/blob/6970fb01/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java index a484bd5..62ba474 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java @@ -85,11 +85,11 @@ public abstract class BaseParser { //Set reference time for all events for (VertexInfo vertexInfo : dagInfo.getVertices()) { - setReferenceTime(vertexInfo.getEvents(), dagInfo.getStartTime()); + setReferenceTime(vertexInfo.getEvents(), dagInfo.getStartTimeInterval()); for (TaskInfo taskInfo : vertexInfo.getTasks()) { - setReferenceTime(taskInfo.getEvents(), dagInfo.getStartTime()); + setReferenceTime(taskInfo.getEvents(), dagInfo.getStartTimeInterval()); for (TaskAttemptInfo taskAttemptInfo : taskInfo.getTaskAttempts()) { - setReferenceTime(taskAttemptInfo.getEvents(), dagInfo.getStartTime()); + setReferenceTime(taskAttemptInfo.getEvents(), dagInfo.getStartTimeInterval()); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/6970fb01/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java index 0f3c3af..fe596f0 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java @@ -307,9 +307,9 @@ public class DagInfo extends BaseInfo { sb.append("dagID=").append(getDagId()).append(", "); sb.append("dagName=").append(getName()).append(", "); sb.append("status=").append(getStatus()).append(", "); - sb.append("startTime=").append(getStartTime()).append(", "); - sb.append("submitTime=").append(getAbsoluteSubmitTime()).append(", "); - sb.append("endTime=").append(getFinishTime()).append(", "); + sb.append("startTime=").append(getStartTimeInterval()).append(", "); + sb.append("submitTime=").append(getSubmitTime()).append(", "); + sb.append("endTime=").append(getFinishTimeInterval()).append(", "); sb.append("timeTaken=").append(getTimeTaken()).append(", "); sb.append("diagnostics=").append(getDiagnostics()).append(", "); sb.append("vertexNameIDMapping=").append(getVertexNameIDMapping()).append(", "); @@ -336,15 +336,15 @@ public class DagInfo extends BaseInfo { return Collections.unmodifiableCollection(edgeInfoMap.values()); } - public final long getAbsoluteSubmitTime() { + public final long getSubmitTime() { return submitTime; } - public final long getAbsStartTime() { + public final long getStartTime() { return startTime; } - public final long getAbsFinishTime() { + public final long getFinishTime() { return endTime; } @@ -354,24 +354,24 @@ public class DagInfo extends BaseInfo { * * @return starting time w.r.t to dag */ - public final long getStartTime() { + public final long getStartTimeInterval() { return 0; } @Override - public final long getFinishTime() { + public final long getFinishTimeInterval() { long dagEndTime = (endTime - startTime); if (dagEndTime < 0) { //probably dag is not complete or failed in middle. get the last task attempt time for (VertexInfo vertexInfo : getVertices()) { - dagEndTime = (vertexInfo.getFinishTime() > dagEndTime) ? vertexInfo.getFinishTime() : dagEndTime; + dagEndTime = (vertexInfo.getFinishTimeInterval() > dagEndTime) ? vertexInfo.getFinishTimeInterval() : dagEndTime; } } return dagEndTime; } public final long getTimeTaken() { - return getFinishTime(); + return getFinishTimeInterval(); } public final String getStatus() { @@ -412,8 +412,8 @@ public class DagInfo extends BaseInfo { Collections.sort(vertices, new Comparator<VertexInfo>() { @Override public int compare(VertexInfo o1, VertexInfo o2) { - return (o1.getStartTime() < o2.getStartTime()) ? -1 : - ((o1.getStartTime() == o2.getStartTime()) ? + return (o1.getStartTimeInterval() < o2.getStartTimeInterval()) ? -1 : + ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ? 0 : 1); } }); http://git-wip-us.apache.org/repos/asf/tez/blob/6970fb01/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java index b412c46..8f7ec23 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java @@ -91,24 +91,24 @@ public class TaskAttemptInfo extends BaseInfo { } @Override - public final long getStartTime() { - return startTime - (getTaskInfo().getVertexInfo().getDagInfo().getAbsStartTime()); + public final long getStartTimeInterval() { + return startTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime()); } @Override - public final long getFinishTime() { - return endTime - (getTaskInfo().getVertexInfo().getDagInfo().getAbsStartTime()); + public final long getFinishTimeInterval() { + return endTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime()); } - public final long getAbsStartTime() { + public final long getStartTime() { return startTime; } - public final long getAbsFinishTime() { + public final long getFinishTime() { return endTime; } - public final long getAbsoluteScheduledTime() { + public final long getScheduledTime() { return scheduledTime; } @@ -121,11 +121,11 @@ public class TaskAttemptInfo extends BaseInfo { } public final long getTimeTaken() { - return getFinishTime() - getStartTime(); + return getFinishTimeInterval() - getStartTimeInterval(); } - public final long getScheduledTime() { - return scheduledTime - (getTaskInfo().getVertexInfo().getDagInfo().getAbsStartTime()); + public final long getScheduledTimeInterval() { + return scheduledTime - (getTaskInfo().getVertexInfo().getDagInfo().getStartTime()); } public final String getSchedulingCausalTA() { @@ -255,9 +255,9 @@ public class TaskAttemptInfo extends BaseInfo { StringBuilder sb = new StringBuilder(); sb.append("["); sb.append("taskAttemptId=").append(getTaskAttemptId()).append(", "); - sb.append("scheduledTime=").append(getScheduledTime()).append(", "); - sb.append("startTime=").append(getStartTime()).append(", "); - sb.append("finishTime=").append(getFinishTime()).append(", "); + sb.append("scheduledTime=").append(getScheduledTimeInterval()).append(", "); + sb.append("startTime=").append(getStartTimeInterval()).append(", "); + sb.append("finishTime=").append(getFinishTimeInterval()).append(", "); sb.append("timeTaken=").append(getTimeTaken()).append(", "); sb.append("events=").append(getEvents()).append(", "); sb.append("diagnostics=").append(getDiagnostics()).append(", "); http://git-wip-us.apache.org/repos/asf/tez/blob/6970fb01/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java index cb966a4..9705b73 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java @@ -76,26 +76,26 @@ public class TaskInfo extends BaseInfo { } @Override - public final long getStartTime() { - return startTime - (vertexInfo.getDagInfo().getAbsStartTime()); + public final long getStartTimeInterval() { + return startTime - (vertexInfo.getDagInfo().getStartTime()); } - public final long getAbsStartTime() { + public final long getStartTime() { return startTime; } - public final long getAbsFinishTime() { + public final long getFinishTime() { return endTime; } @Override - public final long getFinishTime() { - long taskFinishTime = endTime - (vertexInfo.getDagInfo().getAbsStartTime()); + public final long getFinishTimeInterval() { + long taskFinishTime = endTime - (vertexInfo.getDagInfo().getStartTime()); if (taskFinishTime < 0) { //probably vertex is not complete or failed in middle. get the last task attempt time for (TaskAttemptInfo attemptInfo : getTaskAttempts()) { - taskFinishTime = (attemptInfo.getFinishTime() > taskFinishTime) - ? attemptInfo.getFinishTime() : taskFinishTime; + taskFinishTime = (attemptInfo.getFinishTimeInterval() > taskFinishTime) + ? attemptInfo.getFinishTimeInterval() : taskFinishTime; } } return taskFinishTime; @@ -222,8 +222,8 @@ public class TaskInfo extends BaseInfo { return Ordering.from(new Comparator<TaskAttemptInfo>() { @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) { - return (o1.getFinishTime() < o2.getFinishTime()) ? -1 : - ((o1.getFinishTime() == o2.getFinishTime()) ? + return (o1.getFinishTimeInterval() < o2.getFinishTimeInterval()) ? -1 : + ((o1.getFinishTimeInterval() == o2.getFinishTimeInterval()) ? 0 : 1); } }).max(attemptsList); @@ -259,8 +259,8 @@ public class TaskInfo extends BaseInfo { private Ordering<TaskAttemptInfo> orderingOnAttemptStartTime() { return Ordering.from(new Comparator<TaskAttemptInfo>() { @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) { - return (o1.getStartTime() < o2.getStartTime()) ? -1 : - ((o1.getStartTime() == o2.getStartTime()) ? 0 : 1); + return (o1.getStartTimeInterval() < o2.getStartTimeInterval()) ? -1 : + ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ? 0 : 1); } }); } @@ -306,7 +306,7 @@ public class TaskInfo extends BaseInfo { } public final long getTimeTaken() { - return getFinishTime() - getStartTime(); + return getFinishTimeInterval() - getStartTimeInterval(); } public final String getSuccessfulAttemptId() { @@ -318,7 +318,7 @@ public class TaskInfo extends BaseInfo { } public final long getScheduledTime() { - return scheduledTime - this.getVertexInfo().getDagInfo().getAbsStartTime(); + return scheduledTime - this.getVertexInfo().getDagInfo().getStartTime(); } @Override @@ -327,8 +327,8 @@ public class TaskInfo extends BaseInfo { sb.append("["); sb.append("taskId=").append(getTaskId()).append(", "); sb.append("scheduledTime=").append(getAbsoluteScheduleTime()).append(", "); - sb.append("startTime=").append(getStartTime()).append(", "); - sb.append("finishTime=").append(getFinishTime()).append(", "); + sb.append("startTime=").append(getStartTimeInterval()).append(", "); + sb.append("finishTime=").append(getFinishTimeInterval()).append(", "); sb.append("timeTaken=").append(getTimeTaken()).append(", "); sb.append("events=").append(getEvents()).append(", "); sb.append("diagnostics=").append(getDiagnostics()).append(", "); http://git-wip-us.apache.org/repos/asf/tez/blob/6970fb01/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java index 3445adb..554f94b 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java @@ -44,8 +44,12 @@ import static org.apache.hadoop.classification.InterfaceStability.Evolving; public class VertexInfo extends BaseInfo { private final String vertexName; - private final long endTime; + private final long finishTime; private final long initTime; + private final long initRequestedTime; + private final long startTime; + private final long startRequestedTime; + private final String diagnostics; private final String processorClass; @@ -58,8 +62,6 @@ public class VertexInfo extends BaseInfo { private final String status; - private final long startTime; - //TaskID --> TaskInfo for internal reference private Map<String, TaskInfo> taskInfoMap; @@ -87,9 +89,11 @@ public class VertexInfo extends BaseInfo { //Parse additional Info JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO); + initRequestedTime = otherInfoNode.optLong(Constants.INIT_REQUESTED_TIME); + startRequestedTime = otherInfoNode.optLong(Constants.START_REQUESTED_TIME); startTime = otherInfoNode.optLong(Constants.START_TIME); initTime = otherInfoNode.optLong(Constants.INIT_TIME); - endTime = otherInfoNode.optLong(Constants.FINISH_TIME); + finishTime = otherInfoNode.optLong(Constants.FINISH_TIME); diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS); numTasks = otherInfoNode.optInt(Constants.NUM_TASKS); failedTasks = otherInfoNode.optInt(Constants.NUM_FAILED_TASKS); @@ -157,41 +161,49 @@ public class VertexInfo extends BaseInfo { } @Override - public final long getStartTime() { - return startTime - (dagInfo.getAbsStartTime()); + public final long getStartTimeInterval() { + return startTime - (dagInfo.getStartTime()); } - public final long getFirstTaskStartTime() { - return getFirstTaskToStart().getStartTime(); + public final long getFirstTaskStartTimeInterval() { + return getFirstTaskToStart().getStartTimeInterval(); } - public final long getLastTaskFinishTime() { - if (getLastTaskToFinish() == null || getLastTaskToFinish().getFinishTime() < 0) { - return dagInfo.getFinishTime(); + public final long getLastTaskFinishTimeInterval() { + if (getLastTaskToFinish() == null || getLastTaskToFinish().getFinishTimeInterval() < 0) { + return dagInfo.getFinishTimeInterval(); } - return getLastTaskToFinish().getFinishTime(); + return getLastTaskToFinish().getFinishTimeInterval(); } - public final long getAbsStartTime() { + public final long getStartTime() { return startTime; } - public final long getAbsFinishTime() { - return endTime; + public final long getFinishTime() { + return finishTime; } - public final long getAbsoluteInitTime() { + public final long getInitTime() { return initTime; } + + public final long getInitRequestedTime() { + return initRequestedTime; + } + public final long getStartRequestedTime() { + return startRequestedTime; + } + @Override - public final long getFinishTime() { - long vertexEndTime = endTime - (dagInfo.getAbsStartTime()); + public final long getFinishTimeInterval() { + long vertexEndTime = finishTime - (dagInfo.getStartTime()); if (vertexEndTime < 0) { //probably vertex is not complete or failed in middle. get the last task attempt time for (TaskInfo taskInfo : getTasks()) { - vertexEndTime = (taskInfo.getFinishTime() > vertexEndTime) - ? taskInfo.getFinishTime() : vertexEndTime; + vertexEndTime = (taskInfo.getFinishTimeInterval() > vertexEndTime) + ? taskInfo.getFinishTimeInterval() : vertexEndTime; } } return vertexEndTime; @@ -209,16 +221,16 @@ public class VertexInfo extends BaseInfo { //Quite possible that getFinishTime is not yet recorded for failed vertices (or killed vertices) //Start time of vertex infers that the dependencies are done and AM has inited it. public final long getTimeTaken() { - return (getFinishTime() - getStartTime()); + return (getFinishTimeInterval() - getStartTimeInterval()); } //Time taken for last task to finish - time taken for first task to start public final long getTimeTakenForTasks() { - return (getLastTaskFinishTime() - getFirstTaskStartTime()); + return (getLastTaskFinishTimeInterval() - getFirstTaskStartTimeInterval()); } - public final long getInitTime() { - return initTime - dagInfo.getAbsStartTime(); + public final long getInitTimeInterval() { + return initTime - dagInfo.getStartTime(); } public final int getNumTasks() { @@ -383,8 +395,8 @@ public class VertexInfo extends BaseInfo { } Collections.sort(taskInfoList, new Comparator<TaskInfo>() { @Override public int compare(TaskInfo o1, TaskInfo o2) { - return (o1.getStartTime() < o2.getStartTime()) ? -1 : - ((o1.getStartTime() == o2.getStartTime()) ? + return (o1.getStartTimeInterval() < o2.getStartTimeInterval()) ? -1 : + ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ? 0 : 1); } }); @@ -403,8 +415,8 @@ public class VertexInfo extends BaseInfo { } Collections.sort(taskInfoList, new Comparator<TaskInfo>() { @Override public int compare(TaskInfo o1, TaskInfo o2) { - return (o1.getFinishTime() > o2.getFinishTime()) ? -1 : - ((o1.getStartTime() == o2.getStartTime()) ? + return (o1.getFinishTimeInterval() > o2.getFinishTimeInterval()) ? -1 : + ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ? 0 : 1); } }); @@ -460,8 +472,8 @@ public class VertexInfo extends BaseInfo { private Ordering<TaskInfo> orderingOnStartTime() { return Ordering.from(new Comparator<TaskInfo>() { @Override public int compare(TaskInfo o1, TaskInfo o2) { - return (o1.getStartTime() < o2.getStartTime()) ? -1 : - ((o1.getStartTime() == o2.getStartTime()) ? 0 : 1); + return (o1.getStartTimeInterval() < o2.getStartTimeInterval()) ? -1 : + ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ? 0 : 1); } }); } @@ -469,8 +481,8 @@ public class VertexInfo extends BaseInfo { private Ordering<TaskAttemptInfo> orderingOnAttemptStartTime() { return Ordering.from(new Comparator<TaskAttemptInfo>() { @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) { - return (o1.getStartTime() < o2.getStartTime()) ? -1 : - ((o1.getStartTime() == o2.getStartTime()) ? 0 : 1); + return (o1.getStartTimeInterval() < o2.getStartTimeInterval()) ? -1 : + ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ? 0 : 1); } }); } @@ -516,9 +528,9 @@ public class VertexInfo extends BaseInfo { sb.append("["); sb.append("vertexName=").append(getVertexName()).append(", "); sb.append("events=").append(getEvents()).append(", "); - sb.append("initTime=").append(getInitTime()).append(", "); - sb.append("startTime=").append(getStartTime()).append(", "); - sb.append("endTime=").append(getFinishTime()).append(", "); + sb.append("initTime=").append(getInitTimeInterval()).append(", "); + sb.append("startTime=").append(getStartTimeInterval()).append(", "); + sb.append("endTime=").append(getFinishTimeInterval()).append(", "); sb.append("timeTaken=").append(getTimeTaken()).append(", "); sb.append("diagnostics=").append(getDiagnostics()).append(", "); sb.append("numTasks=").append(getNumTasks()).append(", "); http://git-wip-us.apache.org/repos/asf/tez/blob/6970fb01/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java index d205056..0d76e03 100644 --- a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java +++ b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java @@ -232,6 +232,11 @@ public class TestATSFileParser { String lastDataEventSourceTA = null; for (VertexInfo vertexInfo : dagInfo.getVertices()) { assertTrue(vertexInfo.getKilledTasksCount() == 0); + assertTrue(vertexInfo.getInitRequestedTime() > 0); + assertTrue(vertexInfo.getInitTime() > 0); + assertTrue(vertexInfo.getStartRequestedTime() > 0); + assertTrue(vertexInfo.getStartTime() > 0); + assertTrue(vertexInfo.getFinishTime() > 0); long finishTime = 0; for (TaskInfo taskInfo : vertexInfo.getTasks()) { assertTrue(taskInfo.getNumberOfTaskAttempts() == 1); @@ -246,8 +251,8 @@ public class TestATSFileParser { List<TaskAttemptInfo> attempts = taskInfo.getTaskAttempts(); if (vertexInfo.getVertexName().equals(TOKENIZER)) { // get the last task to finish and track its successful attempt - if (finishTime < taskInfo.getAbsFinishTime()) { - finishTime = taskInfo.getAbsFinishTime(); + if (finishTime < taskInfo.getFinishTime()) { + finishTime = taskInfo.getFinishTime(); lastSourceTA = taskInfo.getSuccessfulAttemptId(); } } else { @@ -262,8 +267,8 @@ public class TestATSFileParser { } } for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) { - assertTrue(attemptInfo.getStartTime() > 0); - assertTrue(attemptInfo.getScheduledTime() > 0); + assertTrue(attemptInfo.getStartTimeInterval() > 0); + assertTrue(attemptInfo.getScheduledTimeInterval() > 0); } } assertTrue(vertexInfo.getLastTaskToFinish() != null); @@ -471,16 +476,16 @@ public class TestATSFileParser { assertTrue(versionInfo.getRevision() != null); assertTrue(versionInfo.getBuildTime() != null); - assertTrue(dagInfo.getAbsStartTime() > 0); - assertTrue(dagInfo.getFinishTime() > 0); - assertTrue(dagInfo.getStartTime() == 0); - assertTrue(dagInfo.getAbsStartTime() > 0); + assertTrue(dagInfo.getStartTime() > 0); + assertTrue(dagInfo.getFinishTimeInterval() > 0); + assertTrue(dagInfo.getStartTimeInterval() == 0); + assertTrue(dagInfo.getStartTime() > 0); if (dagInfo.getStatus().equalsIgnoreCase(DAGState.SUCCEEDED.toString())) { - assertTrue(dagInfo.getAbsFinishTime() >= dagInfo.getAbsStartTime()); + assertTrue(dagInfo.getFinishTime() >= dagInfo.getStartTime()); } - assertTrue(dagInfo.getFinishTime() > dagInfo.getStartTime()); + assertTrue(dagInfo.getFinishTimeInterval() > dagInfo.getStartTimeInterval()); - assertTrue(dagInfo.getAbsStartTime() > dagInfo.getAbsoluteSubmitTime()); + assertTrue(dagInfo.getStartTime() > dagInfo.getSubmitTime()); assertTrue(dagInfo.getTimeTaken() > 0); //Verify all vertices @@ -501,13 +506,13 @@ public class TestATSFileParser { if (hasFailedTasks) { assertTrue(vertexInfo.getFailedTasksCount() > 0); } + assertTrue(vertexInfo.getStartTimeInterval() > 0); assertTrue(vertexInfo.getStartTime() > 0); - assertTrue(vertexInfo.getAbsStartTime() > 0); - assertTrue(vertexInfo.getFinishTime() > 0); - assertTrue(vertexInfo.getStartTime() < vertexInfo.getFinishTime()); + assertTrue(vertexInfo.getFinishTimeInterval() > 0); + assertTrue(vertexInfo.getStartTimeInterval() < vertexInfo.getFinishTimeInterval()); assertTrue(vertexInfo.getVertexName() != null); if (!hasFailedTasks) { - assertTrue(vertexInfo.getAbsFinishTime() > 0); + assertTrue(vertexInfo.getFinishTime() > 0); assertTrue(vertexInfo.getFailedTasks().size() == 0); assertTrue(vertexInfo.getSucceededTasksCount() == vertexInfo.getSuccessfulTasks().size()); assertTrue(vertexInfo.getFailedTasksCount() == 0); @@ -535,22 +540,22 @@ public class TestATSFileParser { assertTrue(vertexInfo.getProcessorClassName() != null); assertTrue(vertexInfo.getStatus() != null); assertTrue(vertexInfo.getDagInfo() != null); - assertTrue(vertexInfo.getInitTime() > 0); + assertTrue(vertexInfo.getInitTimeInterval() > 0); assertTrue(vertexInfo.getNumTasks() > 0); } private void verifyTask(TaskInfo taskInfo, boolean hasFailedAttempts) { assertTrue(taskInfo != null); assertTrue(taskInfo.getStatus() != null); - assertTrue(taskInfo.getStartTime() > 0); + assertTrue(taskInfo.getStartTimeInterval() > 0); //Not testing for killed attempts. So if there are no failures, it should succeed if (!hasFailedAttempts) { assertTrue(taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString())); - assertTrue(taskInfo.getFinishTime() > 0 && taskInfo.getAbsFinishTime() > taskInfo - .getFinishTime()); + assertTrue(taskInfo.getFinishTimeInterval() > 0 && taskInfo.getFinishTime() > taskInfo + .getFinishTimeInterval()); assertTrue( - taskInfo.getStartTime() > 0 && taskInfo.getAbsStartTime() > taskInfo.getStartTime()); + taskInfo.getStartTimeInterval() > 0 && taskInfo.getStartTime() > taskInfo.getStartTimeInterval()); assertTrue(taskInfo.getSuccessfulAttemptId() != null); assertTrue(taskInfo.getSuccessfulTaskAttempt() != null); } @@ -564,13 +569,13 @@ public class TestATSFileParser { private void verifyTaskAttemptInfo(TaskAttemptInfo attemptInfo) { if (attemptInfo.getStatus() != null && attemptInfo.getStatus() .equals(TaskAttemptState.SUCCEEDED)) { + assertTrue(attemptInfo.getStartTimeInterval() > 0); + assertTrue(attemptInfo.getFinishTimeInterval() > 0); assertTrue(attemptInfo.getStartTime() > 0); assertTrue(attemptInfo.getFinishTime() > 0); - assertTrue(attemptInfo.getAbsStartTime() > 0); - assertTrue(attemptInfo.getAbsFinishTime() > 0); - assertTrue(attemptInfo.getAbsFinishTime() > attemptInfo.getAbsStartTime()); - assertTrue(attemptInfo.getAbsFinishTime() > attemptInfo.getFinishTime()); - assertTrue(attemptInfo.getAbsStartTime() > attemptInfo.getStartTime()); + assertTrue(attemptInfo.getFinishTime() > attemptInfo.getStartTime()); + assertTrue(attemptInfo.getFinishTime() > attemptInfo.getFinishTimeInterval()); + assertTrue(attemptInfo.getStartTime() > attemptInfo.getStartTimeInterval()); assertTrue(attemptInfo.getNodeId() != null); assertTrue(attemptInfo.getTimeTaken() != -1); assertTrue(attemptInfo.getEvents() != null); http://git-wip-us.apache.org/repos/asf/tez/blob/6970fb01/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java ---------------------------------------------------------------------- diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java index 7364506..b7fca0b 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java @@ -83,8 +83,8 @@ public class SlowestVertexAnalyzer implements Analyzer { //Find the slowest last event received if (entry.getValue().getValue() > max) { //w.r.t vertex start time. - max =(attemptInfo.getStartTime() + entry.getValue().getValue()) - - (vertexInfo.getStartTime()); + max =(attemptInfo.getStartTimeInterval() + entry.getValue().getValue()) - + (vertexInfo.getStartTimeInterval()); maxSourceName = entry.getKey(); } } @@ -105,8 +105,8 @@ public class SlowestVertexAnalyzer implements Analyzer { //Find the slowest last event received if (entry.getValue().getValue() > shuffleMax) { //w.r.t vertex start time. - shuffleMax =(attemptInfo.getStartTime() + entry.getValue().getValue()) - - (vertexInfo.getStartTime()); + shuffleMax =(attemptInfo.getStartTimeInterval() + entry.getValue().getValue()) - + (vertexInfo.getStartTimeInterval()); shuffleMaxSource = entry.getKey(); } }
