Copilot commented on code in PR #17128:
URL: https://github.com/apache/pinot/pull/17128#discussion_r2488867732


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java:
##########
@@ -90,20 +106,59 @@ protected final void runTask(Properties periodicTaskProperties) {
       TaskCount taskTypeAccumulatedCount = new TaskCount();
       Map<String, TaskCount> tableAccumulatedCount = new HashMap<>();
       try {
-        Set<String> tasksInProgress = _helixTaskResourceManager.getTasksInProgress(taskType);
-        final int numRunningTasks = tasksInProgress.size();
-        for (String task : tasksInProgress) {
-          Map<String, TaskCount> tableTaskCount = _helixTaskResourceManager.getTableTaskCount(task);
-          tableTaskCount.forEach((tableNameWithType, taskCount) -> {
-            taskTypeAccumulatedCount.accumulate(taskCount);
-            tableAccumulatedCount.compute(tableNameWithType, (name, count) -> {
-              if (count == null) {
-                count = new TaskCount();
-              }
-              count.accumulate(taskCount);
-              return count;
+        // Capture the current execution timestamp for this task type collection cycle
+        long currentExecutionTimestamp = System.currentTimeMillis();
+        // For task types encountered for the first time, use current time minus one collection cycle
+        // as the initial timestamp to ensure we don't miss tasks that started recently
+        long previousExecutionTimestamp = _previousExecutionTimestamps.computeIfAbsent(taskType,
+            k -> currentExecutionTimestamp - _taskMetricsEmitterFrequencyMs);
+
+        // Get currently in-progress tasks (for metrics)
+        Set<String> currentInProgressTasks = _helixTaskResourceManager.getTasksInProgress(taskType);
+
+        // Get tasks that were in-progress during the previous collection cycle
+        Set<String> previouslyInProgressTasks =
+            _previousInProgressTasks.getOrDefault(taskType, Collections.emptySet());
+
+        // Get all tasks including those that started after the previous execution timestamp
+        // This combines in-progress tasks and short-lived tasks that started and completed between cycles
+        // in a single Helix call, avoiding duplicate getWorkflowConfig/getWorkflowContext calls
+        Set<String> tasksIncludingShortLived = _helixTaskResourceManager.getTasksInProgressAndRecent(
+            taskType, previousExecutionTimestamp);
+
+        // Start with all tasks that need reporting (in-progress + short-lived)
+        Set<String> tasksToReport = new HashSet<>(tasksIncludingShortLived);
+
+        // Include tasks that were in-progress previously but are no longer in-progress
+        // These tasks completed between collection cycles and need their final metrics reported
+        for (String taskName : previouslyInProgressTasks) {
+          if (!tasksIncludingShortLived.contains(taskName)) {
+            LOGGER.debug("Including task {} that completed between collection cycles for taskType: {}",
+                taskName, taskType);
+            tasksToReport.add(taskName);
+          }
+        }
+
+        final int numRunningTasks = currentInProgressTasks.size();
+
+        // Process all tasks that need metrics reported
+        for (String task : tasksToReport) {
+          try {
+            Map<String, TaskCount> tableTaskCount = _helixTaskResourceManager.getTableTaskCount(task);
+            tableTaskCount.forEach((tableNameWithType, taskCount) -> {
+              taskTypeAccumulatedCount.accumulate(taskCount);
+              tableAccumulatedCount.compute(tableNameWithType, (name, count) -> {
+                if (count == null) {
+                  count = new TaskCount();
+                }
+                count.accumulate(taskCount);
+                return count;
+              });
             });
-          });
+          } catch (Exception e) {
+            LOGGER.warn("Failed to get task count for task: {} of type: {} (task may have been purged from DAG)",
+                task, taskType, e);

Review Comment:
   The log message says the task 'may have been purged from DAG', but that is
speculation: the exception could have other causes (network issues, Helix
unavailability, etc.). Consider either logging at debug level with the current
message, or keeping the warning with a more general message such as 'Failed to
get task count for task: {} of type: {}' that does not guess at the cause.
   ```suggestion
               LOGGER.warn("Failed to get task count for task: {} of type: {}", task, taskType, e);
   ```
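
As a rough illustration of the second option, the sketch below keeps the message generic and lets the attached exception describe what actually went wrong. It is a simplified stand-in, not the Pinot code: `TaskCountFailureSketch`, `sumCounts`, the `lookup` function, and the `warnings` list (used here in place of a logger) are all hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

public class TaskCountFailureSketch {
  // Hypothetical helper: sums per-task counts and records a generic warning for each failed lookup.
  public static int sumCounts(List<String> tasks, ToIntFunction<String> lookup, List<String> warnings) {
    int total = 0;
    for (String task : tasks) {
      try {
        total += lookup.applyAsInt(task);
      } catch (RuntimeException e) {
        // The message states only what failed; the exception text carries the real cause.
        warnings.add("Failed to get task count for task: " + task + " (" + e + ")");
      }
    }
    return total;
  }

  public static void main(String[] args) {
    List<String> warnings = new ArrayList<>();
    // Assumed failure mode for the demo: the lookup for "t2" throws.
    int total = sumCounts(List.of("t1", "t2", "t3"), task -> {
      if (task.equals("t2")) {
        throw new IllegalStateException("connection lost");
      }
      return 5;
    }, warnings);
    System.out.println(total);
    System.out.println(warnings);
  }
}
```

One failed lookup does not stop the remaining tasks from being counted, and the warning does not commit to a cause that the caller cannot verify.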



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -441,24 +441,63 @@ public synchronized Map<String, TaskCount> getTableTaskCount(String taskName) {
    * @return Set of task names
    */
   public synchronized Set<String> getTasksInProgress(String taskType) {
-    WorkflowConfig workflowConfig = _taskDriver.getWorkflowConfig(getHelixJobQueueName(taskType));
+    return getTasksInProgressAndRecent(taskType, 0);
+  }
+
+  /**
+   * Returns a set of Task names (in the form "Task_<taskType>_<uuid>_<timestamp>") that are in progress or not started
+   * yet, and optionally includes recent tasks that started after a given timestamp.
+   * NOTE: For tasks just submitted without the context created, count them as NOT_STARTED.
+   * This method combines in-progress tasks and recent tasks in a single Helix call to avoid duplicate calls.
+   *
+   * @param taskType Task type
+   * @param afterTimestampMs If > 0, also include tasks that started after this timestamp (in milliseconds).
+   *                         This is used to detect short-lived tasks that started and completed between cycles.
+   * @return Set of task names that are in-progress, and optionally recent tasks that started after the timestamp
+   */
+  public synchronized Set<String> getTasksInProgressAndRecent(String taskType, long afterTimestampMs) {
+    String helixJobQueueName = getHelixJobQueueName(taskType);
+    WorkflowConfig workflowConfig = _taskDriver.getWorkflowConfig(helixJobQueueName);
     if (workflowConfig == null) {
       return Collections.emptySet();
     }
     Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
     if (helixJobs.isEmpty()) {
       return Collections.emptySet();
     }
-    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(helixJobQueueName);
     if (workflowContext == null) {
-      return helixJobs.stream().map(PinotHelixTaskResourceManager::getPinotTaskName).collect(Collectors.toSet());
-    } else {
-      Map<String, TaskState> helixJobStates = workflowContext.getJobStates();
-      return helixJobs.stream().filter(helixJobName -> {
-        TaskState taskState = helixJobStates.get(helixJobName);
-        return taskState == null || taskState == TaskState.NOT_STARTED || taskState == TaskState.IN_PROGRESS;
-      }).map(PinotHelixTaskResourceManager::getPinotTaskName).collect(Collectors.toSet());
+      // If no context, return all jobs as in-progress (backward compatible behavior)
+      Set<String> result = helixJobs.stream()
+          .map(PinotHelixTaskResourceManager::getPinotTaskName)
+          .collect(Collectors.toSet());
+      // If timestamp is specified, we can't filter by start time without context, so return all

Review Comment:
   This comment suggests that when a timestamp is specified but no context 
exists, all tasks are returned without filtering. However, this behavior may 
not align with the caller's expectations when requesting tasks after a specific 
timestamp. Consider documenting this limitation in the method's Javadoc to make 
it clear that timestamp filtering requires a valid WorkflowContext.
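
To make the limitation concrete, here is a minimal, self-contained sketch of the fallback. It does not call Helix: `NoContextFallbackSketch`, `selectRecent`, and the `startTimesMs` map are hypothetical, a `null` map stands in for a missing WorkflowContext, and the job-state check from the real method is left out on purpose.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class NoContextFallbackSketch {
  // Hypothetical stand-in for the method under review; not the Helix or Pinot API.
  // startTimesMs == null models a missing WorkflowContext.
  public static Set<String> selectRecent(Set<String> jobs, Map<String, Long> startTimesMs,
      long afterTimestampMs) {
    if (startTimesMs == null) {
      // No context: start times are unknown, so the timestamp cannot be applied.
      return new TreeSet<>(jobs);
    }
    Set<String> result = new TreeSet<>();
    for (String job : jobs) {
      Long start = startTimesMs.get(job);
      // Assumption for this sketch: jobs without a recorded start time are kept.
      if (start == null || start > afterTimestampMs) {
        result.add(job);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Set<String> jobs = Set.of("job_a", "job_b");
    Map<String, Long> starts = Map.of("job_a", 100L, "job_b", 900L);
    System.out.println(selectRecent(jobs, starts, 500L)); // only job_b started after 500
    System.out.println(selectRecent(jobs, null, 500L));   // both jobs returned, timestamp ignored
  }
}
```

A caller passing the same timestamp gets a filtered set in one case and the full set in the other, which is the behavior the Javadoc would need to spell out.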



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java:
##########
@@ -90,20 +106,59 @@ protected final void runTask(Properties periodicTaskProperties) {
       TaskCount taskTypeAccumulatedCount = new TaskCount();
       Map<String, TaskCount> tableAccumulatedCount = new HashMap<>();
       try {
-        Set<String> tasksInProgress = _helixTaskResourceManager.getTasksInProgress(taskType);
-        final int numRunningTasks = tasksInProgress.size();
-        for (String task : tasksInProgress) {
-          Map<String, TaskCount> tableTaskCount = _helixTaskResourceManager.getTableTaskCount(task);
-          tableTaskCount.forEach((tableNameWithType, taskCount) -> {
-            taskTypeAccumulatedCount.accumulate(taskCount);
-            tableAccumulatedCount.compute(tableNameWithType, (name, count) -> {
-              if (count == null) {
-                count = new TaskCount();
-              }
-              count.accumulate(taskCount);
-              return count;
+        // Capture the current execution timestamp for this task type collection cycle
+        long currentExecutionTimestamp = System.currentTimeMillis();
+        // For task types encountered for the first time, use current time minus one collection cycle
+        // as the initial timestamp to ensure we don't miss tasks that started recently
+        long previousExecutionTimestamp = _previousExecutionTimestamps.computeIfAbsent(taskType,
+            k -> currentExecutionTimestamp - _taskMetricsEmitterFrequencyMs);
+
+        // Get currently in-progress tasks (for metrics)
+        Set<String> currentInProgressTasks = _helixTaskResourceManager.getTasksInProgress(taskType);
+
+        // Get tasks that were in-progress during the previous collection cycle
+        Set<String> previouslyInProgressTasks =
+            _previousInProgressTasks.getOrDefault(taskType, Collections.emptySet());
+
+        // Get all tasks including those that started after the previous execution timestamp
+        // This combines in-progress tasks and short-lived tasks that started and completed between cycles
+        // in a single Helix call, avoiding duplicate getWorkflowConfig/getWorkflowContext calls
+        Set<String> tasksIncludingShortLived = _helixTaskResourceManager.getTasksInProgressAndRecent(
+            taskType, previousExecutionTimestamp);
+
+        // Start with all tasks that need reporting (in-progress + short-lived)
+        Set<String> tasksToReport = new HashSet<>(tasksIncludingShortLived);
+
+        // Include tasks that were in-progress previously but are no longer in-progress
+        // These tasks completed between collection cycles and need their final metrics reported
+        for (String taskName : previouslyInProgressTasks) {
+          if (!tasksIncludingShortLived.contains(taskName)) {
+            LOGGER.debug("Including task {} that completed between collection cycles for taskType: {}",

Review Comment:
   [nitpick] This debug message is logged once per completed task, so it may
help during development but could generate significant log volume for active
task types in production. Consider whether it is needed at all, or whether it
should be at trace level. If kept, make sure it carries enough value for
troubleshooting in production environments.
   ```suggestion
               LOGGER.trace("Including task {} that completed between collection cycles for taskType: {}",
   ```
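
Another way to limit volume, offered only as a sketch and not as part of the suggestion above, is to emit one summary line per collection cycle instead of one line per task. The class and method names below (`CompletedTaskSummarySketch`, `completedBetweenCycles`, `summaryLine`) are hypothetical, and the message is returned as a string instead of being passed to a logger so that the example stays self-contained.

```java
import java.util.Set;
import java.util.TreeSet;

public class CompletedTaskSummarySketch {
  // Tasks that were in progress last cycle but are absent from the current set.
  public static Set<String> completedBetweenCycles(Set<String> previouslyInProgress, Set<String> current) {
    Set<String> completed = new TreeSet<>(previouslyInProgress);
    completed.removeAll(current);
    return completed;
  }

  // Builds one summary line per cycle; returns null when there is nothing to report.
  public static String summaryLine(String taskType, Set<String> completed) {
    if (completed.isEmpty()) {
      return null;
    }
    return "Including " + completed.size()
        + " task(s) that completed between collection cycles for taskType: " + taskType;
  }

  public static void main(String[] args) {
    Set<String> previous = Set.of("Task_A_1", "Task_A_2", "Task_A_3");
    Set<String> current = Set.of("Task_A_3", "Task_A_4");
    Set<String> completed = completedBetweenCycles(previous, current);
    // Task_A_1 and Task_A_2 are gone from the current set, so the summary counts two tasks.
    System.out.println(summaryLine("A", completed));
  }
}
```

The trade-off is that individual task names no longer appear in the log unless they are added to the summary or logged separately at a lower level.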



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

