This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 593c3b21503 Do not support non-idempotent aggregator in MSQ compaction 
(#16846)
593c3b21503 is described below

commit 593c3b21503903478ca276acbaa1913710a416ca
Author: Vishesh Garg <[email protected]>
AuthorDate: Tue Aug 6 20:58:08 2024 +0530

    Do not support non-idempotent aggregator in MSQ compaction (#16846)
    
    This PR adds checks for verification of DataSourceCompactionConfig and 
CompactionTask with msq engine to ensure:
    
    each aggregator in metricsSpec is idempotent
    metricsSpec is non-null when rollup is set to true
    Unit tests and existing compaction ITs have been updated accordingly.
---
 .../druid/msq/indexing/MSQCompactionRunner.java    |  56 +-------
 .../msq/indexing/MSQCompactionRunnerTest.java      |  90 ++++++-------
 .../indexing/common/task/CompactionRunner.java     |   5 +-
 .../druid/indexing/common/task/CompactionTask.java |  36 ++---
 .../common/task/NativeCompactionRunner.java        |   3 +-
 .../indexing/common/task/CompactionTaskTest.java   |  48 +++----
 .../coordinator/duty/ITAutoCompactionTest.java     | 146 ++++++++++-----------
 .../indexing/ClientCompactionRunnerInfo.java       |  36 ++++-
 .../druid/segment/indexing/CombinedDataSchema.java |  64 ---------
 .../indexing/ClientCompactionRunnerInfoTest.java   |  59 +++++++--
 .../druid/segment/indexing/DataSchemaTest.java     |  20 ---
 .../CoordinatorCompactionConfigsResourceTest.java  |   2 +-
 12 files changed, 225 insertions(+), 340 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
index 7b4d3235dc0..efc2cbb2afb 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
@@ -66,7 +66,6 @@ import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.segment.indexing.CombinedDataSchema;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
@@ -120,15 +119,14 @@ public class MSQCompactionRunner implements 
CompactionRunner
    * <ul>
    * <li>partitionsSpec of type HashedParititionsSpec.</li>
    * <li>maxTotalRows in DynamicPartitionsSpec.</li>
-   * <li>rollup set to false in granularitySpec when metricsSpec is specified. 
Null is treated as true.</li>
-   * <li>queryGranularity set to ALL in granularitySpec.</li>
-   * <li>Each metric has output column name same as the input name.</li>
+   * <li>rollup in granularitySpec set to false when metricsSpec is specified 
or true when it's null.
+   * Null is treated as true if metricsSpec exist and false if empty.</li>
+   * <li>any metric is non-idempotent, i.e. it defines some aggregatorFactory 
'A' s.t. 'A != A.combiningFactory()'.</li>
    * </ul>
    */
   @Override
   public CompactionConfigValidationResult validateCompactionTask(
-      CompactionTask compactionTask,
-      Map<Interval, DataSchema> intervalToDataSchemaMap
+      CompactionTask compactionTask
   )
   {
     List<CompactionConfigValidationResult> validationResults = new 
ArrayList<>();
@@ -144,57 +142,13 @@ public class MSQCompactionRunner implements 
CompactionRunner
       ));
     }
     
validationResults.add(ClientCompactionRunnerInfo.validateMaxNumTasksForMSQ(compactionTask.getContext()));
-    validationResults.add(validateRolledUpSegments(intervalToDataSchemaMap));
+    
validationResults.add(ClientCompactionRunnerInfo.validateMetricsSpecForMSQ(compactionTask.getMetricsSpec()));
     return validationResults.stream()
                             .filter(result -> !result.isValid())
                             .findFirst()
                             
.orElse(CompactionConfigValidationResult.success());
   }
 
-  /**
-   * Valides that there are no rolled-up segments where either:
-   * <ul>
-   * <li>aggregator factory differs from its combining factory </li>
-   * <li>input col name is different from the output name (non-idempotent)</li>
-   * </ul>
-   */
-  private CompactionConfigValidationResult 
validateRolledUpSegments(Map<Interval, DataSchema> intervalToDataSchemaMap)
-  {
-    for (Map.Entry<Interval, DataSchema> intervalDataSchema : 
intervalToDataSchemaMap.entrySet()) {
-      if (intervalDataSchema.getValue() instanceof CombinedDataSchema) {
-        CombinedDataSchema combinedDataSchema = (CombinedDataSchema) 
intervalDataSchema.getValue();
-        if (combinedDataSchema.hasRolledUpSegments()) {
-          for (AggregatorFactory aggregatorFactory : 
combinedDataSchema.getAggregators()) {
-            // This is a conservative check as existing rollup may have been 
idempotent but the aggregator provided in
-            // compaction spec isn't. This would get properly compacted yet 
fails in the below pre-check.
-            if (
-                !(
-                    
aggregatorFactory.getClass().equals(aggregatorFactory.getCombiningFactory().getClass())
 &&
-                    (
-                        aggregatorFactory.requiredFields().isEmpty() ||
-                        (aggregatorFactory.requiredFields().size() == 1 &&
-                         aggregatorFactory.requiredFields()
-                                          .get(0)
-                                          .equals(aggregatorFactory.getName()))
-                    )
-                )
-            ) {
-              // MSQ doesn't support rolling up already rolled-up segments 
when aggregate column name is different from
-              // the aggregated column name. This is because the aggregated 
values would then get overwritten by new
-              // values and the existing values would be lost. Note that if no 
rollup is specified in an index spec,
-              // the default value is true.
-              return CompactionConfigValidationResult.failure(
-                  "MSQ: Rolled-up segments in compaction interval[%s].",
-                  intervalDataSchema.getKey()
-              );
-            }
-          }
-        }
-      }
-    }
-    return CompactionConfigValidationResult.success();
-  }
-
   @Override
   public CurrentSubTaskHolder getCurrentSubTaskHolder()
   {
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index 6c5d1957265..d868ddf20e5 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -42,7 +42,6 @@ import org.apache.druid.indexing.common.task.CompactionTask;
 import org.apache.druid.indexing.common.task.TuningConfigBuilder;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.GranularityType;
 import org.apache.druid.math.expr.ExprMacroTable;
@@ -60,7 +59,6 @@ import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.data.CompressionFactory;
 import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
-import org.apache.druid.segment.indexing.CombinedDataSchema;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.transform.TransformSpec;
@@ -131,7 +129,7 @@ public class MSQCompactionRunnerTest
         null,
         null
     );
