This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new cf4356738e handle pending minion tasks properly when getting the task progress status (#9911)
cf4356738e is described below
commit cf4356738e6493b04c22416856f4ebbdcf9d54ae
Author: Xiaobing <[email protected]>
AuthorDate: Wed Dec 14 23:53:22 2022 -0800
handle pending minion tasks properly when getting the task progress status (#9911)
* handle pending tasks properly when getting task progress status
* add test
---
.../core/minion/PinotHelixTaskResourceManager.java | 45 ++++++++++++---------
.../minion/PinotHelixTaskResourceManagerTest.java | 47 +++++++++++++++++++++-
2 files changed, 72 insertions(+), 20 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index 48e3717a72..4a2c0bad3c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -517,27 +517,41 @@ public class PinotHelixTaskResourceManager {
for (int partition : jobContext.getPartitionSet()) {
String subtaskName = jobContext.getTaskIdForPartition(partition);
String worker = jobContext.getAssignedParticipant(partition);
- allSubtasks.put(subtaskName, new String[]{worker,
jobContext.getPartitionState(partition).name()});
+ TaskPartitionState partitionState =
jobContext.getPartitionState(partition);
+ String taskState = partitionState == null ? null : partitionState.name();
+ allSubtasks.put(subtaskName, new String[]{worker, taskState});
+ LOGGER.debug("Subtask: {} is assigned to worker: {} with state: {} in
Helix", subtaskName, worker, taskState);
+ if (worker == null) {
+ continue;
+ }
if (selectedSubtasks.isEmpty() ||
selectedSubtasks.contains(subtaskName)) {
workerSelectedSubtasksMap.computeIfAbsent(worker, k -> new
HashSet<>()).add(subtaskName);
}
}
LOGGER.debug("Found subtasks on workers: {}", workerSelectedSubtasksMap);
List<String> workerUrls = new ArrayList<>();
- workerSelectedSubtasksMap.forEach((workerId, subtasksOnWorker) ->
workerUrls.add(String
- .format("%s/tasks/subtask/progress?subtaskNames=%s",
workerEndpoints.get(workerId),
+ workerSelectedSubtasksMap.forEach((workerId, subtasksOnWorker) ->
workerUrls.add(
+ String.format("%s/tasks/subtask/progress?subtaskNames=%s",
workerEndpoints.get(workerId),
StringUtils.join(subtasksOnWorker,
CommonConstants.Minion.TASK_LIST_SEPARATOR))));
LOGGER.debug("Getting task progress with workerUrls: {}", workerUrls);
// Scatter and gather progress from multiple workers.
Map<String, Object> subtaskProgressMap = new HashMap<>();
- CompletionServiceHelper.CompletionServiceResponse serviceResponse =
- completionServiceHelper.doMultiGetRequest(workerUrls, null, true,
requestHeaders, timeoutMs);
- for (Map.Entry<String, String> entry :
serviceResponse._httpResponses.entrySet()) {
- String worker = entry.getKey();
- String resp = entry.getValue();
- LOGGER.debug("Got resp: {} from worker: {}", resp, worker);
- if (StringUtils.isNotEmpty(resp)) {
- subtaskProgressMap.putAll(JsonUtils.stringToObject(resp, Map.class));
+ if (!workerUrls.isEmpty()) {
+ CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+ completionServiceHelper.doMultiGetRequest(workerUrls, null, true,
requestHeaders, timeoutMs);
+ for (Map.Entry<String, String> entry :
serviceResponse._httpResponses.entrySet()) {
+ String worker = entry.getKey();
+ String resp = entry.getValue();
+ LOGGER.debug("Got resp: {} from worker: {}", resp, worker);
+ if (StringUtils.isNotEmpty(resp)) {
+ subtaskProgressMap.putAll(JsonUtils.stringToObject(resp, Map.class));
+ }
+ }
+ if (serviceResponse._failedResponseCount > 0) {
+ // Instead of aborting, subtasks without worker side progress return
the task status tracked by Helix.
+ // The detailed worker failure response is logged as error by
CompletionServiceResponse for debugging.
+ LOGGER.warn("There were {} workers failed to report task progress. Got
partial progress info: {}",
+ serviceResponse._failedResponseCount, subtaskProgressMap);
}
}
// Check if any subtask missed their progress from the worker.
@@ -549,8 +563,9 @@ public class PinotHelixTaskResourceManager {
if (subtaskProgressMap.containsKey(subtaskName)) {
continue;
}
+ // Return the task progress status tracked by Helix.
String[] taskWorkerAndHelixState = allSubtasks.get(subtaskName);
- if (taskWorkerAndHelixState == null) {
+ if (taskWorkerAndHelixState == null || taskWorkerAndHelixState[0] ==
null) {
subtaskProgressMap.put(subtaskName, "No worker has run this subtask");
} else {
String taskWorker = taskWorkerAndHelixState[0];
@@ -559,12 +574,6 @@ public class PinotHelixTaskResourceManager {
String.format("No status from worker: %s. Got status: %s from
Helix", taskWorker, helixState));
}
}
- if (serviceResponse._failedResponseCount > 0) {
- // Subtasks without worker side progress are filled with status tracked
by Helix so return them back.
- // The detailed worker failure response is logged as error by
CompletionServiceResponse for debugging.
- LOGGER.warn("There were {} workers failed to report task progress. Got
partial progress info: {}",
- serviceResponse._failedResponseCount, subtaskProgressMap);
- }
return subtaskProgressMap;
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
index fb3e8412f0..bf354d5622 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
@@ -83,7 +83,7 @@ public class PinotHelixTaskResourceManagerTest {
when(httpHelper.doMultiGetRequest(any(), any(), anyBoolean(), any(),
anyInt())).thenReturn(httpResp);
// Three workers to run 3 subtasks but got no progress status from workers.
httpResp._failedResponseCount = 3;
- String[] workers = new String[]{"worker01", "worker02", "worker03"};
+ String[] workers = new String[]{"worker0", "worker1", "worker2"};
Map<String, String> workerEndpoints = new HashMap<>();
for (String worker : workers) {
workerEndpoints.put(worker, "http://" + worker + ":9000");
@@ -122,7 +122,7 @@ public class PinotHelixTaskResourceManagerTest {
CompletionServiceHelper.CompletionServiceResponse httpResp =
new CompletionServiceHelper.CompletionServiceResponse();
when(httpHelper.doMultiGetRequest(any(), any(), anyBoolean(), any(),
anyInt())).thenReturn(httpResp);
- String[] workers = new String[]{"worker01", "worker02", "worker03"};
+ String[] workers = new String[]{"worker0", "worker1", "worker2"};
Map<String, String> workerEndpoints = new HashMap<>();
for (String worker : workers) {
workerEndpoints.put(worker, "http://" + worker + ":9000");
@@ -150,4 +150,47 @@ public class PinotHelixTaskResourceManagerTest {
assertEquals(taskProgress, "running on worker: " + i);
}
}
+
+ @Test
+ public void testGetSubtaskProgressPending()
+ throws Exception {
+ TaskDriver taskDriver = mock(TaskDriver.class);
+ JobContext jobContext = mock(JobContext.class);
+ when(taskDriver.getJobContext(anyString())).thenReturn(jobContext);
+ PinotHelixTaskResourceManager mgr =
+ new
PinotHelixTaskResourceManager(mock(PinotHelixResourceManager.class),
taskDriver);
+ CompletionServiceHelper httpHelper = mock(CompletionServiceHelper.class);
+ CompletionServiceHelper.CompletionServiceResponse httpResp =
+ new CompletionServiceHelper.CompletionServiceResponse();
+ when(httpHelper.doMultiGetRequest(any(), any(), anyBoolean(), any(),
anyInt())).thenReturn(httpResp);
+ String[] workers = new String[]{"worker0", "worker1", "worker2"};
+ Map<String, String> workerEndpoints = new HashMap<>();
+ for (String worker : workers) {
+ workerEndpoints.put(worker, "http://" + worker + ":9000");
+ }
+ String taskName = "Task_SegmentGenerationAndPushTask_someone";
+ String[] subtaskNames = new String[3];
+ Set<Integer> subtaskIds = new HashSet<>();
+ for (int i = 0; i < 3; i++) {
+ subtaskIds.add(i);
+ subtaskNames[i] = taskName + "_" + i;
+ }
+ // Some subtasks are pending to be run.
+ TaskPartitionState[] helixStates = new
TaskPartitionState[]{TaskPartitionState.RUNNING, null, null};
+ httpResp._httpResponses.put(workers[0],
+ JsonUtils.objectToString(Collections.singletonMap(subtaskNames[0],
"running on worker: 0")));
+
when(jobContext.getTaskIdForPartition(anyInt())).thenReturn(subtaskNames[0],
subtaskNames[1], subtaskNames[2]);
+ when(jobContext.getAssignedParticipant(anyInt())).thenReturn(workers[0],
null, null);
+ when(jobContext.getPartitionState(anyInt())).thenReturn(helixStates[0],
null, null);
+ when(jobContext.getPartitionSet()).thenReturn(subtaskIds);
+ Map<String, Object> progress =
+ mgr.getSubtaskProgress(taskName, StringUtils.join(subtaskNames, ','),
httpHelper, workerEndpoints,
+ Collections.emptyMap(), 1000);
+ String taskProgress = (String) progress.get(subtaskNames[0]);
+ assertEquals(taskProgress, "running on worker: 0");
+ taskProgress = (String) progress.get(subtaskNames[1]);
+ assertEquals(taskProgress, "No worker has run this subtask");
+ taskProgress = (String) progress.get(subtaskNames[2]);
+ assertEquals(taskProgress, "No worker has run this subtask");
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]