This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5da99499926 Fail MSQ compaction if multi-valued partition dimensions 
are found  (#17344)
5da99499926 is described below

commit 5da99499926395c7c5fe2cc23b56c8e358c04fd9
Author: Vishesh Garg <[email protected]>
AuthorDate: Sat Oct 19 13:33:33 2024 +0530

    Fail MSQ compaction if multi-valued partition dimensions are found  (#17344)
    
    MSQ currently supports only single-valued string dimensions as partition 
keys.
    This patch adds a check to ensure that partition keys are single-valued in 
case
    this info is available by virtue of segment download for schema inference.
    
    During compaction, if MSQ finds multi-valued dimensions (MVDs) declared as 
part
    of `range` partitionsSpec, it switches partitioning type to dynamic, ending 
up in
    repeated compactions of the same interval. To avoid this scenario, the 
segment
    download logic is also updated to always download segments if info on 
multi-valued
    dimensions is required.
---
 .../druid/msq/indexing/MSQCompactionRunner.java    |  43 +++++++-
 .../msq/indexing/MSQCompactionRunnerTest.java      |  58 ++++++++--
 .../druid/indexing/common/task/CompactionTask.java |  66 ++++++++++--
 .../indexing/common/task/CompactionTaskTest.java   | 117 +++++++++++++++++++++
 .../indexing/ClientCompactionRunnerInfo.java       |   9 +-
 .../indexing/ClientCompactionRunnerInfoTest.java   |  29 ++++-
 6 files changed, 296 insertions(+), 26 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
index e20188d5829..d05ab12ea3f 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Iterables;
 import com.google.inject.Injector;
 import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
 import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -84,6 +85,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -130,7 +132,7 @@ public class MSQCompactionRunner implements CompactionRunner
    * The following configs aren't supported:
    * <ul>
 * <li>partitionsSpec of type HashedPartitionsSpec.</li>
-   * <li>'range' partitionsSpec with non-string partition dimensions.</li>
+   * <li>'range' partitionsSpec with multi-valued or non-string partition 
dimensions.</li>
    * <li>maxTotalRows in DynamicPartitionsSpec.</li>
    * <li>Rollup without metricsSpec being specified or vice-versa.</li>
    * <li>Any aggregatorFactory {@code A} s.t. {@code A != 
A.combiningFactory()}.</li>
@@ -153,13 +155,24 @@ public class MSQCompactionRunner implements 
CompactionRunner
       );
     }
     List<CompactionConfigValidationResult> validationResults = new 
ArrayList<>();
+    DataSchema dataSchema = 
Iterables.getOnlyElement(intervalToDataSchemaMap.values());
     if (compactionTask.getTuningConfig() != null) {
       validationResults.add(
           ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
               compactionTask.getTuningConfig().getPartitionsSpec(),
-              
Iterables.getOnlyElement(intervalToDataSchemaMap.values()).getDimensionsSpec().getDimensions()
+              dataSchema.getDimensionsSpec().getDimensions()
           )
       );