-    
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, 
Collections.emptyMap()).isValid());
+    
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
   }
 
   @Test
@@ -144,7 +142,7 @@ public class MSQCompactionRunnerTest
         null,
         null
     );
-    
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, 
Collections.emptyMap()).isValid());
+    
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
   }
 
   @Test
@@ -157,7 +155,7 @@ public class MSQCompactionRunnerTest
         null,
         null
     );
-    
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, 
Collections.emptyMap()).isValid());
+    
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
   }
 
   @Test
@@ -170,7 +168,7 @@ public class MSQCompactionRunnerTest
         null,
         null
     );
-    
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, 
Collections.emptyMap()).isValid());
+    
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
   }
 
   @Test
@@ -183,7 +181,7 @@ public class MSQCompactionRunnerTest
         new ClientCompactionTaskGranularitySpec(null, Granularities.ALL, null),
         null
     );
-    
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, 
Collections.emptyMap()).isValid());
+    
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
   }
 
   @Test
@@ -196,7 +194,41 @@ public class MSQCompactionRunnerTest
         new ClientCompactionTaskGranularitySpec(null, null, false),
         AGGREGATORS.toArray(new AggregatorFactory[0])
     );
-    
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, 
Collections.emptyMap()).isValid());
+    
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+  }
+
+  @Test
+  public void testRollupTrueWithoutMetricsSpecIsInValid()
+  {
+    CompactionTask compactionTask = createCompactionTask(
+        new DynamicPartitionsSpec(3, null),
+        null,
+        Collections.emptyMap(),
+        new ClientCompactionTaskGranularitySpec(null, null, true),
+        null
+    );
+    
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+  }
+
+  @Test
+  public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
+  {
+    // Aggregators having different input and ouput column names are 
unsupported.
+    final String inputColName = "added";
+    final String outputColName = "sum_added";
+    CompactionTask compactionTask = createCompactionTask(
+        new DynamicPartitionsSpec(3, null),
+        null,
+        Collections.emptyMap(),
+        new ClientCompactionTaskGranularitySpec(null, null, null),
+        new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, 
inputColName)}
+    );
+    CompactionConfigValidationResult validationResult = 
MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask);
+    Assert.assertFalse(validationResult.isValid());
+    Assert.assertEquals(
+        "MSQ: Non-idempotent aggregator[sum_added] not supported in 
'metricsSpec'.",
+        validationResult.getReason()
+    );
   }
 
   @Test
@@ -345,48 +377,6 @@ public class MSQCompactionRunnerTest
     Assert.assertEquals(WorkerAssignmentStrategy.MAX, 
actualMSQSpec.getAssignmentStrategy());
   }
 
-  @Test
-  public void 
testIntervalsWithRolledUpSegmentsAndNonIdempotentAggregatorFails()
-  {
-    final String inputColName = "added";
-    final String outputColName = "sum_added";
-    CompactionTask compactionTask = createCompactionTask(
-        null,
-        null,
-        Collections.emptyMap(),
-        null,
-        new AggregatorFactory[]{
-            new LongSumAggregatorFactory(
-                outputColName,
-                inputColName
-            )
-        }
-    );
-    CombinedDataSchema dataSchema = new CombinedDataSchema(
-        DATA_SOURCE,
-        new TimestampSpec(TIMESTAMP_COLUMN, null, null),
-        new DimensionsSpec(DIMENSIONS),
-        new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, 
inputColName)},
-        new UniformGranularitySpec(
-            SEGMENT_GRANULARITY.getDefaultGranularity(),
-            null,
-            false,
-            Collections.singletonList(COMPACTION_INTERVAL)
-        ),
-        null,
-        true
-    );
-    CompactionConfigValidationResult validationResult = 
MSQ_COMPACTION_RUNNER.validateCompactionTask(
-        compactionTask,
-        Collections.singletonMap(COMPACTION_INTERVAL, dataSchema)
-    );
-    Assert.assertFalse(validationResult.isValid());
-    Assert.assertEquals(validationResult.getReason(), StringUtils.format(
-        "MSQ: Rolled-up segments in compaction interval[%s].",
-        COMPACTION_INTERVAL
-    ));
-  }
-
   private CompactionTask createCompactionTask(
       @Nullable PartitionsSpec partitionsSpec,
       @Nullable DimFilter dimFilter,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
index 0abaeed8eb2..8d30a60d04e 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
@@ -57,9 +57,6 @@ public interface CompactionRunner
    * Checks if the provided compaction config is supported by the runner.
    * The same validation is done at {@link 
org.apache.druid.msq.indexing.MSQCompactionRunner#validateCompactionTask}
    */
-  CompactionConfigValidationResult validateCompactionTask(
-      CompactionTask compactionTask,
-      Map<Interval, DataSchema> intervalToDataSchemaMap
-  );
+  CompactionConfigValidationResult validateCompactionTask(CompactionTask 
compactionTask);
 
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 68320387845..8659eb0f397 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -77,7 +77,6 @@ import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.incremental.AppendableIndexSpec;
-import org.apache.druid.segment.indexing.CombinedDataSchema;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
@@ -460,13 +459,11 @@ public class CompactionTask extends 
AbstractBatchIndexTask implements PendingSeg
         transformSpec,
         metricsSpec,
         granularitySpec,
-        getMetricBuilder(),
-        compactionRunner
+        getMetricBuilder()
     );
 
     
registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder());
-    CompactionConfigValidationResult supportsCompactionConfig =
-        compactionRunner.validateCompactionTask(this, intervalDataSchemas);
+    CompactionConfigValidationResult supportsCompactionConfig = 
compactionRunner.validateCompactionTask(this);
     if (!supportsCompactionConfig.isValid()) {
       throw InvalidInput.exception("Compaction spec not supported. 
Reason[%s].", supportsCompactionConfig.getReason());
     }
@@ -488,8 +485,7 @@ public class CompactionTask extends AbstractBatchIndexTask 
implements PendingSeg
       @Nullable final ClientCompactionTaskTransformSpec transformSpec,
       @Nullable final AggregatorFactory[] metricsSpec,
       @Nullable final ClientCompactionTaskGranularitySpec granularitySpec,
-      final ServiceMetricEvent.Builder metricBuilder,
-      CompactionRunner compactionRunner
+      final ServiceMetricEvent.Builder metricBuilder
   ) throws IOException
   {
     final Iterable<DataSegment> timelineSegments = 
retrieveRelevantTimelineHolders(
@@ -553,8 +549,7 @@ public class CompactionTask extends AbstractBatchIndexTask 
implements PendingSeg
             metricsSpec,
             granularitySpec == null
             ? new ClientCompactionTaskGranularitySpec(segmentGranularityToUse, 
null, null)
-            : granularitySpec.withSegmentGranularity(segmentGranularityToUse),
-            compactionRunner
+            : granularitySpec.withSegmentGranularity(segmentGranularityToUse)
         );
         intervalDataSchemaMap.put(interval, dataSchema);
       }
