Copilot commented on code in PR #17128:
URL: https://github.com/apache/pinot/pull/17128#discussion_r2488867732
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java:
##########
@@ -90,20 +106,59 @@ protected final void runTask(Properties periodicTaskProperties) {
       TaskCount taskTypeAccumulatedCount = new TaskCount();
       Map<String, TaskCount> tableAccumulatedCount = new HashMap<>();
       try {
-        Set<String> tasksInProgress = _helixTaskResourceManager.getTasksInProgress(taskType);
-        final int numRunningTasks = tasksInProgress.size();
-        for (String task : tasksInProgress) {
-          Map<String, TaskCount> tableTaskCount = _helixTaskResourceManager.getTableTaskCount(task);
-          tableTaskCount.forEach((tableNameWithType, taskCount) -> {
-            taskTypeAccumulatedCount.accumulate(taskCount);
-            tableAccumulatedCount.compute(tableNameWithType, (name, count) -> {
-              if (count == null) {
-                count = new TaskCount();
-              }
-              count.accumulate(taskCount);
-              return count;
+        // Capture the current execution timestamp for this task type collection cycle
+        long currentExecutionTimestamp = System.currentTimeMillis();
+        // For task types encountered for the first time, use current time minus one collection cycle
+        // as the initial timestamp to ensure we don't miss tasks that started recently
+        long previousExecutionTimestamp = _previousExecutionTimestamps.computeIfAbsent(taskType,
+            k -> currentExecutionTimestamp - _taskMetricsEmitterFrequencyMs);
+
+        // Get currently in-progress tasks (for metrics)
+        Set<String> currentInProgressTasks = _helixTaskResourceManager.getTasksInProgress(taskType);
+
+        // Get tasks that were in-progress during the previous collection cycle
+        Set<String> previouslyInProgressTasks =
+            _previousInProgressTasks.getOrDefault(taskType, Collections.emptySet());
+
+        // Get all tasks including those that started after the previous execution timestamp
+        // This combines in-progress tasks and short-lived tasks that started and completed between cycles
+        // in a single Helix call, avoiding duplicate getWorkflowConfig/getWorkflowContext calls
+        Set<String> tasksIncludingShortLived = _helixTaskResourceManager.getTasksInProgressAndRecent(
+            taskType, previousExecutionTimestamp);
+
+        // Start with all tasks that need reporting (in-progress + short-lived)
+        Set<String> tasksToReport = new HashSet<>(tasksIncludingShortLived);
+
+        // Include tasks that were in-progress previously but are no longer in-progress
+        // These tasks completed between collection cycles and need their final metrics reported
+        for (String taskName : previouslyInProgressTasks) {
+          if (!tasksIncludingShortLived.contains(taskName)) {
+            LOGGER.debug("Including task {} that completed between collection cycles for taskType: {}",
+                taskName, taskType);
+            tasksToReport.add(taskName);
+          }
+        }
+
+        final int numRunningTasks = currentInProgressTasks.size();
+
+        // Process all tasks that need metrics reported
+        for (String task : tasksToReport) {
+          try {
+            Map<String, TaskCount> tableTaskCount = _helixTaskResourceManager.getTableTaskCount(task);
+            tableTaskCount.forEach((tableNameWithType, taskCount) -> {
+              taskTypeAccumulatedCount.accumulate(taskCount);
+              tableAccumulatedCount.compute(tableNameWithType, (name, count) -> {
+                if (count == null) {
+                  count = new TaskCount();
+                }
+                count.accumulate(taskCount);
+                return count;
+              });
             });
-          });
+          } catch (Exception e) {
+            LOGGER.warn("Failed to get task count for task: {} of type: {} (task may have been purged from DAG)",
+                task, taskType, e);
Review Comment:
The error message indicates the task 'may have been purged from DAG', but
this is speculative. The exception could occur for multiple reasons (network
issues, Helix unavailability, etc.). Consider either logging at debug level
with the current message, or using a more general warning message like 'Failed
to get task count for task: {} of type: {}' without speculating about the cause.
```suggestion
            LOGGER.warn("Failed to get task count for task: {} of type: {}",
                task, taskType, e);
```
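For completeness, one way to combine the two options above (a plain warning plus the speculative detail demoted to debug level) could look roughly like the sketch below. This is illustrative only, not code from the PR; it reuses the `LOGGER`, `task`, and `taskType` names from the diff above.
```java
          try {
            // ... accumulate per-table task counts as in the diff above ...
          } catch (Exception e) {
            // Plain warning without guessing at the root cause
            LOGGER.warn("Failed to get task count for task: {} of type: {}", task, taskType, e);
            // Keep the speculative explanation at debug level only, where it adds no production noise
            LOGGER.debug("Task {} of type {} may have been purged from the task DAG", task, taskType);
          }
```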
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -441,24 +441,63 @@ public synchronized Map<String, TaskCount> getTableTaskCount(String taskName) {
    * @return Set of task names
    */
   public synchronized Set<String> getTasksInProgress(String taskType) {
-    WorkflowConfig workflowConfig = _taskDriver.getWorkflowConfig(getHelixJobQueueName(taskType));
+    return getTasksInProgressAndRecent(taskType, 0);
+  }
+
+  /**
+   * Returns a set of Task names (in the form "Task_<taskType>_<uuid>_<timestamp>") that are in progress or not started
+   * yet, and optionally includes recent tasks that started after a given timestamp.
+   * NOTE: For tasks just submitted without the context created, count them as NOT_STARTED.
+   * This method combines in-progress tasks and recent tasks in a single Helix call to avoid duplicate calls.
+   *
+   * @param taskType Task type
+   * @param afterTimestampMs If > 0, also include tasks that started after this timestamp (in milliseconds).
+   *                         This is used to detect short-lived tasks that started and completed between cycles.
+   * @return Set of task names that are in-progress, and optionally recent tasks that started after the timestamp
+   */
+  public synchronized Set<String> getTasksInProgressAndRecent(String taskType, long afterTimestampMs) {
+    String helixJobQueueName = getHelixJobQueueName(taskType);
+    WorkflowConfig workflowConfig = _taskDriver.getWorkflowConfig(helixJobQueueName);
     if (workflowConfig == null) {
       return Collections.emptySet();
     }
     Set<String> helixJobs = workflowConfig.getJobDag().getAllNodes();
     if (helixJobs.isEmpty()) {
       return Collections.emptySet();
     }
-    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(getHelixJobQueueName(taskType));
+    WorkflowContext workflowContext = _taskDriver.getWorkflowContext(helixJobQueueName);
     if (workflowContext == null) {
-      return helixJobs.stream().map(PinotHelixTaskResourceManager::getPinotTaskName).collect(Collectors.toSet());
-    } else {
-      Map<String, TaskState> helixJobStates = workflowContext.getJobStates();
-      return helixJobs.stream().filter(helixJobName -> {
-        TaskState taskState = helixJobStates.get(helixJobName);
-        return taskState == null || taskState == TaskState.NOT_STARTED || taskState == TaskState.IN_PROGRESS;
-      }).map(PinotHelixTaskResourceManager::getPinotTaskName).collect(Collectors.toSet());
+      // If no context, return all jobs as in-progress (backward compatible behavior)
+      Set<String> result = helixJobs.stream()
+          .map(PinotHelixTaskResourceManager::getPinotTaskName)
+          .collect(Collectors.toSet());
+      // If timestamp is specified, we can't filter by start time without context, so return all
Review Comment:
This comment suggests that when a timestamp is specified but no context
exists, all tasks are returned without filtering. However, this behavior may
not align with the caller's expectations when requesting tasks after a specific
timestamp. Consider documenting this limitation in the method's Javadoc to make
it clear that timestamp filtering requires a valid WorkflowContext.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/TaskMetricsEmitter.java:
##########
@@ -90,20 +106,59 @@ protected final void runTask(Properties periodicTaskProperties) {
       TaskCount taskTypeAccumulatedCount = new TaskCount();
       Map<String, TaskCount> tableAccumulatedCount = new HashMap<>();
       try {
-        Set<String> tasksInProgress = _helixTaskResourceManager.getTasksInProgress(taskType);
-        final int numRunningTasks = tasksInProgress.size();
-        for (String task : tasksInProgress) {
-          Map<String, TaskCount> tableTaskCount = _helixTaskResourceManager.getTableTaskCount(task);
-          tableTaskCount.forEach((tableNameWithType, taskCount) -> {
-            taskTypeAccumulatedCount.accumulate(taskCount);
-            tableAccumulatedCount.compute(tableNameWithType, (name, count) -> {
-              if (count == null) {
-                count = new TaskCount();
-              }
-              count.accumulate(taskCount);
-              return count;
+        // Capture the current execution timestamp for this task type collection cycle
+        long currentExecutionTimestamp = System.currentTimeMillis();
+        // For task types encountered for the first time, use current time minus one collection cycle
+        // as the initial timestamp to ensure we don't miss tasks that started recently
+        long previousExecutionTimestamp = _previousExecutionTimestamps.computeIfAbsent(taskType,
+            k -> currentExecutionTimestamp - _taskMetricsEmitterFrequencyMs);
+
+        // Get currently in-progress tasks (for metrics)
+        Set<String> currentInProgressTasks = _helixTaskResourceManager.getTasksInProgress(taskType);
+
+        // Get tasks that were in-progress during the previous collection cycle
+        Set<String> previouslyInProgressTasks =
+            _previousInProgressTasks.getOrDefault(taskType, Collections.emptySet());
+
+        // Get all tasks including those that started after the previous execution timestamp
+        // This combines in-progress tasks and short-lived tasks that started and completed between cycles
+        // in a single Helix call, avoiding duplicate getWorkflowConfig/getWorkflowContext calls
+        Set<String> tasksIncludingShortLived = _helixTaskResourceManager.getTasksInProgressAndRecent(
+            taskType, previousExecutionTimestamp);
+
+        // Start with all tasks that need reporting (in-progress + short-lived)
+        Set<String> tasksToReport = new HashSet<>(tasksIncludingShortLived);
+
+        // Include tasks that were in-progress previously but are no longer in-progress
+        // These tasks completed between collection cycles and need their final metrics reported
+        for (String taskName : previouslyInProgressTasks) {
+          if (!tasksIncludingShortLived.contains(taskName)) {
+            LOGGER.debug("Including task {} that completed between collection cycles for taskType: {}",
Review Comment:
[nitpick] This debug log message may be helpful during development, but in
production it could generate significant log volume for active task types.
Consider whether this debug logging is necessary, or if it should be at trace
level. If kept, ensure it provides sufficient value for troubleshooting in
production environments.
```suggestion
            LOGGER.trace("Including task {} that completed between collection cycles for taskType: {}",
```
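If even trace-level per-task logging is a concern, another option is a single summary line per collection cycle. The sketch below is illustrative only (not code from the PR) and reuses the variable names from the diff above:
```java
        int completedBetweenCycles = 0;
        for (String taskName : previouslyInProgressTasks) {
          if (!tasksIncludingShortLived.contains(taskName)) {
            tasksToReport.add(taskName);
            completedBetweenCycles++;
          }
        }
        if (completedBetweenCycles > 0) {
          LOGGER.debug("Including {} tasks that completed between collection cycles for taskType: {}",
              completedBetweenCycles, taskType);
        }
```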
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]