+      validationResults.add(
+          validatePartitionDimensionsAreNotMultiValued(
+              compactionTask.getTuningConfig().getPartitionsSpec(),
+              dataSchema.getDimensionsSpec(),
+              dataSchema instanceof CombinedDataSchema
+              ? ((CombinedDataSchema) dataSchema).getMultiValuedDimensions()
+              : null
+          )
+      );
+
     }
     if (compactionTask.getGranularitySpec() != null) {
       validationResults.add(ClientCompactionRunnerInfo.validateRollupForMSQ(
@@ -175,6 +188,32 @@ public class MSQCompactionRunner implements 
CompactionRunner
                             
.orElse(CompactionConfigValidationResult.success());
   }
 
+  private CompactionConfigValidationResult 
validatePartitionDimensionsAreNotMultiValued(
+      PartitionsSpec partitionsSpec,
+      DimensionsSpec dimensionsSpec,
+      Set<String> multiValuedDimensions
+  )
+  {
+    List<String> dimensionSchemas = dimensionsSpec.getDimensionNames();
+    if (partitionsSpec instanceof DimensionRangePartitionsSpec
+        && dimensionSchemas != null
+        && multiValuedDimensions != null
+        && !multiValuedDimensions.isEmpty()) {
+      Optional<String> multiValuedDimension = ((DimensionRangePartitionsSpec) 
partitionsSpec)
+          .getPartitionDimensions()
+          .stream()
+          .filter(multiValuedDimensions::contains)
+          .findAny();
+      if (multiValuedDimension.isPresent()) {
+        return CompactionConfigValidationResult.failure(
+            "MSQ: Multi-valued string partition dimension[%s] not supported 
with 'range' partition spec",
+            multiValuedDimension.get()
+        );
+      }
+    }
+    return CompactionConfigValidationResult.success();
+  }
+
   @Override
   public CurrentSubTaskHolder getCurrentSubTaskHolder()
   {
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index 0b5395d727f..0a54f8550a9 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
 import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
@@ -109,11 +110,15 @@ public class MSQCompactionRunnerTest
   );
   private static final Map<Interval, DataSchema> INTERVAL_DATASCHEMAS = 
ImmutableMap.of(
       COMPACTION_INTERVAL,
-      new DataSchema.Builder()
-          .withDataSource(DATA_SOURCE)
-          .withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null))
-          .withDimensions(new DimensionsSpec(DIMENSIONS))
-          .build()
+      new CombinedDataSchema(
+          DATA_SOURCE,
+          new TimestampSpec(TIMESTAMP_COLUMN, null, null),
+          new DimensionsSpec(DIMENSIONS),
+          null,
+          null,
+          null,
+          ImmutableSet.of(MV_STRING_DIMENSION.getName())
+      )
   );
   private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
   private static final AggregatorFactory AGG1 = new 
CountAggregatorFactory("agg_0");
@@ -139,6 +144,7 @@ public class MSQCompactionRunnerTest
         null,
         Collections.emptyMap(),
         null,
+        null,
         null
     );
     CompactionConfigValidationResult validationResult = 
MSQ_COMPACTION_RUNNER.validateCompactionTask(
@@ -160,6 +166,7 @@ public class MSQCompactionRunnerTest
         null,
         Collections.emptyMap(),
         null,
+        null,
         null
     );
     
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, 
INTERVAL_DATASCHEMAS).isValid());
@@ -173,6 +180,7 @@ public class MSQCompactionRunnerTest
         null,
         Collections.emptyMap(),
         null,
+        null,
         null
     );
     
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, 
INTERVAL_DATASCHEMAS).isValid());
@@ -187,6 +195,7 @@ public class MSQCompactionRunnerTest
         null,
         Collections.emptyMap(),
         null,
+        null,
         null
     );
 
@@ -200,6 +209,29 @@ public class MSQCompactionRunnerTest
     );
   }
 
+  @Test
+  public void testMultiValuedDimensionInRangePartitionsSpecIsInvalid()
+  {
+    List<String> mvStringPartitionDimension = 
Collections.singletonList(MV_STRING_DIMENSION.getName());
+    CompactionTask compactionTask = createCompactionTask(
+        new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null, 
mvStringPartitionDimension, false),
+        null,
+        Collections.emptyMap(),
+        null,
+        null,
+        null
+    );
+
+    CompactionConfigValidationResult validationResult = 
MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask,
+                                                                               
                      INTERVAL_DATASCHEMAS
+    );
+    Assert.assertFalse(validationResult.isValid());
+    Assert.assertEquals(
+        "MSQ: Multi-valued string partition dimension[mv_string_dim] not 
supported with 'range' partition spec",
+        validationResult.getReason()
+    );
+  }
+
   @Test
   public void testMaxTotalRowsIsInvalid()
   {
@@ -208,6 +240,7 @@ public class MSQCompactionRunnerTest
         null,
         Collections.emptyMap(),
         null,
+        null,
         null
     );
     
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, 
INTERVAL_DATASCHEMAS).isValid());
@@ -221,6 +254,7 @@ public class MSQCompactionRunnerTest
         null,
         Collections.emptyMap(),
         null,
+        null,
         null
     );
     
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, 
INTERVAL_DATASCHEMAS).isValid());
@@ -234,6 +268,7 @@ public class MSQCompactionRunnerTest
         null,
         Collections.emptyMap(),
         new ClientCompactionTaskGranularitySpec(null, Granularities.ALL, null),