@@ -579,8 +574,7 @@ public class CompactionTask extends AbstractBatchIndexTask 
implements PendingSeg
           dimensionsSpec,
           transformSpec,
           metricsSpec,
-          granularitySpec,
-          compactionRunner
+          granularitySpec
       );
       return Collections.singletonMap(segmentProvider.interval, dataSchema);
     }
@@ -610,17 +604,13 @@ public class CompactionTask extends 
AbstractBatchIndexTask implements PendingSeg
       @Nullable DimensionsSpec dimensionsSpec,
       @Nullable ClientCompactionTaskTransformSpec transformSpec,
       @Nullable AggregatorFactory[] metricsSpec,
-      @Nonnull ClientCompactionTaskGranularitySpec granularitySpec,
-      @Nullable CompactionRunner compactionRunner
+      @Nonnull ClientCompactionTaskGranularitySpec granularitySpec
   )
   {
     // Check index metadata & decide which values to propagate (i.e. carry 
over) for rollup & queryGranularity
     final ExistingSegmentAnalyzer existingSegmentAnalyzer = new 
ExistingSegmentAnalyzer(
         segments,
-        // For MSQ, always need rollup to check if there are some rollup 
segments already present.
-        compactionRunner instanceof NativeCompactionRunner
-        ? (granularitySpec.isRollup() == null)
-        : true,
+        granularitySpec.isRollup() == null,
         granularitySpec.getQueryGranularity() == null,
         dimensionsSpec == null,
         metricsSpec == null
@@ -675,14 +665,13 @@ public class CompactionTask extends 
AbstractBatchIndexTask implements PendingSeg
       finalMetricsSpec = metricsSpec;
     }
 
-    return new CombinedDataSchema(
+    return new DataSchema(
         dataSource,
         new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null),
         finalDimensionsSpec,
         finalMetricsSpec,
         uniformGranularitySpec,
-        transformSpec == null ? null : new 
TransformSpec(transformSpec.getFilter(), null),
-        existingSegmentAnalyzer.hasRolledUpSegments()
+        transformSpec == null ? null : new 
TransformSpec(transformSpec.getFilter(), null)
     );
   }
 
@@ -759,7 +748,6 @@ public class CompactionTask extends AbstractBatchIndexTask 
implements PendingSeg
 
     // For processRollup:
     private boolean rollup = true;
-    private boolean hasRolledUpSegments = false;
 
     // For processQueryGranularity:
     private Granularity queryGranularity;
@@ -827,11 +815,6 @@ public class CompactionTask extends AbstractBatchIndexTask 
implements PendingSeg
       return rollup;
     }
 
