TEZ-3624. Split multiple calls on the same line in TaskCommunicatorContextImpl. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2268c720 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2268c720 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2268c720 Branch: refs/heads/TEZ-1190 Commit: 2268c720f1cb5b0f36d970ef6da88d940d13f6c7 Parents: daa8d3d Author: Siddharth Seth <[email protected]> Authored: Wed Feb 15 20:29:38 2017 -0800 Committer: Siddharth Seth <[email protected]> Committed: Wed Feb 15 20:29:38 2017 -0800 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../dag/app/TaskCommunicatorContextImpl.java | 25 ++++++++++++++------ 2 files changed, 20 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/2268c720/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 31e141c..a3323b9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3624. Split multiple calls on the same line in TaskCommunicatorContextImpl. TEZ-3550. Provide access to sessionId/dagId via DagClient. TEZ-3267. Publish queue name to ATS as part of dag summary. TEZ-3609. Improve ATSv15 performance for DAG entities read calls. @@ -203,6 +204,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3624. Split multiple calls on the same line in TaskCommunicatorContextImpl. TEZ-3550. Provide access to sessionId/dagId via DagClient. TEZ-3609. Improve ATSv15 performance for DAG entities read calls. TEZ-3244. Allow overlap of input and output memory when they are not concurrent http://git-wip-us.apache.org/repos/asf/tez/blob/2268c720/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java index 1fbf853..2709787 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java @@ -28,6 +28,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.rm.container.AMContainer; import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.serviceplugins.api.DagInfo; @@ -144,7 +145,8 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver public void registerForVertexStateUpdates(String vertexName, @Nullable Set<VertexState> stateSet) { Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName); - getDag().getStateChangeNotifier().registerForVertexUpdates(vertexName, stateSet, + DAG dag = getDag(); + dag.getStateChangeNotifier().registerForVertexUpdates(vertexName, stateSet, this); } @@ -162,7 +164,8 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver @Override public Iterable<String> getInputVertexNames(String vertexName) { Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName); - Vertex vertex = getDag().getVertex(vertexName); + DAG dag = getDag(); + Vertex vertex = dag.getVertex(vertexName); Set<Vertex> sources = vertex.getInputVertices().keySet(); return Iterables.transform(sources, new Function<Vertex, String>() { @Override @@ -175,27 +178,35 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver @Override public int getVertexTotalTaskCount(String vertexName) { Preconditions.checkArgument(vertexName != null, "VertexName must be specified"); - return getDag().getVertex(vertexName).getTotalTasks(); + DAG dag = getDag(); + Vertex vertex = dag.getVertex(vertexName); + return vertex.getTotalTasks(); } @Override public int getVertexCompletedTaskCount(String vertexName) { Preconditions.checkArgument(vertexName != null, "VertexName must be specified"); - return getDag().getVertex(vertexName).getCompletedTasks(); + DAG dag = getDag(); + Vertex vertex = dag.getVertex(vertexName); + return vertex.getCompletedTasks(); } @Override public int getVertexRunningTaskCount(String vertexName) { Preconditions.checkArgument(vertexName != null, "VertexName must be specified"); - return getDag().getVertex(vertexName).getRunningTasks(); + DAG dag = getDag(); + Vertex vertex = dag.getVertex(vertexName); + return vertex.getRunningTasks(); } @Override public long getFirstAttemptStartTime(String vertexName, int taskIndex) { Preconditions.checkArgument(vertexName != null, "VertexName must be specified"); Preconditions.checkArgument(taskIndex >=0, "TaskIndex must be > 0"); - return getDag().getVertex(vertexName).getTask( - taskIndex).getFirstAttemptStartTime(); + DAG dag = getDag(); + Vertex vertex = dag.getVertex(vertexName); + Task task = vertex.getTask(taskIndex); + return task.getFirstAttemptStartTime(); } @Override
