Repository: tez
Updated Branches:
refs/heads/branch-0.7 0eb8ea08f -> 6b05e1965
TEZ-2813. Tez UI: add counter data for rest api calls to AM Web Services v2
(sree via pramachandran)
(cherry picked from commit 1e58a0e418bfd93be1b789f288299a02a6b4c0dd)
Conflicts:
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/64526ea2
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/64526ea2
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/64526ea2
Branch: refs/heads/branch-0.7
Commit: 64526ea2c7ed661f6e584998be5278c812e80df1
Parents: 0eb8ea0
Author: Prakash Ramachandran <[email protected]>
Authored: Mon Sep 14 22:57:21 2015 +0530
Committer: Prakash Ramachandran <[email protected]>
Committed: Mon Sep 14 23:18:22 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../java/org/apache/tez/dag/app/dag/Vertex.java | 7 +
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 30 +++
.../apache/tez/dag/app/web/AMWebController.java | 210 +++++++++++++++++--
.../apache/tez/dag/app/web/WebUIService.java | 4 +
.../tez/dag/app/web/TestAMWebController.java | 191 ++++++++++++++++-
6 files changed, 429 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/64526ea2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0cd0b88..e7eb8d1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.7.1: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2813. Tez UI: add counter data for rest api calls to AM Web Services v2
TEZ-2660. Tez UI: need to show application page even if system metrics
publish is disabled.
TEZ-2787. Tez AM should have java.io.tmpdir=./tmp to be consistent with tasks
TEZ-2780. Tez UI: Update All Tasks page while in progress
http://git-wip-us.apache.org/repos/asf/tez/blob/64526ea2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index bb3548d..bb580da 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -80,6 +80,13 @@ public interface Vertex extends Comparable<Vertex> {
*/
TezCounters getAllCounters();
+ /**
+ * Get the counters of this vertex, possibly served from a short-lived cache instead of being recomputed.
+ * @return aggregate task-counters
+ */
+ TezCounters getCachedCounters();
+
+
Map<TezTaskID, Task> getTasks();
Task getTask(TezTaskID taskID);
Task getTask(int taskIndex);
http://git-wip-us.apache.org/repos/asf/tez/blob/64526ea2/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 95ee298..f16e6f9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -224,6 +224,8 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
volatile LinkedHashMap<TezTaskID, Task> tasks = new LinkedHashMap<TezTaskID,
Task>();
private Object fullCountersLock = new Object();
private TezCounters fullCounters = null;
+ private TezCounters cachedCounters = null;
+ private long cachedCountersTimestamp = 0;
private Resource taskResource;
private Configuration vertexConf;
@@ -1085,6 +1087,34 @@ public class VertexImpl implements
org.apache.tez.dag.app.dag.Vertex, EventHandl
}
}
+ @Override
+ public TezCounters getCachedCounters() {
+ readLock.lock();
+
+ try {
+ // FIXME a better lightweight approach for counters is needed
+ if (fullCounters == null && cachedCounters != null
+ && ((cachedCountersTimestamp+10000) > System.currentTimeMillis())) {
+ LOG.info("Asked for counters"
+ + ", cachedCountersTimestamp=" + cachedCountersTimestamp
+ + ", currentTime=" + System.currentTimeMillis());
+ return cachedCounters;
+ }
+
+ cachedCountersTimestamp = System.currentTimeMillis();
+ if (inTerminalState()) {
+ this.mayBeConstructFinalFullCounters();
+ return fullCounters;
+ }
+
+ TezCounters counters = new TezCounters();
+ cachedCounters = incrTaskCounters(counters, tasks.values());
+ return cachedCounters;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
public VertexStats getVertexStats() {
readLock.lock();
http://git-wip-us.apache.org/repos/asf/tez/blob/64526ea2/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
index c1ddb28..cede341 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java
@@ -25,18 +25,29 @@ import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import java.util.StringTokenizer;
+import java.util.TreeMap;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.google.inject.name.Named;
+
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.client.ProgressBuilder;
import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.records.TezTaskAttemptID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.security.UserGroupInformation;
@@ -362,6 +373,44 @@ public class AMWebController extends Controller {
return vertexIDs;
}
+ /**
+ * Parse a params list in the format: CtrGroup/CtrName1,CtrName2;CtrGroup2;
+ * @return nested structure of counter groups and names. Null if nothing specified.
+ */
+ Map<String, Set<String>> getCounterListFromRequest() {
+ final String counterStr = $(WebUIService.COUNTERS).trim();
+ if (counterStr == null || counterStr.isEmpty()) {
+ return null;
+ }
+
+ String delimiter = ";";
+ String groupDelimiter = "/";
+ String counterDelimiter = ",";
+
+ StringTokenizer tokenizer = new StringTokenizer(counterStr, delimiter);
+
+ Map<String, Set<String>> counterList = new TreeMap<String, Set<String>>();
+ while (tokenizer.hasMoreElements()) {
+ String token = tokenizer.nextToken().trim();
+ int pos = token.indexOf(groupDelimiter);
+ if (pos == -1) {
+ counterList.put(token, Collections.<String>emptySet());
+ continue;
+ }
+ String counterGroup = token.substring(0, pos);
+ Set<String> counters = Collections.<String>emptySet();
+ if (pos < token.length() - 1) {
+ String counterNames = token.substring(pos+1, token.length());
+ counters = Sets.newHashSet(
+ Splitter.on(counterDelimiter).omitEmptyStrings()
+ .trimResults().split(counterNames));
+ }
+ counterList.put(counterGroup, counters);
+ }
+ return counterList;
+ }
+
+
List<String> splitString(String str, String delimiter, Integer limit) {
List<String> items = new ArrayList<String>();
@@ -404,7 +453,7 @@ public class AMWebController extends Controller {
}
/**
- * getIDsFromRequest
+ * getTaskIDsFromRequest
* Takes in "1_0,1_3" and returns [[1,0],[1,3]]
* Mainly to parse a query parameter with comma separated indexes. For
vertex its the index,
* for task its vertexIndex_taskIndex and for attempts its
vertexIndex_taskIndex_attemptNo
@@ -415,7 +464,7 @@ public class AMWebController extends Controller {
*
* @return {List<List<Integer>>} List of parsed values
*/
- List<List<Integer>> getIDsFromRequest(String paramName, Integer limit) {
+ List<List<Integer>> getIDsFromRequest(String paramName, Integer limit,
Integer count) {
String valuesStr = $(paramName).trim();
List<List<Integer>> values = new ArrayList<List<Integer>>();
@@ -424,7 +473,7 @@ public class AMWebController extends Controller {
for (String valueStr : splitString(valuesStr, ",", limit)) {
List<Integer> innerValues = new ArrayList<Integer>();
String innerValueStrs[] = valueStr.split("_");
- if(innerValueStrs.length == 2) {
+ if(innerValueStrs.length == count) {
for (String innerValueStr : innerValueStrs) {
int value = Integer.parseInt(innerValueStr);
innerValues.add(value);
@@ -462,8 +511,33 @@ public class AMWebController extends Controller {
));
}
- private Map<String,String> getVertexInfoMap(Vertex vertex) {
- Map<String, String> vertexInfo = new HashMap<String, String>();
+ Map<String, Map<String, Long>> constructCounterMapInfo(TezCounters counters,
+ Map<String, Set<String>> counterNames) {
+ if (counterNames == null || counterNames.isEmpty()) {
+ return null;
+ }
+ LOG.info("Requested counter names=" + counterNames.entrySet());
+ LOG.info("actual counters=" + counters);
+
+ Map<String, Map<String, Long>> counterInfo = new TreeMap<String,
Map<String, Long>>();
+
+ for (Entry<String, Set<String>> entry : counterNames.entrySet()) {
+ Map<String, Long> matchedCounters = new HashMap<String, Long>();
+ CounterGroup grpCounters = counters.getGroup(entry.getKey());
+ for (TezCounter counter : grpCounters) {
+ if (entry.getValue().isEmpty() ||
entry.getValue().contains(counter.getName())) {
+ matchedCounters.put(counter.getName(), counter.getValue());
+ }
+ }
+ counterInfo.put(entry.getKey(), matchedCounters);
+ }
+
+ return counterInfo;
+ }
+
+ private Map<String, Object> getVertexInfoMap(Vertex vertex,
+ Map<String, Set<String>>
counterNames) {
+ Map<String, Object> vertexInfo = new HashMap<String, Object>();
vertexInfo.put("id", vertex.getVertexId().toString());
vertexInfo.put("status", vertex.getState().toString());
vertexInfo.put("progress", Float.toString(vertex.getProgress()));
@@ -473,8 +547,18 @@ public class AMWebController extends Controller {
vertexInfo.put("runningTasks",
Integer.toString(vertexProgress.getRunningTaskCount()));
vertexInfo.put("succeededTasks",
Integer.toString(vertexProgress.getSucceededTaskCount()));
- vertexInfo.put("failedTaskAttempts",
Integer.toString(vertexProgress.getFailedTaskAttemptCount()));
- vertexInfo.put("killedTaskAttempts",
Integer.toString(vertexProgress.getKilledTaskAttemptCount()));
+ vertexInfo.put("failedTaskAttempts",
+ Integer.toString(vertexProgress.getFailedTaskAttemptCount()));
+ vertexInfo.put("killedTaskAttempts",
+ Integer.toString(vertexProgress.getKilledTaskAttemptCount()));
+
+ if (counterNames != null && !counterNames.isEmpty()) {
+ TezCounters counters = vertex.getCachedCounters();
+ Map<String, Map<String, Long>> counterMap =
constructCounterMapInfo(counters, counterNames);
+ if (counterMap != null && !counterMap.isEmpty()) {
+ vertexInfo.put("counters", counterMap);
+ }
+ }
return vertexInfo;
}
@@ -495,6 +579,8 @@ public class AMWebController extends Controller {
return;
}
+ Map<String, Set<String>> counterNames = getCounterListFromRequest();
+
Collection<Vertex> vertexList;
if (requestedIDs.isEmpty()) {
// no ids specified return all.
@@ -503,9 +589,9 @@ public class AMWebController extends Controller {
vertexList = getVerticesByIdx(dag, requestedIDs);
}
- ArrayList<Map<String, String>> verticesInfo = new ArrayList<Map<String,
String>>();
+ ArrayList<Map<String, Object>> verticesInfo = new ArrayList<Map<String,
Object>>();
for(Vertex v : vertexList) {
- verticesInfo.add(getVertexInfoMap(v));
+ verticesInfo.add(getVertexInfoMap(v, counterNames));
}
renderJSON(ImmutableMap.of(
@@ -530,7 +616,7 @@ public class AMWebController extends Controller {
List<Task> getRequestedTasks(DAG dag, Integer limit) {
List<Task> tasks = new ArrayList<Task>();
- List<List<Integer>> taskIDs = getIDsFromRequest(WebUIService.TASK_ID,
limit);
+ List<List<Integer>> taskIDs = getIDsFromRequest(WebUIService.TASK_ID,
limit, 2);
if(taskIDs == null) {
return null;
}
@@ -614,12 +700,20 @@ public class AMWebController extends Controller {
return;
}
- ArrayList<Map<String, String>> tasksInfo = new ArrayList<Map<String,
String>>();
+ Map<String, Set<String>> counterNames = getCounterListFromRequest();
+
+ ArrayList<Map<String, Object>> tasksInfo = new ArrayList<Map<String,
Object>>();
for(Task t : tasks) {
- Map<String, String> taskInfo = new HashMap<String, String>();
+ Map<String, Object> taskInfo = new HashMap<String, Object>();
taskInfo.put("id", t.getTaskId().toString());
taskInfo.put("progress", Float.toString(t.getProgress()));
taskInfo.put("status", t.getState().toString());
+
+ TezCounters counters = t.getCounters();
+ Map<String, Map<String, Long>> counterMap =
constructCounterMapInfo(counters, counterNames);
+ if (counterMap != null && !counterMap.isEmpty()) {
+ taskInfo.put("counters", counterMap);
+ }
tasksInfo.add(taskInfo);
}
@@ -628,6 +722,96 @@ public class AMWebController extends Controller {
));
}
+ /**
+ * getRequestedAttempts
+ * Given a dag and a limit, looks up the attempts named by the incoming query parameters. Used by getAttemptsInfo
+ * returns a list of task attempt instances
+ *
+ * @param dag {DAG}
+ * @param limit {Integer}
+ */
+ List<TaskAttempt> getRequestedAttempts(DAG dag, Integer limit) {
+ List<TaskAttempt> attempts = new ArrayList<TaskAttempt>();
+
+ List<List<Integer>> attemptIDs =
getIDsFromRequest(WebUIService.ATTEMPT_ID, limit, 3);
+ if(attemptIDs == null) {
+ return null;
+ }
+ else if(!attemptIDs.isEmpty()) {
+ for (List<Integer> indexes : attemptIDs) {
+ Vertex vertex = getVertexFromIndex(dag, indexes.get(0));
+ if(vertex == null) {
+ continue;
+ }
+ Task task = vertex.getTask(indexes.get(1));
+ if(task == null) {
+ continue;
+ }
+
+ TaskAttempt attempt = task.
+ getAttempt(TezTaskAttemptID.getInstance(task.getTaskId(),
indexes.get(2)));
+ if(attempt == null) {
+ continue;
+ }
+ else {
+ attempts.add(attempt);
+ }
+
+ if(attempts.size() >= limit) {
+ break;
+ }
+ }
+ }
+
+ return attempts;
+ }
+
+ /**
+ * Renders the response JSON for attemptsInfo API
+ * The JSON will have an array of attempt objects under the key attempts.
+ */
+ public void getAttemptsInfo() {
+ if (!setupResponse()) {
+ return;
+ }
+
+ DAG dag = checkAndGetDAGFromRequest();
+ if (dag == null) {
+ return;
+ }
+
+ int limit = MAX_QUERIED;
+ try {
+ limit = getQueryParamInt(WebUIService.LIMIT);
+ } catch (NumberFormatException e) {
+ //Ignore
+ }
+
+ List<TaskAttempt> attempts = getRequestedAttempts(dag, limit);
+ if(attempts == null) {
+ return;
+ }
+
+ Map<String, Set<String>> counterNames = getCounterListFromRequest();
+
+ ArrayList<Map<String, Object>> attemptsInfo = new ArrayList<Map<String,
Object>>();
+ for(TaskAttempt a : attempts) {
+ Map<String, Object> attemptInfo = new HashMap<String, Object>();
+ attemptInfo.put("id", a.getID().toString());
+ attemptInfo.put("progress", Float.toString(a.getProgress()));
+ attemptInfo.put("status", a.getState().toString());
+
+ TezCounters counters = a.getCounters();
+ Map<String, Map<String, Long>> counterMap =
constructCounterMapInfo(counters, counterNames);
+ if (counterMap != null && !counterMap.isEmpty()) {
+ attemptInfo.put("counters", counterMap);
+ }
+ attemptsInfo.add(attemptInfo);
+ }
+
+ renderJSON(ImmutableMap.of("attempts", attemptsInfo));
+ }
+
@Override
@VisibleForTesting
public void renderJSON(Object object) {
http://git-wip-us.apache.org/repos/asf/tez/blob/64526ea2/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
index 32b57e9..a894d25 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java
@@ -41,6 +41,8 @@ public class WebUIService extends AbstractService {
public static final String VERTEX_ID = "vertexID";
public static final String DAG_ID = "dagID";
public static final String TASK_ID = "taskID";
+ public static final String ATTEMPT_ID = "attemptID";
+ public static final String COUNTERS = "counters";
public static final String LIMIT = "limit";
@@ -202,6 +204,8 @@ public class WebUIService extends AbstractService {
route(WS_PREFIX_V2 + pajoin("verticesInfo", VERTEX_ID, DAG_ID),
AMWebController.class, "getVerticesInfo");
route(WS_PREFIX_V2 + pajoin("tasksInfo", TASK_ID, VERTEX_ID, DAG_ID),
AMWebController.class,
"getTasksInfo");
+ route(WS_PREFIX_V2 + pajoin("attemptsInfo", ATTEMPT_ID, DAG_ID),
AMWebController.class,
+ "getAttemptsInfo");
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/64526ea2/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
----------------------------------------------------------------------
diff --git
a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
index cba3c3e..56a4a82 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java
@@ -38,6 +38,8 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -45,17 +47,21 @@ import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.webapp.Controller;
+import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.ACLManager;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.client.ProgressBuilder;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.Assert;
@@ -321,6 +327,9 @@ public class TestAMWebController {
doReturn(true).when(spy).setupResponse();
doReturn(mockDAG).when(spy).checkAndGetDAGFromRequest();
+ Map<String, Set<String>> counterList = new TreeMap<String, Set<String>>();
+ doReturn(counterList).when(spy).getCounterListFromRequest();
+
List<Integer> requested;
if (numVerticesRequested == 0) {
requested = ImmutableList.of();
@@ -352,6 +361,21 @@ public class TestAMWebController {
doReturn(progress).when(mockVertex).getProgress();
doReturn(pb).when(mockVertex).getVertexProgress();
+ TezCounters counters = new TezCounters();
+ counters.addGroup("g1", "g1");
+ counters.addGroup("g2", "g2");
+ counters.addGroup("g3", "g3");
+ counters.addGroup("g4", "g4");
+ counters.findCounter("g1", "g1_c1").setValue(100);
+ counters.findCounter("g1", "g1_c2").setValue(100);
+ counters.findCounter("g2", "g2_c3").setValue(100);
+ counters.findCounter("g2", "g2_c4").setValue(100);
+ counters.findCounter("g3", "g3_c5").setValue(100);
+ counters.findCounter("g3", "g3_c6").setValue(100);
+
+ doReturn(counters).when(mockVertex).getAllCounters();
+ doReturn(counters).when(mockVertex).getCachedCounters();
+
return mockVertex;
}
@@ -557,11 +581,14 @@ public class TestAMWebController {
// Set mock query params
doReturn(limit).when(spy).getQueryParamInt(WebUIService.LIMIT);
doReturn(vertexMinIds).when(spy).getIntegersFromRequest(WebUIService.VERTEX_ID,
limit);
- doReturn(taskMinIds).when(spy).getIDsFromRequest(WebUIService.TASK_ID,
limit);
+ doReturn(taskMinIds).when(spy).getIDsFromRequest(WebUIService.TASK_ID,
limit, 2);
// Set function mocks
doReturn(mockDAG).when(spy).checkAndGetDAGFromRequest();
+ Map<String, Set<String>> counterList = new TreeMap<String, Set<String>>();
+ doReturn(counterList).when(spy).getCounterListFromRequest();
+
spy.getTasksInfo();
verify(spy).renderJSON(returnResultCaptor.capture());
@@ -589,6 +616,20 @@ public class TestAMWebController {
doReturn(status).when(mockTask).getState();
doReturn(progress).when(mockTask).getProgress();
+ TezCounters counters = new TezCounters();
+ counters.addGroup("g1", "g1");
+ counters.addGroup("g2", "g2");
+ counters.addGroup("g3", "g3");
+ counters.addGroup("g4", "g4");
+ counters.findCounter("g1", "g1_c1").setValue(101);
+ counters.findCounter("g1", "g1_c2").setValue(102);
+ counters.findCounter("g2", "g2_c3").setValue(103);
+ counters.findCounter("g2", "g2_c4").setValue(104);
+ counters.findCounter("g3", "g3_c5").setValue(105);
+ counters.findCounter("g3", "g3_c6").setValue(106);
+
+ doReturn(counters).when(mockTask).getCounters();
+
return mockTask;
}
@@ -598,4 +639,152 @@ public class TestAMWebController {
Assert.assertEquals(mockTask.getState().toString(),
taskResult.get("status"));
Assert.assertEquals(Float.toString(mockTask.getProgress()),
taskResult.get("progress"));
}
+
+ //-- Get Attempts Info Tests -----------------------------------------------------------------------
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testGetAttemptsInfoWithIds() {
+ List <TaskAttempt> attempts = createMockAttempts();
+ List <Integer> vertexMinIds = Arrays.asList();
+ List <Integer> taskMinIds = Arrays.asList();
+ List <List <Integer>> attemptMinIds = Arrays.asList(Arrays.asList(0, 0, 0),
+ Arrays.asList(0, 0, 1),
+ Arrays.asList(0, 0, 2),
+ Arrays.asList(0, 0, 3));
+
+ // Fetch All
+ Map<String, Object> result = getAttemptsTestHelper(attempts,
attemptMinIds, vertexMinIds,
+ taskMinIds, AMWebController.MAX_QUERIED);
+
+ Assert.assertEquals(1, result.size());
+ Assert.assertTrue(result.containsKey("attempts"));
+
+ ArrayList<Map<String, String>> attemptsInfo = (ArrayList<Map<String,
String>>) result.
+ get("attempts");
+ Assert.assertEquals(4, attemptsInfo.size());
+
+ verifySingleAttemptResult(attempts.get(0), attemptsInfo.get(0));
+ verifySingleAttemptResult(attempts.get(1), attemptsInfo.get(1));
+ verifySingleAttemptResult(attempts.get(2), attemptsInfo.get(2));
+ verifySingleAttemptResult(attempts.get(3), attemptsInfo.get(3));
+
+ // With limit
+ result = getAttemptsTestHelper(attempts, attemptMinIds, vertexMinIds,
taskMinIds, 2);
+
+ Assert.assertEquals(1, result.size());
+ Assert.assertTrue(result.containsKey("attempts"));
+
+ attemptsInfo = (ArrayList<Map<String, String>>) result.get("attempts");
+ Assert.assertEquals(2, attemptsInfo.size());
+
+ verifySingleAttemptResult(attempts.get(0), attemptsInfo.get(0));
+ verifySingleAttemptResult(attempts.get(1), attemptsInfo.get(1));
+ }
+
+ Map<String, Object> getAttemptsTestHelper(List<TaskAttempt> attempts, List
<List <Integer>> attemptMinIds,
+ List<Integer> vertexMinIds,
List<Integer> taskMinIds, Integer limit) {
+ //Creating mock DAG
+ DAG mockDAG = mock(DAG.class);
+
doReturn(TezDAGID.fromString("dag_1441301219877_0109_1")).when(mockDAG).getID();
+
+ //Creating mock vertex and attaching to mock DAG
+ TezVertexID vertexID =
TezVertexID.fromString("vertex_1441301219877_0109_1_00");
+ Vertex mockVertex = mock(Vertex.class);
+ doReturn(vertexID).when(mockVertex).getVertexId();
+
+ doReturn(mockVertex).when(mockDAG).getVertex(vertexID);
+ doReturn(ImmutableMap.of(
+ vertexID, mockVertex
+ )).when(mockDAG).getVertices();
+
+ //Creating mock task and attaching to mock Vertex
+ TezTaskID taskID =
TezTaskID.fromString("task_1441301219877_0109_1_00_000000");
+ Task mockTask = mock(Task.class);
+ doReturn(taskID).when(mockTask).getTaskId();
+ int taskIndex = taskID.getId();
+ doReturn(mockTask).when(mockVertex).getTask(taskIndex);
+ doReturn(ImmutableMap.of(
+ taskID, mockTask
+ )).when(mockVertex).getTasks();
+
+ //Creating mock attempts and attaching to mock task
+ Map<TezTaskAttemptID, TaskAttempt> attemptsMap = Maps.newHashMap();
+ for(TaskAttempt attempt : attempts) {
+ TezTaskAttemptID attemptId = attempt.getID();
+ doReturn(attempt).when(mockTask).getAttempt(attemptId);
+ attemptsMap.put(attemptId, attempt);
+ }
+ doReturn(attemptsMap).when(mockTask).getAttempts();
+
+ //Creates & setup controller spy
+ AMWebController amWebController = new AMWebController(mockRequestContext,
mockAppContext,
+ "TEST_HISTORY_URL");
+ AMWebController spy = spy(amWebController);
+ doReturn(true).when(spy).setupResponse();
+ doNothing().when(spy).renderJSON(any());
+
+ // Set mock query params
+ doReturn(limit).when(spy).getQueryParamInt(WebUIService.LIMIT);
+
doReturn(vertexMinIds).when(spy).getIntegersFromRequest(WebUIService.VERTEX_ID,
limit);
+ doReturn(taskMinIds).when(spy).getIDsFromRequest(WebUIService.TASK_ID,
limit, 2);
+
doReturn(attemptMinIds).when(spy).getIDsFromRequest(WebUIService.ATTEMPT_ID,
limit, 3);
+
+ // Set function mocks
+ doReturn(mockDAG).when(spy).checkAndGetDAGFromRequest();
+
+ Map<String, Set<String>> counterList = new TreeMap<String, Set<String>>();
+ doReturn(counterList).when(spy).getCounterListFromRequest();
+
+ spy.getAttemptsInfo();
+ verify(spy).renderJSON(returnResultCaptor.capture());
+
+ return returnResultCaptor.getValue();
+ }
+
+ private List<TaskAttempt> createMockAttempts() {
+ TaskAttempt mockAttempt1 =
createMockAttempt("attempt_1441301219877_0109_1_00_000000_0",
TaskAttemptState.RUNNING,
+ 0.33f);
+ TaskAttempt mockAttempt2 =
createMockAttempt("attempt_1441301219877_0109_1_00_000000_1",
TaskAttemptState.SUCCEEDED,
+ 1.0f);
+ TaskAttempt mockAttempt3 =
createMockAttempt("attempt_1441301219877_0109_1_00_000000_2",
TaskAttemptState.FAILED,
+ .8f);
+ TaskAttempt mockAttempt4 =
createMockAttempt("attempt_1441301219877_0109_1_00_000000_3",
TaskAttemptState.SUCCEEDED,
+ .8f);
+
+ List <TaskAttempt> attempts = Arrays.asList(mockAttempt1, mockAttempt2,
mockAttempt3, mockAttempt4);
+ return attempts;
+ }
+
+ private TaskAttempt createMockAttempt(String attemptIDStr, TaskAttemptState
status, float progress) {
+ TaskAttempt mockAttempt = mock(TaskAttempt.class);
+
+
doReturn(TezTaskAttemptID.fromString(attemptIDStr)).when(mockAttempt).getID();
+ doReturn(status).when(mockAttempt).getState();
+ doReturn(progress).when(mockAttempt).getProgress();
+
+ TezCounters counters = new TezCounters();
+ counters.addGroup("g1", "g1");
+ counters.addGroup("g2", "g2");
+ counters.addGroup("g3", "g3");
+ counters.addGroup("g4", "g4");
+ counters.findCounter("g1", "g1_c1").setValue(101);
+ counters.findCounter("g1", "g1_c2").setValue(102);
+ counters.findCounter("g2", "g2_c3").setValue(103);
+ counters.findCounter("g2", "g2_c4").setValue(104);
+ counters.findCounter("g3", "g3_c5").setValue(105);
+ counters.findCounter("g3", "g3_c6").setValue(106);
+
+ doReturn(counters).when(mockAttempt).getCounters();
+
+ return mockAttempt;
+ }
+
+ private void verifySingleAttemptResult(TaskAttempt mockTask, Map<String,
String> taskResult) {
+ Assert.assertEquals(3, taskResult.size());
+ Assert.assertEquals(mockTask.getID().toString(), taskResult.get("id"));
+ Assert.assertEquals(mockTask.getState().toString(),
taskResult.get("status"));
+ Assert.assertEquals(Float.toString(mockTask.getProgress()),
taskResult.get("progress"));
+ }
+
}