tibrewalpratik17 commented on code in PR #14596:
URL: https://github.com/apache/pinot/pull/14596#discussion_r1870037061
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/refreshsegment/RefreshSegmentTaskGenerator.java:
##########
@@ -54,84 +54,99 @@ public String getTaskType() {
@Override
public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
- String taskType = RefreshSegmentTask.TASK_TYPE;
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
- PinotHelixResourceManager pinotHelixResourceManager =
_clusterInfoAccessor.getPinotHelixResourceManager();
-
- int tableNumTasks = 0;
-
for (TableConfig tableConfig : tableConfigs) {
- String tableNameWithType = tableConfig.getTableName();
- LOGGER.info("Start generating RefreshSegment tasks for table: {}",
tableNameWithType);
-
// Get the task configs for the table. This is used to restrict the
maximum number of allowed tasks per table at
// any given point.
Map<String, String> taskConfigs;
TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
if (tableTaskConfig == null) {
- LOGGER.warn("Failed to find task config for table: {}",
tableNameWithType);
+ LOGGER.warn("Failed to find task config for table: {}",
tableConfig.getTableName());
continue;
}
taskConfigs =
tableTaskConfig.getConfigsForTaskType(RefreshSegmentTask.TASK_TYPE);
- Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null
for Table: %s", tableNameWithType);
- int tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
- String tableMaxNumTasksConfig =
taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
- if (tableMaxNumTasksConfig != null) {
- try {
- tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
- } catch (Exception e) {
- tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
- LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and
task {}", tableNameWithType, taskType);
- }
+ Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null
for Table: %s",
+ tableConfig.getTableName());
+
+ pinotTaskConfigs.addAll(generateTasksForTable(tableConfig, taskConfigs));
+ }
+
+ return pinotTaskConfigs;
+ }
+
+ @Override
+ public List<PinotTaskConfig> generateTasks(TableConfig tableConfig,
Map<String, String> taskConfigs)
+ throws Exception {
+ Preconditions.checkNotNull(taskConfigs, "Task config shouldn't be null for
Table: %s", tableConfig.getTableName());
+ return generateTasksForTable(tableConfig, taskConfigs);
+ }
+
+ private List<PinotTaskConfig> generateTasksForTable(TableConfig tableConfig,
Map<String, String> taskConfigs) {
+ String taskType = RefreshSegmentTask.TASK_TYPE;
+ List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+ PinotHelixResourceManager pinotHelixResourceManager =
_clusterInfoAccessor.getPinotHelixResourceManager();
+
+ String tableNameWithType = tableConfig.getTableName();
+ LOGGER.info("Start generating RefreshSegment tasks for table: {}",
tableNameWithType);
+
+
+ int tableNumTasks = 0;
+ int tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
+ String tableMaxNumTasksConfig =
taskConfigs.get(MinionConstants.TABLE_MAX_NUM_TASKS_KEY);
+ if (tableMaxNumTasksConfig != null) {
+ try {
+ tableMaxNumTasks = Integer.parseInt(tableMaxNumTasksConfig);
+ } catch (Exception e) {
+ tableMaxNumTasks = RefreshSegmentTask.MAX_NUM_TASKS_PER_TABLE;
+ LOGGER.warn("MaxNumTasks have been wrongly set for table : {}, and
task {}", tableNameWithType, taskType);
+ }
+ }
+
+ // Get info about table and schema.
+ Stat tableStat = pinotHelixResourceManager.getTableStat(tableNameWithType);
+ Schema schema =
pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+ Stat schemaStat =
pinotHelixResourceManager.getSchemaStat(schema.getSchemaName());
+
+ // Get the running segments for a table.
+ Set<Segment> runningSegments =
+ TaskGeneratorUtils.getRunningSegments(RefreshSegmentTask.TASK_TYPE,
_clusterInfoAccessor);
+
+ // Make a single ZK call to get the segments.
+ List<SegmentZKMetadata> allSegments =
_clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
+
+ for (SegmentZKMetadata segmentZKMetadata : allSegments) {
+ // Skip if we have reached the maximum number of permissible tasks per
iteration.
+ if (tableNumTasks >= tableMaxNumTasks) {
+ break;
}
- // Get info about table and schema.
- Stat tableStat =
pinotHelixResourceManager.getTableStat(tableNameWithType);
- Schema schema =
pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
- Stat schemaStat =
pinotHelixResourceManager.getSchemaStat(schema.getSchemaName());
-
- // Get the running segments for a table.
- Set<Segment> runningSegments =
- TaskGeneratorUtils.getRunningSegments(RefreshSegmentTask.TASK_TYPE,
_clusterInfoAccessor);
-
- // Make a single ZK call to get the segments.
- List<SegmentZKMetadata> allSegments =
_clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
-
- for (SegmentZKMetadata segmentZKMetadata : allSegments) {
- // Skip if we have reached the maximum number of permissible tasks per
iteration.
- if (tableNumTasks >= tableMaxNumTasks) {
- break;
- }
-
- // Skip consuming segments.
- if (tableConfig.getTableType() == TableType.REALTIME &&
!segmentZKMetadata.getStatus().isCompleted()) {
- continue;
- }
-
- // Skip segments for which a task is already running.
- if (runningSegments.contains(new Segment(tableNameWithType,
segmentZKMetadata.getSegmentName()))) {
- continue;
- }
-
- String segmentName = segmentZKMetadata.getSegmentName();
-
- // Skip if the segment is already up-to-date and doesn't have to be
refreshed.
- if (!shouldRefreshSegment(segmentZKMetadata, tableConfig, tableStat,
schemaStat)) {
- continue;
- }
-
- Map<String, String> configs = new
HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName)));
- configs.put(MinionConstants.DOWNLOAD_URL_KEY,
segmentZKMetadata.getDownloadUrl());
- configs.put(MinionConstants.UPLOAD_URL_KEY,
_clusterInfoAccessor.getVipUrl() + "/segments");
- configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY,
String.valueOf(segmentZKMetadata.getCrc()));
- pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
- tableNumTasks++;
+ // Skip consuming segments.
+ if (tableConfig.getTableType() == TableType.REALTIME &&
!segmentZKMetadata.getStatus().isCompleted()) {
+ continue;
+ }
+
+ // Skip segments for which a task is already running.
+ if (runningSegments.contains(new Segment(tableNameWithType,
segmentZKMetadata.getSegmentName()))) {
+ continue;
+ }
+
+ String segmentName = segmentZKMetadata.getSegmentName();
+
+ // Skip if the segment is already up-to-date and doesn't have to be
refreshed.
+ if (!shouldRefreshSegment(segmentZKMetadata, tableConfig, tableStat,
schemaStat)) {
+ continue;
}
- LOGGER.info("Finished generating {} tasks configs for table: {} " + "for
task: {}", tableNumTasks,
- tableNameWithType, taskType);
+ Map<String, String> configs = new
HashMap<>(getBaseTaskConfigs(tableConfig, List.of(segmentName)));
+ configs.put(MinionConstants.DOWNLOAD_URL_KEY,
segmentZKMetadata.getDownloadUrl());
+ configs.put(MinionConstants.UPLOAD_URL_KEY,
_clusterInfoAccessor.getVipUrl() + "/segments");
+ configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY,
String.valueOf(segmentZKMetadata.getCrc()));
+ pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
+ tableNumTasks++;
}
+ LOGGER.info("Finished generating {} tasks configs for table: {} " + "for
task: {}", tableNumTasks,
+ tableNameWithType, taskType);
Review Comment:
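nit: the `+` concatenation in this log message is unnecessary — the two adjacent string literals (`"... for table: {} "` and `"for task: {}"`) can be merged into a single literal.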
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]