This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2d95baa100 Enhance Minion task management (#11315)
2d95baa100 is described below
commit 2d95baa100e940c6089c857d5cbc5e247c0bdb70
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Sun Aug 13 17:06:31 2023 -0700
Enhance Minion task management (#11315)
---
.../api/resources/PinotTaskRestletResource.java | 2 +-
.../core/minion/PinotHelixTaskResourceManager.java | 337 +++++++++++----------
.../core/minion/generator/TaskGeneratorUtils.java | 51 +---
.../minion/PinotHelixTaskResourceManagerTest.java | 124 ++++----
.../tests/SimpleMinionClusterIntegrationTest.java | 1 -
5 files changed, 267 insertions(+), 248 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index ac305202e6..a727c858e2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -410,7 +410,7 @@ public class PinotTaskRestletResource {
@ApiOperation("Get the task state for the given task (deprecated)")
public StringResultResponse getTaskStateDeprecated(
@ApiParam(value = "Task name", required = true) @PathParam("taskName")
String taskName) {
- return new
StringResultResponse(_pinotHelixTaskResourceManager.getTaskState(taskName).toString());
+ return new
StringResultResponse(String.valueOf(_pinotHelixTaskResourceManager.getTaskState(taskName)));
}
@GET
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index dba8dd37bb..86ef3cef9a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -24,8 +24,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBiMap;
-import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
@@ -54,8 +55,6 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
import org.apache.pinot.common.utils.DateTimeUtils;
-import org.apache.pinot.controller.api.exception.NoTaskScheduledException;
-import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.CompletionServiceHelper;
import org.apache.pinot.core.common.MinionConstants;
@@ -118,25 +117,19 @@ public class PinotHelixTaskResourceManager {
*
* @param taskType Task type
*/
- public void ensureTaskQueueExists(String taskType) {
+ public synchronized void ensureTaskQueueExists(String taskType) {
String helixJobQueueName = getHelixJobQueueName(taskType);
WorkflowConfig workflowConfig =
_taskDriver.getWorkflowConfig(helixJobQueueName);
-
if (workflowConfig == null) {
// Task queue does not exist
LOGGER.info("Creating task queue: {} for task type: {}",
helixJobQueueName, taskType);
// Set full parallelism
// Don't allow overlap job assignment so that we can control number of
concurrent tasks per instance
- JobQueue jobQueue = new JobQueue.Builder(helixJobQueueName)
- .setWorkflowConfig(new
WorkflowConfig.Builder().setParallelJobs(Integer.MAX_VALUE).build()).build();
+ JobQueue jobQueue = new
JobQueue.Builder(helixJobQueueName).setWorkflowConfig(
+ new
WorkflowConfig.Builder().setParallelJobs(Integer.MAX_VALUE).build()).build();
_taskDriver.createQueue(jobQueue);
}
-
- // Wait until task queue context shows up
- while (_taskDriver.getWorkflowContext(helixJobQueueName) == null) {
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
- }
}
/**
@@ -226,8 +219,14 @@ public class PinotHelixTaskResourceManager {
* @param taskType Task type
* @return Task queue state
*/
+ @Nullable
public synchronized TaskState getTaskQueueState(String taskType) {
- return
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getWorkflowState();
+ String helixJobQueueName = getHelixJobQueueName(taskType);
+ if (_taskDriver.getWorkflowConfig(helixJobQueueName) == null) {
+ return null;
+ }
+ WorkflowContext workflowContext =
_taskDriver.getWorkflowContext(helixJobQueueName);
+ return workflowContext != null ? workflowContext.getWorkflowState() :
TaskState.NOT_STARTED;
}
/**
@@ -263,8 +262,8 @@ public class PinotHelixTaskResourceManager {
String taskType = pinotTaskConfigs.get(0).getTaskType();
String parentTaskName = getParentTaskName(taskType, UUID.randomUUID() +
"_" + System.currentTimeMillis());
- return submitTask(parentTaskName, pinotTaskConfigs, minionInstanceTag,
taskTimeoutMs,
- numConcurrentTasksPerInstance, maxAttemptsPerTask);
+ return submitTask(parentTaskName, pinotTaskConfigs, minionInstanceTag,
taskTimeoutMs, numConcurrentTasksPerInstance,
+ maxAttemptsPerTask);
}
/**
@@ -285,9 +284,9 @@ public class PinotHelixTaskResourceManager {
Preconditions.checkState(numConcurrentTasksPerInstance > 0);
String taskType = pinotTaskConfigs.get(0).getTaskType();
- LOGGER
- .info("Submitting parent task: {} of type: {} with {} child task
configs: {} to Minion instances with tag: {}",
- parentTaskName, taskType, numChildTasks, pinotTaskConfigs,
minionInstanceTag);
+ LOGGER.info(
+ "Submitting parent task: {} of type: {} with {} child task configs: {}
to Minion instances with tag: {}",
+ parentTaskName, taskType, numChildTasks, pinotTaskConfigs,
minionInstanceTag);
List<TaskConfig> helixTaskConfigs = new ArrayList<>(numChildTasks);
for (int i = 0; i < numChildTasks; i++) {
PinotTaskConfig pinotTaskConfig = pinotTaskConfigs.get(i);
@@ -305,11 +304,6 @@ public class PinotHelixTaskResourceManager {
.setFailureThreshold(Integer.MAX_VALUE).setExpiry(_taskExpireTimeMs);
_taskDriver.enqueueJob(getHelixJobQueueName(taskType), parentTaskName,
jobBuilder);
- // Wait until task state is available
- while (getTaskState(parentTaskName) == null) {
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
- }
-
return parentTaskName;
}
@@ -320,7 +314,11 @@ public class PinotHelixTaskResourceManager {
* @return Set of task names
*/
public synchronized Set<String> getTasks(String taskType) {
- Set<String> helixJobs =
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobStates().keySet();
+ String helixJobQueueName = getHelixJobQueueName(taskType);
+ WorkflowConfig workflowConfig =
_taskDriver.getWorkflowConfig(helixJobQueueName);
+ Preconditions.checkArgument(workflowConfig != null, "Task queue: %s for
task type: %s does not exist",
+ helixJobQueueName, taskType);
+ Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
Set<String> tasks = new HashSet<>(helixJobs.size());
for (String helixJobName : helixJobs) {
tasks.add(getPinotTaskName(helixJobName));
@@ -330,40 +328,58 @@ public class PinotHelixTaskResourceManager {
/**
* Get all task states for the given task type.
+ * NOTE: For tasks just submitted without the context created, count them as
NOT_STARTED.
*
* @param taskType Task type
* @return Map from task name to task state
*/
public synchronized Map<String, TaskState> getTaskStates(String taskType) {
- Map<String, TaskState> helixJobStates = new HashMap<>();
+ String helixJobQueueName = getHelixJobQueueName(taskType);
+ WorkflowConfig workflowConfig =
_taskDriver.getWorkflowConfig(helixJobQueueName);
+ Preconditions.checkArgument(workflowConfig != null, "Task queue: %s for
task type: %s does not exist",
+ helixJobQueueName, taskType);
+ Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
+ if (helixJobs.isEmpty()) {
+ return Collections.emptyMap();
+ }
WorkflowContext workflowContext =
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
-
if (workflowContext == null) {
- return helixJobStates;
- }
- helixJobStates = workflowContext.getJobStates();
- Map<String, TaskState> taskStates = new HashMap<>(helixJobStates.size());
- for (Map.Entry<String, TaskState> entry : helixJobStates.entrySet()) {
- taskStates.put(getPinotTaskName(entry.getKey()), entry.getValue());
+ return helixJobs.stream()
+
.collect(Collectors.toMap(PinotHelixTaskResourceManager::getPinotTaskName,
ignored -> TaskState.NOT_STARTED));
+ } else {
+ Map<String, TaskState> helixJobStates = workflowContext.getJobStates();
+ return
helixJobs.stream().collect(Collectors.toMap(PinotHelixTaskResourceManager::getPinotTaskName,
+ helixJobName -> helixJobStates.getOrDefault(helixJobName,
TaskState.NOT_STARTED)));
}
- return taskStates;
}
/**
* This method returns a count of sub-tasks in various states, given the
top-level task name.
- * @param parentTaskName in the form "Task_<taskType>_<uuid>_<timestamp>"
+ *
+ * @param taskName in the form "Task_<taskType>_<uuid>_<timestamp>"
* @return TaskCount object
*/
- public synchronized TaskCount getTaskCount(String parentTaskName) {
+ public synchronized TaskCount getTaskCount(String taskName) {
+ String helixJobName = getHelixJobName(taskName);
+ JobConfig jobConfig = _taskDriver.getJobConfig(helixJobName);
+ Preconditions.checkArgument(jobConfig != null, "Task: %s does not exist",
taskName);
+ Set<String> subtasks = jobConfig.getTaskConfigMap().keySet();
TaskCount taskCount = new TaskCount();
- JobContext jobContext =
_taskDriver.getJobContext(getHelixJobName(parentTaskName));
-
+ JobContext jobContext = _taskDriver.getJobContext(helixJobName);
if (jobContext == null) {
+ int numSubtasks = subtasks.size();
+ for (int i = 0; i < numSubtasks; i++) {
+ taskCount.addTaskState(null);
+ }
return taskCount;
}
- Set<Integer> partitionSet = jobContext.getPartitionSet();
- for (int partition : partitionSet) {
- TaskPartitionState state = jobContext.getPartitionState(partition);
+ Map<String, Integer> taskIdPartitionMap =
jobContext.getTaskIdPartitionMap();
+ for (String taskId : subtasks) {
+ TaskPartitionState state = null;
+ Integer partition = taskIdPartitionMap.get(taskId);
+ if (partition != null) {
+ state = jobContext.getPartitionState(partition);
+ }
taskCount.addTaskState(state);
}
return taskCount;
@@ -371,81 +387,91 @@ public class PinotHelixTaskResourceManager {
/**
* This method returns a map of table name to count of sub-tasks in various
states, given the top-level task name.
+ *
* @param taskName in the form "Task_<taskType>_<uuid>_<timestamp>"
* @return a map of table name to {@link TaskCount}
*/
public synchronized Map<String, TaskCount> getTableTaskCount(String
taskName) {
- Map<String, TaskPartitionState> subtaskStates = getSubtaskStates(taskName);
- if (subtaskStates.isEmpty()) {
- return Collections.emptyMap();
- }
-
- JobConfig jobConfig = _taskDriver.getJobConfig(getHelixJobName(taskName));
- // in theory, this should not happen because we have already checked
JobContext
- if (jobConfig == null) {
- LOGGER.warn("task {} has job context but its job config does not exist",
taskName);
- return Collections.emptyMap();
- }
-
- Map<String, TaskCount> tableTaskCountMap = new HashMap<>();
- subtaskStates.forEach((taskId, taskState) -> {
- TaskConfig taskConfig = jobConfig.getTaskConfig(taskId);
- String tableNameWithType;
- // in theory, this should not happen because jobContext has this taskId
- if (taskConfig == null) {
- LOGGER.warn("sub-task {} exists in helix job context but its task
config does not exist", taskId);
- tableNameWithType = UNKNOWN_TABLE_NAME;
- } else {
- tableNameWithType =
taskConfig.getConfigMap().getOrDefault(MinionConstants.TABLE_NAME_KEY,
UNKNOWN_TABLE_NAME);
+ String helixJobName = getHelixJobName(taskName);
+ JobConfig jobConfig = _taskDriver.getJobConfig(helixJobName);
+ Preconditions.checkArgument(jobConfig != null, "Task: %s does not exist",
taskName);
+ Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
+ Map<String, TaskCount> taskCountMap = new HashMap<>();
+ JobContext jobContext = _taskDriver.getJobContext(helixJobName);
+ if (jobContext == null) {
+ for (TaskConfig taskConfig : taskConfigMap.values()) {
+ String tableName =
taskConfig.getConfigMap().getOrDefault(MinionConstants.TABLE_NAME_KEY,
UNKNOWN_TABLE_NAME);
+ taskCountMap.computeIfAbsent(tableName, k -> new
TaskCount()).addTaskState(null);
}
- tableTaskCountMap.compute(tableNameWithType, (name, taskCount) -> {
- if (taskCount == null) {
- taskCount = new TaskCount();
- }
- taskCount.addTaskState(taskState);
- return taskCount;
- });
- });
- return tableTaskCountMap;
+ return taskCountMap;
+ }
+ Map<String, Integer> taskIdPartitionMap =
jobContext.getTaskIdPartitionMap();
+ for (Map.Entry<String, TaskConfig> entry : taskConfigMap.entrySet()) {
+ String taskId = entry.getKey();
+ TaskPartitionState state = null;
+ Integer partition = taskIdPartitionMap.get(taskId);
+ if (partition != null) {
+ state = jobContext.getPartitionState(partition);
+ }
+ TaskConfig taskConfig = entry.getValue();
+ String tableName =
taskConfig.getConfigMap().getOrDefault(MinionConstants.TABLE_NAME_KEY,
UNKNOWN_TABLE_NAME);
+ taskCountMap.computeIfAbsent(tableName, k -> new
TaskCount()).addTaskState(state);
+ }
+ return taskCountMap;
}
/**
* Returns a set of Task names (in the form
"Task_<taskType>_<uuid>_<timestamp>") that are in progress or not started
* yet.
+ * NOTE: For tasks just submitted without the context created, count them as
NOT_STARTED.
*
- * @param taskType
+ * @param taskType Task type
* @return Set of task names
*/
public synchronized Set<String> getTasksInProgress(String taskType) {
- Set<String> tasksInProgress = new HashSet<>();
+ WorkflowConfig workflowConfig =
_taskDriver.getWorkflowConfig(getHelixJobQueueName(taskType));
+ if (workflowConfig == null) {
+ return Collections.emptySet();
+ }
+ Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
+ if (helixJobs.isEmpty()) {
+ return Collections.emptySet();
+ }
WorkflowContext workflowContext =
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
if (workflowContext == null) {
- return tasksInProgress;
- }
-
- Map<String, TaskState> helixJobStates = workflowContext.getJobStates();
-
- for (Map.Entry<String, TaskState> entry : helixJobStates.entrySet()) {
- if (entry.getValue().equals(TaskState.NOT_STARTED) ||
entry.getValue().equals(TaskState.IN_PROGRESS)) {
- tasksInProgress.add(getPinotTaskName(entry.getKey()));
- }
+ return
helixJobs.stream().map(PinotHelixTaskResourceManager::getPinotTaskName).collect(Collectors.toSet());
+ } else {
+ Map<String, TaskState> helixJobStates = workflowContext.getJobStates();
+ return helixJobs.stream().filter(helixJobName -> {
+ TaskState taskState = helixJobStates.get(helixJobName);
+ return taskState == null || taskState == TaskState.NOT_STARTED ||
taskState == TaskState.IN_PROGRESS;
+
}).map(PinotHelixTaskResourceManager::getPinotTaskName).collect(Collectors.toSet());
}
- return tasksInProgress;
}
/**
* Get the task state for the given task name.
+ * NOTE: For tasks just submitted without the context created, count them as
NOT_STARTED.
*
* @param taskName Task name
* @return Task state
*/
+ @Nullable
public synchronized TaskState getTaskState(String taskName) {
+ String helixJobName = getHelixJobName(taskName);
+ JobConfig jobConfig = _taskDriver.getJobConfig(helixJobName);
+ if (jobConfig == null) {
+ return null;
+ }
String taskType = getTaskType(taskName);
WorkflowContext workflowContext =
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
- if (workflowContext == null) {
- throw new UnknownTaskTypeException("Workflow context for task type
doesn't exist: " + taskType);
+ if (workflowContext != null) {
+ TaskState jobState = workflowContext.getJobState(helixJobName);
+ if (jobState != null) {
+ return jobState;
+ }
}
- return workflowContext.getJobState(getHelixJobName(taskName));
+ return TaskState.NOT_STARTED;
}
/**
@@ -455,22 +481,26 @@ public class PinotHelixTaskResourceManager {
* @return states of all the sub tasks
*/
public synchronized Map<String, TaskPartitionState> getSubtaskStates(String
taskName) {
- String taskType = getTaskType(taskName);
- WorkflowContext workflowContext =
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
- if (workflowContext == null) {
- return Collections.emptyMap();
- }
String helixJobName = getHelixJobName(taskName);
+ JobConfig jobConfig = _taskDriver.getJobConfig(helixJobName);
+ Preconditions.checkArgument(jobConfig != null, "Task: %s does not exist",
taskName);
+ Set<String> subtasks = jobConfig.getTaskConfigMap().keySet();
+ Map<String, TaskPartitionState> subtaskStates = new HashMap<>();
JobContext jobContext = _taskDriver.getJobContext(helixJobName);
if (jobContext == null) {
- return Collections.emptyMap();
- }
- Map<String, TaskPartitionState> subtaskStates = new HashMap<>();
- Set<Integer> partitionSet = jobContext.getPartitionSet();
- for (int partition : partitionSet) {
- String taskIdForPartition = jobContext.getTaskIdForPartition(partition);
- TaskPartitionState partitionState =
jobContext.getPartitionState(partition);
- subtaskStates.put(taskIdForPartition, partitionState);
+ for (String taskId : subtasks) {
+ subtaskStates.put(taskId, null);
+ }
+ return subtaskStates;
+ }
+ Map<String, Integer> taskIdPartitionMap =
jobContext.getTaskIdPartitionMap();
+ for (String taskId : subtasks) {
+ TaskPartitionState state = null;
+ Integer partition = taskIdPartitionMap.get(taskId);
+ if (partition != null) {
+ state = jobContext.getPartitionState(partition);
+ }
+ subtaskStates.put(taskId, state);
}
return subtaskStates;
}
@@ -550,39 +580,51 @@ public class PinotHelixTaskResourceManager {
Map<String, String> requestHeaders, int timeoutMs)
throws Exception {
String helixJobName = getHelixJobName(taskName);
+ JobConfig jobConfig = _taskDriver.getJobConfig(helixJobName);
+ Preconditions.checkArgument(jobConfig != null, "Task: %s does not exist",
taskName);
+ Set<String> subtasks = jobConfig.getTaskConfigMap().keySet();
+ Set<String> selectedSubtasks;
+ if (StringUtils.isNotEmpty(subtaskNames)) {
+ selectedSubtasks = new
HashSet<>(Arrays.asList(StringUtils.split(subtaskNames, ',')));
+ selectedSubtasks.retainAll(subtasks);
+ } else {
+ selectedSubtasks = subtasks;
+ }
+ if (selectedSubtasks.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ Map<String, Object> subtaskProgressMap = new HashMap<>();
JobContext jobContext = _taskDriver.getJobContext(helixJobName);
if (jobContext == null) {
- throw new NoTaskScheduledException("No task scheduled with name: " +
helixJobName);
- }
- Set<String> selectedSubtasks = new HashSet<>();
- if (StringUtils.isNotEmpty(subtaskNames)) {
- Collections.addAll(selectedSubtasks, StringUtils.split(subtaskNames,
','));
+ for (String taskId : selectedSubtasks) {
+ subtaskProgressMap.put(taskId, "No worker has run this subtask");
+ }
+ return subtaskProgressMap;
}
// The worker running the subtask and task state as tracked by helix.
- Map<String, String[]> allSubtasks = new HashMap<>();
- Map<String, Set<String>> workerSelectedSubtasksMap = new HashMap<>();
- for (int partition : jobContext.getPartitionSet()) {
- String subtaskName = jobContext.getTaskIdForPartition(partition);
- String worker = jobContext.getAssignedParticipant(partition);
- TaskPartitionState partitionState =
jobContext.getPartitionState(partition);
- String taskState = partitionState == null ? null : partitionState.name();
- allSubtasks.put(subtaskName, new String[]{worker, taskState});
- LOGGER.debug("Subtask: {} is assigned to worker: {} with state: {} in
Helix", subtaskName, worker, taskState);
- if (worker == null) {
- continue;
+ Map<String, Pair<String, TaskPartitionState>> subtaskWorkerAndStateMap =
new HashMap<>();
+ Map<String, Set<String>> workerSubtasksMap = new HashMap<>();
+ Map<String, Integer> taskIdPartitionMap =
jobContext.getTaskIdPartitionMap();
+ for (String taskId : selectedSubtasks) {
+ String worker = null;
+ TaskPartitionState state = null;
+ Integer partition = taskIdPartitionMap.get(taskId);
+ if (partition != null) {
+ worker = jobContext.getAssignedParticipant(partition);
+ state = jobContext.getPartitionState(partition);
}
- if (selectedSubtasks.isEmpty() ||
selectedSubtasks.contains(subtaskName)) {
- workerSelectedSubtasksMap.computeIfAbsent(worker, k -> new
HashSet<>()).add(subtaskName);
+ subtaskWorkerAndStateMap.put(taskId, Pair.of(worker, state));
+ if (worker != null) {
+ workerSubtasksMap.computeIfAbsent(worker, k -> new
HashSet<>()).add(taskId);
}
}
- LOGGER.debug("Found subtasks on workers: {}", workerSelectedSubtasksMap);
+ LOGGER.debug("Found subtasks on workers: {}", workerSubtasksMap);
List<String> workerUrls = new ArrayList<>();
- workerSelectedSubtasksMap.forEach((workerId, subtasksOnWorker) ->
workerUrls.add(
+ workerSubtasksMap.forEach((workerId, subtasksOnWorker) -> workerUrls.add(
String.format("%s/tasks/subtask/progress?subtaskNames=%s",
workerEndpoints.get(workerId),
StringUtils.join(subtasksOnWorker,
CommonConstants.Minion.TASK_LIST_SEPARATOR))));
LOGGER.debug("Getting task progress with workerUrls: {}", workerUrls);
// Scatter and gather progress from multiple workers.
- Map<String, Object> subtaskProgressMap = new HashMap<>();
if (!workerUrls.isEmpty()) {
CompletionServiceHelper.CompletionServiceResponse serviceResponse =
completionServiceHelper.doMultiGetRequest(workerUrls, null, true,
requestHeaders, timeoutMs);
@@ -603,22 +645,18 @@ public class PinotHelixTaskResourceManager {
}
// Check if any subtask missed their progress from the worker.
// And simply check all subtasks if no subtasks are specified.
- if (selectedSubtasks.isEmpty()) {
- selectedSubtasks = allSubtasks.keySet();
- }
- for (String subtaskName : selectedSubtasks) {
- if (subtaskProgressMap.containsKey(subtaskName)) {
+ for (String taskId : selectedSubtasks) {
+ if (subtaskProgressMap.containsKey(taskId)) {
continue;
}
// Return the task progress status tracked by Helix.
- String[] taskWorkerAndHelixState = allSubtasks.get(subtaskName);
- if (taskWorkerAndHelixState == null || taskWorkerAndHelixState[0] ==
null) {
- subtaskProgressMap.put(subtaskName, "No worker has run this subtask");
+ Pair<String, TaskPartitionState> workerAndState =
subtaskWorkerAndStateMap.get(taskId);
+ if (workerAndState.getLeft() == null) {
+ subtaskProgressMap.put(taskId, "No worker has run this subtask");
} else {
- String taskWorker = taskWorkerAndHelixState[0];
- String helixState = taskWorkerAndHelixState[1];
- subtaskProgressMap.put(subtaskName,
- String.format("No status from worker: %s. Got status: %s from
Helix", taskWorker, helixState));
+ subtaskProgressMap.put(taskId,
+ String.format("No status from worker: %s. Got status: %s from
Helix", workerAndState.getLeft(),
+ workerAndState.getRight()));
}
}
return subtaskProgressMap;
@@ -634,26 +672,24 @@ public class PinotHelixTaskResourceManager {
* @param timeoutMs timeout (in millisecond) for requests sent to minion
workers
* @return a map of minion worker id to subtask progress
*/
- public synchronized Map<String, Object> getSubtaskOnWorkerProgress(String
subtaskState,
- Executor executor, HttpClientConnectionManager connMgr, Map<String,
String> selectedMinionWorkerEndpoints,
+ public synchronized Map<String, Object> getSubtaskOnWorkerProgress(String
subtaskState, Executor executor,
+ HttpClientConnectionManager connMgr, Map<String, String>
selectedMinionWorkerEndpoints,
Map<String, String> requestHeaders, int timeoutMs)
throws JsonProcessingException {
- return getSubtaskOnWorkerProgress(subtaskState,
- new CompletionServiceHelper(executor, connMgr, HashBiMap.create(0)),
selectedMinionWorkerEndpoints,
- requestHeaders, timeoutMs);
+ return getSubtaskOnWorkerProgress(subtaskState, new
CompletionServiceHelper(executor, connMgr, HashBiMap.create(0)),
+ selectedMinionWorkerEndpoints, requestHeaders, timeoutMs);
}
@VisibleForTesting
- Map<String, Object> getSubtaskOnWorkerProgress(String subtaskState,
- CompletionServiceHelper completionServiceHelper, Map<String, String>
selectedMinionWorkerEndpoints,
- Map<String, String> requestHeaders, int timeoutMs)
+ Map<String, Object> getSubtaskOnWorkerProgress(String subtaskState,
CompletionServiceHelper completionServiceHelper,
+ Map<String, String> selectedMinionWorkerEndpoints, Map<String, String>
requestHeaders, int timeoutMs)
throws JsonProcessingException {
Map<String, Object> minionWorkerIdSubtaskProgressMap = new HashMap<>();
if (selectedMinionWorkerEndpoints.isEmpty()) {
return minionWorkerIdSubtaskProgressMap;
}
- Map<String, String> minionWorkerUrlToWorkerIdMap =
selectedMinionWorkerEndpoints.entrySet().stream()
- .collect(Collectors.toMap(
+ Map<String, String> minionWorkerUrlToWorkerIdMap =
selectedMinionWorkerEndpoints.entrySet().stream().collect(
+ Collectors.toMap(
entry ->
String.format("%s/tasks/subtask/state/progress?subTaskState=%s",
entry.getValue(), subtaskState),
Map.Entry::getKey));
List<String> workerUrls = new
ArrayList<>(minionWorkerUrlToWorkerIdMap.keySet());
@@ -665,8 +701,8 @@ public class PinotHelixTaskResourceManager {
String worker = entry.getKey();
String resp = entry.getValue();
LOGGER.debug("Got resp: {} from worker: {}", resp, worker);
- minionWorkerIdSubtaskProgressMap
- .put(minionWorkerUrlToWorkerIdMap.get(worker),
JsonUtils.stringToObject(resp, Map.class));
+
minionWorkerIdSubtaskProgressMap.put(minionWorkerUrlToWorkerIdMap.get(worker),
+ JsonUtils.stringToObject(resp, Map.class));
}
if (serviceResponse._failedResponseCount > 0) {
// Instead of aborting, subtasks without worker side progress return the
task status tracked by Helix.
@@ -713,15 +749,10 @@ public class PinotHelixTaskResourceManager {
* @return Map of Pinot Task Name to TaskCount
*/
public synchronized Map<String, TaskCount> getTaskCounts(String taskType) {
+ Set<String> tasks = getTasks(taskType);
Map<String, TaskCount> taskCounts = new TreeMap<>();
- WorkflowContext workflowContext =
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
- if (workflowContext == null) {
- return taskCounts;
- }
- Map<String, TaskState> helixJobStates = workflowContext.getJobStates();
- for (String helixJobName : helixJobStates.keySet()) {
- String pinotTaskName = getPinotTaskName(helixJobName);
- taskCounts.put(pinotTaskName, getTaskCount(pinotTaskName));
+ for (String taskName : tasks) {
+ taskCounts.put(taskName, getTaskCount(taskName));
}
return taskCounts;
}
@@ -884,7 +915,7 @@ public class PinotHelixTaskResourceManager {
* @param pinotTaskName Pinot task name
* @return helixJobName Helix Job name
*/
- private static String getHelixJobName(String pinotTaskName) {
+ public static String getHelixJobName(String pinotTaskName) {
return getHelixJobQueueName(getTaskType(pinotTaskName)) +
TASK_NAME_SEPARATOR + pinotTaskName;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
index 9a6ce3c251..4f6c3c28e3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
@@ -24,11 +24,9 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
-import javax.annotation.Nonnull;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.data.Segment;
import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
-import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
@@ -37,8 +35,6 @@ public class TaskGeneratorUtils {
private TaskGeneratorUtils() {
}
- private static final long ONE_DAY_IN_MILLIS = 24 * 60 * 60 * 1000L;
-
/**
* If task is in final state, it will not be running any more. But note that
* STOPPED is not a final task state in helix task framework, as a stopped
task
@@ -48,31 +44,17 @@ public class TaskGeneratorUtils {
EnumSet.of(TaskState.COMPLETED, TaskState.FAILED, TaskState.ABORTED,
TaskState.TIMED_OUT);
/**
- * Returns all the segments that have been scheduled in one day but not
finished.
- * <p>
- * NOTE: we consider tasks not finished in one day as stuck and don't count
the segments in them
- *
- * @param taskType Task type
- * @param clusterInfoAccessor Cluster info accessor
- * @return Set of running segments
+ * Returns all the segments that have been scheduled but not finished.
*/
- public static Set<Segment> getRunningSegments(@Nonnull String taskType,
- @Nonnull ClusterInfoAccessor clusterInfoAccessor) {
+ public static Set<Segment> getRunningSegments(String taskType,
ClusterInfoAccessor clusterInfoAccessor) {
Set<Segment> runningSegments = new HashSet<>();
Map<String, TaskState> taskStates =
clusterInfoAccessor.getTaskStates(taskType);
for (Map.Entry<String, TaskState> entry : taskStates.entrySet()) {
- // Skip COMPLETED tasks
- if (entry.getValue() == TaskState.COMPLETED) {
+ if (TASK_FINAL_STATES.contains(entry.getValue())) {
continue;
}
-
- // Skip tasks scheduled for more than one day
String taskName = entry.getKey();
- if (isTaskOlderThanOneDay(taskName)) {
- continue;
- }
-
- for (PinotTaskConfig pinotTaskConfig :
clusterInfoAccessor.getTaskConfigs(entry.getKey())) {
+ for (PinotTaskConfig pinotTaskConfig :
clusterInfoAccessor.getTaskConfigs(taskName)) {
Map<String, String> configs = pinotTaskConfig.getConfigs();
runningSegments.add(
new Segment(configs.get(MinionConstants.TABLE_NAME_KEY),
configs.get(MinionConstants.SEGMENT_NAME_KEY)));
@@ -82,30 +64,24 @@ public class TaskGeneratorUtils {
}
/**
- * Gets all the tasks for the provided task type and tableName, which do not
have TaskState COMPLETED
- * @return map containing task name to task state for non-completed tasks
- *
- * NOTE: we consider tasks not finished in one day as stuck and don't count
them
+ * Gets all the tasks for the provided task type and tableName, which have
not reached final task state yet.
*/
public static Map<String, TaskState> getIncompleteTasks(String taskType,
String tableNameWithType,
ClusterInfoAccessor clusterInfoAccessor) {
- Map<String, TaskState> nonCompletedTasks = new HashMap<>();
+ Map<String, TaskState> incompleteTasks = new HashMap<>();
Map<String, TaskState> taskStates =
clusterInfoAccessor.getTaskStates(taskType);
for (Map.Entry<String, TaskState> entry : taskStates.entrySet()) {
- if (entry.getValue() == TaskState.COMPLETED) {
+ if (TASK_FINAL_STATES.contains(entry.getValue())) {
continue;
}
String taskName = entry.getKey();
- if (isTaskOlderThanOneDay(taskName)) {
- continue;
- }
for (PinotTaskConfig pinotTaskConfig :
clusterInfoAccessor.getTaskConfigs(taskName)) {
if
(tableNameWithType.equals(pinotTaskConfig.getConfigs().get(MinionConstants.TABLE_NAME_KEY)))
{
- nonCompletedTasks.put(taskName, entry.getValue());
+ incompleteTasks.put(taskName, entry.getValue());
}
}
}
- return nonCompletedTasks;
+ return incompleteTasks;
}
/**
@@ -130,13 +106,4 @@ public class TaskGeneratorUtils {
}
}
}
-
- /**
- * Returns true if task's schedule time is older than 1d
- */
- private static boolean isTaskOlderThanOneDay(String taskName) {
- long scheduleTimeMs =
- Long.parseLong(taskName.substring(taskName.lastIndexOf(PinotHelixTaskResourceManager.TASK_NAME_SEPARATOR) + 1));
- return System.currentTimeMillis() - scheduleTimeMs > ONE_DAY_IN_MILLIS;
- }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
index 5706d402ab..9cbe2f5cec 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
@@ -20,7 +20,6 @@ package org.apache.pinot.controller.helix.core.minion;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -35,7 +34,6 @@ import org.apache.helix.task.JobContext;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskPartitionState;
-import org.apache.helix.task.WorkflowContext;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.util.CompletionServiceHelper;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -55,11 +53,13 @@ import static org.testng.Assert.assertTrue;
public class PinotHelixTaskResourceManagerTest {
+
@Test
public void testGetSubtaskProgressNoWorker()
throws Exception {
TaskDriver taskDriver = mock(TaskDriver.class);
- when(taskDriver.getJobContext(anyString())).thenReturn(mock(JobContext.class));
+ JobConfig jobConfig = mock(JobConfig.class);
+ when(taskDriver.getJobConfig(anyString())).thenReturn(jobConfig);
CompletionServiceHelper httpHelper = mock(CompletionServiceHelper.class);
CompletionServiceHelper.CompletionServiceResponse httpResp =
new CompletionServiceHelper.CompletionServiceResponse();
@@ -73,6 +73,11 @@ public class PinotHelixTaskResourceManagerTest {
for (int i = 0; i < 3; i++) {
subtaskNames[i] = taskName + "_" + i;
}
+ Map<String, TaskConfig> taskConfigMap = new HashMap<>();
+ for (String subtaskName : subtaskNames) {
+ taskConfigMap.put(subtaskName, mock(TaskConfig.class));
+ }
+ when(jobConfig.getTaskConfigMap()).thenReturn(taskConfigMap);
Map<String, Object> progress =
mgr.getSubtaskProgress(taskName, StringUtils.join(subtaskNames, ','),
httpHelper, workerEndpoints,
Collections.emptyMap(), 1000);
@@ -85,6 +90,8 @@ public class PinotHelixTaskResourceManagerTest {
public void testGetSubtaskProgressNoResponse()
throws Exception {
TaskDriver taskDriver = mock(TaskDriver.class);
+ JobConfig jobConfig = mock(JobConfig.class);
+ when(taskDriver.getJobConfig(anyString())).thenReturn(jobConfig);
JobContext jobContext = mock(JobContext.class);
when(taskDriver.getJobContext(anyString())).thenReturn(jobContext);
PinotHelixTaskResourceManager mgr =
@@ -102,17 +109,23 @@ public class PinotHelixTaskResourceManagerTest {
}
String taskName = "Task_SegmentGenerationAndPushTask_someone";
String[] subtaskNames = new String[3];
- Set<Integer> subtaskIds = new HashSet<>();
+ Map<String, Integer> taskIdPartitionMap = new HashMap<>();
for (int i = 0; i < 3; i++) {
- subtaskIds.add(i);
- subtaskNames[i] = taskName + "_" + i;
+ String subtaskName = taskName + "_" + i;
+ subtaskNames[i] = subtaskName;
+ taskIdPartitionMap.put(subtaskName, i);
}
+ Map<String, TaskConfig> taskConfigMap = new HashMap<>();
+ for (String subtaskName : subtaskNames) {
+ taskConfigMap.put(subtaskName, mock(TaskConfig.class));
+ }
+ when(jobConfig.getTaskConfigMap()).thenReturn(taskConfigMap);
TaskPartitionState[] helixStates =
new TaskPartitionState[]{TaskPartitionState.INIT,
TaskPartitionState.RUNNING, TaskPartitionState.TASK_ERROR};
- when(jobContext.getTaskIdForPartition(anyInt())).thenReturn(subtaskNames[0], subtaskNames[1], subtaskNames[2]);
- when(jobContext.getAssignedParticipant(anyInt())).thenReturn(workers[0], workers[1], workers[2]);
- when(jobContext.getPartitionState(anyInt())).thenReturn(helixStates[0], helixStates[1], helixStates[2]);
- when(jobContext.getPartitionSet()).thenReturn(subtaskIds);
+ when(jobContext.getTaskIdPartitionMap()).thenReturn(taskIdPartitionMap);
+ when(jobContext.getAssignedParticipant(anyInt())).thenAnswer(
+ invocation -> workers[(int) invocation.getArgument(0)]);
+ when(jobContext.getPartitionState(anyInt())).thenAnswer(invocation -> helixStates[(int) invocation.getArgument(0)]);
Map<String, Object> progress =
mgr.getSubtaskProgress(taskName, StringUtils.join(subtaskNames, ','),
httpHelper, workerEndpoints,
Collections.emptyMap(), 1000);
@@ -126,6 +139,8 @@ public class PinotHelixTaskResourceManagerTest {
public void testGetSubtaskProgressWithResponse()
throws Exception {
TaskDriver taskDriver = mock(TaskDriver.class);
+ JobConfig jobConfig = mock(JobConfig.class);
+ when(taskDriver.getJobConfig(anyString())).thenReturn(jobConfig);
JobContext jobContext = mock(JobContext.class);
when(taskDriver.getJobContext(anyString())).thenReturn(jobContext);
PinotHelixTaskResourceManager mgr =
@@ -141,19 +156,25 @@ public class PinotHelixTaskResourceManagerTest {
}
String taskName = "Task_SegmentGenerationAndPushTask_someone";
String[] subtaskNames = new String[3];
- Set<Integer> subtaskIds = new HashSet<>();
+ Map<String, Integer> taskIdPartitionMap = new HashMap<>();
for (int i = 0; i < 3; i++) {
- subtaskIds.add(i);
- subtaskNames[i] = taskName + "_" + i;
+ String subtaskName = taskName + "_" + i;
+ subtaskNames[i] = subtaskName;
+ taskIdPartitionMap.put(subtaskName, i);
httpResp._httpResponses.put(workers[i],
JsonUtils.objectToString(Collections.singletonMap(subtaskNames[i],
"running on worker: " + i)));
}
+ Map<String, TaskConfig> taskConfigMap = new HashMap<>();
+ for (String subtaskName : subtaskNames) {
+ taskConfigMap.put(subtaskName, mock(TaskConfig.class));
+ }
+ when(jobConfig.getTaskConfigMap()).thenReturn(taskConfigMap);
TaskPartitionState[] helixStates =
new TaskPartitionState[]{TaskPartitionState.INIT,
TaskPartitionState.RUNNING, TaskPartitionState.TASK_ERROR};
- when(jobContext.getTaskIdForPartition(anyInt())).thenReturn(subtaskNames[0], subtaskNames[1], subtaskNames[2]);
- when(jobContext.getAssignedParticipant(anyInt())).thenReturn(workers[0], workers[1], workers[2]);
- when(jobContext.getPartitionState(anyInt())).thenReturn(helixStates[0], helixStates[1], helixStates[2]);
- when(jobContext.getPartitionSet()).thenReturn(subtaskIds);
+ when(jobContext.getTaskIdPartitionMap()).thenReturn(taskIdPartitionMap);
+ when(jobContext.getAssignedParticipant(anyInt())).thenAnswer(
+ invocation -> workers[(int) invocation.getArgument(0)]);
+ when(jobContext.getPartitionState(anyInt())).thenAnswer(invocation -> helixStates[(int) invocation.getArgument(0)]);
Map<String, Object> progress =
mgr.getSubtaskProgress(taskName, StringUtils.join(subtaskNames, ','),
httpHelper, workerEndpoints,
Collections.emptyMap(), 1000);
@@ -167,6 +188,8 @@ public class PinotHelixTaskResourceManagerTest {
public void testGetSubtaskProgressPending()
throws Exception {
TaskDriver taskDriver = mock(TaskDriver.class);
+ JobConfig jobConfig = mock(JobConfig.class);
+ when(taskDriver.getJobConfig(anyString())).thenReturn(jobConfig);
JobContext jobContext = mock(JobContext.class);
when(taskDriver.getJobContext(anyString())).thenReturn(jobContext);
PinotHelixTaskResourceManager mgr =
@@ -182,19 +205,23 @@ public class PinotHelixTaskResourceManagerTest {
}
String taskName = "Task_SegmentGenerationAndPushTask_someone";
String[] subtaskNames = new String[3];
- Set<Integer> subtaskIds = new HashSet<>();
+ Map<String, Integer> taskIdPartitionMap = new HashMap<>();
for (int i = 0; i < 3; i++) {
- subtaskIds.add(i);
- subtaskNames[i] = taskName + "_" + i;
+ String subtaskName = taskName + "_" + i;
+ subtaskNames[i] = subtaskName;
+ taskIdPartitionMap.put(subtaskName, i);
+ }
+ Map<String, TaskConfig> taskConfigMap = new HashMap<>();
+ for (String subtaskName : subtaskNames) {
+ taskConfigMap.put(subtaskName, mock(TaskConfig.class));
}
- // Some subtasks are pending to be run.
- TaskPartitionState[] helixStates = new
TaskPartitionState[]{TaskPartitionState.RUNNING, null, null};
+ when(jobConfig.getTaskConfigMap()).thenReturn(taskConfigMap);
+ // Some subtasks are pending to be run
httpResp._httpResponses.put(workers[0],
JsonUtils.objectToString(Collections.singletonMap(subtaskNames[0],
"running on worker: 0")));
- when(jobContext.getTaskIdForPartition(anyInt())).thenReturn(subtaskNames[0], subtaskNames[1], subtaskNames[2]);
- when(jobContext.getAssignedParticipant(anyInt())).thenReturn(workers[0], null, null);
- when(jobContext.getPartitionState(anyInt())).thenReturn(helixStates[0], null, null);
- when(jobContext.getPartitionSet()).thenReturn(subtaskIds);
+ when(jobContext.getTaskIdPartitionMap()).thenReturn(taskIdPartitionMap);
+ when(jobContext.getAssignedParticipant(0)).thenReturn(workers[0]);
+ when(jobContext.getPartitionState(0)).thenReturn(TaskPartitionState.RUNNING);
Map<String, Object> progress =
mgr.getSubtaskProgress(taskName, StringUtils.join(subtaskNames, ','),
httpHelper, workerEndpoints,
Collections.emptyMap(), 1000);
@@ -215,8 +242,8 @@ public class PinotHelixTaskResourceManagerTest {
// No worker to run subtasks.
Map<String, String> selectedMinionWorkerEndpoints = new HashMap<>();
Map<String, Object> progress =
- mgr.getSubtaskOnWorkerProgress("IN_PROGRESS", httpHelper,
- selectedMinionWorkerEndpoints, Collections.emptyMap(), 1000);
+ mgr.getSubtaskOnWorkerProgress("IN_PROGRESS", httpHelper, selectedMinionWorkerEndpoints, Collections.emptyMap(),
+ 1000);
assertTrue(progress.isEmpty());
verify(httpHelper, Mockito.never()).doMultiGetRequest(any(), any(),
anyBoolean(), any(), anyInt());
}
@@ -247,19 +274,17 @@ public class PinotHelixTaskResourceManagerTest {
}
httpResp._failedResponseCount = 1;
ArgumentCaptor<List<String>> workerEndpointCaptor =
ArgumentCaptor.forClass(List.class);
- when(httpHelper.doMultiGetRequest(workerEndpointCaptor.capture(), any(), anyBoolean(), any(), anyInt()))
- .thenReturn(httpResp);
+ when(httpHelper.doMultiGetRequest(workerEndpointCaptor.capture(), any(), anyBoolean(), any(), anyInt())).thenReturn(
+ httpResp);
PinotHelixTaskResourceManager mgr =
new
PinotHelixTaskResourceManager(mock(PinotHelixResourceManager.class),
mock(TaskDriver.class));
-
Map<String, Object> progress =
- mgr.getSubtaskOnWorkerProgress("IN_PROGRESS", httpHelper,
selectedMinionWorkerEndpoints,
- Collections.emptyMap(), 1000);
+ mgr.getSubtaskOnWorkerProgress("IN_PROGRESS", httpHelper,
selectedMinionWorkerEndpoints, Collections.emptyMap(),
+ 1000);
List<String> value = workerEndpointCaptor.getValue();
- Set<String> expectedWorkerUrls = selectedMinionWorkerEndpoints.values().stream()
- .map(workerEndpoint
- -> String.format("%s/tasks/subtask/state/progress?subTaskState=IN_PROGRESS", workerEndpoint))
+ Set<String> expectedWorkerUrls = selectedMinionWorkerEndpoints.values().stream().map(
+ workerEndpoint -> String.format("%s/tasks/subtask/state/progress?subTaskState=IN_PROGRESS", workerEndpoint))
.collect(Collectors.toSet());
assertEquals(new HashSet<>(value), expectedWorkerUrls);
assertEquals(progress.size(), 3);
@@ -274,34 +299,31 @@ public class PinotHelixTaskResourceManagerTest {
@Test
public void testGetTableTaskCount() {
- String taskType = "TestTask";
String taskName = "Task_TestTask_12345";
- String helixJobQueueName = PinotHelixTaskResourceManager.getHelixJobQueueName(taskType);
- String helixJobName = helixJobQueueName + "_" + taskName;
-
+ String helixJobName = PinotHelixTaskResourceManager.getHelixJobName(taskName);
TaskDriver taskDriver = mock(TaskDriver.class);
- WorkflowContext workflowContext = mock(WorkflowContext.class);
- when(taskDriver.getWorkflowContext(helixJobQueueName)).thenReturn(workflowContext);
-
+ JobConfig jobConfig = mock(JobConfig.class);
+ when(taskDriver.getJobConfig(anyString())).thenReturn(jobConfig);
+ Map<String, TaskConfig> taskConfigMap = new HashMap<>();
+ taskConfigMap.put("taskId0", new TaskConfig("", new HashMap<>()));
+ taskConfigMap.put("taskId1",
+ new TaskConfig("", new HashMap<>(Collections.singletonMap("tableName",
"table1_OFFLINE"))));
+ when(jobConfig.getTaskConfigMap()).thenReturn(taskConfigMap);
JobContext jobContext = mock(JobContext.class);
when(taskDriver.getJobContext(helixJobName)).thenReturn(jobContext);
- when(jobContext.getPartitionSet()).thenReturn(ImmutableSet.of(0, 1));
+ Map<String, Integer> taskIdPartitionMap = new HashMap<>();
+ taskIdPartitionMap.put("taskId0", 0);
+ taskIdPartitionMap.put("taskId1", 1);
+ when(jobContext.getTaskIdPartitionMap()).thenReturn(taskIdPartitionMap);
when(jobContext.getTaskIdForPartition(0)).thenReturn("taskId0");
when(jobContext.getTaskIdForPartition(1)).thenReturn("taskId1");
when(jobContext.getPartitionState(0)).thenReturn(TaskPartitionState.RUNNING);
when(jobContext.getPartitionState(1)).thenReturn(TaskPartitionState.COMPLETED);
- JobConfig jobConfig = mock(JobConfig.class);
- when(taskDriver.getJobConfig(helixJobName)).thenReturn(jobConfig);
- when(jobConfig.getTaskConfig("taskId0")).thenReturn(new TaskConfig("", new
HashMap<>()));
- when(jobConfig.getTaskConfig("taskId1")).thenReturn(new TaskConfig("",
- new HashMap<>(ImmutableMap.of("tableName", "table1_OFFLINE"))));
-
PinotHelixTaskResourceManager mgr =
new
PinotHelixTaskResourceManager(mock(PinotHelixResourceManager.class),
taskDriver);
Map<String, PinotHelixTaskResourceManager.TaskCount> tableTaskCount =
mgr.getTableTaskCount(taskName);
assertEquals(tableTaskCount.size(), 2);
-
PinotHelixTaskResourceManager.TaskCount taskCount =
tableTaskCount.get("table1_OFFLINE");
assertEquals(taskCount.getTotal(), 1);
assertEquals(taskCount.getCompleted(), 1);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 8949dc1320..5fab6ff72f 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -130,7 +130,6 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
HOLD.set(true);
// No tasks before we start.
assertEquals(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).size(), 0);
- verifyTaskCount("Task_" + TASK_TYPE + "_1624403781879", 0, 0, 0, 0);
// Should create the task queues and generate a task
String task1 = _taskManager.scheduleTasks().get(TASK_TYPE);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]