-    public boolean hasRolledUpSegments()
-    {
-      return hasRolledUpSegments;
-    }
-
     public Granularity getQueryGranularity()
     {
       if (!needQueryGranularity) {
@@ -921,7 +904,6 @@ public class CompactionTask extends AbstractBatchIndexTask 
implements PendingSeg
       // Pick rollup value if all segments being compacted have the same, 
non-null, value otherwise set it to false
       final Boolean isIndexRollup = index.getMetadata().isRollup();
       rollup = rollup && Boolean.valueOf(true).equals(isIndexRollup);
-      hasRolledUpSegments = hasRolledUpSegments || 
Boolean.valueOf(true).equals(isIndexRollup);
     }
 
     private void processQueryGranularity(final QueryableIndex index)
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
index 5aa7af71451..2074d14f0f9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
@@ -85,8 +85,7 @@ public class NativeCompactionRunner implements 
CompactionRunner
 
   @Override
   public CompactionConfigValidationResult validateCompactionTask(
-      CompactionTask compactionTask,
-      Map<Interval, DataSchema> intervalToDataSchemaMap
+      CompactionTask compactionTask
   )
   {
     return CompactionConfigValidationResult.success();
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 3a386bc4aa7..f9849b1483d 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -749,8 +749,7 @@ public class CompactionTaskTest
         null,
         null,
         null,
-        METRIC_BUILDER,
-        null
+        METRIC_BUILDER
     );
 
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
NativeCompactionRunner.createIngestionSpecs(
@@ -811,8 +810,7 @@ public class CompactionTaskTest
         null,
         null,
         null,
-        METRIC_BUILDER,
-        null
+        METRIC_BUILDER
     );
 
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
NativeCompactionRunner.createIngestionSpecs(
@@ -874,8 +872,7 @@ public class CompactionTaskTest
         null,
         null,
         null,
-        METRIC_BUILDER,
-        null
+        METRIC_BUILDER
     );
 
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
NativeCompactionRunner.createIngestionSpecs(
@@ -938,8 +935,7 @@ public class CompactionTaskTest
         null,
         null,
         null,
-        METRIC_BUILDER,
-        null
+        METRIC_BUILDER
     );
 
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
NativeCompactionRunner.createIngestionSpecs(
@@ -1009,8 +1005,7 @@ public class CompactionTaskTest
         null,
         null,
         null,
-        METRIC_BUILDER,
-        null
+        METRIC_BUILDER
     );
 
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
NativeCompactionRunner.createIngestionSpecs(
@@ -1060,8 +1055,7 @@ public class CompactionTaskTest
         null,
         customMetricsSpec,
         null,
-        METRIC_BUILDER,
-        null
+        METRIC_BUILDER
     );
 
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
NativeCompactionRunner.createIngestionSpecs(
@@ -1104,8 +1098,7 @@ public class CompactionTaskTest
         null,
         null,
         null,
-        METRIC_BUILDER,
-        null
+        METRIC_BUILDER
     );
 
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
NativeCompactionRunner.createIngestionSpecs(
@@ -1155,8 +1148,7 @@ public class CompactionTaskTest
         null,
         null,
         null,
-        METRIC_BUILDER,
-        null
+        METRIC_BUILDER
     );
 
     NativeCompactionRunner.createIngestionSpecs(
@@ -1186,8 +1178,7 @@ public class CompactionTaskTest
         null,
         null,
         null,
-        METRIC_BUILDER,
-        null
+        METRIC_BUILDER
     );
 
     NativeCompactionRunner.createIngestionSpecs(
@@ -1228,8 +1219,7 @@ public class CompactionTaskTest
         null,
         null,
         new ClientCompactionTaskGranularitySpec(new 
PeriodGranularity(Period.months(3), null, null), null, null),
-        METRIC_BUILDER,
-        null
+        METRIC_BUILDER
     );
 
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
NativeCompactionRunner.createIngestionSpecs(
@@ -1273,8 +1263,7 @@ public class CompactionTaskTest
         null,
         null,
         new ClientCompactionTaskGranularitySpec(null, new 
PeriodGranularity(Period.months(3), null, null), null),
-        METRIC_BUILDER,
-        null
+        METRIC_BUILDER
     );
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
NativeCompactionRunner.createIngestionSpecs(
         dataSchemasForIntervals,
@@ -1319,8 +1308,7 @@ public class CompactionTaskTest
             new PeriodGranularity(Period.months(3), null, null),
             null
         ),
-        METRIC_BUILDER,
-        null
+        METRIC_BUILDER
     );
 
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
NativeCompactionRunner.createIngestionSpecs(
@@ -1367,8 +1355,7 @@ public class CompactionTaskTest
         null,
         null,
         null,
-        METRIC_BUILDER,
-        null
+        METRIC_BUILDER
     );
 
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
NativeCompactionRunner.createIngestionSpecs(
@@ -1413,8 +1400,7 @@ public class CompactionTaskTest
         null,
         null,
         new ClientCompactionTaskGranularitySpec(null, null, null),
-        METRIC_BUILDER,
-        null
+        METRIC_BUILDER
     );
 
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
NativeCompactionRunner.createIngestionSpecs(
@@ -1459,8 +1445,7 @@ public class CompactionTaskTest
         null,
         null,
         new ClientCompactionTaskGranularitySpec(null, null, true),
-        METRIC_BUILDER,
-        null
+        METRIC_BUILDER
     );
 
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
NativeCompactionRunner.createIngestionSpecs(
@@ -1490,8 +1475,7 @@ public class CompactionTaskTest
         null,
         null,
         new ClientCompactionTaskGranularitySpec(null, null, null),
-        METRIC_BUILDER,
-        null
+        METRIC_BUILDER
     );
 
     final List<ParallelIndexIngestionSpec> ingestionSpecs = 
NativeCompactionRunner.createIngestionSpecs(
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index d09bced3313..8f070f33405 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -80,7 +80,6 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
-import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
@@ -184,7 +183,8 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
               new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 
12, TgtHllType.HLL_4.name(), null, false, false),
               new DoublesSketchAggregatorFactory("quantilesDoublesSketch", 
"delta", 128, 1000000000L, null)
           },
-          false
+          false,
+          CompactionEngine.NATIVE
       );
       // should now only have 1 row after compaction
       // added = null, count = 3, sum_added = 93.0
@@ -286,7 +286,8 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
               ),
               new DoublesSketchAggregatorFactory("quantilesDoublesSketch", 
"delta", 128, 1000000000L, null)
           },
-          false
+          false,
+          CompactionEngine.NATIVE
       );
       // should now only have 1 row after compaction
       // added = null, count = 3, sum_added = 93
@@ -328,8 +329,8 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
     }
   }
 
