This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e35e50052e Fix issues with MSQ Compaction (#17250)
7e35e50052e is described below

commit 7e35e50052ee1b4f4d65222e0d5c4883e9fa26da
Author: Vishesh Garg <[email protected]>
AuthorDate: Sun Oct 6 21:48:26 2024 +0530

    Fix issues with MSQ Compaction (#17250)
    
    The patch makes the following changes:
    1. Fixes a bug causing compaction to fail on array, complex, and other non-primitive-type columns
    2. Updates the compaction status check to account for partition dimensions when comparing dimension ordering
    3. Ensures only string columns are specified as partition dimensions
    4. Ensures `rollup` is true if and only if metricsSpec is non-empty (see the sketch after this list)
    5. Ensures disjoint intervals aren't submitted for compaction
    6. Adds `compactionReason` to the compaction task context
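
    A minimal sketch of the rollup rule in point 4, in plain Java with no Druid
    dependencies. `isValid` is an illustrative stand-in for the patched
    validateRollupForMSQ check, not actual Druid API:

        public class RollupRuleSketch
        {
          // Mirrors the patched condition: validation fails when
          // (metricsSpec non-empty) != Boolean.TRUE.equals(rollup).
          static boolean isValid(int metricCount, Boolean rollup)
          {
            return (metricCount > 0) == Boolean.TRUE.equals(rollup);
          }

          public static void main(String[] args)
          {
            System.out.println(isValid(1, true));   // true: metrics present, rollup=true
            System.out.println(isValid(0, false));  // true: no metrics, rollup=false
            System.out.println(isValid(0, null));   // true: null rollup is not "true"
            System.out.println(isValid(1, null));   // false: metrics need explicit rollup=true
            System.out.println(isValid(0, true));   // false: rollup=true needs metrics
          }
        }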
---
 .../druid/msq/indexing/MSQCompactionRunner.java    | 33 ++++++---
 .../msq/indexing/MSQCompactionRunnerTest.java      | 83 +++++++++++++++++-----
 .../indexing/common/task/CompactionRunner.java     |  5 +-
 .../druid/indexing/common/task/CompactionTask.java |  5 +-
 .../common/task/NativeCompactionRunner.java        |  3 +-
 .../indexing/ClientCompactionRunnerInfo.java       | 54 ++++++++++----
 .../druid/server/compaction/CompactionStatus.java  | 44 ++++++++++--
 .../server/coordinator/duty/CompactSegments.java   |  5 ++
 .../indexing/ClientCompactionRunnerInfoTest.java   | 68 ++++++++++++++----
 .../compaction/NewestSegmentFirstPolicyTest.java   | 77 ++++++++++++++++++++
 10 files changed, 317 insertions(+), 60 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
index 417fdb60d0f..e20188d5829 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import com.google.inject.Injector;
 import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
 import org.apache.druid.data.input.impl.DimensionSchema;
@@ -129,21 +130,35 @@ public class MSQCompactionRunner implements CompactionRunner
    * The following configs aren't supported:
    * <ul>
    * <li>partitionsSpec of type HashedParititionsSpec.</li>
+   * <li>'range' partitionsSpec with non-string partition dimensions.</li>
    * <li>maxTotalRows in DynamicPartitionsSpec.</li>
-   * <li>rollup in granularitySpec set to false when metricsSpec is specified or true when it's null.
-   * Null is treated as true if metricsSpec exist and false if empty.</li>
-   * <li>any metric is non-idempotent, i.e. it defines some aggregatorFactory 'A' s.t. 'A != A.combiningFactory()'.</li>
+   * <li>Rollup without metricsSpec being specified or vice-versa.</li>
+   * <li>Any aggregatorFactory {@code A} s.t. {@code A != A.combiningFactory()}.</li>
+   * <li>Multiple disjoint intervals in compaction task</li>
    * </ul>
    */
   @Override
   public CompactionConfigValidationResult validateCompactionTask(
-      CompactionTask compactionTask
+      CompactionTask compactionTask,
+      Map<Interval, DataSchema> intervalToDataSchemaMap
   )
   {
+    if (intervalToDataSchemaMap.size() > 1) {
+      // We are currently not able to handle multiple intervals in the map for multiple reasons, one of them being that
+      // the subsequent worker ids clash -- since they are derived from MSQControllerTask ID which in turn is equal to
+      // CompactionTask ID for each sequentially launched MSQControllerTask.
+      return CompactionConfigValidationResult.failure(
+          "MSQ: Disjoint compaction intervals[%s] not supported",
+          intervalToDataSchemaMap.keySet()
+      );
+    }
     List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
     if (compactionTask.getTuningConfig() != null) {
-      validationResults.add(ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
-          compactionTask.getTuningConfig().getPartitionsSpec())
+      validationResults.add(
+          ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
+              compactionTask.getTuningConfig().getPartitionsSpec(),
+              Iterables.getOnlyElement(intervalToDataSchemaMap.values()).getDimensionsSpec().getDimensions()
+          )
       );
     }
     if (compactionTask.getGranularitySpec() != null) {
@@ -300,7 +315,7 @@ public class MSQCompactionRunner implements CompactionRunner
       rowSignatureBuilder.add(TIME_VIRTUAL_COLUMN, ColumnType.LONG);
     }
     for (DimensionSchema dimensionSchema : dataSchema.getDimensionsSpec().getDimensions()) {
-      rowSignatureBuilder.add(dimensionSchema.getName(), ColumnType.fromString(dimensionSchema.getTypeName()));
+      rowSignatureBuilder.add(dimensionSchema.getName(), dimensionSchema.getColumnType());
     }
     // There can be columns that are part of metricsSpec for a datasource.
     for (AggregatorFactory aggregatorFactory : dataSchema.getAggregators()) {
@@ -416,7 +431,9 @@ public class MSQCompactionRunner implements CompactionRunner
   {
     if (dataSchema.getGranularitySpec() != null) {
       // If rollup is true without any metrics, all columns are treated as dimensions and
-      // duplicate rows are removed in line with native compaction.
+      // duplicate rows are removed in line with native compaction. This case can only happen if the rollup is
+      // specified as null in the compaction spec and is then inferred to be true by segment analysis. The metrics=null
+      // and rollup=true combination in turn can only have been recorded for natively ingested segments.
       return dataSchema.getGranularitySpec().isRollup();
     }
     // If no rollup specified, decide based on whether metrics are present.
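
A hedged sketch of the row-signature fix in the hunk above: the old code round-tripped each dimension's type through its type-name string, which fails for array, complex, and other non-primitive columns (point 1 of the commit message); the new code takes the ColumnType directly from the schema. The two commented lines are taken from the diff itself; the wrapper class is illustrative only.

    import org.apache.druid.data.input.impl.DimensionSchema;
    import org.apache.druid.segment.column.RowSignature;

    // Illustrative helper, not part of the patch.
    class RowSignatureSketch
    {
      static void addDimension(RowSignature.Builder builder, DimensionSchema dim)
      {
        // Before: builder.add(dim.getName(), ColumnType.fromString(dim.getTypeName()));
        // getTypeName() yields a name string that need not resolve back to the
        // dimension's ColumnType for non-primitive columns.
        // After: use the typed ColumnType carried by the schema itself.
        builder.add(dim.getName(), dim.getColumnType());
      }
    }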
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index 15b12be1575..0b5395d727f 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.msq.indexing;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
@@ -41,6 +42,7 @@ import org.apache.druid.indexing.common.task.CompactionTask;
 import org.apache.druid.indexing.common.task.TuningConfigBuilder;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.GranularityType;
 import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
@@ -96,7 +98,6 @@ public class MSQCompactionRunnerTest
   private static final int MAX_ROWS_PER_SEGMENT = 150000;
   private static final GranularityType SEGMENT_GRANULARITY = GranularityType.HOUR;
   private static final GranularityType QUERY_GRANULARITY = GranularityType.HOUR;
-  private static List<String> PARTITION_DIMENSIONS;
 
   private static final StringDimensionSchema STRING_DIMENSION = new StringDimensionSchema("string_dim", null, false);
   private static final StringDimensionSchema MV_STRING_DIMENSION = new StringDimensionSchema("mv_string_dim", null, null);
@@ -106,24 +107,49 @@ public class MSQCompactionRunnerTest
       LONG_DIMENSION,
       MV_STRING_DIMENSION
   );
+  private static final Map<Interval, DataSchema> INTERVAL_DATASCHEMAS = ImmutableMap.of(
+      COMPACTION_INTERVAL,
+      new DataSchema.Builder()
+          .withDataSource(DATA_SOURCE)
+          .withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null))
+          .withDimensions(new DimensionsSpec(DIMENSIONS))
+          .build()
+  );
   private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
   private static final AggregatorFactory AGG1 = new CountAggregatorFactory("agg_0");
   private static final AggregatorFactory AGG2 = new LongSumAggregatorFactory("sum_added", "sum_added");
   private static final List<AggregatorFactory> AGGREGATORS = ImmutableList.of(AGG1, AGG2);
   private static final MSQCompactionRunner MSQ_COMPACTION_RUNNER = new MSQCompactionRunner(JSON_MAPPER, TestExprMacroTable.INSTANCE, null);
+  private static final List<String> PARTITION_DIMENSIONS = Collections.singletonList(STRING_DIMENSION.getName());
+
 
   @BeforeClass
   public static void setupClass()
   {
     NullHandling.initializeForTests();
+  }
 
-    final StringDimensionSchema stringDimensionSchema = new StringDimensionSchema(
-        "string_dim",
+  @Test
+  public void testMultipleDisjointCompactionIntervalsAreInvalid()
+  {
+    Map<Interval, DataSchema> intervalDataschemas = new HashMap<>(INTERVAL_DATASCHEMAS);
+    intervalDataschemas.put(Intervals.of("2017-07-01/2018-01-01"), null);
+    CompactionTask compactionTask = createCompactionTask(
+        new HashedPartitionsSpec(3, null, ImmutableList.of("dummy")),
+        null,
+        Collections.emptyMap(),
         null,
         null
     );
-
-    PARTITION_DIMENSIONS = Collections.singletonList(stringDimensionSchema.getName());
+    CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
+        compactionTask,
+        intervalDataschemas
+    );
+    Assert.assertFalse(validationResult.isValid());
+    Assert.assertEquals(
+        StringUtils.format("MSQ: Disjoint compaction intervals[%s] not supported", intervalDataschemas.keySet()),
+        validationResult.getReason()
+    );
   }
 
   @Test
@@ -136,11 +162,11 @@ public class MSQCompactionRunnerTest
         null,
         null
     );
-    Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+    Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
   }
 
   @Test
-  public void testDimensionRangePartitionsSpecIsValid()
+  public void testStringDimensionInRangePartitionsSpecIsValid()
   {
     CompactionTask compactionTask = createCompactionTask(
         new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null, PARTITION_DIMENSIONS, false),
@@ -149,7 +175,29 @@ public class MSQCompactionRunnerTest
         null,
         null
     );
-    Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+    Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
+  }
+
+  @Test
+  public void testLongDimensionInRangePartitionsSpecIsInvalid()
+  {
+    List<String> longPartitionDimension = Collections.singletonList(LONG_DIMENSION.getName());
+    CompactionTask compactionTask = createCompactionTask(
+        new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null, longPartitionDimension, false),
+        null,
+        Collections.emptyMap(),
+        null,
+        null
+    );
+
+    CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask,
+                                                                                                     INTERVAL_DATASCHEMAS
+    );
+    Assert.assertFalse(validationResult.isValid());
+    Assert.assertEquals(
+        "MSQ: Non-string partition dimension[long_dim] of type[long] not 
supported with 'range' partition spec",
+        validationResult.getReason()
+    );
   }
 
   @Test
@@ -162,7 +210,7 @@ public class MSQCompactionRunnerTest
         null,
         null
     );
-    Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+    Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
   }
 
   @Test
@@ -175,7 +223,7 @@ public class MSQCompactionRunnerTest
         null,
         null
     );
-    Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+    Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
   }
 
   @Test
@@ -188,7 +236,7 @@ public class MSQCompactionRunnerTest
         new ClientCompactionTaskGranularitySpec(null, Granularities.ALL, null),
         null
     );
-    Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+    Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
   }
 
   @Test
@@ -201,7 +249,7 @@ public class MSQCompactionRunnerTest
         new ClientCompactionTaskGranularitySpec(null, null, false),
         AGGREGATORS.toArray(new AggregatorFactory[0])
     );
-    Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+    Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
   }
 
   @Test
@@ -214,7 +262,7 @@ public class MSQCompactionRunnerTest
         new ClientCompactionTaskGranularitySpec(null, null, true),
         null
     );
-    Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask).isValid());
+    Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, INTERVAL_DATASCHEMAS).isValid());
   }
 
   @Test
@@ -227,13 +275,16 @@ public class MSQCompactionRunnerTest
         new DynamicPartitionsSpec(3, null),
         null,
         Collections.emptyMap(),
-        new ClientCompactionTaskGranularitySpec(null, null, null),
+        new ClientCompactionTaskGranularitySpec(null, null, true),
         new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
     );
-    CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask);
+    CompactionConfigValidationResult validationResult = MSQ_COMPACTION_RUNNER.validateCompactionTask(
+        compactionTask,
+        INTERVAL_DATASCHEMAS
+    );
     Assert.assertFalse(validationResult.isValid());
     Assert.assertEquals(
-        "MSQ: Non-idempotent aggregator[sum_added] not supported in 
'metricsSpec'.",
+        "MSQ: Aggregator[sum_added] not supported in 'metricsSpec'",
         validationResult.getReason()
     );
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
index 8d30a60d04e..0abaeed8eb2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionRunner.java
@@ -57,6 +57,9 @@ public interface CompactionRunner
    * Checks if the provided compaction config is supported by the runner.
    * The same validation is done at {@link org.apache.druid.msq.indexing.MSQCompactionRunner#validateCompactionTask}
    */
-  CompactionConfigValidationResult validateCompactionTask(CompactionTask compactionTask);
+  CompactionConfigValidationResult validateCompactionTask(
+      CompactionTask compactionTask,
+      Map<Interval, DataSchema> intervalToDataSchemaMap
+  );
 
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index b3c01d79f98..4594fc1e9b2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -470,7 +470,10 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg
     );

     registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder());
-    CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask(this);
+    CompactionConfigValidationResult supportsCompactionConfig = compactionRunner.validateCompactionTask(
+        this,
+        intervalDataSchemas
+    );
     if (!supportsCompactionConfig.isValid()) {
       throw InvalidInput.exception("Compaction spec not supported. Reason[%s].", supportsCompactionConfig.getReason());
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
index 2074d14f0f9..541f24fe088 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NativeCompactionRunner.java
@@ -85,7 +85,8 @@ public class NativeCompactionRunner implements CompactionRunner
 
   @Override
   public CompactionConfigValidationResult validateCompactionTask(
-      CompactionTask compactionTask
+      CompactionTask compactionTask,
+      Map<Interval, DataSchema> intervalDataSchemaMap
   )
   {
     return CompactionConfigValidationResult.success();
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
index 806b35e9481..f6a009afe1c 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
@@ -21,12 +21,14 @@ package org.apache.druid.client.indexing;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.indexer.CompactionEngine;
 import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 
@@ -36,6 +38,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 
 /**
@@ -102,16 +107,20 @@ public class ClientCompactionRunnerInfo
   * Checks if the provided compaction config is supported by MSQ. The following configs aren't supported:
    * <ul>
    * <li>partitionsSpec of type HashedParititionsSpec.</li>
+   * <li>'range' partitionsSpec with non-string partition dimensions.</li>
    * <li>maxTotalRows in DynamicPartitionsSpec.</li>
-   * <li>rollup in granularitySpec set to false when metricsSpec is specified or true when it's empty.</li>
-   * <li>any metric is non-idempotent, i.e. it defines some aggregatorFactory 'A' s.t. 'A != A.combiningFactory()'.</li>
+   * <li>Rollup without metricsSpec being specified or vice-versa.</li>
+   * <li>Any aggregatorFactory {@code A} s.t. {@code A != A.combiningFactory()}.</li>
    * </ul>
    */
  private static CompactionConfigValidationResult compactionConfigSupportedByMSQEngine(DataSourceCompactionConfig newConfig)
   {
     List<CompactionConfigValidationResult> validationResults = new ArrayList<>();
     if (newConfig.getTuningConfig() != null) {
-      validationResults.add(validatePartitionsSpecForMSQ(newConfig.getTuningConfig().getPartitionsSpec()));
+      validationResults.add(validatePartitionsSpecForMSQ(
+          newConfig.getTuningConfig().getPartitionsSpec(),
+          newConfig.getDimensionsSpec() == null ? null : newConfig.getDimensionsSpec().getDimensions()
+      ));
     }
     if (newConfig.getGranularitySpec() != null) {
       validationResults.add(validateRollupForMSQ(
@@ -128,9 +137,13 @@ public class ClientCompactionRunnerInfo
   }
 
   /**
-   * Validate that partitionSpec is either 'dynamic` or 'range', and if 'dynamic', ensure 'maxTotalRows' is null.
+   * Validate that partitionSpec is either 'dynamic' or 'range'. If 'dynamic', ensure 'maxTotalRows' is null. If 'range',
+   * ensure all partition columns are of string type.
    */
-  public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(PartitionsSpec partitionsSpec)
+  public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(
+      @Nullable PartitionsSpec partitionsSpec,
+      @Nullable List<DimensionSchema> dimensionSchemas
+  )
   {
     if (!(partitionsSpec instanceof DimensionRangePartitionsSpec
           || partitionsSpec instanceof DynamicPartitionsSpec)) {
@@ -146,11 +159,28 @@ public class ClientCompactionRunnerInfo
           "MSQ: 'maxTotalRows' not supported with 'dynamic' partitioning"
       );
     }
+    if (partitionsSpec instanceof DimensionRangePartitionsSpec && dimensionSchemas != null) {
+      Map<String, DimensionSchema> dimensionSchemaMap = dimensionSchemas.stream().collect(
+          Collectors.toMap(DimensionSchema::getName, Function.identity())
+      );
+      Optional<String> nonStringDimension = ((DimensionRangePartitionsSpec) partitionsSpec)
+          .getPartitionDimensions()
+          .stream()
+          .filter(dim -> !ColumnType.STRING.equals(dimensionSchemaMap.get(dim).getColumnType()))
+          .findAny();
+      if (nonStringDimension.isPresent()) {
+        return CompactionConfigValidationResult.failure(
+            "MSQ: Non-string partition dimension[%s] of type[%s] not supported 
with 'range' partition spec",
+            nonStringDimension.get(),
+            dimensionSchemaMap.get(nonStringDimension.get()).getTypeName()
+        );
+      }
+    }
     return CompactionConfigValidationResult.success();
   }
 
   /**
-   * Validate rollup in granularitySpec is set to true when metricsSpec is specified and false if it's null.
+   * Validate rollup in granularitySpec is set to true iff metricsSpec is specified.
    * If rollup set to null, all existing segments are analyzed, and it's set to true iff all segments have rollup
    * set to true.
    */
@@ -159,13 +189,9 @@ public class ClientCompactionRunnerInfo
       @Nullable Boolean isRollup
   )
   {
-    if (metricsSpec != null && metricsSpec.length != 0 && isRollup != null && !isRollup) {
-      return CompactionConfigValidationResult.failure(
-          "MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is specified"
-      );
-    } else if ((metricsSpec == null || metricsSpec.length == 0) && isRollup != null && isRollup) {
+    if ((metricsSpec != null && metricsSpec.length > 0) != Boolean.TRUE.equals(isRollup)) {
       return CompactionConfigValidationResult.failure(
-          "MSQ: 'granularitySpec.rollup' must be false if 'metricsSpec' is null"
+          "MSQ: 'granularitySpec.rollup' must be true if and only if 'metricsSpec' is specified"
       );
     }
     return CompactionConfigValidationResult.success();
@@ -190,7 +216,7 @@ public class ClientCompactionRunnerInfo
   }
 
   /**
-   * Validate each metric is idempotent, i.e. it defines some aggregatorFactory 'A' s.t. 'A = A.combiningFactory()'.
+   * Validate each metric defines some aggregatorFactory 'A' s.t. 'A = A.combiningFactory()'.
    */
   public static CompactionConfigValidationResult validateMetricsSpecForMSQ(AggregatorFactory[] metricsSpec)
   {
@@ -202,7 +228,7 @@ public class ClientCompactionRunnerInfo
                  .findFirst()
                  .map(aggregatorFactory ->
                           CompactionConfigValidationResult.failure(
-                              "MSQ: Non-idempotent aggregator[%s] not 
supported in 'metricsSpec'.",
+                              "MSQ: Aggregator[%s] not supported in 
'metricsSpec'",
                               aggregatorFactory.getName()
                           )
                  ).orElse(CompactionConfigValidationResult.success());
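
A hedged, dependency-free sketch of the new 'range' partition check above: every partition dimension must resolve to a string-typed column. Plain name-to-type strings stand in for the DimensionSchema/ColumnType lookup in the real code; the class and method names below are illustrative.

    import java.util.List;
    import java.util.Map;
    import java.util.Optional;

    class RangePartitionCheckSketch
    {
      // Mirrors the findAny() filter in validatePartitionsSpecForMSQ: return the
      // first partition dimension whose column type is not string.
      static Optional<String> firstNonStringDim(List<String> partitionDims, Map<String, String> columnTypes)
      {
        return partitionDims.stream()
                            .filter(dim -> !"string".equals(columnTypes.get(dim)))
                            .findAny();
      }

      public static void main(String[] args)
      {
        Map<String, String> types = Map.of("string_dim", "string", "long_dim", "long");
        System.out.println(firstNonStringDim(List.of("string_dim"), types)); // Optional.empty -> valid
        System.out.println(firstNonStringDim(List.of("long_dim"), types));   // Optional[long_dim] -> rejected
      }
    }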
diff --git a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
index 2bc6d251f06..fd53ed38c25 100644
--- a/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
+++ b/server/src/main/java/org/apache/druid/server/compaction/CompactionStatus.java
@@ -25,7 +25,7 @@ import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
 import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
 import org.apache.druid.common.config.Configs;
-import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
@@ -44,6 +44,7 @@ import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * Represents the status of compaction for a given {@link CompactionCandidate}.
@@ -230,6 +231,21 @@ public class CompactionStatus
     }
   }
 
+  private static List<DimensionSchema> getNonPartitioningDimensions(
+      @Nullable final List<DimensionSchema> dimensionSchemas,
+      @Nullable final PartitionsSpec partitionsSpec
+  )
+  {
+    if (dimensionSchemas == null || !(partitionsSpec instanceof DimensionRangePartitionsSpec)) {
+      return dimensionSchemas;
+    }
+
+    final List<String> partitionsDimensions = ((DimensionRangePartitionsSpec) partitionsSpec).getPartitionDimensions();
+    return dimensionSchemas.stream()
+                     .filter(dim -> !partitionsDimensions.contains(dim.getName()))
+                     .collect(Collectors.toList());
+  }
+
   /**
    * Converts to have only the effective maxRowsPerSegment to avoid false positives when targetRowsPerSegment is set but
    * effectively translates to the same maxRowsPerSegment.
@@ -389,18 +405,34 @@ public class CompactionStatus
       }
     }
 
+    /**
+     * Removes partition dimensions before comparison, since they are placed in front of the sort order --
+     * which can create a mismatch between expected and actual order of dimensions. Partition dimensions are separately
+     * covered by the {@link Evaluator#partitionsSpecIsUpToDate()} check.
+     */
     private CompactionStatus dimensionsSpecIsUpToDate()
     {
       if (compactionConfig.getDimensionsSpec() == null) {
         return COMPLETE;
       } else {
-        final DimensionsSpec existingDimensionsSpec = lastCompactionState.getDimensionsSpec();
-        return CompactionStatus.completeIfEqual(
-            "dimensionsSpec",
+        List<DimensionSchema> existingDimensions = getNonPartitioningDimensions(
+            lastCompactionState.getDimensionsSpec() == null
+            ? null
+            : lastCompactionState.getDimensionsSpec().getDimensions(),
+            lastCompactionState.getPartitionsSpec()
+        );
+        List<DimensionSchema> configuredDimensions = getNonPartitioningDimensions(
             compactionConfig.getDimensionsSpec().getDimensions(),
-            existingDimensionsSpec == null ? null : existingDimensionsSpec.getDimensions(),
-            String::valueOf
+            compactionConfig.getTuningConfig() == null ? null : compactionConfig.getTuningConfig().getPartitionsSpec()
         );
+        {
+          return CompactionStatus.completeIfEqual(
+              "dimensionsSpec",
+              configuredDimensions,
+              existingDimensions,
+              String::valueOf
+          );
+        }
       }
     }
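
A hedged sketch of the status check above: partition dimensions are filtered out of both the existing and the configured dimension lists before the order comparison, since 'range' partitioning places them at the front of the sort order. Plain strings stand in for DimensionSchema; the values echo the NewestSegmentFirstPolicyTest case later in this patch.

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    class DimensionOrderSketch
    {
      static List<String> nonPartitioning(List<String> dims, List<String> partitionDims)
      {
        return dims.stream().filter(d -> !partitionDims.contains(d)).collect(Collectors.toList());
      }

      public static void main(String[] args)
      {
        List<String> partitionDims = Arrays.asList("dim2", "dim4");
        // Existing segment order after range partitioning on [dim2, dim4]:
        List<String> existing = Arrays.asList("dim2", "dim4", "dim1", "dim3");
        // Configured order in the compaction spec:
        List<String> configured = Arrays.asList("dim1", "dim2", "dim3", "dim4");
        // Remaining lists are both [dim1, dim3] -> dimensionsSpec is up to date.
        System.out.println(nonPartitioning(existing, partitionDims)
                               .equals(nonPartitioning(configured, partitionDims)));
      }
    }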
 
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index b347a57dcb6..035286692bf 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -86,6 +86,7 @@ public class CompactSegments implements CoordinatorCustomDuty
    * Must be the same as org.apache.druid.indexing.common.task.Tasks.STORE_COMPACTION_STATE_KEY
    */
   public static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";
+  private static final String COMPACTION_REASON_KEY = "compactionReason";
 
   private static final Logger LOG = new Logger(CompactSegments.class);
 
@@ -567,6 +568,10 @@ public class CompactSegments implements CoordinatorCustomDuty
         slotsRequiredForCurrentTask = findMaxNumTaskSlotsUsedByOneNativeCompactionTask(config.getTuningConfig());
       }
 
+      if (entry.getCurrentStatus() != null) {
+        autoCompactionContext.put(COMPACTION_REASON_KEY, entry.getCurrentStatus().getReason());
+      }
+
       final String taskId = compactSegments(
           entry,
           config.getTaskPriority(),
diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
index 011a4640da3..b1f06542280 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
@@ -21,6 +21,8 @@ package org.apache.druid.client.indexing;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.data.input.SegmentsSplitHintSpec;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.indexer.CompactionEngine;
 import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
@@ -36,6 +38,7 @@ import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
 import org.joda.time.Duration;
@@ -45,6 +48,7 @@ import org.junit.Test;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 public class ClientCompactionRunnerInfoTest
@@ -56,6 +60,7 @@ public class ClientCompactionRunnerInfoTest
         new HashedPartitionsSpec(100, null, null),
         Collections.emptyMap(),
         null,
+        null,
         null
     );
     CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
@@ -76,6 +81,7 @@ public class ClientCompactionRunnerInfoTest
         new DynamicPartitionsSpec(100, 100L),
         Collections.emptyMap(),
         null,
+        null,
         null
     );
     CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
@@ -96,6 +102,7 @@ public class ClientCompactionRunnerInfoTest
         new DynamicPartitionsSpec(100, null),
         Collections.emptyMap(),
         null,
+        null,
         null
     );
     Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE)
@@ -103,18 +110,40 @@ public class ClientCompactionRunnerInfoTest
   }
 
   @Test
-  public void testMSQEngineWithDimensionRangePartitionsSpecIsValid()
+  public void testMSQEngineWithStringDimensionsInRangePartitionsSpecIsValid()
   {
     DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
         new DimensionRangePartitionsSpec(100, null, ImmutableList.of("partitionDim"), false),
         Collections.emptyMap(),
         null,
+        null,
         null
     );
     Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE)
                                          .isValid());
   }
 
+  @Test
+  public void testMSQEngineWithLongDimensionsInRangePartitionsSpecIsValid()
+  {
+    DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
+        new DimensionRangePartitionsSpec(100, null, ImmutableList.of("partitionDim"), false),
+        Collections.emptyMap(),
+        null,
+        null,
+        ImmutableList.of(new LongDimensionSchema("partitionDim"))
+    );
+    CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
+        compactionConfig,
+        CompactionEngine.NATIVE
+    );
+    Assert.assertFalse(validationResult.isValid());
+    Assert.assertEquals(
+        "MSQ: Non-string partition dimension[partitionDim] of type[long] not 
supported with 'range' partition spec",
+        validationResult.getReason()
+    );
+  }
+
   @Test
   public void testMSQEngineWithQueryGranularityAllIsValid()
   {
@@ -122,6 +151,7 @@ public class ClientCompactionRunnerInfoTest
         new DynamicPartitionsSpec(3, null),
         Collections.emptyMap(),
         new UserCompactionTaskGranularityConfig(Granularities.ALL, Granularities.ALL, false),
+        null,
         null
     );
     Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE)
@@ -135,7 +165,8 @@ public class ClientCompactionRunnerInfoTest
         new DynamicPartitionsSpec(3, null),
         Collections.emptyMap(),
         new UserCompactionTaskGranularityConfig(null, null, false),
-        new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")}
+        new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")},
+        null
     );
     CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
         compactionConfig,
@@ -143,7 +174,7 @@ public class ClientCompactionRunnerInfoTest
     );
     Assert.assertFalse(validationResult.isValid());
     Assert.assertEquals(
-        "MSQ: 'granularitySpec.rollup' must be true if 'metricsSpec' is 
specified",
+        "MSQ: 'granularitySpec.rollup' must be true if and only if 
'metricsSpec' is specified",
         validationResult.getReason()
     );
   }
@@ -155,6 +186,7 @@ public class ClientCompactionRunnerInfoTest
         new DynamicPartitionsSpec(3, null),
         Collections.emptyMap(),
         new UserCompactionTaskGranularityConfig(null, null, true),
+        null,
         null
     );
     CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
@@ -163,7 +195,7 @@ public class ClientCompactionRunnerInfoTest
     );
     Assert.assertFalse(validationResult.isValid());
     Assert.assertEquals(
-        "MSQ: 'granularitySpec.rollup' must be false if 'metricsSpec' is null",
+        "MSQ: 'granularitySpec.rollup' must be true if and only if 
'metricsSpec' is specified",
         validationResult.getReason()
     );
   }
@@ -177,8 +209,9 @@ public class ClientCompactionRunnerInfoTest
     DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
         new DynamicPartitionsSpec(3, null),
         Collections.emptyMap(),
-        new UserCompactionTaskGranularityConfig(null, null, null),
-        new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)}
+        new UserCompactionTaskGranularityConfig(null, null, true),
+        new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, inputColName)},
+        null
     );
     CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
         compactionConfig,
@@ -186,29 +219,38 @@ public class ClientCompactionRunnerInfoTest
     );
     Assert.assertFalse(validationResult.isValid());
     Assert.assertEquals(
-        "MSQ: Non-idempotent aggregator[sum_added] not supported in 
'metricsSpec'.",
+        "MSQ: Aggregator[sum_added] not supported in 'metricsSpec'",
         validationResult.getReason()
     );
   }
 
   @Test
-  public void testMSQEngineWithRollupNullWithMetricsSpecIsValid()
+  public void testMSQEngineWithRollupNullWithMetricsSpecIsInvalid()
   {
     DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
         new DynamicPartitionsSpec(3, null),
         Collections.emptyMap(),
         new UserCompactionTaskGranularityConfig(null, null, null),
-        new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")}
+        new AggregatorFactory[]{new LongSumAggregatorFactory("sum", "sum")},
+        null
+    );
+    CompactionConfigValidationResult validationResult = ClientCompactionRunnerInfo.validateCompactionConfig(
+        compactionConfig,
+        CompactionEngine.NATIVE
+    );
+    Assert.assertFalse(validationResult.isValid());
+    Assert.assertEquals(
+        "MSQ: 'granularitySpec.rollup' must be true if and only if 
'metricsSpec' is specified",
+        validationResult.getReason()
     );
-    Assert.assertTrue(ClientCompactionRunnerInfo.validateCompactionConfig(compactionConfig, CompactionEngine.NATIVE)
-                                         .isValid());
   }
 
   private static DataSourceCompactionConfig createMSQCompactionConfig(
       PartitionsSpec partitionsSpec,
       Map<String, Object> context,
       @Nullable UserCompactionTaskGranularityConfig granularitySpec,
-      @Nullable AggregatorFactory[] metricsSpec
+      @Nullable AggregatorFactory[] metricsSpec,
+      List<DimensionSchema> dimensions
   )
   {
     final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
@@ -219,7 +261,7 @@ public class ClientCompactionRunnerInfoTest
         new Period(3600),
         createTuningConfig(partitionsSpec),
         granularitySpec,
-        null,
+        new UserCompactionTaskDimensionsConfig(dimensions),
         metricsSpec,
         null,
         null,
diff --git a/server/src/test/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicyTest.java
index 7580582685b..5659a0ff5bf 100644
--- a/server/src/test/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/compaction/NewestSegmentFirstPolicyTest.java
@@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
@@ -1137,6 +1138,82 @@ public class NewestSegmentFirstPolicyTest
     Assert.assertFalse(iterator.hasNext());
   }
 
+  @Test
+  public void testIteratorDoesNotReturnsSegmentsWhenPartitionDimensionsPrefixed()
+  {
+    // Same indexSpec as what is set in the auto compaction config
+    Map<String, Object> indexSpec = IndexSpec.DEFAULT.asMap(mapper);
+    // Set range partitions spec with dimensions ["dim2", "dim4"] -- the same as what is set in the auto compaction config
+    PartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec(
+        null,
+        Integer.MAX_VALUE,
+        ImmutableList.of("dim2", "dim4"),
+        false
+    );
+
+    // Create segments that were compacted (CompactionState != null) and have
+    // Dimensions=["dim2", "dim4", "dim3", "dim1"] with ["dim2", "dim4"] as partition dimensions for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
+    // Dimensions=["dim2", "dim4", "dim1", "dim3"] with ["dim2", "dim4"] as partition dimensions for interval 2017-10-02T00:00:00/2017-10-03T00:00:00,
+    final SegmentTimeline timeline = createTimeline(
+        createSegments()
+            .startingAt("2017-10-01")
+            .withNumPartitions(4)
+            .withCompactionState(
+                new CompactionState(partitionsSpec, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim2", "dim4", "dim3", "dim1"))), null, null, indexSpec, null)
+            ),
+        createSegments()
+            .startingAt("2017-10-02")
+            .withNumPartitions(4)
+            .withCompactionState(
+                new CompactionState(partitionsSpec, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim2", "dim4", "dim1", "dim3"))), null, null, indexSpec, null)
+            )
+    );
+
+    // Auto compaction config sets Dimensions=["dim1", "dim2", "dim3", "dim4"] and partition dimensions as ["dim2", "dim4"]
+    CompactionSegmentIterator iterator = createIterator(
+        configBuilder().withDimensionsSpec(
+                           new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3", "dim4")))
+                       )
+                       .withTuningConfig(
+                           new UserCompactionTaskQueryTuningConfig(
+                               null,
+                               null,
+                               null,
+                               1000L,
+                               null,
+                               partitionsSpec,
+                               IndexSpec.DEFAULT,
+                               null,
+                               null,
+                               null,
+                               null,
+                               null,
+                               null,
+                               null,
+                               null,
+                               null,
+                               null,
+                               null,
+                               null
+                           )
+                       )
+                       .build(),
+        timeline
+    );
+    // We should get only interval 2017-10-01T00:00:00/2017-10-02T00:00:00 since 2017-10-02T00:00:00/2017-10-03T00:00:00
+    // has dimension order as expected post reordering of partition dimensions.
+    Assert.assertTrue(iterator.hasNext());
+    List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+        timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), Partitions.ONLY_COMPLETE)
+    );
+    Assert.assertEquals(
+        ImmutableSet.copyOf(expectedSegmentsToCompact),
+        ImmutableSet.copyOf(iterator.next().getSegments())
+    );
+    // No more
+    Assert.assertFalse(iterator.hasNext());
+  }
+
   @Test
   public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentFilter() throws Exception
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

