This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2cd914869d add API to get progress of subtasks with given state
(#10132)
2cd914869d is described below
commit 2cd914869d7e1ec81e30f737ba7d2eea2b2385ea
Author: Haitao Zhang <[email protected]>
AuthorDate: Wed Jan 25 00:52:13 2023 -0800
add API to get progress of subtasks with given state (#10132)
* add API to get progress of subtasks with given state
* fix bugs
* revise API response format
* address comments
* rename /tasks/subtask/state/{subTaskState}/progress
* revise minion side API
---
.../api/resources/PinotTaskRestletResource.java | 50 +++++++-
.../core/minion/PinotHelixTaskResourceManager.java | 54 ++++++++
.../resources/PinotTaskRestletResourceTest.java | 139 +++++++++++++++++++++
.../minion/PinotHelixTaskResourceManagerTest.java | 73 +++++++++++
.../api/resources/PinotTaskProgressResource.java | 60 ++++++++-
.../pinot/minion/event/MinionEventObserver.java | 16 +++
.../pinot/minion/event/MinionEventObservers.java | 20 +++
.../pinot/minion/event/MinionProgressObserver.java | 28 ++++-
.../apache/pinot/minion/event/MinionTaskState.java | 45 +++++++
.../resources/PinotTaskProgressResourceTest.java | 105 ++++++++++++++++
.../minion/event/MinionEventObserversTest.java | 23 ++++
.../minion/event/MinionProgressObserverTest.java | 27 ++++
12 files changed, 632 insertions(+), 8 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index c2605c7614..11491061e7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -25,14 +25,18 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -58,6 +62,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.task.TaskPartitionState;
@@ -419,7 +424,7 @@ public class PinotTaskRestletResource {
@GET
@Path("/tasks/subtask/{taskName}/progress")
@Produces(MediaType.APPLICATION_JSON)
- @ApiOperation("Get progress of specified sub tasks for the given task
tracked by worker in memory")
+ @ApiOperation("Get progress of specified sub tasks for the given task
tracked by minion worker in memory")
public String getSubtaskProgress(@Context HttpHeaders httpHeaders,
@ApiParam(value = "Task name", required = true) @PathParam("taskName")
String taskName,
@ApiParam(value = "Sub task names separated by comma")
@QueryParam("subtaskNames") @Nullable
@@ -451,6 +456,49 @@ public class PinotTaskRestletResource {
}
}
+  /**
+   * Fans out to the selected minion workers and returns, per minion worker id, the in-memory progress of the
+   * subtasks that are in the given state on that worker.
+   *
+   * @param httpHeaders headers of the incoming request; every header is forwarded to the minion workers
+   * @param subTaskState subtask state to filter on (UNKNOWN, IN_PROGRESS, SUCCEEDED, CANCELLED, ERROR); must not be
+   *                     blank
+   * @param minionWorkerIds optional comma-separated minion instance ids; when empty, all minion workers are queried
+   * @return JSON object mapping minion worker id to that worker's subtask progress
+   * @throws ControllerApplicationException with 400 if {@code subTaskState} is blank, or 500 if gathering the
+   *         progress fails
+   */
+  @GET
+  @Path("/tasks/subtask/workers/progress")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation("Get progress of all subtasks with specified state tracked by minion worker in memory")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 400, message = "Bad request"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public String getSubtaskOnWorkerProgress(@Context HttpHeaders httpHeaders,
+      @ApiParam(value = "Subtask state (UNKNOWN,IN_PROGRESS,SUCCEEDED,CANCELLED,ERROR)", required = true)
+      @QueryParam("subTaskState") String subTaskState,
+      @ApiParam(value = "Minion worker IDs separated by comma") @QueryParam("minionWorkerIds") @Nullable
+      String minionWorkerIds) {
+    // The state is forwarded verbatim to every worker; reject a missing value up front instead of fanning out
+    // requests that carry "subTaskState=null".
+    if (StringUtils.isBlank(subTaskState)) {
+      throw new ControllerApplicationException(LOGGER, "Query parameter 'subTaskState' is required",
+          Response.Status.BAD_REQUEST);
+    }
+    Set<String> selectedMinionWorkers = new HashSet<>();
+    if (StringUtils.isNotEmpty(minionWorkerIds)) {
+      selectedMinionWorkers.addAll(
+          Arrays.stream(StringUtils.split(minionWorkerIds, ',')).map(String::trim).collect(Collectors.toList()));
+    }
+    // Reuse the scheme (http/https) of the original request to reach the minion workers
+    String scheme = _uriInfo.getRequestUri().getScheme();
+    List<InstanceConfig> allMinionWorkerInstanceConfigs = _pinotHelixResourceManager.getAllMinionInstanceConfigs();
+    Map<String, String> selectedMinionWorkerEndpoints = new HashMap<>();
+    for (InstanceConfig worker : allMinionWorkerInstanceConfigs) {
+      if (selectedMinionWorkers.isEmpty() || selectedMinionWorkers.contains(worker.getId())) {
+        selectedMinionWorkerEndpoints.put(worker.getId(),
+            String.format("%s://%s:%d", scheme, worker.getHostName(), Integer.parseInt(worker.getPort())));
+      }
+    }
+    Map<String, String> requestHeaders = new HashMap<>();
+    httpHeaders.getRequestHeaders().keySet().forEach(header ->
+        requestHeaders.put(header, httpHeaders.getHeaderString(header)));
+    int timeoutMs = _controllerConf.getMinionAdminRequestTimeoutSeconds() * 1000;
+    try {
+      Map<String, Object> minionWorkerIdSubtaskProgressMap =
+          _pinotHelixTaskResourceManager.getSubtaskOnWorkerProgress(subTaskState, _executor, _connectionManager,
+              selectedMinionWorkerEndpoints, requestHeaders, timeoutMs);
+      return JsonUtils.objectToString(minionWorkerIdSubtaskProgressMap);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to get minion worker side progress for subtasks with state %s due to error: %s",
+              subTaskState, ExceptionUtils.getStackTrace(e)), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
@GET
@Path("/tasks/scheduler/information")
@ApiOperation("Fetch cron scheduler information")
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index f7441593c9..15851efd5e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -37,6 +37,7 @@ import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.lang3.StringUtils;
@@ -620,6 +621,59 @@ public class PinotHelixTaskResourceManager {
return subtaskProgressMap;
}
+  /**
+   * Gets progress of all subtasks with the specified state, as tracked in memory by the given minion workers.
+   *
+   * <p>Deliberately not {@code synchronized}: this method reads no mutable state of this manager, and holding the
+   * manager's monitor for the whole scatter-gather (up to {@code timeoutMs}) would block every other synchronized
+   * method of this manager for that long.
+   *
+   * @param subtaskState a subtask state; valid values are those of org.apache.pinot.minion.event.MinionTaskState
+   * @param executor an {@link Executor} used to run the HTTP requests on
+   * @param connMgr a {@link HttpConnectionManager} used to manage http connections
+   * @param selectedMinionWorkerEndpoints a map of worker id to http endpoint for minions to get subtask progress from
+   * @param requestHeaders http headers used to send requests to minion workers
+   * @param timeoutMs timeout (in milliseconds) for requests sent to minion workers
+   * @return a map of minion worker id to subtask progress
+   * @throws JsonProcessingException if a worker response cannot be parsed as JSON
+   */
+  public Map<String, Object> getSubtaskOnWorkerProgress(String subtaskState, Executor executor,
+      HttpConnectionManager connMgr, Map<String, String> selectedMinionWorkerEndpoints,
+      Map<String, String> requestHeaders, int timeoutMs)
+      throws JsonProcessingException {
+    return getSubtaskOnWorkerProgress(subtaskState,
+        new CompletionServiceHelper(executor, connMgr, HashBiMap.create(0)), selectedMinionWorkerEndpoints,
+        requestHeaders, timeoutMs);
+  }
+
+  @VisibleForTesting
+  Map<String, Object> getSubtaskOnWorkerProgress(String subtaskState,
+      CompletionServiceHelper completionServiceHelper, Map<String, String> selectedMinionWorkerEndpoints,
+      Map<String, String> requestHeaders, int timeoutMs)
+      throws JsonProcessingException {
+    Map<String, Object> minionWorkerIdSubtaskProgressMap = new HashMap<>();
+    if (selectedMinionWorkerEndpoints.isEmpty()) {
+      return minionWorkerIdSubtaskProgressMap;
+    }
+    // The state originates from a REST query parameter: encode it so characters such as '&', '#' or spaces cannot
+    // corrupt the worker URL. A null state is still sent as the literal "null", exactly as before.
+    String encodedSubtaskState = encodeQueryParamValue(String.valueOf(subtaskState));
+    Map<String, String> minionWorkerUrlToWorkerIdMap = selectedMinionWorkerEndpoints.entrySet().stream()
+        .collect(Collectors.toMap(
+            entry -> String.format("%s/tasks/subtask/state/progress?subTaskState=%s", entry.getValue(),
+                encodedSubtaskState),
+            Map.Entry::getKey));
+    List<String> workerUrls = new ArrayList<>(minionWorkerUrlToWorkerIdMap.keySet());
+    LOGGER.debug("Getting task progress with workerUrls: {}", workerUrls);
+    // Scatter and gather progress from multiple workers.
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(workerUrls, null, true, requestHeaders, timeoutMs);
+    for (Map.Entry<String, String> entry : serviceResponse._httpResponses.entrySet()) {
+      String worker = entry.getKey();
+      String resp = entry.getValue();
+      LOGGER.debug("Got resp: {} from worker: {}", resp, worker);
+      minionWorkerIdSubtaskProgressMap.put(minionWorkerUrlToWorkerIdMap.get(worker),
+          JsonUtils.stringToObject(resp, Map.class));
+    }
+    if (serviceResponse._failedResponseCount > 0) {
+      // Instead of aborting, return the progress gathered so far: only workers present in _httpResponses appear in
+      // the result. The detailed worker failure response is expected to be logged by CompletionServiceHelper.
+      LOGGER.warn("There were {} workers failed to report task progress. Got partial progress info: {}",
+          serviceResponse._failedResponseCount, minionWorkerIdSubtaskProgressMap);
+    }
+    return minionWorkerIdSubtaskProgressMap;
+  }
+
+  /**
+   * URL-encodes a single query parameter value using UTF-8.
+   */
+  private static String encodeQueryParamValue(String value) {
+    try {
+      return java.net.URLEncoder.encode(value, "UTF-8");
+    } catch (java.io.UnsupportedEncodingException e) {
+      // Every JVM is required to support UTF-8, so this cannot happen in practice.
+      throw new IllegalStateException("UTF-8 encoding is not supported", e);
+    }
+  }
+
/**
* Helper method to return a map of task names to corresponding task state
* where the task corresponds to the given Pinot table name. This is used to
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResourceTest.java
new file mode 100644
index 0000000000..764b1d53d1
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResourceTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Map;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.UriInfo;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.controller.ControllerConf;
+import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+
+
+public class PinotTaskRestletResourceTest {
+  @Mock
+  PinotHelixResourceManager _pinotHelixResourceManager;
+  @Mock
+  PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
+  @Mock
+  ControllerConf _controllerConf;
+  @Mock
+  UriInfo _uriInfo;
+
+  @InjectMocks
+  PinotTaskRestletResource _pinotTaskRestletResource;
+
+  // Handle returned by MockitoAnnotations.openMocks(); closed after every test so mock state does not leak.
+  private AutoCloseable _openedMocks;
+
+  @BeforeMethod
+  public void init()
+      throws URISyntaxException {
+    _openedMocks = MockitoAnnotations.openMocks(this);
+    when(_uriInfo.getRequestUri()).thenReturn(new URI("http://localhost:9000"));
+  }
+
+  @org.testng.annotations.AfterMethod(alwaysRun = true)
+  public void tearDown()
+      throws Exception {
+    // init() may have failed before openMocks() returned
+    if (_openedMocks != null) {
+      _openedMocks.close();
+      _openedMocks = null;
+    }
+  }
+
+  @Test
+  public void testGetSubtaskWithGivenStateProgressWhenMinionWorkerIdsAreNotSpecified()
+      throws JsonProcessingException {
+    Map<String, String> minionWorkerEndpoints =
+        invokeGetSubtaskWithGivenStateProgressAndReturnCapturedMinionWorkerEndpoints(null);
+    assertEquals(minionWorkerEndpoints,
+        ImmutableMap.of("minion1", "http://minion1:9514", "minion2", "http://minion2:9514"));
+  }
+
+  @Test
+  public void testGetSubtaskWithGivenStateProgressWhenAllMinionWorkerIdsAreSpecified()
+      throws JsonProcessingException {
+    // Use minion worker ids surrounded by spaces to ensure they are trimmed.
+    Map<String, String> minionWorkerEndpoints =
+        invokeGetSubtaskWithGivenStateProgressAndReturnCapturedMinionWorkerEndpoints(" minion1 , minion2 ");
+    assertEquals(minionWorkerEndpoints,
+        ImmutableMap.of("minion1", "http://minion1:9514", "minion2", "http://minion2:9514"));
+  }
+
+  @Test
+  public void testGetSubtaskWithGivenStateProgressWhenOneMinionWorkerIdIsSpecified()
+      throws JsonProcessingException {
+    Map<String, String> minionWorkerEndpoints =
+        invokeGetSubtaskWithGivenStateProgressAndReturnCapturedMinionWorkerEndpoints("minion1");
+    assertEquals(minionWorkerEndpoints, ImmutableMap.of("minion1", "http://minion1:9514"));
+  }
+
+  @Test
+  public void testGetSubtaskWithGivenStateProgressWithException()
+      throws JsonProcessingException {
+    when(_pinotHelixResourceManager.getAllMinionInstanceConfigs()).thenReturn(Collections.emptyList());
+    HttpHeaders httpHeaders = Mockito.mock(HttpHeaders.class);
+    when(httpHeaders.getRequestHeaders()).thenReturn(new MultivaluedHashMap<>());
+    when(_pinotHelixTaskResourceManager
+        .getSubtaskOnWorkerProgress(anyString(), any(), any(), anyMap(), anyMap(), anyInt()))
+        .thenThrow(new RuntimeException());
+    assertThrows(ControllerApplicationException.class,
+        () -> _pinotTaskRestletResource.getSubtaskOnWorkerProgress(httpHeaders, "IN_PROGRESS", null));
+  }
+
+  /**
+   * Calls the REST endpoint against two mocked minions and returns the worker endpoint map that the resource passed
+   * to the task resource manager.
+   */
+  @SuppressWarnings("unchecked")
+  private Map<String, String> invokeGetSubtaskWithGivenStateProgressAndReturnCapturedMinionWorkerEndpoints(
+      String minionWorkerIds)
+      throws JsonProcessingException {
+    InstanceConfig minion1 = createInstanceConfig("minion1", "minion1", "9514");
+    InstanceConfig minion2 = createInstanceConfig("minion2", "minion2", "9514");
+    when(_pinotHelixResourceManager.getAllMinionInstanceConfigs()).thenReturn(ImmutableList.of(minion1, minion2));
+    HttpHeaders httpHeaders = Mockito.mock(HttpHeaders.class);
+    when(httpHeaders.getRequestHeaders()).thenReturn(new MultivaluedHashMap<>());
+    ArgumentCaptor<Map<String, String>> minionWorkerEndpointsCaptor = ArgumentCaptor.forClass(Map.class);
+    when(_pinotHelixTaskResourceManager.getSubtaskOnWorkerProgress(anyString(), any(), any(),
+        minionWorkerEndpointsCaptor.capture(), anyMap(), anyInt()))
+        .thenReturn(Collections.emptyMap());
+    String progress =
+        _pinotTaskRestletResource.getSubtaskOnWorkerProgress(httpHeaders, "IN_PROGRESS", minionWorkerIds);
+    assertEquals(progress, "{}");
+    return minionWorkerEndpointsCaptor.getValue();
+  }
+
+  private InstanceConfig createInstanceConfig(String instanceId, String hostName, String port) {
+    InstanceConfig instanceConfig = new InstanceConfig(instanceId);
+    instanceConfig.setHostName(hostName);
+    instanceConfig.setPort(port);
+    return instanceConfig;
+  }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
index 981472feff..5706d402ab 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
@@ -18,13 +18,17 @@
*/
package org.apache.pinot.controller.helix.core.minion;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
@@ -35,6 +39,8 @@ import org.apache.helix.task.WorkflowContext;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.CompletionServiceHelper;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
@@ -42,6 +48,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -199,6 +206,72 @@ public class PinotHelixTaskResourceManagerTest {
assertEquals(taskProgress, "No worker has run this subtask");
}
+  /**
+   * With no minion worker selected, the manager must return an empty result without issuing any HTTP request.
+   */
+  @Test
+  public void testGetSubtaskWithGivenStateProgressNoWorker()
+      throws JsonProcessingException {
+    PinotHelixTaskResourceManager taskResourceManager =
+        new PinotHelixTaskResourceManager(mock(PinotHelixResourceManager.class), mock(TaskDriver.class));
+    CompletionServiceHelper mockedHttpHelper = mock(CompletionServiceHelper.class);
+    Map<String, String> noWorkerEndpoints = new HashMap<>();
+
+    Map<String, Object> workerProgress = taskResourceManager.getSubtaskOnWorkerProgress("IN_PROGRESS",
+        mockedHttpHelper, noWorkerEndpoints, Collections.emptyMap(), 1000);
+
+    assertTrue(workerProgress.isEmpty());
+    verify(mockedHttpHelper, Mockito.never()).doMultiGetRequest(any(), any(), anyBoolean(), any(), anyInt());
+  }
+
+  /**
+   * Three workers each report two subtasks while one extra failure is counted; the manager must query every worker
+   * URL and return the parsed progress keyed by worker id.
+   */
+  @Test
+  public void testGetSubtaskWithGivenStateProgress()
+      throws IOException {
+    final int numWorkers = 3;
+    String taskIdPrefix = "Task_SegmentGenerationAndPushTask_someone";
+    String[] workerIds = new String[numWorkers];
+    String[] subtaskIds = new String[2 * numWorkers];
+    Map<String, String> selectedMinionWorkerEndpoints = new HashMap<>();
+    CompletionServiceHelper.CompletionServiceResponse cannedResponse =
+        new CompletionServiceHelper.CompletionServiceResponse();
+    for (int w = 0; w < numWorkers; w++) {
+      workerIds[w] = "worker" + w;
+      String endpoint = "http://" + workerIds[w] + ":9000";
+      selectedMinionWorkerEndpoints.put(workerIds[w], endpoint);
+      int firstSubtask = 2 * w;
+      int secondSubtask = firstSubtask + 1;
+      subtaskIds[firstSubtask] = taskIdPrefix + "_" + firstSubtask;
+      subtaskIds[secondSubtask] = taskIdPrefix + "_" + secondSubtask;
+      // For simplicity subtask names map to empty strings here; a real minion maps each subtask name to a
+      // JSON-serialized org.apache.pinot.minion.event.MinionEventObserver.
+      String workerUrl = String.format("%s/tasks/subtask/state/progress?subTaskState=IN_PROGRESS", endpoint);
+      cannedResponse._httpResponses.put(workerUrl,
+          JsonUtils.objectToString(ImmutableMap.of(subtaskIds[firstSubtask], "", subtaskIds[secondSubtask], "")));
+    }
+    cannedResponse._failedResponseCount = 1;
+
+    CompletionServiceHelper mockedHttpHelper = mock(CompletionServiceHelper.class);
+    ArgumentCaptor<List<String>> requestedUrlsCaptor = ArgumentCaptor.forClass(List.class);
+    when(mockedHttpHelper.doMultiGetRequest(requestedUrlsCaptor.capture(), any(), anyBoolean(), any(), anyInt()))
+        .thenReturn(cannedResponse);
+    PinotHelixTaskResourceManager taskResourceManager =
+        new PinotHelixTaskResourceManager(mock(PinotHelixResourceManager.class), mock(TaskDriver.class));
+
+    Map<String, Object> workerProgress = taskResourceManager.getSubtaskOnWorkerProgress("IN_PROGRESS",
+        mockedHttpHelper, selectedMinionWorkerEndpoints, Collections.emptyMap(), 1000);
+
+    Set<String> expectedWorkerUrls = selectedMinionWorkerEndpoints.values().stream()
+        .map(endpoint -> String.format("%s/tasks/subtask/state/progress?subTaskState=IN_PROGRESS", endpoint))
+        .collect(Collectors.toSet());
+    assertEquals(new HashSet<>(requestedUrlsCaptor.getValue()), expectedWorkerUrls);
+    assertEquals(workerProgress.size(), numWorkers);
+    for (int w = 0; w < numWorkers; w++) {
+      @SuppressWarnings("unchecked")
+      Map<String, Object> subtaskProgressMap = (Map<String, Object>) workerProgress.get(workerIds[w]);
+      assertEquals(subtaskProgressMap.size(), 2);
+      assertTrue(subtaskProgressMap.containsKey(subtaskIds[2 * w]));
+      assertTrue(subtaskProgressMap.containsKey(subtaskIds[2 * w + 1]));
+    }
+  }
+
@Test
public void testGetTableTaskCount() {
String taskType = "TestTask";
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java
index c2260bec8e..c53a0323e6 100644
---
a/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java
+++
b/pinot-minion/src/main/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResource.java
@@ -27,8 +27,12 @@ import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
@@ -37,9 +41,10 @@ import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.minion.event.MinionEventObserver;
import org.apache.pinot.minion.event.MinionEventObservers;
+import org.apache.pinot.minion.event.MinionTaskState;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
@@ -84,4 +89,57 @@ public class PinotTaskProgressResource {
.build());
}
}
+
+  /**
+   * Returns finer-grained progress tracked in memory on this minion, filtered either by subtask names or by subtask
+   * state. With neither filter, the progress of every tracked subtask is returned.
+   *
+   * @param subtaskNames optional comma-separated subtask names; names with no tracked observer are skipped
+   * @param subTaskState optional MinionTaskState name, matched case-insensitively; an unrecognized value falls back
+   *                     to IN_PROGRESS
+   * @return JSON object mapping subtask name to its {@link MinionEventObserver}
+   * @throws WebApplicationException with 400 if both filters are given, or 500 if the progress cannot be gathered
+   *                                 or serialized
+   */
+  @GET
+  @Path("/tasks/subtask/state/progress")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation("Get finer grained task progress tracked in memory for given subtasks or given state")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 400, message = "Bad request"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public String getSubtaskProgress(
+      @ApiParam(value = "Sub task names separated by comma") @QueryParam("subtaskNames") @Nullable
+      String subtaskNames,
+      @ApiParam(value = "Subtask state (UNKNOWN,IN_PROGRESS,SUCCEEDED,CANCELLED,ERROR)")
+      @QueryParam("subTaskState") @Nullable String subTaskState) {
+    boolean namesGiven = StringUtils.isNotEmpty(subtaskNames);
+    boolean stateGiven = StringUtils.isNotEmpty(subTaskState);
+    if (namesGiven && stateGiven) {
+      // Invalid combination supplied by the caller: report a client error (400) rather than a server error.
+      throw new WebApplicationException(Response.status(Response.Status.BAD_REQUEST)
+          .entity("Subtask names and state should not be specified at the same time")
+          .build());
+    }
+    try {
+      Map<String, MinionEventObserver> progress = new HashMap<>();
+      if (stateGiven) {
+        MinionTaskState minionTaskState;
+        try {
+          // Locale.ROOT keeps the upper-casing locale-independent (under a Turkish default locale 'i' would
+          // otherwise become a dotted capital I and no enum constant would match).
+          minionTaskState = MinionTaskState.valueOf(subTaskState.toUpperCase(java.util.Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+          LOGGER.warn("{} is not a valid subtask state, defaulting to IN_PROGRESS", subTaskState);
+          minionTaskState = MinionTaskState.IN_PROGRESS;
+        }
+        LOGGER.debug("Getting progress for subtasks with state {}", minionTaskState);
+        progress.putAll(MinionEventObservers.getInstance().getMinionEventObserverWithGivenState(minionTaskState));
+      } else if (namesGiven) {
+        LOGGER.debug("Getting progress for subtasks: {}", subtaskNames);
+        List<String> subTaskNames =
+            Arrays.stream(StringUtils.split(subtaskNames, CommonConstants.Minion.TASK_LIST_SEPARATOR))
+                .map(String::trim)
+                .collect(Collectors.toList());
+        for (String subtaskName : subTaskNames) {
+          MinionEventObserver observer = MinionEventObservers.getInstance().getMinionEventObserver(subtaskName);
+          if (observer != null) {
+            progress.put(subtaskName, observer);
+          }
+        }
+      } else {
+        LOGGER.debug("Getting progress of all subtasks");
+        progress.putAll(MinionEventObservers.getInstance().getMinionEventObservers());
+      }
+      LOGGER.debug("Got subtasks progress: {}", progress);
+      return JsonUtils.objectToString(progress);
+    } catch (Exception e) {
+      LOGGER.error("Failed to get task progress for subtasks {} with state {}", subtaskNames, subTaskState, e);
+      throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(
+          String.format("Failed to get task progress for subtasks %s with state %s due to error: %s",
+              namesGiven ? subtaskNames : "NOT_SPECIFIED",
+              stateGiven ? subTaskState : "NOT_SPECIFIED",
+              e.getMessage()))
+          .build());
+    }
+  }
}
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserver.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserver.java
index 510406d297..e113a0d862 100644
---
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserver.java
+++
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObserver.java
@@ -70,4 +70,20 @@ public interface MinionEventObserver {
* @param exception Exception encountered during execution
*/
void notifyTaskError(PinotTaskConfig pinotTaskConfig, Exception exception);
+
+  /**
+   * Returns the current state of the observed minion task.
+   * Observers that do not track state report {@link MinionTaskState#UNKNOWN}.
+   *
+   * @return the {@link MinionTaskState} of the task
+   */
+  default MinionTaskState getTaskState() {
+    return MinionTaskState.UNKNOWN;
+  }
+
+  /**
+   * Returns the start timestamp of the observed minion task.
+   * Observers that do not track it report {@code -1}.
+   *
+   * @return the task start timestamp, or {@code -1} when it is not tracked
+   */
+  default long getStartTs() {
+    final long notTracked = -1L;
+    return notTracked;
+  }
}
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java
index 46befe39a8..471d659ba4 100644
---
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java
+++
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionEventObservers.java
@@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
import org.apache.pinot.minion.MinionConf;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
@@ -111,6 +112,25 @@ public class MinionEventObservers {
return _taskEventObservers.get(taskId);
}
+  /**
+   * Gets all currently tracked {@link MinionEventObserver}s.
+   *
+   * @return a read-only view of the map of subtask ID to {@link MinionEventObserver}
+   */
+  public Map<String, MinionEventObserver> getMinionEventObservers() {
+    // Return a read-only view so callers cannot mutate the internal registry directly and bypass
+    // addMinionEventObserver().
+    return java.util.Collections.unmodifiableMap(_taskEventObservers);
+  }
+
+  /**
+   * Selects the tracked {@link MinionEventObserver}s whose {@link MinionEventObserver#getTaskState()} equals the
+   * requested state.
+   *
+   * @param taskState the {@link MinionTaskState} that observers must report to be included
+   * @return a newly built map of subtask ID to {@link MinionEventObserver} holding only the matching entries
+   */
+  public Map<String, MinionEventObserver> getMinionEventObserverWithGivenState(MinionTaskState taskState) {
+    Map<String, MinionEventObserver> matchingObservers = _taskEventObservers.entrySet()
+        .stream()
+        .filter(observerEntry -> taskState == observerEntry.getValue().getTaskState())
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    return matchingObservers;
+  }
+
public void addMinionEventObserver(String taskId, MinionEventObserver
eventObserver) {
LOGGER.debug("Keep track of event observer for task: {}", taskId);
_taskEventObservers.put(taskId, eventObserver);
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
index 1cc03eec63..6f18814c78 100644
---
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
+++
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionProgressObserver.java
@@ -41,6 +41,7 @@ public class MinionProgressObserver extends
DefaultMinionEventObserver {
private final int _maxNumStatusToTrack;
private final Deque<StatusEntry> _lastStatus = new LinkedList<>();
+ // Lifecycle state of the observed task: set to UNKNOWN by the int constructor, then overwritten by addStatus().
+ // NOTE(review): written inside the synchronized notify* methods; readers outside the monitor may see stale values.
+ private MinionTaskState _taskState;
private long _startTs;
public MinionProgressObserver() {
@@ -49,12 +50,13 @@ public class MinionProgressObserver extends
DefaultMinionEventObserver {
+ /**
+ * @param maxNumStatusToTrack maximum number of most recent status entries kept; the oldest entry is dropped beyond it
+ */
public MinionProgressObserver(int maxNumStatusToTrack) {
_maxNumStatusToTrack = maxNumStatusToTrack;
+ // No lifecycle notification has been received yet.
+ _taskState = MinionTaskState.UNKNOWN;
}
@Override
public synchronized void notifyTaskStart(PinotTaskConfig pinotTaskConfig) {
+ // The start time is kept so the terminal notifications can report elapsed time; a started task is IN_PROGRESS.
_startTs = System.currentTimeMillis();
- addStatus(_startTs, "Task started");
+ addStatus(_startTs, "Task started", MinionTaskState.IN_PROGRESS);
super.notifyTaskStart(pinotTaskConfig);
}
@@ -64,14 +66,16 @@ public class MinionProgressObserver extends
DefaultMinionEventObserver {
* @param pinotTaskConfig Pinot task config
* @param progress progress status and its toString() returns sth meaningful.
*/
+ @Override
public synchronized void notifyProgress(PinotTaskConfig pinotTaskConfig,
@Nullable Object progress) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Update progress: {} for task: {}", progress,
pinotTaskConfig.getTaskId());
}
- addStatus(System.currentTimeMillis(), (progress == null) ? "" :
progress.toString());
+ // Every progress update (a null progress is recorded as an empty message) keeps the task IN_PROGRESS.
+ addStatus(System.currentTimeMillis(), (progress == null) ? "" :
progress.toString(), MinionTaskState.IN_PROGRESS);
super.notifyProgress(pinotTaskConfig, progress);
}
+ @Override
@Nullable
public synchronized List<StatusEntry> getProgress() {
return new ArrayList<>(_lastStatus);
@@ -80,25 +84,37 @@ public class MinionProgressObserver extends
DefaultMinionEventObserver {
@Override
public synchronized void notifyTaskSuccess(PinotTaskConfig pinotTaskConfig,
@Nullable Object executionResult) {
long endTs = System.currentTimeMillis();
- addStatus(endTs, "Task succeeded in " + (endTs - _startTs) + "ms");
+ // Terminal state SUCCEEDED; the message reports the time elapsed since notifyTaskStart().
+ // NOTE(review): if notifyTaskStart() was never called, _startTs is still 0 and the reported duration is meaningless.
+ addStatus(endTs, "Task succeeded in " + (endTs - _startTs) + "ms",
MinionTaskState.SUCCEEDED);
super.notifyTaskSuccess(pinotTaskConfig, executionResult);
}
@Override
public synchronized void notifyTaskCancelled(PinotTaskConfig
pinotTaskConfig) {
long endTs = System.currentTimeMillis();
- addStatus(endTs, "Task got cancelled after " + (endTs - _startTs) + "ms");
+ // Terminal state CANCELLED; the message reports the time elapsed since notifyTaskStart().
+ addStatus(endTs, "Task got cancelled after " + (endTs - _startTs) + "ms",
MinionTaskState.CANCELLED);
super.notifyTaskCancelled(pinotTaskConfig);
}
@Override
public synchronized void notifyTaskError(PinotTaskConfig pinotTaskConfig,
Exception e) {
long endTs = System.currentTimeMillis();
- addStatus(endTs, "Task failed in " + (endTs - _startTs) + "ms with error:
" + ExceptionUtils.getStackTrace(e));
+ // Terminal state ERROR; the full stack trace of the failure is kept in the status message.
+ addStatus(endTs, "Task failed in " + (endTs - _startTs) + "ms with error:
" + ExceptionUtils.getStackTrace(e),
+ MinionTaskState.ERROR);
super.notifyTaskError(pinotTaskConfig, e);
}
- private void addStatus(long ts, String progress) {
+ @Override
+ public synchronized MinionTaskState getTaskState() {
+ return _taskState; // read under the same monitor held by the synchronized notify*() writers
+ }
+
+ @Override
+ public synchronized long getStartTs() {
+ return _startTs; // synchronized: avoids stale or non-atomic (torn) reads of this long field
+ }
+
+ private void addStatus(long ts, String progress, MinionTaskState taskState) {
+ _taskState = taskState;
_lastStatus.addLast(new StatusEntry(ts, progress));
if (_lastStatus.size() > _maxNumStatusToTrack) {
_lastStatus.pollFirst();
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionTaskState.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionTaskState.java
new file mode 100644
index 0000000000..2498c1a11b
--- /dev/null
+++
b/pinot-minion/src/main/java/org/apache/pinot/minion/event/MinionTaskState.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.event;
+
+/**
+ * MinionTaskState represents the state of a minion task.
+ */
+public enum MinionTaskState {
+ /**
+ * No state has been reported yet, or the state is unknown
+ */
+ UNKNOWN,
+ /**
+ * The minion task is in progress
+ */
+ IN_PROGRESS,
+ /**
+ * The minion task succeeded
+ */
+ SUCCEEDED,
+ /**
+ * The minion task is cancelled
+ */
+ CANCELLED,
+ /**
+ * The minion task encountered an error
+ */
+ ERROR
+}
diff --git
a/pinot-minion/src/test/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResourceTest.java
b/pinot-minion/src/test/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResourceTest.java
new file mode 100644
index 0000000000..b8e288f906
--- /dev/null
+++
b/pinot-minion/src/test/java/org/apache/pinot/minion/api/resources/PinotTaskProgressResourceTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.api.resources;
+
+import java.io.IOException;
+import java.util.Map;
+import javax.ws.rs.WebApplicationException;
+import org.apache.pinot.minion.event.MinionEventObserver;
+import org.apache.pinot.minion.event.MinionEventObservers;
+import org.apache.pinot.minion.event.MinionProgressObserver;
+import org.apache.pinot.minion.event.MinionTaskState;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
+
+
+public class PinotTaskProgressResourceTest {
+
+ @Test
+ public void testGetGivenSubtaskOrStateProgress()
+ throws IOException {
+ MinionEventObserver observer1 = new MinionProgressObserver();
+ observer1.notifyTaskStart(null);
+ MinionEventObservers.getInstance().addMinionEventObserver("t01",
observer1);
+
+ MinionEventObserver observer2 = new MinionProgressObserver();
+ observer2.notifyProgress(null, "");
+ MinionEventObservers.getInstance().addMinionEventObserver("t02",
observer2);
+
+ MinionEventObserver observer3 = new MinionProgressObserver();
+ observer3.notifyTaskSuccess(null, "");
+ MinionEventObservers.getInstance().addMinionEventObserver("t03",
observer3);
+
+ PinotTaskProgressResource pinotTaskProgressResource = new
PinotTaskProgressResource();
+
+ // get all sub task progress
+ String allSubTaskProgress =
pinotTaskProgressResource.getSubtaskProgress(null, null);
+ Map<String, Object> subtaskProgressMap =
JsonUtils.stringToObject(allSubTaskProgress, Map.class);
+ assertEquals(subtaskProgressMap.size(), 3);
+
+ // get subtasks with given state
+ String subtaskWithInProgressState =
+ pinotTaskProgressResource.getSubtaskProgress(null,
MinionTaskState.IN_PROGRESS.toString());
+ assertInProgressSubtasks(subtaskWithInProgressState);
+
+ String subtaskWithUndefinedState =
+ pinotTaskProgressResource.getSubtaskProgress(null, "Undefined");
+ assertInProgressSubtasks(subtaskWithUndefinedState);
+
+ String subtaskWithSucceededState =
+ pinotTaskProgressResource.getSubtaskProgress(null,
MinionTaskState.SUCCEEDED.toString());
+ subtaskProgressMap = JsonUtils.stringToObject(subtaskWithSucceededState,
Map.class);
+ assertEquals(subtaskProgressMap.size(), 1);
+
+ String subtaskWithUnknownState =
+ pinotTaskProgressResource.getSubtaskProgress(null,
MinionTaskState.UNKNOWN.toString());
+ assertNoSubtaskWithTheGivenState(subtaskWithUnknownState);
+
+ String subtaskWithCancelledState =
+ pinotTaskProgressResource.getSubtaskProgress(null,
MinionTaskState.CANCELLED.toString());
+ assertNoSubtaskWithTheGivenState(subtaskWithCancelledState);
+
+ String subtaskWithErrorState =
+ pinotTaskProgressResource.getSubtaskProgress(null,
MinionTaskState.ERROR.toString());
+ assertNoSubtaskWithTheGivenState(subtaskWithErrorState);
+
+ // get subtasks with given name
+ String subTasksWithGivenNamesProgress =
pinotTaskProgressResource.getSubtaskProgress(" t01 , t02 ", null);
+ assertInProgressSubtasks(subTasksWithGivenNamesProgress);
+
+ // get subtasks with given names and state
+ assertThrows(WebApplicationException.class,
+ () -> pinotTaskProgressResource.getSubtaskProgress(" t01 , t02 ",
MinionTaskState.IN_PROGRESS.toString()));
+ }
+
+ private void assertInProgressSubtasks(String subtaskWithInProgressState)
+ throws IOException {
+ Map<String, Object> subtaskProgressMap =
JsonUtils.stringToObject(subtaskWithInProgressState, Map.class);
+ assertEquals(subtaskProgressMap.size(), 2);
+ }
+
+ private void assertNoSubtaskWithTheGivenState(String
subtaskWithTheGivenState)
+ throws IOException {
+ Map<String, Object> subtaskProgressMap =
JsonUtils.stringToObject(subtaskWithTheGivenState, Map.class);
+ assertEquals(subtaskProgressMap.size(), 0);
+ }
+}
diff --git
a/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionEventObserversTest.java
b/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionEventObserversTest.java
index c6dbf3e9cf..e7395e3618 100644
---
a/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionEventObserversTest.java
+++
b/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionEventObserversTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.minion.event;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.pinot.minion.MinionConf;
@@ -25,6 +26,7 @@ import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
@@ -64,4 +66,25 @@ public class MinionEventObserversTest {
"Failed to clean up observer");
}
}
+
+ @Test
+ public void testGetMinionEventObserverWithGivenState() {
+ MinionEventObserver observer1 = new MinionProgressObserver();
+ observer1.notifyTaskStart(null);
+ MinionEventObservers.getInstance().addMinionEventObserver("t01",
observer1);
+
+ MinionEventObserver observer2 = new MinionProgressObserver();
+ observer2.notifyProgress(null, "");
+ MinionEventObservers.getInstance().addMinionEventObserver("t02",
observer2);
+
+ MinionEventObserver observer3 = new MinionProgressObserver();
+ observer3.notifyTaskSuccess(null, "");
+ MinionEventObservers.getInstance().addMinionEventObserver("t03",
observer3);
+
+ Map<String, MinionEventObserver> minionEventObserverWithGivenState =
+
MinionEventObservers.getInstance().getMinionEventObserverWithGivenState(MinionTaskState.IN_PROGRESS);
+ assertEquals(minionEventObserverWithGivenState.size(), 2);
+ assertSame(minionEventObserverWithGivenState.get("t01"), observer1);
+ assertSame(minionEventObserverWithGivenState.get("t02"), observer2);
+ }
}
diff --git
a/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionProgressObserverTest.java
b/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionProgressObserverTest.java
index 897f19423e..6ef9104fb4 100644
---
a/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionProgressObserverTest.java
+++
b/pinot-minion/src/test/java/org/apache/pinot/minion/event/MinionProgressObserverTest.java
@@ -49,4 +49,31 @@ public class MinionProgressObserverTest {
entry = progress.get(2);
assertTrue(entry.getStatus().contains("bad bug"), entry.getStatus());
}
+
+ @Test
+ public void testGetStartTs() {
+ MinionProgressObserver observer = new MinionProgressObserver(3);
+ long ts1 = System.currentTimeMillis();
+ observer.notifyTaskStart(null);
+ long ts = observer.getStartTs();
+ long ts2 = System.currentTimeMillis();
+ assertTrue(ts1 <= ts);
+ assertTrue(ts2 >= ts);
+ }
+
+ @Test
+ public void testUpdateAndGetTaskState() {
+ MinionProgressObserver observer = new MinionProgressObserver(3);
+ assertEquals(observer.getTaskState(), MinionTaskState.UNKNOWN);
+ observer.notifyTaskStart(null);
+ assertEquals(observer.getTaskState(), MinionTaskState.IN_PROGRESS);
+ observer.notifyProgress(null, "");
+ assertEquals(observer.getTaskState(), MinionTaskState.IN_PROGRESS);
+ observer.notifyTaskSuccess(null, "");
+ assertEquals(observer.getTaskState(), MinionTaskState.SUCCEEDED);
+ observer.notifyTaskCancelled(null);
+ assertEquals(observer.getTaskState(), MinionTaskState.CANCELLED);
+ observer.notifyTaskError(null, new Exception());
+ assertEquals(observer.getTaskState(), MinionTaskState.ERROR);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]