This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5da99499926 Fail MSQ compaction if multi-valued partition dimensions
are found (#17344)
5da99499926 is described below
commit 5da99499926395c7c5fe2cc23b56c8e358c04fd9
Author: Vishesh Garg <[email protected]>
AuthorDate: Sat Oct 19 13:33:33 2024 +0530
Fail MSQ compaction if multi-valued partition dimensions are found (#17344)
MSQ currently supports only single-valued string dimensions as partition
keys.
This patch adds a check to ensure that partition keys are single-valued in
case
this info is available by virtue of segment download for schema inference.
During compaction, if MSQ finds multi-valued dimensions (MVDs) declared as
part
of `range` partitionsSpec, it switches partitioning type to dynamic, ending
up in
repeated compactions of the same interval. To avoid this scenario, the
segment
download logic is also updated to always download segments if info on
multi-valued
dimensions is required.
---
.../druid/msq/indexing/MSQCompactionRunner.java | 43 +++++++-
.../msq/indexing/MSQCompactionRunnerTest.java | 58 ++++++++--
.../druid/indexing/common/task/CompactionTask.java | 66 ++++++++++--
.../indexing/common/task/CompactionTaskTest.java | 117 +++++++++++++++++++++
.../indexing/ClientCompactionRunnerInfo.java | 9 +-
.../indexing/ClientCompactionRunnerInfoTest.java | 29 ++++-
6 files changed, 296 insertions(+), 26 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
index e20188d5829..d05ab12ea3f 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Iterables;
import com.google.inject.Injector;
import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -84,6 +85,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -130,7 +132,7 @@ public class MSQCompactionRunner implements CompactionRunner
* The following configs aren't supported:
* <ul>
* <li>partitionsSpec of type HashedPartitionsSpec.</li>
- * <li>'range' partitionsSpec with non-string partition dimensions.</li>
+ * <li>'range' partitionsSpec with multi-valued or non-string partition
dimensions.</li>
* <li>maxTotalRows in DynamicPartitionsSpec.</li>
* <li>Rollup without metricsSpec being specified or vice-versa.</li>
* <li>Any aggregatorFactory {@code A} s.t. {@code A !=
A.combiningFactory()}.</li>
@@ -153,13 +155,24 @@ public class MSQCompactionRunner implements
CompactionRunner
);
}
List<CompactionConfigValidationResult> validationResults = new
ArrayList<>();
+ DataSchema dataSchema =
Iterables.getOnlyElement(intervalToDataSchemaMap.values());
if (compactionTask.getTuningConfig() != null) {
validationResults.add(
ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
compactionTask.getTuningConfig().getPartitionsSpec(),
-
Iterables.getOnlyElement(intervalToDataSchemaMap.values()).getDimensionsSpec().getDimensions()
+ dataSchema.getDimensionsSpec().getDimensions()
)
);
+ validationResults.add(
+ validatePartitionDimensionsAreNotMultiValued(
+ compactionTask.getTuningConfig().getPartitionsSpec(),
+ dataSchema.getDimensionsSpec(),
+ dataSchema instanceof CombinedDataSchema
+ ? ((CombinedDataSchema) dataSchema).getMultiValuedDimensions()
+ : null
+ )
+ );
+
}
if (compactionTask.getGranularitySpec() != null) {
validationResults.add(ClientCompactionRunnerInfo.validateRollupForMSQ(
@@ -175,6 +188,32 @@ public class MSQCompactionRunner implements
CompactionRunner
.orElse(CompactionConfigValidationResult.success());
}
+ private CompactionConfigValidationResult
validatePartitionDimensionsAreNotMultiValued(
+ PartitionsSpec partitionsSpec,
+ DimensionsSpec dimensionsSpec,
+ Set<String> multiValuedDimensions
+ )
+ {
+ List<String> dimensionSchemas = dimensionsSpec.getDimensionNames();
+ if (partitionsSpec instanceof DimensionRangePartitionsSpec
+ && dimensionSchemas != null
+ && multiValuedDimensions != null
+ && !multiValuedDimensions.isEmpty()) {
+ Optional<String> multiValuedDimension = ((DimensionRangePartitionsSpec)
partitionsSpec)
+ .getPartitionDimensions()
+ .stream()
+ .filter(multiValuedDimensions::contains)
+ .findAny();
+ if (multiValuedDimension.isPresent()) {
+ return CompactionConfigValidationResult.failure(
+ "MSQ: Multi-valued string partition dimension[%s] not supported
with 'range' partition spec",
+ multiValuedDimension.get()
+ );
+ }
+ }
+ return CompactionConfigValidationResult.success();
+ }
+
@Override
public CurrentSubTaskHolder getCurrentSubTaskHolder()
{
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index 0b5395d727f..0a54f8550a9 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskTransformSpec;
@@ -109,11 +110,15 @@ public class MSQCompactionRunnerTest
);
private static final Map<Interval, DataSchema> INTERVAL_DATASCHEMAS =
ImmutableMap.of(
COMPACTION_INTERVAL,
- new DataSchema.Builder()
- .withDataSource(DATA_SOURCE)
- .withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null, null))
- .withDimensions(new DimensionsSpec(DIMENSIONS))
- .build()
+ new CombinedDataSchema(
+ DATA_SOURCE,
+ new TimestampSpec(TIMESTAMP_COLUMN, null, null),
+ new DimensionsSpec(DIMENSIONS),
+ null,
+ null,
+ null,
+ ImmutableSet.of(MV_STRING_DIMENSION.getName())
+ )
);
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final AggregatorFactory AGG1 = new
CountAggregatorFactory("agg_0");
@@ -139,6 +144,7 @@ public class MSQCompactionRunnerTest
null,
Collections.emptyMap(),
null,
+ null,
null
);
CompactionConfigValidationResult validationResult =
MSQ_COMPACTION_RUNNER.validateCompactionTask(
@@ -160,6 +166,7 @@ public class MSQCompactionRunnerTest
null,
Collections.emptyMap(),
null,
+ null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask,
INTERVAL_DATASCHEMAS).isValid());
@@ -173,6 +180,7 @@ public class MSQCompactionRunnerTest
null,
Collections.emptyMap(),
null,
+ null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask,
INTERVAL_DATASCHEMAS).isValid());
@@ -187,6 +195,7 @@ public class MSQCompactionRunnerTest
null,
Collections.emptyMap(),
null,
+ null,
null
);
@@ -200,6 +209,29 @@ public class MSQCompactionRunnerTest
);
}
+ @Test
+ public void testMultiValuedDimensionInRangePartitionsSpecIsInvalid()
+ {
+ List<String> mvStringPartitionDimension =
Collections.singletonList(MV_STRING_DIMENSION.getName());
+ CompactionTask compactionTask = createCompactionTask(
+ new DimensionRangePartitionsSpec(TARGET_ROWS_PER_SEGMENT, null,
mvStringPartitionDimension, false),
+ null,
+ Collections.emptyMap(),
+ null,
+ null,
+ null
+ );
+
+ CompactionConfigValidationResult validationResult =
MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask,
+
INTERVAL_DATASCHEMAS
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ "MSQ: Multi-valued string partition dimension[mv_string_dim] not
supported with 'range' partition spec",
+ validationResult.getReason()
+ );
+ }
+
@Test
public void testMaxTotalRowsIsInvalid()
{
@@ -208,6 +240,7 @@ public class MSQCompactionRunnerTest
null,
Collections.emptyMap(),
null,
+ null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask,
INTERVAL_DATASCHEMAS).isValid());
@@ -221,6 +254,7 @@ public class MSQCompactionRunnerTest
null,
Collections.emptyMap(),
null,
+ null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask,
INTERVAL_DATASCHEMAS).isValid());
@@ -234,6 +268,7 @@ public class MSQCompactionRunnerTest
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, Granularities.ALL, null),
+ null,
null
);
Assert.assertTrue(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask,
INTERVAL_DATASCHEMAS).isValid());
@@ -247,26 +282,28 @@ public class MSQCompactionRunnerTest
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, null, false),
+ null,
AGGREGATORS.toArray(new AggregatorFactory[0])
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask,
INTERVAL_DATASCHEMAS).isValid());
}
@Test
- public void testRollupTrueWithoutMetricsSpecIsInValid()
+ public void testRollupTrueWithoutMetricsSpecIsInvalid()
{
CompactionTask compactionTask = createCompactionTask(
new DynamicPartitionsSpec(3, null),
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, null, true),
+ null,
null
);
Assert.assertFalse(MSQ_COMPACTION_RUNNER.validateCompactionTask(compactionTask,
INTERVAL_DATASCHEMAS).isValid());
}
@Test
- public void testMSQEngineWithUnsupportedMetricsSpecIsInValid()
+ public void testMSQEngineWithUnsupportedMetricsSpecIsInvalid()
{
// Aggregators having different input and output column names are
unsupported.
final String inputColName = "added";
@@ -276,6 +313,7 @@ public class MSQCompactionRunnerTest
null,
Collections.emptyMap(),
new ClientCompactionTaskGranularitySpec(null, null, true),
+ null,
new AggregatorFactory[]{new LongSumAggregatorFactory(outputColName,
inputColName)}
);
CompactionConfigValidationResult validationResult =
MSQ_COMPACTION_RUNNER.validateCompactionTask(
@@ -292,7 +330,7 @@ public class MSQCompactionRunnerTest
@Test
public void testRunCompactionTasksWithEmptyTaskListFails() throws Exception
{
- CompactionTask compactionTask = createCompactionTask(null, null,
Collections.emptyMap(), null, null);
+ CompactionTask compactionTask = createCompactionTask(null, null,
Collections.emptyMap(), null, null, null);
TaskStatus taskStatus =
MSQ_COMPACTION_RUNNER.runCompactionTasks(compactionTask,
Collections.emptyMap(), null);
Assert.assertTrue(taskStatus.isFailure());
}
@@ -307,6 +345,7 @@ public class MSQCompactionRunnerTest
dimFilter,
Collections.emptyMap(),
null,
+ null,
null
);
@@ -384,6 +423,7 @@ public class MSQCompactionRunnerTest
dimFilter,
Collections.emptyMap(),
null,
+ null,
null
);
@@ -481,6 +521,7 @@ public class MSQCompactionRunnerTest
@Nullable DimFilter dimFilter,
Map<String, Object> contextParams,
@Nullable ClientCompactionTaskGranularitySpec granularitySpec,
+ @Nullable List<DimensionSchema> dimensionSchemas,
@Nullable AggregatorFactory[] metricsSpec
)
{
@@ -504,6 +545,7 @@ public class MSQCompactionRunnerTest
))
.transformSpec(transformSpec)
.granularitySpec(granularitySpec)
+ .dimensionsSpec(new DimensionsSpec(dimensionSchemas))
.metricsSpec(metricsSpec)
.compactionRunner(MSQ_COMPACTION_RUNNER)
.context(context);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 4594fc1e9b2..dc2eb75e3cc 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -48,6 +48,7 @@ import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexer.Checks;
import org.apache.druid.indexer.Property;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@@ -80,6 +81,7 @@ import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
@@ -452,6 +454,50 @@ public class CompactionTask extends AbstractBatchIndexTask
implements PendingSeg
return tuningConfig != null && tuningConfig.isForceGuaranteedRollup();
}
+ /**
+ * Checks if multi-valued string dimensions need to be analyzed by
downloading the segments.
+ * This method returns true only for MSQ engine when either of the following
holds true:
+ * <ul>
+ * <li> Range partitioning is done on a string dimension or an unknown
dimension
+ * (since MSQ does not support partitioning on a multi-valued string
dimension) </li>
+ * <li> Rollup is done on a string dimension or an unknown dimension
+ * (since MSQ requires multi-valued string dimensions to be converted to
arrays for rollup) </li>
+ * </ul>
+ * @return false for native engine, true for MSQ engine only when
partitioning or rollup is done on a string
+ * or unknown dimension.
+ */
+ boolean identifyMultiValuedDimensions()
+ {
+ if (compactionRunner instanceof NativeCompactionRunner) {
+ return false;
+ }
+ // Rollup can be true even when granularitySpec is not known since rollup
is then decided based on segment analysis
+ final boolean isPossiblyRollup = granularitySpec == null ||
!Boolean.FALSE.equals(granularitySpec.isRollup());
+ boolean isRangePartitioned = tuningConfig != null
+ && tuningConfig.getPartitionsSpec()
instanceof DimensionRangePartitionsSpec;
+
+ if (dimensionsSpec == null || dimensionsSpec.getDimensions().isEmpty()) {
+ return isPossiblyRollup || isRangePartitioned;
+ } else {
+ boolean isRollupOnStringDimension = isPossiblyRollup &&
+ dimensionsSpec.getDimensions()
+ .stream()
+ .anyMatch(dim ->
ColumnType.STRING.equals(dim.getColumnType()));
+
+ boolean isPartitionedOnStringDimension =
+ isRangePartitioned &&
+ dimensionsSpec.getDimensions()
+ .stream()
+ .anyMatch(
+ dim ->
ColumnType.STRING.equals(dim.getColumnType())
+ && ((DimensionRangePartitionsSpec)
tuningConfig.getPartitionsSpec())
+ .getPartitionDimensions()
+ .contains(dim.getName())
+ );
+ return isRollupOnStringDimension || isPartitionedOnStringDimension;
+ }
+ }
+
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
@@ -466,7 +512,7 @@ public class CompactionTask extends AbstractBatchIndexTask
implements PendingSeg
metricsSpec,
granularitySpec,
getMetricBuilder(),
- !(compactionRunner instanceof NativeCompactionRunner)
+ this.identifyMultiValuedDimensions()
);
registerResourceCloserOnAbnormalExit(compactionRunner.getCurrentSubTaskHolder());
@@ -794,23 +840,25 @@ public class CompactionTask extends
AbstractBatchIndexTask implements PendingSeg
this.needMultiValuedDimensions = needMultiValuedDimensions;
}
- private boolean shouldFetchSegments()
+ /**
+ * Segments are downloaded even when just needMultiValuedDimensions=true
since MSQ switches to dynamic partitioning
+ * on finding any 'range' partition dimension to be multivalued at
runtime, which ends up in a mismatch between
+ * the compaction config and the actual segments (lastCompactionState),
leading to repeated compactions.
+ */
+ private boolean shouldDownloadSegments()
{
- // Don't fetch segments for just needMultiValueDimensions
- return needRollup || needQueryGranularity || needDimensionsSpec ||
needMetricsSpec;
+
+ return needRollup || needQueryGranularity || needDimensionsSpec ||
needMetricsSpec || needMultiValuedDimensions;
}
public void fetchAndProcessIfNeeded()
{
- if (!shouldFetchSegments()) {
+ if (!shouldDownloadSegments()) {
// Nothing to do; short-circuit and don't fetch segments.
return;
}
- if (needMultiValuedDimensions) {
- multiValuedDimensions = new HashSet<>();
- }
-
+ multiValuedDimensions = new HashSet<>();
final List<Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>>
segments = sortSegmentsListNewestFirst();
for (Pair<DataSegment, Supplier<ResourceHolder<QueryableIndex>>>
segmentPair : segments) {
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 8908d0d50c7..f2a795fec04 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -55,6 +55,7 @@ import org.apache.druid.guice.GuiceInjectableValues;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@@ -129,6 +130,7 @@ import
org.apache.druid.segment.realtime.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.selector.settable.SettableColumnValueSelector;
import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthorizerMapper;
@@ -1533,6 +1535,89 @@ public class CompactionTaskTest
}
}
+ @Test
+ public void testMSQRollupWithNoDimensionsSpecNeedsMVDInfo()
+ {
+ final Builder builder = new Builder(
+ DATA_SOURCE,
+ segmentCacheManagerFactory
+ );
+ builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
+ builder.compactionRunner(new TestMSQCompactionRunner());
+ final CompactionTask compactionTask = builder.build();
+ // granularitySpec=null should assume a possible rollup
+ Assert.assertTrue(compactionTask.identifyMultiValuedDimensions());
+ }
+
+ @Test
+ public void testMSQRangePartitionWithNoDimensionsSpecNeedsMVDInfo()
+ {
+ final Builder builder = new Builder(
+ DATA_SOURCE,
+ segmentCacheManagerFactory
+ );
+ builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
+ builder.compactionRunner(new TestMSQCompactionRunner());
+ builder.tuningConfig(TuningConfigBuilder.forCompactionTask()
+ .withForceGuaranteedRollup(true)
+ .withPartitionsSpec(
+ new
DimensionRangePartitionsSpec(
+ 3,
+ null,
+ ImmutableList.of(
+ "string_dim_1"),
+ false
+ ))
+ .build());
+ final CompactionTask compactionTask = builder.build();
+ Assert.assertTrue(compactionTask.identifyMultiValuedDimensions());
+ }
+
+ @Test
+ public void testMSQRollupOnStringNeedsMVDInfo()
+ {
+ final Builder builder = new Builder(
+ DATA_SOURCE,
+ segmentCacheManagerFactory
+ );
+ builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
+ builder.compactionRunner(new TestMSQCompactionRunner());
+ builder.granularitySpec(new ClientCompactionTaskGranularitySpec(null,
null, true));
+
+ DimensionSchema stringDim = new StringDimensionSchema("string_dim_1",
null, null);
+ builder.dimensionsSpec(new DimensionsSpec(ImmutableList.of(stringDim)));
+ final CompactionTask compactionTask = builder.build();
+ // A string dimension with rollup=true should need MVD info
+ Assert.assertTrue(compactionTask.identifyMultiValuedDimensions());
+ }
+
+ @Test
+ public void testMSQRangePartitionOnStringNeedsMVDInfo()
+ {
+ final Builder builder = new Builder(
+ DATA_SOURCE,
+ segmentCacheManagerFactory
+ );
+ builder.inputSpec(new CompactionIntervalSpec(COMPACTION_INTERVAL,
SegmentUtils.hashIds(SEGMENTS)));
+ builder.compactionRunner(new TestMSQCompactionRunner());
+
+ DimensionSchema stringDim = new StringDimensionSchema("string_dim_1",
null, null);
+ builder.tuningConfig(TuningConfigBuilder.forCompactionTask()
+ .withForceGuaranteedRollup(true)
+ .withPartitionsSpec(
+ new
DimensionRangePartitionsSpec(
+ 3,
+ null,
+ ImmutableList.of(
+ stringDim.getName()),
+ false
+ ))
+ .build());
+ builder.dimensionsSpec(new DimensionsSpec(ImmutableList.of(stringDim)));
+ CompactionTask compactionTask = builder.build();
+ Assert.assertTrue(compactionTask.identifyMultiValuedDimensions());
+ }
+
@Test
public void testChooseFinestGranularityWithNulls()
{
@@ -2015,6 +2100,38 @@ public class CompactionTaskTest
}
+ /**
+ * A class to mimic validations with MSQCompactionRunner behaviour, since
the actual class resides in the MSQ extn.
+ * Since validations just depend on the type of runner, all overridden
functions just return null.
+ */
+ private static class TestMSQCompactionRunner implements CompactionRunner
+ {
+ @Override
+ public TaskStatus runCompactionTasks(
+ CompactionTask compactionTask,
+ Map<Interval, DataSchema> intervalDataSchemaMap,
+ TaskToolbox taskToolbox
+ )
+ {
+ return null;
+ }
+
+ @Override
+ public CurrentSubTaskHolder getCurrentSubTaskHolder()
+ {
+ return null;
+ }
+
+ @Override
+ public CompactionConfigValidationResult validateCompactionTask(
+ CompactionTask compactionTask,
+ Map<Interval, DataSchema> intervalToDataSchemaMap
+ )
+ {
+ return null;
+ }
+ }
+
/**
* The compaction task spec in 0.16.0 except for the tuningConfig.
* The original spec accepts only {@link IndexTuningConfig}, but this class
accepts any type of tuningConfig for
diff --git
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
index f6a009afe1c..532c7197718 100644
---
a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
+++
b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
@@ -107,7 +107,7 @@ public class ClientCompactionRunnerInfo
* Checks if the provided compaction config is supported by MSQ. The
following configs aren't supported:
* <ul>
* <li>partitionsSpec of type HashedPartitionsSpec.</li>
- * <li>'range' partitionsSpec with non-string partition dimensions.</li>
+ * <li>'range' partitionsSpec with multi-valued or non-string partition
dimensions.</li>
* <li>maxTotalRows in DynamicPartitionsSpec.</li>
* <li>Rollup without metricsSpec being specified or vice-versa.</li>
* <li>Any aggregatorFactory {@code A} s.t. {@code A !=
A.combiningFactory()}.</li>
@@ -138,13 +138,18 @@ public class ClientCompactionRunnerInfo
/**
* Validate that partitionSpec is either 'dynamic` or 'range'. If 'dynamic',
ensure 'maxTotalRows' is null. If range
- * ensure all partition columns are of string type.
+ * ensure all partition columns are of type string.
*/
public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(
@Nullable PartitionsSpec partitionsSpec,
@Nullable List<DimensionSchema> dimensionSchemas
)
{
+ if (partitionsSpec == null) {
+ return CompactionConfigValidationResult.failure(
+ "MSQ: tuningConfig.partitionsSpec must be specified"
+ );
+ }
if (!(partitionsSpec instanceof DimensionRangePartitionsSpec
|| partitionsSpec instanceof DynamicPartitionsSpec)) {
return CompactionConfigValidationResult.failure(
diff --git
a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
index b1f06542280..449915a7846 100644
---
a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
+++
b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
@@ -95,6 +95,27 @@ public class ClientCompactionRunnerInfoTest
);
}
+ @Test
+ public void testMSQEngineWithNullPartitionsSpecIsInvalid()
+ {
+ DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
+ null,
+ Collections.emptyMap(),
+ null,
+ null,
+ null
+ );
+ CompactionConfigValidationResult validationResult =
ClientCompactionRunnerInfo.validateCompactionConfig(
+ compactionConfig,
+ CompactionEngine.NATIVE
+ );
+ Assert.assertFalse(validationResult.isValid());
+ Assert.assertEquals(
+ "MSQ: tuningConfig.partitionsSpec must be specified",
+ validationResult.getReason()
+ );
+ }
+
@Test
public void testMSQEngineWithDynamicPartitionsSpecIsValid()
{
@@ -124,7 +145,7 @@ public class ClientCompactionRunnerInfoTest
}
@Test
- public void testMSQEngineWithLongDimensionsInRangePartitionsSpecIsValid()
+ public void testMSQEngineWithLongDimensionsInRangePartitionsSpecIsInvalid()
{
DataSourceCompactionConfig compactionConfig = createMSQCompactionConfig(
new DimensionRangePartitionsSpec(100, null,
ImmutableList.of("partitionDim"), false),
@@ -253,7 +274,7 @@ public class ClientCompactionRunnerInfoTest
List<DimensionSchema> dimensions
)
{
- final DataSourceCompactionConfig config = new DataSourceCompactionConfig(
+ return new DataSourceCompactionConfig(
"dataSource",
null,
500L,
@@ -268,12 +289,11 @@ public class ClientCompactionRunnerInfoTest
CompactionEngine.MSQ,
context
);
- return config;
}
private static UserCompactionTaskQueryTuningConfig
createTuningConfig(PartitionsSpec partitionsSpec)
{
- final UserCompactionTaskQueryTuningConfig tuningConfig = new
UserCompactionTaskQueryTuningConfig(
+ return new UserCompactionTaskQueryTuningConfig(
40000,
null,
2000L,
@@ -302,6 +322,5 @@ public class ClientCompactionRunnerInfoTest
100,
2
);
- return tuningConfig;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]