This is an automated email from the ASF dual-hosted git repository.
jiaguo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3cb3c54630 RefreshSegmentTask - Support for scheduling adhoc tasks (#14596)
3cb3c54630 is described below
commit 3cb3c54630de81682fc0497147b6a6758e688c42
Author: Vivek Iyer Vaidyanathan <[email protected]>
AuthorDate: Fri Dec 6 05:29:27 2024 +0530
RefreshSegmentTask - Support for scheduling adhoc tasks (#14596)
---
.../RefreshSegmentTaskGenerator.java | 137 +++++++++++----------
1 file changed, 75 insertions(+), 62 deletions(-)
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
index 59e85c1b1e..cc4f4d5781 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java
@@ -54,84 +54,97 @@ public class RefreshSegmentTaskGenerator extends
BaseTaskGenerator {
@Override
public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
- String taskType = RefreshSegmentTask.TASK_TYPE;
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
- PinotHelixResourceManager pinotHelixResourceManager =
_clusterInfoAccessor.getPinotHelixResourceManager();
-
- int tableNumTasks = 0;
-
for (TableConfig tableConfig : tableConfigs) {
- String tableNameWithType = tableConfig.getTableName();
- LOGGER.info("Start generating RefreshSegment tasks for table: {}",
tableNameWithType);
-
// Get the task configs for the table. This is used to restrict the
maximum number of allowed tasks per table at
// any given point.
Map<String, String> taskConfigs;
TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
if (tableTaskConfig == null) {
- LOGGER.warn("Failed to find task config for table: {}",
tableNameWithType);
+ LOGGER.warn("Failed to find task config for table: {}",
tableConfig.getTableName());
continue;
}
taskConfigs =
tableTaskConfig.getConfigsForTaskType(RefreshSegmentTask.TASK_TYPE);
- Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null
for Table: %s", tableNameWithType);
- int tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
- String tableMaxNumTasksConfig =
taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
- if (tableMaxNumTasksConfig != null) {
- try {
- tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
- } catch (Exception e) {
- tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
- LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and
task {}", tableNameWithType, taskType);
- }
+ pinotTaskConfigs.addAll(generateTasksForTable(tableConfig, taskConfigs));
+ }
+
+ return pinotTaskConfigs;
+ }
+
+ @Override
+ public List<PinotTaskConfig> generateTasks(TableConfig tableConfig,
Map<String, String> taskConfigs)
+ throws Exception {
+ return generateTasksForTable(tableConfig, taskConfigs);
+ }
+
+ private List<PinotTaskConfig> generateTasksForTable(TableConfig tableConfig,
Map<String, String> taskConfigs) {
+ String tableNameWithType = tableConfig.getTableName();
+ Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for
Table: %s", tableNameWithType);
+
+
+ String taskType = RefreshSegmentTask.TASK_TYPE;
+ List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+ PinotHelixResourceManager pinotHelixResourceManager =
_clusterInfoAccessor.getPinotHelixResourceManager();
+
+ LOGGER.info("Start generating RefreshSegment tasks for table: {}",
tableNameWithType);
+
+ int tableNumTasks = 0;
+ int tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
+ String tableMaxNumTasksConfig =
taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
+ if (tableMaxNumTasksConfig != null) {
+ try {
+ tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
+ } catch (Exception e) {
+ tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
+ LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and
task {}", tableNameWithType, taskType);
+ }
+ }
+
+ // Get info about table and schema.
+ Stat tableStat = pinotHelixResourceManager.getTableStat(tableNameWithType);
+ Schema schema =
pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+ Stat schemaStat =
pinotHelixResourceManager.getSchemaStat(schema.getSchemaName());
+
+ // Get the running segments for a table.
+ Set<Segment> runningSegments =
+ TaskGeneratorUtils.getRunningSegments(RefreshSegmentTask.TASK_TYPE,
_clusterInfoAccessor);
+
+ // Make a single ZK call to get the segments.
+ List<SegmentZKMetadata> allSegments =
_clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+
+ for (SegmentZKMetadata segmentZKMetadata : allSegments) {
+ // Skip if we have reached the maximum number of permissible tasks per
iteration.
+ if (tableNumTasks >= tableMaxNumTasks) {
+ break;
}
- // Get info about table and schema.
- Stat tableStat =
pinotHelixResourceManager.getTableStat(tableNameWithType);
- Schema schema =
pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
- Stat schemaStat =
pinotHelixResourceManager.getSchemaStat(schema.getSchemaName());
-
- // Get the running segments for a table.
- Set<Segment> runningSegments =
- TaskGeneratorUtils.getRunningSegments(RefreshSegmentTask.TASK_TYPE,
_clusterInfoAccessor);
-
- // Make a single ZK call to get the segments.
- List<SegmentZKMetadata> allSegments =
_clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
-
- for (SegmentZKMetadata segmentZKMetadata : allSegments) {
- // Skip if we have reached the maximum number of permissible tasks per
iteration.
- if (tableNumTasks >= tableMaxNumTasks) {
- break;
- }
-
- // Skip consuming segments.
- if (tableConfig.getTableType() == TableType.REALTIME &&
!segmentZKMetadata.getStatus().isCompleted()) {
- continue;
- }
-
- // Skip segments for which a task is already running.
- if (runningSegments.contains(new Segment(tableNameWithType,
segmentZKMetadata.getSegmentName()))) {
- continue;
- }
-
- String segmentName = segmentZKMetadata.getSegmentName();
-
- // Skip if the segment is already up-to-date and doesn't have to be
refreshed.
- if (!shouldRefreshSegment(segmentZKMetadata, tableConfig, tableStat,
schemaStat)) {
- continue;
- }
-
- Map<String, String> configs = new
HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName)));
- configs.put(MinionConstants.DOWNLOAD_URL_KEY,
segmentZKMetadata.getDownloadUrl());
- configs.put(MinionConstants.UPLOAD_URL_KEY,
_clusterInfoAccessor.getVipUrl() + "/segments");
- configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY,
String.valueOf(segmentZKMetadata.getCrc()));
- pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
- tableNumTasks++;
+ // Skip consuming segments.
+ if (tableConfig.getTableType() == TableType.REALTIME &&
!segmentZKMetadata.getStatus().isCompleted()) {
+ continue;
+ }
+
+ // Skip segments for which a task is already running.
+ if (runningSegments.contains(new Segment(tableNameWithType,
segmentZKMetadata.getSegmentName()))) {
+ continue;
+ }
+
+ String segmentName = segmentZKMetadata.getSegmentName();
+
+ // Skip if the segment is already up-to-date and doesn't have to be
refreshed.
+ if (!shouldRefreshSegment(segmentZKMetadata, tableConfig, tableStat,
schemaStat)) {
+ continue;
}
- LOGGER.info("Finished generating {} tasks configs for table: {} " + "for
task: {}", tableNumTasks,
- tableNameWithType, taskType);
+ Map<String, String> configs = new
HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName)));
+ configs.put(MinionConstants.DOWNLOAD_URL_KEY,
segmentZKMetadata.getDownloadUrl());
+ configs.put(MinionConstants.UPLOAD_URL_KEY,
_clusterInfoAccessor.getVipUrl() + "/segments");
+ configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY,
String.valueOf(segmentZKMetadata.getCrc()));
+ pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
+ tableNumTasks++;
}
+ LOGGER.info("Finished generating {} tasks configs for table: {} for task:
{}", tableNumTasks, tableNameWithType,
+ taskType);
return pinotTaskConfigs;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]