gargvishesh commented on code in PR #16778:
URL: https://github.com/apache/druid/pull/16778#discussion_r1695162905
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -139,13 +144,58 @@ public CompactionConfigValidationResult
validateCompactionTask(
));
}
validationResults.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(compactionTask.getContext()));
-
validationResults.add(ClientCompactionRunnerInfo.validateMetricsSpecForMSQ(compactionTask.getMetricsSpec()));
+ validationResults.add(validateRolledUpSegments(intervalToDataSchemaMap));
return validationResults.stream()
.filter(result -> !result.isValid())
.findFirst()
.orElse(new CompactionConfigValidationResult(true,
null));
}
+ /**
+ * Validates that there are no rolled-up segments where either:
+ * <ul>
+ * <li>aggregator factory differs from its combining factory </li>
+ * <li>input col name is different from the output name (non-idempotent)</li>
+ * </ul>
+ */
+ private CompactionConfigValidationResult
validateRolledUpSegments(Map<Interval, DataSchema> intervalToDataSchemaMap)
+ {
+ for (Map.Entry<Interval, DataSchema> intervalDataSchema :
intervalToDataSchemaMap.entrySet()) {
+ if (intervalDataSchema.getValue() instanceof CombinedDataSchema) {
+ CombinedDataSchema combinedDataSchema = (CombinedDataSchema)
intervalDataSchema.getValue();
+ if (combinedDataSchema.hasRolledUpSegments()) {
+ for (AggregatorFactory aggregatorFactory :
combinedDataSchema.getAggregators()) {
+ // This is a conservative check as existing rollup may have been
idempotent but the aggregator provided in
+ // compaction spec isn't. Such a case would be compacted correctly, yet
it fails the pre-check below.
+ if (
+ !(
+
aggregatorFactory.getClass().equals(aggregatorFactory.getCombiningFactory().getClass())
&&
+ (
+ aggregatorFactory.requiredFields().isEmpty() ||
+ (aggregatorFactory.requiredFields().size() == 1 &&
+ aggregatorFactory.requiredFields()
+ .get(0)
+ .equals(aggregatorFactory.getName()))
+ )
+ )
+ ) {
+ // MSQ doesn't support rolling up already rolled-up segments
when aggregate column name is different from
+ // the aggregated column name. This is because the aggregated
values would then get overwritten by new
+ // values and the existing values would be lost. Note that if no
rollup is specified in an index spec,
+ // the default value is true.
+ return new CompactionConfigValidationResult(
+ false,
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]