-  @Test(dataProvider = "engine")
-  public void 
testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics(CompactionEngine 
engine) throws Exception
+  @Test()
+  public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics() 
throws Exception
   {
     // added = 31, count = null, sum_added = null
     loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS);
@@ -357,7 +358,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
           null,
           new AggregatorFactory[] {new CountAggregatorFactory("count"), new 
LongSumAggregatorFactory("sum_added", "added")},
           false,
-          engine
+          CompactionEngine.NATIVE
       );
       // should now only have 1 row after compaction
       // added = null, count = 2, sum_added = 62
@@ -480,7 +481,8 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
           new 
UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
           null,
           new AggregatorFactory[] {new CountAggregatorFactory("count"), new 
LongSumAggregatorFactory("sum_added", "added")},
-          false
+          false,
+          CompactionEngine.NATIVE
       );
       // should now only have 1 row after compaction
       // added = null, count = 4, sum_added = 124
@@ -521,7 +523,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
       verifySegmentsCount(4);
       verifyQuery(INDEX_QUERIES_RESOURCE);
 
-      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1));
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, Period.days(1), 
CompactionEngine.NATIVE);
       //...compacted into 1 new segment for 1 day. 1 day compacted and 1 day 
skipped/remains uncompacted. (3 total)
       forceTriggerAutoCompaction(3);
       verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -539,7 +541,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
           0,
           1,
           1);
-      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET);
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
CompactionEngine.NATIVE);
       //...compacted into 1 new segment for the remaining one day. 2 day 
compacted and 0 day uncompacted. (2 total)
       forceTriggerAutoCompaction(2);
       verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -651,7 +653,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
       verifySegmentsCount(4);
       verifyQuery(INDEX_QUERIES_RESOURCE);
 
-      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET);
+      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
CompactionEngine.NATIVE);
       // ...should remains unchanged (4 total)
       forceTriggerAutoCompaction(4);
       verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -863,7 +865,13 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
       Granularity newGranularity = Granularities.YEAR;
       // Set dropExisting to true
       // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
-      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine);
+      submitCompactionConfig(
+          1000,
+          NO_SKIP_OFFSET,
+          new UserCompactionTaskGranularityConfig(newGranularity, null, null),
+          true,
+          engine
+      );
 
       List<String> expectedIntervalAfterCompaction = new ArrayList<>();
       for (String interval : intervalsBeforeCompaction) {
@@ -881,7 +889,13 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
       //  "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
       newGranularity = Granularities.MONTH;
       // Set dropExisting to true
-      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null, null), true, engine);
+      submitCompactionConfig(
+          1000,
+          NO_SKIP_OFFSET,
+          new UserCompactionTaskGranularityConfig(newGranularity, null, null),
+          true,
+          engine
+      );
 
       // Since dropExisting is set to true...
       // Again data is only in two days
@@ -950,7 +964,13 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
 
       Granularity newGranularity = Granularities.YEAR;
       // Set dropExisting to false
-      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
+      submitCompactionConfig(
+          1000,
+          NO_SKIP_OFFSET,
+          new UserCompactionTaskGranularityConfig(newGranularity, null, null),
+          false,
+          CompactionEngine.NATIVE
+      );
 
       LOG.info("Auto compaction test with YEAR segment granularity");
 
@@ -967,7 +987,13 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
 
       newGranularity = Granularities.DAY;
       // Set dropExisting to false
-      submitCompactionConfig(1000, NO_SKIP_OFFSET, new 
UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
+      submitCompactionConfig(
+          1000,
+          NO_SKIP_OFFSET,
+          new UserCompactionTaskGranularityConfig(newGranularity, null, null),
+          false,
+          CompactionEngine.NATIVE
+      );
 
       LOG.info("Auto compaction test with DAY segment granularity");
 
@@ -1169,7 +1195,13 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
 
       Granularity newGranularity = Granularities.YEAR;
       // Set dropExisting to false
-      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
+      submitCompactionConfig(
+          MAX_ROWS_PER_SEGMENT_COMPACTED,
+          NO_SKIP_OFFSET,
+          new UserCompactionTaskGranularityConfig(newGranularity, null, null),
+          false,
+          CompactionEngine.NATIVE
+      );
 
       List<String> expectedIntervalAfterCompaction = new ArrayList<>();
       // We wil have one segment with interval of 2013-01-01/2014-01-01 
(compacted with YEAR)
@@ -1195,7 +1227,13 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
 
       newGranularity = Granularities.MONTH;
       // Set dropExisting to false
-      submitCompactionConfig(MAX_ROWS_PER_SEGMENT_COMPACTED, NO_SKIP_OFFSET, 
new UserCompactionTaskGranularityConfig(newGranularity, null, null), false);
+      submitCompactionConfig(
+          MAX_ROWS_PER_SEGMENT_COMPACTED,
+          NO_SKIP_OFFSET,
+          new UserCompactionTaskGranularityConfig(newGranularity, null, null),
+          false,
+          CompactionEngine.NATIVE
+      );
       // Since dropExisting is set to true...
       // This will submit a single compaction task for interval of 
2013-01-01/2014-01-01 with MONTH granularity
       expectedIntervalAfterCompaction = new ArrayList<>();
@@ -1239,7 +1277,8 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
           MAX_ROWS_PER_SEGMENT_COMPACTED,
           NO_SKIP_OFFSET,
           new UserCompactionTaskGranularityConfig(Granularities.WEEK, null, 
null),
-          false
+          false,
+          CompactionEngine.NATIVE
       );
       // Before compaction, we have segments with the interval 
2013-08-01/2013-09-01 and 2013-09-01/2013-10-01
       // We will compact the latest segment, 2013-09-01/2013-10-01, to WEEK.