+        null,
         null
     );
     
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, 
INTERVAL_DATASCHEMAS).isValid());
@@ -247,26 +282,28 @@ public class MSQCompactionRunnerTest
         null,
         Collections.emptyMap(),
         new ClientCompactionTaskGranularitySpec(null, null, false),
+        null,
         AGGREGATORS.toArray(new AggregatorFactory[0])
     );
     
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, 
INTERVAL_DATASCHEMAS).isValid());
   }
 
   @Test
-  public void testRollupTrueWithoutMetricsSpecIsInValid()
+  public void testRollupTrueWithoutMetricsSpecIsInvalid()
   {
     CompactionTask compactionTask = createCompactionTask(
         new DynamicPartitionsSpec(3, null),
         null,
         Collections.emptyMap(),
         new ClientCompactionTaskGranularitySpec(null, null, true),
+        null,
         null
     );
     
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask, 
INTERVAL_DATASCHEMAS).isValid());
   }
 
   @Test
-  public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
+  public void testMSQEngineWithUnsupportedMetricsSpecIsInvalid()
   {
    // Aggregators having different input and output column names are
unsupported.
     final String inputColName = "added";
@@ -276,6 +313,7 @@ public class MSQCompactionRunnerTest
         null,
         Collections.emptyMap(),
         new ClientCompactionTaskGranularitySpec(null, null, true),
+        null,
         new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName, 
inputColName)}
     );
     CompactionConfigValidationResult validationResult = 
MSQ_COMPACTION_RUNNER.validateCompactionTask(
@@ -292,7 +330,7 @@ public class MSQCompactionRunnerTest
   @Test
   public void testRunCompactionTasksWithEmptyTaskListFails() throws Exception
   {
-    CompactionTask compactionTask = createCompactionTask(null, null, 
Collections.emptyMap(), null, null);
+    CompactionTask compactionTask = createCompactionTask(null, null, 
Collections.emptyMap(), null, null, null);
     TaskStatus taskStatus = 
MSQ_COMPACTION_RUNNER.runCompactionTasks(compactionTask, 
Collections.emptyMap(), null);
     Assert.assertTrue(taskStatus.isFailure());
   }
@@ -307,6 +345,7 @@ public class MSQCompactionRunnerTest
         dimFilter,
         Collections.emptyMap(),
         null,
+        null,
         null
     );
 
@@ -384,6 +423,7 @@ public class MSQCompactionRunnerTest
         dimFilter,
         Collections.emptyMap(),
         null,
+        null,
         null
     );
 
@@ -481,6 +521,7 @@ public class MSQCompactionRunnerTest
       @Nullable DimFilter dimFilter,
       Map<String, Object> contextParams,
       @Nullable ClientCompactionTaskGranularitySpec granularitySpec,
+      @Nullable List<DimensionSchema> dimensionSchemas,
       @Nullable AggregatorFactory[] metricsSpec
   )
   {
@@ -504,6 +545,7 @@ public class MSQCompactionRunnerTest
         ))
         .transformSpec(transformSpec)
         .granularitySpec(granularitySpec)
+        .dimensionsSpec(new DimensionsSpec(dimensionSchemas))
         .metricsSpec(metricsSpec)
         .compactionRunner(MSQ_COMPACTION_RUNNER)
         .context(context);
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 4594fc1e9b2..dc2eb75e3cc 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -48,6 +48,7 @@ import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexer.Checks;
 import org.apache.druid.indexer.Property;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@@ -80,6 +81,7 @@ import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.indexing.CombinedDataSchema;
 import org.apache.druid.segment.indexing.DataSchema;
@@ -452,6 +454,50 @@ public class CompactionTask extends AbstractBatchIndexTask 
implements PendingSeg
     return tuningConfig != null && tuningConfig.isForceGuaranteedRollup();
   }
 
