Repository: tez
Updated Branches:
  refs/heads/master ffa3d5208 -> d93bdc700


TEZ-2825. Report progress in terms of completed tasks to reduce load on AM for 
Tez UI (hitesh via rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d93bdc70
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d93bdc70
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d93bdc70

Branch: refs/heads/master
Commit: d93bdc7003f4c1b5415e7718f99527806bf8f37b
Parents: ffa3d52
Author: Rajesh Balamohan <[email protected]>
Authored: Tue Sep 15 16:02:37 2015 +0530
Committer: Rajesh Balamohan <[email protected]>
Committed: Tue Sep 15 16:02:37 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../java/org/apache/tez/dag/app/dag/DAG.java    |  1 +
 .../java/org/apache/tez/dag/app/dag/Vertex.java |  1 +
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 29 ++++++++++++++++++++
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 23 ++++++++++++++++
 .../apache/tez/dag/app/web/AMWebController.java | 13 +++++----
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |  7 +++++
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  7 +++++
 .../tez/dag/app/web/TestAMWebController.java    |  8 +++---
 9 files changed, 82 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d93bdc70/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b45dc6b..a517ce0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2825. Report progress in terms of completed tasks to reduce load on AM 
for Tez UI
   TEZ-2812. Tez UI: Update task & attempt tables while in progress.
   TEZ-2786. Tez UI: Update vertex, task & attempt details page while in 
progress.
   TEZ-2612. Support for showing allocation delays due to internal preemption
@@ -178,6 +179,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2825. Report progress in terms of completed tasks to reduce load on AM 
for Tez UI
   TEZ-2812. Tez UI: Update task & attempt tables while in progress.
   TEZ-2786. Tez UI: Update vertex, task & attempt details page while in 
progress.
   TEZ-2817. Tez UI: update in progress counter data for the dag vertices and 
tasks table

http://git-wip-us.apache.org/repos/asf/tez/blob/d93bdc70/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 335239e..15349a0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -70,6 +70,7 @@ public interface DAG {
   int getTotalVertices();
   int getSuccessfulVertices();
   float getProgress();
+  float getCompletedTaskProgress();
   boolean isUber();
   String getUserName();
 

http://git-wip-us.apache.org/repos/asf/tez/blob/d93bdc70/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 96301a5..c5a3c35 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -96,6 +96,7 @@ public interface Vertex extends Comparable<Vertex> {
   int getSucceededTasks();
   int getRunningTasks();
   float getProgress();
+  float getCompletedTaskProgress();
   ProgressBuilder getVertexProgress();
   VertexStatusBuilder getVertexStatus(Set<StatusGetOpts> statusOptions);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/d93bdc70/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index da9c416..5b69f0e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -815,6 +815,35 @@ public class DAGImpl implements 
org.apache.tez.dag.app.dag.DAG,
   }
 
   @Override