@@ -1319,8 +1358,8 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
     }
   }
 
-  @Test(dataProvider = "engine")
-  public void testAutoCompactionDutyWithRollup(CompactionEngine engine) throws 
Exception
+  @Test()
+  public void testAutoCompactionDutyWithRollup() throws Exception
   {
     final ISOChronology chrono = 
ISOChronology.getInstance(DateTimes.inferTzFromString("America/Los_Angeles"));
     Map<String, Object> specs = ImmutableMap.of("%%GRANULARITYSPEC%%", new 
UniformGranularitySpec(Granularities.DAY, Granularities.DAY, false, 
ImmutableList.of(new Interval("2013-08-31/2013-09-02", chrono))));
@@ -1337,7 +1376,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
           NO_SKIP_OFFSET,
           new UserCompactionTaskGranularityConfig(null, null, true),
           false,
-          engine
+          CompactionEngine.NATIVE
       );
       forceTriggerAutoCompaction(2);
       queryAndResultFields = ImmutableMap.of(
@@ -1470,7 +1509,8 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
           null,
           new UserCompactionTaskTransformConfig(new SelectorDimFilter("page", 
"Striker Eureka", null)),
           null,
-          false
+          false,
+          CompactionEngine.NATIVE
       );
       forceTriggerAutoCompaction(2);
 
@@ -1517,7 +1557,8 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
           null,
           null,
           new AggregatorFactory[] {new 
DoubleSumAggregatorFactory("double_sum_added", "added"), new 
LongSumAggregatorFactory("long_sum_added", "added")},
-          false
+          false,
+          CompactionEngine.NATIVE
       );
       forceTriggerAutoCompaction(2);
 
@@ -1577,7 +1618,8 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
           null,
           null,
           null,
-          false
+          false,
+          CompactionEngine.NATIVE
       );
       // Compact the MONTH segment
       forceTriggerAutoCompaction(2);
@@ -1679,57 +1721,31 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
     queryHelper.testQueriesFromString(queryResponseTemplate);
   }
 
-  private void submitCompactionConfig(Integer maxRowsPerSegment, Period 
skipOffsetFromLatest)
-      throws Exception
-  {
-    submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, null, 
null);
-  }
-
   private void submitCompactionConfig(
       Integer maxRowsPerSegment,
       Period skipOffsetFromLatest,
-      @Nullable CompactionEngine engine
+      CompactionEngine engine
   ) throws Exception
   {
     submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, null, 
engine);
   }
 
-  private void submitCompactionConfig(
-      Integer maxRowsPerSegment,
-      Period skipOffsetFromLatest,
-      UserCompactionTaskGranularityConfig granularitySpec
-  ) throws Exception
-  {
-    submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, 
granularitySpec, false, null);
-  }
-
-
   private void submitCompactionConfig(
       Integer maxRowsPerSegment,
       Period skipOffsetFromLatest,
       UserCompactionTaskGranularityConfig granularitySpec,
-      @Nullable CompactionEngine engine
+      CompactionEngine engine
   ) throws Exception
   {
     submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, 
granularitySpec, false, engine);
   }
 
-  private void submitCompactionConfig(
-      Integer maxRowsPerSegment,
-      Period skipOffsetFromLatest,
-      UserCompactionTaskGranularityConfig granularitySpec,
-      boolean dropExisting
-  ) throws Exception
-  {
-    submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, 
granularitySpec, dropExisting, null);
-  }
-
   private void submitCompactionConfig(
       Integer maxRowsPerSegment,
       Period skipOffsetFromLatest,
       UserCompactionTaskGranularityConfig granularitySpec,
       boolean dropExisting,
-      @Nullable CompactionEngine engine
+      CompactionEngine engine
   ) throws Exception
   {
     submitCompactionConfig(
@@ -1744,28 +1760,6 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
     );
   }
 
-  private void submitCompactionConfig(
-      Integer maxRowsPerSegment,
-      Period skipOffsetFromLatest,
-      UserCompactionTaskGranularityConfig granularitySpec,
-      UserCompactionTaskDimensionsConfig dimensionsSpec,
-      UserCompactionTaskTransformConfig transformSpec,
-      AggregatorFactory[] metricsSpec,
-      boolean dropExisting
-  ) throws Exception
-  {
-    submitCompactionConfig(
-        maxRowsPerSegment,
-        skipOffsetFromLatest,
-        granularitySpec,
-        dimensionsSpec,
-        transformSpec,
-        metricsSpec,
-        dropExisting,
-        null
-    );
-  }
-
   private void submitCompactionConfig(
       Integer maxRowsPerSegment,
       Period skipOffsetFromLatest,
@@ -1774,7 +1768,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
       UserCompactionTaskTransformConfig transformSpec,
       AggregatorFactory[] metricsSpec,
       boolean dropExisting,
-      @Nullable CompactionEngine engine
+      CompactionEngine engine
   ) throws Exception
   {
     submitCompactionConfig(
@@ -1799,7 +1793,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
       UserCompactionTaskTransformConfig transformSpec,
       AggregatorFactory[] metricsSpec,
       boolean dropExisting,
-      @Nullable CompactionEngine engine
+      CompactionEngine engine
   ) throws Exception
   {
     DataSourceCompactionConfig dataSourceCompactionConfig = new 
DataSourceCompactionConfig(
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
index 0f92d99db10..806b35e9481 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
@@ -32,6 +32,7 @@ import 
org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -102,9 +103,8 @@ public class ClientCompactionRunnerInfo
    * <ul>
    * <li>partitionsSpec of type HashedParititionsSpec.</li>
    * <li>maxTotalRows in DynamicPartitionsSpec.</li>
-   * <li>rollup set to false in granularitySpec when metricsSpec is specified. 
Null is treated as true.</li>
-   * <li>queryGranularity set to ALL in granularitySpec.</li>
-   * <li>Each metric has output column name same as the input name.</li>
+   * <li>rollup in granularitySpec set to false when metricsSpec is specified 
or true when it's empty.</li>
+   * <li>any metric is non-idempotent, i.e. it defines some aggregatorFactory 
'A' s.t. 'A != A.combiningFactory()'.</li>
    * </ul>
    */
   private static CompactionConfigValidationResult 
compactionConfigSupportedByMSQEngine(DataSourceCompactionConfig newConfig)
@@ -120,6 +120,7 @@ public class ClientCompactionRunnerInfo
       ));
     }
     