+  /**
+   * Checks if multi-valued string dimensions need to be analyzed by 
downloading the segments.
+   * This method returns true only for MSQ engine when either of the following 
holds true:
+   * <ul>
+   * <li> Range partitioning is done on a string dimension or an unknown 
dimension
+   * (since MSQ does not support partitioning on a multi-valued string 
dimension) </li>
+   * <li> Rollup is done on a string dimension or an unknown dimension
+   * (since MSQ requires multi-valued string dimensions to be converted to 
arrays for rollup) </li>
+   * </ul>
+   * @return false for native engine, true for MSQ engine only when 
partitioning or rollup is done on a string
+   * or unknown dimension.
+   */
+  boolean identifyMultiValuedDimensions()
+  {
+    if (compactionRunner instanceof NativeCompactionRunner) {
+      return false;
+    }
+    // Rollup can be true even when granularitySpec is not known since rollup 
is then decided based on segment analysis
+    final boolean isPossiblyRollup = granularitySpec == null || 
!Boolean.FALSE.equals(granularitySpec.isRollup());
+    boolean isRangePartitioned = tuningConfig != null
+                                 && tuningConfig.getPartitionsSpec() 
instanceof DimensionRangePartitionsSpec;
+
+    if (dimensionsSpec == null || dimensionsSpec.getDimensions().isEmpty()) {
+      return isPossiblyRollup || isRangePartitioned;
+    } else {
+      boolean isRollupOnStringDimension = isPossiblyRollup &&
+                                          dimensionsSpec.getDimensions()
+                                                        .stream()
+                                                        .anyMatch(dim -> 
ColumnType.STRING.equals(dim.getColumnType()));
+
+      boolean isPartitionedOnStringDimension =
+          isRangePartitioned &&
+          dimensionsSpec.getDimensions()
+                        .stream()
+                        .anyMatch(
+                            dim -> 
ColumnType.STRING.equals(dim.getColumnType())
+                                   && ((DimensionRangePartitionsSpec) 
tuningConfig.getPartitionsSpec())
+                                       .getPartitionDimensions()
+                                       .contains(dim.getName())
+                        );
+      return isRollupOnStringDimension || isPartitionedOnStringDimension;
+    }
+  }
+
   @Override
   public TaskStatus runTask(TaskToolbox toolbox) throws Exception
   {
@@ -466,7 +512,7 @@ public class CompactionTask extends AbstractBatchIndexTask 
implements PendingSeg
         metricsSpec,
         granularitySpec,
         getMetricBuilder(),
-        !(compactionRunner instanceof NativeCompactionRunner)
+        this.identifyMultiValuedDimensions()
     );
 
     
registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder());
@@ -794,23 +840,25 @@ public class CompactionTask extends 
AbstractBatchIndexTask implements PendingSeg
       this.needMultiValuedDimensions = needMultiValuedDimensions;
     }
 
