Repository: tez
Updated Branches:
  refs/heads/master a55fe80bf -> de21f990a
TEZ-3719. DAGImpl.computeProgress slows down dispatcher and ipc threads (Gopal V via jeagles)

Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/de21f990
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/de21f990
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/de21f990

Branch: refs/heads/master
Commit: de21f990a06fcb304328df7a601789b348873739
Parents: a55fe80
Author: Jonathan Eagles <[email protected]>
Authored: Fri May 12 09:51:00 2017 -0500
Committer: Jonathan Eagles <[email protected]>
Committed: Fri May 12 09:51:00 2017 -0500

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/TaskImpl.java | 15 ++++++---
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 33 ++++++++++++++++++--
 2 files changed, 41 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/de21f990/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 6bb14d5..04074af 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -476,11 +476,18 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   public float getProgress() {
     readLock.lock();
     try {
-      TaskAttempt bestAttempt = selectBestAttempt();
-      if (bestAttempt == null) {
-        return 0f;
+      final TaskStateInternal state = getInternalState();
+      if (state == TaskStateInternal.RUNNING) {
+        TaskAttempt bestAttempt = selectBestAttempt();
+        if (bestAttempt == null) {
+          return 0f;
+        }
+        return bestAttempt.getProgress();
+      } else if (state == TaskStateInternal.SUCCEEDED) {
+        return 1.0f;
+      } else {
+        return 0.0f;
       }
-      return bestAttempt.getProgress();
     } finally {
       readLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/de21f990/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 79b84e8..b6d66df 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1344,7 +1344,30 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   public float getProgress() {
     this.readLock.lock();
     try {
-      computeProgress();
+      final VertexState state = this.getState();
+      switch (state) {
+      case NEW:
+      case INITED:
+      case INITIALIZING:
+        progress = 0.0f;
+        break;
+      case RUNNING:
+        computeProgress();
+        break;
+      case KILLED:
+      case ERROR:
+      case FAILED:
+      case TERMINATING:
+        progress = 0.0f;
+        break;
+      case COMMITTING:
+      case SUCCEEDED:
+        progress = 1.0f;
+        break;
+      default:
+        // unknown, do not change progress
+        break;
+      }
       return progress;
     } finally {
       this.readLock.unlock();
@@ -1381,7 +1404,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       ProgressBuilder progress = new ProgressBuilder();
       progress.setTotalTaskCount(numTasks);
       progress.setSucceededTaskCount(succeededTaskCount);
-      progress.setRunningTaskCount(getRunningTasks());
+      if (inTerminalState()) {
+        progress.setRunningTaskCount(0);
+      } else {
+        progress.setRunningTaskCount(getRunningTasks());
+      }
       progress.setFailedTaskCount(failedTaskCount);
       progress.setKilledTaskCount(killedTaskCount);
       progress.setFailedTaskAttemptCount(failedTaskAttemptCount.get());
@@ -1434,7 +1461,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     try {
       float progress = 0f;
       for (Task task : this.tasks.values()) {
-        progress += (task.isFinished() ? 1f : task.getProgress());
+        progress += (task.getProgress());
       }
       if (this.numTasks != 0) {
         progress /= this.numTasks;
