Repository: tez Updated Branches: refs/heads/master a0d59c379 -> 7d412b203
TEZ-2761. Tez UI: update the progress on the dag and vertices pages with info from AM Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/7d412b20 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/7d412b20 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/7d412b20 Branch: refs/heads/master Commit: 7d412b2037c009780eacfe7304f80040addcdc0c Parents: a0d59c3 Author: Prakash Ramachandran <[email protected]> Authored: Fri Sep 4 12:36:35 2015 +0530 Committer: Prakash Ramachandran <[email protected]> Committed: Fri Sep 4 12:36:35 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/common/ATSConstants.java | 1 + .../apache/tez/dag/app/web/AMWebController.java | 138 +++++++++++++++- .../apache/tez/dag/app/web/WebUIService.java | 5 + .../tez/dag/app/web/TestAMWebController.java | 162 +++++++++++++++++++ .../ats/HistoryEventTimelineConversion.java | 3 + .../ats/TestHistoryEventTimelineConversion.java | 8 +- tez-ui/src/main/webapp/app/scripts/app.js | 28 ++++ .../app/scripts/controllers/dag_controller.js | 119 ++++++++++++-- .../scripts/controllers/dag_index_controller.js | 9 +- .../app/scripts/controllers/dag_vertices.js | 46 ++++-- .../main/webapp/app/scripts/default-configs.js | 3 +- .../src/main/webapp/app/scripts/helpers/misc.js | 8 + .../main/webapp/app/scripts/helpers/pollster.js | 52 ++++++ .../app/scripts/mixins/run_periodically.js | 78 --------- .../app/scripts/models/TimelineRestAdapter.js | 21 +++ .../src/main/webapp/app/scripts/models/dag.js | 39 ++++- tez-ui/src/main/webapp/app/scripts/router.js | 5 +- 18 files changed, 614 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/7d412b20/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d7da445..7d996ff 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -163,6 +163,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2761. Tez UI: update the progress on the dag and vertices pages with info from AM TEZ-2731. Fix Tez GenericCounter performance bottleneck TEZ-2752. logUnsuccessful completion in Attempt should write original finish time to ATS http://git-wip-us.apache.org/repos/asf/tez/blob/7d412b20/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java index 4566a91..ad9270f 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java +++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java @@ -53,6 +53,7 @@ public class ATSConstants { public static final String DAG_PLAN = "dagPlan"; public static final String DAG_NAME = "dagName"; public static final String DAG_STATE = "dagState"; + public static final String DAG_AM_WEB_SERVICE_VERSION = "amWebServiceVersion"; public static final String RECOVERY_FAILURE_REASON = "recoveryFailureReason"; public static final String VERTEX_NAME = "vertexName"; public static final String VERTEX_NAME_ID_MAPPING = "vertexNameIdMapping"; http://git-wip-us.apache.org/repos/asf/tez/blob/7d412b20/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java index 909ac95..db27d59 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java @@ -21,10 +21,8 @@ package org.apache.tez.dag.app.web; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.PrintWriter; -import java.io.UnsupportedEncodingException; import java.net.MalformedURLException; import java.net.URL; -import java.net.URLEncoder; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -32,8 +30,10 @@ import java.util.List; import java.util.Map; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import com.google.inject.name.Named; +import org.apache.tez.dag.api.client.ProgressBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.security.UserGroupInformation; @@ -68,6 +68,7 @@ public class AMWebController extends Controller { static final String VERTEX_PROGRESSES = "vertexProgresses"; static final int MAX_VERTICES_QUERIED = 100; + public static final String VERSION = "2"; private AppContext appContext; private String historyUrl; @@ -296,6 +297,139 @@ public class AMWebController extends Controller { renderJSON(result); } + // AM WebApi V2. + @VisibleForTesting + protected boolean setupResponse() { + setCorsHeaders(); + + if (!hasAccess()) { + sendErrorResponse(HttpServletResponse.SC_UNAUTHORIZED, "Access denied for user: " + + request().getRemoteUser(), null); + return false; + } + + return true; + } + + DAG checkAndGetDAGFromRequest() { + DAG dag = null; + int errorCode = HttpServletResponse.SC_OK; + String message = null; + Exception ex = null; + try { + int dagID = getQueryParamInt(WebUIService.DAG_ID); + dag = appContext.getCurrentDAG(); + if (dag == null || dag.getID().getId() != dagID) { + errorCode = HttpServletResponse.SC_NOT_FOUND; + message = "Not current Dag: " + dagID; + } + } catch (NumberFormatException e) { + errorCode = HttpServletResponse.SC_BAD_REQUEST; + message = "Invalid dag id"; + ex = e; + } + + if (errorCode != HttpServletResponse.SC_OK) { + dag = null; + sendErrorResponse(errorCode, message, ex); + } + + return dag; + } + + Collection<Integer> getVertexIDsFromRequest() { + final String valueStr = $(WebUIService.VERTEX_ID).trim(); + + List<Integer> vertexIDs = new ArrayList<>(); + if (!valueStr.equals("")) { + String[] vertexIdsStr = valueStr.split(",", MAX_VERTICES_QUERIED); + + try { + for (String vertexIdStr : vertexIdsStr) { + int vertexId = Integer.parseInt(vertexIdStr); + vertexIDs.add(vertexId); + } + } catch (NumberFormatException nfe) { + sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST, + "invalid vertex ID passed in as parameter", nfe); + vertexIDs = null; + } + } + + return vertexIDs; + } + + public void getDagInfo() { + if (!setupResponse()) { + return; + } + + DAG dag = checkAndGetDAGFromRequest(); + if (dag == null) { + return; + } + + Map<String, String> dagInfo = new HashMap<>(); + dagInfo.put("id", dag.getID().toString()); + dagInfo.put("progress", Float.toString(dag.getProgress())); + dagInfo.put("status", dag.getState().toString()); + + renderJSON(ImmutableMap.of( + "dag", dagInfo + )); + } + + private Map<String,String> getVertexInfoMap(Vertex vertex) { + Map<String, String> vertexInfo = new HashMap<>(); + vertexInfo.put("id", vertex.getVertexId().toString()); + vertexInfo.put("status", vertex.getState().toString()); + vertexInfo.put("progress", Float.toString(vertex.getProgress())); + + ProgressBuilder vertexProgress = vertex.getVertexProgress(); + vertexInfo.put("totalTasks", Integer.toString(vertexProgress.getTotalTaskCount())); + vertexInfo.put("runningTasks", Integer.toString(vertexProgress.getRunningTaskCount())); + vertexInfo.put("succeededTasks", Integer.toString(vertexProgress.getSucceededTaskCount())); + + vertexInfo.put("failedTaskAttempts", Integer.toString(vertexProgress.getFailedTaskAttemptCount())); + vertexInfo.put("killedTaskAttempts", Integer.toString(vertexProgress.getKilledTaskAttemptCount())); + + return vertexInfo; + } + + public void getVerticesInfo() { + if (!setupResponse()) { + return; + } + + DAG dag = checkAndGetDAGFromRequest(); + if (dag == null) { + return; + } + + Collection<Integer> requestedIDs = getVertexIDsFromRequest(); + + if (requestedIDs == null) { + return; + } + + Collection<Vertex> vertexList; + if (requestedIDs.size() == 0) { + // no ids specified return all. + vertexList = dag.getVertices().values(); + } else { + vertexList = getVerticesByIdx(dag, requestedIDs); + } + + ArrayList<Map<String, String>> verticesInfo = new ArrayList<>(); + for(Vertex v : vertexList) { + verticesInfo.add(getVertexInfoMap(v)); + } + + renderJSON(ImmutableMap.of( + "vertices", verticesInfo + )); + } + @Override @VisibleForTesting public void renderJSON(Object object) { http://git-wip-us.apache.org/repos/asf/tez/blob/7d412b20/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java index 4f2fec0..19e1641 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java @@ -37,6 +37,7 @@ import org.apache.tez.dag.app.AppContext; public class WebUIService extends AbstractService { private static final String WS_PREFIX = "/ui/ws/v1/tez/"; + private static final String WS_PREFIX_V2 = "/ui/ws/v2/tez/"; public static final String VERTEX_ID = "vertexID"; public static final String DAG_ID = "dagID"; @@ -150,6 +151,10 @@ public class WebUIService extends AbstractService { "getVertexProgress"); route(WS_PREFIX + pajoin("vertexProgresses", VERTEX_ID, DAG_ID), AMWebController.class, "getVertexProgresses"); + + // v2 api + route(WS_PREFIX_V2 + pajoin("dagInfo", DAG_ID), AMWebController.class, "getDagInfo"); + route(WS_PREFIX_V2 + pajoin("verticesInfo", VERTEX_ID, DAG_ID), AMWebController.class, "getVerticesInfo"); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/7d412b20/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java index fc17d3e..62779bc 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java @@ -32,16 +32,23 @@ import static org.mockito.Mockito.when; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.webapp.Controller; import org.apache.tez.common.security.ACLManager; import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.client.ProgressBuilder; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.dag.DAG; +import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.app.dag.Vertex; +import org.apache.tez.dag.app.dag.VertexState; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezVertexID; import org.junit.Assert; @@ -205,4 +212,159 @@ public class TestAMWebController { } + // AM Webservice Version 2 + //ArgumentCaptor<Map<String, Object>> returnResultCaptor; + @Captor + ArgumentCaptor<Map<String,Object>> returnResultCaptor; + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testGetDagInfo() { + AMWebController amWebController = new AMWebController(mockRequestContext, mockAppContext, + "TEST_HISTORY_URL"); + AMWebController spy = spy(amWebController); + DAG mockDAG = mock(DAG.class); + + + doReturn(TezDAGID.fromString("dag_1422960590892_0007_42")).when(mockDAG).getID(); + doReturn(66.0f).when(mockDAG).getProgress(); + doReturn(DAGState.RUNNING).when(mockDAG).getState(); + + doReturn(true).when(spy).setupResponse(); + doReturn(mockDAG).when(spy).checkAndGetDAGFromRequest(); + doNothing().when(spy).renderJSON(any()); + + spy.getDagInfo(); + verify(spy).renderJSON(returnResultCaptor.capture()); + + final Map<String, Object> result = returnResultCaptor.getValue(); + Assert.assertEquals(1, result.size()); + Assert.assertTrue(result.containsKey("dag")); + Map<String, String> dagInfo = (Map<String, String>) result.get("dag"); + + Assert.assertEquals(3, dagInfo.size()); + Assert.assertTrue("dag_1422960590892_0007_42".equals(dagInfo.get("id"))); + Assert.assertEquals("66.0", dagInfo.get("progress")); + Assert.assertEquals("RUNNING", dagInfo.get("status")); + } + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testGetVerticesInfoGetAll() { + Vertex mockVertex1 = createMockVertex("vertex_1422960590892_0007_42_00", VertexState.RUNNING, + 0.33f, 3); + Vertex mockVertex2 = createMockVertex("vertex_1422960590892_0007_42_01", VertexState.SUCCEEDED, + 1.0f, 5); + + final Map<String, Object> result = getVerticesTestHelper(0, mockVertex1, mockVertex2); + + Assert.assertEquals(1, result.size()); + + Assert.assertTrue(result.containsKey("vertices")); + ArrayList<Map<String, String>> verticesInfo = (ArrayList<Map<String, String>>) result.get("vertices"); + Assert.assertEquals(2, verticesInfo.size()); + + Map<String, String> vertex1Result = verticesInfo.get(0); + Map<String, String> vertex2Result = verticesInfo.get(1); + + verifySingleVertexResult(mockVertex1, vertex1Result); + verifySingleVertexResult(mockVertex2, vertex2Result); + } + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testGetVerticesInfoGetPartial() { + Vertex mockVertex1 = createMockVertex("vertex_1422960590892_0007_42_00", VertexState.RUNNING, + 0.33f, 3); + Vertex mockVertex2 = createMockVertex("vertex_1422960590892_0007_42_01", VertexState.SUCCEEDED, + 1.0f, 5); + + final Map<String, Object> result = getVerticesTestHelper(1, mockVertex1, mockVertex2); + + Assert.assertEquals(1, result.size()); + + Assert.assertTrue(result.containsKey("vertices")); + List<Map<String, String>> verticesInfo = (List<Map<String, String>>) result.get("vertices"); + Assert.assertEquals(1, verticesInfo.size()); + + Map<String, String> vertex1Result = verticesInfo.get(0); + + verifySingleVertexResult(mockVertex1, vertex1Result); + } + + Map<String, Object> getVerticesTestHelper(int numVerticesRequested, Vertex mockVertex1, + Vertex mockVertex2) { + DAG mockDAG = mock(DAG.class); + doReturn(TezDAGID.fromString("dag_1422960590892_0007_42")).when(mockDAG).getID(); + + TezVertexID vertexId1 = mockVertex1.getVertexId(); + doReturn(mockVertex1).when(mockDAG).getVertex(vertexId1); + TezVertexID vertexId2 = mockVertex2.getVertexId(); + doReturn(mockVertex2).when(mockDAG).getVertex(vertexId2); + + AMWebController amWebController = new AMWebController(mockRequestContext, mockAppContext, + "TEST_HISTORY_URL"); + AMWebController spy = spy(amWebController); + + doReturn(ImmutableMap.of( + mockVertex1.getVertexId(), mockVertex1, + mockVertex2.getVertexId(), mockVertex2 + )).when(mockDAG).getVertices(); + + doReturn(true).when(spy).setupResponse(); + doReturn(mockDAG).when(spy).checkAndGetDAGFromRequest(); + + List<Integer> requested; + if (numVerticesRequested == 0) { + requested = ImmutableList.of(); + } else { + requested = ImmutableList.of(mockVertex1.getVertexId().getId()); + } + + doReturn(requested).when(spy).getVertexIDsFromRequest(); + doNothing().when(spy).renderJSON(any()); + + spy.getVerticesInfo(); + verify(spy).renderJSON(returnResultCaptor.capture()); + + return returnResultCaptor.getValue(); + } + + private Vertex createMockVertex(String vertexIDStr, VertexState status, float progress, + int taskCounts) { + ProgressBuilder pb = new ProgressBuilder(); + pb.setTotalTaskCount(taskCounts); + pb.setSucceededTaskCount(taskCounts * 2); + pb.setFailedTaskAttemptCount(taskCounts * 3); + pb.setKilledTaskAttemptCount(taskCounts * 4); + pb.setRunningTaskCount(taskCounts * 5); + + Vertex mockVertex = mock(Vertex.class); + doReturn(TezVertexID.fromString(vertexIDStr)).when(mockVertex).getVertexId(); + doReturn(status).when(mockVertex).getState(); + doReturn(progress).when(mockVertex).getProgress(); + doReturn(pb).when(mockVertex).getVertexProgress(); + + return mockVertex; + } + + + private void verifySingleVertexResult(Vertex mockVertex2, Map<String, String> vertex2Result) { + ProgressBuilder progress; + Assert.assertEquals(mockVertex2.getVertexId().toString(), vertex2Result.get("id")); + Assert.assertEquals(mockVertex2.getState().toString(), vertex2Result.get("status")); + Assert.assertEquals(Float.toString(mockVertex2.getProgress()), vertex2Result.get("progress")); + progress = mockVertex2.getVertexProgress(); + Assert.assertEquals(Integer.toString(progress.getTotalTaskCount()), + vertex2Result.get("totalTasks")); + Assert.assertEquals(Integer.toString(progress.getRunningTaskCount()), + vertex2Result.get("runningTasks")); + Assert.assertEquals(Integer.toString(progress.getSucceededTaskCount()), + vertex2Result.get("succeededTasks")); + Assert.assertEquals(Integer.toString(progress.getKilledTaskAttemptCount()), + vertex2Result.get("killedTaskAttempts")); + Assert.assertEquals(Integer.toString(progress.getFailedTaskAttemptCount()), + vertex2Result.get("failedTaskAttempts")); + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/7d412b20/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index b979402..1b7e183 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -31,6 +31,7 @@ import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.api.oldrecords.TaskState; +import org.apache.tez.dag.app.web.AMWebController; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.events.AMLaunchedEvent; @@ -182,6 +183,7 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.TEZ_VERSION, DAGUtils.convertTezVersionToATSMap(event.getVersion())); } + atsEntity.addOtherInfo(ATSConstants.DAG_AM_WEB_SERVICE_VERSION, AMWebController.VERSION); return atsEntity; } @@ -398,6 +400,7 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.APPLICATION_ATTEMPT_ID, event.getApplicationAttemptId().toString()); atsEntity.addOtherInfo(ATSConstants.USER, event.getUser()); + atsEntity.addOtherInfo(ATSConstants.DAG_AM_WEB_SERVICE_VERSION, AMWebController.VERSION); return atsEntity; } http://git-wip-us.apache.org/repos/asf/tez/blob/7d412b20/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index 75828c3..8db32b0 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -46,6 +46,7 @@ import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.app.dag.VertexState; import org.apache.tez.dag.app.dag.impl.VertexStats; +import org.apache.tez.dag.app.web.AMWebController; import org.apache.tez.dag.history.HistoryEvent; import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.events.AMLaunchedEvent; @@ -242,12 +243,14 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals(1, timelineEntity.getPrimaryFilters().size()); Assert.assertTrue(timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); - Assert.assertEquals(4, timelineEntity.getOtherInfo().size()); + Assert.assertEquals(5, timelineEntity.getOtherInfo().size()); Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.CONFIG)); Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.TEZ_VERSION)); Assert.assertEquals(user, timelineEntity.getOtherInfo().get(ATSConstants.USER)); Assert.assertEquals(applicationId.toString(), timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID)); + Assert.assertEquals(AMWebController.VERSION, + timelineEntity.getOtherInfo().get(ATSConstants.DAG_AM_WEB_SERVICE_VERSION)); Map<String, String> config = (Map<String, String>) timelineEntity.getOtherInfo().get(ATSConstants.CONFIG); @@ -428,6 +431,7 @@ public class TestHistoryEventTimelineConversion { Assert.assertTrue( timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); + Assert.assertEquals(5, timelineEntity.getOtherInfo().size()); Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN)); Assert.assertEquals(applicationId.toString(), timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID)); @@ -435,6 +439,8 @@ public class TestHistoryEventTimelineConversion { timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ATTEMPT_ID)); Assert.assertEquals(applicationAttemptId.getApplicationId().toString(), timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID)); + Assert.assertEquals(AMWebController.VERSION, + timelineEntity.getOtherInfo().get(ATSConstants.DAG_AM_WEB_SERVICE_VERSION)); Assert.assertEquals(user, timelineEntity.getOtherInfo().get(ATSConstants.USER)); } http://git-wip-us.apache.org/repos/asf/tez/blob/7d412b20/tez-ui/src/main/webapp/app/scripts/app.js ---------------------------------------------------------------------- diff --git a/tez-ui/src/main/webapp/app/scripts/app.js b/tez-ui/src/main/webapp/app/scripts/app.js index 4ca7b96..8e32874 100644 --- a/tez-ui/src/main/webapp/app/scripts/app.js +++ b/tez-ui/src/main/webapp/app/scripts/app.js @@ -190,6 +190,34 @@ App.ready = function () { return typeName + '?dagID=%@&vertexID=%@'; } }); + + // v2 version of am web services + App.DagInfoAdapter = App.AMInfoAdapter.extend({ + namespace: App.Configs.restNamespace.aminfoV2, + buildURL: function(type, id, record) { + var url = this._super(type, null, record); + return url.replace('__app_id__', record.get('appId')) + .fmt(record.get('dagIdx')); + }, + pathForType: function() { + return 'dagInfo?dagID=%@'; + } + }); + + App.VertexInfoAdapter = App.AMInfoAdapter.extend({ + namespace: App.Configs.restNamespace.aminfoV2, + ajax: function(url, method, hash) { + var options = hash.data || {}; + url = url.replace('__app_id__', options.appId) + .fmt(options.dagIdx); + delete options['dagIdx']; + delete options['appId']; + return this._super(url, method, hash); + }, + pathForType: function() { + return 'verticesInfo?dagID=%@'; + } + }); }; $.ajaxPrefilter(function(options, originalOptions, jqXHR) { http://git-wip-us.apache.org/repos/asf/tez/blob/7d412b20/tez-ui/src/main/webapp/app/scripts/controllers/dag_controller.js ---------------------------------------------------------------------- diff --git a/tez-ui/src/main/webapp/app/scripts/controllers/dag_controller.js b/tez-ui/src/main/webapp/app/scripts/controllers/dag_controller.js index 96e707f..af22918 100644 --- a/tez-ui/src/main/webapp/app/scripts/controllers/dag_controller.js +++ b/tez-ui/src/main/webapp/app/scripts/controllers/dag_controller.js @@ -26,19 +26,6 @@ App.DagController = Em.ObjectController.extend(App.Helpers.DisplayHelper, { var loaders = []; var applicationId = dag.get('applicationId'); - if (dag.get('status') === 'RUNNING') { - // update the progress info if available. this need not block the UI - App.Helpers.misc.removeRecord(this.store, 'dagProgress', dag.get('id')); - var aminfoLoader = that.store.find('dagProgress', dag.get('id'), { - appId: applicationId, - dagIdx: dag.get('idx') - }).then(function(dagProgressInfo) { - dag.set('progress', dagProgressInfo.get('progress')); - }).catch(function (error) { - Em.Logger.error("Failed to fetch dagProgress") - }); - loaders.push(aminfoLoader); - } App.Helpers.misc.removeRecord(this.store, 'appDetail', applicationId); var appDetailLoader = this.store.find('appDetail', applicationId) .then(function(app){ @@ -79,7 +66,111 @@ App.DagController = Em.ObjectController.extend(App.Helpers.DisplayHelper, { } } - return Em.RSVP.all(loaders); + var allLoaders = Em.RSVP.all(loaders); + allLoaders.then(function(){ + ['dagProgress', 'dagInfo', 'vertexInfo'].forEach(function(itemType){ + that.store.unloadAll(itemType); + }); + if (dag.get('status') === 'RUNNING') { + // update the progress info if available. this need not block the UI + if (dag.get('amWebServiceVersion') == 'v1') { + that.updateInfoFromAM(dag); + } else { + // if AM version is v2 we keep updating the status, progress etc live. + ["loading", "id", "model.status"].forEach(function(item) { + Em.addObserver(that, item, that.startAMInfoUpdateService); + }); + } + } + }); + + return allLoaders; + }, + + updateAMDagInfo: function() { + var dagId = this.get('id') + that = this, + dagInfoLoader = null; + + if (!dagId) return; + + if (this.store.recordIsLoaded("dagInfo", dagId)) { + var dagInfoRecord = this.store.recordForId("dagInfo", dagId); + if (dagInfoRecord.get('isLoading')) return; + dagInfoLoader = dagInfoRecord.reload(); + } else { + dagInfoLoader = this.store.find("dagInfo", dagId, { + appId: that.get('applicationId'), + dagIdx: that.get('idx') + }) + } + + dagInfoLoader.then(function(dagInfo){ + that.set('amDagInfo', dagInfo); + //TODO: find another way to trigger notification + that.set('amDagInfo._amInfoLastUpdatedTime', moment()); + }).catch(function(e){ + // do nothing. + }); + }, + + updateAMVerticesInfo: function() { + var dagId = this.get('id') + that = this, + verticesInfoLoader = null; + + if (!dagId) return; + + verticesInfoLoader = this.store.find('vertexInfo', { + appId: that.get('applicationId'), + dagIdx: that.get('idx') + }); + + verticesInfoLoader.then(function(verticesInfo) { + that.set('amVertexInfo', verticesInfo); + }).catch(function(e){ + // do nothing + }); + + }, + + startAMInfoUpdateService: function() { + if (this.get('loading') || !this.get('model.id') || this.get('model.status') != 'RUNNING') { + return; + } + + var amInfoUpdateService = this.get('amInfoUpdateService') + that = this; + + if (Em.isNone(amInfoUpdateService)) { + amInfoUpdateService = App.Helpers.pollster.create({ + onPoll: function() { + that.updateAMDagInfo(); + that.updateAMVerticesInfo(); + } + }); + that.set('amInfoUpdateService', amInfoUpdateService); + amInfoUpdateService.start(true); + + ["loading", "id", "model.status"].forEach(function(item) { + Em.addObserver(that, item, that.stopAMInfoUpdateService); + }); + } + }, + + dostopAMInfoUpdateService: function() { + var amInfoUpdateService = this.get('amInfoUpdateService'); + if (!Em.isNone(amInfoUpdateService)) { + amInfoUpdateService.stop(); + this.set('amInfoUpdateService', undefined); + } + }, + + // stop the update service if the status changes. see startAMInfoUpdateService + stopAMInfoUpdateService: function() { + if (this.get('loading') || this.get('model.status') != 'RUNNING') { + this.dostopAMInfoUpdateService(); + } }, enableAppIdLink: function() { http://git-wip-us.apache.org/repos/asf/tez/blob/7d412b20/tez-ui/src/main/webapp/app/scripts/controllers/dag_index_controller.js ---------------------------------------------------------------------- diff --git a/tez-ui/src/main/webapp/app/scripts/controllers/dag_index_controller.js b/tez-ui/src/main/webapp/app/scripts/controllers/dag_index_controller.js index 795c9e3..a8bf20f 100644 --- a/tez-ui/src/main/webapp/app/scripts/controllers/dag_index_controller.js +++ b/tez-ui/src/main/webapp/app/scripts/controllers/dag_index_controller.js @@ -81,7 +81,7 @@ App.DagIndexController = Em.ObjectController.extend(App.ModelRefreshMixin, { progressStr: function() { var pct; - if (Ember.typeOf(this.get('progress')) === 'number') { + if (Ember.typeOf(this.get('progress')) === 'number' && this.get('status') == 'RUNNING') { pct = App.Helpers.number.fractionToPercentage(this.get('progress')); } return pct; @@ -135,4 +135,11 @@ App.DagIndexController = Em.ObjectController.extend(App.ModelRefreshMixin, { } }.property('appContextInfo.appType'), + updateAMInfo: function() { + var status = this.get('amDagInfo.status'); + if (!Em.isNone(status)) { + this.set('status', status); + this.set('progress', this.get('amDagInfo.progress')); + } + }.observes('amDagInfo', 'amDagInfo._amInfoLastUpdatedTime') }); http://git-wip-us.apache.org/repos/asf/tez/blob/7d412b20/tez-ui/src/main/webapp/app/scripts/controllers/dag_vertices.js ---------------------------------------------------------------------- diff --git a/tez-ui/src/main/webapp/app/scripts/controllers/dag_vertices.js b/tez-ui/src/main/webapp/app/scripts/controllers/dag_vertices.js index b4ac5cc..ea25818 100644 --- a/tez-ui/src/main/webapp/app/scripts/controllers/dag_vertices.js +++ b/tez-ui/src/main/webapp/app/scripts/controllers/dag_vertices.js @@ -47,7 +47,9 @@ App.DagVerticesController = App.TablePageController.extend({ }); } - this._loadProgress(data); + if (this.get('controllers.dag.amWebServiceVersion') == 'v1') { + this._loadProgress(data); + } return this._super(); }, @@ -70,24 +72,45 @@ App.DagVerticesController = App.TablePageController.extend({ vertexIds: runningVerticesIdx.join(',') } }).then(function(vertexProgressInfo) { - vertexProgressInfo.forEach(function(item) { - var model = vertices.findBy('id', item.get('id')) || Em.Object.create(); - model.set('progress', item.get('progress')); - }); + that.set('controllers.dag.amVertexInfo', vertexProgressInfo); }).catch(function(error) { Em.Logger.debug("failed to fetch vertex progress") }); } }, + overlayVertexInfo: function(vertex, amVertexInfo) { + if (Em.isNone(amVertexInfo) || Em.isNone(vertex)) return; + amVertexInfo.set('_amInfoLastUpdatedTime', moment()); + vertex.setProperties(amVertexInfo.getProperties('status', 'progress', '_amInfoLastUpdatedTime')); + }, + + updateVertexInfo: function() { + var amVertexInfo = this.get('controllers.dag.amVertexInfo'); + var vertices = this.get('data'); + var that = this; + if (amVertexInfo && vertices) { + amVertexInfo.forEach(function(item) { + that.overlayVertexInfo(vertices.findBy('id', item.get('id')), item); + }); + } + }.observes('controllers.dag.amVertexInfo', 'data'), + defaultColumnConfigs: function() { function onProgressChange() { var progress = this.get('vertex.progress'), - pct; - if (Ember.typeOf(progress) === 'number') { + pct, + status; + status = this.get('vertex.status'); + if (Ember.typeOf(progress) === 'number' && status == 'RUNNING') { pct = App.Helpers.number.fractionToPercentage(progress); - this.set('progress', pct); } + this.setProperties({ + progress: pct, + status: status, + statusIcon: App.Helpers.misc.getStatusClassForEntity(status, + this.get('vertex.hasFailedTaskAttempts')) + }); } return [ @@ -118,14 +141,13 @@ App.DagVerticesController = App.TablePageController.extend({ var status = row.get('status'), content = Ember.Object.create({ vertex: row, - status: status, - statusIcon: App.Helpers.misc.getStatusClassForEntity(status, - row.get('hasFailedTaskAttempts')) }); - if(status == 'RUNNING') { + row.addObserver('_amInfoLastUpdatedTime', content, onProgressChange); row.addObserver('progress', content, onProgressChange); + row.addObserver('status', content, onProgressChange); } + onProgressChange.call(content); return content; } }, http://git-wip-us.apache.org/repos/asf/tez/blob/7d412b20/tez-ui/src/main/webapp/app/scripts/default-configs.js ---------------------------------------------------------------------- diff --git a/tez-ui/src/main/webapp/app/scripts/default-configs.js b/tez-ui/src/main/webapp/app/scripts/default-configs.js index d0ffa97..13867d8 100644 --- a/tez-ui/src/main/webapp/app/scripts/default-configs.js +++ b/tez-ui/src/main/webapp/app/scripts/default-configs.js @@ -48,7 +48,8 @@ $.extend(true, App.Configs, { restNamespace: { timeline: 'ws/v1/timeline', applicationHistory: 'ws/v1/applicationhistory', - aminfo: 'proxy/__app_id__/ws/v1/tez' + aminfo: 'proxy/__app_id__/ws/v1/tez', + aminfoV2: 'proxy/__app_id__/ws/v2/tez' }, tables: { http://git-wip-us.apache.org/repos/asf/tez/blob/7d412b20/tez-ui/src/main/webapp/app/scripts/helpers/misc.js ---------------------------------------------------------------------- diff --git a/tez-ui/src/main/webapp/app/scripts/helpers/misc.js b/tez-ui/src/main/webapp/app/scripts/helpers/misc.js index cb6d815..639c9e8 100644 --- a/tez-ui/src/main/webapp/app/scripts/helpers/misc.js +++ b/tez-ui/src/main/webapp/app/scripts/helpers/misc.js @@ -352,6 +352,14 @@ App.Helpers.misc = { return appId; }, + /* Gets the dag index from the dag id + * @param dagId {String} + * @return dag index for the given dagId {String} + */ + getDagIndexFromDagId: function(dagId) { + return dagId.split('_').splice(-1).pop(); + }, + /** * Remove the specific record from store * @param store {DS.Store} http://git-wip-us.apache.org/repos/asf/tez/blob/7d412b20/tez-ui/src/main/webapp/app/scripts/helpers/pollster.js ---------------------------------------------------------------------- diff --git a/tez-ui/src/main/webapp/app/scripts/helpers/pollster.js b/tez-ui/src/main/webapp/app/scripts/helpers/pollster.js new file mode 100644 index 0000000..5728b81 --- /dev/null +++ b/tez-ui/src/main/webapp/app/scripts/helpers/pollster.js @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +App.Helpers.pollster = Ember.Object.extend({ + interval: function() { + return this.get('_interval') || 10000; // Time between polls (in ms) + }.property().readOnly(), + + // Schedules the function `f` to be executed every `interval` time. + // if runImmediate is set first run is scheduled immedietly + schedule: function(f, runImmediete) { + return Ember.run.later(this, function() { + f.apply(this); + this.set('timer', this.schedule(f)); + }, this.get('interval')); + }, + + // Stops the pollster + stop: function() { + Ember.run.cancel(this.get('timer')); + }, + + // Starts the pollster, i.e. executes the `onPoll` function every interval. + start: function(runImmediate, interval) { + if (!!interval && interval > 1000) { + this.set('_interval', interval) + } + var callback = this.get('onPoll'); + if (runImmediate) { + callback.apply(this); + } + this.set('timer', this.schedule(callback, runImmediate)); + }, + + onPoll: function(){ + // Issue JSON request and add data to the store + } +}); \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/7d412b20/tez-ui/src/main/webapp/app/scripts/mixins/run_periodically.js ---------------------------------------------------------------------- diff --git a/tez-ui/src/main/webapp/app/scripts/mixins/run_periodically.js b/tez-ui/src/main/webapp/app/scripts/mixins/run_periodically.js deleted file mode 100644 index 6a534a9..0000000 --- a/tez-ui/src/main/webapp/app/scripts/mixins/run_periodically.js +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Allow to run object method periodically and stop it - * Example: - * <code> - * var obj = Ember.Object.createWithMixins(App.RunPeriodically, { - * method: Ember.K - * }); - * obj.set('interval', 10000); // override default value - * obj.loop('method'); // run periodically - * obj.stop(); // stop running - * </code> - * @type {Ember.Mixin} - */ -App.RunPeriodically = Ember.Mixin.create({ - - /** - * Interval for loop - * @type {number} - */ - interval: 10000, - - /** - * setTimeout's return value - * @type {number} - */ - timer: null, - - /** - * Run <code>methodName</code> periodically with <code>interval</code> - * @param {string} methodName method name to run periodically - * @param {bool} initRun should methodName be run before setInterval call (default - true) - * @method run - */ - loop: function(methodName, initRun) { - initRun = Em.isNone(initRun) ? true : initRun; - var self = this, - interval = this.get('interval'); - Ember.assert('Interval should be numeric and greated than 0', $.isNumeric(interval) && interval > 0); - if (initRun) { - this[methodName](); - } - this.set('timer', - setInterval(function () { - self[methodName](); - }, interval) - ); - }, - - /** - * Stop running <code>timer</code> - * @method stop - */ - stop: function() { - var timer = this.get('timer'); - if (!Em.isNone(timer)) { - clearTimeout(timer); - } - } - -}); http://git-wip-us.apache.org/repos/asf/tez/blob/7d412b20/tez-ui/src/main/webapp/app/scripts/models/TimelineRestAdapter.js ---------------------------------------------------------------------- diff --git a/tez-ui/src/main/webapp/app/scripts/models/TimelineRestAdapter.js b/tez-ui/src/main/webapp/app/scripts/models/TimelineRestAdapter.js index 8f836cc..a21fd67 100644 --- a/tez-ui/src/main/webapp/app/scripts/models/TimelineRestAdapter.js +++ b/tez-ui/src/main/webapp/app/scripts/models/TimelineRestAdapter.js @@ -109,6 +109,11 @@ var timelineJsonToDagMap = { planName: 'otherinfo.dagPlan.dagName', planVersion: 'otherinfo.dagPlan.version', + amWebServiceVersion: { + custom: function(source) { + return Em.get(source, 'otherinfo.amWebServiceVersion') || '1'; + } + }, appContextInfo: { custom: function (source) { var appType = undefined, @@ -522,3 +527,19 @@ App.HiveQuerySerializer = App.TimelineSerializer.extend({ App.VertexProgressSerializer = App.DagProgressSerializer = DS.RESTSerializer.extend({}); +// v2 version of am web services +App.DagInfoSerializer = DS.RESTSerializer.extend({ + normalizePayload: function(rawPayload) { + return { + dagInfo : rawPayload.dag + } + } +}); + +App.VertexInfoSerializer = DS.RESTSerializer.extend({ + normalizePayload: function(rawPayload) { + return { + vertexInfo : rawPayload.vertices + } + } +}); http://git-wip-us.apache.org/repos/asf/tez/blob/7d412b20/tez-ui/src/main/webapp/app/scripts/models/dag.js ---------------------------------------------------------------------- diff --git a/tez-ui/src/main/webapp/app/scripts/models/dag.js b/tez-ui/src/main/webapp/app/scripts/models/dag.js index 7cfbc1b..011c181 100644 --- a/tez-ui/src/main/webapp/app/scripts/models/dag.js +++ b/tez-ui/src/main/webapp/app/scripts/models/dag.js @@ -18,11 +18,11 @@ App.Dag = App.AbstractEntity.extend({ idx: function() { - return this.get('id').split('_').splice(-1).pop(); + return App.Helpers.misc.getDagIndexFromDagId(this.get('id')); }.property('id'), submittedTime: DS.attr('number'), - + // start time of the entity startTime: DS.attr('number'), @@ -72,6 +72,7 @@ App.Dag = App.AbstractEntity.extend({ vertexIdToNameMap: DS.attr('array'), counterGroups: DS.attr('array'), + amWebServiceVersion: DS.attr('string') }); App.CounterGroup = DS.Model.extend({ @@ -391,6 +392,40 @@ App.VertexProgress = DS.Model.extend({ dagIdx: DS.attr('string') }); +App.DagInfo = DS.Model.extend({ + // we need appId and dagIdx as they are used for querying with AM + appId: function() { + return App.Helpers.misc.getAppIdFromDagId(this.get('id')); + }.property('id'), + dagIdx: function() { + return App.Helpers.misc.getDagIndexFromDagId(this.get('id')); + }.property('id'), + + progress: DS.attr('number'), + status: DS.attr('string'), +}); + +App.VertexInfo = DS.Model.extend({ + // we need appId and dagIdx as they are used for querying with AM + appId: function() { + return App.Helpers.misc.getAppIdFromDagId(this.get('id')); + }.property('id'), + dagIdx: function() { + return App.Helpers.misc.getDagIndexFromDagId(this.get('id')); + }.property('id'), + + progress: DS.attr('number'), + status: DS.attr('string'), + totalTasks: DS.attr('number'), + runningTasks: DS.attr('number'), + succeededTasks: DS.attr('number'), + failedTaskAttempts: DS.attr('number'), + killedTaskAttempts: DS.attr('number'), + pendingTasks: function() { + return this.get('totalTasks') - this.get('runningTasks') - this.get('succeededTasks'); + }.property('totalTasks', 'runningTasks', 'succeededTasks') +}); + App.KVDatum = DS.Model.extend({ key: DS.attr('string'), value: DS.attr('string'), http://git-wip-us.apache.org/repos/asf/tez/blob/7d412b20/tez-ui/src/main/webapp/app/scripts/router.js ---------------------------------------------------------------------- diff --git a/tez-ui/src/main/webapp/app/scripts/router.js b/tez-ui/src/main/webapp/app/scripts/router.js index 29ac3cc..c5c4687 100644 --- a/tez-ui/src/main/webapp/app/scripts/router.js +++ b/tez-ui/src/main/webapp/app/scripts/router.js @@ -148,7 +148,10 @@ App.DagRoute = Em.Route.extend({ afterModel: function(model) { return this.controllerFor('dag').loadAdditional(model); }, - setupController: setupControllerFactory('Dag: %@ (%@)', 'name', 'id') + setupController: setupControllerFactory('Dag: %@ (%@)', 'name', 'id'), + resetController: function() { + this.controller.dostopAMInfoUpdateService(); + } }); App.DagViewRoute = Em.Route.extend({