validationResults.add(validateMaxNumTasksForMSQ(newConfig.getTaskContext()));
+    
validationResults.add(validateMetricsSpecForMSQ(newConfig.getMetricsSpec()));
     return validationResults.stream()
                             .filter(result -> !result.isValid())
                             .findFirst()
@@ -149,17 +150,23 @@ public class ClientCompactionRunnerInfo
   }
 
   /**
-   * Validate rollup is set to false in granularitySpec when metricsSpec is 
specified.
+   * Validate rollup in granularitySpec is set to true when metricsSpec is 
specified and false if it's null.
+   * If rollup set to null, all existing segments are analyzed, and it's set 
to true iff all segments have rollup
+   * set to true.
    */
   public static CompactionConfigValidationResult validateRollupForMSQ(
       AggregatorFactory[] metricsSpec,
       @Nullable Boolean isRollup
   )
   {
-    if (metricsSpec != null && isRollup != null && !isRollup) {
+    if (metricsSpec != null && metricsSpec.length != 0 && isRollup != null && 
!isRollup) {
       return CompactionConfigValidationResult.failure(
           "MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is 
specified"
       );
+    } else if ((metricsSpec == null || metricsSpec.length == 0) && isRollup != 
null && isRollup) {
+      return CompactionConfigValidationResult.failure(
+          "MSQ: 'granularitySpec.rollup' must be false if 'metricsSpec' is 
null"
+      );
     }
     return CompactionConfigValidationResult.success();
   }
@@ -181,4 +188,23 @@ public class ClientCompactionRunnerInfo
     }
     return CompactionConfigValidationResult.success();
   }
