Copilot commented on code in PR #17330:
URL: https://github.com/apache/pinot/pull/17330#discussion_r2599964056
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -958,6 +959,141 @@ private boolean hasTasksForTable(String taskName, String
tableNameWithType) {
}
}
+ /**
+ * Get the server tenant name for a given table by looking up its
configuration.
+ * Returns "unknown" if the table or tenant cannot be determined.
+ *
+ * @param tableName Table name with type (e.g., "myTable_OFFLINE")
+ * @return Server tenant name or "unknown"
+ */
+ private String getTenantForTable(String tableName) {
+ if (tableName == null || UNKNOWN_TABLE_NAME.equals(tableName)) {
+ return UNKNOWN_TABLE_NAME;
+ }
+
+ try {
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableName);
+ if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+ String serverTenant = tableConfig.getTenantConfig().getServer();
+ return serverTenant != null ? serverTenant : UNKNOWN_TABLE_NAME;
+ }
+ return UNKNOWN_TABLE_NAME;
+ } catch (Exception e) {
+ LOGGER.warn("Failed to determine tenant for table: {}", tableName, e);
+ return UNKNOWN_TABLE_NAME;
+ }
+ }
+
+ /**
+ * Get a summary of all tasks across all task types, grouped by tenant.
+ *
+ * <p>Only includes tasks with RUNNING or WAITING subtasks. Completed,
failed, or aborted tasks are excluded.
+ * Tasks are first resolved to their table, then grouped by the table's
server tenant.
+ *
+ * @param tenantFilter Optional tenant name to filter results. If null,
returns all tenants.
+ * @return TaskSummaryResponse containing aggregated task counts grouped by
tenant
+ */
+ public synchronized TaskSummaryResponse getTasksSummary(@Nullable String
tenantFilter) {
+ TaskSummaryResponse response = new TaskSummaryResponse();
+ Set<String> taskTypes = getTaskTypes();
+
+ if (taskTypes == null || taskTypes.isEmpty()) {
+ return response;
+ }
+
+ // Map: tenant -> taskType -> aggregated TaskCount
+ Map<String, Map<String, TaskCount>> tenantToTaskTypeCounts = new
TreeMap<>();
+ int totalRunning = 0;
+ int totalWaiting = 0;
+
+ for (String taskType : taskTypes) {
+ Map<String, TaskCount> taskCounts = getTaskCounts(taskType);
+ if (taskCounts == null || taskCounts.isEmpty()) {
+ continue;
+ }
+
+ // For each parent task, only fetch table breakdown if it has active
tasks
+ for (Map.Entry<String, TaskCount> entry : taskCounts.entrySet()) {
+ String taskName = entry.getKey();
+ TaskCount totalTaskCount = entry.getValue();
+
+ // Skip if this parent task has no running/waiting tasks
(optimization: avoid table breakdown call)
+ if (totalTaskCount.getRunning() == 0 && totalTaskCount.getWaiting() ==
0) {
+ continue;
+ }
Review Comment:
The comment on line 1020 is excellent as it explains both the condition and
the performance rationale. However, the similar optimization at lines 1074-1079
lacks an explanatory comment about why only non-zero counts are included.
Adding a comment there would improve consistency and code clarity.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java:
##########
@@ -958,6 +959,141 @@ private boolean hasTasksForTable(String taskName, String
tableNameWithType) {
}
}
+ /**
+ * Get the server tenant name for a given table by looking up its
configuration.
+ * Returns "unknown" if the table or tenant cannot be determined.
+ *
+ * @param tableName Table name with type (e.g., "myTable_OFFLINE")
+ * @return Server tenant name or "unknown"
+ */
+ private String getTenantForTable(String tableName) {
+ if (tableName == null || UNKNOWN_TABLE_NAME.equals(tableName)) {
+ return UNKNOWN_TABLE_NAME;
+ }
+
+ try {
+ TableConfig tableConfig =
_helixResourceManager.getTableConfig(tableName);
+ if (tableConfig != null && tableConfig.getTenantConfig() != null) {
+ String serverTenant = tableConfig.getTenantConfig().getServer();
+ return serverTenant != null ? serverTenant : UNKNOWN_TABLE_NAME;
+ }
+ return UNKNOWN_TABLE_NAME;
+ } catch (Exception e) {
+ LOGGER.warn("Failed to determine tenant for table: {}", tableName, e);
+ return UNKNOWN_TABLE_NAME;
+ }
+ }
+
+ /**
+ * Get a summary of all tasks across all task types, grouped by tenant.
+ *
+ * <p>Only includes tasks with RUNNING or WAITING subtasks. Completed,
failed, or aborted tasks are excluded.
+ * Tasks are first resolved to their table, then grouped by the table's
server tenant.
+ *
+ * @param tenantFilter Optional tenant name to filter results. If null,
returns all tenants.
+ * @return TaskSummaryResponse containing aggregated task counts grouped by
tenant
+ */
+ public synchronized TaskSummaryResponse getTasksSummary(@Nullable String
tenantFilter) {
+ TaskSummaryResponse response = new TaskSummaryResponse();
+ Set<String> taskTypes = getTaskTypes();
+
+ if (taskTypes == null || taskTypes.isEmpty()) {
+ return response;
+ }
+
+ // Map: tenant -> taskType -> aggregated TaskCount
+ Map<String, Map<String, TaskCount>> tenantToTaskTypeCounts = new
TreeMap<>();
+ int totalRunning = 0;
+ int totalWaiting = 0;
+
+ for (String taskType : taskTypes) {
+ Map<String, TaskCount> taskCounts = getTaskCounts(taskType);
+ if (taskCounts == null || taskCounts.isEmpty()) {
+ continue;
+ }
+
+ // For each parent task, only fetch table breakdown if it has active
tasks
+ for (Map.Entry<String, TaskCount> entry : taskCounts.entrySet()) {
+ String taskName = entry.getKey();
+ TaskCount totalTaskCount = entry.getValue();
+
+ // Skip if this parent task has no running/waiting tasks
(optimization: avoid table breakdown call)
+ if (totalTaskCount.getRunning() == 0 && totalTaskCount.getWaiting() ==
0) {
+ continue;
+ }
+
+ // Get the table name from the first subtask
+ // Note: In practice, all subtasks in a parent task belong to the same
table
+ List<PinotTaskConfig> subtaskConfigs = getSubtaskConfigs(taskName);
+ if (subtaskConfigs.isEmpty()) {
+ continue;
+ }
+
+ Map<String, String> configs = subtaskConfigs.get(0).getConfigs();
Review Comment:
The code retrieves configs from the first subtask without checking if the
list is empty. Line 1028 checks `subtaskConfigs.isEmpty()` and continues, but
`get(0)` at line 1032 could still throw `IndexOutOfBoundsException` if the list
becomes empty between the check and access due to concurrent modification.
Although the method is `synchronized`, it's safer to use a defensive pattern
like storing the first config in a variable after the isEmpty check to ensure
clarity and prevent potential issues.
```suggestion
PinotTaskConfig firstSubtaskConfig = subtaskConfigs.get(0);
Map<String, String> configs = firstSubtaskConfig.getConfigs();
```
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManagerTest.java:
##########
@@ -1273,4 +1277,687 @@ private void mockTaskJobConfigAndContext(TaskDriver
taskDriver, String taskName,
when(jobContext.getTaskIdPartitionMap()).thenReturn(taskIdPartitionMap);
when(jobContext.getPartitionState(0)).thenReturn(state);
}
+
+ @Test
+ public void testGetTasksSummaryWithEmptyTaskTypes() {
+ TaskDriver taskDriver = mock(TaskDriver.class);
+ PinotHelixResourceManager helixResourceManager =
mock(PinotHelixResourceManager.class);
+ PinotHelixTaskResourceManager mgr = new
PinotHelixTaskResourceManager(helixResourceManager, taskDriver);
+
+ PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
+ when(spyMgr.getTaskTypes()).thenReturn(Collections.emptySet());
+
+ PinotHelixTaskResourceManager.TaskSummaryResponse response =
spyMgr.getTasksSummary(null);
+
+ assertNotNull(response);
+ assertEquals(response.getTotalRunningTasks(), 0);
+ assertEquals(response.getTotalWaitingTasks(), 0);
+ assertTrue(response.getTaskBreakdown().isEmpty());
+ }
+
+ @Test
+ public void testGetTasksSummaryWithNullTaskTypes() {
+ TaskDriver taskDriver = mock(TaskDriver.class);
+ PinotHelixResourceManager helixResourceManager =
mock(PinotHelixResourceManager.class);
+ PinotHelixTaskResourceManager mgr = new
PinotHelixTaskResourceManager(helixResourceManager, taskDriver);
+
+ PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
+ when(spyMgr.getTaskTypes()).thenReturn(null);
+
+ PinotHelixTaskResourceManager.TaskSummaryResponse response =
spyMgr.getTasksSummary(null);
+
+ assertNotNull(response);
+ assertEquals(response.getTotalRunningTasks(), 0);
+ assertEquals(response.getTotalWaitingTasks(), 0);
+ assertTrue(response.getTaskBreakdown().isEmpty());
+ }
+
+ @Test
+ public void testGetTasksSummaryWithNoActiveTasks() {
+ TaskDriver taskDriver = mock(TaskDriver.class);
+ PinotHelixResourceManager helixResourceManager =
mock(PinotHelixResourceManager.class);
+ PinotHelixTaskResourceManager mgr = new
PinotHelixTaskResourceManager(helixResourceManager, taskDriver);
+
+ String taskType = "TestTask";
+ String taskName = "Task_TestTask_12345";
+
+ PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
+ when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType));
+
+ // Task with no running/waiting tasks (all completed)
+ PinotHelixTaskResourceManager.TaskCount taskCount = new
PinotHelixTaskResourceManager.TaskCount();
+ taskCount.addTaskState(TaskPartitionState.COMPLETED);
+ taskCount.addTaskState(TaskPartitionState.COMPLETED);
+ Map<String, PinotHelixTaskResourceManager.TaskCount> taskCounts = new
HashMap<>();
+ taskCounts.put(taskName, taskCount);
+ when(spyMgr.getTaskCounts(taskType)).thenReturn(taskCounts);
+
+ PinotHelixTaskResourceManager.TaskSummaryResponse response =
spyMgr.getTasksSummary(null);
+
+ assertNotNull(response);
+ assertEquals(response.getTotalRunningTasks(), 0);
+ assertEquals(response.getTotalWaitingTasks(), 0);
+ assertTrue(response.getTaskBreakdown().isEmpty());
+ }
+
+ @Test
+ public void testGetTasksSummaryWithSingleTenant() {
+ TaskDriver taskDriver = mock(TaskDriver.class);
+ PinotHelixResourceManager helixResourceManager =
mock(PinotHelixResourceManager.class);
+ PinotHelixTaskResourceManager mgr = new
PinotHelixTaskResourceManager(helixResourceManager, taskDriver);
+
+ String taskType = "TestTask";
+ String taskName = "Task_TestTask_12345";
+ String tableName = "testTable_OFFLINE";
+ String tenant = "defaultTenant";
+
+ PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
+ when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType));
+
+ // Task with running and waiting tasks
+ PinotHelixTaskResourceManager.TaskCount taskCount = new
PinotHelixTaskResourceManager.TaskCount();
+ taskCount.addTaskState(TaskPartitionState.RUNNING);
+ taskCount.addTaskState(TaskPartitionState.RUNNING);
+ taskCount.addTaskState(null);
+ Map<String, PinotHelixTaskResourceManager.TaskCount> taskCounts = new
HashMap<>();
+ taskCounts.put(taskName, taskCount);
+ when(spyMgr.getTaskCounts(taskType)).thenReturn(taskCounts);
+
+ // Mock subtask configs with table name
+ Map<String, String> configs = new HashMap<>();
+ configs.put(MinionConstants.TABLE_NAME_KEY, tableName);
+ PinotTaskConfig subtaskConfig = new PinotTaskConfig(taskType, configs);
+
when(spyMgr.getSubtaskConfigs(taskName)).thenReturn(Collections.singletonList(subtaskConfig));
+
+ // Mock tenant lookup
+ TableConfig tableConfig = mock(TableConfig.class);
+ TenantConfig tenantConfig = mock(TenantConfig.class);
+ when(tableConfig.getTenantConfig()).thenReturn(tenantConfig);
+ when(tenantConfig.getServer()).thenReturn(tenant);
+
when(helixResourceManager.getTableConfig(tableName)).thenReturn(tableConfig);
+
+ PinotHelixTaskResourceManager.TaskSummaryResponse response =
spyMgr.getTasksSummary(null);
+
+ assertNotNull(response);
+ assertEquals(response.getTotalRunningTasks(), 2);
+ assertEquals(response.getTotalWaitingTasks(), 1);
+ assertEquals(response.getTaskBreakdown().size(), 1);
+
+ PinotHelixTaskResourceManager.TenantTaskBreakdown tenantBreakdown =
response.getTaskBreakdown().get(0);
+ assertEquals(tenantBreakdown.getTenant(), tenant);
+ assertEquals(tenantBreakdown.getRunningTasks(), 2);
+ assertEquals(tenantBreakdown.getWaitingTasks(), 1);
+ assertEquals(tenantBreakdown.getTaskTypeBreakdown().size(), 1);
+
+ PinotHelixTaskResourceManager.TaskTypeBreakdown taskTypeBreakdown =
tenantBreakdown.getTaskTypeBreakdown().get(0);
+ assertEquals(taskTypeBreakdown.getTaskType(), taskType);
+ assertEquals(taskTypeBreakdown.getRunningCount(), 2);
+ assertEquals(taskTypeBreakdown.getWaitingCount(), 1);
+ }
+
+ @Test
+ public void testGetTasksSummaryWithMultipleTenants() {
+ TaskDriver taskDriver = mock(TaskDriver.class);
+ PinotHelixResourceManager helixResourceManager =
mock(PinotHelixResourceManager.class);
+ PinotHelixTaskResourceManager mgr = new
PinotHelixTaskResourceManager(helixResourceManager, taskDriver);
+
+ String taskType = "TestTask";
+ String taskName1 = "Task_TestTask_12345";
+ String taskName2 = "Task_TestTask_67890";
+ String tableName1 = "table1_OFFLINE";
+ String tableName2 = "table2_OFFLINE";
+ String tenant1 = "tenant1";
+ String tenant2 = "tenant2";
+
+ PinotHelixTaskResourceManager spyMgr = Mockito.spy(mgr);
+ when(spyMgr.getTaskTypes()).thenReturn(Collections.singleton(taskType));
+
+ // Task 1: tenant1 with running tasks
+ PinotHelixTaskResourceManager.TaskCount taskCount1 = new
PinotHelixTaskResourceManager.TaskCount();
+ taskCount1.addTaskState(TaskPartitionState.RUNNING);
+ taskCount1.addTaskState(TaskPartitionState.RUNNING);
+
+ // Task 2: tenant2 with waiting tasks (null state means waiting/not yet
assigned)
+ PinotHelixTaskResourceManager.TaskCount taskCount2 = new
PinotHelixTaskResourceManager.TaskCount();
+ taskCount2.addTaskState(null); // null state counts as waiting
+ taskCount2.addTaskState(null); // null state counts as waiting
Review Comment:
[nitpick] These inline comments explaining null state behavior are helpful
and should be consistently applied. Similar null state additions at lines 1361,
1422-1423, and 1630 have comments, but line 1361's comment style differs.
Consider standardizing the comment format across all null state additions for
consistency.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]