-    private boolean shouldFetchSegments()
+    /**
+     * Segments are downloaded even when just needMultiValuedDimensions=true 
since MSQ switches to dynamic partitioning
+     * on finding any 'range' partition dimension to be multivalued at 
runtime, which ends up in a mismatch between
+     * the compaction config and the actual segments (lastCompactionState), 
leading to repeated compactions.
+     */
+    private boolean shouldDownloadSegments()
     {
-      // Don't fetch segments for just needMultiValueDimensions
-      return needRollup || needQueryGranularity || needDimensionsSpec || 
needMetricsSpec;
+
+      return needRollup || needQueryGranularity || needDimensionsSpec || 
needMetricsSpec || needMultiValuedDimensions;
     }
 
     public void fetchAndProcessIfNeeded()
     {
-      if (!shouldFetchSegments()) {
+      if (!shouldDownloadSegments()) {
         // Nothing to do; short-circuit and don't fetch segments.
         return;
       }
 
-      if (needMultiValuedDimensions) {
-        multiValuedDimensions = new HashSet<>();
-      }
-
+      multiValuedDimensions = new HashSet<>();
       final List<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>> 
segments = sortSegmentsListNewestFirst();
 
       for (Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>> 
segmentPair : segments) {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 8908d0d50c7..f2a795fec04 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -55,6 +55,7 @@ import org.apache.druid.guice.GuiceInjectableValues;
 import org.apache.druid.guice.GuiceInjectors;
 import org.apache.druid.guice.IndexingServiceTuningConfigModule;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@@ -129,6 +130,7 @@ import 
org.apache.druid.segment.realtime.NoopChatHandlerProvider;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
 import 
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.server.security.AuthorizerMapper;
@@ -1533,6 +1535,89 @@ public class CompactionTaskTest
     }
   }
 
+  @Test
+  public void testMSQRollupWithNoDimensionsSpecNeedsMVDInfo()
+  {
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentCacheManagerFactory
+    );
+    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
+    builder.compactionRunner(new TestMSQCompactionRunner());
+    final CompactionTask compactionTask = builder.build();
+    // granularitySpec=null should assume a possible rollup
+    Assert.assertTrue(compactionTask.identifyMultiValuedDimensions());
+  }
+
+  @Test
+  public void testMSQRangePartitionWithNoDimensionsSpecNeedsMVDInfo()
+  {
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentCacheManagerFactory
+    );
+    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
+    builder.compactionRunner(new TestMSQCompactionRunner());
+    builder.tuningConfig(TuningConfigBuilder.forCompactionTask()
+                                            .withForceGuaranteedRollup(true)
+                                            .withPartitionsSpec(
+                                                new 
DimensionRangePartitionsSpec(
+                                                    3,
+                                                    null,
+                                                    ImmutableList.of(
+                                                        "string_dim_1"),
+                                                    false
+                                                ))
+                                            .build());
+    final CompactionTask compactionTask = builder.build();
+    Assert.assertTrue(compactionTask.identifyMultiValuedDimensions());
+  }
+
+  @Test
+  public void testMSQRollupOnStringNeedsMVDInfo()
+  {
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentCacheManagerFactory
+    );
+    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
+    builder.compactionRunner(new TestMSQCompactionRunner());
+    builder.granularitySpec(new ClientCompactionTaskGranularitySpec(null, 
null, true));
+
+    DimensionSchema stringDim = new StringDimensionSchema("string_dim_1", 
null, null);
+    builder.dimensionsSpec(new DimensionsSpec(ImmutableList.of(stringDim)));
+    final CompactionTask compactionTask = builder.build();
+    // A string dimension with rollup=true should need MVD info
+    Assert.assertTrue(compactionTask.identifyMultiValuedDimensions());
+  }
+
+  @Test
+  public void testMSQRangePartitionOnStringNeedsMVDInfo()
+  {
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        segmentCacheManagerFactory
+    );
+    builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL, 
SegmentUtils.hashIds(SEGMENTS)));
+    builder.compactionRunner(new TestMSQCompactionRunner());
+
+    DimensionSchema stringDim = new StringDimensionSchema("string_dim_1", 
null, null);
+    builder.tuningConfig(TuningConfigBuilder.forCompactionTask()
+                                            .withForceGuaranteedRollup(true)
+                                            .withPartitionsSpec(
+                                                new 
DimensionRangePartitionsSpec(
+                                                    3,
+                                                    null,
+                                                    ImmutableList.of(
+                                                        stringDim.getName()),
+                                                    false
+                                                ))
+                                            .build());
+    builder.dimensionsSpec(new DimensionsSpec(ImmutableList.of(stringDim)));
+    CompactionTask compactionTask = builder.build();
+    Assert.assertTrue(compactionTask.identifyMultiValuedDimensions());
+  }
+
   @Test
   public void testChooseFinestGranularityWithNulls()
   {
@@ -2015,6 +2100,38 @@ public class CompactionTaskTest
 
   }
 
