This is an automated email from the ASF dual-hosted git repository.
lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e37fe93f093 Add support for a custom `DimensionSchema` in
`DataSourceMSQDestination` (#16864)
e37fe93f093 is described below
commit e37fe93f093a2d2991d39ff3245fc1d07f1e85d4
Author: Vishesh Garg <[email protected]>
AuthorDate: Fri Aug 16 15:24:49 2024 +0530
Add support for a custom `DimensionSchema` in `DataSourceMSQDestination`
(#16864)
This PR adds support for passing a custom DimensionSchema map to an MSQ
query destination of type DataSourceMSQDestination.
---
.../org/apache/druid/msq/exec/ControllerImpl.java | 49 +++++++++++------
.../druid/msq/indexing/MSQCompactionRunner.java | 10 +++-
.../org/apache/druid/msq/indexing/MSQSpec.java | 3 +-
.../destination/DataSourceMSQDestination.java | 26 +++++++--
.../apache/druid/msq/sql/MSQTaskQueryMaker.java | 3 +-
.../druid/msq/exec/MSQParseExceptionsTest.java | 4 +-
.../msq/indexing/MSQCompactionRunnerTest.java | 17 +++---
.../druid/msq/indexing/MSQControllerTaskTest.java | 3 +-
.../destination/DataSourceMSQDestinationTest.java | 26 ++++++++-
.../sql/resources/SqlStatementResourceTest.java | 1 +
.../msq/util/SqlStatementResourceHelperTest.java | 1 +
.../coordinator/duty/ITAutoCompactionTest.java | 62 ++++++++++++++++++++++
12 files changed, 167 insertions(+), 38 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 839839db4e4..278a85685dd 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -1950,7 +1950,8 @@ public class ControllerImpl implements Controller
destination.getSegmentSortOrder(),
columnMappings,
isRollupQuery,
- querySpec.getQuery()
+ querySpec.getQuery(),
+ destination.getDimensionToSchemaMap()
);
return new DataSchema(
@@ -2122,13 +2123,34 @@ public class ControllerImpl implements Controller
return new StringTuple(array);
}
+ private static DimensionSchema getDimensionSchema(
+ final String outputColumnName,
+ @Nullable final ColumnType queryType,
+ QueryContext context,
+ @Nullable Map<String, DimensionSchema> dimensionToSchemaMap
+ )
+ {
+ if (dimensionToSchemaMap != null &&
dimensionToSchemaMap.containsKey(outputColumnName)) {
+ return dimensionToSchemaMap.get(outputColumnName);
+ }
+ // In case of ingestion, or when metrics are converted to dimensions when
compaction is performed without rollup,
+ // we won't have an entry in the map. For those cases, use the default
config.
+ return DimensionSchemaUtils.createDimensionSchema(
+ outputColumnName,
+ queryType,
+ MultiStageQueryContext.useAutoColumnSchemas(context),
+ MultiStageQueryContext.getArrayIngestMode(context)
+ );
+ }
+
private static Pair<List<DimensionSchema>, List<AggregatorFactory>>
makeDimensionsAndAggregatorsForIngestion(
final RowSignature querySignature,
final ClusterBy queryClusterBy,
final List<String> segmentSortOrder,
final ColumnMappings columnMappings,
final boolean isRollupQuery,
- final Query<?> query
+ final Query<?> query,
+ @Nullable final Map<String, DimensionSchema> dimensionToSchemaMap
)
{
// Log a warning unconditionally if arrayIngestMode is MVD, since the
behaviour is incorrect, and is subject to
@@ -2214,18 +2236,14 @@ public class ControllerImpl implements Controller
outputColumnAggregatorFactories,
outputColumnName,
type,
- query.context()
+ query.context(),
+ dimensionToSchemaMap
);
} else {
// complex columns only
if
(DimensionHandlerUtils.DIMENSION_HANDLER_PROVIDERS.containsKey(type.getComplexTypeName()))
{
dimensions.add(
- DimensionSchemaUtils.createDimensionSchema(
- outputColumnName,
- type,
-
MultiStageQueryContext.useAutoColumnSchemas(query.context()),
- MultiStageQueryContext.getArrayIngestMode(query.context())
- )
+ getDimensionSchema(outputColumnName, type, query.context(),
dimensionToSchemaMap)
);
} else if (!isRollupQuery) {
aggregators.add(new PassthroughAggregatorFactory(outputColumnName,
type.getComplexTypeName()));
@@ -2236,7 +2254,8 @@ public class ControllerImpl implements Controller
outputColumnAggregatorFactories,
outputColumnName,
type,
- query.context()
+ query.context(),
+ dimensionToSchemaMap
);
}
}
@@ -2263,19 +2282,15 @@ public class ControllerImpl implements Controller
Map<String, AggregatorFactory> outputColumnAggregatorFactories,
String outputColumn,
ColumnType type,
- QueryContext context
+ QueryContext context,
+ Map<String, DimensionSchema> dimensionToSchemaMap
)
{
if (outputColumnAggregatorFactories.containsKey(outputColumn)) {
aggregators.add(outputColumnAggregatorFactories.get(outputColumn));
} else {
dimensions.add(
- DimensionSchemaUtils.createDimensionSchema(
- outputColumn,
- type,
- MultiStageQueryContext.useAutoColumnSchemas(context),
- MultiStageQueryContext.getArrayIngestMode(context)
- )
+ getDimensionSchema(outputColumn, type, context, dimensionToSchemaMap)
);
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
index efc2cbb2afb..3abf4b59c7d 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
@@ -81,6 +81,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.function.Function;
import java.util.stream.Collectors;
public class MSQCompactionRunner implements CompactionRunner
@@ -237,7 +238,11 @@ public class MSQCompactionRunner implements
CompactionRunner
dataSchema.getDataSource(),
dataSchema.getGranularitySpec().getSegmentGranularity(),
null,
- ImmutableList.of(replaceInterval)
+ ImmutableList.of(replaceInterval),
+ dataSchema.getDimensionsSpec()
+ .getDimensions()
+ .stream()
+ .collect(Collectors.toMap(DimensionSchema::getName,
Function.identity()))
);
}
@@ -494,9 +499,10 @@ public class MSQCompactionRunner implements
CompactionRunner
// Used for writing the data schema during segment generation phase.
context.putIfAbsent(MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS,
false);
// Add appropriate finalization to native query context i.e. for the
GroupBy query
- context.put(QueryContexts.FINALIZE_KEY, false);
+ context.putIfAbsent(QueryContexts.FINALIZE_KEY, false);
// Only scalar or array-type dimensions are allowed as grouping keys.
context.putIfAbsent(GroupByQueryConfig.CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING,
false);
+ context.putIfAbsent(MultiStageQueryContext.CTX_ARRAY_INGEST_MODE, "array");
return context;
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQSpec.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQSpec.java
index 065471d32ba..4bb4e32754f 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQSpec.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQSpec.java
@@ -28,7 +28,6 @@ import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.query.Query;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
-import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;
@@ -43,7 +42,7 @@ public class MSQSpec
@JsonCreator
public MSQSpec(
@JsonProperty("query") Query<?> query,
- @JsonProperty("columnMappings") @Nullable ColumnMappings columnMappings,
+ @JsonProperty("columnMappings") ColumnMappings columnMappings,
@JsonProperty("destination") MSQDestination destination,
@JsonProperty("assignmentStrategy") WorkerAssignmentStrategy
assignmentStrategy,
@JsonProperty("tuningConfig") MSQTuningConfig tuningConfig
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java
index ea3072bfe45..74be1329467 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestination.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -35,6 +36,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -49,18 +51,23 @@ public class DataSourceMSQDestination implements
MSQDestination
@Nullable
private final List<Interval> replaceTimeChunks;
+ @Nullable
+ private final Map<String, DimensionSchema> dimensionToSchemaMap;
+
@JsonCreator
public DataSourceMSQDestination(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segmentGranularity") Granularity segmentGranularity,
@JsonProperty("segmentSortOrder") @Nullable List<String>
segmentSortOrder,
- @JsonProperty("replaceTimeChunks") @Nullable List<Interval>
replaceTimeChunks
+ @JsonProperty("replaceTimeChunks") @Nullable List<Interval>
replaceTimeChunks,
+ @JsonProperty("dimensionToSchemaMap") @Nullable Map<String,
DimensionSchema> dimensionToSchemaMap
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.segmentGranularity = Preconditions.checkNotNull(segmentGranularity,
"segmentGranularity");
this.segmentSortOrder = segmentSortOrder != null ? segmentSortOrder :
Collections.emptyList();
this.replaceTimeChunks = replaceTimeChunks;
+ this.dimensionToSchemaMap = dimensionToSchemaMap;
if (replaceTimeChunks != null) {
// Verify that if replaceTimeChunks is provided, it is nonempty.
@@ -125,6 +132,17 @@ public class DataSourceMSQDestination implements
MSQDestination
return replaceTimeChunks;
}
+ /**
+ * Returns the map of dimension name to its schema.
+ */
+ @Nullable
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public Map<String, DimensionSchema> getDimensionToSchemaMap()
+ {
+ return dimensionToSchemaMap;
+ }
+
/**
* Whether this object is in replace-existing-time-chunks mode.
*/
@@ -158,13 +176,14 @@ public class DataSourceMSQDestination implements
MSQDestination
return Objects.equals(dataSource, that.dataSource)
&& Objects.equals(segmentGranularity, that.segmentGranularity)
&& Objects.equals(segmentSortOrder, that.segmentSortOrder)
- && Objects.equals(replaceTimeChunks, that.replaceTimeChunks);
+ && Objects.equals(replaceTimeChunks, that.replaceTimeChunks)
+ && Objects.equals(dimensionToSchemaMap, that.dimensionToSchemaMap);
}
@Override
public int hashCode()
{
- return Objects.hash(dataSource, segmentGranularity, segmentSortOrder,
replaceTimeChunks);
+ return Objects.hash(dataSource, segmentGranularity, segmentSortOrder,
replaceTimeChunks, dimensionToSchemaMap);
}
@Override
@@ -175,6 +194,7 @@ public class DataSourceMSQDestination implements
MSQDestination
", segmentGranularity=" + segmentGranularity +
", segmentSortOrder=" + segmentSortOrder +
", replaceTimeChunks=" + replaceTimeChunks +
+ ", dimensionToSchemaMap=" + dimensionToSchemaMap +
'}';
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index c6396c0b306..7af34a1eb55 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -246,7 +246,8 @@ public class MSQTaskQueryMaker implements QueryMaker
targetDataSource.getDestinationName(),
segmentGranularityObject,
segmentSortOrder,
- replaceTimeChunks
+ replaceTimeChunks,
+ null
);
MultiStageQueryContext.validateAndGetTaskLockType(sqlQueryContext,
dataSourceMSQDestination.isReplaceTimeChunks());
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
index 330f1cdbbe6..bc8d517ffba 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQParseExceptionsTest.java
@@ -225,7 +225,7 @@ public class MSQParseExceptionsTest extends MSQTestBase
new ColumnMapping("v1", "agent_category")
)
))
- .destination(new DataSourceMSQDestination("foo1",
Granularities.ALL, null, null))
+ .destination(new DataSourceMSQDestination("foo1",
Granularities.ALL, null, null, null))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.setQueryContext(DEFAULT_MSQ_CONTEXT)
@@ -318,7 +318,7 @@ public class MSQParseExceptionsTest extends MSQTestBase
new ColumnMapping("agent_category", "agent_category")
)
))
- .destination(new DataSourceMSQDestination("foo1",
Granularities.ALL, null, null))
+ .destination(new DataSourceMSQDestination("foo1",
Granularities.ALL, null, null, null))
.tuningConfig(MSQTuningConfig.defaultConfig())
.build())
.setQueryContext(runtimeContext)
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index d868ddf20e5..9c4bb91637c 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -74,6 +74,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
import java.util.stream.Collectors;
public class MSQCompactionRunnerTest
@@ -87,13 +88,9 @@ public class MSQCompactionRunnerTest
private static final GranularityType QUERY_GRANULARITY =
GranularityType.HOUR;
private static List<String> PARTITION_DIMENSIONS;
- private static final StringDimensionSchema DIM1 = new StringDimensionSchema(
- "string_dim",
- null,
- null
- );
- private static final LongDimensionSchema LONG_DIMENSION_SCHEMA = new
LongDimensionSchema("long_dim");
- private static final List<DimensionSchema> DIMENSIONS =
ImmutableList.of(DIM1, LONG_DIMENSION_SCHEMA);
+ private static final StringDimensionSchema STRING_DIMENSION = new
StringDimensionSchema("string_dim", null, null);
+ private static final LongDimensionSchema LONG_DIMENSION = new
LongDimensionSchema("long_dim");
+ private static final List<DimensionSchema> DIMENSIONS =
ImmutableList.of(STRING_DIMENSION, LONG_DIMENSION);
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final AggregatorFactory AGG1 = new
CountAggregatorFactory("agg_0");
private static final AggregatorFactory AGG2 = new
LongSumAggregatorFactory("sum_added", "sum_added");
@@ -291,7 +288,8 @@ public class MSQCompactionRunnerTest
DATA_SOURCE,
SEGMENT_GRANULARITY.getDefaultGranularity(),
null,
- Collections.singletonList(COMPACTION_INTERVAL)
+ Collections.singletonList(COMPACTION_INTERVAL),
+
DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName,
Function.identity()))
),
actualMSQSpec.getDestination()
);
@@ -360,7 +358,8 @@ public class MSQCompactionRunnerTest
DATA_SOURCE,
SEGMENT_GRANULARITY.getDefaultGranularity(),
null,
- Collections.singletonList(COMPACTION_INTERVAL)
+ Collections.singletonList(COMPACTION_INTERVAL),
+
DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName,
Function.identity()))
),
actualMSQSpec.getDestination()
);
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
index 9df6c38f30e..e969e209387 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java
@@ -58,7 +58,8 @@ public class MSQControllerTaskTest
"target",
Granularities.DAY,
null,
- INTERVALS
+ INTERVALS,
+ null
))
.query(new Druids.ScanQueryBuilder()
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java
index 6d3a5ebfd9b..242c00213e2 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/destination/DataSourceMSQDestinationTest.java
@@ -20,9 +20,14 @@
package org.apache.druid.msq.indexing.destination;
+import com.google.common.collect.ImmutableMap;
import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.junit.Test;
+import java.util.Map;
+
public class DataSourceMSQDestinationTest
{
@@ -30,7 +35,26 @@ public class DataSourceMSQDestinationTest
public void testEquals()
{
EqualsVerifier.forClass(DataSourceMSQDestination.class)
- .withNonnullFields("dataSource", "segmentGranularity",
"segmentSortOrder")
+ .withNonnullFields("dataSource", "segmentGranularity",
"segmentSortOrder", "dimensionToSchemaMap")
+ .withPrefabValues(
+ Map.class,
+ ImmutableMap.of(
+ "language",
+ new StringDimensionSchema(
+ "language",
+ DimensionSchema.MultiValueHandling.SORTED_ARRAY,
+ false
+ )
+ ),
+ ImmutableMap.of(
+ "region",
+ new StringDimensionSchema(
+ "region",
+ DimensionSchema.MultiValueHandling.SORTED_ARRAY,
+ false
+ )
+ )
+ )
.usingGetClass()
.verify();
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
index 4ea2993050e..2a753e21d16 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/sql/resources/SqlStatementResourceTest.java
@@ -199,6 +199,7 @@ public class SqlStatementResourceTest extends MSQTestBase
"test",
Granularities.DAY,
null,
+ null,
null
))
.tuningConfig(
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
index 1966d1e5b10..58856adf366 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/util/SqlStatementResourceHelperTest.java
@@ -375,6 +375,7 @@ public class SqlStatementResourceHelperTest
"test",
Granularities.DAY,
null,
+ null,
null
)
);
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 24ede82d0de..31a6bccffc7 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -25,7 +25,9 @@ import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
@@ -52,6 +54,8 @@ import
org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggrega
import
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
import
org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.segment.AutoTypeColumnSchema;
+import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -515,6 +519,42 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
}
}
+ @Test(dataProvider = "engine")
+ public void
testAutoCompactionPreservesCreateBitmapIndexInDimensionSchema(CompactionEngine
engine) throws Exception
+ {
+ loadData(INDEX_TASK);
+ try (final Closeable ignored = unloader(fullDatasourceName)) {
+ final List<String> intervalsBeforeCompaction =
coordinator.getSegmentIntervals(fullDatasourceName);
+ intervalsBeforeCompaction.sort(null);
+ // 4 segments across 2 days (4 total)...
+ verifySegmentsCount(4);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+
+ LOG.info("Auto compaction test with YEAR segment granularity,
dropExisting is true");
+ Granularity newSegmentGranularity = Granularities.YEAR;
+
+ List<DimensionSchema> dimensionSchemas = ImmutableList.of(
+ new StringDimensionSchema("language",
DimensionSchema.MultiValueHandling.SORTED_ARRAY, false),
+ new AutoTypeColumnSchema("deleted", ColumnType.DOUBLE)
+ );
+
+ submitCompactionConfig(
+ MAX_ROWS_PER_SEGMENT_COMPACTED,
+ NO_SKIP_OFFSET,
+ new UserCompactionTaskGranularityConfig(newSegmentGranularity, null,
true),
+ new UserCompactionTaskDimensionsConfig(dimensionSchemas),
+ null,
+ new AggregatorFactory[] {new LongSumAggregatorFactory("added",
"added")},
+ true,
+ engine
+ );
+ //...compacted into 1 segment for the entire year.
+ forceTriggerAutoCompaction(1);
+ verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
+ verifySegmentsCompactedDimensionSchema(dimensionSchemas);
+ }
+ }
+
@Test
public void testAutoCompactionDutySubmitAndVerifyCompaction() throws
Exception
{
@@ -1941,6 +1981,28 @@ public class ITAutoCompactionTest extends
AbstractIndexerTest
Assert.assertNotNull(compactedSegment.getLastCompactionState().getPartitionsSpec());
Assert.assertEquals(compactedSegment.getLastCompactionState().getPartitionsSpec(),
partitionsSpec);
}
+
+ }
+
+ private void verifySegmentsCompactedDimensionSchema(List<DimensionSchema>
dimensionSchemas)
+ {
+ List<DataSegment> segments =
coordinator.getFullSegmentsMetadata(fullDatasourceName);
+ List<DataSegment> foundCompactedSegments = new ArrayList<>();
+ for (DataSegment segment : segments) {
+ if (segment.getLastCompactionState() != null) {
+ foundCompactedSegments.add(segment);
+ }
+ }
+ for (DataSegment compactedSegment : foundCompactedSegments) {
+ MatcherAssert.assertThat(
+ dimensionSchemas,
+ Matchers.containsInAnyOrder(
+ compactedSegment.getLastCompactionState()
+ .getDimensionsSpec()
+ .getDimensions()
+ .toArray(new DimensionSchema[0]))
+ );
+ }
}
private void updateCompactionTaskSlot(double compactionTaskSlotRatio, int
maxCompactionTaskSlots, Boolean useAutoScaleSlots) throws Exception
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]