kfaraz commented on code in PR #16778:
URL: https://github.com/apache/druid/pull/16778#discussion_r1694478722


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -485,7 +488,8 @@ static Map<Interval, DataSchema> createDataSchemasForIntervals(
       @Nullable final ClientCompactionTaskTransformSpec transformSpec,
       @Nullable final AggregatorFactory[] metricsSpec,
       @Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
-      final ServiceMetricEvent.Builder metricBuilder
+      final ServiceMetricEvent.Builder metricBuilder,
+      CompactionRunner compactionRunner

Review Comment:
   I think we should make `createDataSchema()` and
`createDataSchemasForIntervals()` non-static and remove some of their
arguments, since most of them are already fields of the class.
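
A rough sketch of the shape this refactor could take. `SketchTask` and its fields are hypothetical stand-ins, not the real `CompactionTask`; the point is only that an instance method can read task state from fields instead of receiving it as parameters.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a task class; not the real CompactionTask.
class SketchTask
{
  private final String dataSource;
  private final List<String> metrics;
  private final boolean rollup;

  SketchTask(String dataSource, List<String> metrics, boolean rollup)
  {
    this.dataSource = dataSource;
    this.metrics = metrics;
    this.rollup = rollup;
  }

  // Before: the static helper must be handed every piece of task state explicitly.
  static Map<String, String> createSchemasStatic(
      List<String> intervals,
      String dataSource,
      List<String> metrics,
      boolean rollup
  )
  {
    Map<String, String> schemas = new LinkedHashMap<>();
    for (String interval : intervals) {
      schemas.put(interval, dataSource + "|" + metrics + "|rollup=" + rollup);
    }
    return schemas;
  }

  // After: the instance method reads the same state from fields, so only the
  // input that actually varies per call stays in the signature.
  Map<String, String> createSchemas(List<String> intervals)
  {
    return createSchemasStatic(intervals, dataSource, metrics, rollup);
  }
}
```

In a real change the static variant would presumably be removed rather than delegated to; it is kept here only so the two signatures can be compared side by side.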



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java:
##########
@@ -139,13 +144,58 @@ public CompactionConfigValidationResult validateCompactionTask(
       ));
     }
     validationResults.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(compactionTask.getContext()));
-    validationResults.add(ClientCompactionRunnerInfo.validateMetricsSpecForMSQ(compactionTask.getMetricsSpec()));
+    validationResults.add(validateRolledUpSegments(intervalToDataSchemaMap));
     return validationResults.stream()
                             .filter(result -> !result.isValid())
                             .findFirst()
                             .orElse(new CompactionConfigValidationResult(true, null));
   }
 
+  /**
+   * Validates that there are no rolled-up segments where either:
+   * <ul>
+   * <li>aggregator factory differs from its combining factory </li>
+   * <li>input col name is different from the output name (non-idempotent)</li>
+   * </ul>
+   */
+  private CompactionConfigValidationResult validateRolledUpSegments(Map<Interval, DataSchema> intervalToDataSchemaMap)
+  {
+    for (Map.Entry<Interval, DataSchema> intervalDataSchema : intervalToDataSchemaMap.entrySet()) {
+      if (intervalDataSchema.getValue() instanceof CombinedDataSchema) {
+        CombinedDataSchema combinedDataSchema = (CombinedDataSchema) intervalDataSchema.getValue();
+        if (combinedDataSchema.hasRolledUpSegments()) {
+          for (AggregatorFactory aggregatorFactory : combinedDataSchema.getAggregators()) {
+            // This is a conservative check as existing rollup may have been idempotent but the aggregator provided in
+            // compaction spec isn't. This would get properly compacted yet fails in the below pre-check.
+            if (
+                !(
+                    aggregatorFactory.getClass().equals(aggregatorFactory.getCombiningFactory().getClass()) &&
+                    (
+                        aggregatorFactory.requiredFields().isEmpty() ||
+                        (aggregatorFactory.requiredFields().size() == 1 &&
+                         aggregatorFactory.requiredFields()
+                                          .get(0)
+                                          .equals(aggregatorFactory.getName()))
+                    )
+                )
+            ) {
+              // MSQ doesn't support rolling up already rolled-up segments when aggregate column name is different from
+              // the aggregated column name. This is because the aggregated values would then get overwritten by new
+              // values and the existing values would be lost. Note that if no rollup is specified in an index spec,
+              // the default value is true.
+              return new CompactionConfigValidationResult(
+                  false,

Review Comment:
   This signature has changed in a recent PR.
   ```suggestion
                 return CompactionConfigValidationResult.failure(
   ```
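
Separately, the negated condition in this hunk might read more easily as a named predicate. A minimal sketch, assuming hypothetical stand-in types (`SketchAggregator`, `SketchSum`, `SketchCount`) that model only the three accessors the check uses; they are not the real `AggregatorFactory` classes.

```java
import java.util.List;

// Hypothetical minimal stand-in; only the accessors used by the check are modeled.
interface SketchAggregator
{
  String getName();
  List<String> requiredFields();
  SketchAggregator getCombiningFactory();
}

final class SketchChecks
{
  private SketchChecks() {}

  // True when the aggregator passes the rule from the diff: its combining factory
  // has the same class, and it either reads no input column or reads exactly the
  // column it writes.
  static boolean isIdempotent(SketchAggregator agg)
  {
    boolean sameClassAsCombining = agg.getClass().equals(agg.getCombiningFactory().getClass());
    List<String> fields = agg.requiredFields();
    boolean readsOwnOutput = fields.isEmpty()
                             || (fields.size() == 1 && fields.get(0).equals(agg.getName()));
    return sameClassAsCombining && readsOwnOutput;
  }
}

// Sum-like example: its combining factory is the same class, reading its own output column.
final class SketchSum implements SketchAggregator
{
  private final String name;
  private final String fieldName;

  SketchSum(String name, String fieldName)
  {
    this.name = name;
    this.fieldName = fieldName;
  }

  @Override public String getName() { return name; }
  @Override public List<String> requiredFields() { return List.of(fieldName); }
  @Override public SketchAggregator getCombiningFactory() { return new SketchSum(name, name); }
}

// Count-like example: no input column, but the combining factory is a different class.
final class SketchCount implements SketchAggregator
{
  private final String name;

  SketchCount(String name) { this.name = name; }

  @Override public String getName() { return name; }
  @Override public List<String> requiredFields() { return List.of(); }
  @Override public SketchAggregator getCombiningFactory() { return new SketchSum(name, name); }
}
```

With such a helper the call site would become `if (!isIdempotent(aggregatorFactory))`, and the comment about overwritten values could sit on the helper's Javadoc.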



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java:
##########
@@ -604,13 +610,17 @@ private static DataSchema createDataSchema(
       @Nullable DimensionsSpec dimensionsSpec,
       @Nullable ClientCompactionTaskTransformSpec transformSpec,
       @Nullable AggregatorFactory[] metricsSpec,
-      @Nonnull ClientCompactionTaskGranularitySpec granularitySpec
+      @Nonnull ClientCompactionTaskGranularitySpec granularitySpec,
+      @Nullable CompactionRunner compactionRunner
   )
   {
     // Check index metadata & decide which values to propagate (i.e. carry over) for rollup & queryGranularity
     final ExistingSegmentAnalyzer existingSegmentAnalyzer = new ExistingSegmentAnalyzer(
         segments,
-        granularitySpec.isRollup() == null,
+        // For MSQ, always need rollup to check if there are some rollup segments already present.
+        compactionRunner == null || compactionRunner instanceof NativeCompactionRunner

Review Comment:
   `compactionRunner` can never be `null`; it is already initialized to a
non-null value in the constructor.
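
To illustrate, a small sketch with hypothetical stand-in runner types (not the real Druid classes). It covers only the part of the expression visible in the hunk above: for any non-null runner, the form with the null branch and the plain `instanceof` form agree, so the null branch only matters for a value that cannot occur.

```java
// Hypothetical stand-ins for the runner types; not the real Druid classes.
interface SketchRunner {}

final class SketchNativeRunner implements SketchRunner {}

final class SketchMsqRunner implements SketchRunner {}

final class SketchRollupFlag
{
  private SketchRollupFlag() {}

  // The visible part of the condition as written in the diff.
  static boolean withNullCheck(SketchRunner runner)
  {
    return runner == null || runner instanceof SketchNativeRunner;
  }

  // The same condition without the null branch.
  static boolean withoutNullCheck(SketchRunner runner)
  {
    return runner instanceof SketchNativeRunner;
  }
}
```

The two methods differ only for a `null` argument, because `instanceof` evaluates to `false` for `null` in Java.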



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