+
+  /**
+   * Validate each metric is idempotent, i.e. it defines some 
aggregatorFactory 'A' s.t. 'A = A.combiningFactory()'.
+   */
+  public static CompactionConfigValidationResult 
validateMetricsSpecForMSQ(AggregatorFactory[] metricsSpec)
+  {
+    if (metricsSpec == null) {
+      return CompactionConfigValidationResult.success();
+    }
+    return Arrays.stream(metricsSpec)
+                 .filter(aggregatorFactory -> 
!aggregatorFactory.equals(aggregatorFactory.getCombiningFactory()))
+                 .findFirst()
+                 .map(aggregatorFactory ->
+                          CompactionConfigValidationResult.failure(
+                              "MSQ: Non-idempotent aggregator[%s] not 
supported in 'metricsSpec'.",
+                              aggregatorFactory.getName()
+                          )
+                 ).orElse(CompactionConfigValidationResult.success());
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java
 
b/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java
deleted file mode 100644
index b2cb90bc0ce..00000000000
--- 
a/server/src/main/java/org/apache/druid/segment/indexing/CombinedDataSchema.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.segment.indexing;
-
-import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.segment.indexing.granularity.GranularitySpec;
-import org.apache.druid.segment.transform.TransformSpec;
-
-import javax.annotation.Nullable;
-
-/**
- * Class representing the combined DataSchema of a set of segments, currently 
used only by Compaction.
- */
-public class CombinedDataSchema extends DataSchema
-{
-  private final boolean hasRolledUpSegments;
-
-  public CombinedDataSchema(
-      String dataSource,
-      @Nullable TimestampSpec timestampSpec,
-      @Nullable DimensionsSpec dimensionsSpec,
-      AggregatorFactory[] aggregators,
-      GranularitySpec granularitySpec,
-      TransformSpec transformSpec,
-      @Nullable boolean hasRolledUpSegments
-  )
-  {
-    super(
-        dataSource,
-        timestampSpec,
-        dimensionsSpec,
-        aggregators,
-        granularitySpec,
-        transformSpec,
-        null,
-        null
-    );
-    this.hasRolledUpSegments = hasRolledUpSegments;
-  }
-
-  public boolean hasRolledUpSegments()
-  {
-    return hasRolledUpSegments;
-  }
-}
diff --git 
a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
 
b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
index 7742eaaf138..011a4640da3 100644
--- 
a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
@@ -52,7 +52,7 @@ public class ClientCompactionRunnerInfoTest
   @Test
   public void testMSQEngineWithHashedPartitionsSpecIsInvalid()
   {
-    DataSourceCompactionConfig compactionConfig = createCompactionConfig(
+    DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
         new HashedPartitionsSpec(100, null, null),
         Collections.emptyMap(),
         null,
@@ -72,7 +72,7 @@ public class ClientCompactionRunnerInfoTest
   @Test
   public void testMSQEngineWithMaxTotalRowsIsInvalid()
   {
-    DataSourceCompactionConfig compactionConfig = createCompactionConfig(
+    DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
         new DynamicPartitionsSpec(100, 100L),
         Collections.emptyMap(),
         null,
@@ -92,7 +92,7 @@ public class ClientCompactionRunnerInfoTest
   @Test
   public void testMSQEngineWithDynamicPartitionsSpecIsValid()
   {
-    DataSourceCompactionConfig compactionConfig = createCompactionConfig(
+    DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
         new DynamicPartitionsSpec(100, null),
         Collections.emptyMap(),
         null,
@@ -105,7 +105,7 @@ public class ClientCompactionRunnerInfoTest
   @Test
   public void testMSQEngineWithDimensionRangePartitionsSpecIsValid()
   {
-    DataSourceCompactionConfig compactionConfig = createCompactionConfig(
+    DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
         new DimensionRangePartitionsSpec(100, null, 
ImmutableList.of("partitionDim"), false),
         Collections.emptyMap(),
         null,
@@ -118,7 +118,7 @@ public class ClientCompactionRunnerInfoTest
   @Test
   public void testMSQEngineWithQueryGranularityAllIsValid()
   {
-    DataSourceCompactionConfig compactionConfig = createCompactionConfig(
+    DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
         new DynamicPartitionsSpec(3, null),
         Collections.emptyMap(),
         new UserCompactionTaskGranularityConfig(Granularities.ALL, 
Granularities.ALL, false),
@@ -131,7 +131,7 @@ public class ClientCompactionRunnerInfoTest
   @Test
   public void testMSQEngineWithRollupFalseWithMetricsSpecIsInvalid()
   {
-    DataSourceCompactionConfig compactionConfig = createCompactionConfig(
+    DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
         new DynamicPartitionsSpec(3, null),
         Collections.emptyMap(),
         new UserCompactionTaskGranularityConfig(null, null, false),
@@ -148,10 +148,53 @@ public class ClientCompactionRunnerInfoTest
     );
   }
 
+  @Test
+  public void testMSQEngineWithRollupTrueWithoutMetricsSpecIsInvalid()
+  {
+    DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
+        new DynamicPartitionsSpec(3, null),
+        Collections.emptyMap(),
+        new UserCompactionTaskGranularityConfig(null, null, true),
+        null
+    );
+    CompactionConfigValidationResult validationResult = 
ClientCompactionRunnerInfo.validateCompactionConfig(
+        compactionConfig,
+        CompactionEngine.NATIVE
+    );
+    Assert.assertFalse(validationResult.isValid());
+    Assert.assertEquals(
+        "MSQ: 'granularitySpec.rollup' must be false if 'metricsSpec' is null",
+        validationResult.getReason()
+    );
+  }
+
+  @Test
+  public void testMSQEngineWithUnsupportedMetricsSpecIsInvalid()
+  {
+    // Aggregators having combiningFactory different from the 
aggregatorFactory are unsupported.
+    final String inputColName = "added";
+    final String outputColName = "sum_added";
+    DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
+        new DynamicPartitionsSpec(3, null),
+        Collections.emptyMap(),
+        new UserCompactionTaskGranularityConfig(null, null, null),
+        new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, 
inputColName)}
+    );
+    CompactionConfigValidationResult validationResult = 
ClientCompactionRunnerInfo.validateCompactionConfig(
+        compactionConfig,
+        CompactionEngine.NATIVE
+    );
+    Assert.assertFalse(validationResult.isValid());
+    Assert.assertEquals(
+        "MSQ: Non-idempotent aggregator[sum_added] not supported in 
'metricsSpec'.",
+        validationResult.getReason()
+    );
+  }
+
   @Test
   public void testMSQEngineWithRollupNullWithMetricsSpecIsValid()
   {
-    DataSourceCompactionConfig compactionConfig = createCompactionConfig(
+    DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
         new DynamicPartitionsSpec(3, null),
         Collections.emptyMap(),
         new UserCompactionTaskGranularityConfig(null, null, null),
@@ -161,7 +204,7 @@ public class ClientCompactionRunnerInfoTest
                                          .isValid());
   }
 
-  private static DataSourceCompactionConfig createCompactionConfig(
+  private static DataSourceCompactionConfig createMSQCompactionConfig(
       PartitionsSpec partitionsSpec,
       Map<String, Object> context,
       @Nullable UserCompactionTaskGranularityConfig granularitySpec,
diff --git 
a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java 
b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
index 87ddac50f01..78294fca0c4 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/DataSchemaTest.java
@@ -684,24 +684,4 @@ public class DataSchemaTest extends 
InitializedNullHandlingTest
     Assert.assertSame(oldSchema.getParserMap(), newSchema.getParserMap());
 
   }
-
-  @Test
-  public void testCombinedDataSchemaSetsHasRolledUpSegments()
-  {
-    CombinedDataSchema schema = new CombinedDataSchema(
-        IdUtilsTest.VALID_ID_CHARS,
-        new TimestampSpec("time", "auto", null),
-        DimensionsSpec.builder()
-                      .setDimensions(
-                          
DimensionsSpec.getDefaultSchemas(ImmutableList.of("dimA", "dimB", "metric1"))
-                      )
-                      .setDimensionExclusions(ImmutableList.of("dimC"))
-                      .build(),
-        null,
-        new ArbitraryGranularitySpec(Granularities.DAY, 
ImmutableList.of(Intervals.of("2014/2015"))),
-        null,
-        true
-    );
-    Assert.assertTrue(schema.hasRolledUpSegments());
-  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
index c44b9b2f358..24cbfbd9462 100644
--- 
a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
@@ -202,7 +202,7 @@ public class CoordinatorCompactionConfigsResourceTest
         .withInputSegmentSizeBytes(1000L)
         .withSkipOffsetFromLatest(Period.hours(3))
         .withGranularitySpec(
-            new UserCompactionTaskGranularityConfig(Granularities.DAY, null, 
true)
+            new UserCompactionTaskGranularityConfig(Granularities.DAY, null, 
false)
         )
         .withEngine(CompactionEngine.MSQ)
         .build();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to