kfaraz commented on code in PR #16778:
URL: https://github.com/apache/druid/pull/16778#discussion_r1694478722
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -485,7 +488,8 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
@Nullable final ClientCompactionTaskTransformSpec transformSpec,
@Nullable final AggregatorFactory[] metricsSpec,
@Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
- final ServiceMetricEvent.Builder metricBuilder
+ final ServiceMetricEvent.Builder metricBuilder,
+ CompactionRunner compactionRunner
Review Comment:
I think we should make `createDataSchema()` and
`createDataSchemasForIntervals()` non-static and remove some of their
arguments, since most of them are already fields of the class.
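A minimal sketch of the suggestion above, using a hypothetical stand-in class rather than the real `CompactionTask` (the class, field, and method names here are illustrative assumptions, not Druid code). It only shows how an instance method can read values from fields instead of receiving them as arguments:

```java
import java.util.List;

// Hypothetical stand-in for CompactionTask; all names are illustrative only.
final class SchemaTaskSketch
{
  private final String dataSource;
  private final List<String> metrics;

  SchemaTaskSketch(String dataSource, List<String> metrics)
  {
    this.dataSource = dataSource;
    this.metrics = metrics;
  }

  // Before: a static helper needs every input passed in explicitly.
  static String describeSchemaStatic(String dataSource, List<String> metrics, String interval)
  {
    return dataSource + "@" + interval + " metrics=" + metrics;
  }

  // After: an instance method reads the fields and takes only what varies per call.
  String describeSchema(String interval)
  {
    return dataSource + "@" + interval + " metrics=" + metrics;
  }
}
```

Both methods produce the same result; the instance version simply has a shorter signature, so a later addition such as `compactionRunner` would not need to be threaded through every call.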
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -139,13 +144,58 @@ public CompactionConfigValidationResult validateCompactionTask(
));
}
validationResults.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(compactionTask.getContext()));
-
validationResults.add(ClientCompactionRunnerInfo.validateMetricsSpecForMSQ(compactionTask.getMetricsSpec()));
+ validationResults.add(validateRolledUpSegments(intervalToDataSchemaMap));
return validationResults.stream()
.filter(result -> !result.isValid())
.findFirst()
.orElse(new CompactionConfigValidationResult(true, null));
}
+ /**
+ * Validates that there are no rolled-up segments where either:
+ * <ul>
+ * <li>aggregator factory differs from its combining factory </li>
+ * <li>input col name is different from the output name (non-idempotent)</li>
+ * </ul>
+ */
+ private CompactionConfigValidationResult validateRolledUpSegments(Map<Interval, DataSchema> intervalToDataSchemaMap)
+ {
+ for (Map.Entry<Interval, DataSchema> intervalDataSchema : intervalToDataSchemaMap.entrySet()) {
+ if (intervalDataSchema.getValue() instanceof CombinedDataSchema) {
+ CombinedDataSchema combinedDataSchema = (CombinedDataSchema) intervalDataSchema.getValue();
+ if (combinedDataSchema.hasRolledUpSegments()) {
+ for (AggregatorFactory aggregatorFactory : combinedDataSchema.getAggregators()) {
+ // This is a conservative check as existing rollup may have been idempotent but the aggregator provided in
+ // compaction spec isn't. This would get properly compacted yet fails in the below pre-check.
+ if (
+ !(
+ aggregatorFactory.getClass().equals(aggregatorFactory.getCombiningFactory().getClass()) &&
+ (
+ aggregatorFactory.requiredFields().isEmpty() ||
+ (aggregatorFactory.requiredFields().size() == 1 &&
+ aggregatorFactory.requiredFields()
+ .get(0)
+ .equals(aggregatorFactory.getName()))
+ )
+ )
+ ) {
+ // MSQ doesn't support rolling up already rolled-up segments when aggregate column name is different from
+ // the aggregated column name. This is because the aggregated values would then get overwritten by new
+ // values and the existing values would be lost. Note that if no rollup is specified in an index spec,
+ // the default value is true.
+ return new CompactionConfigValidationResult(
+ false,
Review Comment:
This signature has changed in a recent PR.
```suggestion
return CompactionConfigValidationResult.failure(
```
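For context, a rough sketch of the static-factory style the suggestion points to. `ValidationResultSketch` is a hypothetical stand-in; the actual `CompactionConfigValidationResult` signatures are not shown in this thread, so the `failure(String, Object...)` and `success()` shapes below are assumptions:

```java
// Hypothetical stand-in for CompactionConfigValidationResult; the real class's
// method signatures are not shown in this thread, so these are assumptions.
final class ValidationResultSketch
{
  private final boolean valid;
  private final String reason;

  // Private constructor: callers go through the named factories below.
  private ValidationResultSketch(boolean valid, String reason)
  {
    this.valid = valid;
    this.reason = reason;
  }

  static ValidationResultSketch success()
  {
    return new ValidationResultSketch(true, null);
  }

  // Formats the failure reason so call sites need no explicit String.format.
  static ValidationResultSketch failure(String messageFormat, Object... args)
  {
    return new ValidationResultSketch(false, String.format(messageFormat, args));
  }

  boolean isValid()
  {
    return valid;
  }

  String getReason()
  {
    return reason;
  }
}
```

With factories like these, a call site reads as `failure("aggregator[%s] is not idempotent", name)` instead of passing a bare `false` flag to a constructor.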
##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -604,13 +610,17 @@ private static DataSchema createDataSchema(
@Nullable DimensionsSpec dimensionsSpec,
@Nullable ClientCompactionTaskTransformSpec transformSpec,
@Nullable AggregatorFactory[] metricsSpec,
- @Nonnull ClientCompactionTaskGranularitySpec granularitySpec
+ @Nonnull ClientCompactionTaskGranularitySpec granularitySpec,
+ @Nullable CompactionRunner compactionRunner
)
{
// Check index metadata & decide which values to propagate (i.e. carry over) for rollup & queryGranularity
final ExistingSegmentAnalyzer existingSegmentAnalyzer = new ExistingSegmentAnalyzer(
segments,
- granularitySpec.isRollup() == null,
+ // For MSQ, always need rollup to check if there are some rollup segments already present.
+ compactionRunner == null || compactionRunner instanceof NativeCompactionRunner
Review Comment:
`compactionRunner` can never be `null`; it is already initialized to a
non-null value in the constructor.
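A small sketch of why the null guard can be dropped, using hypothetical stand-in types (`RunnerSketch`, `NativeRunnerSketch`, `MsqRunnerSketch`) instead of the real Druid classes. The constructor guarantee is modeled here with `Objects.requireNonNull`, which is an assumption; the actual constructor may establish it differently, for example by substituting a default runner:

```java
import java.util.Objects;

// Hypothetical stand-ins for CompactionRunner and its implementations.
interface RunnerSketch {}

final class NativeRunnerSketch implements RunnerSketch {}

final class MsqRunnerSketch implements RunnerSketch {}

final class RunnerHolderSketch
{
  private final RunnerSketch runner;

  RunnerHolderSketch(RunnerSketch runner)
  {
    // Stand-in for the constructor guarantee: the field is never null afterwards.
    this.runner = Objects.requireNonNull(runner, "runner");
  }

  boolean isNative()
  {
    // No `runner == null ||` guard is needed given the constructor guarantee.
    return runner instanceof NativeRunnerSketch;
  }
}
```

Once the field is known to be non-null, the condition reduces to the single `instanceof` test, and the `@Nullable` annotation on the parameter becomes unnecessary as well.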