Repository: tez Updated Branches: refs/heads/master b6f15dcdc -> b3ad59578
TEZ-2813. Tez UI: add counter data for rest api calls to AM Web Services v2 (sree via pramachandran) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1e58a0e4 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1e58a0e4 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1e58a0e4 Branch: refs/heads/master Commit: 1e58a0e418bfd93be1b789f288299a02a6b4c0dd Parents: b6f15dc Author: Prakash Ramachandran <[email protected]> Authored: Mon Sep 14 22:57:21 2015 +0530 Committer: Prakash Ramachandran <[email protected]> Committed: Mon Sep 14 22:57:21 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/dag/app/dag/Vertex.java | 7 + .../apache/tez/dag/app/dag/impl/VertexImpl.java | 32 ++- .../apache/tez/dag/app/web/AMWebController.java | 228 +++++++++++++++++-- .../apache/tez/dag/app/web/WebUIService.java | 4 + .../tez/dag/app/web/TestAMWebController.java | 191 +++++++++++++++- 6 files changed, 439 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/1e58a0e4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 60010d0..bb023b1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -176,6 +176,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2813. Tez UI: add counter data for rest api calls to AM Web Services v2 TEZ-2660. Tez UI: need to show application page even if system metrics publish is disabled. TEZ-2787. Tez AM should have java.io.tmpdir=./tmp to be consistent with tasks TEZ-2780. Tez UI: Update All Tasks page while in progress http://git-wip-us.apache.org/repos/asf/tez/blob/1e58a0e4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index 552da11..96301a5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -80,6 +80,13 @@ public interface Vertex extends Comparable<Vertex> { */ TezCounters getAllCounters(); + /** + * Get all the counters of this vertex. + * @return aggregate task-counters + */ + TezCounters getCachedCounters(); + + Map<TezTaskID, Task> getTasks(); Task getTask(TezTaskID taskID); Task getTask(int taskIndex); http://git-wip-us.apache.org/repos/asf/tez/blob/1e58a0e4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 3dae42b..2e13090 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -228,6 +228,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl volatile LinkedHashMap<TezTaskID, Task> tasks = new LinkedHashMap<TezTaskID, Task>(); private Object fullCountersLock = new Object(); private TezCounters fullCounters = null; + private TezCounters cachedCounters = null; + private long cachedCountersTimestamp = 0; private Resource taskResource; private Configuration vertexConf; @@ -1190,6 +1192,34 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl } } + @Override + public TezCounters getCachedCounters() { + readLock.lock(); + + try { + // FIXME a better lightweight approach for counters is needed + if (fullCounters == null && cachedCounters != null + && ((cachedCountersTimestamp+10000) > System.currentTimeMillis())) { + LOG.info("Asked for counters" + + ", cachedCountersTimestamp=" + cachedCountersTimestamp + + ", currentTime=" + System.currentTimeMillis()); + return cachedCounters; + } + + cachedCountersTimestamp = System.currentTimeMillis(); + if (inTerminalState()) { + this.mayBeConstructFinalFullCounters(); + return fullCounters; + } + + TezCounters counters = new TezCounters(); + cachedCounters = incrTaskCounters(counters, tasks.values()); + return cachedCounters; + } finally { + readLock.unlock(); + } + } + public VertexStats getVertexStats() { readLock.lock(); @@ -2843,7 +2873,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl private static List<TaskAttemptIdentifier> getTaskAttemptIdentifiers(DAG dag, List<TezTaskAttemptID> taIds) { - List<TaskAttemptIdentifier> attempts = new ArrayList<>(taIds.size()); + List<TaskAttemptIdentifier> attempts = new ArrayList<TaskAttemptIdentifier>(taIds.size()); String dagName = dag.getName(); for (TezTaskAttemptID taId : taIds) { String vertexName = dag.getVertex(taId.getTaskID().getVertexID()).getName(); http://git-wip-us.apache.org/repos/asf/tez/blob/1e58a0e4/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java index 06f282c..cede341 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java @@ -25,18 +25,29 @@ import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; import java.util.StringTokenizer; +import java.util.TreeMap; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Splitter; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; import com.google.inject.Inject; import com.google.inject.name.Named; + +import org.apache.tez.common.counters.CounterGroup; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.client.ProgressBuilder; import org.apache.tez.dag.app.dag.Task; -import org.apache.tez.dag.records.TezTaskID; +import org.apache.tez.dag.app.dag.TaskAttempt; +import org.apache.tez.dag.records.TezTaskAttemptID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.security.UserGroupInformation; @@ -343,7 +354,7 @@ public class AMWebController extends Controller { Collection<Integer> getVertexIDsFromRequest() { final String valueStr = $(WebUIService.VERTEX_ID).trim(); - List<Integer> vertexIDs = new ArrayList<>(); + List<Integer> vertexIDs = new ArrayList<Integer>(); if (!valueStr.equals("")) { String[] vertexIdsStr = valueStr.split(",", MAX_QUERIED); @@ -362,8 +373,46 @@ public class AMWebController extends Controller { return vertexIDs; } + /** + * Parse a params list in the format: CtrGroup/CtrName1,CtrName2;CtrGroup2; + * @return nested structure of counter groups and names. Null if nothing specified. + */ + Map<String, Set<String>> getCounterListFromRequest() { + final String counterStr = $(WebUIService.COUNTERS).trim(); + if (counterStr == null || counterStr.isEmpty()) { + return null; + } + + String delimiter = ";"; + String groupDelimiter = "/"; + String counterDelimiter = ","; + + StringTokenizer tokenizer = new StringTokenizer(counterStr, delimiter); + + Map<String, Set<String>> counterList = new TreeMap<String, Set<String>>(); + while (tokenizer.hasMoreElements()) { + String token = tokenizer.nextToken().trim(); + int pos = token.indexOf(groupDelimiter); + if (pos == -1) { + counterList.put(token, Collections.<String>emptySet()); + continue; + } + String counterGroup = token.substring(0, pos); + Set<String> counters = Collections.<String>emptySet(); + if (pos < token.length() - 1) { + String counterNames = token.substring(pos+1, token.length()); + counters = Sets.newHashSet( + Splitter.on(counterDelimiter).omitEmptyStrings() + .trimResults().split(counterNames)); + } + counterList.put(counterGroup, counters); + } + return counterList; + } + + List<String> splitString(String str, String delimiter, Integer limit) { - List<String> items = new ArrayList<>(); + List<String> items = new ArrayList<String>(); StringTokenizer tokenizer = new StringTokenizer(str, delimiter); for(int count = 0; tokenizer.hasMoreElements() && count < limit; count ++) { @@ -386,7 +435,7 @@ public class AMWebController extends Controller { List<Integer> getIntegersFromRequest(String paramName, Integer limit) { String valuesStr = $(paramName).trim(); - List<Integer> values = new ArrayList<>(); + List<Integer> values = new ArrayList<Integer>(); if (!valuesStr.equals("")) { try { for (String valueStr : splitString(valuesStr, ",", limit)) { @@ -404,7 +453,7 @@ public class AMWebController extends Controller { } /** - * getIDsFromRequest + * getTaskIDsFromRequest * Takes in "1_0,1_3" and returns [[1,0],[1,3]] * Mainly to parse a query parameter with comma separated indexes. For vertex its the index, * for task its vertexIndex_taskIndex and for attempts its vertexIndex_taskIndex_attemptNo @@ -415,16 +464,16 @@ public class AMWebController extends Controller { * * @return {List<List<Integer>>} List of parsed values */ - List<List<Integer>> getIDsFromRequest(String paramName, Integer limit) { + List<List<Integer>> getIDsFromRequest(String paramName, Integer limit, Integer count) { String valuesStr = $(paramName).trim(); - List<List<Integer>> values = new ArrayList<>(); + List<List<Integer>> values = new ArrayList<List<Integer>>(); if (!valuesStr.equals("")) { try { for (String valueStr : splitString(valuesStr, ",", limit)) { - List<Integer> innerValues = new ArrayList<>(); + List<Integer> innerValues = new ArrayList<Integer>(); String innerValueStrs[] = valueStr.split("_"); - if(innerValueStrs.length == 2) { + if(innerValueStrs.length == count) { for (String innerValueStr : innerValueStrs) { int value = Integer.parseInt(innerValueStr); innerValues.add(value); @@ -452,7 +501,7 @@ public class AMWebController extends Controller { return; } - Map<String, String> dagInfo = new HashMap<>(); + Map<String, String> dagInfo = new HashMap<String, String>(); dagInfo.put("id", dag.getID().toString()); dagInfo.put("progress", Float.toString(dag.getProgress())); dagInfo.put("status", dag.getState().toString()); @@ -462,8 +511,33 @@ public class AMWebController extends Controller { )); } - private Map<String,String> getVertexInfoMap(Vertex vertex) { - Map<String, String> vertexInfo = new HashMap<>(); + Map<String, Map<String, Long>> constructCounterMapInfo(TezCounters counters, + Map<String, Set<String>> counterNames) { + if (counterNames == null || counterNames.isEmpty()) { + return null; + } + LOG.info("Requested counter names=" + counterNames.entrySet()); + LOG.info("actual counters=" + counters); + + Map<String, Map<String, Long>> counterInfo = new TreeMap<String, Map<String, Long>>(); + + for (Entry<String, Set<String>> entry : counterNames.entrySet()) { + Map<String, Long> matchedCounters = new HashMap<String, Long>(); + CounterGroup grpCounters = counters.getGroup(entry.getKey()); + for (TezCounter counter : grpCounters) { + if (entry.getValue().isEmpty() || entry.getValue().contains(counter.getName())) { + matchedCounters.put(counter.getName(), counter.getValue()); + } + } + counterInfo.put(entry.getKey(), matchedCounters); + } + + return counterInfo; + } + + private Map<String, Object> getVertexInfoMap(Vertex vertex, + Map<String, Set<String>> counterNames) { + Map<String, Object> vertexInfo = new HashMap<String, Object>(); vertexInfo.put("id", vertex.getVertexId().toString()); vertexInfo.put("status", vertex.getState().toString()); vertexInfo.put("progress", Float.toString(vertex.getProgress())); @@ -473,8 +547,18 @@ public class AMWebController extends Controller { vertexInfo.put("runningTasks", Integer.toString(vertexProgress.getRunningTaskCount())); vertexInfo.put("succeededTasks", Integer.toString(vertexProgress.getSucceededTaskCount())); - vertexInfo.put("failedTaskAttempts", Integer.toString(vertexProgress.getFailedTaskAttemptCount())); - vertexInfo.put("killedTaskAttempts", Integer.toString(vertexProgress.getKilledTaskAttemptCount())); + vertexInfo.put("failedTaskAttempts", + Integer.toString(vertexProgress.getFailedTaskAttemptCount())); + vertexInfo.put("killedTaskAttempts", + Integer.toString(vertexProgress.getKilledTaskAttemptCount())); + + if (counterNames != null && !counterNames.isEmpty()) { + TezCounters counters = vertex.getCachedCounters(); + Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters, counterNames); + if (counterMap != null && !counterMap.isEmpty()) { + vertexInfo.put("counters", counterMap); + } + } return vertexInfo; } @@ -495,6 +579,8 @@ public class AMWebController extends Controller { return; } + Map<String, Set<String>> counterNames = getCounterListFromRequest(); + Collection<Vertex> vertexList; if (requestedIDs.isEmpty()) { // no ids specified return all. @@ -503,9 +589,9 @@ public class AMWebController extends Controller { vertexList = getVerticesByIdx(dag, requestedIDs); } - ArrayList<Map<String, String>> verticesInfo = new ArrayList<>(); + ArrayList<Map<String, Object>> verticesInfo = new ArrayList<Map<String, Object>>(); for(Vertex v : vertexList) { - verticesInfo.add(getVertexInfoMap(v)); + verticesInfo.add(getVertexInfoMap(v, counterNames)); } renderJSON(ImmutableMap.of( @@ -528,9 +614,9 @@ public class AMWebController extends Controller { * @param limit {Integer} */ List<Task> getRequestedTasks(DAG dag, Integer limit) { - List<Task> tasks = new ArrayList<>(); + List<Task> tasks = new ArrayList<Task>(); - List<List<Integer>> taskIDs = getIDsFromRequest(WebUIService.TASK_ID, limit); + List<List<Integer>> taskIDs = getIDsFromRequest(WebUIService.TASK_ID, limit, 2); if(taskIDs == null) { return null; } @@ -564,7 +650,7 @@ public class AMWebController extends Controller { if(vertex == null) { continue; } - List<Task> vertexTasks = new ArrayList<>(vertex.getTasks().values()); + List<Task> vertexTasks = new ArrayList<Task>(vertex.getTasks().values()); tasks.addAll(vertexTasks.subList(0, Math.min(vertexTasks.size(), limit - tasks.size()))); if(tasks.size() >= limit) { @@ -575,7 +661,7 @@ public class AMWebController extends Controller { else { Collection<Vertex> vertices = dag.getVertices().values(); for (Vertex vertex : vertices) { - List<Task> vertexTasks = new ArrayList<>(vertex.getTasks().values()); + List<Task> vertexTasks = new ArrayList<Task>(vertex.getTasks().values()); tasks.addAll(vertexTasks.subList(0, Math.min(vertexTasks.size(), limit - tasks.size()))); if(tasks.size() >= limit) { @@ -614,12 +700,20 @@ public class AMWebController extends Controller { return; } - ArrayList<Map<String, String>> tasksInfo = new ArrayList<>(); + Map<String, Set<String>> counterNames = getCounterListFromRequest(); + + ArrayList<Map<String, Object>> tasksInfo = new ArrayList<Map<String, Object>>(); for(Task t : tasks) { - Map<String, String> taskInfo = new HashMap<>(); + Map<String, Object> taskInfo = new HashMap<String, Object>(); taskInfo.put("id", t.getTaskId().toString()); taskInfo.put("progress", Float.toString(t.getProgress())); taskInfo.put("status", t.getState().toString()); + + TezCounters counters = t.getCounters(); + Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters, counterNames); + if (counterMap != null && !counterMap.isEmpty()) { + taskInfo.put("counters", counterMap); + } tasksInfo.add(taskInfo); } @@ -628,6 +722,96 @@ public class AMWebController extends Controller { )); } + /** + * getRequestedAttempts + * Given a dag and a limit, based on the incoming query parameters. Used by getAttemptsInfo + * returns a list of task instances + * + * @param dag {DAG} + * @param limit {Integer} + */ + List<TaskAttempt> getRequestedAttempts(DAG dag, Integer limit) { + List<TaskAttempt> attempts = new ArrayList<TaskAttempt>(); + + List<List<Integer>> attemptIDs = getIDsFromRequest(WebUIService.ATTEMPT_ID, limit, 3); + if(attemptIDs == null) { + return null; + } + else if(!attemptIDs.isEmpty()) { + for (List<Integer> indexes : attemptIDs) { + Vertex vertex = getVertexFromIndex(dag, indexes.get(0)); + if(vertex == null) { + continue; + } + Task task = vertex.getTask(indexes.get(1)); + if(task == null) { + continue; + } + + TaskAttempt attempt = task. + getAttempt(TezTaskAttemptID.getInstance(task.getTaskId(), indexes.get(2))); + if(attempt == null) { + continue; + } + else { + attempts.add(attempt); + } + + if(attempts.size() >= limit) { + break; + } + } + } + + return attempts; + } + + /** + * Renders the response JSON for attemptsInfo API + * The JSON will have an array of attempt objects under the key attempts. + */ + public void getAttemptsInfo() { + if (!setupResponse()) { + return; + } + + DAG dag = checkAndGetDAGFromRequest(); + if (dag == null) { + return; + } + + int limit = MAX_QUERIED; + try { + limit = getQueryParamInt(WebUIService.LIMIT); + } catch (NumberFormatException e) { + //Ignore + } + + List<TaskAttempt> attempts = getRequestedAttempts(dag, limit); + if(attempts == null) { + return; + } + + Map<String, Set<String>> counterNames = getCounterListFromRequest(); + + ArrayList<Map<String, Object>> attemptsInfo = new ArrayList<Map<String, Object>>(); + for(TaskAttempt a : attempts) { + Map<String, Object> attemptInfo = new HashMap<String, Object>(); + attemptInfo.put("id", a.getID().toString()); + attemptInfo.put("progress", Float.toString(a.getProgress())); + attemptInfo.put("status", a.getState().toString()); + + TezCounters counters = a.getCounters(); + Map<String, Map<String, Long>> counterMap = constructCounterMapInfo(counters, counterNames); + if (counterMap != null && !counterMap.isEmpty()) { + attemptInfo.put("counters", counterMap); + } + attemptsInfo.add(attemptInfo); + } + + renderJSON(ImmutableMap.of("attempts", attemptsInfo)); + } + @Override @VisibleForTesting public void renderJSON(Object object) { http://git-wip-us.apache.org/repos/asf/tez/blob/1e58a0e4/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java index 32b57e9..a894d25 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java @@ -41,6 +41,8 @@ public class WebUIService extends AbstractService { public static final String VERTEX_ID = "vertexID"; public static final String DAG_ID = "dagID"; public static final String TASK_ID = "taskID"; + public static final String ATTEMPT_ID = "attemptID"; + public static final String COUNTERS = "counters"; public static final String LIMIT = "limit"; @@ -202,6 +204,8 @@ public class WebUIService extends AbstractService { route(WS_PREFIX_V2 + pajoin("verticesInfo", VERTEX_ID, DAG_ID), AMWebController.class, "getVerticesInfo"); route(WS_PREFIX_V2 + pajoin("tasksInfo", TASK_ID, VERTEX_ID, DAG_ID), AMWebController.class, "getTasksInfo"); + route(WS_PREFIX_V2 + pajoin("attemptsInfo", ATTEMPT_ID, DAG_ID), AMWebController.class, + "getAttemptsInfo"); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/1e58a0e4/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java index cba3c3e..56a4a82 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java @@ -38,6 +38,8 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeMap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -45,17 +47,21 @@ import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.webapp.Controller; +import org.apache.tez.common.counters.TezCounters; import org.apache.tez.common.security.ACLManager; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.client.ProgressBuilder; +import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.app.dag.Task; +import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.VertexState; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.junit.Assert; @@ -321,6 +327,9 @@ public class TestAMWebController { doReturn(true).when(spy).setupResponse(); doReturn(mockDAG).when(spy).checkAndGetDAGFromRequest(); + Map<String, Set<String>> counterList = new TreeMap<String, Set<String>>(); + doReturn(counterList).when(spy).getCounterListFromRequest(); + List<Integer> requested; if (numVerticesRequested == 0) { requested = ImmutableList.of(); @@ -352,6 +361,21 @@ public class TestAMWebController { doReturn(progress).when(mockVertex).getProgress(); doReturn(pb).when(mockVertex).getVertexProgress(); + TezCounters counters = new TezCounters(); + counters.addGroup("g1", "g1"); + counters.addGroup("g2", "g2"); + counters.addGroup("g3", "g3"); + counters.addGroup("g4", "g4"); + counters.findCounter("g1", "g1_c1").setValue(100); + counters.findCounter("g1", "g1_c2").setValue(100); + counters.findCounter("g2", "g2_c3").setValue(100); + counters.findCounter("g2", "g2_c4").setValue(100); + counters.findCounter("g3", "g3_c5").setValue(100); + counters.findCounter("g3", "g3_c6").setValue(100); + + doReturn(counters).when(mockVertex).getAllCounters(); + doReturn(counters).when(mockVertex).getCachedCounters(); + return mockVertex; } @@ -557,11 +581,14 @@ public class TestAMWebController { // Set mock query params doReturn(limit).when(spy).getQueryParamInt(WebUIService.LIMIT); doReturn(vertexMinIds).when(spy).getIntegersFromRequest(WebUIService.VERTEX_ID, limit); - doReturn(taskMinIds).when(spy).getIDsFromRequest(WebUIService.TASK_ID, limit); + doReturn(taskMinIds).when(spy).getIDsFromRequest(WebUIService.TASK_ID, limit, 2); // Set function mocks doReturn(mockDAG).when(spy).checkAndGetDAGFromRequest(); + Map<String, Set<String>> counterList = new TreeMap<String, Set<String>>(); + doReturn(counterList).when(spy).getCounterListFromRequest(); + spy.getTasksInfo(); verify(spy).renderJSON(returnResultCaptor.capture()); @@ -589,6 +616,20 @@ public class TestAMWebController { doReturn(status).when(mockTask).getState(); doReturn(progress).when(mockTask).getProgress(); + TezCounters counters = new TezCounters(); + counters.addGroup("g1", "g1"); + counters.addGroup("g2", "g2"); + counters.addGroup("g3", "g3"); + counters.addGroup("g4", "g4"); + counters.findCounter("g1", "g1_c1").setValue(101); + counters.findCounter("g1", "g1_c2").setValue(102); + counters.findCounter("g2", "g2_c3").setValue(103); + counters.findCounter("g2", "g2_c4").setValue(104); + counters.findCounter("g3", "g3_c5").setValue(105); + counters.findCounter("g3", "g3_c6").setValue(106); + + doReturn(counters).when(mockTask).getCounters(); + return mockTask; } @@ -598,4 +639,152 @@ public class TestAMWebController { Assert.assertEquals(mockTask.getState().toString(), taskResult.get("status")); Assert.assertEquals(Float.toString(mockTask.getProgress()), taskResult.get("progress")); } + + //-- Get Attempts Info Tests ----------------------------------------------------------------------- + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testGetAttemptsInfoWithIds() { + List <TaskAttempt> attempts = createMockAttempts(); + List <Integer> vertexMinIds = Arrays.asList(); + List <Integer> taskMinIds = Arrays.asList(); + List <List <Integer>> attemptMinIds = Arrays.asList(Arrays.asList(0, 0, 0), + Arrays.asList(0, 0, 1), + Arrays.asList(0, 0, 2), + Arrays.asList(0, 0, 3)); + + // Fetch All + Map<String, Object> result = getAttemptsTestHelper(attempts, attemptMinIds, vertexMinIds, + taskMinIds, AMWebController.MAX_QUERIED); + + Assert.assertEquals(1, result.size()); + Assert.assertTrue(result.containsKey("attempts")); + + ArrayList<Map<String, String>> attemptsInfo = (ArrayList<Map<String, String>>) result. + get("attempts"); + Assert.assertEquals(4, attemptsInfo.size()); + + verifySingleAttemptResult(attempts.get(0), attemptsInfo.get(0)); + verifySingleAttemptResult(attempts.get(1), attemptsInfo.get(1)); + verifySingleAttemptResult(attempts.get(2), attemptsInfo.get(2)); + verifySingleAttemptResult(attempts.get(3), attemptsInfo.get(3)); + + // With limit + result = getAttemptsTestHelper(attempts, attemptMinIds, vertexMinIds, taskMinIds, 2); + + Assert.assertEquals(1, result.size()); + Assert.assertTrue(result.containsKey("attempts")); + + attemptsInfo = (ArrayList<Map<String, String>>) result.get("attempts"); + Assert.assertEquals(2, attemptsInfo.size()); + + verifySingleAttemptResult(attempts.get(0), attemptsInfo.get(0)); + verifySingleAttemptResult(attempts.get(1), attemptsInfo.get(1)); + } + + Map<String, Object> getAttemptsTestHelper(List<TaskAttempt> attempts, List <List <Integer>> attemptMinIds, + List<Integer> vertexMinIds, List<Integer> taskMinIds, Integer limit) { + //Creating mock DAG + DAG mockDAG = mock(DAG.class); + doReturn(TezDAGID.fromString("dag_1441301219877_0109_1")).when(mockDAG).getID(); + + //Creating mock vertex and attaching to mock DAG + TezVertexID vertexID = TezVertexID.fromString("vertex_1441301219877_0109_1_00"); + Vertex mockVertex = mock(Vertex.class); + doReturn(vertexID).when(mockVertex).getVertexId(); + + doReturn(mockVertex).when(mockDAG).getVertex(vertexID); + doReturn(ImmutableMap.of( + vertexID, mockVertex + )).when(mockDAG).getVertices(); + + //Creating mock task and attaching to mock Vertex + TezTaskID taskID = TezTaskID.fromString("task_1441301219877_0109_1_00_000000"); + Task mockTask = mock(Task.class); + doReturn(taskID).when(mockTask).getTaskId(); + int taskIndex = taskID.getId(); + doReturn(mockTask).when(mockVertex).getTask(taskIndex); + doReturn(ImmutableMap.of( + taskID, mockTask + )).when(mockVertex).getTasks(); + + //Creating mock tasks and attaching to mock vertex + Map<TezTaskAttemptID, TaskAttempt> attemptsMap = Maps.newHashMap(); + for(TaskAttempt attempt : attempts) { + TezTaskAttemptID attemptId = attempt.getID(); + doReturn(attempt).when(mockTask).getAttempt(attemptId); + attemptsMap.put(attemptId, attempt); + } + doReturn(attemptsMap).when(mockTask).getAttempts(); + + //Creates & setup controller spy + AMWebController amWebController = new AMWebController(mockRequestContext, mockAppContext, + "TEST_HISTORY_URL"); + AMWebController spy = spy(amWebController); + doReturn(true).when(spy).setupResponse(); + doNothing().when(spy).renderJSON(any()); + + // Set mock query params + doReturn(limit).when(spy).getQueryParamInt(WebUIService.LIMIT); + doReturn(vertexMinIds).when(spy).getIntegersFromRequest(WebUIService.VERTEX_ID, limit); + doReturn(taskMinIds).when(spy).getIDsFromRequest(WebUIService.TASK_ID, limit, 2); + doReturn(attemptMinIds).when(spy).getIDsFromRequest(WebUIService.ATTEMPT_ID, limit, 3); + + // Set function mocks + doReturn(mockDAG).when(spy).checkAndGetDAGFromRequest(); + + Map<String, Set<String>> counterList = new TreeMap<String, Set<String>>(); + doReturn(counterList).when(spy).getCounterListFromRequest(); + + spy.getAttemptsInfo(); + verify(spy).renderJSON(returnResultCaptor.capture()); + + return returnResultCaptor.getValue(); + } + + private List<TaskAttempt> createMockAttempts() { + TaskAttempt mockAttempt1 = createMockAttempt("attempt_1441301219877_0109_1_00_000000_0", TaskAttemptState.RUNNING, + 0.33f); + TaskAttempt mockAttempt2 = createMockAttempt("attempt_1441301219877_0109_1_00_000000_1", TaskAttemptState.SUCCEEDED, + 1.0f); + TaskAttempt mockAttempt3 = createMockAttempt("attempt_1441301219877_0109_1_00_000000_2", TaskAttemptState.FAILED, + .8f); + TaskAttempt mockAttempt4 = createMockAttempt("attempt_1441301219877_0109_1_00_000000_3", TaskAttemptState.SUCCEEDED, + .8f); + + List <TaskAttempt> attempts = Arrays.asList(mockAttempt1, mockAttempt2, mockAttempt3, mockAttempt4); + return attempts; + } + + private TaskAttempt createMockAttempt(String attemptIDStr, TaskAttemptState status, float progress) { + TaskAttempt mockAttempt = mock(TaskAttempt.class); + + doReturn(TezTaskAttemptID.fromString(attemptIDStr)).when(mockAttempt).getID(); + doReturn(status).when(mockAttempt).getState(); + doReturn(progress).when(mockAttempt).getProgress(); + + TezCounters counters = new TezCounters(); + counters.addGroup("g1", "g1"); + counters.addGroup("g2", "g2"); + counters.addGroup("g3", "g3"); + counters.addGroup("g4", "g4"); + counters.findCounter("g1", "g1_c1").setValue(101); + counters.findCounter("g1", "g1_c2").setValue(102); + counters.findCounter("g2", "g2_c3").setValue(103); + counters.findCounter("g2", "g2_c4").setValue(104); + counters.findCounter("g3", "g3_c5").setValue(105); + counters.findCounter("g3", "g3_c6").setValue(106); + + doReturn(counters).when(mockAttempt).getCounters(); + + return mockAttempt; + } + + private void verifySingleAttemptResult(TaskAttempt mockTask, Map<String, String> taskResult) { + Assert.assertEquals(3, taskResult.size()); + Assert.assertEquals(mockTask.getID().toString(), taskResult.get("id")); + Assert.assertEquals(mockTask.getState().toString(), taskResult.get("status")); + Assert.assertEquals(Float.toString(mockTask.getProgress()), taskResult.get("progress")); + } + }
