This is an automated email from the ASF dual-hosted git repository.

jiaguo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cb3c54630 RefreshSegmentTask - Support for scheduling adhoc tasks (#14596)
3cb3c54630 is described below

commit 3cb3c54630de81682fc0497147b6a6758e688c42
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Fri Dec 6 05:29:27 2024 +0530

    RefreshSegmentTask - Support for scheduling adhoc tasks (#14596)
---
 .../RefreshSegmentTaskGenerator.java               | 137 +++++++++++----------
 1 file changed, 75 insertions(+), 62 deletions(-)

diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
index 59e85c1b1e..cc4f4d5781 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
@@ -54,84 +54,97 @@ public class RefreshSegmentTaskGenerator extends BaseTaskGenerator {
 
   @Override
   public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
-    String taskType = RefreshSegmentTask.TASK_TYPE;
     List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
-    PinotHelixResourceManager pinotHelixResourceManager = 
_clusterInfoAccessor.getPinotHelixResourceManager();
-
-    int tableNumTasks = 0;
-
     for (TableConfig tableConfig : tableConfigs) {
-      String tableNameWithType = tableConfig.getTableName();
-      LOGGER.info("Start generating RefreshSegment tasks for table: {}", 
tableNameWithType);
-
       // Get the task configs for the table. This is used to restrict the 
maximum number of allowed tasks per table at
       // any given point.
       Map<String, String> taskConfigs;
       TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
       if (tableTaskConfig == null) {
-        LOGGER.warn("Failed to find task config for table: {}", 
tableNameWithType);
+        LOGGER.warn("Failed to find task config for table: {}", 
tableConfig.getTableName());
         continue;
       }
       taskConfigs = 
tableTaskConfig.getConfigsForTaskType(RefreshSegmentTask.TASK_TYPE);
-      Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null 
for Table: %s", tableNameWithType);
-      int tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
-      String tableMaxNumTasksConfig = 
taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
-      if (tableMaxNumTasksConfig != null) {
-        try {
-          tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
-        } catch (Exception e) {
-          tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
-          LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and 
task {}", tableNameWithType, taskType);
-        }
+      pinotTaskConfigs.addAll(generateTasksForTable(tableConfig, taskConfigs));
+    }
+
+    return pinotTaskConfigs;
+  }
+
+  @Override
+  public List<PinotTaskConfig> generateTasks(TableConfig tableConfig, 
Map<String, String> taskConfigs)
+      throws Exception {
+    return generateTasksForTable(tableConfig, taskConfigs);
+  }
+
+  private List<PinotTaskConfig> generateTasksForTable(TableConfig tableConfig, 
Map<String, String> taskConfigs) {
+    String tableNameWithType = tableConfig.getTableName();
+    Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for 
Table: %s", tableNameWithType);
+
+
+    String taskType = RefreshSegmentTask.TASK_TYPE;
+    List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+    PinotHelixResourceManager pinotHelixResourceManager = 
_clusterInfoAccessor.getPinotHelixResourceManager();
+
+    LOGGER.info("Start generating RefreshSegment tasks for table: {}", 
tableNameWithType);
+
+    int tableNumTasks = 0;
+    int tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
+    String tableMaxNumTasksConfig = 
taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
+    if (tableMaxNumTasksConfig != null) {
+      try {
+        tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
+      } catch (Exception e) {
+        tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
+        LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and 
task {}", tableNameWithType, taskType);
+      }
+    }
+
+    // Get info about table and schema.
+    Stat tableStat = pinotHelixResourceManager.getTableStat(tableNameWithType);
+    Schema schema = 
pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+    Stat schemaStat = 
pinotHelixResourceManager.getSchemaStat(schema.getSchemaName());
+
+    // Get the running segments for a table.
+    Set<Segment> runningSegments =
+        TaskGeneratorUtils.getRunningSegments(RefreshSegmentTask.TASK_TYPE, 
_clusterInfoAccessor);
+
+    // Make a single ZK call to get the segments.
+    List<SegmentZKMetadata> allSegments = 
_clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+
+    for (SegmentZKMetadata segmentZKMetadata : allSegments) {
+      // Skip if we have reached the maximum number of permissible tasks per 
iteration.
+      if (tableNumTasks >= tableMaxNumTasks) {
+        break;
       }
 
-      // Get info about table and schema.
-      Stat tableStat = 
pinotHelixResourceManager.getTableStat(tableNameWithType);
-      Schema schema = 
pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
-      Stat schemaStat = 
pinotHelixResourceManager.getSchemaStat(schema.getSchemaName());
-
-      // Get the running segments for a table.
-      Set<Segment> runningSegments =
-          TaskGeneratorUtils.getRunningSegments(RefreshSegmentTask.TASK_TYPE, 
_clusterInfoAccessor);
-
-      // Make a single ZK call to get the segments.
-      List<SegmentZKMetadata> allSegments = 
_clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
-
-      for (SegmentZKMetadata segmentZKMetadata : allSegments) {
-        // Skip if we have reached the maximum number of permissible tasks per 
iteration.
-        if (tableNumTasks >= tableMaxNumTasks) {
-          break;
-        }
-
-        // Skip consuming segments.
-        if (tableConfig.getTableType() == TableType.REALTIME && 
!segmentZKMetadata.getStatus().isCompleted()) {
-          continue;
-        }
-
-        // Skip segments for which a task is already running.
-        if (runningSegments.contains(new Segment(tableNameWithType, 
segmentZKMetadata.getSegmentName()))) {
-          continue;
-        }
-
-        String segmentName = segmentZKMetadata.getSegmentName();
-
-        // Skip if the segment is already up-to-date and doesn't have to be 
refreshed.
-        if (!shouldRefreshSegment(segmentZKMetadata, tableConfig, tableStat, 
schemaStat)) {
-          continue;
-        }
-
-        Map<String, String> configs = new 
HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName)));
-        configs.put(MinionConstants.DOWNLOAD_URL_KEY, 
segmentZKMetadata.getDownloadUrl());
-        configs.put(MinionConstants.UPLOAD_URL_KEY, 
_clusterInfoAccessor.getVipUrl() + "/segments");
-        configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, 
String.valueOf(segmentZKMetadata.getCrc()));
-        pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
-        tableNumTasks++;
+      // Skip consuming segments.
+      if (tableConfig.getTableType() == TableType.REALTIME && 
!segmentZKMetadata.getStatus().isCompleted()) {
+        continue;
+      }
+
+      // Skip segments for which a task is already running.
+      if (runningSegments.contains(new Segment(tableNameWithType, 
segmentZKMetadata.getSegmentName()))) {
+        continue;
+      }
+
+      String segmentName = segmentZKMetadata.getSegmentName();
+
+      // Skip if the segment is already up-to-date and doesn't have to be 
refreshed.
+      if (!shouldRefreshSegment(segmentZKMetadata, tableConfig, tableStat, 
schemaStat)) {
+        continue;
       }
 
-      LOGGER.info("Finished generating {} tasks configs for table: {} " + "for 
task: {}", tableNumTasks,
-          tableNameWithType, taskType);
+      Map<String, String> configs = new 
HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName)));
+      configs.put(MinionConstants.DOWNLOAD_URL_KEY, 
segmentZKMetadata.getDownloadUrl());
+      configs.put(MinionConstants.UPLOAD_URL_KEY, 
_clusterInfoAccessor.getVipUrl() + "/segments");
+      configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, 
String.valueOf(segmentZKMetadata.getCrc()));
+      pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
+      tableNumTasks++;
     }
 
+    LOGGER.info("Finished generating {} tasks configs for table: {} for task: 
{}", tableNumTasks, tableNameWithType,
+        taskType);
     return pinotTaskConfigs;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to