Repository: tez Updated Branches: refs/heads/branch-0.7 0c519d172 -> f96d12ae5
TEZ-2825. Report progress in terms of completed tasks to reduce load on AM for Tez UI (hitesh via rbalamohan) (cherry picked from commit d93bdc7003f4c1b5415e7718f99527806bf8f37b) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f96d12ae Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f96d12ae Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f96d12ae Branch: refs/heads/branch-0.7 Commit: f96d12ae52b17c1c72be8e3eaceb48e4e1aef43f Parents: 0c519d1 Author: Rajesh Balamohan <[email protected]> Authored: Tue Sep 15 16:02:37 2015 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Tue Sep 15 16:05:30 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/dag/app/dag/DAG.java | 1 + .../java/org/apache/tez/dag/app/dag/Vertex.java | 1 + .../apache/tez/dag/app/dag/impl/DAGImpl.java | 29 ++++++++++++++++++++ .../apache/tez/dag/app/dag/impl/VertexImpl.java | 23 ++++++++++++++++ .../apache/tez/dag/app/web/AMWebController.java | 13 +++++---- .../tez/dag/app/dag/impl/TestDAGImpl.java | 7 +++++ .../tez/dag/app/dag/impl/TestVertexImpl.java | 7 +++++ .../tez/dag/app/web/TestAMWebController.java | 8 +++--- 9 files changed, 81 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/f96d12ae/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1537d4e..6adb38e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2825. Report progress in terms of completed tasks to reduce load on AM for Tez UI TEZ-2812. Tez UI: Update task & attempt tables while in progress. TEZ-2786. Tez UI: Update vertex, task & attempt details page while in progress. TEZ-2817. Tez UI: update in progress counter data for the dag vertices and tasks table http://git-wip-us.apache.org/repos/asf/tez/blob/f96d12ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java index 4c3426a..8470513 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java @@ -70,6 +70,7 @@ public interface DAG { int getTotalVertices(); int getSuccessfulVertices(); float getProgress(); + float getCompletedTaskProgress(); boolean isUber(); String getUserName(); http://git-wip-us.apache.org/repos/asf/tez/blob/f96d12ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index bb580da..f03dcd1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -96,6 +96,7 @@ public interface Vertex extends Comparable<Vertex> { int getSucceededTasks(); int getRunningTasks(); float getProgress(); + float getCompletedTaskProgress(); ProgressBuilder getVertexProgress(); VertexStatusBuilder getVertexStatus(Set<StatusGetOpts> statusOptions); http://git-wip-us.apache.org/repos/asf/tez/blob/f96d12ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 33e818a..9a40f88 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -790,6 +790,35 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } @Override + public float getCompletedTaskProgress() { + this.readLock.lock(); + try { + int totalTasks = 0; + int completedTasks = 0; + for (Vertex v : getVertices().values()) { + int vTotalTasks = v.getTotalTasks(); + int vCompletedTasks = v.getSucceededTasks(); + if (vTotalTasks > 0) { + totalTasks += vTotalTasks; + completedTasks += vCompletedTasks; + } + } + if (totalTasks == 0) { + DAGState state = getStateMachine().getCurrentState(); + if (state == DAGState.ERROR || state == DAGState.FAILED + || state == DAGState.KILLED || state == DAGState.SUCCEEDED) { + return 1.0f; + } else { + return 0.0f; + } + } + return ((float)completedTasks/totalTasks); + } finally { + this.readLock.unlock(); + } + } + + @Override public Map<TezVertexID, Vertex> getVertices() { synchronized (tasksSyncHandle) { return Collections.unmodifiableMap(vertices); http://git-wip-us.apache.org/repos/asf/tez/blob/f96d12ae/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index f16e6f9..daeae3f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -1180,6 +1180,29 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } @Override + public float getCompletedTaskProgress() { + this.readLock.lock(); + try { + int totalTasks = getTotalTasks(); + if (totalTasks < 0) { + return 0.0f; + } + if (totalTasks == 0) { + VertexState state = getStateMachine().getCurrentState(); + if (state == VertexState.ERROR || state == VertexState.FAILED + || state == VertexState.KILLED || state == VertexState.SUCCEEDED) { + return 1.0f; + } else { + return 0.0f; + } + } + return ((float)this.succeededTaskCount/totalTasks); + } finally { + this.readLock.unlock(); + } + } + + @Override public ProgressBuilder getVertexProgress() { this.readLock.lock(); try { http://git-wip-us.apache.org/repos/asf/tez/blob/f96d12ae/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java index cede341..c10c850 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java @@ -199,7 +199,8 @@ public class AMWebController extends Controller { Map<String, ProgressInfo> result = new HashMap<String, ProgressInfo>(); result.put(DAG_PROGRESS, - new ProgressInfo(currentDAG.getID().toString(), currentDAG.getProgress())); + new ProgressInfo(currentDAG.getID().toString(), + currentDAG.getCompletedTaskProgress())); renderJSON(result); } @@ -238,7 +239,8 @@ public class AMWebController extends Controller { } Map<String, ProgressInfo> result = new HashMap<String, ProgressInfo>(); - result.put(VERTEX_PROGRESS, new ProgressInfo(tezVertexID.toString(), vertex.getProgress())); + result.put(VERTEX_PROGRESS, new ProgressInfo(tezVertexID.toString(), + vertex.getCompletedTaskProgress())); renderJSON(result); } @@ -303,7 +305,8 @@ public class AMWebController extends Controller { Collection<ProgressInfo> progresses = new ArrayList<ProgressInfo>(vertices.size()); for(Vertex vertex : vertices) { - progresses.add(new ProgressInfo(vertex.getVertexId().toString(), vertex.getProgress())); + progresses.add(new ProgressInfo(vertex.getVertexId().toString(), + vertex.getCompletedTaskProgress())); } Map<String, Collection<ProgressInfo>> result = new HashMap<String, Collection<ProgressInfo>>(); @@ -503,7 +506,7 @@ public class AMWebController extends Controller { Map<String, String> dagInfo = new HashMap<String, String>(); dagInfo.put("id", dag.getID().toString()); - dagInfo.put("progress", Float.toString(dag.getProgress())); + dagInfo.put("progress", Float.toString(dag.getCompletedTaskProgress())); dagInfo.put("status", dag.getState().toString()); renderJSON(ImmutableMap.of( @@ -540,7 +543,7 @@ public class AMWebController extends Controller { Map<String, Object> vertexInfo = new HashMap<String, Object>(); vertexInfo.put("id", vertex.getVertexId().toString()); vertexInfo.put("status", vertex.getState().toString()); - vertexInfo.put("progress", Float.toString(vertex.getProgress())); + vertexInfo.put("progress", Float.toString(vertex.getCompletedTaskProgress())); ProgressBuilder vertexProgress = vertex.getVertexProgress(); vertexInfo.put("totalTasks", Integer.toString(vertexProgress.getTotalTaskCount())); http://git-wip-us.apache.org/repos/asf/tez/blob/f96d12ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index beb17ee..e69db0f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -1076,7 +1076,9 @@ public class TestDAGImpl { @Test(timeout = 5000) public void testVertexCompletion() { initDAG(dag); + Assert.assertTrue(0.0f == dag.getCompletedTaskProgress()); startDAG(dag); + Assert.assertTrue(0.0f == dag.getCompletedTaskProgress()); dispatcher.await(); TezVertexID vId = TezVertexID.getInstance(dagId, 1); @@ -1089,6 +1091,10 @@ public class TestDAGImpl { Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); Assert.assertEquals(1, dag.getSuccessfulVertices()); + + // 2 tasks completed, total plan has 11 vertices + Assert.assertEquals((float)2/11, + dag.getCompletedTaskProgress(), 0.05); } @SuppressWarnings("unchecked") @@ -1295,6 +1301,7 @@ public class TestDAGImpl { } Assert.assertEquals(3, groupDag.getSuccessfulVertices()); + Assert.assertTrue(1.0f == groupDag.getCompletedTaskProgress()); Assert.assertEquals(DAGState.SUCCEEDED, groupDag.getState()); Assert.assertEquals(2, TotalCountingOutputCommitter.totalCommitCounter); } http://git-wip-us.apache.org/repos/asf/tez/blob/f96d12ae/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 1ae9289..f65ecab 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -2995,12 +2995,14 @@ public class TestVertexImpl { dispatcher.await(); Assert.assertEquals(VertexState.RUNNING, v.getState()); Assert.assertEquals(1, v.getCompletedTasks()); + Assert.assertTrue((0.5f) == v.getCompletedTaskProgress()); dispatcher.getEventHandler().handle( new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED)); dispatcher.await(); Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); Assert.assertEquals(2, v.getCompletedTasks()); + Assert.assertTrue((1.0f) == v.getCompletedTaskProgress()); Assert.assertTrue(v.initTimeRequested > 0); Assert.assertTrue(v.initedTime > 0); Assert.assertTrue(v.startTimeRequested > 0); @@ -3689,6 +3691,7 @@ public class TestVertexImpl { dispatcher.await(); // vertex should be in initializing state since parallelism is not set Assert.assertEquals(-1, v1.getTotalTasks()); + Assert.assertTrue(0.0f == v1.getCompletedTaskProgress()); Assert.assertEquals(VertexState.INITIALIZING, v1.getState()); Assert.assertEquals(-1, v2.getTotalTasks()); Assert.assertEquals(VertexState.INITIALIZING, v2.getState()); @@ -5183,9 +5186,11 @@ public class TestVertexImpl { v.handle(new VertexEvent(vId, VertexEventType.V_INIT)); dispatcher.await(); Assert.assertEquals(VertexState.INITED, v.getState()); + Assert.assertTrue(0.0f == v.getCompletedTaskProgress()); v.handle(new VertexEvent(vId, VertexEventType.V_START)); dispatcher.await(); Assert.assertEquals(VertexState.SUCCEEDED, v.getState()); + Assert.assertTrue(1.0f == v.getCompletedTaskProgress()); } finally { if (vId != null) { vertexIdMap.remove(vId); @@ -6262,4 +6267,6 @@ public class TestVertexImpl { private interface ContextSettableInputInitialzier { void setContext(InputInitializerContext context); } + + } http://git-wip-us.apache.org/repos/asf/tez/blob/f96d12ae/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java index 56a4a82..5a37c04 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java @@ -145,7 +145,7 @@ public class TestAMWebController { doReturn("42").when(spy).$(WebUIService.DAG_ID); doReturn(mockResponse).when(spy).response(); doReturn(TezDAGID.fromString("dag_1422960590892_0007_42")).when(mockDAG).getID(); - doReturn(66.0f).when(mockDAG).getProgress(); + doReturn(66.0f).when(mockDAG).getCompletedTaskProgress(); doReturn(mockDAG).when(mockAppContext).getCurrentDAG(); doNothing().when(spy).renderJSON(any()); spy.getDagProgress(); @@ -175,7 +175,7 @@ public class TestAMWebController { doReturn(TezDAGID.fromString("dag_1422960590892_0007_42")).when(mockDAG).getID(); doReturn(mockDAG).when(mockAppContext).getCurrentDAG(); doReturn(mockVertex).when(mockDAG).getVertex(any(TezVertexID.class)); - doReturn(66.0f).when(mockVertex).getProgress(); + doReturn(66.0f).when(mockVertex).getCompletedTaskProgress(); doNothing().when(spy).renderJSON(any()); doNothing().when(spy).setCorsHeaders(); @@ -240,7 +240,7 @@ public class TestAMWebController { doReturn(TezDAGID.fromString("dag_1422960590892_0007_42")).when(mockDAG).getID(); - doReturn(66.0f).when(mockDAG).getProgress(); + doReturn(66.0f).when(mockDAG).getCompletedTaskProgress(); doReturn(DAGState.RUNNING).when(mockDAG).getState(); doReturn(true).when(spy).setupResponse(); @@ -384,7 +384,7 @@ public class TestAMWebController { ProgressBuilder progress; Assert.assertEquals(mockVertex2.getVertexId().toString(), vertex2Result.get("id")); Assert.assertEquals(mockVertex2.getState().toString(), vertex2Result.get("status")); - Assert.assertEquals(Float.toString(mockVertex2.getProgress()), vertex2Result.get("progress")); + Assert.assertEquals(Float.toString(mockVertex2.getCompletedTaskProgress()), vertex2Result.get("progress")); progress = mockVertex2.getVertexProgress(); Assert.assertEquals(Integer.toString(progress.getTotalTaskCount()), vertex2Result.get("totalTasks"));
