Repository: tez Updated Branches: refs/heads/master b288be709 -> 37cab1284
TEZ-2792. Add AM web service API for tasks (sree) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/37cab128 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/37cab128 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/37cab128 Branch: refs/heads/master Commit: 37cab12849e4cc758d13ae2642c92b1999269827 Parents: b288be7 Author: Sreenath Somarajapuram <[email protected]> Authored: Fri Sep 11 22:53:03 2015 +0530 Committer: Sreenath Somarajapuram <[email protected]> Committed: Fri Sep 11 22:53:03 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/app/web/AMWebController.java | 206 ++++++++++++++++- .../apache/tez/dag/app/web/WebUIService.java | 49 +++- .../tez/dag/app/web/TestAMWebController.java | 231 +++++++++++++++++++ 4 files changed, 482 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/37cab128/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c373fe7..de6ad13 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -175,6 +175,7 @@ Release 0.7.1: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2792. Add AM web service API for tasks TEZ-2807. Log data in the finish event instead of the start event TEZ-2766. Tez UI: Add vertex in-progress info in DAG details TEZ-2768. 
Log a useful error message when the summary stream cannot be closed when shutting http://git-wip-us.apache.org/repos/asf/tez/blob/37cab128/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java index db27d59..06f282c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java @@ -28,12 +28,15 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.StringTokenizer; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import com.google.inject.name.Named; import org.apache.tez.dag.api.client.ProgressBuilder; +import org.apache.tez.dag.app.dag.Task; +import org.apache.tez.dag.records.TezTaskID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.security.UserGroupInformation; @@ -67,7 +70,7 @@ public class AMWebController extends Controller { static final String VERTEX_PROGRESS = "vertexProgress"; static final String VERTEX_PROGRESSES = "vertexProgresses"; - static final int MAX_VERTICES_QUERIED = 100; + static final int MAX_QUERIED = 100; public static final String VERSION = "2"; private AppContext appContext; @@ -266,7 +269,7 @@ public class AMWebController extends Controller { List<Integer> vertexIDs = new ArrayList<Integer>(); try { dagID = getQueryParamInt(WebUIService.DAG_ID); - for (String vertexIDStr : $(WebUIService.VERTEX_ID).trim().split(",", MAX_VERTICES_QUERIED)) { + for (String vertexIDStr : $(WebUIService.VERTEX_ID).trim().split(",", MAX_QUERIED)) { vertexIDs.add(Integer.parseInt(vertexIDStr)); } } catch (NumberFormatException e) { @@ -342,7 
+345,7 @@ public class AMWebController extends Controller { List<Integer> vertexIDs = new ArrayList<>(); if (!valueStr.equals("")) { - String[] vertexIdsStr = valueStr.split(",", MAX_VERTICES_QUERIED); + String[] vertexIdsStr = valueStr.split(",", MAX_QUERIED); try { for (String vertexIdStr : vertexIdsStr) { @@ -359,6 +362,86 @@ public class AMWebController extends Controller { return vertexIDs; } + List<String> splitString(String str, String delimiter, Integer limit) { + List<String> items = new ArrayList<>(); + + StringTokenizer tokenizer = new StringTokenizer(str, delimiter); + for(int count = 0; tokenizer.hasMoreElements() && count < limit; count ++) { + items.add(tokenizer.nextToken()); + } + + return items; + } + + /** + * getIntegersFromRequest + * Parses a query parameter with comma separated values and returns an array of integers. + * The function returns null if any of the value is not an integer + * + * @param paramName {String} + * @param limit {Integer} Maximum number of values to be taken + * + * @return {List<Integer>} List of parsed values + */ + List<Integer> getIntegersFromRequest(String paramName, Integer limit) { + String valuesStr = $(paramName).trim(); + + List<Integer> values = new ArrayList<>(); + if (!valuesStr.equals("")) { + try { + for (String valueStr : splitString(valuesStr, ",", limit)) { + int value = Integer.parseInt(valueStr); + values.add(value); + } + } catch (NumberFormatException nfe) { + sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST, + String.format("invalid %s passed in as parameter", paramName), nfe); + values = null; + } + } + + return values; + } + + /** + * getIDsFromRequest + * Takes in "1_0,1_3" and returns [[1,0],[1,3]] + * Mainly to parse a query parameter with comma separated indexes. 
For vertex its the index, + * for task its vertexIndex_taskIndex and for attempts its vertexIndex_taskIndex_attemptNo + * The function returns null if any of the value is not an integer + * + * @param paramName {String} + * @param limit {Integer} Maximum number of values to be taken + * + * @return {List<List<Integer>>} List of parsed values + */ + List<List<Integer>> getIDsFromRequest(String paramName, Integer limit) { + String valuesStr = $(paramName).trim(); + + List<List<Integer>> values = new ArrayList<>(); + if (!valuesStr.equals("")) { + try { + for (String valueStr : splitString(valuesStr, ",", limit)) { + List<Integer> innerValues = new ArrayList<>(); + String innerValueStrs[] = valueStr.split("_"); + if(innerValueStrs.length == 2) { + for (String innerValueStr : innerValueStrs) { + int value = Integer.parseInt(innerValueStr); + innerValues.add(value); + } + values.add(innerValues); + } + } + } catch (NumberFormatException nfe) { + sendErrorResponse(HttpServletResponse.SC_BAD_REQUEST, + String.format("invalid %s passed in as parameter", paramName), nfe); + values = null; + } + } + + return values; + } + public void getDagInfo() { if (!setupResponse()) { return; @@ -413,7 +496,7 @@ public class AMWebController extends Controller { } Collection<Vertex> vertexList; - if (requestedIDs.size() == 0) { + if (requestedIDs.isEmpty()) { // no ids specified return all. vertexList = dag.getVertices().values(); } else { @@ -430,6 +513,121 @@ public class AMWebController extends Controller { )); } + Vertex getVertexFromIndex(DAG dag, Integer vertexIndex) { + final TezVertexID tezVertexID = TezVertexID.getInstance(dag.getID(), vertexIndex); + Vertex vertex = dag.getVertex(tezVertexID); + return vertex; + } + + /** + * getRequestedTasks + * Heart of getTasksInfo. 
Given a dag and a limit, based on the incoming query parameters + * returns a list of task instances + * + * @param dag {DAG} + * @param limit {Integer} + */ + List<Task> getRequestedTasks(DAG dag, Integer limit) { + List<Task> tasks = new ArrayList<>(); + + List<List<Integer>> taskIDs = getIDsFromRequest(WebUIService.TASK_ID, limit); + if(taskIDs == null) { + return null; + } + else if(!taskIDs.isEmpty()) { + for (List<Integer> indexes : taskIDs) { + Vertex vertex = getVertexFromIndex(dag, indexes.get(0)); + if(vertex == null) { + continue; + } + Task task = vertex.getTask(indexes.get(1)); + if(task == null) { + continue; + } + else { + tasks.add(task); + } + + if(tasks.size() >= limit) { + break; + } + } + } + else { + List<Integer> vertexIDs = getIntegersFromRequest(WebUIService.VERTEX_ID, limit); + if(vertexIDs == null) { + return null; + } + else if(!vertexIDs.isEmpty()) { + for (Integer vertexID : vertexIDs) { + Vertex vertex = getVertexFromIndex(dag, vertexID); + if(vertex == null) { + continue; + } + List<Task> vertexTasks = new ArrayList<>(vertex.getTasks().values()); + tasks.addAll(vertexTasks.subList(0, Math.min(vertexTasks.size(), limit - tasks.size()))); + + if(tasks.size() >= limit) { + break; + } + } + } + else { + Collection<Vertex> vertices = dag.getVertices().values(); + for (Vertex vertex : vertices) { + List<Task> vertexTasks = new ArrayList<>(vertex.getTasks().values()); + tasks.addAll(vertexTasks.subList(0, Math.min(vertexTasks.size(), limit - tasks.size()))); + + if(tasks.size() >= limit) { + break; + } + } + } + } + + return tasks; + } + + /** + * Renders the response JSON for tasksInfo API + * The JSON will have an array of task objects under the key tasks. 
+ */ + public void getTasksInfo() { + if (!setupResponse()) { + return; + } + + DAG dag = checkAndGetDAGFromRequest(); + if (dag == null) { + return; + } + + int limit = MAX_QUERIED; + try { + limit = getQueryParamInt(WebUIService.LIMIT); + } catch (NumberFormatException e) { + //Ignore + } + + List<Task> tasks = getRequestedTasks(dag, limit); + if(tasks == null) { + return; + } + + ArrayList<Map<String, String>> tasksInfo = new ArrayList<>(); + for(Task t : tasks) { + Map<String, String> taskInfo = new HashMap<>(); + taskInfo.put("id", t.getTaskId().toString()); + taskInfo.put("progress", Float.toString(t.getProgress())); + taskInfo.put("status", t.getState().toString()); + tasksInfo.add(taskInfo); + } + + renderJSON(ImmutableMap.of( + "tasks", tasksInfo + )); + } + @Override @VisibleForTesting public void renderJSON(Object object) { http://git-wip-us.apache.org/repos/asf/tez/blob/37cab128/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java index 19e1641..32b57e9 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/WebUIService.java @@ -40,6 +40,9 @@ public class WebUIService extends AbstractService { private static final String WS_PREFIX_V2 = "/ui/ws/v2/tez/"; public static final String VERTEX_ID = "vertexID"; public static final String DAG_ID = "dagID"; + public static final String TASK_ID = "taskID"; + + public static final String LIMIT = "limit"; private static final Logger LOG = LoggerFactory.getLogger(WebUIService.class); @@ -152,9 +155,53 @@ public class WebUIService extends AbstractService { route(WS_PREFIX + pajoin("vertexProgresses", VERTEX_ID, DAG_ID), AMWebController.class, "getVertexProgresses"); - // v2 api + /** + * AM Web Service API V2 
+ * The API provides endpoints that serve the user with real-time data on DAGs, + * vertices and tasks. + * + * Query Params: + * dagID - Same as dagIndex. Expects one single value. (It is mandatory in all APIs) + * vertexID - Same as vertex index. Can be a list of comma-separated values + * taskID - Should be of the format <vertexIndex>_<taskIndex>. For instance, the task with + * index 5 in vertex 3 can be referenced using the id 3_5 + * limit - The max number of records returned. Currently supported only in tasksInfo. + * If not passed, limit would be taken as 100 + * + * APIs: + * /ui/ws/v2/tez/dagInfo + * Query param: + * - Accepts one single parameter, dagID + * Data returned: + * - Full id, progress, status + * + * /ui/ws/v2/tez/verticesInfo + * Query params: + * - Accepts dagID and vertexID + * - vertexID is optional + * - If specified, the respective vertices will be returned, else all vertices + * in the DAG will be returned + * Data returned: + * - Full id, progress, status, totalTasks, runningTasks, succeededTasks, + * failedTaskAttempts, killedTaskAttempts + * + * /ui/ws/v2/tez/tasksInfo + * Query params: + * - Accepts dagID, vertexID, taskID & limit + * - vertex and task IDs are optional + * - If taskID is passed: All the specified tasks (capped by limit) will be + * returned. 
vertexID if present wont be considered + * - IF vertexID is passed: All (capped by limit) tasks under the vertices + * will be returned + * - If just dagID is passed: All (capped by limit) tasks under the DAG + * will be returned + * Data returned: + * - Full id, progress, status + */ route(WS_PREFIX_V2 + pajoin("dagInfo", DAG_ID), AMWebController.class, "getDagInfo"); route(WS_PREFIX_V2 + pajoin("verticesInfo", VERTEX_ID, DAG_ID), AMWebController.class, "getVerticesInfo"); + route(WS_PREFIX_V2 + pajoin("tasksInfo", TASK_ID, VERTEX_ID, DAG_ID), AMWebController.class, + "getTasksInfo"); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/37cab128/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java index 62779bc..cba3c3e 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/web/TestAMWebController.java @@ -33,23 +33,30 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.webapp.Controller; import org.apache.tez.common.security.ACLManager; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.client.ProgressBuilder; +import org.apache.tez.dag.api.oldrecords.TaskState; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.dag.DAG; 
import org.apache.tez.dag.app.dag.DAGState; +import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.Vertex; import org.apache.tez.dag.app.dag.VertexState; import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.junit.Assert; import org.junit.Before; @@ -367,4 +374,228 @@ public class TestAMWebController { vertex2Result.get("failedTaskAttempts")); } + //-- Get Tasks Info Tests ----------------------------------------------------------------------- + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testGetTasksInfoWithTaskIds() { + List <Task> tasks = createMockTasks(); + List <Integer> vertexMinIds = Arrays.asList(); + List <List <Integer>> taskMinIds = Arrays.asList(Arrays.asList(0, 0), + Arrays.asList(0, 3), + Arrays.asList(0, 1)); + + // Fetch All + Map<String, Object> result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds, + AMWebController.MAX_QUERIED); + + Assert.assertEquals(1, result.size()); + Assert.assertTrue(result.containsKey("tasks")); + + ArrayList<Map<String, String>> tasksInfo = (ArrayList<Map<String, String>>) result. 
+ get("tasks"); + Assert.assertEquals(3, tasksInfo.size()); + + verifySingleTaskResult(tasks.get(0), tasksInfo.get(0)); + verifySingleTaskResult(tasks.get(3), tasksInfo.get(1)); + verifySingleTaskResult(tasks.get(1), tasksInfo.get(2)); + + // With limit + result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds, 2); + + Assert.assertEquals(1, result.size()); + Assert.assertTrue(result.containsKey("tasks")); + + tasksInfo = (ArrayList<Map<String, String>>) result.get("tasks"); + Assert.assertEquals(2, tasksInfo.size()); + + verifySingleTaskResult(tasks.get(0), tasksInfo.get(0)); + verifySingleTaskResult(tasks.get(3), tasksInfo.get(1)); + } + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testGetTasksInfoGracefulTaskFetch() { + List <Task> tasks = createMockTasks(); + List <Integer> vertexMinIds = Arrays.asList(); + List <List <Integer>> taskMinIds = Arrays.asList(Arrays.asList(0, 0), + Arrays.asList(0, 6), + Arrays.asList(0, 1)); + + // Fetch All + Map<String, Object> result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds, + AMWebController.MAX_QUERIED); + + Assert.assertEquals(1, result.size()); + Assert.assertTrue(result.containsKey("tasks")); + + ArrayList<Map<String, String>> tasksInfo = (ArrayList<Map<String, String>>) result. 
+ get("tasks"); + Assert.assertEquals(2, tasksInfo.size()); + + verifySingleTaskResult(tasks.get(0), tasksInfo.get(0)); + verifySingleTaskResult(tasks.get(1), tasksInfo.get(1)); + } + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testGetTasksInfoWithVertexId() { + List <Task> tasks = createMockTasks(); + List <Integer> vertexMinIds = Arrays.asList(0); + List <List <Integer>> taskMinIds = Arrays.asList(); + + Map<String, Object> result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds, + AMWebController.MAX_QUERIED); + + Assert.assertEquals(1, result.size()); + Assert.assertTrue(result.containsKey("tasks")); + + ArrayList<Map<String, String>> tasksInfo = (ArrayList<Map<String, String>>) result. + get("tasks"); + Assert.assertEquals(4, tasksInfo.size()); + + sortMapList(tasksInfo, "id"); + verifySingleTaskResult(tasks.get(0), tasksInfo.get(0)); + verifySingleTaskResult(tasks.get(1), tasksInfo.get(1)); + verifySingleTaskResult(tasks.get(2), tasksInfo.get(2)); + verifySingleTaskResult(tasks.get(3), tasksInfo.get(3)); + + // With limit + result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds, 2); + + Assert.assertEquals(1, result.size()); + Assert.assertTrue(result.containsKey("tasks")); + + tasksInfo = (ArrayList<Map<String, String>>) result.get("tasks"); + Assert.assertEquals(2, tasksInfo.size()); + } + + @SuppressWarnings("unchecked") + @Test(timeout = 5000) + public void testGetTasksInfoWithJustDAGId() { + List <Task> tasks = createMockTasks(); + List <Integer> vertexMinIds = Arrays.asList(); + List <List <Integer>> taskMinIds = Arrays.asList(); + + Map<String, Object> result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds, + AMWebController.MAX_QUERIED); + + Assert.assertEquals(1, result.size()); + Assert.assertTrue(result.containsKey("tasks")); + + ArrayList<Map<String, String>> tasksInfo = (ArrayList<Map<String, String>>) result. 
+ get("tasks"); + Assert.assertEquals(4, tasksInfo.size()); + + sortMapList(tasksInfo, "id"); + verifySingleTaskResult(tasks.get(0), tasksInfo.get(0)); + verifySingleTaskResult(tasks.get(1), tasksInfo.get(1)); + verifySingleTaskResult(tasks.get(2), tasksInfo.get(2)); + verifySingleTaskResult(tasks.get(3), tasksInfo.get(3)); + + // With limit + result = getTasksTestHelper(tasks, taskMinIds, vertexMinIds, 2); + + Assert.assertEquals(1, result.size()); + Assert.assertTrue(result.containsKey("tasks")); + + tasksInfo = (ArrayList<Map<String, String>>) result.get("tasks"); + Assert.assertEquals(2, tasksInfo.size()); + } + + private void sortMapList(ArrayList<Map<String, String>> list, String propertyName) { + class MapComparator implements Comparator<Map<String, String>> { + private final String key; + + public MapComparator(String key) { + this.key = key; + } + + public int compare(Map<String, String> first, Map<String, String> second) { + String firstValue = first.get(key); + String secondValue = second.get(key); + return firstValue.compareTo(secondValue); + } + } + + Collections.sort(list, new MapComparator(propertyName)); + } + + Map<String, Object> getTasksTestHelper(List<Task> tasks, List <List <Integer>> taskMinIds, + List<Integer> vertexMinIds, Integer limit) { + //Creating mock DAG + DAG mockDAG = mock(DAG.class); + doReturn(TezDAGID.fromString("dag_1441301219877_0109_1")).when(mockDAG).getID(); + + //Creating mock vertex and attaching to mock DAG + TezVertexID vertexID = TezVertexID.fromString("vertex_1441301219877_0109_1_00"); + Vertex mockVertex = mock(Vertex.class); + doReturn(vertexID).when(mockVertex).getVertexId(); + + doReturn(mockVertex).when(mockDAG).getVertex(vertexID); + doReturn(ImmutableMap.of( + vertexID, mockVertex + )).when(mockDAG).getVertices(); + + //Creating mock tasks and attaching to mock vertex + Map<TezTaskID, Task> taskMap = Maps.newHashMap(); + for(Task task : tasks) { + TezTaskID taskId = task.getTaskId(); + int taskIndex = 
taskId.getId(); + doReturn(task).when(mockVertex).getTask(taskIndex); + taskMap.put(taskId, task); + } + doReturn(taskMap).when(mockVertex).getTasks(); + + //Creates & setup controller spy + AMWebController amWebController = new AMWebController(mockRequestContext, mockAppContext, + "TEST_HISTORY_URL"); + AMWebController spy = spy(amWebController); + doReturn(true).when(spy).setupResponse(); + doNothing().when(spy).renderJSON(any()); + + // Set mock query params + doReturn(limit).when(spy).getQueryParamInt(WebUIService.LIMIT); + doReturn(vertexMinIds).when(spy).getIntegersFromRequest(WebUIService.VERTEX_ID, limit); + doReturn(taskMinIds).when(spy).getIDsFromRequest(WebUIService.TASK_ID, limit); + + // Set function mocks + doReturn(mockDAG).when(spy).checkAndGetDAGFromRequest(); + + spy.getTasksInfo(); + verify(spy).renderJSON(returnResultCaptor.capture()); + + return returnResultCaptor.getValue(); + } + + private List<Task> createMockTasks() { + Task mockTask1 = createMockTask("task_1441301219877_0109_1_00_000000", TaskState.RUNNING, + 0.33f); + Task mockTask2 = createMockTask("task_1441301219877_0109_1_00_000001", TaskState.SUCCEEDED, + 1.0f); + Task mockTask3 = createMockTask("task_1441301219877_0109_1_00_000002", TaskState.SUCCEEDED, + .8f); + Task mockTask4 = createMockTask("task_1441301219877_0109_1_00_000003", TaskState.SUCCEEDED, + .8f); + + List <Task> tasks = Arrays.asList(mockTask1, mockTask2, mockTask3, mockTask4); + return tasks; + } + + private Task createMockTask(String taskIDStr, TaskState status, float progress) { + Task mockTask = mock(Task.class); + + doReturn(TezTaskID.fromString(taskIDStr)).when(mockTask).getTaskId(); + doReturn(status).when(mockTask).getState(); + doReturn(progress).when(mockTask).getProgress(); + + return mockTask; + } + + private void verifySingleTaskResult(Task mockTask, Map<String, String> taskResult) { + Assert.assertEquals(3, taskResult.size()); + Assert.assertEquals(mockTask.getTaskId().toString(), taskResult.get("id")); 
+ Assert.assertEquals(mockTask.getState().toString(), taskResult.get("status")); + Assert.assertEquals(Float.toString(mockTask.getProgress()), taskResult.get("progress")); + } }
