jtao15 commented on a change in pull request #6975:
URL: https://github.com/apache/incubator-pinot/pull/6975#discussion_r641844363
##########
File path:
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
##########
@@ -40,40 +53,133 @@
* TODO:
* 1. Add the support for roll-up
* 2. Add the support for time split to provide backfill support for merged
segments
- * 3. Change the way to decide the number of output segments (explicit
numPartition config -> maxNumRowsPerSegment)
+ * 3. Add merge/rollup name prefixes for generated segments
+ * 4. Add the support for realtime table
*/
public class MergeRollupTaskExecutor extends
BaseMultipleSegmentsConversionExecutor {
private static final Logger LOGGER =
LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
+ private static final String INPUT_SEGMENTS_DIR = "input_segments";
+ private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
@Override
protected List<SegmentConversionResult> convert(PinotTaskConfig
pinotTaskConfig, List<File> originalIndexDirs,
File workingDir)
throws Exception {
+ String taskType = pinotTaskConfig.getTaskType();
Map<String, String> configs = pinotTaskConfig.getConfigs();
+ LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
+ long startMillis = System.currentTimeMillis();
+
String mergeTypeString =
configs.get(MinionConstants.MergeRollupTask.MERGE_TYPE_KEY);
// TODO: add the support for rollup
Preconditions.checkNotNull(mergeTypeString, "MergeType cannot be null");
- MergeType mergeType = MergeType.fromString(mergeTypeString);
- Preconditions.checkState(mergeType == MergeType.CONCATENATE, "Only
'CONCATENATE' mode is currently supported.");
-
- String mergedSegmentName =
configs.get(MinionConstants.MergeRollupTask.MERGED_SEGMENT_NAME_KEY);
String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
-
TableConfig tableConfig = getTableConfig(tableNameWithType);
+ Schema schema = getSchema(tableNameWithType);
+ Set<String> schemaColumns = schema.getPhysicalColumnNames();
+
+ Map<String, ValueAggregatorFactory.ValueAggregatorType> aggregatorConfigs =
+ MergeRollupTaskUtils.getRollupAggregationTypeMap(configs);
+ String numRecordsPerSegment =
configs.get(MinionConstants.MergeRollupTask.MAX_NUM_RECORDS_PER_SEGMENT);
+
+ SegmentProcessorConfig.Builder segmentProcessorConfigBuilder =
+ new
SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema);
+
+ // Partition config from tableConfig
+ if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
+ Map<String, ColumnPartitionConfig> columnPartitionMap =
+
tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
+ PartitionerConfig partitionerConfig =
getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);
+
segmentProcessorConfigBuilder.setPartitionerConfigs(Lists.newArrayList(partitionerConfig));
+ }
- MergeRollupSegmentConverter rollupSegmentConverter =
- new
MergeRollupSegmentConverter.Builder().setMergeType(mergeType).setTableName(tableNameWithType)
-
.setSegmentName(mergedSegmentName).setInputIndexDirs(originalIndexDirs).setWorkingDir(workingDir)
- .setTableConfig(tableConfig).build();
+ // Aggregations using configured Collector
+ List<String> sortedColumns =
tableConfig.getIndexingConfig().getSortedColumn();
+ CollectorConfig collectorConfig =
+ getCollectorConfig(mergeTypeString, aggregatorConfigs, schemaColumns,
sortedColumns);
+ Preconditions.checkState(collectorConfig.getCollectorType() ==
CollectorFactory.CollectorType.CONCAT,
+ "Only 'CONCAT' mode is currently supported.");
+ segmentProcessorConfigBuilder.setCollectorConfig(collectorConfig);
- List<File> resultFiles = rollupSegmentConverter.convert();
+ // Segment config
+ if (numRecordsPerSegment != null) {
+ SegmentConfig segmentConfig = getSegmentConfig(numRecordsPerSegment);
+ segmentProcessorConfigBuilder.setSegmentConfig(segmentConfig);
+ }
+
+ SegmentProcessorConfig segmentProcessorConfig =
segmentProcessorConfigBuilder.build();
+
+ File inputSegmentsDir = new File(workingDir, INPUT_SEGMENTS_DIR);
+ Preconditions.checkState(inputSegmentsDir.mkdirs(), "Failed to create
input directory: %s for task: %s",
+ inputSegmentsDir.getAbsolutePath(), taskType);
+ for (File indexDir : originalIndexDirs) {
+ FileUtils.copyDirectoryToDirectory(indexDir, inputSegmentsDir);
+ }
+ File outputSegmentsDir = new File(workingDir, OUTPUT_SEGMENTS_DIR);
+ Preconditions.checkState(outputSegmentsDir.mkdirs(), "Failed to create
output directory: %s for task: %s",
+ outputSegmentsDir.getAbsolutePath(), taskType);
+
+ SegmentProcessorFramework segmentProcessorFramework =
+ new SegmentProcessorFramework(inputSegmentsDir,
segmentProcessorConfig, outputSegmentsDir);
+ try {
+ segmentProcessorFramework.processSegments();
+ } finally {
+ segmentProcessorFramework.cleanup();
+ }
+
+ long endMillis = System.currentTimeMillis();
+ LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms",
taskType, configs, (endMillis - startMillis));
List<SegmentConversionResult> results = new ArrayList<>();
- for (File file : resultFiles) {
+ for (File file : outputSegmentsDir.listFiles()) {
String outputSegmentName = file.getName();
results.add(new
SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName)
.setTableNameWithType(tableNameWithType).build());
}
return results;
}
+
+ @Override
+ protected SegmentZKMetadataCustomMapModifier
getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig) {
+ return new
SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
Collections
+ .singletonMap(MinionConstants.MergeRollupTask.TASK_TYPE +
MinionConstants.TASK_BUCKET_GRANULARITY_SUFFIX,
+
pinotTaskConfig.getConfigs().get(MinionConstants.MergeRollupTask.GRANULARITY_KEY)));
Review comment:
This will be controlled on the scheduler side: the scheduler should call
`MergeRollupTaskUtils.getAllMergeProperties()` to get the granularity and use
upper case when creating the task config. Maybe we can use
`pinotTaskConfig.getConfigs().get(MinionConstants.MergeRollupTask.GRANULARITY_KEY).toUpperCase()`
here to make it less error-prone.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]