+  public float getCompletedTaskProgress() {
+    this.readLock.lock();
+    try {
+      int totalTasks = 0;
+      int completedTasks = 0;
+      for (Vertex v : getVertices().values()) {
+        int vTotalTasks = v.getTotalTasks();
+        int vCompletedTasks = v.getSucceededTasks();
+        if (vTotalTasks > 0) {
+          totalTasks += vTotalTasks;
+          completedTasks += vCompletedTasks;
+        }
+      }
+      if (totalTasks == 0) {
+        DAGState state = getStateMachine().getCurrentState();
+        if (state == DAGState.ERROR || state == DAGState.FAILED
+            || state == DAGState.KILLED || state == DAGState.SUCCEEDED) {
+          return 1.0f;
+        } else {
+          return 0.0f;
+        }
+      }
+      return ((float)completedTasks/totalTasks);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
   public Map<TezVertexID, Vertex> getVertices() {
     synchronized (tasksSyncHandle) {
       return Collections.unmodifiableMap(vertices);

http://git-wip-us.apache.org/repos/asf/tez/blob/d93bdc70/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 2e13090..946ec19 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1285,6 +1285,29 @@ public class VertexImpl implements 
org.apache.tez.dag.app.dag.Vertex, EventHandl
   }
 
   @Override
+  public float getCompletedTaskProgress() {
+    this.readLock.lock();
+    try {
+      int totalTasks = getTotalTasks();
+      if (totalTasks < 0) {
+        return 0.0f;
+      }
+      if (totalTasks == 0) {
+        VertexState state = getStateMachine().getCurrentState();
+        if (state == VertexState.ERROR || state == VertexState.FAILED
+            || state == VertexState.KILLED || state == VertexState.SUCCEEDED) {
+          return 1.0f;
+        } else {
+          return 0.0f;
+        }
+      }
+      return ((float)this.succeededTaskCount/totalTasks);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  @Override
   public ProgressBuilder getVertexProgress() {
     this.readLock.lock();
     try {

http://git-wip-us.apache.org/repos/asf/tez/blob/d93bdc70/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java 
b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
index cede341..c10c850 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
@@ -199,7 +199,8 @@ public class AMWebController extends Controller {
 
     Map<String, ProgressInfo> result = new HashMap<String, ProgressInfo>();
     result.put(DAG_PROGRESS,
-        new ProgressInfo(currentDAG.getID().toString(), 
currentDAG.getProgress()));
+        new ProgressInfo(currentDAG.getID().toString(),
+            currentDAG.getCompletedTaskProgress()));
     renderJSON(result);
   }
 
@@ -238,7 +239,8 @@ public class AMWebController extends Controller {
     }
 
     Map<String, ProgressInfo> result = new HashMap<String, ProgressInfo>();
-    result.put(VERTEX_PROGRESS, new ProgressInfo(tezVertexID.toString(), 
vertex.getProgress()));
+    result.put(VERTEX_PROGRESS, new ProgressInfo(tezVertexID.toString(),
+        vertex.getCompletedTaskProgress()));
     renderJSON(result);
   }
 
@@ -303,7 +305,8 @@ public class AMWebController extends Controller {
 
     Collection<ProgressInfo> progresses = new 
ArrayList<ProgressInfo>(vertices.size());
     for(Vertex vertex : vertices) {
-      progresses.add(new ProgressInfo(vertex.getVertexId().toString(), 
vertex.getProgress()));
+      progresses.add(new ProgressInfo(vertex.getVertexId().toString(),
+          vertex.getCompletedTaskProgress()));
     }
 
     Map<String, Collection<ProgressInfo>> result = new HashMap<String, 
Collection<ProgressInfo>>();
@@ -503,7 +506,7 @@ public class AMWebController extends Controller {
 
     Map<String, String> dagInfo = new HashMap<String, String>();
     dagInfo.put("id", dag.getID().toString());
-    dagInfo.put("progress", Float.toString(dag.getProgress()));
+    dagInfo.put("progress", Float.toString(dag.getCompletedTaskProgress()));
     dagInfo.put("status", dag.getState().toString());
 
     renderJSON(ImmutableMap.of(
@@ -540,7 +543,7 @@ public class AMWebController extends Controller {
     Map<String, Object> vertexInfo = new HashMap<String, Object>();
     vertexInfo.put("id", vertex.getVertexId().toString());
     vertexInfo.put("status", vertex.getState().toString());
-    vertexInfo.put("progress", Float.toString(vertex.getProgress()));
+    vertexInfo.put("progress", 
Float.toString(vertex.getCompletedTaskProgress()));
 
     ProgressBuilder vertexProgress = vertex.getVertexProgress();
     vertexInfo.put("totalTasks", 
Integer.toString(vertexProgress.getTotalTaskCount()));

http://git-wip-us.apache.org/repos/asf/tez/blob/d93bdc70/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 676ae33..dba6c01 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -1076,7 +1076,9 @@ public class TestDAGImpl {
   @Test(timeout = 5000)
   public void testVertexCompletion() {
     initDAG(dag);
+    Assert.assertTrue(0.0f == dag.getCompletedTaskProgress());
     startDAG(dag);
+    Assert.assertTrue(0.0f == dag.getCompletedTaskProgress());
     dispatcher.await();
 
     TezVertexID vId = TezVertexID.getInstance(dagId, 1);
@@ -1089,6 +1091,10 @@ public class TestDAGImpl {
 
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
     Assert.assertEquals(1, dag.getSuccessfulVertices());
+
+    // 2 tasks completed, total plan has 11 vertices
+    Assert.assertEquals((float)2/11,
+        dag.getCompletedTaskProgress(), 0.05);
   }
   
   @SuppressWarnings("unchecked")
@@ -1295,6 +1301,7 @@ public class TestDAGImpl {
     }
     
     Assert.assertEquals(3, groupDag.getSuccessfulVertices());
+    Assert.assertTrue(1.0f == groupDag.getCompletedTaskProgress());
     Assert.assertEquals(DAGState.SUCCEEDED, groupDag.getState());
     Assert.assertEquals(2, TotalCountingOutputCommitter.totalCommitCounter);
   }  

http://git-wip-us.apache.org/repos/asf/tez/blob/d93bdc70/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index a54c56a..5f8b949 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -3135,12 +3135,14 @@ public class TestVertexImpl {
     dispatcher.await();
     Assert.assertEquals(VertexState.RUNNING, v.getState());
     Assert.assertEquals(1, v.getCompletedTasks());
+    Assert.assertTrue((0.5f) == v.getCompletedTaskProgress());
 
     dispatcher.getEventHandler().handle(
         new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
     dispatcher.await();
     Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
     Assert.assertEquals(2, v.getCompletedTasks());
+    Assert.assertTrue((1.0f) == v.getCompletedTaskProgress());
     Assert.assertTrue(v.initTimeRequested > 0);
     Assert.assertTrue(v.initedTime > 0);
     Assert.assertTrue(v.startTimeRequested > 0);
@@ -3829,6 +3831,7 @@ public class TestVertexImpl {
     dispatcher.await();
     // vertex should be in initializing state since parallelism is not set
     Assert.assertEquals(-1, v1.getTotalTasks());
+    Assert.assertTrue(0.0f == v1.getCompletedTaskProgress());
     Assert.assertEquals(VertexState.INITIALIZING, v1.getState());
     Assert.assertEquals(-1, v2.getTotalTasks());
     Assert.assertEquals(VertexState.INITIALIZING, v2.getState());
@@ -5323,9 +5326,11 @@ public class TestVertexImpl {
       v.handle(new VertexEvent(vId, VertexEventType.V_INIT));
       dispatcher.await();
       Assert.assertEquals(VertexState.INITED, v.getState());
+      Assert.assertTrue(0.0f == v.getCompletedTaskProgress());
       v.handle(new VertexEvent(vId, VertexEventType.V_START));
       dispatcher.await();
       Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
+      Assert.assertTrue(1.0f == v.getCompletedTaskProgress());
     } finally {
       if (vId != null) {
         vertexIdMap.remove(vId);
@@ -6503,4 +6508,6 @@ public class TestVertexImpl {
   private interface ContextSettableInputInitialzier {
     void setContext(InputInitializerContext context);
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/d93bdc70/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
----------------------------------------------------------------------
diff --git 
a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
index 56a4a82..5a37c04 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
@@ -145,7 +145,7 @@ public class TestAMWebController {
     doReturn("42").when(spy).$(WebUIService.DAG_ID);
     doReturn(mockResponse).when(spy).response();
     
doReturn(TezDAGID.fromString("dag_1422960590892_0007_42")).when(mockDAG).getID();
-    doReturn(66.0f).when(mockDAG).getProgress();
+    doReturn(66.0f).when(mockDAG).getCompletedTaskProgress();
     doReturn(mockDAG).when(mockAppContext).getCurrentDAG();
     doNothing().when(spy).renderJSON(any());
     spy.getDagProgress();
@@ -175,7 +175,7 @@ public class TestAMWebController {
     
doReturn(TezDAGID.fromString("dag_1422960590892_0007_42")).when(mockDAG).getID();
     doReturn(mockDAG).when(mockAppContext).getCurrentDAG();
     doReturn(mockVertex).when(mockDAG).getVertex(any(TezVertexID.class));
-    doReturn(66.0f).when(mockVertex).getProgress();
+    doReturn(66.0f).when(mockVertex).getCompletedTaskProgress();
     doNothing().when(spy).renderJSON(any());
     doNothing().when(spy).setCorsHeaders();
 
@@ -240,7 +240,7 @@ public class TestAMWebController {
 
 
     
doReturn(TezDAGID.fromString("dag_1422960590892_0007_42")).when(mockDAG).getID();
-    doReturn(66.0f).when(mockDAG).getProgress();
+    doReturn(66.0f).when(mockDAG).getCompletedTaskProgress();
     doReturn(DAGState.RUNNING).when(mockDAG).getState();
 
     doReturn(true).when(spy).setupResponse();
@@ -384,7 +384,7 @@ public class TestAMWebController {
     ProgressBuilder progress;
     Assert.assertEquals(mockVertex2.getVertexId().toString(), 
vertex2Result.get("id"));
     Assert.assertEquals(mockVertex2.getState().toString(), 
vertex2Result.get("status"));
-    Assert.assertEquals(Float.toString(mockVertex2.getProgress()), 
vertex2Result.get("progress"));
+    
Assert.assertEquals(Float.toString(mockVertex2.getCompletedTaskProgress()), 
vertex2Result.get("progress"));
     progress = mockVertex2.getVertexProgress();
     Assert.assertEquals(Integer.toString(progress.getTotalTaskCount()),
         vertex2Result.get("totalTasks"));

Reply via email to