TEZ-2347. Expose additional information in TaskCommunicatorContext. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f5c1f47d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f5c1f47d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f5c1f47d Branch: refs/heads/master Commit: f5c1f47dfec8d4e432c6970fd3267dd13ade8b03 Parents: f0c8845 Author: Siddharth Seth <[email protected]> Authored: Mon Apr 20 13:17:31 2015 -0700 Committer: Siddharth Seth <[email protected]> Committed: Fri Aug 21 18:13:54 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../tez/dag/api/TaskCommunicatorContext.java | 50 ++++++++++++++++++++ .../dag/app/TaskCommunicatorContextImpl.java | 50 ++++++++++++++++++++ .../java/org/apache/tez/dag/app/dag/DAG.java | 2 + .../java/org/apache/tez/dag/app/dag/Task.java | 2 + .../org/apache/tez/dag/app/dag/TaskAttempt.java | 6 +++ .../apache/tez/dag/app/dag/impl/DAGImpl.java | 10 ++++ .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 12 +++++ .../apache/tez/dag/app/dag/impl/TaskImpl.java | 13 ++++- 9 files changed, 145 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f5c1f47d/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index ca5225e..7c13110 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -16,5 +16,6 @@ ALL CHANGES: TEZ-2284. Separate TaskReporter into an interface. TEZ-2285. Allow TaskCommunicators to indicate task/container liveness. TEZ-2302. Allow TaskCommunicators to subscribe for Vertex updates. + TEZ-2347. Expose additional information in TaskCommunicatorContext. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/f5c1f47d/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java index 19caed9..56345ab 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicatorContext.java @@ -16,6 +16,7 @@ package org.apache.tez.dag.api; import javax.annotation.Nullable; import java.io.IOException; +import java.util.Collection; import java.util.Set; import org.apache.hadoop.security.Credentials; @@ -71,4 +72,53 @@ public interface TaskCommunicatorContext { // TODO TEZ-2003 API. Should a method exist for task succeeded. // TODO Eventually Add methods to report availability stats to the scheduler. + + /** + * Get the name of the currently executing dag + * @return the name of the currently executing dag + */ + String getCurretnDagName(); + + /** + * Get the name of the Input vertices for the specified vertex. + * Root Inputs are not returned. + * @param vertexName the vertex for which source vertex names will be returned + * @return an Iterable containing the list of input vertices for the specified vertex + */ + Iterable<String> getInputVertexNames(String vertexName); + + /** + * Get the total number of tasks in the given vertex + * @param vertexName + * @return total number of tasks in this vertex + */ + int getVertexTotalTaskCount(String vertexName); + + /** + * Get the number of completed tasks for a given vertex + * @param vertexName the vertex name + * @return the number of completed tasks for the vertex + */ + int getVertexCompletedTaskCount(String vertexName); + + /** + * Get the number of running tasks for a given vertex + * @param vertexName the vertex name + * @return the number of running tasks for the vertex + */ + int getVertexRunningTaskCount(String vertexName); + + /** + * Get the start time for the first attempt of the specified task + * @param vertexName the vertex to which the task belongs + * @param taskIndex the index of the task + * @return the start time for the first attempt of the task + */ + long getFirstAttemptStartTime(String vertexName, int taskIndex); + + /** + * Get the start time for the currently executing DAG + * @return time when the current dag started executing + */ + long getDagStartTime(); } http://git-wip-us.apache.org/repos/asf/tez/blob/f5c1f47d/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java index 3714c3c..4cb0c93 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java @@ -18,7 +18,9 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.Set; +import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -31,6 +33,7 @@ import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.event.VertexState; import org.apache.tez.dag.api.event.VertexStateUpdate; +import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.VertexStateUpdateListener; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -111,6 +114,53 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver context.getCurrentDAG().getStateChangeNotifier().registerForVertexUpdates(vertexName, stateSet, this); } + @Override + public String getCurretnDagName() { + return context.getCurrentDAG().getName(); + } + + @Override + public Iterable<String> getInputVertexNames(String vertexName) { + Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName); + Vertex vertex = context.getCurrentDAG().getVertex(vertexName); + Set<Vertex> sources = vertex.getInputVertices().keySet(); + return Iterables.transform(sources, new Function<Vertex, String>() { + @Override + public String apply(@Nullable Vertex input) { + return input.getName(); + } + }); + } + + @Override + public int getVertexTotalTaskCount(String vertexName) { + Preconditions.checkArgument(vertexName != null, "VertexName must be specified"); + return context.getCurrentDAG().getVertex(vertexName).getTotalTasks(); + } + + @Override + public int getVertexCompletedTaskCount(String vertexName) { + Preconditions.checkArgument(vertexName != null, "VertexName must be specified"); + return context.getCurrentDAG().getVertex(vertexName).getCompletedTasks(); + } + + @Override + public int getVertexRunningTaskCount(String vertexName) { + Preconditions.checkArgument(vertexName != null, "VertexName must be specified"); + return context.getCurrentDAG().getVertex(vertexName).getRunningTasks(); + } + + @Override + public long getFirstAttemptStartTime(String vertexName, int taskIndex) { + Preconditions.checkArgument(vertexName != null, "VertexName must be specified"); + Preconditions.checkArgument(taskIndex >=0, "TaskIndex must be > 0"); + return context.getCurrentDAG().getVertex(vertexName).getTask(taskIndex).getFirstAttemptStartTime(); + } + + @Override + public long getDagStartTime() { + return context.getCurrentDAG().getStartTime(); + } @Override public void onStateUpdated(VertexStateUpdate event) { http://git-wip-us.apache.org/repos/asf/tez/blob/f5c1f47d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java index 6d6872b..458362f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java @@ -94,6 +94,8 @@ public interface DAG { Map<String, TezVertexID> getVertexNameIDMapping(); + long getStartTime(); + StateChangeNotifier getStateChangeNotifier(); } http://git-wip-us.apache.org/repos/asf/tez/blob/f5c1f47d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java index 47b56f2..a011b61 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java @@ -72,4 +72,6 @@ public interface Task { public TaskSpec getBaseTaskSpec(); public TaskLocationHint getTaskLocationHint(); + + long getFirstAttemptStartTime(); } http://git-wip-us.apache.org/repos/asf/tez/blob/f5c1f47d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java index 4360cc3..cbe72c1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java @@ -125,6 +125,12 @@ public interface TaskAttempt { */ long getLaunchTime(); + /** + * Get the time at which this attempt was scheduled + * @return the time at which this attempt was scheduled, 0 if it hasn't been scheduled yet + */ + long getScheduleTime(); + /** * @return attempt's finish time. If attempt is not finished * yet, returns 0. http://git-wip-us.apache.org/repos/asf/tez/blob/f5c1f47d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index ef2df78..e37fc2f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -702,6 +702,16 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } @Override + public long getStartTime() { + readLock.lock(); + try { + return this.startTime; + } finally { + readLock.unlock(); + } + } + + @Override public StateChangeNotifier getStateChangeNotifier() { return entityUpdateTracker; } http://git-wip-us.apache.org/repos/asf/tez/blob/f5c1f47d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index dfb2618..cb26c55 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -134,6 +134,7 @@ public class TaskAttemptImpl implements TaskAttempt, protected final AppContext appContext; private final TaskHeartbeatHandler taskHeartbeatHandler; private long launchTime = 0; + private long scheduleTime = 0; private long finishTime = 0; private String trackerName; private int httpPort; @@ -699,6 +700,16 @@ public class TaskAttemptImpl implements TaskAttempt, } @Override + public long getScheduleTime() { + readLock.lock(); + try { + return scheduleTime; + } finally { + readLock.unlock(); + } + } + + @Override public long getFinishTime() { readLock.lock(); try { @@ -1060,6 +1071,7 @@ public class TaskAttemptImpl implements TaskAttempt, public TaskAttemptStateInternal transition(TaskAttemptImpl ta, TaskAttemptEvent event) { TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event; + ta.scheduleTime = ta.clock.getTime(); // TODO Creating the remote task here may not be required in case of // recovery. http://git-wip-us.apache.org/repos/asf/tez/blob/f5c1f47d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index e6027f5..93b4c3f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -1529,7 +1529,18 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> { this.writeLock.unlock(); } } - + + @Override + public long getFirstAttemptStartTime() { + readLock.lock(); + try { + // The first attempt will always have an index of 0. + return getAttempt(TezTaskAttemptID.getInstance(getTaskId(), 0)).getScheduleTime(); + } finally { + readLock.unlock(); + } + } + private static class KillTransition implements SingleArcTransition<TaskImpl, TaskEvent> { @Override
