This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a126e2bfa46 Add support for MSQ CLUSTERED BY expressions to be preserved in the segment shard spec as virtual columns (#19061)
a126e2bfa46 is described below
commit a126e2bfa46e6da19b3fe3966fe23b8d185d7acc
Author: Clint Wylie <[email protected]>
AuthorDate: Thu Mar 19 11:08:10 2026 -0700
Add support for MSQ CLUSTERED BY expressions to be preserved in the segment shard spec as virtual columns (#19061)
changes:
* `ShardSpec` interface has a new method, `getDomainVirtualColumns`, to provide the virtual column information used for pruning
* `DimensionRangeShardSpec` stores `VirtualColumns` in segment metadata so they can be compared to query expressions and used for pruning (see the sketch below)
* `FilterSegmentPruner` is now virtual-column aware, using the new methods for segment pruning
* `SegmentGeneratorStageProcessor` now carries a map of column name to `VirtualColumn`, so that cluster key columns can be virtual columns
* `ControllerImpl` persists clustering virtual columns in the compaction state's transform spec
* `MSQCompactionRunner` handles virtual columns in order-by/cluster-by for compaction
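
For illustration, a minimal sketch of what a range shard spec now carries, mirroring the new constructor and the MultiStageQueryTest assertions in the diff below. The class name, the example split value, and the println of the new `getDomainVirtualColumns` accessor are illustrative only:

    import java.util.List;
    import org.apache.druid.data.input.StringTuple;
    import org.apache.druid.query.expression.TestExprMacroTable;
    import org.apache.druid.segment.VirtualColumns;
    import org.apache.druid.segment.column.ColumnType;
    import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
    import org.apache.druid.timeline.partition.DimensionRangeShardSpec;

    public class ClusterByVirtualColumnSketch
    {
      public static void main(String[] args)
      {
        // CLUSTERED BY CONCAT(country, ':', city) is planned as virtual column "v1" ...
        final VirtualColumns virtualColumns = VirtualColumns.create(
            new ExpressionVirtualColumn("v1", "concat(\"country\",':',\"city\")", ColumnType.STRING, TestExprMacroTable.INSTANCE)
        );
        // ... and is now stored alongside the range boundaries in the shard spec, so
        // FilterSegmentPruner can compare query expressions against it when pruning.
        final DimensionRangeShardSpec shardSpec = new DimensionRangeShardSpec(
            List.of("v1"),
            virtualColumns,
            null,                                 // start boundary (open)
            StringTuple.create("Russia:Moscow"),  // end boundary (example split point)
            0,
            2
        );
        System.out.println(shardSpec.getDomainVirtualColumns());
      }
    }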
---
.../timeline/DimensionRangeShardSpecBenchmark.java | 4 +
.../embedded/compact/CompactionSupervisorTest.java | 283 +++++++++++++++++----
.../testing/embedded/msq/MultiStageQueryTest.java | 206 ++++++++++++++-
.../org/apache/druid/msq/exec/ControllerImpl.java | 109 +++++---
.../druid/msq/indexing/MSQCompactionRunner.java | 181 ++++++++-----
.../destination/SegmentGenerationStageSpec.java | 69 ++++-
.../processor/SegmentGeneratorStageProcessor.java | 16 +-
.../apache/druid/msq/sql/MSQTaskQueryMaker.java | 9 +-
.../controller/DartTableInputSpecSlicerTest.java | 4 +
.../org/apache/druid/msq/exec/MSQReplaceTest.java | 61 ++++-
.../msq/indexing/MSQCompactionRunnerTest.java | 2 +-
.../SegmentGeneratorStageProcessorTest.java | 84 ++++++
.../table/IndexerTableInputSpecSlicerTest.java | 8 +-
.../druid/query/filter/FilterSegmentPruner.java | 34 ++-
.../apache/druid/query/groupby/GroupByQuery.java | 8 +
.../druid/query/planning/ExecutionVertex.java | 3 +-
.../partition/BaseDimensionRangeShardSpec.java | 4 +
.../partition/BuildingDimensionRangeShardSpec.java | 2 +
.../partition/DimensionRangeBucketShardSpec.java | 3 +-
.../partition/DimensionRangeShardSpec.java | 26 +-
.../apache/druid/timeline/partition/ShardSpec.java | 21 +-
.../SingleDimensionRangeBucketShardSpec.java | 2 +
.../partition/SingleDimensionShardSpec.java | 2 +
.../query/filter/FilterSegmentPrunerTest.java | 88 ++++++-
.../BuildingDimensionRangeShardSpecTest.java | 2 +
.../DimensionRangeBucketShardSpecTest.java | 5 +-
.../partition/DimensionRangeShardSpecTest.java | 34 ++-
.../partition/PartitionHolderCompletenessTest.java | 7 +
.../indexing/ClientCompactionRunnerInfo.java | 53 ++--
.../indexing/ClientCompactionRunnerInfoTest.java | 2 +-
.../IndexerSQLMetadataStorageCoordinatorTest.java | 8 +-
31 files changed, 1120 insertions(+), 220 deletions(-)
diff --git a/benchmarks/src/test/java/org/apache/druid/timeline/DimensionRangeShardSpecBenchmark.java b/benchmarks/src/test/java/org/apache/druid/timeline/DimensionRangeShardSpecBenchmark.java
index 9ea11e48eef..df1bd027a9a 100644
--- a/benchmarks/src/test/java/org/apache/druid/timeline/DimensionRangeShardSpecBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/timeline/DimensionRangeShardSpecBenchmark.java
@@ -26,6 +26,7 @@ import com.google.common.collect.RangeSet;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.filter.InDimFilter;
+import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.junit.Assert;
import org.openjdk.jmh.annotations.Benchmark;
@@ -73,6 +74,7 @@ public class DimensionRangeShardSpecBenchmark
// Initial segment (null -> values)
  private final DimensionRangeShardSpec shardSpec0 = new DimensionRangeShardSpec(
Arrays.asList("country", "city"),
+ VirtualColumns.EMPTY,
new StringTuple(new String[]{null, null}),
new StringTuple(new String[]{"Germany", "Munich"}),
0,
@@ -82,6 +84,7 @@ public class DimensionRangeShardSpecBenchmark
// Middle segment (values -> other values)
  private final DimensionRangeShardSpec shardSpec1 = new DimensionRangeShardSpec(
Arrays.asList("country", "city"),
+ VirtualColumns.EMPTY,
new StringTuple(new String[]{"Germany", "Munich"}),
new StringTuple(new String[]{"United States", "New York"}),
1,
@@ -91,6 +94,7 @@ public class DimensionRangeShardSpecBenchmark
// End segment (values -> null)
  private final DimensionRangeShardSpec shardSpec2 = new DimensionRangeShardSpec(
Arrays.asList("country", "city"),
+ VirtualColumns.EMPTY,
new StringTuple(new String[]{"United States", "New York"}),
new StringTuple(new String[]{null, null}),
2,
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
index bc9f7236530..838bb559989 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.testing.embedded.compact;
import org.apache.druid.catalog.guice.CatalogClientModule;
import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.StringTuple;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.JsonInputFormat;
@@ -51,6 +52,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.EqualityFilter;
import org.apache.druid.query.filter.NotDimFilter;
@@ -93,6 +95,7 @@ import org.apache.druid.testing.tools.JsonEventSerializer;
import org.apache.druid.testing.tools.StreamGenerator;
import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
@@ -150,22 +153,6 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
.addServer(new EmbeddedRouter());
}
-
-  private void configureCompaction(CompactionEngine compactionEngine, @Nullable CompactionCandidateSearchPolicy policy)
- {
- final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
- o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(
- 1.0,
- 100,
- policy,
- true,
- compactionEngine,
- true
- ))
- );
- Assertions.assertTrue(updateResponse.isSuccess());
- }
-
@MethodSource("getEngine")
@ParameterizedTest(name = "compactionEngine={0}")
  public void test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToYearGranularity_withInlineConfig(
@@ -307,41 +294,6 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
Assertions.assertEquals(4000, getTotalRowCount());
}
- protected void ingest1kRecords()
- {
-    final EventSerializer serializer = new JsonEventSerializer(overlord.bindings().jsonMapper());
-    final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 500, 100);
- List<byte[]> records = streamGenerator.generateEvents(2);
-
- final InlineInputSource input = new InlineInputSource(
-        records.stream().map(b -> new String(b, StandardCharsets.UTF_8)).collect(Collectors.joining("\n")));
- final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
- input,
- new JsonInputFormat(null, null, null, null, null),
- true,
- null
- );
-    final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec(
- DataSchema.builder()
- .withDataSource(dataSource)
- .withTimestamp(new TimestampSpec("timestamp", "iso", null))
-                  .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())
- .build(),
- ioConfig,
- TuningConfigBuilder.forParallelIndexTask().build()
- );
- final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
- final ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
- taskId,
- null,
- null,
- indexIngestionSpec,
- null
- );
- cluster.callApi().submitTask(task);
- cluster.callApi().waitForTaskToSucceed(taskId, overlord);
- }
-
@MethodSource("getEngine")
@ParameterizedTest(name = "compactionEngine={0}")
  public void test_compaction_withPersistLastCompactionStateFalse_storesOnlyFingerprint(CompactionEngine compactionEngine)
@@ -575,6 +527,184 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
Assertions.assertTrue(count > 0);
}
+ @Test
+ public void test_compaction_cluster_by_virtualcolumn()
+ {
+    // Virtual Columns on nested data are only supported with the MSQ compaction engine right now.
+ CompactionEngine compactionEngine = CompactionEngine.MSQ;
+ configureCompaction(compactionEngine, null);
+
+ String jsonDataWithNestedColumn =
+ """
+        {"timestamp": "2023-01-01T00:00:00", "str":"a", "obj":{"a": "ll"}}
+        {"timestamp": "2023-01-01T00:00:00", "str":"", "obj":{"a": "mm"}}
+        {"timestamp": "2023-01-01T00:00:00", "str":"null", "obj":{"a": "nn"}}
+        {"timestamp": "2023-01-01T00:00:00", "str":"b", "obj":{"a": "oo"}}
+        {"timestamp": "2023-01-01T00:00:00", "str":"c", "obj":{"a": "pp"}}
+        {"timestamp": "2023-01-01T00:00:00", "str":"d", "obj":{"a": "qq"}}
+        {"timestamp": "2023-01-01T00:00:00", "str":null, "obj":{"a": "rr"}}
+        """;
+
+ final TaskBuilder.Index task = TaskBuilder
+ .ofTypeIndex()
+ .dataSource(dataSource)
+ .jsonInputFormat()
+ .inlineInputSourceWithData(jsonDataWithNestedColumn)
+ .isoTimestampColumn("timestamp")
+ .schemaDiscovery()
+ .granularitySpec("DAY", null, false);
+
+ cluster.callApi().runTask(task.withId(IdUtils.getRandomId()), overlord);
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+
+ Assertions.assertEquals(7, getTotalRowCount());
+
+    VirtualColumns virtualColumns = VirtualColumns.create(
+        new ExpressionVirtualColumn("v0", "json_value(obj, '$.a')", ColumnType.STRING, TestExprMacroTable.INSTANCE)
+    );
+
+ InlineSchemaDataSourceCompactionConfig config =
+ InlineSchemaDataSourceCompactionConfig
+ .builder()
+ .forDataSource(dataSource)
+ .withSkipOffsetFromLatest(Period.seconds(0))
+ .withTransformSpec(
+ new CompactionTransformSpec(
+ null,
+ virtualColumns
+ )
+ )
+ .withTuningConfig(
+ UserCompactionTaskQueryTuningConfig
+ .builder()
+                .partitionsSpec(new DimensionRangePartitionsSpec(4, null, List.of("v0"), false))
+ .build()
+ )
+ .build();
+
+ runCompactionWithSpec(config);
+ waitForAllCompactionTasksToFinish();
+
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+
+    List<DataSegment> segments = cluster.callApi().getVisibleUsedSegments(dataSource, overlord).stream().toList();
+ Assertions.assertEquals(2, segments.size());
+ Assertions.assertEquals(
+ new DimensionRangeShardSpec(
+ List.of("v0"),
+ virtualColumns,
+ null,
+ StringTuple.create("oo"),
+ 0,
+ 2
+ ),
+ segments.get(0).getShardSpec()
+ );
+ Assertions.assertEquals(
+ new DimensionRangeShardSpec(
+ List.of("v0"),
+ virtualColumns,
+ StringTuple.create("oo"),
+ null,
+ 1,
+ 2
+ ),
+ segments.get(1).getShardSpec()
+ );
+ }
+
+ @Test
+ public void test_compaction_cluster_by_virtualcolumn_rollup()
+ {
+    // Virtual Columns on nested data are only supported with the MSQ compaction engine right now.
+ CompactionEngine compactionEngine = CompactionEngine.MSQ;
+ configureCompaction(compactionEngine, null);
+
+ String jsonDataWithNestedColumn =
+ """
+        {"timestamp": "2023-01-01T00:00:00", "str":"a", "obj":{"a": "ll"}}
+        {"timestamp": "2023-01-01T00:00:00", "str":"", "obj":{"a": "mm"}}
+        {"timestamp": "2023-01-01T00:00:00", "str":"null", "obj":{"a": "nn"}}
+        {"timestamp": "2023-01-01T00:00:00", "str":"b", "obj":{"a": "oo"}}
+        {"timestamp": "2023-01-01T00:00:00", "str":"c", "obj":{"a": "pp"}}
+        {"timestamp": "2023-01-01T00:00:00", "str":"d", "obj":{"a": "qq"}}
+        {"timestamp": "2023-01-01T00:00:00", "str":null, "obj":{"a": "rr"}}
+        """;
+
+ final TaskBuilder.Index task = TaskBuilder
+ .ofTypeIndex()
+ .dataSource(dataSource)
+ .jsonInputFormat()
+ .inlineInputSourceWithData(jsonDataWithNestedColumn)
+ .isoTimestampColumn("timestamp")
+ .schemaDiscovery()
+        .dataSchema(builder -> builder.withAggregators(new CountAggregatorFactory("count")))
+ .granularitySpec("DAY", "MINUTE", true);
+
+ cluster.callApi().runTask(task.withId(IdUtils.getRandomId()), overlord);
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+
+ Assertions.assertEquals(7, getTotalRowCount());
+
+ VirtualColumns virtualColumns = VirtualColumns.create(
+ new ExpressionVirtualColumn(
+ "v0",
+ "json_value(obj, '$.a')",
+ ColumnType.STRING,
+ TestExprMacroTable.INSTANCE
+ )
+ );
+
+ InlineSchemaDataSourceCompactionConfig config =
+ InlineSchemaDataSourceCompactionConfig
+ .builder()
+ .forDataSource(dataSource)
+ .withSkipOffsetFromLatest(Period.seconds(0))
+ .withTransformSpec(
+ new CompactionTransformSpec(
+ null,
+ virtualColumns
+ )
+ )
+ .withTuningConfig(
+ UserCompactionTaskQueryTuningConfig
+ .builder()
+                .partitionsSpec(new DimensionRangePartitionsSpec(4, null, List.of("v0"), false))
+ .build()
+ )
+ .build();
+
+ runCompactionWithSpec(config);
+ waitForAllCompactionTasksToFinish();
+
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+
+    List<DataSegment> segments = cluster.callApi().getVisibleUsedSegments(dataSource, overlord).stream().toList();
+ Assertions.assertEquals(2, segments.size());
+ Assertions.assertEquals(
+ new DimensionRangeShardSpec(
+ List.of("v0"),
+ virtualColumns,
+ null,
+ StringTuple.create("oo"),
+ 0,
+ 2
+ ),
+ segments.get(0).getShardSpec()
+ );
+ Assertions.assertEquals(
+ new DimensionRangeShardSpec(
+ List.of("v0"),
+ virtualColumns,
+ StringTuple.create("oo"),
+ null,
+ 1,
+ 2
+ ),
+ segments.get(1).getShardSpec()
+ );
+ }
+
/**
* Tests that when a compaction task filters out all rows using a transform spec,
* tombstones are created to properly drop the old segments. This test covers both
@@ -702,6 +832,56 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
Assertions.assertEquals(1, segments.get(0).getDimensions().size());
}
+  private void configureCompaction(CompactionEngine compactionEngine, @Nullable CompactionCandidateSearchPolicy policy)
+ {
+ final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
+ o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(
+ 1.0,
+ 100,
+ policy,
+ true,
+ compactionEngine,
+ true
+ ))
+ );
+ Assertions.assertTrue(updateResponse.isSuccess());
+ }
+
+ protected void ingest1kRecords()
+ {
+    final EventSerializer serializer = new JsonEventSerializer(overlord.bindings().jsonMapper());
+    final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 500, 100);
+ List<byte[]> records = streamGenerator.generateEvents(2);
+
+ final InlineInputSource input = new InlineInputSource(
+        records.stream().map(b -> new String(b, StandardCharsets.UTF_8)).collect(Collectors.joining("\n")));
+ final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+ input,
+ new JsonInputFormat(null, null, null, null, null),
+ true,
+ null
+ );
+    final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec(
+ DataSchema.builder()
+ .withDataSource(dataSource)
+ .withTimestamp(new TimestampSpec("timestamp", "iso", null))
+                  .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())
+ .build(),
+ ioConfig,
+ TuningConfigBuilder.forParallelIndexTask().build()
+ );
+ final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
+ final ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
+ taskId,
+ null,
+ null,
+ indexIngestionSpec,
+ null
+ );
+ cluster.callApi().submitTask(task);
+ cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+ }
+
private int getTotalRowCount()
{
    return Numbers.parseInt(cluster.runSql("SELECT COUNT(*) as cnt FROM \"%s\"", dataSource));
@@ -723,7 +903,6 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
);
}
-
  private String generateEventsInInterval(Interval interval, int numEvents, long spacingMillis)
{
List<String> events = new ArrayList<>();
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java
index d268bb8b897..1d4ec613e6c 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java
@@ -19,10 +19,17 @@
package org.apache.druid.testing.embedded.msq;
+import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.sql.http.GetQueryReportResponse;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -32,6 +39,8 @@ import org.apache.druid.testing.embedded.EmbeddedOverlord;
import org.apache.druid.testing.embedded.indexing.MoreResources;
import org.apache.druid.testing.embedded.indexing.Resources;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -40,10 +49,15 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.UUID;
public class MultiStageQueryTest extends EmbeddedClusterTestBase
{
- private final EmbeddedBroker broker = new EmbeddedBroker();
+ private final EmbeddedBroker broker =
+ new EmbeddedBroker().setServerMemory(200_000_000)
+                          .addProperty("druid.msq.dart.controller.maxRetainedReportCount", "10")
+                          .addProperty("druid.query.default.context.maxConcurrentStages", "1")
+                          .addProperty("druid.sql.planner.enableSysQueriesTable", "true");
private final EmbeddedOverlord overlord = new EmbeddedOverlord();
private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
private final EmbeddedIndexer indexer = new EmbeddedIndexer()
@@ -59,6 +73,7 @@ public class MultiStageQueryTest extends EmbeddedClusterTestBase
return EmbeddedDruidCluster
.withEmbeddedDerbyAndZookeeper()
.useLatchableEmitter()
+ .addCommonProperty("druid.msq.dart.enabled", "true")
.addResource(exportDirectory)
.addServer(overlord)
.addServer(coordinator)
@@ -152,4 +167,193 @@ public class MultiStageQueryTest extends EmbeddedClusterTestBase
actualResults
);
}
+
+ @Test
+ public void testClusterByVirtualColumn()
+ {
+ final String sqlTemplate =
+ """
+ SET rowsPerSegment = 2;
+ SET groupByEnableMultiValueUnnesting = FALSE;
+ REPLACE INTO %s OVERWRITE ALL
+ WITH "ext" AS (
+ SELECT *
+          FROM TABLE(EXTERN('{"type":"local","files":["%s"]}', '{"type":"json"}'))
+ EXTEND(
+ "timestamp" VARCHAR,
+ "added" BIGINT,
+ "delta" BIGINT,
+ "deleted" BIGINT,
+ "page" VARCHAR,
+ "city" VARCHAR,
+ "country" VARCHAR,
+ "user" VARCHAR
+ )
+ )
+ SELECT
+ TIME_PARSE("timestamp") AS __time,
+ added,
+ delta,
+ deleted,
+ page,
+ city,
+ country,
+ user
+ FROM "ext"
+ PARTITIONED BY DAY
+ CLUSTERED BY CONCAT(country, ':', city)
+ """;
+ final String sql = StringUtils.format(
+ sqlTemplate,
+ dataSource,
+ Resources.DataFile.tinyWiki1Json().getAbsolutePath()
+ );
+
+ final SqlTaskStatus taskStatus = msqApis.submitTaskSql(sql);
+    cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), overlord.latchableEmitter());
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+
+ assertClusterByVirtualColumnSegments();
+ assertClusterByVirtualColumnQueries();
+ }
+
+ @Test
+ public void testClusterByVirtualColumnRollup()
+ {
+ final String sqlTemplate =
+ """
+ SET rowsPerSegment = 2;
+ SET groupByEnableMultiValueUnnesting = FALSE;
+ REPLACE INTO %s OVERWRITE ALL
+ WITH "ext" AS (
+ SELECT *
+          FROM TABLE(EXTERN('{"type":"local","files":["%s"]}', '{"type":"json"}'))
+ EXTEND(
+ "timestamp" VARCHAR,
+ "added" BIGINT,
+ "delta" BIGINT,
+ "deleted" BIGINT,
+ "page" VARCHAR,
+ "city" VARCHAR,
+ "country" VARCHAR,
+ "user" VARCHAR
+ )
+ )
+ SELECT
+ TIME_PARSE("timestamp") AS __time,
+ page,
+ city,
+ country,
+ user,
+ SUM(added) as added,
+ SUM(delta) as delta,
+ SUM(deleted) as deleted
+ FROM "ext"
+          GROUP BY TIME_PARSE("timestamp"), page, city, country, user, CONCAT(country, ':', city)
+ PARTITIONED BY DAY
+ CLUSTERED BY CONCAT(country, ':', city)
+ """;
+ final String sql = StringUtils.format(
+ sqlTemplate,
+ dataSource,
+ Resources.DataFile.tinyWiki1Json().getAbsolutePath()
+ );
+
+ final SqlTaskStatus taskStatus = msqApis.submitTaskSql(sql);
+    cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), overlord.latchableEmitter());
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+
+ assertClusterByVirtualColumnSegments();
+ assertClusterByVirtualColumnQueries();
+ }
+
+ private void assertClusterByVirtualColumnSegments()
+ {
+    List<DataSegment> segments = cluster.callApi().getVisibleUsedSegments(dataSource, overlord).stream().toList();
+ Assertions.assertEquals(2, segments.size());
+ VirtualColumns virtualColumns = VirtualColumns.create(
+        new ExpressionVirtualColumn("v1", "concat(\"country\",':',\"city\")", ColumnType.STRING, TestExprMacroTable.INSTANCE)
+ );
+ Assertions.assertEquals(
+ new DimensionRangeShardSpec(
+ List.of("v1"),
+ virtualColumns,
+ null,
+ StringTuple.create("Russia:Moscow"),
+ 0,
+ 2
+ ),
+ segments.get(0).getShardSpec()
+ );
+ Assertions.assertEquals(
+ new DimensionRangeShardSpec(
+ List.of("v1"),
+ virtualColumns,
+ StringTuple.create("Russia:Moscow"),
+ null,
+ 1,
+ 2
+ ),
+ segments.get(1).getShardSpec()
+ );
+ }
+
+ private void assertClusterByVirtualColumnQueries()
+ {
+ String queryId = UUID.randomUUID().toString();
+ cluster.callApi().verifySqlQuery(
+        "SET engine = 'msq-dart'; SET sqlQueryId = '" + queryId + "'; SELECT __time, country, city, page FROM %s ORDER BY __time",
+ dataSource,
+ """
+ 2013-08-31T01:02:33.000Z,United States,San Francisco,Gypsy Danger
+ 2013-08-31T03:32:45.000Z,Australia,Syndey,Striker Eureka
+ 2013-08-31T07:11:21.000Z,Russia,Moscow,Cherno Alpha"""
+ );
+ Assertions.assertEquals(2, getSegmentsScannedForDartQuery(queryId));
+
+ queryId = UUID.randomUUID().toString();
+ cluster.callApi().verifySqlQuery(
+        "SET engine = 'msq-dart'; SET sqlQueryId = '" + queryId + "'; SELECT __time, country, city, page FROM %s WHERE CONCAT(country, ':', city) <= 'Russia' ORDER BY __time",
+ dataSource,
+ """
+ 2013-08-31T03:32:45.000Z,Australia,Syndey,Striker Eureka"""
+ );
+ Assertions.assertEquals(1, getSegmentsScannedForDartQuery(queryId));
+
+ queryId = UUID.randomUUID().toString();
+ cluster.callApi().verifySqlQuery(
+        "SET engine = 'msq-dart'; SET sqlQueryId = '" + queryId + "'; SELECT __time, country, city, page FROM %s WHERE CONCAT(country, ':', city) <= 'Russia:St. Petersburg' ORDER BY __time",
+ dataSource,
+ """
+ 2013-08-31T03:32:45.000Z,Australia,Syndey,Striker Eureka
+ 2013-08-31T07:11:21.000Z,Russia,Moscow,Cherno Alpha"""
+ );
+ Assertions.assertEquals(2, getSegmentsScannedForDartQuery(queryId));
+ }
+
+ private long getSegmentsScannedForDartQuery(String sqlQueryId)
+ {
+    ChannelCounters.Snapshot segmentChannelCounters = getDartSegmentChannelCounters(sqlQueryId);
+ return segmentChannelCounters.getFiles()[0];
+ }
+
+  private ChannelCounters.Snapshot getDartSegmentChannelCounters(String sqlQueryId)
+ {
+    final GetQueryReportResponse reportResponse = msqApis.getDartQueryReport(sqlQueryId, broker);
+
+    Assertions.assertNotNull(reportResponse, "Report response should not be null");
+    ChannelCounters.Snapshot segmentChannelCounters =
+        (ChannelCounters.Snapshot) reportResponse.getReportMap()
+            .findReport("multiStageQuery")
+            .map(r ->
+                ((MSQTaskReportPayload) r.getPayload()).getCounters()
+                    .snapshotForStage(0)
+                    .get(0)
+                    .getMap()
+                    .get("input0")
+            ).orElse(null);
+
+ Assertions.assertNotNull(segmentChannelCounters);
+ return segmentChannelCounters;
+ }
}
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index b18a2971e2c..6a064c82c5d 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -61,6 +61,7 @@ import org.apache.druid.indexer.granularity.UniformGranularitySpec;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SecondaryPartitionType;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
@@ -174,6 +175,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.rowsandcols.serde.WireTransferableContext;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.indexing.DataSchema;
@@ -1062,6 +1065,7 @@ public class ControllerImpl implements Controller
final ClusterBy clusterBy,
final RowKeyReader keyReader,
final ClusterByPartitions partitionBoundaries,
+ final Map<String, VirtualColumn> clusterByVirtualColumnMappings,
final boolean mayHaveMultiValuedClusterByFields,
@Nullable final Boolean isStageOutputEmpty
) throws IOException
@@ -1073,6 +1077,7 @@ public class ControllerImpl implements Controller
clusterBy,
keyReader,
partitionBoundaries,
+ clusterByVirtualColumnMappings,
mayHaveMultiValuedClusterByFields,
isStageOutputEmpty
);
@@ -1240,6 +1245,7 @@ public class ControllerImpl implements Controller
final ClusterBy clusterBy,
final RowKeyReader keyReader,
final ClusterByPartitions partitionBoundaries,
+ final Map<String, VirtualColumn> clusterByVirtualColumnMappings,
final boolean mayHaveMultiValuedClusterByFields,
@Nullable final Boolean isStageOutputEmpty
) throws IOException
@@ -1254,6 +1260,7 @@ public class ControllerImpl implements Controller
signature,
clusterBy,
querySpec.getColumnMappings(),
+ clusterByVirtualColumnMappings,
mayHaveMultiValuedClusterByFields
);
final List<String> shardColumns = shardReasonPair.lhs;
@@ -1314,8 +1321,14 @@ public class ControllerImpl implements Controller
segmentNumber == ranges.size() - 1
? null
            : makeStringTuple(clusterBy, keyReader, range.getEnd(), shardColumns.size());
-
-      shardSpec = new DimensionRangeShardSpec(shardColumns, start, end, segmentNumber, ranges.size());
+ shardSpec = new DimensionRangeShardSpec(
+ shardColumns,
+ VirtualColumns.create(clusterByVirtualColumnMappings.values()),
+ start,
+ end,
+ segmentNumber,
+ ranges.size()
+ );
}
      retVal[partitionNumber] = new SegmentIdWithShardSpec(destination.getDataSource(), interval, version, shardSpec);
@@ -1710,19 +1723,13 @@ public class ControllerImpl implements Controller
queryDef.getQueryId()
);
} else {
-        DataSchema dataSchema = ((SegmentGeneratorStageProcessor) queryKernel
-            .getStageDefinition(finalStageId).getProcessor()).getDataSchema();
-
-        ShardSpec shardSpec = segments.isEmpty() ? null : segments.stream().findFirst().get().getShardSpec();
-        ClusterBy clusterBy = queryKernel.getStageDefinition(finalStageId).getClusterBy();
+        final ShardSpec shardSpec = segments.isEmpty() ? null : segments.stream().findFirst().get().getShardSpec();
compactionStateAnnotateFunction = addCompactionStateToSegments(
+ queryDef,
querySpec,
context.jsonMapper(),
- dataSchema,
- shardSpec,
- clusterBy,
- queryDef.getQueryId()
+ shardSpec
);
}
}
@@ -1761,17 +1768,21 @@ public class ControllerImpl implements Controller
}
  private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateToSegments(
+ QueryDefinition queryDef,
MSQSpec querySpec,
ObjectMapper jsonMapper,
- DataSchema dataSchema,
- @Nullable ShardSpec shardSpec,
- @Nullable ClusterBy clusterBy,
- String queryId
+ @Nullable ShardSpec shardSpec
)
{
+    final ClusterBy clusterBy = queryDef.getFinalStageDefinition().getClusterBy();
final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
- PartitionsSpec partitionSpec;
+    final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
+    final SegmentGeneratorStageProcessor segmentProcessor =
+        (SegmentGeneratorStageProcessor) queryDef.getFinalStageDefinition().getProcessor();
+
+ final DataSchema dataSchema = segmentProcessor.getDataSchema();
+ final PartitionsSpec partitionSpec;
    // shardSpec is absent in the absence of segments, which happens when only tombstones are generated by an
// MSQControllerTask.
if (shardSpec != null) {
@@ -1793,7 +1804,7 @@ public class ControllerImpl implements Controller
UnknownFault.forMessage(
                    StringUtils.format(
                        "Query[%s] cannot store compaction state in segments as shard spec of unsupported type[%s].",
- queryId,
+ queryDef.getQueryId(),
shardSpec.getType()
)));
}
@@ -1811,27 +1822,40 @@ public class ControllerImpl implements Controller
-      partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
}
-    Granularity segmentGranularity = ((DataSourceMSQDestination) querySpec.getDestination())
-        .getSegmentGranularity();
+    Granularity segmentGranularity = destination.getSegmentGranularity();
GranularitySpec granularitySpec = new UniformGranularitySpec(
segmentGranularity,
-        querySpec.getContext()
-                 .getGranularity(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY, jsonMapper),
+        querySpec.getContext().getGranularity(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY, jsonMapper),
dataSchema.getGranularitySpec().isRollup(),
        // Not using dataSchema.getGranularitySpec().inputIntervals() as that always has ETERNITY
-        ((DataSourceMSQDestination) querySpec.getDestination()).getReplaceTimeChunks()
+        destination.getReplaceTimeChunks()
);
DimensionsSpec dimensionsSpec = dataSchema.getDimensionsSpec();
-    CompactionTransformSpec transformSpec = TransformSpec.NONE.equals(dataSchema.getTransformSpec())
-                                            ? null
-                                            : CompactionTransformSpec.of(dataSchema.getTransformSpec());
+
+    // if the clustered by requires virtual columns, preserve them here so that we can rebuild during compaction
+ CompactionTransformSpec transformSpec;
+ final Map<String, VirtualColumn> clusterByVirtualColumnMappings =
+ segmentProcessor.getClusterByVirtualColumnMappings();
+
+ // only range partitioning can have virtual columns
+    if (clusterByVirtualColumnMappings.isEmpty() || !SecondaryPartitionType.RANGE.equals(partitionSpec.getType())) {
+ transformSpec = TransformSpec.NONE.equals(dataSchema.getTransformSpec())
+ ? null
+                      : CompactionTransformSpec.of(dataSchema.getTransformSpec());
+ } else {
+ transformSpec = new CompactionTransformSpec(
+ dataSchema.getTransformSpec().getFilter(),
+ VirtualColumns.create(clusterByVirtualColumnMappings.values())
+ );
+ }
+
    List<AggregatorFactory> metricsSpec = buildMSQCompactionMetrics(querySpec, dataSchema);
IndexSpec indexSpec = tuningConfig.getIndexSpec();
- log.info("Query[%s] storing compaction state in segments.", queryId);
+    log.info("Query[%s] storing compaction state in segments.", queryDef.getQueryId());
return CompactionState.addCompactionStateToSegments(
partitionSpec,
@@ -1909,6 +1933,7 @@ public class ControllerImpl implements Controller
final RowSignature signature,
final ClusterBy clusterBy,
final ColumnMappings columnMappings,
+ final Map<String, VirtualColumn> clusterByVirtualColumns,
boolean mayHaveMultiValuedClusterByFields
)
{
@@ -1959,19 +1984,24 @@ public class ControllerImpl implements Controller
);
}
-    // DimensionRangeShardSpec only handles columns that appear as-is in the output.
+    // DimensionRangeShardSpec columns may either be explicitly in the table or defined as virtual columns
if (outputColumns.isEmpty()) {
- return Pair.of(
- shardColumns,
- StringUtils.format(
-          "Using only[%d] CLUSTERED BY columns for 'range' shard specs, since the next column was not mapped to "
- + "an output column.",
- shardColumns.size()
- )
- );
+      final VirtualColumn vc = clusterByVirtualColumns.get(column.columnName());
+ if (vc != null) {
+ shardColumns.add(vc.getOutputName());
+ } else {
+ return Pair.of(
+ shardColumns,
+ StringUtils.format(
+              "Using only[%d] CLUSTERED BY columns for 'range' shard specs, since the next column was not mapped to "
+ + "an output column or virtual column.",
+ shardColumns.size()
+ )
+ );
+ }
+ } else {
+        shardColumns.add(columnMappings.getOutputColumnName(outputColumns.getInt(0)));
}
-
-      shardColumns.add(columnMappings.getOutputColumnName(outputColumns.getInt(0)));
}
    return Pair.of(shardColumns, "Using 'range' shard specs with all CLUSTERED BY fields.");
@@ -1997,7 +2027,7 @@ public class ControllerImpl implements Controller
final int shardFieldCount
)
{
-    final String[] array = new String[clusterBy.getColumns().size() - clusterBy.getBucketByCount()];
+ final String[] array = new String[shardFieldCount];
for (int i = 0; i < shardFieldCount; i++) {
final Object val = keyReader.read(key, clusterBy.getBucketByCount() + i);
@@ -2633,6 +2663,8 @@ public class ControllerImpl implements Controller
final boolean mayHaveMultiValuedClusterByFields =
!shuffleStageDef.mustGatherResultKeyStatistics()
        || queryKernel.hasStageCollectorEncounteredAnyMultiValueField(shuffleStageId);
+    final SegmentGeneratorStageProcessor segmentGeneratorStageProcessor =
+        (SegmentGeneratorStageProcessor) queryDef.getFinalStageDefinition().getProcessor();
segmentsToGenerate = generateSegmentIdsWithShardSpecs(
(DataSourceMSQDestination) querySpec.getDestination(),
@@ -2640,6 +2672,7 @@ public class ControllerImpl implements Controller
shuffleStageDef.getClusterBy(),
        shuffleStageDef.getClusterBy().keyReader(shuffleStageDef.getSignature(), rowBasedFrameType),
partitionBoundaries,
+ segmentGeneratorStageProcessor.getClusterByVirtualColumnMappings(),
mayHaveMultiValuedClusterByFields,
isShuffleStageOutputEmpty
);
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
index 7195303e4ef..8b4cd39879b 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
@@ -30,6 +30,7 @@ import com.google.inject.Injector;
import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -67,8 +68,10 @@ import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
@@ -76,6 +79,7 @@ import org.apache.druid.segment.indexing.CombinedDataSchema;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.transform.CompactionTransformSpec;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.segment.virtual.VirtualizedColumnInspector;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizationResult;
@@ -172,7 +176,10 @@ public class MSQCompactionRunner implements CompactionRunner
validationResults.add(
ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
compactionTask.getTuningConfig().getPartitionsSpec(),
- dataSchema.getDimensionsSpec().getDimensions()
+ dataSchema.getDimensionsSpec().getDimensions(),
+ compactionTask.getTransformSpec() == null
+ ? VirtualColumns.EMPTY
+ : compactionTask.getTransformSpec().getVirtualColumns()
)
);
validationResults.add(
@@ -409,42 +416,73 @@ public class MSQCompactionRunner implements CompactionRunner
private static List<DimensionSpec> getAggregateDimensions(
DataSchema dataSchema,
- Map<String, VirtualColumn> inputColToVirtualCol
+ Map<String, VirtualColumn> inputColToVirtualCol,
+ List<OrderByColumnSpec> orderBy
)
{
- List<DimensionSpec> dimensionSpecs = new ArrayList<>();
+ List<DimensionSpec> dimensions = new ArrayList<>();
+
+    // build a RowSignature of non-virtual column dimensions of the dataschema to use to resolve virtual column types
+ RowSignature.Builder baseBuilder = RowSignature.builder().addTimeColumn();
+    for (DimensionSchema schema : dataSchema.getDimensionsSpec().getDimensions()) {
+ if (inputColToVirtualCol.containsKey(schema.getName())) {
+ continue;
+ }
+ baseBuilder.add(schema.getName(), schema.getColumnType());
+ }
+ final RowSignature baseSignature = baseBuilder.build();
+ // and virtualized inspector from base signature
+ final ColumnInspector inspector = new VirtualizedColumnInspector(
+ baseSignature,
+ VirtualColumns.create(inputColToVirtualCol.values())
+ );
-    // if schema is not time-sorted, the time column mapping would already be in inputColToVirtualCol
-    if (!dataSchema.getDimensionsSpec().getDimensionNames().contains(ColumnHolder.TIME_COLUMN_NAME)) {
+    // if schema is not time-sorted, the time column will be in the dimensions list, otherwise add time dimension first
+    if (dataSchema.getDimensionsSpec().getSchema(ColumnHolder.TIME_COLUMN_NAME) == null) {
if (isQueryGranularityEmptyOrNone(dataSchema)) {
        // Dimensions in group-by aren't allowed to have time column name as the output name.
- dimensionSpecs.add(new DefaultDimensionSpec(
- ColumnHolder.TIME_COLUMN_NAME,
- TIME_VIRTUAL_COLUMN,
- ColumnType.LONG
- ));
+        dimensions.add(new DefaultDimensionSpec(ColumnHolder.TIME_COLUMN_NAME, TIME_VIRTUAL_COLUMN, ColumnType.LONG));
} else {
        // The changed granularity would result in a new virtual column that needs to be aggregated upon.
-        dimensionSpecs.add(new DefaultDimensionSpec(TIME_VIRTUAL_COLUMN, TIME_VIRTUAL_COLUMN, ColumnType.LONG));
+        dimensions.add(new DefaultDimensionSpec(TIME_VIRTUAL_COLUMN, TIME_VIRTUAL_COLUMN, ColumnType.LONG));
+ }
+ }
+
+    // If dimensions point to virtual columns, replace dimension column names with virtual column names.
+    for (DimensionSchema schema : dataSchema.getDimensionsSpec().getDimensions()) {
+ String dimension = schema.getName();
+ ColumnType colType = schema.getColumnType();
+ VirtualColumn vc = inputColToVirtualCol.get(dimension);
+ if (vc != null) {
+ dimension = vc.getOutputName();
+ if (vc instanceof ExpressionVirtualColumn) {
+ colType = ((ExpressionVirtualColumn) vc).getOutputType();
+ } else {
+          colType = ColumnType.fromCapabilities(vc.capabilities(inspector, vc.getOutputName()));
+ }
}
+ dimensions.add(new DefaultDimensionSpec(dimension, dimension, colType));
}
-    // If virtual columns are created from dimensions, replace dimension columns names with virtual column names.
- dimensionSpecs.addAll(
- dataSchema.getDimensionsSpec().getDimensions().stream()
- .map(dim -> {
- String dimension = dim.getName();
- ColumnType colType = dim.getColumnType();
- if (inputColToVirtualCol.containsKey(dim.getName())) {
-                        VirtualColumn virtualColumn = inputColToVirtualCol.get(dimension);
-                        dimension = virtualColumn.getOutputName();
-                        if (virtualColumn instanceof ExpressionVirtualColumn) {
-                          colType = ((ExpressionVirtualColumn) virtualColumn).getOutputType();
-                        }
-                      }
-                      return new DefaultDimensionSpec(dimension, dimension, colType);
- })
- .collect(Collectors.toList()));
- return dimensionSpecs;
+
+    // if any orderby columns refer to a virtual column that was not explicitly a dimension, add it to the list
+    // this is not really optimal, but it works without requiring any conversion between virtualcolumns and
+    // postaggregators which doesn't really exist here
+ for (OrderByColumnSpec order : orderBy) {
+      if (dataSchema.getDimensionsSpec().getSchema(order.getDimension()) != null) {
+ continue;
+ }
+ VirtualColumn vc = inputColToVirtualCol.get(order.getDimension());
+ if (vc != null) {
+ dimensions.add(
+ new DefaultDimensionSpec(
+ vc.getOutputName(),
+ order.getDimension(),
+                ColumnType.fromCapabilities(vc.capabilities(baseSignature, vc.getOutputName()))
+ )
+ );
+ }
+ }
+ return dimensions;
}
private static ColumnMappings getColumnMappings(DataSchema dataSchema)
@@ -469,11 +507,13 @@ public class MSQCompactionRunner implements CompactionRunner
                      .map(dim -> dim.getName().equals(ColumnHolder.TIME_COLUMN_NAME)
                                  ? timeColumnMapping
                                  : new ColumnMapping(dim.getName(), dim.getName()))
- .collect(Collectors.toList())
+ .toList()
+ );
+ columnMappings.addAll(
+ Arrays.stream(dataSchema.getAggregators())
+ .map(agg -> new ColumnMapping(agg.getName(), agg.getName()))
+ .toList()
);
-    columnMappings.addAll(Arrays.stream(dataSchema.getAggregators())
-                                .map(agg -> new ColumnMapping(agg.getName(), agg.getName()))
-                                .collect(Collectors.toList()));
return new ColumnMappings(columnMappings);
}
@@ -508,31 +548,44 @@ public class MSQCompactionRunner implements CompactionRunner
Map<String, VirtualColumn> inputColToVirtualCol
)
{
- RowSignature rowSignature = getRowSignature(dataSchema);
-    VirtualColumns virtualColumns = VirtualColumns.create(inputColToVirtualCol.values());
+ RowSignature baseRowSignature = getRowSignature(dataSchema);
+    final List<String> columns = new ArrayList<>(baseRowSignature.getColumnNames());
+ final List<OrderBy> orderBys;
+
+    RowSignature.Builder rowSignatureWithOrderByBuilder = RowSignature.builder().addAll(baseRowSignature);
+
+    // when clustering by a virtual column, we might need to add the virtual column to columns list and row signature
+    if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) {
+      List<OrderByColumnSpec> orderByColumnSpecs = getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec());
+ orderBys = new ArrayList<>();
+ for (OrderByColumnSpec spec : orderByColumnSpecs) {
+        orderBys.add(new OrderBy(spec.getDimension(), Order.fromString(spec.getDirection().toString())));
+
+ final VirtualColumn vc = inputColToVirtualCol.get(spec.getDimension());
+ if (vc != null) {
+ columns.add(spec.getDimension());
+        final ColumnCapabilities capabilities = vc.capabilities(baseRowSignature, vc.getOutputName());
+ DruidException.conditionalDefensive(
+ capabilities != null,
+            "virtual column[%s] has null capabilities, cannot determine output type",
+ vc.getOutputName()
+ );
+        rowSignatureWithOrderByBuilder.add(spec.getDimension(), capabilities.toColumnType());
+ }
+ }
+ } else {
+ orderBys = null;
+ }
+
Druids.ScanQueryBuilder scanQueryBuilder = new Druids.ScanQueryBuilder()
.dataSource(getInputDataSource(dataSchema.getDataSource()))
- .columns(rowSignature.getColumnNames())
- .virtualColumns(virtualColumns)
- .columnTypes(rowSignature.getColumnTypes())
.intervals(segmentSpec)
.filters(dataSchema.getTransformSpec().getFilter())
+ .virtualColumns(VirtualColumns.create(inputColToVirtualCol.values()))
+ .columns(columns)
+ .columnTypes(rowSignatureWithOrderByBuilder.build().getColumnTypes())
+ .orderBy(orderBys)
.context(buildQueryContext(compactionTask.getContext(), dataSchema));
-
-    if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) {
-      List<OrderByColumnSpec> orderByColumnSpecs = getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec());
-
- scanQueryBuilder.orderBy(
- orderByColumnSpecs
- .stream()
- .map(orderByColumnSpec ->
- new OrderBy(
- orderByColumnSpec.getDimension(),
-                      Order.fromString(orderByColumnSpec.getDirection().toString())
- ))
- .collect(Collectors.toList())
- );
- }
return scanQueryBuilder.build();
}
@@ -647,7 +700,6 @@ public class MSQCompactionRunner implements CompactionRunner
)
{
DimFilter dimFilter = dataSchema.getTransformSpec().getFilter();
-
    VirtualColumns virtualColumns = VirtualColumns.create(inputColToVirtualCol.values());
    // Convert MVDs converted to arrays back to MVDs, with the same name as the input column.
@@ -668,20 +720,25 @@ public class MSQCompactionRunner implements CompactionRunner
)
.collect(Collectors.toList());
+ final List<OrderByColumnSpec> orderBy;
+    if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) {
+      orderBy = getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec());
+ } else {
+ orderBy = List.of();
+ }
+
GroupByQuery.Builder builder = new GroupByQuery.Builder()
.setDataSource(getInputDataSource(compactionTask.getDataSource()))
- .setVirtualColumns(virtualColumns)
- .setDimFilter(dimFilter)
+ .setQuerySegmentSpec(segmentSpec)
.setGranularity(new AllGranularity())
-        .setDimensions(getAggregateDimensions(dataSchema, inputColToVirtualCol))
- .setAggregatorSpecs(Arrays.asList(dataSchema.getAggregators()))
+ .setDimFilter(dimFilter)
+ .setVirtualColumns(virtualColumns)
+        .setDimensions(getAggregateDimensions(dataSchema, inputColToVirtualCol, orderBy))
+ .setAggregatorSpecs(dataSchema.getAggregators())
.setPostAggregatorSpecs(postAggregators)
- .setContext(buildQueryContext(compactionTask.getContext(), dataSchema))
- .setQuerySegmentSpec(segmentSpec);
+ .setOrderByColumns(orderBy)
+        .setContext(buildQueryContext(compactionTask.getContext(), dataSchema));
-    if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) {
-      getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec()).forEach(builder::addOrderByColumn);
-    }
return builder.build();
}
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java
index 269a118f6d8..7898cd8ab6a 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Iterables;
import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.indexing.processor.SegmentGeneratorStageProcessor;
@@ -36,6 +37,10 @@ import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageDefinitionBuilder;
import org.apache.druid.msq.kernel.controller.WorkerInputs;
import org.apache.druid.query.Query;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
@@ -43,7 +48,9 @@ import org.apache.druid.sql.calcite.planner.ColumnMappings;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
public class SegmentGenerationStageSpec implements TerminalStageSpec
{
@@ -70,19 +77,31 @@ public class SegmentGenerationStageSpec implements TerminalStageSpec
    final ClusterBy queryClusterBy = queryDef.getFinalStageDefinition().getClusterBy();
// Add a segment-generation stage.
-    final DataSchema dataSchema =
-        SegmentGenerationUtils.makeDataSchemaForIngestion(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper, query);
+    final DataSchema dataSchema = SegmentGenerationUtils.makeDataSchemaForIngestion(
+ querySpec,
+ querySignature,
+ queryClusterBy,
+ columnMappings,
+ jsonMapper,
+ query
+ );
- return StageDefinition.builder(queryDef.getNextStageNumber())
-                          .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
- .maxWorkerCount(tuningConfig.getMaxNumWorkers())
- .processor(
- new SegmentGeneratorStageProcessor(
- dataSchema,
- columnMappings,
- tuningConfig
- )
+    final Map<String, VirtualColumn> clusterByVirtualColumnMappings = getClusterByVirtualColumnMappings(
+ query,
+ queryClusterBy
);
+
+ return StageDefinition.builder(queryDef.getNextStageNumber())
+                          .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
+ .maxWorkerCount(tuningConfig.getMaxNumWorkers())
+ .processor(
+ new SegmentGeneratorStageProcessor(
+ dataSchema,
+ columnMappings,
+ clusterByVirtualColumnMappings,
+ tuningConfig
+ )
+ );
}
public Int2ObjectMap<List<SegmentIdWithShardSpec>> getWorkerInfo(
@@ -113,4 +132,32 @@ public class SegmentGenerationStageSpec implements TerminalStageSpec
return retVal;
}
+
+  private static Map<String, VirtualColumn> getClusterByVirtualColumnMappings(Query<?> query, ClusterBy queryClusterBy)
+ {
+    final Map<String, VirtualColumn> clusterByVirtualColumns = new LinkedHashMap<>();
+ if (query instanceof GroupByQuery groupByQuery) {
+ final Map<String, VirtualColumn> outputToVc = new LinkedHashMap<>();
+ for (DimensionSpec spec : groupByQuery.getDimensions()) {
+        final VirtualColumn vc = groupByQuery.getVirtualColumns().getVirtualColumn(spec.getDimension());
+ if (vc != null) {
+ outputToVc.put(spec.getOutputName(), vc);
+ }
+ }
+ for (KeyColumn column : queryClusterBy.getColumns()) {
+ final VirtualColumn vc = outputToVc.get(column.columnName());
+ if (vc != null) {
+ clusterByVirtualColumns.put(column.columnName(), vc);
+ }
+ }
+ } else if (query instanceof ScanQuery scanQuery) {
+ for (KeyColumn column : queryClusterBy.getColumns()) {
+        final VirtualColumn vc = scanQuery.getVirtualColumns().getVirtualColumn(column.columnName());
+ if (vc != null) {
+ clusterByVirtualColumns.put(column.columnName(), vc);
+ }
+ }
+ }
+ return clusterByVirtualColumns;
+ }
}
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessor.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessor.java
index fe9418ee3fb..c80c03268fc 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessor.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessor.java
@@ -20,6 +20,7 @@
package org.apache.druid.msq.indexing.processor;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -51,6 +52,7 @@ import org.apache.druid.msq.kernel.StagePartition;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.querykit.ReadableInput;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
@@ -72,6 +74,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -80,17 +83,20 @@ public class SegmentGeneratorStageProcessor implements StageProcessor<Set<DataSe
{
private final DataSchema dataSchema;
private final ColumnMappings columnMappings;
+ private final Map<String, VirtualColumn> clusterByVirtualColumnMappings;
private final MSQTuningConfig tuningConfig;
@JsonCreator
public SegmentGeneratorStageProcessor(
@JsonProperty("dataSchema") final DataSchema dataSchema,
@JsonProperty("columnMappings") final ColumnMappings columnMappings,
+      @JsonProperty("clusterByVirtualColumnMappings") @Nullable final Map<String, VirtualColumn> clusterByVirtualColumnMappings,
@JsonProperty("tuningConfig") final MSQTuningConfig tuningConfig
)
{
this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
    this.columnMappings = Preconditions.checkNotNull(columnMappings, "columnMappings");
+    this.clusterByVirtualColumnMappings = clusterByVirtualColumnMappings == null ? Map.of() : clusterByVirtualColumnMappings;
    this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
}
@@ -106,6 +112,13 @@ public class SegmentGeneratorStageProcessor implements StageProcessor<Set<DataSe
return columnMappings;
}
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_EMPTY)
+ public Map<String, VirtualColumn> getClusterByVirtualColumnMappings()
+ {
+ return clusterByVirtualColumnMappings;
+ }
+
@JsonProperty
public MSQTuningConfig getTuningConfig()
{
@@ -255,13 +268,14 @@ public class SegmentGeneratorStageProcessor implements StageProcessor<Set<DataSe
SegmentGeneratorStageProcessor that = (SegmentGeneratorStageProcessor) o;
return Objects.equals(dataSchema, that.dataSchema)
&& Objects.equals(columnMappings, that.columnMappings)
+           && Objects.equals(clusterByVirtualColumnMappings, that.clusterByVirtualColumnMappings)
&& Objects.equals(tuningConfig, that.tuningConfig);
}
@Override
public int hashCode()
{
- return Objects.hash(dataSchema, columnMappings, tuningConfig);
+    return Objects.hash(dataSchema, columnMappings, clusterByVirtualColumnMappings, tuningConfig);
}
@Override
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 0032a6051ac..c8b9546f017 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -211,9 +211,12 @@ public class MSQTaskQueryMaker implements QueryMaker
return querySpec;
}
-  private static MSQDestination buildMSQDestination(final IngestDestination targetDataSource,
- final ColumnMappings columnMappings, final PlannerContext plannerContext,
- final MSQTerminalStageSpecFactory terminalStageSpecFactory)
+ private static MSQDestination buildMSQDestination(
+ final IngestDestination targetDataSource,
+ final ColumnMappings columnMappings,
+ final PlannerContext plannerContext,
+ final MSQTerminalStageSpecFactory terminalStageSpecFactory
+ )
{
final QueryContext sqlQueryContext = plannerContext.queryContext();
final Object segmentGranularity = getSegmentGranularity(plannerContext);
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
index b18ee8dde53..07a4ad9769c 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
@@ -46,6 +46,7 @@ import org.apache.druid.query.QueryContext;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.filter.EqualityFilter;
import org.apache.druid.query.filter.FilterSegmentPruner;
+import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
@@ -105,6 +106,7 @@ public class DartTableInputSpecSlicerTest extends InitializedNullHandlingTest
.interval(Intervals.of("2000/2001"))
.shardSpec(new DimensionRangeShardSpec(
ImmutableList.of(PARTITION_DIM),
+ VirtualColumns.EMPTY,
null,
new StringTuple(new String[]{"foo"}),
0,
@@ -119,6 +121,7 @@ public class DartTableInputSpecSlicerTest extends InitializedNullHandlingTest
.interval(Intervals.of("2000/2001"))
.shardSpec(new DimensionRangeShardSpec(
ImmutableList.of(PARTITION_DIM),
+ VirtualColumns.EMPTY,
new StringTuple(new String[]{"foo"}),
null,
1,
@@ -449,6 +452,7 @@ public class DartTableInputSpecSlicerTest extends InitializedNullHandlingTest
);
final FilterSegmentPruner pruner = new FilterSegmentPruner(
new EqualityFilter(PARTITION_DIM, ColumnType.STRING, "abc", null),
+ null,
null
);
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 9d4ddde70ca..fdf58b3e874 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -56,13 +56,16 @@ import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.NotDimFilter;
import org.apache.druid.query.filter.NullFilter;
import org.apache.druid.query.filter.RangeFilter;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.transform.CompactionTransformSpec;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
@@ -328,7 +331,7 @@ public class MSQReplaceTest extends MSQTestBase
SegmentId.of("foo", Intervals.ETERNITY, "test", 0)
)
)
- .setExpectedShardSpec(NumberedShardSpec.class)
+ .setExpectedShardSpec(DimensionRangeShardSpec.class)
.setExpectedResultRows(
ImmutableList.of(
new Object[]{946684800000L, "", 1.0f},
@@ -347,7 +350,7 @@ public class MSQReplaceTest extends MSQTestBase
.setExpectedLastCompactionState(
expectedCompactionState(
context,
- Collections.emptyList(),
+ List.of("v0"),
DimensionsSpec.builder()
.setDimensions(
ImmutableList.of(
@@ -357,13 +360,24 @@ public class MSQReplaceTest extends MSQTestBase
)
.setDimensionExclusions(Collections.singletonList("__time"))
.build(),
+ new CompactionTransformSpec(
+ null,
+ VirtualColumns.create(
+ new ExpressionVirtualColumn(
+ "v0",
+ "lower(\"dim1\")",
+ ColumnType.STRING,
+ TestExprMacroTable.INSTANCE
+ )
+ )
+ ),
GranularityType.ALL,
Intervals.ETERNITY
)
)
.verifyResults();
}
-
+
@MethodSource("data")
@ParameterizedTest(name = "{index}:with context {0}")
public void testReplaceOnFooWithAllClusteredByExpression(String contextName, Map<String, Object> context)
@@ -2813,6 +2827,25 @@ public class MSQReplaceTest extends MSQTestBase
GranularityType segmentGranularity,
Interval interval
)
+ {
+ return expectedCompactionState(
+ context,
+ partitionDimensions,
+ dimensions,
+ null,
+ segmentGranularity,
+ interval
+ );
+ }
+
+ private CompactionState expectedCompactionState(
+ Map<String, Object> context,
+ List<String> partitionDimensions,
+ List<DimensionSchema> dimensions,
+ CompactionTransformSpec transformSpec,
+ GranularityType segmentGranularity,
+ Interval interval
+ )
{
return expectedCompactionState(
context,
@@ -2821,6 +2854,7 @@ public class MSQReplaceTest extends MSQTestBase
.setDimensions(dimensions)
.setDimensionExclusions(Collections.singletonList("__time"))
.build(),
+ transformSpec,
segmentGranularity,
interval
);
@@ -2833,6 +2867,25 @@ public class MSQReplaceTest extends MSQTestBase
GranularityType segmentGranularity,
Interval interval
)
+ {
+ return expectedCompactionState(
+ context,
+ partitionDimensions,
+ dimensionsSpec,
+ null,
+ segmentGranularity,
+ interval
+ );
+ }
+
+ private CompactionState expectedCompactionState(
+ Map<String, Object> context,
+ List<String> partitionDimensions,
+ DimensionsSpec dimensionsSpec,
+ CompactionTransformSpec transformSpec,
+ GranularityType segmentGranularity,
+ Interval interval
+ )
{
if (!context.containsKey(Tasks.STORE_COMPACTION_STATE_KEY)
|| !((Boolean) context.get(Tasks.STORE_COMPACTION_STATE_KEY))) {
@@ -2866,7 +2919,7 @@ public class MSQReplaceTest extends MSQTestBase
partitionsSpec,
dimensionsSpec,
metricsSpec,
- null,
+ transformSpec,
indexSpec,
granularitySpec,
null
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index fa124f44da3..1375ec64054 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -232,7 +232,7 @@ public class MSQCompactionRunnerTest
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
- "MSQ: Non-string partition dimension[long_dim] of type[long] not
supported with 'range' partition spec",
+ "MSQ: Non-string partition dimension[long_dim] of type[LONG] not
supported with 'range' partition spec",
validationResult.getReason()
);
}
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessorTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessorTest.java
new file mode 100644
index 00000000000..7a06c419dd6
--- /dev/null
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessorTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.processor;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.msq.exec.StageProcessor;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.indexing.MSQTuningConfig;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.sql.calcite.planner.ColumnMapping;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+class SegmentGeneratorStageProcessorTest
+{
+ @Test
+ public void testSerde() throws JsonProcessingException
+ {
+ ObjectMapper mapper = TestHelper.makeJsonMapper();
+ mapper.registerModules(new MSQIndexingModule().getJacksonModules());
+ SegmentGeneratorStageProcessor processor = new SegmentGeneratorStageProcessor(
+ DataSchema.builder()
+ .withDataSource("test")
+ .withTimestamp(new TimestampSpec("timestamp", "auto", null))
+ .withGranularity(new UniformGranularitySpec(Granularities.HOUR, Granularities.MINUTE, null))
+ .withDimensions(new StringDimensionSchema("a"), new StringDimensionSchema("b"))
+ .withAggregators(new CountAggregatorFactory("cnt"))
+ .build(),
+ new ColumnMappings(
+ List.of(
+ new ColumnMapping("d0", "__time"),
+ new ColumnMapping("d1", "a"),
+ new ColumnMapping("d2", "b"),
+ new ColumnMapping("a0", "cnt")
+ )
+ ),
+ Map.of(
+ "v0",
+ new ExpressionVirtualColumn("v0", "concat(\"a\",'foo')",
ColumnType.STRING, TestExprMacroTable.INSTANCE)
+ ),
+ MSQTuningConfig.defaultConfig()
+ );
+
+ Assertions.assertEquals(processor, mapper.readValue(mapper.writeValueAsString(processor), StageProcessor.class));
+ }
+
+ @Test
+ public void testEqualsAndHashCode()
+ {
+ EqualsVerifier.forClass(SegmentGeneratorStageProcessor.class).usingGetClass().verify();
+ }
+}
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java
index 55ba95a9d98..61fadac716a 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.msq.input.NilInputSlice;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.filter.EqualityFilter;
import org.apache.druid.query.filter.FilterSegmentPruner;
+import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
@@ -60,6 +61,7 @@ public class IndexerTableInputSpecSlicerTest extends InitializedNullHandlingTest
Collections.emptyList(),
new DimensionRangeShardSpec(
ImmutableList.of("dim"),
+ VirtualColumns.EMPTY,
null,
new StringTuple(new String[]{"foo"}),
0,
@@ -79,6 +81,7 @@ public class IndexerTableInputSpecSlicerTest extends InitializedNullHandlingTest
Collections.emptyList(),
new DimensionRangeShardSpec(
ImmutableList.of("dim"),
+ VirtualColumns.EMPTY,
new StringTuple(new String[]{"foo"}),
null,
1,
@@ -276,6 +279,7 @@ public class IndexerTableInputSpecSlicerTest extends InitializedNullHandlingTest
);
final FilterSegmentPruner pruner = new FilterSegmentPruner(
new EqualityFilter("dim", ColumnType.STRING, "bar", null),
+ null,
null
);
@@ -309,7 +313,8 @@ public class IndexerTableInputSpecSlicerTest extends InitializedNullHandlingTest
);
final FilterSegmentPruner segmentPruner = new FilterSegmentPruner(
new EqualityFilter("dim", ColumnType.STRING, "bar", null),
- Collections.emptySet()
+ Collections.emptySet(),
+ null
);
Assert.assertEquals(
@@ -350,6 +355,7 @@ public class IndexerTableInputSpecSlicerTest extends InitializedNullHandlingTest
);
final FilterSegmentPruner segmentPruner = new FilterSegmentPruner(
new EqualityFilter("dim", ColumnType.STRING, "bar", null),
+ null,
null
);
diff --git a/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java b/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java
index cb0b88ea079..aef51e2c05d 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java
@@ -21,6 +21,8 @@ package org.apache.druid.query.filter;
import com.google.common.collect.RangeSet;
import org.apache.druid.error.InvalidInput;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.ShardSpec;
@@ -44,15 +46,18 @@ public class FilterSegmentPruner implements SegmentPruner
{
private final DimFilter filter;
private final Set<String> filterFields;
+ private final VirtualColumns virtualColumns;
private final Map<String, Optional<RangeSet<String>>> rangeCache;
public FilterSegmentPruner(
DimFilter filter,
- @Nullable Set<String> filterFields
+ @Nullable Set<String> filterFields,
+ @Nullable VirtualColumns virtualColumns
)
{
this.filter = InvalidInput.notNull(filter, "filter");
this.filterFields = filterFields == null ? filter.getRequiredColumns() : filterFields;
+ this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns;
this.rangeCache = new HashMap<>();
}
@@ -88,10 +93,25 @@ public class FilterSegmentPruner implements SegmentPruner
Map<String, RangeSet<String>> filterDomain = new HashMap<>();
List<String> dimensions = shard.getDomainDimensions();
for (String dimension : dimensions) {
- if (filterFields == null || filterFields.contains(dimension)) {
+ final VirtualColumn shardVirtualColumn = shard.getDomainVirtualColumns().getVirtualColumn(dimension);
+ if (shardVirtualColumn != null) {
+ final VirtualColumn queryEquivalent = virtualColumns.findEquivalent(shardVirtualColumn);
+ if (queryEquivalent != null) {
+ if (filterFields == null || filterFields.contains(queryEquivalent.getOutputName())) {
+ Optional<RangeSet<String>> optFilterRangeSet = rangeCache
+ .computeIfAbsent(
+ queryEquivalent.getOutputName(),
+ d -> Optional.ofNullable(filter.getDimensionRangeSet(d))
+ );
+ optFilterRangeSet.ifPresent(stringRangeSet -> filterDomain.put(
+ shardVirtualColumn.getOutputName(),
+ stringRangeSet
+ ));
+ }
+ }
+ } else if (filterFields == null || filterFields.contains(dimension)) {
Optional<RangeSet<String>> optFilterRangeSet =
rangeCache.computeIfAbsent(dimension, d -> Optional.ofNullable(filter.getDimensionRangeSet(d)));
-
optFilterRangeSet.ifPresent(stringRangeSet -> filterDomain.put(dimension, stringRangeSet));
}
}
@@ -114,13 +134,15 @@ public class FilterSegmentPruner implements SegmentPruner
return false;
}
FilterSegmentPruner that = (FilterSegmentPruner) o;
- return Objects.equals(filter, that.filter) && Objects.equals(filterFields, that.filterFields);
+ return Objects.equals(filter, that.filter) &&
+ Objects.equals(filterFields, that.filterFields) &&
+ Objects.equals(virtualColumns, that.virtualColumns);
}
@Override
public int hashCode()
{
- return Objects.hash(filter, filterFields);
+ return Objects.hash(filter, filterFields, virtualColumns);
}
@Override
@@ -129,7 +151,7 @@ public class FilterSegmentPruner implements SegmentPruner
return "FilterSegmentPruner{" +
"filter=" + filter +
", filterFields=" + filterFields +
- ", rangeCache=" + rangeCache +
+ ", virtualColumns=" + virtualColumns +
'}';
}
}
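As a usage sketch (an assumed flow, grounded in FilterSegmentPrunerTest below): a pruner built with the query's virtual columns can now prune on an expression-derived shard range even when the query names the expression differently than the shard spec does. The empty `segments` list is a placeholder:

    import java.util.List;
    import java.util.Set;
    import java.util.function.Function;
    import org.apache.druid.query.expression.TestExprMacroTable;
    import org.apache.druid.query.filter.FilterSegmentPruner;
    import org.apache.druid.query.filter.RangeFilter;
    import org.apache.druid.segment.VirtualColumns;
    import org.apache.druid.segment.column.ColumnType;
    import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
    import org.apache.druid.timeline.DataSegment;

    // The query declares "v0" with the same expression a shard spec may have
    // stored under another name; findEquivalent() bridges the two during pruning.
    VirtualColumns queryVirtualColumns = VirtualColumns.create(
        new ExpressionVirtualColumn("v0", "concat(dim1, 'foo')", ColumnType.STRING, TestExprMacroTable.INSTANCE)
    );
    FilterSegmentPruner pruner = new FilterSegmentPruner(
        new RangeFilter("v0", ColumnType.STRING, null, "aaa", null, null, null),
        null,                // null filterFields: fall back to the filter's required columns
        queryVirtualColumns
    );
    List<DataSegment> segments = List.of(); // placeholder for the candidate segments
    Set<DataSegment> retained = pruner.prune(segments, Function.identity());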
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
index 9faf0226bcb..d31f130f665 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
@@ -1088,6 +1088,14 @@ public class GroupByQuery extends BaseQuery<ResultRow>
return this;
}
+ public Builder setOrderByColumns(List<OrderByColumnSpec> columnSpec)
+ {
+ ensureExplicitLimitSpecNotSet();
+ this.orderByColumnSpecs = new ArrayList<>(columnSpec);
+ this.postProcessingFn = null;
+ return this;
+ }
+
public Builder setLimitSpec(LimitSpec limitSpec)
{
Preconditions.checkNotNull(limitSpec);
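The new setter replaces the accumulated order-by list in one call, where addOrderByColumn appends a single entry; presumably this supports MSQCompactionRunner rebuilding order-by from cluster-by. A hedged sketch with illustrative dimension names:

    import java.util.List;
    import org.apache.druid.query.groupby.GroupByQuery;
    import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;

    // Replace any previously set order-by columns wholesale; postProcessingFn is
    // reset so the limit spec is rebuilt from the new list when the query is built.
    GroupByQuery.Builder builder = GroupByQuery.builder();
    builder.setOrderByColumns(
        List.of(
            new OrderByColumnSpec("dim1", OrderByColumnSpec.Direction.ASCENDING),
            new OrderByColumnSpec("v0", OrderByColumnSpec.Direction.ASCENDING)
        )
    );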
diff --git a/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java b/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java
index d5baa72dd08..64a974771a2 100644
--- a/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java
+++ b/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java
@@ -232,7 +232,8 @@ public class ExecutionVertex
return new FilterSegmentPruner(
topQuery.getFilter(),
- baseFields
+ baseFields,
+ topQuery.getVirtualColumns()
);
}
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java
index 9d5abd6f76e..b1e0d01c747 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java
@@ -24,6 +24,7 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.segment.VirtualColumns;
import javax.annotation.Nullable;
import java.util.Arrays;
@@ -33,6 +34,7 @@ import java.util.List;
public abstract class BaseDimensionRangeShardSpec implements ShardSpec
{
protected final List<String> dimensions;
+ protected final VirtualColumns virtualColumns;
@Nullable
protected final StringTuple start;
@Nullable
@@ -40,11 +42,13 @@ public abstract class BaseDimensionRangeShardSpec implements ShardSpec
protected BaseDimensionRangeShardSpec(
List<String> dimensions,
+ @Nullable VirtualColumns virtualColumns,
@Nullable StringTuple start,
@Nullable StringTuple end
)
{
this.dimensions = dimensions;
+ this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns;
this.start = start;
this.end = end;
}
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpec.java
index 308e36474a6..c15f58370a7 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpec.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpec.java
@@ -22,6 +22,7 @@ package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.segment.VirtualColumns;
import javax.annotation.Nullable;
import java.util.List;
@@ -109,6 +110,7 @@ public class BuildingDimensionRangeShardSpec implements BuildingShardSpec<Dimens
numCorePartitions
) : new DimensionRangeShardSpec(
dimensions,
+ VirtualColumns.EMPTY,
start,
end,
partitionId,
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java
index c37ea140057..fc39f02b02c 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.segment.VirtualColumns;
import javax.annotation.Nullable;
import java.util.List;
@@ -51,7 +52,7 @@ public class DimensionRangeBucketShardSpec extends BaseDimensionRangeShardSpec
@JsonProperty("end") @Nullable StringTuple end
)
{
- super(dimensions, start, end);
+ super(dimensions, VirtualColumns.EMPTY, start, end);
// Verify that the tuple sizes and number of dimensions are the same
Preconditions.checkArgument(
start == null || start.size() == dimensions.size(),
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java
index a0a850dae08..fea3d82cb5b 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java
@@ -20,12 +20,14 @@
package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;
import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.segment.VirtualColumns;
import javax.annotation.Nullable;
import java.util.Collections;
@@ -53,13 +55,14 @@ public class DimensionRangeShardSpec extends BaseDimensionRangeShardSpec
@JsonCreator
public DimensionRangeShardSpec(
@JsonProperty("dimensions") List<String> dimensions,
+ @JsonProperty("virtualColumns") @Nullable VirtualColumns virtualColumns,
@JsonProperty("start") @Nullable StringTuple start,
@JsonProperty("end") @Nullable StringTuple end,
@JsonProperty("partitionNum") int partitionNum,
@JsonProperty("numCorePartitions") @Nullable Integer numCorePartitions
// nullable for backward compatibility
)
{
- super(dimensions, start, end);
+ super(dimensions, virtualColumns, start, end);
Preconditions.checkArgument(partitionNum >= 0, "partitionNum >= 0");
Preconditions.checkArgument(
dimensions != null && !dimensions.isEmpty(),
@@ -76,6 +79,13 @@ public class DimensionRangeShardSpec extends BaseDimensionRangeShardSpec
return dimensions;
}
+ @JsonProperty("virtualColumns")
+ @JsonInclude(JsonInclude.Include.NON_EMPTY)
+ public VirtualColumns getVirtualColumns()
+ {
+ return virtualColumns;
+ }
+
@Nullable
@JsonProperty("start")
public StringTuple getStartTuple()
@@ -107,13 +117,13 @@ public class DimensionRangeShardSpec extends BaseDimensionRangeShardSpec
@Override
public ShardSpec withPartitionNum(int partitionNum)
{
- return new DimensionRangeShardSpec(dimensions, start, end, partitionNum, numCorePartitions);
+ return new DimensionRangeShardSpec(dimensions, virtualColumns, start, end, partitionNum, numCorePartitions);
}
@Override
public ShardSpec withCorePartitions(int partitions)
{
- return new DimensionRangeShardSpec(dimensions, start, end, partitionNum, partitions);
+ return new DimensionRangeShardSpec(dimensions, virtualColumns, start, end, partitionNum, partitions);
}
public boolean isNumCorePartitionsUnknown()
@@ -127,6 +137,12 @@ public class DimensionRangeShardSpec extends BaseDimensionRangeShardSpec
return Collections.unmodifiableList(dimensions);
}
+ @Override
+ public VirtualColumns getDomainVirtualColumns()
+ {
+ return virtualColumns;
+ }
+
/**
* Set[:i] is the cartesian product of Set[0],...,Set[i - 1]
* EffectiveDomain[:i] is defined as QueryDomain[:i] INTERSECTION SegmentRange[:i]
@@ -289,6 +305,7 @@ public class DimensionRangeShardSpec extends BaseDimensionRangeShardSpec
return partitionNum == shardSpec.partitionNum &&
numCorePartitions == shardSpec.numCorePartitions &&
Objects.equals(dimensions, shardSpec.dimensions) &&
+ Objects.equals(virtualColumns, shardSpec.virtualColumns) &&
Objects.equals(start, shardSpec.start) &&
Objects.equals(end, shardSpec.end);
}
@@ -296,7 +313,7 @@ public class DimensionRangeShardSpec extends BaseDimensionRangeShardSpec
@Override
public int hashCode()
{
- return Objects.hash(dimensions, start, end, partitionNum, numCorePartitions);
+ return Objects.hash(dimensions, virtualColumns, start, end, partitionNum, numCorePartitions);
}
@Override
@@ -304,6 +321,7 @@ public class DimensionRangeShardSpec extends BaseDimensionRangeShardSpec
{
return "DimensionRangeShardSpec{" +
"dimensions='" + dimensions + '\'' +
+ ", virtualColumns=" + virtualColumns +
", start='" + start + '\'' +
", end='" + end + '\'' +
", partitionNum=" + partitionNum +
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
index b91c96387a9..da1c046465b 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.RangeSet;
import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
import java.util.List;
import java.util.Map;
@@ -136,13 +138,28 @@ public interface ShardSpec
ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs);
/**
- * Get dimensions who have possible range for the rows this shard contains.
+ * Get dimensions who have possible range for the rows this shard contains. These columns might be physical columns
+ * stored in the shard, or computed expressions, in which case the manner in which they were computed is available in
+ * {@link #getDomainVirtualColumns()}.
*
- * @return list of dimensions who has its possible range. Dimensions with unknown possible range are not listed
+ * @return list of dimensions who has its possible range. Dimensions with unknown possible range are not listed.
*/
@JsonIgnore
List<String> getDomainDimensions();
+ /**
+ * If any of the columns in {@link #getDomainDimensions()} was computed with an expression and was not stored, the
+ * {@link org.apache.druid.segment.VirtualColumn} which computes it is stored here. This allows matching ranges even
+ * when the value is not stored in the shard so long as {@link VirtualColumns#findEquivalent(VirtualColumn)} exists.
+ *
+ * @return {@link VirtualColumns} associated with columns listed in {@link #getDomainDimensions()}.
+ */
+ @JsonIgnore
+ default VirtualColumns getDomainVirtualColumns()
+ {
+ return VirtualColumns.EMPTY;
+ }
+
/**
* if given domain ranges are not possible in this shard, return false; otherwise return true;
*
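To make the javadoc concrete, a small sketch of the equivalence matching that the pruner relies on (behavior inferred from FilterSegmentPrunerTest below: matching is by expression, not by output name):

    import org.apache.druid.query.expression.TestExprMacroTable;
    import org.apache.druid.segment.VirtualColumn;
    import org.apache.druid.segment.VirtualColumns;
    import org.apache.druid.segment.column.ColumnType;
    import org.apache.druid.segment.virtual.ExpressionVirtualColumn;

    // The shard stored its cluster-key expression as "vdim1"; the query declared
    // the same expression as "v0". findEquivalent matches on the expression, so
    // the query's filter ranges can be translated onto the shard's domain column.
    VirtualColumn shardColumn =
        new ExpressionVirtualColumn("vdim1", "concat(dim1, 'foo')", ColumnType.STRING, TestExprMacroTable.INSTANCE);
    VirtualColumns queryColumns = VirtualColumns.create(
        new ExpressionVirtualColumn("v0", "concat(dim1, 'foo')", ColumnType.STRING, TestExprMacroTable.INSTANCE)
    );
    VirtualColumn equivalent = queryColumns.findEquivalent(shardColumn); // non-null here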
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java
index e47b5aafaea..ae9ea1fb416 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java
@@ -22,6 +22,7 @@ package org.apache.druid.timeline.partition;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.segment.VirtualColumns;
import javax.annotation.Nullable;
import java.util.Collections;
@@ -52,6 +53,7 @@ public class SingleDimensionRangeBucketShardSpec extends BaseDimensionRangeShard
{
super(
dimension == null ? Collections.emptyList() : Collections.singletonList(dimension),
+ VirtualColumns.EMPTY,
start == null ? null : StringTuple.create(start),
end == null ? null : StringTuple.create(end)
);
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
index 59ef1be3d6a..aa346adabbf 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.segment.VirtualColumns;
import javax.annotation.Nullable;
import java.util.Collections;
@@ -62,6 +63,7 @@ public class SingleDimensionShardSpec extends DimensionRangeShardSpec
{
super(
dimension == null ? Collections.emptyList() : Collections.singletonList(dimension),
+ VirtualColumns.EMPTY,
start == null ? null : StringTuple.create(start),
end == null ? null : StringTuple.create(end),
partitionNum,
diff --git a/processing/src/test/java/org/apache/druid/query/filter/FilterSegmentPrunerTest.java b/processing/src/test/java/org/apache/druid/query/filter/FilterSegmentPrunerTest.java
index 131337ad468..a1a1dba0c28 100644
--- a/processing/src/test/java/org/apache/druid/query/filter/FilterSegmentPrunerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/filter/FilterSegmentPrunerTest.java
@@ -24,7 +24,9 @@ import org.apache.druid.data.input.StringTuple;
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
@@ -46,7 +48,7 @@ class FilterSegmentPrunerTest
{
Throwable t = Assertions.assertThrows(
DruidException.class,
- () -> new FilterSegmentPruner(null, null)
+ () -> new FilterSegmentPruner(null, null, null)
);
Assertions.assertEquals("filter must not be null", t.getMessage());
}
@@ -70,13 +72,73 @@ class FilterSegmentPrunerTest
List<DataSegment> segs = List.of(seg1, seg2, seg3, seg4, seg5, seg6, seg7);
- FilterSegmentPruner prunerRange = new FilterSegmentPruner(range_a, null);
- FilterSegmentPruner prunerEmptyFields = new FilterSegmentPruner(range_a, Collections.emptySet());
- FilterSegmentPruner prunerExpression = new FilterSegmentPruner(expression_b, null);
+ FilterSegmentPruner prunerRange = new FilterSegmentPruner(range_a, null, null);
+ FilterSegmentPruner prunerEmptyFields = new FilterSegmentPruner(range_a, Collections.emptySet(), null);
+ FilterSegmentPruner prunerExpression = new FilterSegmentPruner(expression_b, null, null);
+ // prune twice to exercise cache
Assertions.assertEquals(Set.of(seg1, seg4, seg5, seg6, seg7), prunerRange.prune(segs, Function.identity()));
+ Assertions.assertEquals(Set.of(seg1, seg4, seg5, seg6, seg7), prunerRange.prune(segs, Function.identity()));
+ Assertions.assertEquals(Set.copyOf(segs), prunerExpression.prune(segs, Function.identity()));
Assertions.assertEquals(Set.copyOf(segs), prunerExpression.prune(segs, Function.identity()));
Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, Function.identity()));
+ Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, Function.identity()));
+ }
+
+ @Test
+ void testPruneVirtualColumn()
+ {
+ VirtualColumns shardVirtualColumns = VirtualColumns.create(
+ new ExpressionVirtualColumn("vdim1", "concat(dim1, 'foo')",
ColumnType.STRING, TestExprMacroTable.INSTANCE)
+ );
+ VirtualColumns shardVirtualColumnsDifferentName = VirtualColumns.create(
+ new ExpressionVirtualColumn("vdifferentname", "concat(dim1, 'foo')",
ColumnType.STRING, TestExprMacroTable.INSTANCE)
+ );
+
+ String interval1 = "2026-02-18T00:00:00Z/2026-02-19T00:00:00Z";
+
+ DataSegment seg1 = makeDataSegment(
+ interval1,
+ makeRange(List.of("vdim1"), shardVirtualColumns, 0, null,
StringTuple.create("abcfoo"))
+ );
+ DataSegment seg2 = makeDataSegment(
+ interval1,
+ makeRange(List.of("vdim1"), shardVirtualColumns, 1,
StringTuple.create("abcfoo"), StringTuple.create("lmnfoo"))
+ );
+ // same virtual column with a different name in this segment
+ DataSegment seg3 = makeDataSegment(
+ interval1,
+ makeRange(List.of("vdifferentname"), shardVirtualColumnsDifferentName,
2, StringTuple.create("lmnfoo"), null)
+ );
+
+ List<DataSegment> segs = List.of(seg1, seg2, seg3);
+
+ // same expression, same name
+ VirtualColumns queryVirtualColumns = VirtualColumns.create(
+ new ExpressionVirtualColumn("vdim1", "concat(dim1, 'foo')",
ColumnType.STRING, TestExprMacroTable.INSTANCE)
+ );
+ DimFilter range_a = new RangeFilter("vdim1", ColumnType.STRING, null, "aaa", null, null, null);
+ FilterSegmentPruner prunerRange = new FilterSegmentPruner(range_a, null, queryVirtualColumns);
+ FilterSegmentPruner prunerEmptyFields = new FilterSegmentPruner(range_a, Collections.emptySet(), queryVirtualColumns);
+ // prune twice to exercise cache
+ Assertions.assertEquals(Set.of(seg1), prunerRange.prune(segs, Function.identity()));
+ Assertions.assertEquals(Set.of(seg1), prunerRange.prune(segs, Function.identity()));
+ Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, Function.identity()));
+ Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, Function.identity()));
+
+ // same expression, different name
+ queryVirtualColumns = VirtualColumns.create(
+ new ExpressionVirtualColumn("v0", "concat(dim1, 'foo')",
ColumnType.STRING, TestExprMacroTable.INSTANCE)
+ );
+ range_a = new RangeFilter("v0", ColumnType.STRING, null, "aaa", null, null, null);
+ prunerRange = new FilterSegmentPruner(range_a, null, queryVirtualColumns);
+ prunerEmptyFields = new FilterSegmentPruner(range_a, Collections.emptySet(), queryVirtualColumns);
+
+ // prune twice to exercise cache
+ Assertions.assertEquals(Set.of(seg1), prunerRange.prune(segs, Function.identity()));
+ Assertions.assertEquals(Set.of(seg1), prunerRange.prune(segs, Function.identity()));
+ Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, Function.identity()));
+ Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, Function.identity()));
}
@Test
@@ -106,9 +168,27 @@ class FilterSegmentPrunerTest
@Nullable StringTuple start,
@Nullable StringTuple end
)
+ {
+ return makeRange(
+ columns,
+ null,
+ partitionNumber,
+ start,
+ end
+ );
+ }
+
+ private ShardSpec makeRange(
+ List<String> columns,
+ VirtualColumns virtualColumns,
+ int partitionNumber,
+ @Nullable StringTuple start,
+ @Nullable StringTuple end
+ )
{
return new DimensionRangeShardSpec(
columns,
+ virtualColumns,
start,
end,
partitionNumber,
diff --git a/processing/src/test/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpecTest.java b/processing/src/test/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpecTest.java
index b8099e227a4..6f7af3fecb8 100644
--- a/processing/src/test/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpecTest.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.segment.VirtualColumns;
import org.junit.Assert;
import org.junit.Test;
@@ -39,6 +40,7 @@ public class BuildingDimensionRangeShardSpecTest
Assert.assertEquals(
new DimensionRangeShardSpec(
Arrays.asList("dim1", "dim2"),
+ VirtualColumns.EMPTY,
StringTuple.create("start1", "start2"),
StringTuple.create("end1", "end2"),
5,
diff --git a/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpecTest.java b/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpecTest.java
index f4a2d88b5cd..7eb607ecffb 100644
--- a/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpecTest.java
@@ -198,6 +198,9 @@ public class DimensionRangeBucketShardSpecTest
@Test
public void testEquals()
{
- EqualsVerifier.forClass(DimensionRangeBucketShardSpec.class).usingGetClass().verify();
+ EqualsVerifier.forClass(DimensionRangeBucketShardSpec.class)
+ .usingGetClass()
+ .withIgnoredFields("virtualColumns")
+ .verify();
}
}
diff --git a/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java b/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java
index ade40e6ae38..0f62da3518b 100644
--- a/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java
@@ -27,6 +27,7 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.segment.VirtualColumns;
import org.junit.Assert;
import org.junit.Test;
@@ -50,15 +51,30 @@ public class DimensionRangeShardSpecTest
setDimensions("dim1", "dim2");
final List<ShardSpec> shardSpecs = ImmutableList.of(
- new DimensionRangeShardSpec(dimensions, null, StringTuple.create("India", "Delhi"), 1, 1),
new DimensionRangeShardSpec(
dimensions,
+ VirtualColumns.EMPTY,
+ null,
+ StringTuple.create("India", "Delhi"),
+ 1,
+ 1
+ ),
+ new DimensionRangeShardSpec(
+ dimensions,
+ VirtualColumns.EMPTY,
StringTuple.create("India", "Delhi"),
StringTuple.create("Spain", "Valencia"),
2,
1
),
- new DimensionRangeShardSpec(dimensions, StringTuple.create("Spain", "Valencia"), null, 3, 1)
+ new DimensionRangeShardSpec(
+ dimensions,
+ VirtualColumns.EMPTY,
+ StringTuple.create("Spain", "Valencia"),
+ null,
+ 3,
+ 1
+ )
);
final ShardSpecLookup lookup = shardSpecs.get(0).getLookup(shardSpecs);
final long currentTime = DateTimes.nowUtc().getMillis();
@@ -143,6 +159,7 @@ public class DimensionRangeShardSpecTest
final DimensionRangeShardSpec shard0 = new DimensionRangeShardSpec(
dimensions,
+ VirtualColumns.EMPTY,
null,
StringTuple.create("India", null),
1,
@@ -151,6 +168,7 @@ public class DimensionRangeShardSpecTest
final DimensionRangeShardSpec shard1 = new DimensionRangeShardSpec(
dimensions,
+ VirtualColumns.EMPTY,
StringTuple.create("India", null),
StringTuple.create("Spain", "Valencia"),
10,
@@ -159,6 +177,7 @@ public class DimensionRangeShardSpecTest
final DimensionRangeShardSpec shard2 = new DimensionRangeShardSpec(
dimensions,
+ VirtualColumns.EMPTY,
StringTuple.create("Spain", "Valencia"),
StringTuple.create("Tokyo", null),
10,
@@ -167,6 +186,7 @@ public class DimensionRangeShardSpecTest
final DimensionRangeShardSpec shard3 = new DimensionRangeShardSpec(
dimensions,
+ VirtualColumns.EMPTY,
StringTuple.create("Tokyo", null),
null,
100,
@@ -202,7 +222,7 @@ public class DimensionRangeShardSpecTest
final RangeSet<String> universalSet = TreeRangeSet.create();
universalSet.add(Range.all());
- ShardSpec shard = new DimensionRangeShardSpec(dimensions, start, end, 0, null);
+ ShardSpec shard = new DimensionRangeShardSpec(dimensions, VirtualColumns.EMPTY, start, end, 0, null);
Map<String, RangeSet<String>> domain = new HashMap<>();
// {Mars} * {Zoo, Zuu} * {Blah, Random}
@@ -265,7 +285,7 @@ public class DimensionRangeShardSpecTest
final RangeSet<String> universalSet = TreeRangeSet.create();
universalSet.add(Range.all());
- ShardSpec shard = new DimensionRangeShardSpec(dimensions, start, end, 0, null);
+ ShardSpec shard = new DimensionRangeShardSpec(dimensions, VirtualColumns.EMPTY, start, end, 0, null);
Map<String, RangeSet<String>> domain = new HashMap<>();
// (-INF, INF) * (-INF, INF) * (-INF, INF)
@@ -345,7 +365,7 @@ public class DimensionRangeShardSpecTest
final RangeSet<String> universalSet = TreeRangeSet.create();
universalSet.add(Range.all());
- ShardSpec shard = new DimensionRangeShardSpec(dimensions, start, end, 0, null);
+ ShardSpec shard = new DimensionRangeShardSpec(dimensions, VirtualColumns.EMPTY, start, end, 0, null);
Map<String, RangeSet<String>> domain = new HashMap<>();
// (-INF, INF) * (-INF, INF) * (-INF, INF)
@@ -442,7 +462,7 @@ public class DimensionRangeShardSpecTest
final RangeSet<String> universalSet = TreeRangeSet.create();
universalSet.add(Range.all());
- ShardSpec shard = new DimensionRangeShardSpec(dimensions, start, end, 0, null);
+ ShardSpec shard = new DimensionRangeShardSpec(dimensions, VirtualColumns.EMPTY, start, end, 0, null);
Map<String, RangeSet<String>> domain = new HashMap<>();
// (-INF, Earth) U (Earth, INF) * (-INF, INF) * (-INF, INF)
@@ -504,7 +524,7 @@ public class DimensionRangeShardSpecTest
final RangeSet<String> universalSet = TreeRangeSet.create();
universalSet.add(Range.all());
- ShardSpec shard = new DimensionRangeShardSpec(dimensions, start, end, 0, null);
+ ShardSpec shard = new DimensionRangeShardSpec(dimensions, VirtualColumns.EMPTY, start, end, 0, null);
Map<String, RangeSet<String>> domain = new HashMap<>();
// {Earth} U {Mars} * (USA, INF) * (-INF, INF)
diff --git a/processing/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java b/processing/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java
index 2c3442645d2..daad7aafc94 100644
--- a/processing/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java
+++ b/processing/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.timeline.DataSegment;
import org.junit.Assert;
import org.junit.Test;
@@ -86,6 +87,7 @@ public class PartitionHolderCompletenessTest
ImmutableList.of(
new DimensionRangeShardSpec(
Collections.singletonList("dim"),
+ VirtualColumns.EMPTY,
null,
StringTuple.create("aaa"),
0,
@@ -93,6 +95,7 @@ public class PartitionHolderCompletenessTest
),
new DimensionRangeShardSpec(
Collections.singletonList("dim"),
+ VirtualColumns.EMPTY,
StringTuple.create("ttt"),
StringTuple.create("zzz"),
2,
@@ -100,6 +103,7 @@ public class PartitionHolderCompletenessTest
),
new DimensionRangeShardSpec(
Collections.singletonList("dim"),
+ VirtualColumns.EMPTY,
StringTuple.create("bbb"),
StringTuple.create("fff"),
1,
@@ -116,6 +120,7 @@ public class PartitionHolderCompletenessTest
ImmutableList.of(
new DimensionRangeShardSpec(
Collections.singletonList("dim"),
+ VirtualColumns.EMPTY,
StringTuple.create("bbb"),
StringTuple.create("fff"),
1,
@@ -123,6 +128,7 @@ public class PartitionHolderCompletenessTest
),
new DimensionRangeShardSpec(
Collections.singletonList("dim"),
+ VirtualColumns.EMPTY,
StringTuple.create("fff"),
null,
2,
@@ -130,6 +136,7 @@ public class PartitionHolderCompletenessTest
),
new DimensionRangeShardSpec(
Collections.singletonList("dim"),
+ VirtualColumns.EMPTY,
null,
StringTuple.create("bbb"),
0,
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
index 532c7197718..c66def98793 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
@@ -28,7 +28,12 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.virtual.VirtualizedColumnInspector;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
@@ -38,9 +43,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Collectors;
/**
@@ -119,7 +121,8 @@ public class ClientCompactionRunnerInfo
if (newConfig.getTuningConfig() != null) {
validationResults.add(validatePartitionsSpecForMSQ(
newConfig.getTuningConfig().getPartitionsSpec(),
- newConfig.getDimensionsSpec() == null ? null : newConfig.getDimensionsSpec().getDimensions()
+ newConfig.getDimensionsSpec() == null ? null : newConfig.getDimensionsSpec().getDimensions(),
+ newConfig.getTransformSpec() == null ? VirtualColumns.EMPTY : newConfig.getTransformSpec().getVirtualColumns()
));
}
if (newConfig.getGranularitySpec() != null) {
@@ -142,7 +145,8 @@ public class ClientCompactionRunnerInfo
*/
public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(
@Nullable PartitionsSpec partitionsSpec,
- @Nullable List<DimensionSchema> dimensionSchemas
+ @Nullable List<DimensionSchema> dimensionSchemas,
+ VirtualColumns virtualColumns
)
{
if (partitionsSpec == null) {
@@ -165,19 +169,36 @@ public class ClientCompactionRunnerInfo
);
}
if (partitionsSpec instanceof DimensionRangePartitionsSpec && dimensionSchemas != null) {
- Map<String, DimensionSchema> dimensionSchemaMap =
dimensionSchemas.stream().collect(
- Collectors.toMap(DimensionSchema::getName, Function.identity())
- );
- Optional<String> nonStringDimension = ((DimensionRangePartitionsSpec)
partitionsSpec)
- .getPartitionDimensions()
- .stream()
- .filter(dim ->
!ColumnType.STRING.equals(dimensionSchemaMap.get(dim).getColumnType()))
- .findAny();
- if (nonStringDimension.isPresent()) {
+ RowSignature.Builder baseSignatureBuilder = RowSignature.builder().addTimeColumn();
+ for (DimensionSchema dimensionSchema : dimensionSchemas) {
+ baseSignatureBuilder.add(dimensionSchema.getName(), dimensionSchema.getColumnType());
+ }
+ final RowSignature baseSignature = baseSignatureBuilder.build();
+ final ColumnInspector inspector = new VirtualizedColumnInspector(baseSignature, virtualColumns);
+
+ String nonString = null;
+ ColumnType nonStringType = null;
+ for (String dim : ((DimensionRangePartitionsSpec) partitionsSpec).getPartitionDimensions()) {
+ ColumnType partitionType = baseSignature.getColumnType(dim).orElse(null);
+ if (partitionType == null) {
+ VirtualColumn virtualColumn = virtualColumns.getVirtualColumn(dim);
+ if (virtualColumn != null) {
+ partitionType = ColumnType.fromCapabilities(
+ virtualColumn.capabilities(inspector, virtualColumn.getOutputName())
+ );
+ }
+ }
+ if (!ColumnType.STRING.equals(partitionType)) {
+ nonString = dim;
+ nonStringType = partitionType;
+ break;
+ }
+ }
+ if (nonString != null) {
return CompactionConfigValidationResult.failure(
"MSQ: Non-string partition dimension[%s] of type[%s] not supported
with 'range' partition spec",
- nonStringDimension.get(),
- dimensionSchemaMap.get(nonStringDimension.get()).getTypeName()
+ nonString,
+ nonStringType
);
}
}
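A hedged sketch of the validation behavior after this change; the DimensionRangePartitionsSpec argument order (targetRowsPerSegment, maxRowsPerSegment, partitionDimensions, assumeGrouped) is an assumption, not confirmed by this patch:

    import java.util.List;
    import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
    import org.apache.druid.data.input.impl.StringDimensionSchema;
    import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
    import org.apache.druid.query.expression.TestExprMacroTable;
    import org.apache.druid.segment.VirtualColumns;
    import org.apache.druid.segment.column.ColumnType;
    import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
    import org.apache.druid.server.coordinator.CompactionConfigValidationResult;

    // Range partitioning on an expression is now resolvable: when "v0" is not a
    // physical dimension, the validator looks it up in the transform spec's
    // virtual columns and checks the computed type instead of failing outright.
    CompactionConfigValidationResult result = ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
        new DimensionRangePartitionsSpec(1000, null, List.of("v0"), false), // assumed ctor ordering
        List.of(new StringDimensionSchema("dim1")),
        VirtualColumns.create(
            new ExpressionVirtualColumn("v0", "lower(\"dim1\")", ColumnType.STRING, TestExprMacroTable.INSTANCE)
        )
    );
    // result.isValid() is expected to be true here; a LONG-typed partition
    // dimension still fails with "Non-string partition dimension[...] of type[LONG]".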
diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
index f0d35ef324b..2730c98899e 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
@@ -161,7 +161,7 @@ public class ClientCompactionRunnerInfoTest
);
Assert.assertFalse(validationResult.isValid());
Assert.assertEquals(
- "MSQ: Non-string partition dimension[partitionDim] of type[long] not
supported with 'range' partition spec",
+ "MSQ: Non-string partition dimension[partitionDim] of type[LONG] not
supported with 'range' partition spec",
validationResult.getReason()
);
}
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 8e166dbfcf9..91963e3a96d 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -46,6 +46,7 @@ import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.TestDataSource;
+import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.FingerprintGenerator;
import org.apache.druid.segment.metadata.HeapMemoryIndexingStateStorage;
@@ -670,7 +671,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
ImmutableMap.of("path", "a-" + i),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
- new DimensionRangeShardSpec(List.of("dim1"), null, null, i - 1, 8),
+ new DimensionRangeShardSpec(List.of("dim1"), null, null, null, i -
1, 8),
9,
100
);
@@ -695,7 +696,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
ImmutableMap.of("path", "b-" + i),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
- new DimensionRangeShardSpec(List.of("dim1"), null, null, i - 1, 8),
+ new DimensionRangeShardSpec(List.of("dim1"), null, null, null, i -
1, 8),
9,
100
);
@@ -3622,6 +3623,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
metrics,
new DimensionRangeShardSpec(
Collections.singletonList("dim"),
+ VirtualColumns.EMPTY,
i == 0 ? null : StringTuple.create(String.valueOf(i - 1)),
i == 5 ? null : StringTuple.create(String.valueOf(i)),
i,
@@ -4316,7 +4318,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
loadspec,
dimensions,
metrics,
- new DimensionRangeShardSpec(dimensions, null, null, 0, 1),
+ new DimensionRangeShardSpec(dimensions, VirtualColumns.EMPTY, null, null, 0, 1),
0,
100
);