This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5333c53d710 Support non time order in MSQ compaction (#17318)
5333c53d710 is described below
commit 5333c53d710f0238b5314b7508d6a7877233a33b
Author: Vishesh Garg <[email protected]>
AuthorDate: Wed Nov 27 13:26:10 2024 +0530
Support non time order in MSQ compaction (#17318)
This patch supports sorting segments by non-time columns (added in #16849)
to MSQ compaction.
Specifically, if `forceSegmentSortByTime` is set to `false` in the data
schema, either via the user-supplied
compaction config or in the inferred schema, the following steps are taken:
- Skip adding `__time` explicitly as the first column to the dimension
schema since it already comes
as part of the schema
- Ensure column mappings propagate `__time` in the order specified by the
schema
- Set `forceSegmentSortByTime` in the MSQ context.
---
.../druid/msq/indexing/MSQCompactionRunner.java | 62 +++++--
.../msq/indexing/MSQCompactionRunnerTest.java | 205 ++++++++++++++-------
2 files changed, 176 insertions(+), 91 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
index d05ab12ea3f..2d8a510fd98 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
@@ -348,7 +348,10 @@ public class MSQCompactionRunner implements
CompactionRunner
private static RowSignature getRowSignature(DataSchema dataSchema)
{
RowSignature.Builder rowSignatureBuilder = RowSignature.builder();
-
rowSignatureBuilder.add(dataSchema.getTimestampSpec().getTimestampColumn(),
ColumnType.LONG);
+ if (dataSchema.getDimensionsSpec().isForceSegmentSortByTime()) {
+ // If sort not forced by time, __time appears as part of dimensions in
DimensionsSpec
+
rowSignatureBuilder.add(dataSchema.getTimestampSpec().getTimestampColumn(),
ColumnType.LONG);
+ }
if (!isQueryGranularityEmptyOrNone(dataSchema)) {
// A virtual column for query granularity would have been added. Add
corresponding column type.
rowSignatureBuilder.add(TIME_VIRTUAL_COLUMN, ColumnType.LONG);
@@ -398,25 +401,31 @@ public class MSQCompactionRunner implements
CompactionRunner
private static ColumnMappings getColumnMappings(DataSchema dataSchema)
{
- List<ColumnMapping> columnMappings = dataSchema.getDimensionsSpec()
- .getDimensions()
- .stream()
- .map(dim -> new
ColumnMapping(
- dim.getName(),
dim.getName()))
-
.collect(Collectors.toList());
+ List<ColumnMapping> columnMappings = new ArrayList<>();
+ // For scan queries, a virtual column is created from __time if a custom
query granularity is provided. For
+ // group-by queries, as insert needs __time, it will always be one of the
dimensions. Since dimensions in groupby
+ // aren't allowed to have time column as the output name, we map time
dimension to TIME_VIRTUAL_COLUMN in
+ // dimensions, and map it back to the time column here.
+ String timeColumn = (isGroupBy(dataSchema) ||
!isQueryGranularityEmptyOrNone(dataSchema))
+ ? TIME_VIRTUAL_COLUMN
+ : ColumnHolder.TIME_COLUMN_NAME;
+ ColumnMapping timeColumnMapping = new ColumnMapping(timeColumn,
ColumnHolder.TIME_COLUMN_NAME);
+ if (dataSchema.getDimensionsSpec().isForceSegmentSortByTime()) {
+ // When sorted by time, the __time column is missing from
dimensionsSpec, so its mapping must be added explicitly here
+ columnMappings.add(timeColumnMapping);
+ }
+ columnMappings.addAll(
+ dataSchema.getDimensionsSpec()
+ .getDimensions()
+ .stream()
+ .map(dim ->
dim.getName().equals(ColumnHolder.TIME_COLUMN_NAME)
+ ? timeColumnMapping
+ : new ColumnMapping(dim.getName(),
dim.getName()))
+ .collect(Collectors.toList())
+ );
columnMappings.addAll(Arrays.stream(dataSchema.getAggregators())
.map(agg -> new ColumnMapping(agg.getName(),
agg.getName()))
- .collect(
- Collectors.toList()));
- if (isGroupBy(dataSchema) || !isQueryGranularityEmptyOrNone(dataSchema)) {
- // For scan queries, a virtual column is created from __time if a custom
query granularity is provided. For
- // group-by queries, as insert needs __time, it will always be one of
the dimensions. Since dimensions in groupby
- // aren't allowed to have time column as the output name, we map time
dimension to TIME_VIRTUAL_COLUMN in
- // dimensions, and map it back to the time column here.
- columnMappings.add(new ColumnMapping(TIME_VIRTUAL_COLUMN,
ColumnHolder.TIME_COLUMN_NAME));
- } else {
- columnMappings.add(new ColumnMapping(ColumnHolder.TIME_COLUMN_NAME,
ColumnHolder.TIME_COLUMN_NAME));
- }
+ .collect(Collectors.toList()));
return new ColumnMappings(columnMappings);
}
@@ -431,6 +440,19 @@ public class MSQCompactionRunner implements
CompactionRunner
return Collections.emptyList();
}
+ private static Map<String, Object> buildQueryContext(
+ Map<String, Object> taskContext,
+ DataSchema dataSchema
+ )
+ {
+ if (dataSchema.getDimensionsSpec().isForceSegmentSortByTime()) {
+ return taskContext;
+ }
+ Map<String, Object> queryContext = new HashMap<>(taskContext);
+ queryContext.put(MultiStageQueryContext.CTX_FORCE_TIME_SORT, false);
+ return queryContext;
+ }
+
private static Query<?> buildScanQuery(
CompactionTask compactionTask,
Interval interval,
@@ -447,7 +469,7 @@ public class MSQCompactionRunner implements CompactionRunner
.columnTypes(rowSignature.getColumnTypes())
.intervals(new
MultipleIntervalSegmentSpec(Collections.singletonList(interval)))
.filters(dataSchema.getTransformSpec().getFilter())
- .context(compactionTask.getContext());
+ .context(buildQueryContext(compactionTask.getContext(), dataSchema));
if (compactionTask.getTuningConfig() != null &&
compactionTask.getTuningConfig().getPartitionsSpec() != null) {
List<OrderByColumnSpec> orderByColumnSpecs =
getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec());
@@ -599,7 +621,7 @@ public class MSQCompactionRunner implements CompactionRunner
.setDimensions(getAggregateDimensions(dataSchema,
inputColToVirtualCol))
.setAggregatorSpecs(Arrays.asList(dataSchema.getAggregators()))
.setPostAggregatorSpecs(postAggregators)
- .setContext(compactionTask.getContext())
+ .setContext(buildQueryContext(compactionTask.getContext(), dataSchema))
.setInterval(interval);
if (compactionTask.getTuningConfig() != null &&
compactionTask.getTuningConfig().getPartitionsSpec() != null) {
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index 0a54f8550a9..a05ccd499d0 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -60,7 +60,9 @@ import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.NestedDataColumnSchema;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.data.CompressionFactory;
@@ -72,14 +74,13 @@ import
org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
-import org.hamcrest.MatcherAssert;
-import org.hamcrest.Matchers;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -103,10 +104,14 @@ public class MSQCompactionRunnerTest
private static final StringDimensionSchema STRING_DIMENSION = new
StringDimensionSchema("string_dim", null, false);
private static final StringDimensionSchema MV_STRING_DIMENSION = new
StringDimensionSchema("mv_string_dim", null, null);
private static final LongDimensionSchema LONG_DIMENSION = new
LongDimensionSchema("long_dim");
+ private static final NestedDataColumnSchema NESTED_DIMENSION = new
NestedDataColumnSchema("nested_dim", 4);
+ private static final AutoTypeColumnSchema AUTO_DIMENSION = new
AutoTypeColumnSchema("auto_dim", null);
private static final List<DimensionSchema> DIMENSIONS = ImmutableList.of(
STRING_DIMENSION,
LONG_DIMENSION,
- MV_STRING_DIMENSION
+ MV_STRING_DIMENSION,
+ NESTED_DIMENSION,
+ AUTO_DIMENSION
);
private static final Map<Interval, DataSchema> INTERVAL_DATASCHEMAS =
ImmutableMap.of(
COMPACTION_INTERVAL,
@@ -336,7 +341,7 @@ public class MSQCompactionRunnerTest
}
@Test
- public void testMSQControllerTaskSpecWithScanIsValid() throws
JsonProcessingException
+ public void testCompactionConfigWithoutMetricsSpecProducesCorrectSpec()
throws JsonProcessingException
{
DimFilter dimFilter = new SelectorDimFilter("dim1", "foo", null);
@@ -357,7 +362,7 @@ public class MSQCompactionRunnerTest
.withGranularity(
new UniformGranularitySpec(
SEGMENT_GRANULARITY.getDefaultGranularity(),
- null,
+ QUERY_GRANULARITY.getDefaultGranularity(),
false,
Collections.singletonList(COMPACTION_INTERVAL)
)
@@ -375,37 +380,37 @@ public class MSQCompactionRunnerTest
MSQSpec actualMSQSpec = msqControllerTask.getQuerySpec();
- Assert.assertEquals(
- new MSQTuningConfig(
- 1,
- MultiStageQueryContext.DEFAULT_ROWS_IN_MEMORY,
- MAX_ROWS_PER_SEGMENT,
- null,
- createIndexSpec()
- ),
- actualMSQSpec.getTuningConfig()
- );
- Assert.assertEquals(
- new DataSourceMSQDestination(
- DATA_SOURCE,
- SEGMENT_GRANULARITY.getDefaultGranularity(),
- null,
- Collections.singletonList(COMPACTION_INTERVAL),
-
DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName,
Function.identity())),
- null
- ),
- actualMSQSpec.getDestination()
- );
+ Assert.assertEquals(getExpectedTuningConfig(),
actualMSQSpec.getTuningConfig());
+ Assert.assertEquals(getExpectedDestination(),
actualMSQSpec.getDestination());
Assert.assertTrue(actualMSQSpec.getQuery() instanceof ScanQuery);
ScanQuery scanQuery = (ScanQuery) actualMSQSpec.getQuery();
+ List<String> expectedColumns = new ArrayList<>();
+ List<ColumnType> expectedColumnTypes = new ArrayList<>();
+ // Add __time since this is a time-ordered query which doesn't have __time
explicitly defined in dimensionsSpec
+ expectedColumns.add(ColumnHolder.TIME_COLUMN_NAME);
+ expectedColumnTypes.add(ColumnType.LONG);
+
+ // Add TIME_VIRTUAL_COLUMN since a query granularity is specified
+ expectedColumns.add(MSQCompactionRunner.TIME_VIRTUAL_COLUMN);
+ expectedColumnTypes.add(ColumnType.LONG);
+
+
expectedColumns.addAll(DIMENSIONS.stream().map(DimensionSchema::getName).collect(Collectors.toList()));
+
expectedColumnTypes.addAll(DIMENSIONS.stream().map(DimensionSchema::getColumnType).collect(Collectors.toList()));
+
+ Assert.assertEquals(expectedColumns, scanQuery.getColumns());
+ Assert.assertEquals(expectedColumnTypes, scanQuery.getColumnTypes());
+
Assert.assertEquals(dimFilter, scanQuery.getFilter());
Assert.assertEquals(
JSON_MAPPER.writeValueAsString(SEGMENT_GRANULARITY.toString()),
msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_SEGMENT_GRANULARITY)
);
-
Assert.assertNull(msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY));
+ Assert.assertEquals(
+ JSON_MAPPER.writeValueAsString(QUERY_GRANULARITY.toString()),
+
msqControllerTask.getContext().get(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY)
+ );
Assert.assertEquals(WorkerAssignmentStrategy.MAX,
actualMSQSpec.getAssignmentStrategy());
Assert.assertEquals(
PARTITION_DIMENSIONS.stream().map(OrderBy::ascending).collect(Collectors.toList()),
@@ -414,7 +419,60 @@ public class MSQCompactionRunnerTest
}
@Test
- public void testMSQControllerTaskSpecWithAggregatorsIsValid() throws
JsonProcessingException
+ public void
testCompactionConfigWithSortOnNonTimeDimensionsProducesCorrectSpec() throws
JsonProcessingException
+ {
+ List<DimensionSchema> nonTimeSortedDimensions = ImmutableList.of(
+ STRING_DIMENSION,
+ new LongDimensionSchema(ColumnHolder.TIME_COLUMN_NAME),
+ LONG_DIMENSION
+ );
+ CompactionTask taskCreatedWithTransformSpec = createCompactionTask(
+ new DynamicPartitionsSpec(TARGET_ROWS_PER_SEGMENT, null),
+ null,
+ Collections.emptyMap(),
+ null,
+ null
+ );
+
+ // Set forceSegmentSortByTime=false to enable non-time order
+ DimensionsSpec dimensionsSpec = DimensionsSpec.builder()
+
.setDimensions(nonTimeSortedDimensions)
+
.setForceSegmentSortByTime(false)
+ .build();
+ DataSchema dataSchema =
+ DataSchema.builder()
+ .withDataSource(DATA_SOURCE)
+ .withTimestamp(new TimestampSpec(TIMESTAMP_COLUMN, null,
null))
+ .withDimensions(dimensionsSpec)
+ .withGranularity(
+ new UniformGranularitySpec(
+ SEGMENT_GRANULARITY.getDefaultGranularity(),
+ null,
+ false,
+ Collections.singletonList(COMPACTION_INTERVAL)
+ )
+ )
+ .build();
+
+ List<MSQControllerTask> msqControllerTasks =
MSQ_COMPACTION_RUNNER.createMsqControllerTasks(
+ taskCreatedWithTransformSpec,
+ Collections.singletonMap(COMPACTION_INTERVAL, dataSchema)
+ );
+
+ MSQSpec actualMSQSpec =
Iterables.getOnlyElement(msqControllerTasks).getQuerySpec();
+
+ Assert.assertTrue(actualMSQSpec.getQuery() instanceof ScanQuery);
+ ScanQuery scanQuery = (ScanQuery) actualMSQSpec.getQuery();
+
+ // Dimensions should already list __time and the order should remain intact
+ Assert.assertEquals(
+
nonTimeSortedDimensions.stream().map(DimensionSchema::getName).collect(Collectors.toList()),
+ scanQuery.getColumns()
+ );
+ }
+
+ @Test
+ public void testCompactionConfigWithMetricsSpecProducesCorrectSpec() throws
JsonProcessingException
{
DimFilter dimFilter = new SelectorDimFilter("dim1", "foo", null);
@@ -444,7 +502,6 @@ public class MSQCompactionRunnerTest
multiValuedDimensions
);
-
List<MSQControllerTask> msqControllerTasks =
MSQ_COMPACTION_RUNNER.createMsqControllerTasks(
taskCreatedWithTransformSpec,
Collections.singletonMap(COMPACTION_INTERVAL, dataSchema)
@@ -454,27 +511,8 @@ public class MSQCompactionRunnerTest
MSQSpec actualMSQSpec = msqControllerTask.getQuerySpec();
- Assert.assertEquals(
- new MSQTuningConfig(
- 1,
- MultiStageQueryContext.DEFAULT_ROWS_IN_MEMORY,
- MAX_ROWS_PER_SEGMENT,
- null,
- createIndexSpec()
- ),
- actualMSQSpec.getTuningConfig()
- );
- Assert.assertEquals(
- new DataSourceMSQDestination(
- DATA_SOURCE,
- SEGMENT_GRANULARITY.getDefaultGranularity(),
- null,
- Collections.singletonList(COMPACTION_INTERVAL),
-
DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName,
Function.identity())),
- null
- ),
- actualMSQSpec.getDestination()
- );
+ Assert.assertEquals(getExpectedTuningConfig(),
actualMSQSpec.getTuningConfig());
+ Assert.assertEquals(getExpectedDestination(),
actualMSQSpec.getDestination());
Assert.assertTrue(actualMSQSpec.getQuery() instanceof GroupByQuery);
GroupByQuery groupByQuery = (GroupByQuery) actualMSQSpec.getQuery();
@@ -490,30 +528,32 @@ public class MSQCompactionRunnerTest
);
Assert.assertEquals(WorkerAssignmentStrategy.MAX,
actualMSQSpec.getAssignmentStrategy());
-
- // Since only MV_STRING_DIMENSION is indicated to be MVD by the
CombinedSchema, conversion to array should happen
- // only for that column.
- List<DimensionSpec> expectedDimensionSpec = DIMENSIONS.stream()
- .filter(dim ->
!MV_STRING_DIMENSION.getName()
-
.equals(dim.getName()))
- .map(dim -> new
DefaultDimensionSpec(
- dim.getName(),
- dim.getName(),
-
dim.getColumnType()
- ))
- .collect(
-
Collectors.toList());
+ List<DimensionSpec> expectedDimensionSpec = new ArrayList<>();
expectedDimensionSpec.add(
- new DefaultDimensionSpec(MSQCompactionRunner.TIME_VIRTUAL_COLUMN,
-
MSQCompactionRunner.TIME_VIRTUAL_COLUMN,
- ColumnType.LONG)
+ new DefaultDimensionSpec(
+ MSQCompactionRunner.TIME_VIRTUAL_COLUMN,
+ MSQCompactionRunner.TIME_VIRTUAL_COLUMN,
+ ColumnType.LONG
+ )
);
String mvToArrayStringDim =
MSQCompactionRunner.ARRAY_VIRTUAL_COLUMN_PREFIX + MV_STRING_DIMENSION.getName();
- expectedDimensionSpec.add(new DefaultDimensionSpec(mvToArrayStringDim,
mvToArrayStringDim, ColumnType.STRING_ARRAY));
- MatcherAssert.assertThat(
- expectedDimensionSpec,
- Matchers.containsInAnyOrder(groupByQuery.getDimensions().toArray(new
DimensionSpec[0]))
- );
+ // Since only MV_STRING_DIMENSION is indicated to be MVD by the
CombinedSchema, conversion to array should happen
+ // only for that column.
+ expectedDimensionSpec.addAll(DIMENSIONS.stream()
+ .map(dim ->
+
MV_STRING_DIMENSION.getName().equals(dim.getName())
+ ? new DefaultDimensionSpec(
+ mvToArrayStringDim,
+ mvToArrayStringDim,
+ ColumnType.STRING_ARRAY
+ )
+ : new DefaultDimensionSpec(
+ dim.getName(),
+ dim.getName(),
+ dim.getColumnType()
+ ))
+ .collect(Collectors.toList()));
+ Assert.assertEquals(expectedDimensionSpec, groupByQuery.getDimensions());
}
private CompactionTask createCompactionTask(
@@ -580,4 +620,27 @@ public class MSQCompactionRunnerTest
.withLongEncoding(CompressionFactory.LongEncodingStrategy.LONGS)
.build();
}
+
+ private static DataSourceMSQDestination getExpectedDestination()
+ {
+ return new DataSourceMSQDestination(
+ DATA_SOURCE,
+ SEGMENT_GRANULARITY.getDefaultGranularity(),
+ null,
+ Collections.singletonList(COMPACTION_INTERVAL),
+ DIMENSIONS.stream().collect(Collectors.toMap(DimensionSchema::getName,
Function.identity())),
+ null
+ );
+ }
+
+ private static MSQTuningConfig getExpectedTuningConfig()
+ {
+ return new MSQTuningConfig(
+ 1,
+ MultiStageQueryContext.DEFAULT_ROWS_IN_MEMORY,
+ MAX_ROWS_PER_SEGMENT,
+ null,
+ createIndexSpec()
+ );
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]