This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d95baa100 Enhance Minion task management (#11315)
2d95baa100 is described below

commit 2d95baa100e940c6089c857d5cbc5e247c0bdb70
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Sun Aug 13 17:06:31 2023 -0700

    Enhance Minion task management (#11315)
---
 .../api/resources/PinotTaskRestletResource.java    |   2 +-
 .../core/minion/PinotHelixTaskResourceManager.java | 337 +++++++++++----------
 .../core/minion/generator/TaskGeneratorUtils.java  |  51 +---
 .../minion/PinotHelixTaskResourceManagerTest.java  | 124 ++++----
 .../tests/SimpleMinionClusterIntegrationTest.java  |   1 -
 5 files changed, 267 insertions(+), 248 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index ac305202e6..a727c858e2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -410,7 +410,7 @@ public class PinotTaskRestletResource {
   @ApiOperation("Get the task state for the given task (deprecated)")
   public StringResultResponse getTaskStateDeprecated(
       @ApiParam(value = "Task name", required = true) @PathParam("taskName") String taskName) {
-    return new StringResultResponse(_pinotHelixTaskResourceManager.getTaskState(taskName).toString());
+    return new StringResultResponse(String.valueOf(_pinotHelixTaskResourceManager.getTaskState(taskName)));
   }
 
   @GET
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index dba8dd37bb..86ef3cef9a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -24,8 +24,8 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashBiMap;
-import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
@@ -54,8 +55,6 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.http.conn.HttpClientConnectionManager;
 import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
 import org.apache.pinot.common.utils.DateTimeUtils;
-import org.apache.pinot.controller.api.exception.NoTaskScheduledException;
-import org.apache.pinot.controller.api.exception.UnknownTaskTypeException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.CompletionServiceHelper;
 import org.apache.pinot.core.common.MinionConstants;
@@ -118,25 +117,19 @@ public class PinotHelixTaskResourceManager {
    *
    * @param taskType Task type
    */
-  public void ensureTaskQueueExists(String taskType) {
+  public synchronized void ensureTaskQueueExists(String taskType) {
     String helixJobQueueName = getHelixJobQueueName(taskType);
     WorkflowConfig workflowConfig = 
_taskDriver.getWorkflowConfig(helixJobQueueName);
-
     if (workflowConfig == null) {
       // Task queue does not exist
       LOGGER.info("Creating task queue: {} for task type: {}", 
helixJobQueueName, taskType);
 
       // Set full parallelism
       // Don't allow overlap job assignment so that we can control number of 
concurrent tasks per instance
-      JobQueue jobQueue = new JobQueue.Builder(helixJobQueueName)
-          .setWorkflowConfig(new 
WorkflowConfig.Builder().setParallelJobs(Integer.MAX_VALUE).build()).build();
+      JobQueue jobQueue = new 
JobQueue.Builder(helixJobQueueName).setWorkflowConfig(
+          new 
WorkflowConfig.Builder().setParallelJobs(Integer.MAX_VALUE).build()).build();
       _taskDriver.createQueue(jobQueue);
     }
-
-    // Wait until task queue context shows up
-    while (_taskDriver.getWorkflowContext(helixJobQueueName) == null) {
-      Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-    }
   }
 
   /**
@@ -226,8 +219,14 @@ public class PinotHelixTaskResourceManager {
    * @param taskType Task type
    * @return Task queue state
    */
+  @Nullable
   public synchronized TaskState getTaskQueueState(String taskType) {
-    return 
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getWorkflowState();
+    String helixJobQueueName = getHelixJobQueueName(taskType);
+    if (_taskDriver.getWorkflowConfig(helixJobQueueName) == null) {
+      return null;
+    }
+    WorkflowContext workflowContext = 
_taskDriver.getWorkflowContext(helixJobQueueName);
+    return workflowContext != null ? workflowContext.getWorkflowState() : 
TaskState.NOT_STARTED;
   }
 
   /**
@@ -263,8 +262,8 @@ public class PinotHelixTaskResourceManager {
 
     String taskType = pinotTaskConfigs.get(0).getTaskType();
     String parentTaskName = getParentTaskName(taskType, UUID.randomUUID() + 
"_" + System.currentTimeMillis());
-    return submitTask(parentTaskName, pinotTaskConfigs, minionInstanceTag, 
taskTimeoutMs,
-        numConcurrentTasksPerInstance, maxAttemptsPerTask);
+    return submitTask(parentTaskName, pinotTaskConfigs, minionInstanceTag, 
taskTimeoutMs, numConcurrentTasksPerInstance,
+        maxAttemptsPerTask);
   }
 
   /**
@@ -285,9 +284,9 @@ public class PinotHelixTaskResourceManager {
     Preconditions.checkState(numConcurrentTasksPerInstance > 0);
 
     String taskType = pinotTaskConfigs.get(0).getTaskType();
-    LOGGER
-        .info("Submitting parent task: {} of type: {} with {} child task 
configs: {} to Minion instances with tag: {}",
-            parentTaskName, taskType, numChildTasks, pinotTaskConfigs, 
minionInstanceTag);
+    LOGGER.info(
+        "Submitting parent task: {} of type: {} with {} child task configs: {} 
to Minion instances with tag: {}",
+        parentTaskName, taskType, numChildTasks, pinotTaskConfigs, 
minionInstanceTag);
     List<TaskConfig> helixTaskConfigs = new ArrayList<>(numChildTasks);
     for (int i = 0; i < numChildTasks; i++) {
       PinotTaskConfig pinotTaskConfig = pinotTaskConfigs.get(i);
@@ -305,11 +304,6 @@ public class PinotHelixTaskResourceManager {
             
.setFailureThreshold(Integer.MAX_VALUE).setExpiry(_taskExpireTimeMs);
     _taskDriver.enqueueJob(getHelixJobQueueName(taskType), parentTaskName, 
jobBuilder);
 
-    // Wait until task state is available
-    while (getTaskState(parentTaskName) == null) {
-      Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-    }
-
     return parentTaskName;
   }
 
@@ -320,7 +314,11 @@ public class PinotHelixTaskResourceManager {
    * @return Set of task names
    */
   public synchronized Set<String> getTasks(String taskType) {
-    Set<String> helixJobs = 
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType)).getJobStates().keySet();
+    String helixJobQueueName = getHelixJobQueueName(taskType);
+    WorkflowConfig workflowConfig = 
_taskDriver.getWorkflowConfig(helixJobQueueName);
+    Preconditions.checkArgument(workflowConfig != null, "Task queue: %s for 
task type: %s does not exist",
+        helixJobQueueName, taskType);
+    Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
     Set<String> tasks = new HashSet<>(helixJobs.size());
     for (String helixJobName : helixJobs) {
       tasks.add(getPinotTaskName(helixJobName));
@@ -330,40 +328,58 @@ public class PinotHelixTaskResourceManager {
 
   /**
    * Get all task states for the given task type.
+   * NOTE: For tasks just submitted without the context created, count them as 
NOT_STARTED.
    *
    * @param taskType Task type
    * @return Map from task name to task state
    */
   public synchronized Map<String, TaskState> getTaskStates(String taskType) {
-    Map<String, TaskState> helixJobStates = new HashMap<>();
+    String helixJobQueueName = getHelixJobQueueName(taskType);
+    WorkflowConfig workflowConfig = 
_taskDriver.getWorkflowConfig(helixJobQueueName);
+    Preconditions.checkArgument(workflowConfig != null, "Task queue: %s for 
task type: %s does not exist",
+        helixJobQueueName, taskType);
+    Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
+    if (helixJobs.isEmpty()) {
+      return Collections.emptyMap();
+    }
     WorkflowContext workflowContext = 
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
-
     if (workflowContext == null) {
-      return helixJobStates;
-    }
-    helixJobStates = workflowContext.getJobStates();
-    Map<String, TaskState> taskStates = new HashMap<>(helixJobStates.size());
-    for (Map.Entry<String, TaskState> entry : helixJobStates.entrySet()) {
-      taskStates.put(getPinotTaskName(entry.getKey()), entry.getValue());
+      return helixJobs.stream()
+          
.collect(Collectors.toMap(PinotHelixTaskResourceManager::getPinotTaskName, 
ignored -> TaskState.NOT_STARTED));
+    } else {
+      Map<String, TaskState> helixJobStates = workflowContext.getJobStates();
+      return 
helixJobs.stream().collect(Collectors.toMap(PinotHelixTaskResourceManager::getPinotTaskName,
+          helixJobName -> helixJobStates.getOrDefault(helixJobName, 
TaskState.NOT_STARTED)));
     }
-    return taskStates;
   }
 
   /**
    * This method returns a count of sub-tasks in various states, given the 
top-level task name.
-   * @param parentTaskName in the form "Task_<taskType>_<uuid>_<timestamp>"
+   *
+   * @param taskName in the form "Task_<taskType>_<uuid>_<timestamp>"
    * @return TaskCount object
    */
-  public synchronized TaskCount getTaskCount(String parentTaskName) {
+  public synchronized TaskCount getTaskCount(String taskName) {
+    String helixJobName = getHelixJobName(taskName);
+    JobConfig jobConfig = _taskDriver.getJobConfig(helixJobName);
+    Preconditions.checkArgument(jobConfig != null, "Task: %s does not exist", 
taskName);
+    Set<String> subtasks = jobConfig.getTaskConfigMap().keySet();
     TaskCount taskCount = new TaskCount();
-    JobContext jobContext = 
_taskDriver.getJobContext(getHelixJobName(parentTaskName));
-
+    JobContext jobContext = _taskDriver.getJobContext(helixJobName);
     if (jobContext == null) {
+      int numSubtasks = subtasks.size();
+      for (int i = 0; i < numSubtasks; i++) {
+        taskCount.addTaskState(null);
+      }
       return taskCount;
     }
-    Set<Integer> partitionSet = jobContext.getPartitionSet();
-    for (int partition : partitionSet) {
-      TaskPartitionState state = jobContext.getPartitionState(partition);
+    Map<String, Integer> taskIdPartitionMap = 
jobContext.getTaskIdPartitionMap();
+    for (String taskId : subtasks) {
+      TaskPartitionState state = null;
+      Integer partition = taskIdPartitionMap.get(taskId);
+      if (partition != null) {
+        state = jobContext.getPartitionState(partition);
+      }
       taskCount.addTaskState(state);
     }
     return taskCount;
@@ -371,81 +387,91 @@ public class PinotHelixTaskResourceManager {
 
   /**
    * This method returns a map of table name to count of sub-tasks in various 
states, given the top-level task name.
+   *
    * @param taskName in the form "Task_<taskType>_<uuid>_<timestamp>"
    * @return a map of table name to {@link TaskCount}
    */
   public synchronized Map<String, TaskCount> getTableTaskCount(String 
taskName) {
-    Map<String, TaskPartitionState> subtaskStates = getSubtaskStates(taskName);
-    if (subtaskStates.isEmpty()) {
-      return Collections.emptyMap();
-    }
-
-    JobConfig jobConfig = _taskDriver.getJobConfig(getHelixJobName(taskName));
-    // in theory, this should not happen because we have already checked 
JobContext
-    if (jobConfig == null) {
-      LOGGER.warn("task {} has job context but its job config does not exist", 
taskName);
-      return Collections.emptyMap();
-    }
-
-    Map<String, TaskCount> tableTaskCountMap = new HashMap<>();
-    subtaskStates.forEach((taskId, taskState) -> {
-      TaskConfig taskConfig = jobConfig.getTaskConfig(taskId);
-      String tableNameWithType;
-      // in theory, this should not happen because jobContext has this taskId
-      if (taskConfig == null) {
-        LOGGER.warn("sub-task {} exists in helix job context but its task 
config does not exist", taskId);
-        tableNameWithType = UNKNOWN_TABLE_NAME;
-      } else {
-        tableNameWithType = 
taskConfig.getConfigMap().getOrDefault(MinionConstants.TABLE_NAME_KEY, 
UNKNOWN_TABLE_NAME);
+    String helixJobName = getHelixJobName(taskName);
+    JobConfig jobConfig = _taskDriver.getJobConfig(helixJobName);
+    Preconditions.checkArgument(jobConfig != null, "Task: %s does not exist", 
taskName);
+    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
+    Map<String, TaskCount> taskCountMap = new HashMap<>();
+    JobContext jobContext = _taskDriver.getJobContext(helixJobName);
+    if (jobContext == null) {
+      for (TaskConfig taskConfig : taskConfigMap.values()) {
+        String tableName = 
taskConfig.getConfigMap().getOrDefault(MinionConstants.TABLE_NAME_KEY, 
UNKNOWN_TABLE_NAME);
+        taskCountMap.computeIfAbsent(tableName, k -> new 
TaskCount()).addTaskState(null);
       }
-      tableTaskCountMap.compute(tableNameWithType, (name, taskCount) -> {
-        if (taskCount == null) {
-          taskCount = new TaskCount();
-        }
-        taskCount.addTaskState(taskState);
-        return taskCount;
-      });
-    });
-    return tableTaskCountMap;
+      return taskCountMap;
+    }
+    Map<String, Integer> taskIdPartitionMap = 
jobContext.getTaskIdPartitionMap();
+    for (Map.Entry<String, TaskConfig> entry : taskConfigMap.entrySet()) {
+      String taskId = entry.getKey();
+      TaskPartitionState state = null;
+      Integer partition = taskIdPartitionMap.get(taskId);
+      if (partition != null) {
+        state = jobContext.getPartitionState(partition);
+      }
+      TaskConfig taskConfig = entry.getValue();
+      String tableName = 
taskConfig.getConfigMap().getOrDefault(MinionConstants.TABLE_NAME_KEY, 
UNKNOWN_TABLE_NAME);
+      taskCountMap.computeIfAbsent(tableName, k -> new 
TaskCount()).addTaskState(state);
+    }
+    return taskCountMap;
   }
 
   /**
    * Returns a set of Task names (in the form 
"Task_<taskType>_<uuid>_<timestamp>") that are in progress or not started
    * yet.
+   * NOTE: For tasks just submitted without the context created, count them as 
NOT_STARTED.
    *
-   * @param taskType
+   * @param taskType Task type
    * @return Set of task names
    */
   public synchronized Set<String> getTasksInProgress(String taskType) {
-    Set<String> tasksInProgress = new HashSet<>();
+    WorkflowConfig workflowConfig = 
_taskDriver.getWorkflowConfig(getHelixJobQueueName(taskType));
+    if (workflowConfig == null) {
+      return Collections.emptySet();
+    }
+    Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
+    if (helixJobs.isEmpty()) {
+      return Collections.emptySet();
+    }
     WorkflowContext workflowContext = 
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
     if (workflowContext == null) {
-      return tasksInProgress;
-    }
-
-    Map<String, TaskState> helixJobStates = workflowContext.getJobStates();
-
-    for (Map.Entry<String, TaskState> entry : helixJobStates.entrySet()) {
-      if (entry.getValue().equals(TaskState.NOT_STARTED) || 
entry.getValue().equals(TaskState.IN_PROGRESS)) {
-        tasksInProgress.add(getPinotTaskName(entry.getKey()));
-      }
+      return 
helixJobs.stream().map(PinotHelixTaskResourceManager::getPinotTaskName).collect(Collectors.toSet());
+    } else {
+      Map<String, TaskState> helixJobStates = workflowContext.getJobStates();
+      return helixJobs.stream().filter(helixJobName -> {
+        TaskState taskState = helixJobStates.get(helixJobName);
+        return taskState == null || taskState == TaskState.NOT_STARTED || 
taskState == TaskState.IN_PROGRESS;
+      
}).map(PinotHelixTaskResourceManager::getPinotTaskName).collect(Collectors.toSet());
     }
-    return tasksInProgress;
   }
 
   /**
    * Get the task state for the given task name.
+   * NOTE: For tasks just submitted without the context created, count them as 
NOT_STARTED.
    *
    * @param taskName Task name
    * @return Task state
    */
+  @Nullable
   public synchronized TaskState getTaskState(String taskName) {
+    String helixJobName = getHelixJobName(taskName);
+    JobConfig jobConfig = _taskDriver.getJobConfig(helixJobName);
+    if (jobConfig == null) {
+      return null;
+    }
     String taskType = getTaskType(taskName);
     WorkflowContext workflowContext = 
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
-    if (workflowContext == null) {
-      throw new UnknownTaskTypeException("Workflow context for task type 
doesn't exist: " + taskType);
+    if (workflowContext != null) {
+      TaskState jobState = workflowContext.getJobState(helixJobName);
+      if (jobState != null) {
+        return jobState;
+      }
     }
-    return workflowContext.getJobState(getHelixJobName(taskName));
+    return TaskState.NOT_STARTED;
   }
 
   /**
@@ -455,22 +481,26 @@ public class PinotHelixTaskResourceManager {
    * @return states of all the sub tasks
    */
   public synchronized Map<String, TaskPartitionState> getSubtaskStates(String 
taskName) {
-    String taskType = getTaskType(taskName);
-    WorkflowContext workflowContext = 
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
-    if (workflowContext == null) {
-      return Collections.emptyMap();
-    }
     String helixJobName = getHelixJobName(taskName);
+    JobConfig jobConfig = _taskDriver.getJobConfig(helixJobName);
+    Preconditions.checkArgument(jobConfig != null, "Task: %s does not exist", 
taskName);
+    Set<String> subtasks = jobConfig.getTaskConfigMap().keySet();
+    Map<String, TaskPartitionState> subtaskStates = new HashMap<>();
     JobContext jobContext = _taskDriver.getJobContext(helixJobName);
     if (jobContext == null) {
-      return Collections.emptyMap();
-    }
-    Map<String, TaskPartitionState> subtaskStates = new HashMap<>();
-    Set<Integer> partitionSet = jobContext.getPartitionSet();
-    for (int partition : partitionSet) {
-      String taskIdForPartition = jobContext.getTaskIdForPartition(partition);
-      TaskPartitionState partitionState = 
jobContext.getPartitionState(partition);
-      subtaskStates.put(taskIdForPartition, partitionState);
+      for (String taskId : subtasks) {
+        subtaskStates.put(taskId, null);
+      }
+      return subtaskStates;
+    }
+    Map<String, Integer> taskIdPartitionMap = 
jobContext.getTaskIdPartitionMap();
+    for (String taskId : subtasks) {
+      TaskPartitionState state = null;
+      Integer partition = taskIdPartitionMap.get(taskId);
+      if (partition != null) {
+        state = jobContext.getPartitionState(partition);
+      }
+      subtaskStates.put(taskId, state);
     }
     return subtaskStates;
   }
@@ -550,39 +580,51 @@ public class PinotHelixTaskResourceManager {
       Map<String, String> requestHeaders, int timeoutMs)
       throws Exception {
     String helixJobName = getHelixJobName(taskName);
+    JobConfig jobConfig = _taskDriver.getJobConfig(helixJobName);
+    Preconditions.checkArgument(jobConfig != null, "Task: %s does not exist", 
taskName);
+    Set<String> subtasks = jobConfig.getTaskConfigMap().keySet();
+    Set<String> selectedSubtasks;
+    if (StringUtils.isNotEmpty(subtaskNames)) {
+      selectedSubtasks = new 
HashSet<>(Arrays.asList(StringUtils.split(subtaskNames, ',')));
+      selectedSubtasks.retainAll(subtasks);
+    } else {
+      selectedSubtasks = subtasks;
+    }
+    if (selectedSubtasks.isEmpty()) {
+      return Collections.emptyMap();
+    }
+    Map<String, Object> subtaskProgressMap = new HashMap<>();
     JobContext jobContext = _taskDriver.getJobContext(helixJobName);
     if (jobContext == null) {
-      throw new NoTaskScheduledException("No task scheduled with name: " + 
helixJobName);
-    }
-    Set<String> selectedSubtasks = new HashSet<>();
-    if (StringUtils.isNotEmpty(subtaskNames)) {
-      Collections.addAll(selectedSubtasks, StringUtils.split(subtaskNames, 
','));
+      for (String taskId : selectedSubtasks) {
+        subtaskProgressMap.put(taskId, "No worker has run this subtask");
+      }
+      return subtaskProgressMap;
     }
     // The worker running the subtask and task state as tracked by helix.
-    Map<String, String[]> allSubtasks = new HashMap<>();
-    Map<String, Set<String>> workerSelectedSubtasksMap = new HashMap<>();
-    for (int partition : jobContext.getPartitionSet()) {
-      String subtaskName = jobContext.getTaskIdForPartition(partition);
-      String worker = jobContext.getAssignedParticipant(partition);
-      TaskPartitionState partitionState = 
jobContext.getPartitionState(partition);
-      String taskState = partitionState == null ? null : partitionState.name();
-      allSubtasks.put(subtaskName, new String[]{worker, taskState});
-      LOGGER.debug("Subtask: {} is assigned to worker: {} with state: {} in 
Helix", subtaskName, worker, taskState);
-      if (worker == null) {
-        continue;
+    Map<String, Pair<String, TaskPartitionState>> subtaskWorkerAndStateMap = 
new HashMap<>();
+    Map<String, Set<String>> workerSubtasksMap = new HashMap<>();
+    Map<String, Integer> taskIdPartitionMap = 
jobContext.getTaskIdPartitionMap();
+    for (String taskId : selectedSubtasks) {
+      String worker = null;
+      TaskPartitionState state = null;
+      Integer partition = taskIdPartitionMap.get(taskId);
+      if (partition != null) {
+        worker = jobContext.getAssignedParticipant(partition);
+        state = jobContext.getPartitionState(partition);
       }
-      if (selectedSubtasks.isEmpty() || 
selectedSubtasks.contains(subtaskName)) {
-        workerSelectedSubtasksMap.computeIfAbsent(worker, k -> new 
HashSet<>()).add(subtaskName);
+      subtaskWorkerAndStateMap.put(taskId, Pair.of(worker, state));
+      if (worker != null) {
+        workerSubtasksMap.computeIfAbsent(worker, k -> new 
HashSet<>()).add(taskId);
       }
     }
-    LOGGER.debug("Found subtasks on workers: {}", workerSelectedSubtasksMap);
+    LOGGER.debug("Found subtasks on workers: {}", workerSubtasksMap);
     List<String> workerUrls = new ArrayList<>();
-    workerSelectedSubtasksMap.forEach((workerId, subtasksOnWorker) -> 
workerUrls.add(
+    workerSubtasksMap.forEach((workerId, subtasksOnWorker) -> workerUrls.add(
         String.format("%s/tasks/subtask/progress?subtaskNames=%s", 
workerEndpoints.get(workerId),
             StringUtils.join(subtasksOnWorker, 
CommonConstants.Minion.TASK_LIST_SEPARATOR))));
     LOGGER.debug("Getting task progress with workerUrls: {}", workerUrls);
     // Scatter and gather progress from multiple workers.
-    Map<String, Object> subtaskProgressMap = new HashMap<>();
     if (!workerUrls.isEmpty()) {
       CompletionServiceHelper.CompletionServiceResponse serviceResponse =
           completionServiceHelper.doMultiGetRequest(workerUrls, null, true, 
requestHeaders, timeoutMs);
@@ -603,22 +645,18 @@ public class PinotHelixTaskResourceManager {
     }
     // Check if any subtask missed their progress from the worker.
     // And simply check all subtasks if no subtasks are specified.
-    if (selectedSubtasks.isEmpty()) {
-      selectedSubtasks = allSubtasks.keySet();
-    }
-    for (String subtaskName : selectedSubtasks) {
-      if (subtaskProgressMap.containsKey(subtaskName)) {
+    for (String taskId : selectedSubtasks) {
+      if (subtaskProgressMap.containsKey(taskId)) {
         continue;
       }
       // Return the task progress status tracked by Helix.
-      String[] taskWorkerAndHelixState = allSubtasks.get(subtaskName);
-      if (taskWorkerAndHelixState == null || taskWorkerAndHelixState[0] == 
null) {
-        subtaskProgressMap.put(subtaskName, "No worker has run this subtask");
+      Pair<String, TaskPartitionState> workerAndState = 
subtaskWorkerAndStateMap.get(taskId);
+      if (workerAndState.getLeft() == null) {
+        subtaskProgressMap.put(taskId, "No worker has run this subtask");
       } else {
-        String taskWorker = taskWorkerAndHelixState[0];
-        String helixState = taskWorkerAndHelixState[1];
-        subtaskProgressMap.put(subtaskName,
-            String.format("No status from worker: %s. Got status: %s from 
Helix", taskWorker, helixState));
+        subtaskProgressMap.put(taskId,
+            String.format("No status from worker: %s. Got status: %s from 
Helix", workerAndState.getLeft(),
+                workerAndState.getRight()));
       }
     }
     return subtaskProgressMap;
@@ -634,26 +672,24 @@ public class PinotHelixTaskResourceManager {
    * @param timeoutMs timeout (in millisecond) for requests sent to minion 
workers
    * @return a map of minion worker id to subtask progress
    */
-  public synchronized Map<String, Object> getSubtaskOnWorkerProgress(String 
subtaskState,
-      Executor executor, HttpClientConnectionManager connMgr, Map<String, 
String> selectedMinionWorkerEndpoints,
+  public synchronized Map<String, Object> getSubtaskOnWorkerProgress(String 
subtaskState, Executor executor,
+      HttpClientConnectionManager connMgr, Map<String, String> 
selectedMinionWorkerEndpoints,
       Map<String, String> requestHeaders, int timeoutMs)
       throws JsonProcessingException {
-    return getSubtaskOnWorkerProgress(subtaskState,
-        new CompletionServiceHelper(executor, connMgr, HashBiMap.create(0)), 
selectedMinionWorkerEndpoints,
-        requestHeaders, timeoutMs);
+    return getSubtaskOnWorkerProgress(subtaskState, new 
CompletionServiceHelper(executor, connMgr, HashBiMap.create(0)),
+        selectedMinionWorkerEndpoints, requestHeaders, timeoutMs);
   }
 
   @VisibleForTesting
-  Map<String, Object> getSubtaskOnWorkerProgress(String subtaskState,
-      CompletionServiceHelper completionServiceHelper, Map<String, String> 
selectedMinionWorkerEndpoints,
-      Map<String, String> requestHeaders, int timeoutMs)
+  Map<String, Object> getSubtaskOnWorkerProgress(String subtaskState, 
CompletionServiceHelper completionServiceHelper,
+      Map<String, String> selectedMinionWorkerEndpoints, Map<String, String> 
requestHeaders, int timeoutMs)
       throws JsonProcessingException {
     Map<String, Object> minionWorkerIdSubtaskProgressMap = new HashMap<>();
     if (selectedMinionWorkerEndpoints.isEmpty()) {
       return minionWorkerIdSubtaskProgressMap;
     }
-    Map<String, String> minionWorkerUrlToWorkerIdMap = 
selectedMinionWorkerEndpoints.entrySet().stream()
-        .collect(Collectors.toMap(
+    Map<String, String> minionWorkerUrlToWorkerIdMap = 
selectedMinionWorkerEndpoints.entrySet().stream().collect(
+        Collectors.toMap(
             entry -> 
String.format("%s/tasks/subtask/state/progress?subTaskState=%s", 
entry.getValue(), subtaskState),
             Map.Entry::getKey));
     List<String> workerUrls = new 
ArrayList<>(minionWorkerUrlToWorkerIdMap.keySet());
@@ -665,8 +701,8 @@ public class PinotHelixTaskResourceManager {
       String worker = entry.getKey();
       String resp = entry.getValue();
       LOGGER.debug("Got resp: {} from worker: {}", resp, worker);
-      minionWorkerIdSubtaskProgressMap
-          .put(minionWorkerUrlToWorkerIdMap.get(worker), 
JsonUtils.stringToObject(resp, Map.class));
+      
minionWorkerIdSubtaskProgressMap.put(minionWorkerUrlToWorkerIdMap.get(worker),
+          JsonUtils.stringToObject(resp, Map.class));
     }
     if (serviceResponse._failedResponseCount > 0) {
       // Instead of aborting, subtasks without worker side progress return the 
task status tracked by Helix.
@@ -713,15 +749,10 @@ public class PinotHelixTaskResourceManager {
    * @return Map of Pinot Task Name to TaskCount
    */
   public synchronized Map<String, TaskCount> getTaskCounts(String taskType) {
+    Set<String> tasks = getTasks(taskType);
     Map<String, TaskCount> taskCounts = new TreeMap<>();
-    WorkflowContext workflowContext = 
_taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
-    if (workflowContext == null) {
-      return taskCounts;
-    }
-    Map<String, TaskState> helixJobStates = workflowContext.getJobStates();
-    for (String helixJobName : helixJobStates.keySet()) {
-      String pinotTaskName = getPinotTaskName(helixJobName);
-      taskCounts.put(pinotTaskName, getTaskCount(pinotTaskName));
+    for (String taskName : tasks) {
+      taskCounts.put(taskName, getTaskCount(taskName));
     }
     return taskCounts;
   }
@@ -884,7 +915,7 @@ public class PinotHelixTaskResourceManager {
    * @param pinotTaskName Pinot task name
    * @return helixJobName Helix Job name
    */
-  private static String getHelixJobName(String pinotTaskName) {
+  public static String getHelixJobName(String pinotTaskName) {
     return getHelixJobQueueName(getTaskType(pinotTaskName)) + 
TASK_NAME_SEPARATOR + pinotTaskName;
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
index 9a6ce3c251..4f6c3c28e3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
@@ -24,11 +24,9 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Consumer;
-import javax.annotation.Nonnull;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.data.Segment;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
-import 
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.minion.PinotTaskConfig;
 
@@ -37,8 +35,6 @@ public class TaskGeneratorUtils {
   private TaskGeneratorUtils() {
   }
 
-  private static final long ONE_DAY_IN_MILLIS = 24 * 60 * 60 * 1000L;
-
   /**
    * If task is in final state, it will not be running any more. But note that
    * STOPPED is not a final task state in helix task framework, as a stopped 
task
@@ -48,31 +44,17 @@ public class TaskGeneratorUtils {
       EnumSet.of(TaskState.COMPLETED, TaskState.FAILED, TaskState.ABORTED, 
TaskState.TIMED_OUT);
 
   /**
-   * Returns all the segments that have been scheduled in one day but not 
finished.
-   * <p>
-   * NOTE: we consider tasks not finished in one day as stuck and don't count 
the segments in them
-   *
-   * @param taskType Task type
-   * @param clusterInfoAccessor Cluster info accessor
-   * @return Set of running segments
+   * Returns all the segments that have been scheduled but not finished.
    */
-  public static Set<Segment> getRunningSegments(@Nonnull String taskType,
-      @Nonnull ClusterInfoAccessor clusterInfoAccessor) {
+  public static Set<Segment> getRunningSegments(String taskType, 
ClusterInfoAccessor clusterInfoAccessor) {
     Set<Segment> runningSegments = new HashSet<>();
     Map<String, TaskState> taskStates = 
clusterInfoAccessor.getTaskStates(taskType);
     for (Map.Entry<String, TaskState> entry : taskStates.entrySet()) {
-      // Skip COMPLETED tasks
-      if (entry.getValue() == TaskState.COMPLETED) {
+      if (TASK_FINAL_STATES.contains(entry.getValue())) {
         continue;
       }
-
-      // Skip tasks scheduled for more than one day
       String taskName = entry.getKey();
-      if (isTaskOlderThanOneDay(taskName)) {
-        continue;
-      }
-
-      for (PinotTaskConfig pinotTaskConfig : 
clusterInfoAccessor.getTaskConfigs(entry.getKey())) {
+      for (PinotTaskConfig pinotTaskConfig : 
clusterInfoAccessor.getTaskConfigs(taskName)) {
         Map<String, String> configs = pinotTaskConfig.getConfigs();
         runningSegments.add(
             new Segment(configs.get(MinionConstants.TABLE_NAME_KEY), 
configs.get(MinionConstants.SEGMENT_NAME_KEY)));
@@ -82,30 +64,24 @@ public class TaskGeneratorUtils {
   }
 
   /**
-   * Gets all the tasks for the provided task type and tableName, which do not 
have TaskState COMPLETED
-   * @return map containing task name to task state for non-completed tasks
-   *
-   * NOTE: we consider tasks not finished in one day as stuck and don't count 
them
+   * Gets all the tasks for the provided task type and tableName, which have 
not reached final task state yet.
    */
   public static Map<String, TaskState> getIncompleteTasks(String taskType, 
String tableNameWithType,
       ClusterInfoAccessor clusterInfoAccessor) {
-    Map<String, TaskState> nonCompletedTasks = new HashMap<>();
+    Map<String, TaskState> incompleteTasks = new HashMap<>();
     Map<String, TaskState> taskStates = 
clusterInfoAccessor.getTaskStates(taskType);
     for (Map.Entry<String, TaskState> entry : taskStates.entrySet()) {
-      if (entry.getValue() == TaskState.COMPLETED) {
+      if (TASK_FINAL_STATES.contains(entry.getValue())) {
         continue;
       }
       String taskName = entry.getKey();
-      if (isTaskOlderThanOneDay(taskName)) {
-        continue;
-      }
       for (PinotTaskConfig pinotTaskConfig : 
clusterInfoAccessor.getTaskConfigs(taskName)) {
         if 
(tableNameWithType.equals(pinotTaskConfig.getConfigs().get(MinionConstants.TABLE_NAME_KEY)))
 {
-          nonCompletedTasks.put(taskName, entry.getValue());
+          incompleteTasks.put(taskName, entry.getValue());
         }
       }
     }
-    return nonCompletedTasks;
+    return incompleteTasks;
   }
 
   /**
@@ -130,13 +106,4 @@ public class TaskGeneratorUtils {
       }
     }
   }
-
-  /**
-   * Returns true if task's schedule time is older than 1d
-   */
-  private static boolean isTaskOlderThanOneDay(String taskName) {
-    long scheduleTimeMs =
-        
Long.parseLong(taskName.substring(taskName.lastIndexOf(PinotHelixTaskResourceManager.TASK_NAME_SEPARATOR)
 + 1));
-    return System.currentTimeMillis() - scheduleTimeMs > ONE_DAY_IN_MILLIS;
-  }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
index 5706d402ab..9cbe2f5cec 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java
@@ -20,7 +20,6 @@ package org.apache.pinot.controller.helix.core.minion;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -35,7 +34,6 @@ import org.apache.helix.task.JobContext;
 import org.apache.helix.task.TaskConfig;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskPartitionState;
-import org.apache.helix.task.WorkflowContext;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.util.CompletionServiceHelper;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -55,11 +53,13 @@ import static org.testng.Assert.assertTrue;
 
 
 public class PinotHelixTaskResourceManagerTest {
+
   @Test
   public void testGetSubtaskProgressNoWorker()
       throws Exception {
     TaskDriver taskDriver = mock(TaskDriver.class);
-    
when(taskDriver.getJobContext(anyString())).thenReturn(mock(JobContext.class));
+    JobConfig jobConfig = mock(JobConfig.class);
+    when(taskDriver.getJobConfig(anyString())).thenReturn(jobConfig);
     CompletionServiceHelper httpHelper = mock(CompletionServiceHelper.class);
     CompletionServiceHelper.CompletionServiceResponse httpResp =
         new CompletionServiceHelper.CompletionServiceResponse();
@@ -73,6 +73,11 @@ public class PinotHelixTaskResourceManagerTest {
     for (int i = 0; i < 3; i++) {
       subtaskNames[i] = taskName + "_" + i;
     }
+    Map<String, TaskConfig> taskConfigMap = new HashMap<>();
+    for (String subtaskName : subtaskNames) {
+      taskConfigMap.put(subtaskName, mock(TaskConfig.class));
+    }
+    when(jobConfig.getTaskConfigMap()).thenReturn(taskConfigMap);
     Map<String, Object> progress =
         mgr.getSubtaskProgress(taskName, StringUtils.join(subtaskNames, ','), 
httpHelper, workerEndpoints,
             Collections.emptyMap(), 1000);
@@ -85,6 +90,8 @@ public class PinotHelixTaskResourceManagerTest {
   public void testGetSubtaskProgressNoResponse()
       throws Exception {
     TaskDriver taskDriver = mock(TaskDriver.class);
+    JobConfig jobConfig = mock(JobConfig.class);
+    when(taskDriver.getJobConfig(anyString())).thenReturn(jobConfig);
     JobContext jobContext = mock(JobContext.class);
     when(taskDriver.getJobContext(anyString())).thenReturn(jobContext);
     PinotHelixTaskResourceManager mgr =
@@ -102,17 +109,23 @@ public class PinotHelixTaskResourceManagerTest {
     }
     String taskName = "Task_SegmentGenerationAndPushTask_someone";
     String[] subtaskNames = new String[3];
-    Set<Integer> subtaskIds = new HashSet<>();
+    Map<String, Integer> taskIdPartitionMap = new HashMap<>();
     for (int i = 0; i < 3; i++) {
-      subtaskIds.add(i);
-      subtaskNames[i] = taskName + "_" + i;
+      String subtaskName = taskName + "_" + i;
+      subtaskNames[i] = subtaskName;
+      taskIdPartitionMap.put(subtaskName, i);
     }
+    Map<String, TaskConfig> taskConfigMap = new HashMap<>();
+    for (String subtaskName : subtaskNames) {
+      taskConfigMap.put(subtaskName, mock(TaskConfig.class));
+    }
+    when(jobConfig.getTaskConfigMap()).thenReturn(taskConfigMap);
     TaskPartitionState[] helixStates =
         new TaskPartitionState[]{TaskPartitionState.INIT, 
TaskPartitionState.RUNNING, TaskPartitionState.TASK_ERROR};
-    
when(jobContext.getTaskIdForPartition(anyInt())).thenReturn(subtaskNames[0], 
subtaskNames[1], subtaskNames[2]);
-    when(jobContext.getAssignedParticipant(anyInt())).thenReturn(workers[0], 
workers[1], workers[2]);
-    when(jobContext.getPartitionState(anyInt())).thenReturn(helixStates[0], 
helixStates[1], helixStates[2]);
-    when(jobContext.getPartitionSet()).thenReturn(subtaskIds);
+    when(jobContext.getTaskIdPartitionMap()).thenReturn(taskIdPartitionMap);
+    when(jobContext.getAssignedParticipant(anyInt())).thenAnswer(
+        invocation -> workers[(int) invocation.getArgument(0)]);
+    when(jobContext.getPartitionState(anyInt())).thenAnswer(invocation -> 
helixStates[(int) invocation.getArgument(0)]);
     Map<String, Object> progress =
         mgr.getSubtaskProgress(taskName, StringUtils.join(subtaskNames, ','), 
httpHelper, workerEndpoints,
             Collections.emptyMap(), 1000);
@@ -126,6 +139,8 @@ public class PinotHelixTaskResourceManagerTest {
   public void testGetSubtaskProgressWithResponse()
       throws Exception {
     TaskDriver taskDriver = mock(TaskDriver.class);
+    JobConfig jobConfig = mock(JobConfig.class);
+    when(taskDriver.getJobConfig(anyString())).thenReturn(jobConfig);
     JobContext jobContext = mock(JobContext.class);
     when(taskDriver.getJobContext(anyString())).thenReturn(jobContext);
     PinotHelixTaskResourceManager mgr =
@@ -141,19 +156,25 @@ public class PinotHelixTaskResourceManagerTest {
     }
     String taskName = "Task_SegmentGenerationAndPushTask_someone";
     String[] subtaskNames = new String[3];
-    Set<Integer> subtaskIds = new HashSet<>();
+    Map<String, Integer> taskIdPartitionMap = new HashMap<>();
     for (int i = 0; i < 3; i++) {
-      subtaskIds.add(i);
-      subtaskNames[i] = taskName + "_" + i;
+      String subtaskName = taskName + "_" + i;
+      subtaskNames[i] = subtaskName;
+      taskIdPartitionMap.put(subtaskName, i);
       httpResp._httpResponses.put(workers[i],
           JsonUtils.objectToString(Collections.singletonMap(subtaskNames[i], 
"running on worker: " + i)));
     }
+    Map<String, TaskConfig> taskConfigMap = new HashMap<>();
+    for (String subtaskName : subtaskNames) {
+      taskConfigMap.put(subtaskName, mock(TaskConfig.class));
+    }
+    when(jobConfig.getTaskConfigMap()).thenReturn(taskConfigMap);
     TaskPartitionState[] helixStates =
         new TaskPartitionState[]{TaskPartitionState.INIT, 
TaskPartitionState.RUNNING, TaskPartitionState.TASK_ERROR};
-    
when(jobContext.getTaskIdForPartition(anyInt())).thenReturn(subtaskNames[0], 
subtaskNames[1], subtaskNames[2]);
-    when(jobContext.getAssignedParticipant(anyInt())).thenReturn(workers[0], 
workers[1], workers[2]);
-    when(jobContext.getPartitionState(anyInt())).thenReturn(helixStates[0], 
helixStates[1], helixStates[2]);
-    when(jobContext.getPartitionSet()).thenReturn(subtaskIds);
+    when(jobContext.getTaskIdPartitionMap()).thenReturn(taskIdPartitionMap);
+    when(jobContext.getAssignedParticipant(anyInt())).thenAnswer(
+        invocation -> workers[(int) invocation.getArgument(0)]);
+    when(jobContext.getPartitionState(anyInt())).thenAnswer(invocation -> 
helixStates[(int) invocation.getArgument(0)]);
     Map<String, Object> progress =
         mgr.getSubtaskProgress(taskName, StringUtils.join(subtaskNames, ','), 
httpHelper, workerEndpoints,
             Collections.emptyMap(), 1000);
@@ -167,6 +188,8 @@ public class PinotHelixTaskResourceManagerTest {
   public void testGetSubtaskProgressPending()
       throws Exception {
     TaskDriver taskDriver = mock(TaskDriver.class);
+    JobConfig jobConfig = mock(JobConfig.class);
+    when(taskDriver.getJobConfig(anyString())).thenReturn(jobConfig);
     JobContext jobContext = mock(JobContext.class);
     when(taskDriver.getJobContext(anyString())).thenReturn(jobContext);
     PinotHelixTaskResourceManager mgr =
@@ -182,19 +205,23 @@ public class PinotHelixTaskResourceManagerTest {
     }
     String taskName = "Task_SegmentGenerationAndPushTask_someone";
     String[] subtaskNames = new String[3];
-    Set<Integer> subtaskIds = new HashSet<>();
+    Map<String, Integer> taskIdPartitionMap = new HashMap<>();
     for (int i = 0; i < 3; i++) {
-      subtaskIds.add(i);
-      subtaskNames[i] = taskName + "_" + i;
+      String subtaskName = taskName + "_" + i;
+      subtaskNames[i] = subtaskName;
+      taskIdPartitionMap.put(subtaskName, i);
+    }
+    Map<String, TaskConfig> taskConfigMap = new HashMap<>();
+    for (String subtaskName : subtaskNames) {
+      taskConfigMap.put(subtaskName, mock(TaskConfig.class));
     }
-    // Some subtasks are pending to be run.
-    TaskPartitionState[] helixStates = new 
TaskPartitionState[]{TaskPartitionState.RUNNING, null, null};
+    when(jobConfig.getTaskConfigMap()).thenReturn(taskConfigMap);
+    // Some subtasks are pending to be run
     httpResp._httpResponses.put(workers[0],
         JsonUtils.objectToString(Collections.singletonMap(subtaskNames[0], 
"running on worker: 0")));
-    
when(jobContext.getTaskIdForPartition(anyInt())).thenReturn(subtaskNames[0], 
subtaskNames[1], subtaskNames[2]);
-    when(jobContext.getAssignedParticipant(anyInt())).thenReturn(workers[0], 
null, null);
-    when(jobContext.getPartitionState(anyInt())).thenReturn(helixStates[0], 
null, null);
-    when(jobContext.getPartitionSet()).thenReturn(subtaskIds);
+    when(jobContext.getTaskIdPartitionMap()).thenReturn(taskIdPartitionMap);
+    when(jobContext.getAssignedParticipant(0)).thenReturn(workers[0]);
+    
when(jobContext.getPartitionState(0)).thenReturn(TaskPartitionState.RUNNING);
     Map<String, Object> progress =
         mgr.getSubtaskProgress(taskName, StringUtils.join(subtaskNames, ','), 
httpHelper, workerEndpoints,
             Collections.emptyMap(), 1000);
@@ -215,8 +242,8 @@ public class PinotHelixTaskResourceManagerTest {
     // No worker to run subtasks.
     Map<String, String> selectedMinionWorkerEndpoints = new HashMap<>();
     Map<String, Object> progress =
-        mgr.getSubtaskOnWorkerProgress("IN_PROGRESS", httpHelper,
-            selectedMinionWorkerEndpoints, Collections.emptyMap(), 1000);
+        mgr.getSubtaskOnWorkerProgress("IN_PROGRESS", httpHelper, 
selectedMinionWorkerEndpoints, Collections.emptyMap(),
+            1000);
     assertTrue(progress.isEmpty());
     verify(httpHelper, Mockito.never()).doMultiGetRequest(any(), any(), 
anyBoolean(), any(), anyInt());
   }
@@ -247,19 +274,17 @@ public class PinotHelixTaskResourceManagerTest {
     }
     httpResp._failedResponseCount = 1;
     ArgumentCaptor<List<String>> workerEndpointCaptor = 
ArgumentCaptor.forClass(List.class);
-    when(httpHelper.doMultiGetRequest(workerEndpointCaptor.capture(), any(), 
anyBoolean(), any(), anyInt()))
-        .thenReturn(httpResp);
+    when(httpHelper.doMultiGetRequest(workerEndpointCaptor.capture(), any(), 
anyBoolean(), any(), anyInt())).thenReturn(
+        httpResp);
 
     PinotHelixTaskResourceManager mgr =
         new 
PinotHelixTaskResourceManager(mock(PinotHelixResourceManager.class), 
mock(TaskDriver.class));
-
     Map<String, Object> progress =
-        mgr.getSubtaskOnWorkerProgress("IN_PROGRESS", httpHelper, 
selectedMinionWorkerEndpoints,
-            Collections.emptyMap(), 1000);
+        mgr.getSubtaskOnWorkerProgress("IN_PROGRESS", httpHelper, 
selectedMinionWorkerEndpoints, Collections.emptyMap(),
+            1000);
     List<String> value = workerEndpointCaptor.getValue();
-    Set<String> expectedWorkerUrls = 
selectedMinionWorkerEndpoints.values().stream()
-        .map(workerEndpoint
-            -> 
String.format("%s/tasks/subtask/state/progress?subTaskState=IN_PROGRESS", 
workerEndpoint))
+    Set<String> expectedWorkerUrls = 
selectedMinionWorkerEndpoints.values().stream().map(
+            workerEndpoint -> 
String.format("%s/tasks/subtask/state/progress?subTaskState=IN_PROGRESS", 
workerEndpoint))
         .collect(Collectors.toSet());
     assertEquals(new HashSet<>(value), expectedWorkerUrls);
     assertEquals(progress.size(), 3);
@@ -274,34 +299,31 @@ public class PinotHelixTaskResourceManagerTest {
 
   @Test
   public void testGetTableTaskCount() {
-    String taskType = "TestTask";
     String taskName = "Task_TestTask_12345";
-    String helixJobQueueName = 
PinotHelixTaskResourceManager.getHelixJobQueueName(taskType);
-    String helixJobName = helixJobQueueName + "_" + taskName;
-
+    String helixJobName = 
PinotHelixTaskResourceManager.getHelixJobName(taskName);
     TaskDriver taskDriver = mock(TaskDriver.class);
-    WorkflowContext workflowContext = mock(WorkflowContext.class);
-    
when(taskDriver.getWorkflowContext(helixJobQueueName)).thenReturn(workflowContext);
-
+    JobConfig jobConfig = mock(JobConfig.class);
+    when(taskDriver.getJobConfig(anyString())).thenReturn(jobConfig);
+    Map<String, TaskConfig> taskConfigMap = new HashMap<>();
+    taskConfigMap.put("taskId0", new TaskConfig("", new HashMap<>()));
+    taskConfigMap.put("taskId1",
+        new TaskConfig("", new HashMap<>(Collections.singletonMap("tableName", 
"table1_OFFLINE"))));
+    when(jobConfig.getTaskConfigMap()).thenReturn(taskConfigMap);
     JobContext jobContext = mock(JobContext.class);
     when(taskDriver.getJobContext(helixJobName)).thenReturn(jobContext);
-    when(jobContext.getPartitionSet()).thenReturn(ImmutableSet.of(0, 1));
+    Map<String, Integer> taskIdPartitionMap = new HashMap<>();
+    taskIdPartitionMap.put("taskId0", 0);
+    taskIdPartitionMap.put("taskId1", 1);
+    when(jobContext.getTaskIdPartitionMap()).thenReturn(taskIdPartitionMap);
     when(jobContext.getTaskIdForPartition(0)).thenReturn("taskId0");
     when(jobContext.getTaskIdForPartition(1)).thenReturn("taskId1");
     
when(jobContext.getPartitionState(0)).thenReturn(TaskPartitionState.RUNNING);
     
when(jobContext.getPartitionState(1)).thenReturn(TaskPartitionState.COMPLETED);
 
-    JobConfig jobConfig = mock(JobConfig.class);
-    when(taskDriver.getJobConfig(helixJobName)).thenReturn(jobConfig);
-    when(jobConfig.getTaskConfig("taskId0")).thenReturn(new TaskConfig("", new 
HashMap<>()));
-    when(jobConfig.getTaskConfig("taskId1")).thenReturn(new TaskConfig("",
-        new HashMap<>(ImmutableMap.of("tableName", "table1_OFFLINE"))));
-
     PinotHelixTaskResourceManager mgr =
         new 
PinotHelixTaskResourceManager(mock(PinotHelixResourceManager.class), 
taskDriver);
     Map<String, PinotHelixTaskResourceManager.TaskCount> tableTaskCount = 
mgr.getTableTaskCount(taskName);
     assertEquals(tableTaskCount.size(), 2);
-
     PinotHelixTaskResourceManager.TaskCount taskCount = 
tableTaskCount.get("table1_OFFLINE");
     assertEquals(taskCount.getTotal(), 1);
     assertEquals(taskCount.getCompleted(), 1);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index 8949dc1320..5fab6ff72f 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -130,7 +130,6 @@ public class SimpleMinionClusterIntegrationTest extends 
ClusterTest {
     HOLD.set(true);
     // No tasks before we start.
     
assertEquals(_helixTaskResourceManager.getTasksInProgress(TASK_TYPE).size(), 0);
-    verifyTaskCount("Task_" + TASK_TYPE + "_1624403781879", 0, 0, 0, 0);
 
     // Should create the task queues and generate a task
     String task1 = _taskManager.scheduleTasks().get(TASK_TYPE);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to