abhishekbafna commented on code in PR #17128:
URL: https://github.com/apache/pinot/pull/17128#discussion_r2493370538


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -441,24 +441,63 @@ public synchronized Map<String, TaskCount> getTableTaskCount(String taskName) {
    * @return Set of task names
    */
   public synchronized Set<String> getTasksInProgress(String taskType) {
-    WorkflowConfig workflowConfig = _taskDriver.getWorkflowConfig(getHelixJobQueueName(taskType));
+    return getTasksInProgressAndRecent(taskType, 0);
+  }
+
+  /**
+   * Returns a set of Task names (in the form "Task_<taskType>_<uuid>_<timestamp>") that are in progress or not started
+   * yet, and optionally includes recent tasks that started after a given timestamp.
+   * NOTE: For tasks just submitted without the context created, count them as NOT_STARTED.
+   * This method combines in-progress tasks and recent tasks in a single Helix call to avoid duplicate calls.
+   *
+   * @param taskType Task type
+   * @param afterTimestampMs If > 0, also include tasks that started after this timestamp (in milliseconds).
+   *                         This is used to detect short-lived tasks that started and completed between cycles.
+   * @return Set of task names that are in-progress, and optionally recent tasks that started after the timestamp
+   */
+  public synchronized Set<String> getTasksInProgressAndRecent(String taskType, long afterTimestampMs) {
+    String helixJobQueueName = getHelixJobQueueName(taskType);
+    WorkflowConfig workflowConfig = _taskDriver.getWorkflowConfig(helixJobQueueName);
     if (workflowConfig == null) {
       return Collections.emptySet();
     }
     Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
     if (helixJobs.isEmpty()) {
       return Collections.emptySet();
     }
-    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(helixJobQueueName);
     if (workflowContext == null) {
-      return helixJobs.stream().map(PinotHelixTaskResourceManager::getPinotTaskName).collect(Collectors.toSet());
-    } else {
-      Map<String, TaskState> helixJobStates = workflowContext.getJobStates();
-      return helixJobs.stream().filter(helixJobName -> {
-        TaskState taskState = helixJobStates.get(helixJobName);
-        return taskState == null || taskState == TaskState.NOT_STARTED || taskState == TaskState.IN_PROGRESS;
-      }).map(PinotHelixTaskResourceManager::getPinotTaskName).collect(Collectors.toSet());
+      // If no context, return all jobs as in-progress (backward compatible behavior)
+      Set<String> result = helixJobs.stream()
+          .map(PinotHelixTaskResourceManager::getPinotTaskName)
+          .collect(Collectors.toSet());
+      // If timestamp is specified, we can't filter by start time without context, so return all
+      return result;
+    }
+
+    Set<String> result = new HashSet<>();
+    Map<String, TaskState> helixJobStates = workflowContext.getJobStates();
+    Map<String, Long> jobStartTimes = afterTimestampMs > 0 ? workflowContext.getJobStartTimes() : null;

Review Comment:
   That would not happen:
   1. `workflowContext.getJobStates()`, which is analogous to `workflowContext.getJobStartTimes()`, was never null-checked in the existing code either.
   2. Both methods return an empty map, not null, by default.
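To illustrate the point in the reply, here is a minimal, self-contained sketch of the filtering step. It does not use Helix: `TaskState` is a local stand-in for Helix's enum, and `filter` is a hypothetical helper. Its recent-task branch is an assumption, since the diff above stops before the filtering code. It assumes, as the reply states, that the maps are empty rather than null when nothing is recorded, so only the per-key lookups need a null check; `jobStartTimes` is read only when `afterTimestampMs > 0`, mirroring the diff, which passes `null` otherwise.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class InProgressFilterSketch {

  // Hypothetical stand-in for Helix's TaskState; only the values used here.
  enum TaskState { NOT_STARTED, IN_PROGRESS, COMPLETED, FAILED }

  /**
   * Keeps jobs that are in progress or not started (a missing state counts as NOT_STARTED),
   * plus, when afterTimestampMs > 0, jobs whose recorded start time is after that timestamp.
   * jobStates is assumed non-null (empty when nothing is recorded); jobStartTimes is only
   * read when afterTimestampMs > 0, so it may be null otherwise.
   */
  static Set<String> filter(Set<String> jobs, Map<String, TaskState> jobStates,
      Map<String, Long> jobStartTimes, long afterTimestampMs) {
    Set<String> result = new HashSet<>();
    for (String job : jobs) {
      TaskState state = jobStates.get(job); // null for jobs without a recorded state
      if (state == null || state == TaskState.NOT_STARTED || state == TaskState.IN_PROGRESS) {
        result.add(job);
        continue;
      }
      if (afterTimestampMs > 0) {
        Long startTime = jobStartTimes.get(job); // null if no start time was recorded
        if (startTime != null && startTime > afterTimestampMs) {
          result.add(job); // short-lived job that started after the given timestamp
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Set<String> jobs = Set.of("a", "b", "c");
    Map<String, TaskState> states = Map.of("b", TaskState.COMPLETED, "c", TaskState.COMPLETED);
    Map<String, Long> startTimes = Map.of("b", 2_000L, "c", 500L);
    // "a" has no state (treated as NOT_STARTED); "b" completed but started after 1000 ms.
    System.out.println(new TreeSet<>(filter(jobs, states, startTimes, 1_000L))); // [a, b]
    // With empty (not null) maps nothing throws, and every job counts as not started.
    System.out.println(new TreeSet<>(filter(jobs, Map.of(), Map.of(), 1_000L))); // [a, b, c]
  }
}
```

Because an empty map simply yields `null` from `get`, the existing `taskState == null` branch already covers the "no data" case without a separate null check on the map itself.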



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]