This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2cd914869d add API to get progress of subtasks with given state 
(#10132)
2cd914869d is described below

commit 2cd914869d7e1ec81e30f737ba7d2eea2b2385ea
Author: Haitao Zhang <[email protected]>
AuthorDate: Wed Jan 25 00:52:13 2023 -0800

    add API to get progress of subtasks with given state (#10132)
    
    * add API to get progress of subtasks with given state
    
    * fix bugs
    
    * revise API response format
    
    * address comments
    
    * rename /tasks/subtask/state/{subTaskState}/progress
    
    * revise minion side API
---
 .../api/resources/PinotTaskRestletResource.java    |  50 +++++++-
 .../core/minion/PinotHelixTaskResourceManager.java |  54 ++++++++
 .../resources/PinotTaskRestletResourceTest.java    | 139 +++++++++++++++++++++
 .../minion/PinotHelixTaskResourceManagerTest.java  |  73 +++++++++++
 .../api/resources/PinotTaskProgressResource.java   |  60 ++++++++-
 .../pinot/minion/event/MinionEventObserver.java    |  16 +++
 .../pinot/minion/event/MinionEventObservers.java   |  20 +++
 .../pinot/minion/event/MinionProgressObserver.java |  28 ++++-
 .../apache/pinot/minion/event/MinionTaskState.java |  45 +++++++
 .../resources/PinotTaskProgressResourceTest.java   | 105 ++++++++++++++++
 .../minion/event/MinionEventObserversTest.java     |  23 ++++
 .../minion/event/MinionProgressObserverTest.java   |  27 ++++
 12 files changed, 632 insertions(+), 8 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index c2605c7614..11491061e7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -25,14 +25,18 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiKeyAuthDefinition;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -58,6 +62,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriInfo;
 import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.task.TaskPartitionState;
@@ -419,7 +424,7 @@ public class PinotTaskRestletResource {
   @GET
   @Path("/tasks/subtask/{taskName}/progress")
   @Produces(MediaType.APPLICATION_JSON)
-  @ApiOperation("Get progress of specified sub tasks for the given task 
tracked by worker in memory")
+  @ApiOperation("Get progress of specified sub tasks for the given task 
tracked by minion worker in memory")
   public String getSubtaskProgress(@Context HttpHeaders httpHeaders,
       @ApiParam(value = "Task name", required = true) @PathParam("taskName") 
String taskName,
       @ApiParam(value = "Sub task names separated by comma") 
@QueryParam("subtaskNames") @Nullable
@@ -451,6 +456,49 @@ public class PinotTaskRestletResource {
     }
   }
 
+  @GET
+  @Path("/tasks/subtask/workers/progress")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation("Get progress of all subtasks with specified state tracked by 
minion worker in memory")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, 
message = "Internal server error")
+  })
+  public String getSubtaskOnWorkerProgress(@Context HttpHeaders httpHeaders,
+      @ApiParam(value = "Subtask state 
(UNKNOWN,IN_PROGRESS,SUCCEEDED,CANCELLED,ERROR)", required = true)
+      @QueryParam("subTaskState") String subTaskState,
+      @ApiParam(value = "Minion worker IDs separated by comma") 
@QueryParam("minionWorkerIds") @Nullable
+          String minionWorkerIds) {
+    Set<String> selectedMinionWorkers = new HashSet<>();
+    if (StringUtils.isNotEmpty(minionWorkerIds)) {
+      selectedMinionWorkers.addAll(
+          Arrays.stream(StringUtils.split(minionWorkerIds, 
',')).map(String::trim).collect(Collectors.toList()));
+    }
+    // Relying on original schema that was used to query the controller
+    String scheme = _uriInfo.getRequestUri().getScheme();
+    List<InstanceConfig> allMinionWorkerInstanceConfigs = 
_pinotHelixResourceManager.getAllMinionInstanceConfigs();
+    Map<String, String> selectedMinionWorkerEndpoints = new HashMap<>();
+    for (InstanceConfig worker : allMinionWorkerInstanceConfigs) {
+      if (selectedMinionWorkers.isEmpty() || 
selectedMinionWorkers.contains(worker.getId())) {
+        selectedMinionWorkerEndpoints.put(worker.getId(),
+            String.format("%s://%s:%d", scheme, worker.getHostName(), 
Integer.parseInt(worker.getPort())));
+      }
+    }
+    Map<String, String> requestHeaders = new HashMap<>();
+    httpHeaders.getRequestHeaders().keySet().forEach(header ->
+        requestHeaders.put(header, httpHeaders.getHeaderString(header)));
+    int timeoutMs = _controllerConf.getMinionAdminRequestTimeoutSeconds() * 
1000;
+    try {
+      Map<String, Object> minionWorkerIdSubtaskProgressMap =
+          
_pinotHelixTaskResourceManager.getSubtaskOnWorkerProgress(subTaskState, 
_executor, _connectionManager,
+              selectedMinionWorkerEndpoints, requestHeaders, timeoutMs);
+      return JsonUtils.objectToString(minionWorkerIdSubtaskProgressMap);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to get minion worker side progress for 
subtasks with state %s due to error: %s",
+              subTaskState, ExceptionUtils.getStackTrace(e)), 
Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
   @GET
   @Path("/tasks/scheduler/information")
   @ApiOperation("Fetch cron scheduler information")
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index f7441593c9..15851efd5e 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -37,6 +37,7 @@ import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.commons.lang3.StringUtils;
@@ -620,6 +621,59 @@ public class PinotHelixTaskResourceManager {
     return subtaskProgressMap;
   }
 
+  /**
+   * Gets progress of all subtasks with specified state tracked by given 
minion workers in memory
+   * @param subtaskState a specified subtask state, valid values are in 
org.apache.pinot.minion.event.MinionTaskState
+   * @param executor an {@link Executor} used to run logic on
+   * @param connMgr a {@link HttpConnectionManager} used to manage http 
connections
+   * @param selectedMinionWorkerEndpoints a map of worker id to http endpoint 
for minions to get subtask progress from
+   * @param requestHeaders http headers used to send requests to minion workers
+   * @param timeoutMs timeout (in millisecond) for requests sent to minion 
workers
+   * @return a map of minion worker id to subtask progress
+   */
+  public synchronized Map<String, Object> getSubtaskOnWorkerProgress(String 
subtaskState,
+      Executor executor, HttpConnectionManager connMgr, Map<String, String> 
selectedMinionWorkerEndpoints,
+      Map<String, String> requestHeaders, int timeoutMs)
+      throws JsonProcessingException {
+    return getSubtaskOnWorkerProgress(subtaskState,
+        new CompletionServiceHelper(executor, connMgr, HashBiMap.create(0)), 
selectedMinionWorkerEndpoints,
+        requestHeaders, timeoutMs);
+  }
+
+  @VisibleForTesting
+  Map<String, Object> getSubtaskOnWorkerProgress(String subtaskState,
+      CompletionServiceHelper completionServiceHelper, Map<String, String> 
selectedMinionWorkerEndpoints,
+      Map<String, String> requestHeaders, int timeoutMs)
+      throws JsonProcessingException {
+    Map<String, Object> minionWorkerIdSubtaskProgressMap = new HashMap<>();
+    if (selectedMinionWorkerEndpoints.isEmpty()) {
+      return minionWorkerIdSubtaskProgressMap;
+    }
+    Map<String, String> minionWorkerUrlToWorkerIdMap = 
selectedMinionWorkerEndpoints.entrySet().stream()
+        .collect(Collectors.toMap(
+            entry -> 
String.format("%s/tasks/subtask/state/progress?subTaskState=%s", 
entry.getValue(), subtaskState),
+            Map.Entry::getKey));
+    List<String> workerUrls = new 
ArrayList<>(minionWorkerUrlToWorkerIdMap.keySet());
+    LOGGER.debug("Getting task progress with workerUrls: {}", workerUrls);
+    // Scatter and gather progress from multiple workers.
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(workerUrls, null, true, 
requestHeaders, timeoutMs);
+    for (Map.Entry<String, String> entry : 
serviceResponse._httpResponses.entrySet()) {
+      String worker = entry.getKey();
+      String resp = entry.getValue();
+      LOGGER.debug("Got resp: {} from worker: {}", resp, worker);
+      minionWorkerIdSubtaskProgressMap
+          .put(minionWorkerUrlToWorkerIdMap.get(worker), 
JsonUtils.stringToObject(resp, Map.class));
+    }
+    if (serviceResponse._failedResponseCount > 0) {
+      // Instead of aborting, subtasks without worker side progress return the 
task status tracked by Helix.
+      // The detailed worker failure response is logged as error by 
CompletionServiceResponse for debugging.
+      LOGGER.warn("There were {} workers failed to report task progress. Got 
partial progress info: {}",
+          serviceResponse._failedResponseCount, 
minionWorkerIdSubtaskProgressMap);
+    }
+    return minionWorkerIdSubtaskProgressMap;
+  }
+
   /**
    * Helper method to return a map of task names to corresponding task state
    * where the task corresponds to the given Pinot table name. This is used to
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResourceTest.java
new file mode 100644
index 0000000000..764b1d53d1
--- /dev/null
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResourceTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Map;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.UriInfo;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.controller.ControllerConf;
+import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+
+
+public class PinotTaskRestletResourceTest {
+  @Mock
+  PinotHelixResourceManager _pinotHelixResourceManager;
+  @Mock
+  PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
+  @Mock
+  ControllerConf _controllerConf;
+  @Mock
+  UriInfo _uriInfo;
+
+  @InjectMocks
+  PinotTaskRestletResource _pinotTaskRestletResource;
+
+  @BeforeMethod
+  public void init()
+      throws URISyntaxException {
+    MockitoAnnotations.openMocks(this);
+    when(_uriInfo.getRequestUri()).thenReturn(new 
URI("http://localhost:9000";));
+  }
+
+  @Test
+  public void 
testGetSubtaskWithGivenStateProgressWhenMinionWorkerIdsAreNotSpecified()
+      throws JsonProcessingException {
+    Map<String, String> minionWorkerEndpoints
+        = 
invokeGetSubtaskWithGivenStateProgressAndReturnCapturedMinionWorkerEndpoints(null);
+    assertEquals(minionWorkerEndpoints,
+        ImmutableMap.of("minion1", "http://minion1:9514";, "minion2", 
"http://minion2:9514";));
+  }
+
+  @Test
+  public void 
testGetSubtaskWithGivenStateProgressWhenAllMinionWorkerIdsAreSpecified()
+      throws JsonProcessingException {
+    // use minion worker ids with spaces ensure they will be trimmed.
+    Map<String, String> minionWorkerEndpoints
+        = 
invokeGetSubtaskWithGivenStateProgressAndReturnCapturedMinionWorkerEndpoints(" 
minion1 , minion2 ");
+    assertEquals(minionWorkerEndpoints,
+        ImmutableMap.of("minion1", "http://minion1:9514";, "minion2", 
"http://minion2:9514";));
+  }
+
+  @Test
+  public void 
testGetSubtaskWithGivenStateProgressWhenOneMinionWorkerIdIsSpecified()
+      throws JsonProcessingException {
+    Map<String, String> minionWorkerEndpoints
+        = 
invokeGetSubtaskWithGivenStateProgressAndReturnCapturedMinionWorkerEndpoints("minion1");
+    assertEquals(minionWorkerEndpoints,
+        ImmutableMap.of("minion1", "http://minion1:9514";));
+  }
+
+  private Map<String, String> 
invokeGetSubtaskWithGivenStateProgressAndReturnCapturedMinionWorkerEndpoints(
+      String minionWorkerIds)
+      throws JsonProcessingException {
+    InstanceConfig minion1 = createInstanceConfig("minion1", "minion1", 
"9514");
+    InstanceConfig minion2 = createInstanceConfig("minion2", "minion2", 
"9514");
+    
when(_pinotHelixResourceManager.getAllMinionInstanceConfigs()).thenReturn(ImmutableList.of(minion1,
 minion2));
+    HttpHeaders httpHeaders = Mockito.mock(HttpHeaders.class);
+    when(httpHeaders.getRequestHeaders()).thenReturn(new 
MultivaluedHashMap<>());
+    ArgumentCaptor<Map<String, String>> minionWorkerEndpointsCaptor = 
ArgumentCaptor.forClass(Map.class);
+    
when(_pinotHelixTaskResourceManager.getSubtaskOnWorkerProgress(anyString(), 
any(), any(),
+        minionWorkerEndpointsCaptor.capture(), anyMap(), anyInt()))
+        .thenReturn(Collections.emptyMap());
+    String progress =
+        _pinotTaskRestletResource.getSubtaskOnWorkerProgress(httpHeaders, 
"IN_PROGRESS", minionWorkerIds);
+    assertEquals(progress, "{}");
+    return minionWorkerEndpointsCaptor.getValue();
+  }
+
+  private InstanceConfig createInstanceConfig(String instanceId, String 
hostName, String port) {
+    InstanceConfig instanceConfig = new InstanceConfig(instanceId);
+    instanceConfig.setHostName(hostName);
+    instanceConfig.setPort(port);
+    return instanceConfig;
+  }
+
+
+  @Test
+  public void testGetSubtaskWithGivenStateProgressWithException()
+      throws JsonProcessingException {
+    
when(_pinotHelixResourceManager.getAllMinionInstanceConfigs()).thenReturn(Collections.emptyList());
+    HttpHeaders httpHeaders = Mockito.mock(HttpHeaders.class);
+    when(httpHeaders.getRequestHeaders()).thenReturn(new 
MultivaluedHashMap<>());
+    when(_pinotHelixTaskResourceManager
+        .getSubtaskOnWorkerProgress(anyString(), any(), any(), anyMap(), 
anyMap(), anyInt()))
+        .thenThrow(new RuntimeException());
+    assertThrows(ControllerApplicationException.class,
+        () -> 
_pinotTaskRestletResource.getSubtaskOnWorkerProgress(httpHeaders, 
"IN_PROGRESS", null));
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
index 981472feff..5706d402ab 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
@@ -18,13 +18,17 @@
  */
 package org.apache.pinot.controller.helix.core.minion;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
@@ -35,6 +39,8 @@ import org.apache.helix.task.WorkflowContext;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.CompletionServiceHelper;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -42,6 +48,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -199,6 +206,72 @@ public class PinotHelixTaskResourceManagerTest {
     assertEquals(taskProgress, "No worker has run this subtask");
   }
 
+  @Test
+  public void testGetSubtaskWithGivenStateProgressNoWorker()
+      throws JsonProcessingException {
+    CompletionServiceHelper httpHelper = mock(CompletionServiceHelper.class);
+    PinotHelixTaskResourceManager mgr =
+        new 
PinotHelixTaskResourceManager(mock(PinotHelixResourceManager.class), 
mock(TaskDriver.class));
+    // No worker to run subtasks.
+    Map<String, String> selectedMinionWorkerEndpoints = new HashMap<>();
+    Map<String, Object> progress =
+        mgr.getSubtaskOnWorkerProgress("IN_PROGRESS", httpHelper,
+            selectedMinionWorkerEndpoints, Collections.emptyMap(), 1000);
+    assertTrue(progress.isEmpty());
+    verify(httpHelper, Mockito.never()).doMultiGetRequest(any(), any(), 
anyBoolean(), any(), anyInt());
+  }
+
+  @Test
+  public void testGetSubtaskWithGivenStateProgress()
+      throws IOException {
+    CompletionServiceHelper httpHelper = mock(CompletionServiceHelper.class);
+    CompletionServiceHelper.CompletionServiceResponse httpResp =
+        new CompletionServiceHelper.CompletionServiceResponse();
+    String taskIdPrefix = "Task_SegmentGenerationAndPushTask_someone";
+    String workerIdPrefix = "worker";
+    String[] subtaskIds = new String[6];
+    String[] workerIds = new String[3];
+    Map<String, String> selectedMinionWorkerEndpoints = new HashMap<>();
+    for (int i = 0; i < 3; i++) {
+      workerIds[i] = workerIdPrefix + i;
+      String workerEndpoint = "http://"; + workerIds[i] + ":9000";
+      selectedMinionWorkerEndpoints.put(workerIds[i], workerEndpoint);
+
+      subtaskIds[2 * i] = taskIdPrefix + "_" + (2 * i);
+      subtaskIds[2 * i + 1] = taskIdPrefix + "_" + (2 * i + 1);
+      // Notice that for testing purpose, we map subtask names to empty 
strings. In reality, subtask names will be
+      // mapped to jsonized org.apache.pinot.minion.event.MinionEventObserver
+      httpResp._httpResponses.put(
+          
String.format("%s/tasks/subtask/state/progress?subTaskState=IN_PROGRESS", 
workerEndpoint),
+          JsonUtils.objectToString(ImmutableMap.of(subtaskIds[2 * i], "", 
subtaskIds[2 * i + 1], "")));
+    }
+    httpResp._failedResponseCount = 1;
+    ArgumentCaptor<List<String>> workerEndpointCaptor = 
ArgumentCaptor.forClass(List.class);
+    when(httpHelper.doMultiGetRequest(workerEndpointCaptor.capture(), any(), 
anyBoolean(), any(), anyInt()))
+        .thenReturn(httpResp);
+
+    PinotHelixTaskResourceManager mgr =
+        new 
PinotHelixTaskResourceManager(mock(PinotHelixResourceManager.class), 
mock(TaskDriver.class));
+
+    Map<String, Object> progress =
+        mgr.getSubtaskOnWorkerProgress("IN_PROGRESS", httpHelper, 
selectedMinionWorkerEndpoints,
+            Collections.emptyMap(), 1000);
+    List<String> value = workerEndpointCaptor.getValue();
+    Set<String> expectedWorkerUrls = 
selectedMinionWorkerEndpoints.values().stream()
+        .map(workerEndpoint
+            -> 
String.format("%s/tasks/subtask/state/progress?subTaskState=IN_PROGRESS", 
workerEndpoint))
+        .collect(Collectors.toSet());
+    assertEquals(new HashSet<>(value), expectedWorkerUrls);
+    assertEquals(progress.size(), 3);
+    for (int i = 0; i < 3; i++) {
+      Object responseFromMinionWorker = progress.get(workerIds[i]);
+      Map<String, Object> subtaskProgressMap = (Map<String, Object>) 
responseFromMinionWorker;
+      assertEquals(subtaskProgressMap.size(), 2);
+      assertTrue(subtaskProgressMap.containsKey(subtaskIds[2 * i]));
+      assertTrue(subtaskProgressMap.containsKey(subtaskIds[2 * i + 1]));
+    }
+  }
+
   @Test
   public void testGetTableTaskCount() {
     String taskType = "TestTask";
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java
 
b/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java
index c2260bec8e..c53a0323e6 100644
--- 
a/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java
+++ 
b/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java
@@ -27,8 +27,12 @@ import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
@@ -37,9 +41,10 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.minion.event.MinionEventObserver;
 import org.apache.pinot.minion.event.MinionEventObservers;
+import org.apache.pinot.minion.event.MinionTaskState;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.slf4j.Logger;
@@ -84,4 +89,57 @@ public class PinotTaskProgressResource {
           .build());
     }
   }
+
+  @GET
+  @Path("/tasks/subtask/state/progress")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation("Get finer grained task progress tracked in memory for given 
subtasks or given state")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, 
message = "Internal server error")
+  })
+  public String getSubtaskProgress(
+      @ApiParam(value = "Sub task names separated by comma") 
@QueryParam("subtaskNames") @Nullable String subtaskNames,
+      @ApiParam(value = "Subtask state", required = true) 
@QueryParam("subTaskState") @Nullable String subTaskState) {
+    try {
+      Map<String, MinionEventObserver> progress = new HashMap<>();
+      if (StringUtils.isEmpty(subtaskNames) && 
StringUtils.isEmpty(subTaskState)) {
+        LOGGER.debug("Getting progress of all subtasks");
+        
progress.putAll(MinionEventObservers.getInstance().getMinionEventObservers());
+      } else if (!StringUtils.isEmpty(subtaskNames) && 
!StringUtils.isEmpty(subTaskState)) {
+        throw new Exception("Subtask names and state should not be specified 
at the same time");
+      } else if (!StringUtils.isEmpty(subTaskState)) {
+        MinionTaskState minionTaskState = MinionTaskState.IN_PROGRESS;
+        try {
+          minionTaskState = 
MinionTaskState.valueOf(subTaskState.toUpperCase());
+        } catch (IllegalArgumentException e) {
+          LOGGER.warn("{} is not a valid subtask state, defaulting to 
IN_PROGRESS", subTaskState);
+          subTaskState = MinionTaskState.IN_PROGRESS.toString();
+        }
+        LOGGER.debug("Getting progress for subtasks with state {}", 
subTaskState);
+        
progress.putAll(MinionEventObservers.getInstance().getMinionEventObserverWithGivenState(minionTaskState));
+      } else {
+        // !StringUtils.isEmpty(subtaskNames) is true
+        LOGGER.debug("Getting progress for subtasks: {}", subtaskNames);
+        List<String> subTaskNames =
+            Arrays.stream(StringUtils.split(subtaskNames, 
CommonConstants.Minion.TASK_LIST_SEPARATOR))
+                .map(String::trim)
+                .collect(Collectors.toList());
+        for (String subtaskName : subTaskNames) {
+          MinionEventObserver observer = 
MinionEventObservers.getInstance().getMinionEventObserver(subtaskName);
+          if (observer != null) {
+            progress.put(subtaskName, observer);
+          }
+        }
+      }
+      LOGGER.debug("Got subtasks progress: {}", progress);
+      return JsonUtils.objectToString(progress);
+    } catch (Exception e) {
+      throw new 
WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(
+              String.format("Failed to get task progress for subtasks %s with 
state %s due to error: %s",
+                  StringUtils.isEmpty(subtaskNames) ? "NOT_SPECIFIED" : 
subtaskNames,
+                  StringUtils.isEmpty(subTaskState) ? "NOT_SPECIFIED" : 
subTaskState,
+                  e.getMessage()))
+          .build());
+    }
+  }
 }
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserver.java
 
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserver.java
index 510406d297..e113a0d862 100644
--- 
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserver.java
+++ 
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserver.java
@@ -70,4 +70,20 @@ public interface MinionEventObserver {
    * @param exception Exception encountered during execution
    */
   void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception exception);
+
+  /**
+   * Gets the minion task state
+   * @return a {@link MinionTaskState}
+   */
+  default MinionTaskState getTaskState() {
+    return MinionTaskState.UNKNOWN;
+  }
+
+  /**
+   * Gets the minion task start timestamp
+   * @return the minion task start timestamp
+   */
+  default long getStartTs() {
+    return -1;
+  }
 }
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java
 
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java
index 46befe39a8..471d659ba4 100644
--- 
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java
+++ 
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 import org.apache.pinot.minion.MinionConf;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.slf4j.Logger;
@@ -111,6 +112,25 @@ public class MinionEventObservers {
     return _taskEventObservers.get(taskId);
   }
 
+  /**
+   * Gets all {@link MinionEventObserver}s
+   * @return a map of subtask ID to {@link MinionEventObserver}
+   */
+  public Map<String, MinionEventObserver> getMinionEventObservers() {
+    return _taskEventObservers;
+  }
+
+  /**
+   * Gets all {@link MinionEventObserver}s with the given {@link 
MinionTaskState}
+   * @param taskState the {@link MinionTaskState} to match
+   * @return a map of subtask ID to {@link MinionEventObserver}
+   */
+  public Map<String, MinionEventObserver> 
getMinionEventObserverWithGivenState(MinionTaskState taskState) {
+    return _taskEventObservers.entrySet().stream()
+        .filter(e -> e.getValue().getTaskState() == taskState)
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+
   public void addMinionEventObserver(String taskId, MinionEventObserver 
eventObserver) {
     LOGGER.debug("Keep track of event observer for task: {}", taskId);
     _taskEventObservers.put(taskId, eventObserver);
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
 
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
index 1cc03eec63..6f18814c78 100644
--- 
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
+++ 
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
@@ -41,6 +41,7 @@ public class MinionProgressObserver extends 
DefaultMinionEventObserver {
 
   private final int _maxNumStatusToTrack;
   private final Deque<StatusEntry> _lastStatus = new LinkedList<>();
+  private MinionTaskState _taskState;
   private long _startTs;
 
   public MinionProgressObserver() {
@@ -49,12 +50,13 @@ public class MinionProgressObserver extends 
DefaultMinionEventObserver {
 
   public MinionProgressObserver(int maxNumStatusToTrack) {
     _maxNumStatusToTrack = maxNumStatusToTrack;
+    _taskState = MinionTaskState.UNKNOWN;
   }
 
   @Override
   public synchronized void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
     _startTs = System.currentTimeMillis();
-    addStatus(_startTs, "Task started");
+    addStatus(_startTs, "Task started", MinionTaskState.IN_PROGRESS);
     super.notifyTaskStart(pinotTaskConfig);
   }
 
@@ -64,14 +66,16 @@ public class MinionProgressObserver extends 
DefaultMinionEventObserver {
    * @param pinotTaskConfig Pinot task config
    * @param progress progress status and its toString() returns sth meaningful.
    */
+  @Override
   public synchronized void notifyProgress(PinotTaskConfig pinotTaskConfig, 
@Nullable Object progress) {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Update progress: {} for task: {}", progress, 
pinotTaskConfig.getTaskId());
     }
-    addStatus(System.currentTimeMillis(), (progress == null) ? "" : 
progress.toString());
+    addStatus(System.currentTimeMillis(), (progress == null) ? "" : 
progress.toString(), MinionTaskState.IN_PROGRESS);
     super.notifyProgress(pinotTaskConfig, progress);
   }
 
+  @Override
   @Nullable
   public synchronized List<StatusEntry> getProgress() {
     return new ArrayList<>(_lastStatus);
@@ -80,25 +84,37 @@ public class MinionProgressObserver extends 
DefaultMinionEventObserver {
   @Override
   public synchronized void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig, 
@Nullable Object executionResult) {
     long endTs = System.currentTimeMillis();
-    addStatus(endTs, "Task succeeded in " + (endTs - _startTs) + "ms");
+    addStatus(endTs, "Task succeeded in " + (endTs - _startTs) + "ms", 
MinionTaskState.SUCCEEDED);
     super.notifyTaskSuccess(pinotTaskConfig, executionResult);
   }
 
   @Override
   public synchronized void notifyTaskCancelled(PinotTaskConfig 
pinotTaskConfig) {
     long endTs = System.currentTimeMillis();
-    addStatus(endTs, "Task got cancelled after " + (endTs - _startTs) + "ms");
+    addStatus(endTs, "Task got cancelled after " + (endTs - _startTs) + "ms", 
MinionTaskState.CANCELLED);
     super.notifyTaskCancelled(pinotTaskConfig);
   }
 
   @Override
   public synchronized void notifyTaskError(PinotTaskConfig pinotTaskConfig, 
Exception e) {
     long endTs = System.currentTimeMillis();
-    addStatus(endTs, "Task failed in " + (endTs - _startTs) + "ms with error: 
" + ExceptionUtils.getStackTrace(e));
+    addStatus(endTs, "Task failed in " + (endTs - _startTs) + "ms with error: 
" + ExceptionUtils.getStackTrace(e),
+        MinionTaskState.ERROR);
     super.notifyTaskError(pinotTaskConfig, e);
   }
 
-  private void addStatus(long ts, String progress) {
+  @Override
+  public MinionTaskState getTaskState() {
+    return _taskState;
+  }
+
+  @Override
+  public long getStartTs() {
+    return _startTs;
+  }
+
+  private void addStatus(long ts, String progress, MinionTaskState taskState) {
+    _taskState = taskState;
     _lastStatus.addLast(new StatusEntry(ts, progress));
     if (_lastStatus.size() > _maxNumStatusToTrack) {
       _lastStatus.pollFirst();
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionTaskState.java 
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionTaskState.java
new file mode 100644
index 0000000000..2498c1a11b
--- /dev/null
+++ 
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionTaskState.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.event;
+
+/**
+ * MinionTaskState represent a minion task state
+ */
+public enum MinionTaskState {
+  /**
+   * No state is reported / unknown
+   */
+  UNKNOWN,
+  /**
+   * The minion task is in progress
+   */
+  IN_PROGRESS,
+  /**
+   * The minion task succeeded
+   */
+  SUCCEEDED,
+  /**
+   * The minion task is cancelled
+   */
+  CANCELLED,
+  /**
+   * The minion task encounters error
+   */
+  ERROR
+}
diff --git 
a/pinot-minion/src/test/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResourceTest.java
 
b/pinot-minion/src/test/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResourceTest.java
new file mode 100644
index 0000000000..b8e288f906
--- /dev/null
+++ 
b/pinot-minion/src/test/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResourceTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.api.resources;
+
+import java.io.IOException;
+import java.util.Map;
+import javax.ws.rs.WebApplicationException;
+import org.apache.pinot.minion.event.MinionEventObserver;
+import org.apache.pinot.minion.event.MinionEventObservers;
+import org.apache.pinot.minion.event.MinionProgressObserver;
+import org.apache.pinot.minion.event.MinionTaskState;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+
+
+public class PinotTaskProgressResourceTest {
+
+  @Test
+  public void testGetGivenSubtaskOrStateProgress()
+      throws IOException {
+    MinionEventObserver observer1 = new MinionProgressObserver();
+    observer1.notifyTaskStart(null);
+    MinionEventObservers.getInstance().addMinionEventObserver("t01", 
observer1);
+
+    MinionEventObserver observer2 = new MinionProgressObserver();
+    observer2.notifyProgress(null, "");
+    MinionEventObservers.getInstance().addMinionEventObserver("t02", 
observer2);
+
+    MinionEventObserver observer3 = new MinionProgressObserver();
+    observer3.notifyTaskSuccess(null, "");
+    MinionEventObservers.getInstance().addMinionEventObserver("t03", 
observer3);
+
+    PinotTaskProgressResource pinotTaskProgressResource = new 
PinotTaskProgressResource();
+
+    // get all sub task progress
+    String allSubTaskProgress = 
pinotTaskProgressResource.getSubtaskProgress(null, null);
+    Map<String, Object> subtaskProgressMap = 
JsonUtils.stringToObject(allSubTaskProgress, Map.class);
+    assertEquals(subtaskProgressMap.size(), 3);
+
+    // get subtasks with given state
+    String subtaskWithInProgressState =
+        pinotTaskProgressResource.getSubtaskProgress(null, 
MinionTaskState.IN_PROGRESS.toString());
+    assertInProgressSubtasks(subtaskWithInProgressState);
+
+    String subtaskWithUndefinedState =
+        pinotTaskProgressResource.getSubtaskProgress(null, "Undefined");
+    assertInProgressSubtasks(subtaskWithUndefinedState);
+
+    String subtaskWithSucceededState =
+        pinotTaskProgressResource.getSubtaskProgress(null, 
MinionTaskState.SUCCEEDED.toString());
+    subtaskProgressMap = JsonUtils.stringToObject(subtaskWithSucceededState, 
Map.class);
+    assertEquals(subtaskProgressMap.size(), 1);
+
+    String subtaskWithUnknownState =
+        pinotTaskProgressResource.getSubtaskProgress(null, 
MinionTaskState.UNKNOWN.toString());
+    assertNoSubtaskWithTheGivenState(subtaskWithUnknownState);
+
+    String subtaskWithCancelledState =
+        pinotTaskProgressResource.getSubtaskProgress(null, 
MinionTaskState.CANCELLED.toString());
+    assertNoSubtaskWithTheGivenState(subtaskWithCancelledState);
+
+    String subtaskWithErrorState =
+        pinotTaskProgressResource.getSubtaskProgress(null, 
MinionTaskState.ERROR.toString());
+    assertNoSubtaskWithTheGivenState(subtaskWithErrorState);
+
+    // get subtasks with given name
+    String subTasksWithGivenNamesProgress = 
pinotTaskProgressResource.getSubtaskProgress(" t01 , t02 ", null);
+    assertInProgressSubtasks(subTasksWithGivenNamesProgress);
+
+    // get subtasks with given names and state
+    assertThrows(WebApplicationException.class,
+        () -> pinotTaskProgressResource.getSubtaskProgress(" t01 , t02 ", 
MinionTaskState.IN_PROGRESS.toString()));
+  }
+
+  private void assertInProgressSubtasks(String subtaskWithInProgressState)
+      throws IOException {
+    Map<String, Object> subtaskProgressMap = 
JsonUtils.stringToObject(subtaskWithInProgressState, Map.class);
+    assertEquals(subtaskProgressMap.size(), 2);
+  }
+
+  private void assertNoSubtaskWithTheGivenState(String 
subtaskWithTheGivenState)
+      throws IOException {
+    Map<String, Object> subtaskProgressMap = 
JsonUtils.stringToObject(subtaskWithTheGivenState, Map.class);
+    assertEquals(subtaskProgressMap.size(), 0);
+  }
+}
diff --git 
a/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionEventObserversTest.java
 
b/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionEventObserversTest.java
index c6dbf3e9cf..e7395e3618 100644
--- 
a/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionEventObserversTest.java
+++ 
b/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionEventObserversTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.minion.event;
 
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import org.apache.pinot.minion.MinionConf;
@@ -25,6 +26,7 @@ import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertSame;
@@ -64,4 +66,25 @@ public class MinionEventObserversTest {
               "Failed to clean up observer");
     }
   }
+
+  @Test
+  public void testGetMinionEventObserverWithGivenState() {
+    MinionEventObserver observer1 = new MinionProgressObserver();
+    observer1.notifyTaskStart(null);
+    MinionEventObservers.getInstance().addMinionEventObserver("t01", 
observer1);
+
+    MinionEventObserver observer2 = new MinionProgressObserver();
+    observer2.notifyProgress(null, "");
+    MinionEventObservers.getInstance().addMinionEventObserver("t02", 
observer2);
+
+    MinionEventObserver observer3 = new MinionProgressObserver();
+    observer3.notifyTaskSuccess(null, "");
+    MinionEventObservers.getInstance().addMinionEventObserver("t03", 
observer3);
+
+    Map<String, MinionEventObserver> minionEventObserverWithGivenState =
+        
MinionEventObservers.getInstance().getMinionEventObserverWithGivenState(MinionTaskState.IN_PROGRESS);
+    assertEquals(minionEventObserverWithGivenState.size(), 2);
+    assertSame(minionEventObserverWithGivenState.get("t01"), observer1);
+    assertSame(minionEventObserverWithGivenState.get("t02"), observer2);
+  }
 }
diff --git 
a/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionProgressObserverTest.java
 
b/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionProgressObserverTest.java
index 897f19423e..6ef9104fb4 100644
--- 
a/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionProgressObserverTest.java
+++ 
b/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionProgressObserverTest.java
@@ -49,4 +49,31 @@ public class MinionProgressObserverTest {
     entry = progress.get(2);
     assertTrue(entry.getStatus().contains("bad bug"), entry.getStatus());
   }
+
+  @Test
+  public void testGetStartTs() {
+    MinionProgressObserver observer = new MinionProgressObserver(3);
+    long ts1 = System.currentTimeMillis();
+    observer.notifyTaskStart(null);
+    long ts = observer.getStartTs();
+    long ts2 = System.currentTimeMillis();
+    assertTrue(ts1 <= ts);
+    assertTrue(ts2 >= ts);
+  }
+
+  @Test
+  public void testUpdateAndGetTaskState() {
+    MinionProgressObserver observer = new MinionProgressObserver(3);
+    assertEquals(observer.getTaskState(), MinionTaskState.UNKNOWN);
+    observer.notifyTaskStart(null);
+    assertEquals(observer.getTaskState(), MinionTaskState.IN_PROGRESS);
+    observer.notifyProgress(null, "");
+    assertEquals(observer.getTaskState(), MinionTaskState.IN_PROGRESS);
+    observer.notifyTaskSuccess(null, "");
+    assertEquals(observer.getTaskState(), MinionTaskState.SUCCEEDED);
+    observer.notifyTaskCancelled(null);
+    assertEquals(observer.getTaskState(), MinionTaskState.CANCELLED);
+    observer.notifyTaskError(null, new Exception());
+    assertEquals(observer.getTaskState(), MinionTaskState.ERROR);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to