+  /**
+   * A class to mimic validations with MSQCompactionRunner behaviour, since
the actual class resides in the MSQ extension.
+   * Since validations just depend on the type of runner, all overridden
functions just return null.
+   */
+  private static class TestMSQCompactionRunner implements CompactionRunner
+  {
+    @Override
+    public TaskStatus runCompactionTasks(
+        CompactionTask compactionTask,
+        Map<Interval, DataSchema> intervalDataSchemaMap,
+        TaskToolbox taskToolbox
+    )
+    {
+      return null;
+    }
+
+    @Override
+    public CurrentSubTaskHolder getCurrentSubTaskHolder()
+    {
+      return null;
+    }
+
+    @Override
+    public CompactionConfigValidationResult validateCompactionTask(
+        CompactionTask compactionTask,
+        Map<Interval, DataSchema> intervalToDataSchemaMap
+    )
+    {
+      return null;
+    }
+  }
+
   /**
    * The compaction task spec in 0.16.0 except for the tuningConfig.
   * The original spec accepts only {@link IndexTuningConfig}, but this class
accepts any type of tuningConfig for
diff --git 
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
index f6a009afe1c..532c7197718 100644
--- 
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
+++ 
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
@@ -107,7 +107,7 @@ public class ClientCompactionRunnerInfo
    * Checks if the provided compaction config is supported by MSQ. The 
following configs aren't supported:
    * <ul>
 * <li>partitionsSpec of type HashedPartitionsSpec.</li>
-   * <li>'range' partitionsSpec with non-string partition dimensions.</li>
+   * <li>'range' partitionsSpec with multi-valued or non-string partition 
dimensions.</li>
    * <li>maxTotalRows in DynamicPartitionsSpec.</li>
    * <li>Rollup without metricsSpec being specified or vice-versa.</li>
    * <li>Any aggregatorFactory {@code A} s.t. {@code A != 
A.combiningFactory()}.</li>
@@ -138,13 +138,18 @@ public class ClientCompactionRunnerInfo
 
   /**
    * Validate that partitionSpec is either 'dynamic` or 'range'. If 'dynamic', 
ensure 'maxTotalRows' is null. If range
-   * ensure all partition columns are of string type.
+   * ensure all partition columns are of type string.
    */
   public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(
       @Nullable PartitionsSpec partitionsSpec,
       @Nullable List<DimensionSchema> dimensionSchemas
   )
   {
+    if (partitionsSpec == null) {
+      return CompactionConfigValidationResult.failure(
+          "MSQ: tuningConfig.partitionsSpec must be specified"
+      );
+    }
     if (!(partitionsSpec instanceof DimensionRangePartitionsSpec
           || partitionsSpec instanceof DynamicPartitionsSpec)) {
       return CompactionConfigValidationResult.failure(
diff --git 
a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
 
b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
index b1f06542280..449915a7846 100644
--- 
a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
@@ -95,6 +95,27 @@ public class ClientCompactionRunnerInfoTest
     );
   }
 
+  @Test
+  public void testMSQEngineWithNullPartitionsSpecIsInvalid()
+  {
+    DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
+        null,
+        Collections.emptyMap(),
+        null,
+        null,
+        null
+    );
+    CompactionConfigValidationResult validationResult = 
ClientCompactionRunnerInfo.validateCompactionConfig(
+        compactionConfig,
+        CompactionEngine.NATIVE
+    );
+    Assert.assertFalse(validationResult.isValid());
+    Assert.assertEquals(
+        "MSQ: tuningConfig.partitionsSpec must be specified",
+        validationResult.getReason()
+    );
+  }
+
   @Test
   public void testMSQEngineWithDynamicPartitionsSpecIsValid()
   {
@@ -124,7 +145,7 @@ public class ClientCompactionRunnerInfoTest
   }
 
   @Test
-  public void testMSQEngineWithLongDimensionsInRangePartitionsSpecIsValid()
+  public void testMSQEngineWithLongDimensionsInRangePartitionsSpecIsInvalid()
   {
     DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
         new DimensionRangePartitionsSpec(100, null, 
ImmutableList.of("partitionDim"), false),
@@ -253,7 +274,7 @@ public class ClientCompactionRunnerInfoTest
       List<DimensionSchema> dimensions
   )
   {
-    final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+    return new DataSourceCompactionConfig(
         "dataSource",
         null,
         500L,
@@ -268,12 +289,11 @@ public class ClientCompactionRunnerInfoTest
         CompactionEngine.MSQ,
         context
     );
-    return config;
   }
 
   private static UserCompactionTaskQueryTuningConfig 
createTuningConfig(PartitionsSpec partitionsSpec)
   {
-    final UserCompactionTaskQueryTuningConfig tuningConfig = new 
UserCompactionTaskQueryTuningConfig(
+    return new UserCompactionTaskQueryTuningConfig(
         40000,
         null,
         2000L,
@@ -302,6 +322,5 @@ public class ClientCompactionRunnerInfoTest
         100,
         2
     );
-    return tuningConfig;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to