This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a126e2bfa46 Add support for MSQ CLUSTERED BY expressions to be preserved in the segment shard spec as virtual columns (#19061)
a126e2bfa46 is described below

commit a126e2bfa46e6da19b3fe3966fe23b8d185d7acc
Author: Clint Wylie <[email protected]>
AuthorDate: Thu Mar 19 11:08:10 2026 -0700

    Add support for MSQ CLUSTERED BY expressions to be preserved in the segment shard spec as virtual columns (#19061)
    
    changes:
    * the `ShardSpec` interface has a new method, `getDomainVirtualColumns`, which provides the virtual column information used for pruning
    * `DimensionRangeShardSpec` stores `VirtualColumns` in segment metadata so they can be compared to query expressions and used for pruning (see the sketch below)
    * `FilterSegmentPruner` is now virtual-column aware, using the new methods when pruning segments
    * `SegmentGeneratorStageProcessor` now also carries a map of column name to `VirtualColumn`, to support cluster key columns that are virtual columns
    * `ControllerImpl` persists clustering virtual columns in the compaction state's transform spec
    * `MSQCompactionRunner` handles virtual columns in order-by/cluster-by during compaction
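    
    For reviewers, a minimal sketch of the shard spec shape this enables, mirroring
    the new embedded tests in this patch; the wrapper class and `main` method are
    illustrative only, while the constructor arguments follow the usage added here:
    
        import java.util.List;
        import org.apache.druid.data.input.StringTuple;
        import org.apache.druid.query.expression.TestExprMacroTable;
        import org.apache.druid.segment.VirtualColumns;
        import org.apache.druid.segment.column.ColumnType;
        import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
        import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
        
        public class ClusterByVirtualColumnSketch
        {
          public static void main(String[] args)
          {
            // A CLUSTERED BY expression such as JSON_VALUE(obj, '$.a') is planned
            // as an expression virtual column (named "v0" here).
            final VirtualColumns virtualColumns = VirtualColumns.create(
                new ExpressionVirtualColumn("v0", "json_value(obj, '$.a')", ColumnType.STRING, TestExprMacroTable.INSTANCE)
            );
        
            // The range shard spec now records the virtual column alongside the
            // clustering column name, so FilterSegmentPruner can compare a query
            // filter on the same expression against this segment's value range
            // (unbounded below, up to "oo") and skip segments that cannot match.
            final DimensionRangeShardSpec shardSpec = new DimensionRangeShardSpec(
                List.of("v0"),             // dimensions (the virtual column's output name)
                virtualColumns,            // new: virtual columns backing the dimensions
                null,                      // range start (null = unbounded)
                StringTuple.create("oo"),  // range end
                0,                         // partition number
                2                          // core partition count
            );
            System.out.println(shardSpec);
          }
        }
    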
---
 .../timeline/DimensionRangeShardSpecBenchmark.java |   4 +
 .../embedded/compact/CompactionSupervisorTest.java | 283 +++++++++++++++++----
 .../testing/embedded/msq/MultiStageQueryTest.java  | 206 ++++++++++++++-
 .../org/apache/druid/msq/exec/ControllerImpl.java  | 109 +++++---
 .../druid/msq/indexing/MSQCompactionRunner.java    | 181 ++++++++-----
 .../destination/SegmentGenerationStageSpec.java    |  69 ++++-
 .../processor/SegmentGeneratorStageProcessor.java  |  16 +-
 .../apache/druid/msq/sql/MSQTaskQueryMaker.java    |   9 +-
 .../controller/DartTableInputSpecSlicerTest.java   |   4 +
 .../org/apache/druid/msq/exec/MSQReplaceTest.java  |  61 ++++-
 .../msq/indexing/MSQCompactionRunnerTest.java      |   2 +-
 .../SegmentGeneratorStageProcessorTest.java        |  84 ++++++
 .../table/IndexerTableInputSpecSlicerTest.java     |   8 +-
 .../druid/query/filter/FilterSegmentPruner.java    |  34 ++-
 .../apache/druid/query/groupby/GroupByQuery.java   |   8 +
 .../druid/query/planning/ExecutionVertex.java      |   3 +-
 .../partition/BaseDimensionRangeShardSpec.java     |   4 +
 .../partition/BuildingDimensionRangeShardSpec.java |   2 +
 .../partition/DimensionRangeBucketShardSpec.java   |   3 +-
 .../partition/DimensionRangeShardSpec.java         |  26 +-
 .../apache/druid/timeline/partition/ShardSpec.java |  21 +-
 .../SingleDimensionRangeBucketShardSpec.java       |   2 +
 .../partition/SingleDimensionShardSpec.java        |   2 +
 .../query/filter/FilterSegmentPrunerTest.java      |  88 ++++++-
 .../BuildingDimensionRangeShardSpecTest.java       |   2 +
 .../DimensionRangeBucketShardSpecTest.java         |   5 +-
 .../partition/DimensionRangeShardSpecTest.java     |  34 ++-
 .../partition/PartitionHolderCompletenessTest.java |   7 +
 .../indexing/ClientCompactionRunnerInfo.java       |  53 ++--
 .../indexing/ClientCompactionRunnerInfoTest.java   |   2 +-
 .../IndexerSQLMetadataStorageCoordinatorTest.java  |   8 +-
 31 files changed, 1120 insertions(+), 220 deletions(-)

diff --git a/benchmarks/src/test/java/org/apache/druid/timeline/DimensionRangeShardSpecBenchmark.java b/benchmarks/src/test/java/org/apache/druid/timeline/DimensionRangeShardSpecBenchmark.java
index 9ea11e48eef..df1bd027a9a 100644
--- a/benchmarks/src/test/java/org/apache/druid/timeline/DimensionRangeShardSpecBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/timeline/DimensionRangeShardSpecBenchmark.java
@@ -26,6 +26,7 @@ import com.google.common.collect.RangeSet;
 import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.filter.InDimFilter;
+import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.junit.Assert;
 import org.openjdk.jmh.annotations.Benchmark;
@@ -73,6 +74,7 @@ public class DimensionRangeShardSpecBenchmark
   // Initial segment (null -> values)
   private final DimensionRangeShardSpec shardSpec0 = new DimensionRangeShardSpec(
       Arrays.asList("country", "city"),
+      VirtualColumns.EMPTY,
       new StringTuple(new String[]{null, null}),
       new StringTuple(new String[]{"Germany", "Munich"}),
       0,
@@ -82,6 +84,7 @@ public class DimensionRangeShardSpecBenchmark
   // Middle segment (values -> other values)
   private final DimensionRangeShardSpec shardSpec1 = new DimensionRangeShardSpec(
       Arrays.asList("country", "city"),
+      VirtualColumns.EMPTY,
       new StringTuple(new String[]{"Germany", "Munich"}),
       new StringTuple(new String[]{"United States", "New York"}),
       1,
@@ -91,6 +94,7 @@ public class DimensionRangeShardSpecBenchmark
   // End segment (values -> null)
   private final DimensionRangeShardSpec shardSpec2 = new DimensionRangeShardSpec(
       Arrays.asList("country", "city"),
+      VirtualColumns.EMPTY,
       new StringTuple(new String[]{"United States", "New York"}),
       new StringTuple(new String[]{null, null}),
       2,
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
index bc9f7236530..838bb559989 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.testing.embedded.compact;
 import org.apache.druid.catalog.guice.CatalogClientModule;
 import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
 import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InlineInputSource;
 import org.apache.druid.data.input.impl.JsonInputFormat;
@@ -51,6 +52,7 @@ import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.filter.EqualityFilter;
 import org.apache.druid.query.filter.NotDimFilter;
@@ -93,6 +95,7 @@ import org.apache.druid.testing.tools.JsonEventSerializer;
 import org.apache.druid.testing.tools.StreamGenerator;
 import org.apache.druid.testing.tools.WikipediaStreamEventStreamGenerator;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
@@ -150,22 +153,6 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
                                .addServer(new EmbeddedRouter());
   }
 
-
-  private void configureCompaction(CompactionEngine compactionEngine, @Nullable CompactionCandidateSearchPolicy policy)
-  {
-    final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
-        o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(
-            1.0,
-            100,
-            policy,
-            true,
-            compactionEngine,
-            true
-        ))
-    );
-    Assertions.assertTrue(updateResponse.isSuccess());
-  }
-
   @MethodSource("getEngine")
   @ParameterizedTest(name = "compactionEngine={0}")
   public void test_ingestDayGranularity_andCompactToMonthGranularity_andCompactToYearGranularity_withInlineConfig(
@@ -307,41 +294,6 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
     Assertions.assertEquals(4000, getTotalRowCount());
   }
 
-  protected void ingest1kRecords()
-  {
-    final EventSerializer serializer = new JsonEventSerializer(overlord.bindings().jsonMapper());
-    final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 500, 100);
-    List<byte[]> records = streamGenerator.generateEvents(2);
-
-    final InlineInputSource input = new InlineInputSource(
-        records.stream().map(b -> new String(b, StandardCharsets.UTF_8)).collect(Collectors.joining("\n")));
-    final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
-        input,
-        new JsonInputFormat(null, null, null, null, null),
-        true,
-        null
-    );
-    final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec(
-        DataSchema.builder()
-                  .withDataSource(dataSource)
-                  .withTimestamp(new TimestampSpec("timestamp", "iso", null))
-                  .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())
-                  .build(),
-        ioConfig,
-        TuningConfigBuilder.forParallelIndexTask().build()
-    );
-    final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
-    final ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
-        taskId,
-        null,
-        null,
-        indexIngestionSpec,
-        null
-    );
-    cluster.callApi().submitTask(task);
-    cluster.callApi().waitForTaskToSucceed(taskId, overlord);
-  }
-
   @MethodSource("getEngine")
   @ParameterizedTest(name = "compactionEngine={0}")
   public void test_compaction_withPersistLastCompactionStateFalse_storesOnlyFingerprint(CompactionEngine compactionEngine)
@@ -575,6 +527,184 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
     Assertions.assertTrue(count > 0);
   }
 
+  @Test
+  public void test_compaction_cluster_by_virtualcolumn()
+  {
+    // Virtual Columns on nested data is only supported with MSQ compaction engine right now.
+    CompactionEngine compactionEngine = CompactionEngine.MSQ;
+    configureCompaction(compactionEngine, null);
+
+    String jsonDataWithNestedColumn =
+        """
+            {"timestamp": "2023-01-01T00:00:00", "str":"a",    "obj":{"a": 
"ll"}}
+            {"timestamp": "2023-01-01T00:00:00", "str":"",     "obj":{"a": 
"mm"}}
+            {"timestamp": "2023-01-01T00:00:00", "str":"null", "obj":{"a": 
"nn"}}
+            {"timestamp": "2023-01-01T00:00:00", "str":"b",    "obj":{"a": 
"oo"}}
+            {"timestamp": "2023-01-01T00:00:00", "str":"c",    "obj":{"a": 
"pp"}}
+            {"timestamp": "2023-01-01T00:00:00", "str":"d",    "obj":{"a": 
"qq"}}
+            {"timestamp": "2023-01-01T00:00:00", "str":null,   "obj":{"a": 
"rr"}}
+            """;
+
+    final TaskBuilder.Index task = TaskBuilder
+        .ofTypeIndex()
+        .dataSource(dataSource)
+        .jsonInputFormat()
+        .inlineInputSourceWithData(jsonDataWithNestedColumn)
+        .isoTimestampColumn("timestamp")
+        .schemaDiscovery()
+        .granularitySpec("DAY", null, false);
+
+    cluster.callApi().runTask(task.withId(IdUtils.getRandomId()), overlord);
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+
+    Assertions.assertEquals(7, getTotalRowCount());
+
+    VirtualColumns virtualColumns = VirtualColumns.create(
+        new ExpressionVirtualColumn("v0", "json_value(obj, '$.a')", 
ColumnType.STRING, TestExprMacroTable.INSTANCE)
+    );
+
+    InlineSchemaDataSourceCompactionConfig config =
+        InlineSchemaDataSourceCompactionConfig
+            .builder()
+            .forDataSource(dataSource)
+            .withSkipOffsetFromLatest(Period.seconds(0))
+            .withTransformSpec(
+                new CompactionTransformSpec(
+                    null,
+                    virtualColumns
+                )
+            )
+            .withTuningConfig(
+                UserCompactionTaskQueryTuningConfig
+                    .builder()
+                    .partitionsSpec(new DimensionRangePartitionsSpec(4, null, List.of("v0"), false))
+                    .build()
+            )
+            .build();
+
+    runCompactionWithSpec(config);
+    waitForAllCompactionTasksToFinish();
+
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+
+    List<DataSegment> segments = 
cluster.callApi().getVisibleUsedSegments(dataSource, 
overlord).stream().toList();
+    Assertions.assertEquals(2, segments.size());
+    Assertions.assertEquals(
+        new DimensionRangeShardSpec(
+            List.of("v0"),
+            virtualColumns,
+            null,
+            StringTuple.create("oo"),
+            0,
+            2
+        ),
+        segments.get(0).getShardSpec()
+    );
+    Assertions.assertEquals(
+        new DimensionRangeShardSpec(
+            List.of("v0"),
+            virtualColumns,
+            StringTuple.create("oo"),
+            null,
+            1,
+            2
+        ),
+        segments.get(1).getShardSpec()
+    );
+  }
+
+  @Test
+  public void test_compaction_cluster_by_virtualcolumn_rollup()
+  {
+    // Virtual Columns on nested data is only supported with MSQ compaction engine right now.
+    CompactionEngine compactionEngine = CompactionEngine.MSQ;
+    configureCompaction(compactionEngine, null);
+
+    String jsonDataWithNestedColumn =
+        """
+            {"timestamp": "2023-01-01T00:00:00", "str":"a",    "obj":{"a": 
"ll"}}
+            {"timestamp": "2023-01-01T00:00:00", "str":"",     "obj":{"a": 
"mm"}}
+            {"timestamp": "2023-01-01T00:00:00", "str":"null", "obj":{"a": 
"nn"}}
+            {"timestamp": "2023-01-01T00:00:00", "str":"b",    "obj":{"a": 
"oo"}}
+            {"timestamp": "2023-01-01T00:00:00", "str":"c",    "obj":{"a": 
"pp"}}
+            {"timestamp": "2023-01-01T00:00:00", "str":"d",    "obj":{"a": 
"qq"}}
+            {"timestamp": "2023-01-01T00:00:00", "str":null,   "obj":{"a": 
"rr"}}
+            """;
+
+    final TaskBuilder.Index task = TaskBuilder
+        .ofTypeIndex()
+        .dataSource(dataSource)
+        .jsonInputFormat()
+        .inlineInputSourceWithData(jsonDataWithNestedColumn)
+        .isoTimestampColumn("timestamp")
+        .schemaDiscovery()
+        .dataSchema(builder -> builder.withAggregators(new 
CountAggregatorFactory("count")))
+        .granularitySpec("DAY", "MINUTE", true);
+
+    cluster.callApi().runTask(task.withId(IdUtils.getRandomId()), overlord);
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+
+    Assertions.assertEquals(7, getTotalRowCount());
+
+    VirtualColumns virtualColumns = VirtualColumns.create(
+        new ExpressionVirtualColumn(
+            "v0",
+            "json_value(obj, '$.a')",
+            ColumnType.STRING,
+            TestExprMacroTable.INSTANCE
+        )
+    );
+
+    InlineSchemaDataSourceCompactionConfig config =
+        InlineSchemaDataSourceCompactionConfig
+            .builder()
+            .forDataSource(dataSource)
+            .withSkipOffsetFromLatest(Period.seconds(0))
+            .withTransformSpec(
+                new CompactionTransformSpec(
+                    null,
+                    virtualColumns
+                )
+            )
+            .withTuningConfig(
+                UserCompactionTaskQueryTuningConfig
+                    .builder()
+                    .partitionsSpec(new DimensionRangePartitionsSpec(4, null, List.of("v0"), false))
+                    .build()
+            )
+            .build();
+
+    runCompactionWithSpec(config);
+    waitForAllCompactionTasksToFinish();
+
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+
+    List<DataSegment> segments = 
cluster.callApi().getVisibleUsedSegments(dataSource, 
overlord).stream().toList();
+    Assertions.assertEquals(2, segments.size());
+    Assertions.assertEquals(
+        new DimensionRangeShardSpec(
+            List.of("v0"),
+            virtualColumns,
+            null,
+            StringTuple.create("oo"),
+            0,
+            2
+        ),
+        segments.get(0).getShardSpec()
+    );
+    Assertions.assertEquals(
+        new DimensionRangeShardSpec(
+            List.of("v0"),
+            virtualColumns,
+            StringTuple.create("oo"),
+            null,
+            1,
+            2
+        ),
+        segments.get(1).getShardSpec()
+    );
+  }
+
   /**
   * Tests that when a compaction task filters out all rows using a transform spec,
   * tombstones are created to properly drop the old segments. This test covers both
@@ -702,6 +832,56 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
     Assertions.assertEquals(1, segments.get(0).getDimensions().size());
   }
 
+  private void configureCompaction(CompactionEngine compactionEngine, @Nullable CompactionCandidateSearchPolicy policy)
+  {
+    final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
+        o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(
+            1.0,
+            100,
+            policy,
+            true,
+            compactionEngine,
+            true
+        ))
+    );
+    Assertions.assertTrue(updateResponse.isSuccess());
+  }
+
+  protected void ingest1kRecords()
+  {
+    final EventSerializer serializer = new JsonEventSerializer(overlord.bindings().jsonMapper());
+    final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 500, 100);
+    List<byte[]> records = streamGenerator.generateEvents(2);
+
+    final InlineInputSource input = new InlineInputSource(
+        records.stream().map(b -> new String(b, StandardCharsets.UTF_8)).collect(Collectors.joining("\n")));
+    final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig(
+        input,
+        new JsonInputFormat(null, null, null, null, null),
+        true,
+        null
+    );
+    final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec(
+        DataSchema.builder()
+                  .withDataSource(dataSource)
+                  .withTimestamp(new TimestampSpec("timestamp", "iso", null))
+                  .withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())
+                  .build(),
+        ioConfig,
+        TuningConfigBuilder.forParallelIndexTask().build()
+    );
+    final String taskId = EmbeddedClusterApis.newTaskId(dataSource);
+    final ParallelIndexSupervisorTask task = new ParallelIndexSupervisorTask(
+        taskId,
+        null,
+        null,
+        indexIngestionSpec,
+        null
+    );
+    cluster.callApi().submitTask(task);
+    cluster.callApi().waitForTaskToSucceed(taskId, overlord);
+  }
+
   private int getTotalRowCount()
   {
     return Numbers.parseInt(cluster.runSql("SELECT COUNT(*) as cnt FROM \"%s\"", dataSource));
@@ -723,7 +903,6 @@ public class CompactionSupervisorTest extends EmbeddedClusterTestBase
     );
   }
 
-
   private String generateEventsInInterval(Interval interval, int numEvents, long spacingMillis)
   {
     List<String> events = new ArrayList<>();
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java
index d268bb8b897..1d4ec613e6c 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java
@@ -19,10 +19,17 @@
 
 package org.apache.druid.testing.embedded.msq;
 
+import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.counters.ChannelCounters;
 import org.apache.druid.msq.indexing.report.MSQResultsReport;
 import org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
+import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.sql.http.GetQueryReportResponse;
 import org.apache.druid.testing.embedded.EmbeddedBroker;
 import org.apache.druid.testing.embedded.EmbeddedCoordinator;
 import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -32,6 +39,8 @@ import org.apache.druid.testing.embedded.EmbeddedOverlord;
 import org.apache.druid.testing.embedded.indexing.MoreResources;
 import org.apache.druid.testing.embedded.indexing.Resources;
 import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -40,10 +49,15 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.UUID;
 
 public class MultiStageQueryTest extends EmbeddedClusterTestBase
 {
-  private final EmbeddedBroker broker = new EmbeddedBroker();
+  private final EmbeddedBroker broker =
+      new EmbeddedBroker().setServerMemory(200_000_000)
+                          .addProperty("druid.msq.dart.controller.maxRetainedReportCount", "10")
+                          .addProperty("druid.query.default.context.maxConcurrentStages", "1")
+                          .addProperty("druid.sql.planner.enableSysQueriesTable", "true");
   private final EmbeddedOverlord overlord = new EmbeddedOverlord();
   private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
   private final EmbeddedIndexer indexer = new EmbeddedIndexer()
@@ -59,6 +73,7 @@ public class MultiStageQueryTest extends EmbeddedClusterTestBase
     return EmbeddedDruidCluster
         .withEmbeddedDerbyAndZookeeper()
         .useLatchableEmitter()
+        .addCommonProperty("druid.msq.dart.enabled", "true")
         .addResource(exportDirectory)
         .addServer(overlord)
         .addServer(coordinator)
@@ -152,4 +167,193 @@ public class MultiStageQueryTest extends EmbeddedClusterTestBase
         actualResults
     );
   }
+
+  @Test
+  public void testClusterByVirtualColumn()
+  {
+    final String sqlTemplate =
+        """
+            SET rowsPerSegment = 2;
+            SET groupByEnableMultiValueUnnesting = FALSE;
+            REPLACE INTO %s OVERWRITE ALL
+            WITH "ext" AS (
+              SELECT *
+              FROM TABLE(EXTERN('{"type":"local","files":["%s"]}', '{"type":"json"}'))
+              EXTEND(
+                "timestamp" VARCHAR,
+                "added" BIGINT,
+                "delta" BIGINT,
+                "deleted" BIGINT,
+                "page" VARCHAR,
+                "city" VARCHAR,
+                "country" VARCHAR,
+                "user" VARCHAR
+              )
+            )
+            SELECT
+              TIME_PARSE("timestamp") AS __time,
+              added,
+              delta,
+              deleted,
+              page,
+              city,
+              country,
+              user
+            FROM "ext"
+            PARTITIONED BY DAY
+            CLUSTERED BY CONCAT(country, ':', city)
+            """;
+    final String sql = StringUtils.format(
+        sqlTemplate,
+        dataSource,
+        Resources.DataFile.tinyWiki1Json().getAbsolutePath()
+    );
+
+    final SqlTaskStatus taskStatus = msqApis.submitTaskSql(sql);
+    cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), overlord.latchableEmitter());
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+
+    assertClusterByVirtualColumnSegments();
+    assertClusterByVirtualColumnQueries();
+  }
+
+  @Test
+  public void testClusterByVirtualColumnRollup()
+  {
+    final String sqlTemplate =
+        """
+            SET rowsPerSegment = 2;
+            SET groupByEnableMultiValueUnnesting = FALSE;
+            REPLACE INTO %s OVERWRITE ALL
+            WITH "ext" AS (
+              SELECT *
+              FROM TABLE(EXTERN('{"type":"local","files":["%s"]}', '{"type":"json"}'))
+              EXTEND(
+                "timestamp" VARCHAR,
+                "added" BIGINT,
+                "delta" BIGINT,
+                "deleted" BIGINT,
+                "page" VARCHAR,
+                "city" VARCHAR,
+                "country" VARCHAR,
+                "user" VARCHAR
+              )
+            )
+            SELECT
+              TIME_PARSE("timestamp") AS __time,
+              page,
+              city,
+              country,
+              user,
+              SUM(added) as added,
+              SUM(delta) as delta,
+              SUM(deleted) as deleted
+            FROM "ext"
+            GROUP BY TIME_PARSE("timestamp"), page, city, country, user, 
CONCAT(country, ':', city)
+            PARTITIONED BY DAY
+            CLUSTERED BY CONCAT(country, ':', city)
+            """;
+    final String sql = StringUtils.format(
+        sqlTemplate,
+        dataSource,
+        Resources.DataFile.tinyWiki1Json().getAbsolutePath()
+    );
+
+    final SqlTaskStatus taskStatus = msqApis.submitTaskSql(sql);
+    cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), 
overlord.latchableEmitter());
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, 
broker);
+
+    assertClusterByVirtualColumnSegments();
+    assertClusterByVirtualColumnQueries();
+  }
+
+  private void assertClusterByVirtualColumnSegments()
+  {
+    List<DataSegment> segments = 
cluster.callApi().getVisibleUsedSegments(dataSource, 
overlord).stream().toList();
+    Assertions.assertEquals(2, segments.size());
+    VirtualColumns virtualColumns = VirtualColumns.create(
+        new ExpressionVirtualColumn("v1", "concat(\"country\",':',\"city\")", 
ColumnType.STRING, TestExprMacroTable.INSTANCE)
+    );
+    Assertions.assertEquals(
+        new DimensionRangeShardSpec(
+            List.of("v1"),
+            virtualColumns,
+            null,
+            StringTuple.create("Russia:Moscow"),
+            0,
+            2
+        ),
+        segments.get(0).getShardSpec()
+    );
+    Assertions.assertEquals(
+        new DimensionRangeShardSpec(
+            List.of("v1"),
+            virtualColumns,
+            StringTuple.create("Russia:Moscow"),
+            null,
+            1,
+            2
+        ),
+        segments.get(1).getShardSpec()
+    );
+  }
+
+  private void assertClusterByVirtualColumnQueries()
+  {
+    String queryId = UUID.randomUUID().toString();
+    cluster.callApi().verifySqlQuery(
+        "SET engine = 'msq-dart'; SET sqlQueryId = '" + queryId + "'; SELECT 
__time, country, city, page FROM %s ORDER BY __time",
+        dataSource,
+        """
+            2013-08-31T01:02:33.000Z,United States,San Francisco,Gypsy Danger
+            2013-08-31T03:32:45.000Z,Australia,Syndey,Striker Eureka
+            2013-08-31T07:11:21.000Z,Russia,Moscow,Cherno Alpha"""
+    );
+    Assertions.assertEquals(2, getSegmentsScannedForDartQuery(queryId));
+
+    queryId = UUID.randomUUID().toString();
+    cluster.callApi().verifySqlQuery(
+        "SET engine = 'msq-dart'; SET sqlQueryId = '" + queryId + "'; SELECT 
__time, country, city, page FROM %s WHERE CONCAT(country, ':', city) <= 
'Russia' ORDER BY __time",
+        dataSource,
+        """
+            2013-08-31T03:32:45.000Z,Australia,Syndey,Striker Eureka"""
+    );
+    Assertions.assertEquals(1, getSegmentsScannedForDartQuery(queryId));
+
+    queryId = UUID.randomUUID().toString();
+    cluster.callApi().verifySqlQuery(
+        "SET engine = 'msq-dart'; SET sqlQueryId = '" + queryId + "'; SELECT 
__time, country, city, page FROM %s WHERE CONCAT(country, ':', city) <= 
'Russia:St. Petersburg' ORDER BY __time",
+        dataSource,
+        """
+            2013-08-31T03:32:45.000Z,Australia,Syndey,Striker Eureka
+            2013-08-31T07:11:21.000Z,Russia,Moscow,Cherno Alpha"""
+    );
+    Assertions.assertEquals(2, getSegmentsScannedForDartQuery(queryId));
+  }
+
+  private long getSegmentsScannedForDartQuery(String sqlQueryId)
+  {
+    ChannelCounters.Snapshot segmentChannelCounters = getDartSegmentChannelCounters(sqlQueryId);
+    return segmentChannelCounters.getFiles()[0];
+  }
+
+  private ChannelCounters.Snapshot getDartSegmentChannelCounters(String sqlQueryId)
+  {
+    final GetQueryReportResponse reportResponse = msqApis.getDartQueryReport(sqlQueryId, broker);
+
+    Assertions.assertNotNull(reportResponse, "Report response should not be null");
+    ChannelCounters.Snapshot segmentChannelCounters =
+        (ChannelCounters.Snapshot) reportResponse.getReportMap()
+                                                 .findReport("multiStageQuery")
+                                                 .map(r ->
+                                                          ((MSQTaskReportPayload) r.getPayload()).getCounters()
+                                                                                                 .snapshotForStage(0)
+                                                                                                 .get(0)
+                                                                                                 .getMap()
+                                                                                                 .get("input0")
+                                                 ).orElse(null);
+
+    Assertions.assertNotNull(segmentChannelCounters);
+    return segmentChannelCounters;
+  }
 }
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index b18a2971e2c..6a064c82c5d 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -61,6 +61,7 @@ import org.apache.druid.indexer.granularity.UniformGranularitySpec;
 import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SecondaryPartitionType;
 import org.apache.druid.indexer.report.TaskReport;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskLock;
@@ -174,6 +175,8 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.rowsandcols.serde.WireTransferableContext;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.indexing.DataSchema;
@@ -1062,6 +1065,7 @@ public class ControllerImpl implements Controller
       final ClusterBy clusterBy,
       final RowKeyReader keyReader,
       final ClusterByPartitions partitionBoundaries,
+      final Map<String, VirtualColumn> clusterByVirtualColumnMappings,
       final boolean mayHaveMultiValuedClusterByFields,
       @Nullable final Boolean isStageOutputEmpty
   ) throws IOException
@@ -1073,6 +1077,7 @@ public class ControllerImpl implements Controller
           clusterBy,
           keyReader,
           partitionBoundaries,
+          clusterByVirtualColumnMappings,
           mayHaveMultiValuedClusterByFields,
           isStageOutputEmpty
       );
@@ -1240,6 +1245,7 @@ public class ControllerImpl implements Controller
       final ClusterBy clusterBy,
       final RowKeyReader keyReader,
       final ClusterByPartitions partitionBoundaries,
+      final Map<String, VirtualColumn> clusterByVirtualColumnMappings,
       final boolean mayHaveMultiValuedClusterByFields,
       @Nullable final Boolean isStageOutputEmpty
   ) throws IOException
@@ -1254,6 +1260,7 @@ public class ControllerImpl implements Controller
         signature,
         clusterBy,
         querySpec.getColumnMappings(),
+        clusterByVirtualColumnMappings,
         mayHaveMultiValuedClusterByFields
     );
     final List<String> shardColumns = shardReasonPair.lhs;
@@ -1314,8 +1321,14 @@ public class ControllerImpl implements Controller
               segmentNumber == ranges.size() - 1
               ? null
               : makeStringTuple(clusterBy, keyReader, range.getEnd(), shardColumns.size());
-
-          shardSpec = new DimensionRangeShardSpec(shardColumns, start, end, segmentNumber, ranges.size());
+          shardSpec = new DimensionRangeShardSpec(
+              shardColumns,
+              VirtualColumns.create(clusterByVirtualColumnMappings.values()),
+              start,
+              end,
+              segmentNumber,
+              ranges.size()
+          );
         }
 
         retVal[partitionNumber] = new SegmentIdWithShardSpec(destination.getDataSource(), interval, version, shardSpec);
@@ -1710,19 +1723,13 @@ public class ControllerImpl implements Controller
               queryDef.getQueryId()
           );
         } else {
-          DataSchema dataSchema = ((SegmentGeneratorStageProcessor) queryKernel
-              .getStageDefinition(finalStageId).getProcessor()).getDataSchema();
-
-          ShardSpec shardSpec = segments.isEmpty() ? null : segments.stream().findFirst().get().getShardSpec();
-          ClusterBy clusterBy = queryKernel.getStageDefinition(finalStageId).getClusterBy();
+          final ShardSpec shardSpec = segments.isEmpty() ? null : segments.stream().findFirst().get().getShardSpec();
 
           compactionStateAnnotateFunction = addCompactionStateToSegments(
+              queryDef,
               querySpec,
               context.jsonMapper(),
-              dataSchema,
-              shardSpec,
-              clusterBy,
-              queryDef.getQueryId()
+              shardSpec
           );
         }
       }
@@ -1761,17 +1768,21 @@ public class ControllerImpl implements Controller
 
   private static Function<Set<DataSegment>, Set<DataSegment>> 
addCompactionStateToSegments(
+      QueryDefinition queryDef,
       MSQSpec querySpec,
       ObjectMapper jsonMapper,
-      DataSchema dataSchema,
-      @Nullable ShardSpec shardSpec,
-      @Nullable ClusterBy clusterBy,
-      String queryId
+      @Nullable ShardSpec shardSpec
   )
   {
+    final ClusterBy clusterBy = queryDef.getFinalStageDefinition().getClusterBy();
     final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
-    PartitionsSpec partitionSpec;
+    final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
+    final SegmentGeneratorStageProcessor segmentProcessor =
+        (SegmentGeneratorStageProcessor) queryDef.getFinalStageDefinition().getProcessor();
+
+    final DataSchema dataSchema = segmentProcessor.getDataSchema();
 
+    final PartitionsSpec partitionSpec;
     // shardSpec is absent in the absence of segments, which happens when only tombstones are generated by an
     // MSQControllerTask.
     if (shardSpec != null) {
@@ -1793,7 +1804,7 @@ public class ControllerImpl implements Controller
             UnknownFault.forMessage(
                 StringUtils.format(
                     "Query[%s] cannot store compaction state in segments as 
shard spec of unsupported type[%s].",
-                    queryId,
+                    queryDef.getQueryId(),
                     shardSpec.getType()
                 )));
       }
@@ -1811,27 +1822,40 @@ public class ControllerImpl implements Controller
       partitionSpec = new DynamicPartitionsSpec(tuningConfig.getRowsPerSegment(), Long.MAX_VALUE);
     }
 
-    Granularity segmentGranularity = ((DataSourceMSQDestination) querySpec.getDestination())
-        .getSegmentGranularity();
+    Granularity segmentGranularity = destination.getSegmentGranularity();
 
     GranularitySpec granularitySpec = new UniformGranularitySpec(
         segmentGranularity,
-        querySpec.getContext()
-                    .getGranularity(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY, jsonMapper),
+        querySpec.getContext().getGranularity(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY, jsonMapper),
         dataSchema.getGranularitySpec().isRollup(),
         // Not using dataSchema.getGranularitySpec().inputIntervals() as that always has ETERNITY
-        ((DataSourceMSQDestination) querySpec.getDestination()).getReplaceTimeChunks()
+        destination.getReplaceTimeChunks()
     );
 
     DimensionsSpec dimensionsSpec = dataSchema.getDimensionsSpec();
-    CompactionTransformSpec transformSpec = TransformSpec.NONE.equals(dataSchema.getTransformSpec())
-                                            ? null
-                                            : CompactionTransformSpec.of(dataSchema.getTransformSpec());
+
+    // if the clustered by requires virtual columns, preserve them here so that we can rebuild during compaction
+    CompactionTransformSpec transformSpec;
+    final Map<String, VirtualColumn> clusterByVirtualColumnMappings =
+        segmentProcessor.getClusterByVirtualColumnMappings();
+
+    // only range partitioning can have virtual columns
+    if (clusterByVirtualColumnMappings.isEmpty() || !SecondaryPartitionType.RANGE.equals(partitionSpec.getType())) {
+      transformSpec = TransformSpec.NONE.equals(dataSchema.getTransformSpec())
+                      ? null
+                      : CompactionTransformSpec.of(dataSchema.getTransformSpec());
+    } else {
+      transformSpec = new CompactionTransformSpec(
+          dataSchema.getTransformSpec().getFilter(),
+          VirtualColumns.create(clusterByVirtualColumnMappings.values())
+      );
+    }
+
     List<AggregatorFactory> metricsSpec = buildMSQCompactionMetrics(querySpec, 
dataSchema);
 
     IndexSpec indexSpec = tuningConfig.getIndexSpec();
 
-    log.info("Query[%s] storing compaction state in segments.", queryId);
+    log.info("Query[%s] storing compaction state in segments.", 
queryDef.getQueryId());
 
     return CompactionState.addCompactionStateToSegments(
         partitionSpec,
@@ -1909,6 +1933,7 @@ public class ControllerImpl implements Controller
       final RowSignature signature,
       final ClusterBy clusterBy,
       final ColumnMappings columnMappings,
+      final Map<String, VirtualColumn> clusterByVirtualColumns,
       boolean mayHaveMultiValuedClusterByFields
   )
   {
@@ -1959,19 +1984,24 @@ public class ControllerImpl implements Controller
         );
       }
 
-      // DimensionRangeShardSpec only handles columns that appear as-is in the output.
+      // DimensionRangeShardSpec columns may either be explicitly in the table or defined as virtual columns
       if (outputColumns.isEmpty()) {
-        return Pair.of(
-            shardColumns,
-            StringUtils.format(
-                "Using only[%d] CLUSTERED BY columns for 'range' shard specs, 
since the next column was not mapped to "
-                + "an output column.",
-                shardColumns.size()
-            )
-        );
+        final VirtualColumn vc = clusterByVirtualColumns.get(column.columnName());
+        if (vc != null) {
+          shardColumns.add(vc.getOutputName());
+        } else {
+          return Pair.of(
+              shardColumns,
+              StringUtils.format(
+                  "Using only[%d] CLUSTERED BY columns for 'range' shard 
specs, since the next column was not mapped to "
+                  + "an output column or virtual column.",
+                  shardColumns.size()
+              )
+          );
+        }
+      } else {
+        shardColumns.add(columnMappings.getOutputColumnName(outputColumns.getInt(0)));
       }
-
-      shardColumns.add(columnMappings.getOutputColumnName(outputColumns.getInt(0)));
     }
 
     return Pair.of(shardColumns, "Using 'range' shard specs with all CLUSTERED 
BY fields.");
@@ -1997,7 +2027,7 @@ public class ControllerImpl implements Controller
       final int shardFieldCount
   )
   {
-    final String[] array = new String[clusterBy.getColumns().size() - clusterBy.getBucketByCount()];
+    final String[] array = new String[shardFieldCount];
 
     for (int i = 0; i < shardFieldCount; i++) {
       final Object val = keyReader.read(key, clusterBy.getBucketByCount() + i);
@@ -2633,6 +2663,8 @@ public class ControllerImpl implements Controller
       final boolean mayHaveMultiValuedClusterByFields =
           !shuffleStageDef.mustGatherResultKeyStatistics()
          || queryKernel.hasStageCollectorEncounteredAnyMultiValueField(shuffleStageId);
+      final SegmentGeneratorStageProcessor segmentGeneratorStageProcessor =
+          (SegmentGeneratorStageProcessor) queryDef.getFinalStageDefinition().getProcessor();
 
       segmentsToGenerate = generateSegmentIdsWithShardSpecs(
           (DataSourceMSQDestination) querySpec.getDestination(),
@@ -2640,6 +2672,7 @@ public class ControllerImpl implements Controller
           shuffleStageDef.getClusterBy(),
           shuffleStageDef.getClusterBy().keyReader(shuffleStageDef.getSignature(), rowBasedFrameType),
           partitionBoundaries,
+          segmentGeneratorStageProcessor.getClusterByVirtualColumnMappings(),
           mayHaveMultiValuedClusterByFields,
           isShuffleStageOutputEmpty
       );
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
index 7195303e4ef..8b4cd39879b 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQCompactionRunner.java
@@ -30,6 +30,7 @@ import com.google.inject.Injector;
 import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -67,8 +68,10 @@ import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
 import org.apache.druid.query.policy.PolicyEnforcer;
 import org.apache.druid.query.spec.QuerySegmentSpec;
+import org.apache.druid.segment.ColumnInspector;
 import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
@@ -76,6 +79,7 @@ import org.apache.druid.segment.indexing.CombinedDataSchema;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.transform.CompactionTransformSpec;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.segment.virtual.VirtualizedColumnInspector;
 import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthorizationResult;
@@ -172,7 +176,10 @@ public class MSQCompactionRunner implements CompactionRunner
       validationResults.add(
           ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(
               compactionTask.getTuningConfig().getPartitionsSpec(),
-              dataSchema.getDimensionsSpec().getDimensions()
+              dataSchema.getDimensionsSpec().getDimensions(),
+              compactionTask.getTransformSpec() == null
+              ? VirtualColumns.EMPTY
+              : compactionTask.getTransformSpec().getVirtualColumns()
           )
       );
       validationResults.add(
@@ -409,42 +416,73 @@ public class MSQCompactionRunner implements CompactionRunner
 
   private static List<DimensionSpec> getAggregateDimensions(
       DataSchema dataSchema,
-      Map<String, VirtualColumn> inputColToVirtualCol
+      Map<String, VirtualColumn> inputColToVirtualCol,
+      List<OrderByColumnSpec> orderBy
   )
   {
-    List<DimensionSpec> dimensionSpecs = new ArrayList<>();
+    List<DimensionSpec> dimensions = new ArrayList<>();
+
+    // build a RowSignature of non-virtual column dimensions of the dataschema to use to resolve virtual column types
+    RowSignature.Builder baseBuilder = RowSignature.builder().addTimeColumn();
+    for (DimensionSchema schema : dataSchema.getDimensionsSpec().getDimensions()) {
+      if (inputColToVirtualCol.containsKey(schema.getName())) {
+        continue;
+      }
+      baseBuilder.add(schema.getName(), schema.getColumnType());
+    }
+    final RowSignature baseSignature = baseBuilder.build();
+    // and virtualized inspector from base signature
+    final ColumnInspector inspector = new VirtualizedColumnInspector(
+        baseSignature,
+        VirtualColumns.create(inputColToVirtualCol.values())
+    );
 
-    // if schema is not time-sorted, the time column mapping would already be in inputColToVirtualCol
-    if (!dataSchema.getDimensionsSpec().getDimensionNames().contains(ColumnHolder.TIME_COLUMN_NAME)) {
+    // if schema is not time-sorted, the time column will be in the dimensions list, otherwise add time dimension first
+    if (dataSchema.getDimensionsSpec().getSchema(ColumnHolder.TIME_COLUMN_NAME) == null) {
       if (isQueryGranularityEmptyOrNone(dataSchema)) {
         // Dimensions in group-by aren't allowed to have time column name as the output name.
-        dimensionSpecs.add(new DefaultDimensionSpec(
-            ColumnHolder.TIME_COLUMN_NAME,
-            TIME_VIRTUAL_COLUMN,
-            ColumnType.LONG
-        ));
+        dimensions.add(new DefaultDimensionSpec(ColumnHolder.TIME_COLUMN_NAME, TIME_VIRTUAL_COLUMN, ColumnType.LONG));
       } else {
         // The changed granularity would result in a new virtual column that needs to be aggregated upon.
-        dimensionSpecs.add(new DefaultDimensionSpec(TIME_VIRTUAL_COLUMN, TIME_VIRTUAL_COLUMN, ColumnType.LONG));
+        dimensions.add(new DefaultDimensionSpec(TIME_VIRTUAL_COLUMN, TIME_VIRTUAL_COLUMN, ColumnType.LONG));
+      }
+    }
+
+    // If dimensions point to virtual columns, replace dimension columns names with virtual column names.
+    for (DimensionSchema schema : dataSchema.getDimensionsSpec().getDimensions()) {
+      String dimension = schema.getName();
+      ColumnType colType = schema.getColumnType();
+      VirtualColumn vc = inputColToVirtualCol.get(dimension);
+      if (vc != null) {
+        dimension = vc.getOutputName();
+        if (vc instanceof ExpressionVirtualColumn) {
+          colType = ((ExpressionVirtualColumn) vc).getOutputType();
+        } else {
+          colType = ColumnType.fromCapabilities(vc.capabilities(inspector, vc.getOutputName()));
+        }
       }
+      dimensions.add(new DefaultDimensionSpec(dimension, dimension, colType));
     }
-    // If virtual columns are created from dimensions, replace dimension columns names with virtual column names.
-    dimensionSpecs.addAll(
-        dataSchema.getDimensionsSpec().getDimensions().stream()
-                  .map(dim -> {
-                    String dimension = dim.getName();
-                    ColumnType colType = dim.getColumnType();
-                    if (inputColToVirtualCol.containsKey(dim.getName())) {
-                      VirtualColumn virtualColumn = inputColToVirtualCol.get(dimension);
-                      dimension = virtualColumn.getOutputName();
-                      if (virtualColumn instanceof ExpressionVirtualColumn) {
-                        colType = ((ExpressionVirtualColumn) virtualColumn).getOutputType();
-                      }
-                    }
-                    return new DefaultDimensionSpec(dimension, dimension, colType);
-                  })
-                  .collect(Collectors.toList()));
-    return dimensionSpecs;
+
+    // if any orderby columns refer to a virtual column that was not explicitly a dimension, add it to the list
+    // this is not really optimal, but it works without requiring any conversion between virtualcolumns and
+    // postaggregators which doesn't really exist here
+    for (OrderByColumnSpec order : orderBy) {
+      if (dataSchema.getDimensionsSpec().getSchema(order.getDimension()) != null) {
+        continue;
+      }
+      VirtualColumn vc = inputColToVirtualCol.get(order.getDimension());
+      if (vc != null) {
+        dimensions.add(
+            new DefaultDimensionSpec(
+                vc.getOutputName(),
+                order.getDimension(),
+                ColumnType.fromCapabilities(vc.capabilities(baseSignature, vc.getOutputName()))
+            )
+        );
+      }
+    }
+    return dimensions;
   }
 
   private static ColumnMappings getColumnMappings(DataSchema dataSchema)
@@ -469,11 +507,13 @@ public class MSQCompactionRunner implements CompactionRunner
                   .map(dim -> 
dim.getName().equals(ColumnHolder.TIME_COLUMN_NAME)
                               ? timeColumnMapping
                               : new ColumnMapping(dim.getName(), 
dim.getName()))
-                  .collect(Collectors.toList())
+                  .toList()
+    );
+    columnMappings.addAll(
+        Arrays.stream(dataSchema.getAggregators())
+              .map(agg -> new ColumnMapping(agg.getName(), agg.getName()))
+              .toList()
     );
-    columnMappings.addAll(Arrays.stream(dataSchema.getAggregators())
-                                .map(agg -> new ColumnMapping(agg.getName(), 
agg.getName()))
-                                .collect(Collectors.toList()));
     return new ColumnMappings(columnMappings);
   }
 
@@ -508,31 +548,44 @@ public class MSQCompactionRunner implements CompactionRunner
       Map<String, VirtualColumn> inputColToVirtualCol
   )
   {
-    RowSignature rowSignature = getRowSignature(dataSchema);
-    VirtualColumns virtualColumns = VirtualColumns.create(inputColToVirtualCol.values());
+    RowSignature baseRowSignature = getRowSignature(dataSchema);
+    final List<String> columns = new 
ArrayList<>(baseRowSignature.getColumnNames());
+    final List<OrderBy> orderBys;
+
+    RowSignature.Builder rowSignatureWithOrderByBuilder = RowSignature.builder().addAll(baseRowSignature);
+
+    // when clustering by a virtual column, we might need to add the virtual column to columns list and row signature
+    if (compactionTask.getTuningConfig() != null && compactionTask.getTuningConfig().getPartitionsSpec() != null) {
+      List<OrderByColumnSpec> orderByColumnSpecs = getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec());
+      orderBys = new ArrayList<>();
+      for (OrderByColumnSpec spec : orderByColumnSpecs) {
+        orderBys.add(new OrderBy(spec.getDimension(), Order.fromString(spec.getDirection().toString())));
+
+        final VirtualColumn vc = inputColToVirtualCol.get(spec.getDimension());
+        if (vc != null) {
+          columns.add(spec.getDimension());
+          final ColumnCapabilities capabilities = vc.capabilities(baseRowSignature, vc.getOutputName());
+          DruidException.conditionalDefensive(
+              capabilities != null,
+              "virtual column[%s] has null capabilities, cannot determine 
output type",
+              vc.getOutputName()
+          );
+          rowSignatureWithOrderByBuilder.add(spec.getDimension(), capabilities.toColumnType());
+        }
+      }
+    } else {
+      orderBys = null;
+    }
+
     Druids.ScanQueryBuilder scanQueryBuilder = new Druids.ScanQueryBuilder()
         .dataSource(getInputDataSource(dataSchema.getDataSource()))
-        .columns(rowSignature.getColumnNames())
-        .virtualColumns(virtualColumns)
-        .columnTypes(rowSignature.getColumnTypes())
         .intervals(segmentSpec)
         .filters(dataSchema.getTransformSpec().getFilter())
+        .virtualColumns(VirtualColumns.create(inputColToVirtualCol.values()))
+        .columns(columns)
+        .columnTypes(rowSignatureWithOrderByBuilder.build().getColumnTypes())
+        .orderBy(orderBys)
         .context(buildQueryContext(compactionTask.getContext(), dataSchema));
-
-    if (compactionTask.getTuningConfig() != null && 
compactionTask.getTuningConfig().getPartitionsSpec() != null) {
-      List<OrderByColumnSpec> orderByColumnSpecs = 
getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec());
-
-      scanQueryBuilder.orderBy(
-          orderByColumnSpecs
-              .stream()
-              .map(orderByColumnSpec ->
-                       new OrderBy(
-                           orderByColumnSpec.getDimension(),
-                           Order.fromString(orderByColumnSpec.getDirection().toString())
-                       ))
-              .collect(Collectors.toList())
-      );
-    }
     return scanQueryBuilder.build();
   }
 
@@ -647,7 +700,6 @@ public class MSQCompactionRunner implements CompactionRunner
   )
   {
     DimFilter dimFilter = dataSchema.getTransformSpec().getFilter();
-
     VirtualColumns virtualColumns = VirtualColumns.create(inputColToVirtualCol.values());
 
     // Convert MVDs converted to arrays back to MVDs, with the same name as the input column.
@@ -668,20 +720,25 @@ public class MSQCompactionRunner implements CompactionRunner
                             )
                             .collect(Collectors.toList());
 
+    final List<OrderByColumnSpec> orderBy;
+    if (compactionTask.getTuningConfig() != null && 
compactionTask.getTuningConfig().getPartitionsSpec() != null) {
+      orderBy = 
getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec());
+    } else {
+      orderBy = List.of();
+    }
+
     GroupByQuery.Builder builder = new GroupByQuery.Builder()
         .setDataSource(getInputDataSource(compactionTask.getDataSource()))
-        .setVirtualColumns(virtualColumns)
-        .setDimFilter(dimFilter)
+        .setQuerySegmentSpec(segmentSpec)
         .setGranularity(new AllGranularity())
-        .setDimensions(getAggregateDimensions(dataSchema, inputColToVirtualCol))
-        .setAggregatorSpecs(Arrays.asList(dataSchema.getAggregators()))
+        .setDimFilter(dimFilter)
+        .setVirtualColumns(virtualColumns)
+        .setDimensions(getAggregateDimensions(dataSchema, inputColToVirtualCol, orderBy))
+        .setAggregatorSpecs(dataSchema.getAggregators())
         .setPostAggregatorSpecs(postAggregators)
-        .setContext(buildQueryContext(compactionTask.getContext(), dataSchema))
+        .setOrderByColumns(orderBy)
+        .setContext(buildQueryContext(compactionTask.getContext(), dataSchema));
 
-    if (compactionTask.getTuningConfig() != null && 
compactionTask.getTuningConfig().getPartitionsSpec() != null) {
-      
getOrderBySpec(compactionTask.getTuningConfig().getPartitionsSpec()).forEach(builder::addOrderByColumn);
-    }
     return builder.build();
   }
 
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java
index 269a118f6d8..7898cd8ab6a 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/SegmentGenerationStageSpec.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Iterables;
 import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.KeyColumn;
 import org.apache.druid.msq.indexing.MSQSpec;
 import org.apache.druid.msq.indexing.MSQTuningConfig;
 import org.apache.druid.msq.indexing.processor.SegmentGeneratorStageProcessor;
@@ -36,6 +37,10 @@ import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.kernel.StageDefinitionBuilder;
 import org.apache.druid.msq.kernel.controller.WorkerInputs;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
@@ -43,7 +48,9 @@ import org.apache.druid.sql.calcite.planner.ColumnMappings;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 public class SegmentGenerationStageSpec implements TerminalStageSpec
 {
@@ -70,19 +77,31 @@ public class SegmentGenerationStageSpec implements TerminalStageSpec
     final ClusterBy queryClusterBy = queryDef.getFinalStageDefinition().getClusterBy();
 
     // Add a segment-generation stage.
-    final DataSchema dataSchema =
-        SegmentGenerationUtils.makeDataSchemaForIngestion(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper, query);
+    final DataSchema dataSchema = SegmentGenerationUtils.makeDataSchemaForIngestion(
+        querySpec,
+        querySignature,
+        queryClusterBy,
+        columnMappings,
+        jsonMapper,
+        query
+    );
 
-    return StageDefinition.builder(queryDef.getNextStageNumber())
-                       .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
-                       .maxWorkerCount(tuningConfig.getMaxNumWorkers())
-                       .processor(
-                           new SegmentGeneratorStageProcessor(
-                               dataSchema,
-                               columnMappings,
-                               tuningConfig
-                           )
+    final Map<String, VirtualColumn> clusterByVirtualColumnMappings = getClusterByVirtualColumnMappings(
+        query,
+        queryClusterBy
     );
+
+    return StageDefinition.builder(queryDef.getNextStageNumber())
+                          .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
+                          .maxWorkerCount(tuningConfig.getMaxNumWorkers())
+                          .processor(
+                              new SegmentGeneratorStageProcessor(
+                                  dataSchema,
+                                  columnMappings,
+                                  clusterByVirtualColumnMappings,
+                                  tuningConfig
+                              )
+                          );
   }
 
   public Int2ObjectMap<List<SegmentIdWithShardSpec>> getWorkerInfo(
@@ -113,4 +132,32 @@ public class SegmentGenerationStageSpec implements TerminalStageSpec
 
     return retVal;
   }
+
+  private static Map<String, VirtualColumn> getClusterByVirtualColumnMappings(Query<?> query, ClusterBy queryClusterBy)
+  {
+    final Map<String, VirtualColumn> clusterByVirtualColumns = new LinkedHashMap<>();
+    if (query instanceof GroupByQuery groupByQuery) {
+      final Map<String, VirtualColumn> outputToVc = new LinkedHashMap<>();
+      for (DimensionSpec spec : groupByQuery.getDimensions()) {
+        final VirtualColumn vc = groupByQuery.getVirtualColumns().getVirtualColumn(spec.getDimension());
+        if (vc != null) {
+          outputToVc.put(spec.getOutputName(), vc);
+        }
+      }
+      for (KeyColumn column : queryClusterBy.getColumns()) {
+        final VirtualColumn vc = outputToVc.get(column.columnName());
+        if (vc != null) {
+          clusterByVirtualColumns.put(column.columnName(), vc);
+        }
+      }
+    } else if (query instanceof ScanQuery scanQuery) {
+      for (KeyColumn column : queryClusterBy.getColumns()) {
+        final VirtualColumn vc = scanQuery.getVirtualColumns().getVirtualColumn(column.columnName());
+        if (vc != null) {
+          clusterByVirtualColumns.put(column.columnName(), vc);
+        }
+      }
+    }
+    return clusterByVirtualColumns;
+  }
 }
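
To make getClusterByVirtualColumnMappings() concrete: for a scan query the cluster-by key names the virtual column directly, while for a group-by query the extra hop through DimensionSpec output names is needed because the cluster key refers to the dimension's output name rather than the virtual column's own name. A hypothetical example of the resolved mapping for a scan query whose cluster key is an expression (the name "v0" and the lower() expression are illustrative, not taken from this patch):

    // Illustrative: the map entry produced when cluster key "v0" is computed
    // by an expression virtual column rather than stored as a physical column.
    VirtualColumn v0 = new ExpressionVirtualColumn(
        "v0",
        "lower(\"dim1\")",
        ColumnType.STRING,
        TestExprMacroTable.INSTANCE
    );
    Map<String, VirtualColumn> mappings = Map.of("v0", v0);
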
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessor.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessor.java
index fe9418ee3fb..c80c03268fc 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessor.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessor.java
@@ -20,6 +20,7 @@
 package org.apache.druid.msq.indexing.processor;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -51,6 +52,7 @@ import org.apache.druid.msq.kernel.StagePartition;
 import org.apache.druid.msq.querykit.QueryKitUtils;
 import org.apache.druid.msq.querykit.ReadableInput;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.data.CompressionFactory;
 import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.incremental.AppendableIndexSpec;
@@ -72,6 +74,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
@@ -80,17 +83,20 @@ public class SegmentGeneratorStageProcessor implements StageProcessor<Set<DataSe
 {
   private final DataSchema dataSchema;
   private final ColumnMappings columnMappings;
+  private final Map<String, VirtualColumn> clusterByVirtualColumnMappings;
   private final MSQTuningConfig tuningConfig;
 
   @JsonCreator
   public SegmentGeneratorStageProcessor(
       @JsonProperty("dataSchema") final DataSchema dataSchema,
       @JsonProperty("columnMappings") final ColumnMappings columnMappings,
+      @JsonProperty("clusterByVirtualColumnMappings") @Nullable final 
Map<String, VirtualColumn> clusterByVirtualColumnMappings,
       @JsonProperty("tuningConfig") final MSQTuningConfig tuningConfig
   )
   {
     this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
     this.columnMappings = Preconditions.checkNotNull(columnMappings, "columnMappings");
+    this.clusterByVirtualColumnMappings = clusterByVirtualColumnMappings == null ? Map.of() : clusterByVirtualColumnMappings;
     this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
   }
 
@@ -106,6 +112,13 @@ public class SegmentGeneratorStageProcessor implements StageProcessor<Set<DataSe
     return columnMappings;
   }
 
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_EMPTY)
+  public Map<String, VirtualColumn> getClusterByVirtualColumnMappings()
+  {
+    return clusterByVirtualColumnMappings;
+  }
+
   @JsonProperty
   public MSQTuningConfig getTuningConfig()
   {
@@ -255,13 +268,14 @@ public class SegmentGeneratorStageProcessor implements StageProcessor<Set<DataSe
     SegmentGeneratorStageProcessor that = (SegmentGeneratorStageProcessor) o;
     return Objects.equals(dataSchema, that.dataSchema)
            && Objects.equals(columnMappings, that.columnMappings)
+           && Objects.equals(clusterByVirtualColumnMappings, that.clusterByVirtualColumnMappings)
            && Objects.equals(tuningConfig, that.tuningConfig);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(dataSchema, columnMappings, tuningConfig);
+    return Objects.hash(dataSchema, columnMappings, clusterByVirtualColumnMappings, tuningConfig);
   }
 
   @Override
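
One compatibility detail worth calling out in the processor above: the new clusterByVirtualColumnMappings property is serialized with @JsonInclude(NON_EMPTY) and the @JsonCreator substitutes an empty map for null, so specs written by controllers that predate this change (and specs with no clustering expressions) should round-trip unchanged; the new SegmentGeneratorStageProcessorTest further down exercises exactly this serde.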
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 0032a6051ac..c8b9546f017 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -211,9 +211,12 @@ public class MSQTaskQueryMaker implements QueryMaker
     return querySpec;
   }
 
-  private static MSQDestination buildMSQDestination(final IngestDestination targetDataSource,
-      final ColumnMappings columnMappings, final PlannerContext plannerContext,
-      final MSQTerminalStageSpecFactory terminalStageSpecFactory)
+  private static MSQDestination buildMSQDestination(
+      final IngestDestination targetDataSource,
+      final ColumnMappings columnMappings,
+      final PlannerContext plannerContext,
+      final MSQTerminalStageSpecFactory terminalStageSpecFactory
+  )
   {
     final QueryContext sqlQueryContext = plannerContext.queryContext();
     final Object segmentGranularity = getSegmentGranularity(plannerContext);
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
index b18ee8dde53..07a4ad9769c 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerTest.java
@@ -46,6 +46,7 @@ import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.filter.EqualityFilter;
 import org.apache.druid.query.filter.FilterSegmentPruner;
+import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
@@ -105,6 +106,7 @@ public class DartTableInputSpecSlicerTest extends InitializedNullHandlingTest
                                                          .interval(Intervals.of("2000/2001"))
                                                          .shardSpec(new DimensionRangeShardSpec(
                                                              ImmutableList.of(PARTITION_DIM),
+                                                             VirtualColumns.EMPTY,
                                                              null,
                                                              new StringTuple(new String[]{"foo"}),
                                                              0,
@@ -119,6 +121,7 @@ public class DartTableInputSpecSlicerTest extends InitializedNullHandlingTest
                                                          .interval(Intervals.of("2000/2001"))
                                                          .shardSpec(new DimensionRangeShardSpec(
                                                              ImmutableList.of(PARTITION_DIM),
+                                                             VirtualColumns.EMPTY,
                                                              new StringTuple(new String[]{"foo"}),
                                                              null,
                                                              1,
@@ -449,6 +452,7 @@ public class DartTableInputSpecSlicerTest extends InitializedNullHandlingTest
     );
     final FilterSegmentPruner pruner = new FilterSegmentPruner(
         new EqualityFilter(PARTITION_DIM, ColumnType.STRING, "abc", null),
+        null,
         null
     );
 
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 9d4ddde70ca..fdf58b3e874 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -56,13 +56,16 @@ import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.filter.NotDimFilter;
 import org.apache.druid.query.filter.NullFilter;
 import org.apache.druid.query.filter.RangeFilter;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.transform.CompactionTransformSpec;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.timeline.CompactionState;
 import org.apache.druid.timeline.DataSegment;
@@ -328,7 +331,7 @@ public class MSQReplaceTest extends MSQTestBase
                              SegmentId.of("foo", Intervals.ETERNITY, "test", 0)
                          )
                      )
-                     .setExpectedShardSpec(NumberedShardSpec.class)
+                     .setExpectedShardSpec(DimensionRangeShardSpec.class)
                      .setExpectedResultRows(
                          ImmutableList.of(
                              new Object[]{946684800000L, "", 1.0f},
@@ -347,7 +350,7 @@ public class MSQReplaceTest extends MSQTestBase
                      .setExpectedLastCompactionState(
                          expectedCompactionState(
                              context,
-                             Collections.emptyList(),
+                             List.of("v0"),
                              DimensionsSpec.builder()
                                            .setDimensions(
                                                ImmutableList.of(
@@ -357,13 +360,24 @@ public class MSQReplaceTest extends MSQTestBase
                                           )
                                           .setDimensionExclusions(Collections.singletonList("__time"))
                                            .build(),
+                             new CompactionTransformSpec(
+                                 null,
+                                 VirtualColumns.create(
+                                     new ExpressionVirtualColumn(
+                                         "v0",
+                                         "lower(\"dim1\")",
+                                         ColumnType.STRING,
+                                         TestExprMacroTable.INSTANCE
+                                     )
+                                 )
+                             ),
                              GranularityType.ALL,
                              Intervals.ETERNITY
                          )
                      )
                      .verifyResults();
   }
-
+  
   @MethodSource("data")
   @ParameterizedTest(name = "{index}:with context {0}")
   public void testReplaceOnFooWithAllClusteredByExpression(String contextName, Map<String, Object> context)
@@ -2813,6 +2827,25 @@ public class MSQReplaceTest extends MSQTestBase
       GranularityType segmentGranularity,
       Interval interval
   )
+  {
+    return expectedCompactionState(
+        context,
+        partitionDimensions,
+        dimensions,
+        null,
+        segmentGranularity,
+        interval
+    );
+  }
+
+  private CompactionState expectedCompactionState(
+      Map<String, Object> context,
+      List<String> partitionDimensions,
+      List<DimensionSchema> dimensions,
+      CompactionTransformSpec transformSpec,
+      GranularityType segmentGranularity,
+      Interval interval
+  )
   {
     return expectedCompactionState(
         context,
@@ -2821,6 +2854,7 @@ public class MSQReplaceTest extends MSQTestBase
                       .setDimensions(dimensions)
                       .setDimensionExclusions(Collections.singletonList("__time"))
                       .build(),
+        transformSpec,
         segmentGranularity,
         interval
     );
@@ -2833,6 +2867,25 @@ public class MSQReplaceTest extends MSQTestBase
       GranularityType segmentGranularity,
       Interval interval
   )
+  {
+    return expectedCompactionState(
+        context,
+        partitionDimensions,
+        dimensionsSpec,
+        null,
+        segmentGranularity,
+        interval
+    );
+  }
+
+  private CompactionState expectedCompactionState(
+      Map<String, Object> context,
+      List<String> partitionDimensions,
+      DimensionsSpec dimensionsSpec,
+      CompactionTransformSpec transformSpec,
+      GranularityType segmentGranularity,
+      Interval interval
+  )
   {
     if (!context.containsKey(Tasks.STORE_COMPACTION_STATE_KEY)
         || !((Boolean) context.get(Tasks.STORE_COMPACTION_STATE_KEY))) {
@@ -2866,7 +2919,7 @@ public class MSQReplaceTest extends MSQTestBase
         partitionsSpec,
         dimensionsSpec,
         metricsSpec,
-        null,
+        transformSpec,
         indexSpec,
         granularitySpec,
         null
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
index fa124f44da3..1375ec64054 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQCompactionRunnerTest.java
@@ -232,7 +232,7 @@ public class MSQCompactionRunnerTest
     );
     Assert.assertFalse(validationResult.isValid());
     Assert.assertEquals(
-        "MSQ: Non-string partition dimension[long_dim] of type[long] not 
supported with 'range' partition spec",
+        "MSQ: Non-string partition dimension[long_dim] of type[LONG] not 
supported with 'range' partition spec",
         validationResult.getReason()
     );
   }
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessorTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessorTest.java
new file mode 100644
index 00000000000..7a06c419dd6
--- /dev/null
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorStageProcessorTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.processor;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.msq.exec.StageProcessor;
+import org.apache.druid.msq.guice.MSQIndexingModule;
+import org.apache.druid.msq.indexing.MSQTuningConfig;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
+import org.apache.druid.sql.calcite.planner.ColumnMapping;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+class SegmentGeneratorStageProcessorTest
+{
+  @Test
+  public void testSerde() throws JsonProcessingException
+  {
+    ObjectMapper mapper = TestHelper.makeJsonMapper();
+    mapper.registerModules(new MSQIndexingModule().getJacksonModules());
+    SegmentGeneratorStageProcessor processor = new SegmentGeneratorStageProcessor(
+        DataSchema.builder()
+                  .withDataSource("test")
+                  .withTimestamp(new TimestampSpec("timestamp", "auto", null))
+                  .withGranularity(new UniformGranularitySpec(Granularities.HOUR, Granularities.MINUTE, null))
+                  .withDimensions(new StringDimensionSchema("a"), new StringDimensionSchema("b"))
+                  .withAggregators(new CountAggregatorFactory("cnt"))
+                  .build(),
+        new ColumnMappings(
+            List.of(
+                new ColumnMapping("d0", "__time"),
+                new ColumnMapping("d1", "a"),
+                new ColumnMapping("d2", "b"),
+                new ColumnMapping("a0", "cnt")
+            )
+        ),
+        Map.of(
+            "v0",
+            new ExpressionVirtualColumn("v0", "concat(\"a\",'foo')", ColumnType.STRING, TestExprMacroTable.INSTANCE)
+        ),
+        MSQTuningConfig.defaultConfig()
+    );
+
+    Assertions.assertEquals(processor, mapper.readValue(mapper.writeValueAsString(processor), StageProcessor.class));
+  }
+
+  @Test
+  public void testEqualsAndHashCode()
+  {
+    EqualsVerifier.forClass(SegmentGeneratorStageProcessor.class).usingGetClass().verify();
+  }
+}
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java
index 55ba95a9d98..61fadac716a 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/IndexerTableInputSpecSlicerTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.msq.input.NilInputSlice;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.filter.EqualityFilter;
 import org.apache.druid.query.filter.FilterSegmentPruner;
+import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.DataSegment;
@@ -60,6 +61,7 @@ public class IndexerTableInputSpecSlicerTest extends InitializedNullHandlingTest
       Collections.emptyList(),
       new DimensionRangeShardSpec(
           ImmutableList.of("dim"),
+          VirtualColumns.EMPTY,
           null,
           new StringTuple(new String[]{"foo"}),
           0,
@@ -79,6 +81,7 @@ public class IndexerTableInputSpecSlicerTest extends InitializedNullHandlingTest
       Collections.emptyList(),
       new DimensionRangeShardSpec(
           ImmutableList.of("dim"),
+          VirtualColumns.EMPTY,
           new StringTuple(new String[]{"foo"}),
           null,
           1,
@@ -276,6 +279,7 @@ public class IndexerTableInputSpecSlicerTest extends InitializedNullHandlingTest
     );
     final FilterSegmentPruner pruner = new FilterSegmentPruner(
         new EqualityFilter("dim", ColumnType.STRING, "bar", null),
+        null,
         null
     );
 
@@ -309,7 +313,8 @@ public class IndexerTableInputSpecSlicerTest extends InitializedNullHandlingTest
     );
     final FilterSegmentPruner segmentPruner = new FilterSegmentPruner(
         new EqualityFilter("dim", ColumnType.STRING, "bar", null),
-        Collections.emptySet()
+        Collections.emptySet(),
+        null
     );
 
     Assert.assertEquals(
@@ -350,6 +355,7 @@ public class IndexerTableInputSpecSlicerTest extends InitializedNullHandlingTest
     );
     final FilterSegmentPruner segmentPruner = new FilterSegmentPruner(
         new EqualityFilter("dim", ColumnType.STRING, "bar", null),
+        null,
         null
     );
 
diff --git a/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java b/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java
index cb0b88ea079..aef51e2c05d 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/FilterSegmentPruner.java
@@ -21,6 +21,8 @@ package org.apache.druid.query.filter;
 
 import com.google.common.collect.RangeSet;
 import org.apache.druid.error.InvalidInput;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.ShardSpec;
 
@@ -44,15 +46,18 @@ public class FilterSegmentPruner implements SegmentPruner
 {
   private final DimFilter filter;
   private final Set<String> filterFields;
+  private final VirtualColumns virtualColumns;
   private final Map<String, Optional<RangeSet<String>>> rangeCache;
 
   public FilterSegmentPruner(
       DimFilter filter,
-      @Nullable Set<String> filterFields
+      @Nullable Set<String> filterFields,
+      @Nullable VirtualColumns virtualColumns
   )
   {
     this.filter = InvalidInput.notNull(filter, "filter");
     this.filterFields = filterFields == null ? filter.getRequiredColumns() : filterFields;
+    this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns;
     this.rangeCache = new HashMap<>();
   }
 
@@ -88,10 +93,25 @@ public class FilterSegmentPruner implements SegmentPruner
         Map<String, RangeSet<String>> filterDomain = new HashMap<>();
         List<String> dimensions = shard.getDomainDimensions();
         for (String dimension : dimensions) {
-          if (filterFields == null || filterFields.contains(dimension)) {
+          final VirtualColumn shardVirtualColumn = shard.getDomainVirtualColumns().getVirtualColumn(dimension);
+          if (shardVirtualColumn != null) {
+            final VirtualColumn queryEquivalent = virtualColumns.findEquivalent(shardVirtualColumn);
+            if (queryEquivalent != null) {
+              if (filterFields == null || filterFields.contains(queryEquivalent.getOutputName())) {
+                Optional<RangeSet<String>> optFilterRangeSet = rangeCache
+                    .computeIfAbsent(
+                        queryEquivalent.getOutputName(),
+                        d -> Optional.ofNullable(filter.getDimensionRangeSet(d))
+                    );
+                optFilterRangeSet.ifPresent(stringRangeSet -> filterDomain.put(
+                    shardVirtualColumn.getOutputName(),
+                    stringRangeSet
+                ));
+              }
+            }
+          } else if (filterFields == null || filterFields.contains(dimension)) {
             Optional<RangeSet<String>> optFilterRangeSet =
                 rangeCache.computeIfAbsent(dimension, d -> Optional.ofNullable(filter.getDimensionRangeSet(d)));
-
             optFilterRangeSet.ifPresent(stringRangeSet -> filterDomain.put(dimension, stringRangeSet));
           }
         }
@@ -114,13 +134,15 @@ public class FilterSegmentPruner implements SegmentPruner
       return false;
     }
     FilterSegmentPruner that = (FilterSegmentPruner) o;
-    return Objects.equals(filter, that.filter) && Objects.equals(filterFields, that.filterFields);
+    return Objects.equals(filter, that.filter) &&
+           Objects.equals(filterFields, that.filterFields) &&
+           Objects.equals(virtualColumns, that.virtualColumns);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(filter, filterFields);
+    return Objects.hash(filter, filterFields, virtualColumns);
   }
 
   @Override
@@ -129,7 +151,7 @@ public class FilterSegmentPruner implements SegmentPruner
     return "FilterSegmentPruner{" +
            "filter=" + filter +
            ", filterFields=" + filterFields +
-           ", rangeCache=" + rangeCache +
+           ", virtualColumns=" + virtualColumns +
            '}';
   }
 }
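
The net effect of the pruner change above: a segment whose range shard spec was built from an expression can now be pruned by a filter on a query virtual column with an equivalent expression, even when the two sides use different output names. A minimal sketch mirroring the new FilterSegmentPrunerTest case later in this patch (the concat() expression and the names "vdim1"/"v0" are the test's illustrative values; imports elided):

    // Shard side: range domain computed by an expression named "vdim1".
    VirtualColumns shardVcs = VirtualColumns.create(
        new ExpressionVirtualColumn("vdim1", "concat(dim1, 'foo')", ColumnType.STRING, TestExprMacroTable.INSTANCE)
    );
    // Query side: the same expression registered under a different name, "v0".
    VirtualColumns queryVcs = VirtualColumns.create(
        new ExpressionVirtualColumn("v0", "concat(dim1, 'foo')", ColumnType.STRING, TestExprMacroTable.INSTANCE)
    );
    DimFilter filter = new RangeFilter("v0", ColumnType.STRING, null, "aaa", null, null, null);
    // findEquivalent() bridges "v0" to "vdim1", so shards whose "vdim1" range
    // falls outside the filter's domain can be pruned.
    FilterSegmentPruner pruner = new FilterSegmentPruner(filter, null, queryVcs);
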
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
index 9faf0226bcb..d31f130f665 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQuery.java
@@ -1088,6 +1088,14 @@ public class GroupByQuery extends BaseQuery<ResultRow>
       return this;
     }
 
+    public Builder setOrderByColumns(List<OrderByColumnSpec> columnSpec)
+    {
+      ensureExplicitLimitSpecNotSet();
+      this.orderByColumnSpecs = new ArrayList<>(columnSpec);
+      this.postProcessingFn = null;
+      return this;
+    }
+
     public Builder setLimitSpec(LimitSpec limitSpec)
     {
       Preconditions.checkNotNull(limitSpec);
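
Consistent with the existing addOrderByColumn() path, the new setOrderByColumns() above goes through ensureExplicitLimitSpecNotSet() and clears the cached postProcessingFn, so the supplied order-by columns appear to be folded into the builder's implicit limit spec at build() time and cannot be combined with an explicitly configured LimitSpec.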
diff --git a/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java b/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java
index d5baa72dd08..64a974771a2 100644
--- a/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java
+++ b/processing/src/main/java/org/apache/druid/query/planning/ExecutionVertex.java
@@ -232,7 +232,8 @@ public class ExecutionVertex
 
     return new FilterSegmentPruner(
         topQuery.getFilter(),
-        baseFields
+        baseFields,
+        topQuery.getVirtualColumns()
     );
   }
 
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java
index 9d5abd6f76e..b1e0d01c747 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java
@@ -24,6 +24,7 @@ import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.guava.Comparators;
+import org.apache.druid.segment.VirtualColumns;
 
 import javax.annotation.Nullable;
 import java.util.Arrays;
@@ -33,6 +34,7 @@ import java.util.List;
 public abstract class BaseDimensionRangeShardSpec implements ShardSpec
 {
   protected final List<String> dimensions;
+  protected final VirtualColumns virtualColumns;
   @Nullable
   protected final StringTuple start;
   @Nullable
@@ -40,11 +42,13 @@ public abstract class BaseDimensionRangeShardSpec implements ShardSpec
 
   protected BaseDimensionRangeShardSpec(
       List<String> dimensions,
+      @Nullable VirtualColumns virtualColumns,
       @Nullable StringTuple start,
       @Nullable StringTuple end
   )
   {
     this.dimensions = dimensions;
+    this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : virtualColumns;
     this.start = start;
     this.end = end;
   }
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpec.java
index 308e36474a6..c15f58370a7 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpec.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpec.java
@@ -22,6 +22,7 @@ package org.apache.druid.timeline.partition;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.segment.VirtualColumns;
 
 import javax.annotation.Nullable;
 import java.util.List;
@@ -109,6 +110,7 @@ public class BuildingDimensionRangeShardSpec implements BuildingShardSpec<Dimens
         numCorePartitions
     ) : new DimensionRangeShardSpec(
         dimensions,
+        VirtualColumns.EMPTY,
         start,
         end,
         partitionId,
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java
index c37ea140057..fc39f02b02c 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpec.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.segment.VirtualColumns;
 
 import javax.annotation.Nullable;
 import java.util.List;
@@ -51,7 +52,7 @@ public class DimensionRangeBucketShardSpec extends BaseDimensionRangeShardSpec
       @JsonProperty("end") @Nullable StringTuple end
   )
   {
-    super(dimensions, start, end);
+    super(dimensions, VirtualColumns.EMPTY, start, end);
     // Verify that the tuple sizes and number of dimensions are the same
     Preconditions.checkArgument(
         start == null || start.size() == dimensions.size(),
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java
index a0a850dae08..fea3d82cb5b 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/DimensionRangeShardSpec.java
@@ -20,12 +20,14 @@
 package org.apache.druid.timeline.partition;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeSet;
 import com.google.common.collect.TreeRangeSet;
 import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.segment.VirtualColumns;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
@@ -53,13 +55,14 @@ public class DimensionRangeShardSpec extends BaseDimensionRangeShardSpec
   @JsonCreator
   public DimensionRangeShardSpec(
       @JsonProperty("dimensions") List<String> dimensions,
+      @JsonProperty("virtualColumns") @Nullable VirtualColumns virtualColumns,
       @JsonProperty("start") @Nullable StringTuple start,
       @JsonProperty("end") @Nullable StringTuple end,
       @JsonProperty("partitionNum") int partitionNum,
       @JsonProperty("numCorePartitions") @Nullable Integer numCorePartitions 
// nullable for backward compatibility
   )
   {
-    super(dimensions, start, end);
+    super(dimensions, virtualColumns, start, end);
     Preconditions.checkArgument(partitionNum >= 0, "partitionNum >= 0");
     Preconditions.checkArgument(
         dimensions != null && !dimensions.isEmpty(),
@@ -76,6 +79,13 @@ public class DimensionRangeShardSpec extends BaseDimensionRangeShardSpec
     return dimensions;
   }
 
+  @JsonProperty("virtualColumns")
+  @JsonInclude(JsonInclude.Include.NON_EMPTY)
+  public VirtualColumns getVirtualColumns()
+  {
+    return virtualColumns;
+  }
+
   @Nullable
   @JsonProperty("start")
   public StringTuple getStartTuple()
@@ -107,13 +117,13 @@ public class DimensionRangeShardSpec extends BaseDimensionRangeShardSpec
   @Override
   public ShardSpec withPartitionNum(int partitionNum)
   {
-    return new DimensionRangeShardSpec(dimensions, start, end, partitionNum, numCorePartitions);
+    return new DimensionRangeShardSpec(dimensions, virtualColumns, start, end, partitionNum, numCorePartitions);
   }
 
   @Override
   public ShardSpec withCorePartitions(int partitions)
   {
-    return new DimensionRangeShardSpec(dimensions, start, end, partitionNum, partitions);
+    return new DimensionRangeShardSpec(dimensions, virtualColumns, start, end, partitionNum, partitions);
   }
 
   public boolean isNumCorePartitionsUnknown()
@@ -127,6 +137,12 @@ public class DimensionRangeShardSpec extends BaseDimensionRangeShardSpec
     return Collections.unmodifiableList(dimensions);
   }
 
+  @Override
+  public VirtualColumns getDomainVirtualColumns()
+  {
+    return virtualColumns;
+  }
+
   /**
    * Set[:i] is the cartesian product of Set[0],...,Set[i - 1]
    * EffectiveDomain[:i] is defined as QueryDomain[:i] INTERSECTION SegmentRange[:i]
@@ -289,6 +305,7 @@ public class DimensionRangeShardSpec extends BaseDimensionRangeShardSpec
     return partitionNum == shardSpec.partitionNum &&
            numCorePartitions == shardSpec.numCorePartitions &&
            Objects.equals(dimensions, shardSpec.dimensions) &&
+           Objects.equals(virtualColumns, shardSpec.virtualColumns) &&
            Objects.equals(start, shardSpec.start) &&
            Objects.equals(end, shardSpec.end);
   }
@@ -296,7 +313,7 @@ public class DimensionRangeShardSpec extends BaseDimensionRangeShardSpec
   @Override
   public int hashCode()
   {
-    return Objects.hash(dimensions, start, end, partitionNum, numCorePartitions);
+    return Objects.hash(dimensions, virtualColumns, start, end, partitionNum, numCorePartitions);
   }
 
   @Override
@@ -304,6 +321,7 @@ public class DimensionRangeShardSpec extends BaseDimensionRangeShardSpec
   {
     return "DimensionRangeShardSpec{" +
            "dimensions='" + dimensions + '\'' +
+           ", virtualColumns=" + virtualColumns +
            ", start='" + start + '\'' +
            ", end='" + end + '\'' +
            ", partitionNum=" + partitionNum +
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
index b91c96387a9..da1c046465b 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.google.common.collect.RangeSet;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
 
 import java.util.List;
 import java.util.Map;
@@ -136,13 +138,28 @@ public interface ShardSpec
   ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs);
 
   /**
-   * Get dimensions who have possible range for the rows this shard contains.
+   * Get dimensions who have possible range for the rows this shard contains. These columns might be physical columns
+   * stored in the shard, or computed expressions, in which case the manner in which they were computed is available in
+   * {@link #getDomainVirtualColumns()}.
    *
-   * @return list of dimensions who has its possible range. Dimensions with unknown possible range are not listed
+   * @return list of dimensions who has its possible range. Dimensions with unknown possible range are not listed.
    */
   @JsonIgnore
   List<String> getDomainDimensions();
 
+  /**
+   * If any of the columns in {@link #getDomainDimensions()} was computed with an expression and was not stored, the
+   * {@link org.apache.druid.segment.VirtualColumn} which computes it is stored here. This allows matching ranges even
+   * when the value is not stored in the shard so long as {@link VirtualColumns#findEquivalent(VirtualColumn)} exists.
+   *
+   * @return {@link VirtualColumns} associated with columns listed in {@link #getDomainDimensions()}.
+   */
+  @JsonIgnore
+  default VirtualColumns getDomainVirtualColumns()
+  {
+    return VirtualColumns.EMPTY;
+  }
+
   /**
    * if given domain ranges are not possible in this shard, return false; otherwise return true;
    *
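
Pulling the new ShardSpec surface together: a range shard spec can now carry the expressions that produced its domain values, and getDomainVirtualColumns() exposes them alongside getDomainDimensions() for pruning. A small illustrative construction (all names and range values made up; imports elided):

    // Illustrative: a range shard whose single domain dimension "v0" is an
    // expression rather than a stored column.
    ShardSpec spec = new DimensionRangeShardSpec(
        ImmutableList.of("v0"),
        VirtualColumns.create(
            new ExpressionVirtualColumn("v0", "lower(\"dim1\")", ColumnType.STRING, TestExprMacroTable.INSTANCE)
        ),
        null,                        // start: open-ended
        StringTuple.create("m"),     // end (exclusive)
        0,                           // partitionNum
        1                            // numCorePartitions
    );
    spec.getDomainDimensions();      // ["v0"]
    spec.getDomainVirtualColumns();  // carries lower("dim1") for equivalence checks
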
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java
index e47b5aafaea..ae9ea1fb416 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionRangeBucketShardSpec.java
@@ -22,6 +22,7 @@ package org.apache.druid.timeline.partition;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.segment.VirtualColumns;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
@@ -52,6 +53,7 @@ public class SingleDimensionRangeBucketShardSpec extends BaseDimensionRangeShard
   {
     super(
         dimension == null ? Collections.emptyList() : Collections.singletonList(dimension),
+        VirtualColumns.EMPTY,
         start == null ? null : StringTuple.create(start),
         end == null ? null : StringTuple.create(end)
     );
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
index 59ef1be3d6a..aa346adabbf 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/SingleDimensionShardSpec.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonValue;
 import com.google.common.collect.Range;
 import com.google.common.collect.RangeSet;
 import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.segment.VirtualColumns;
 
 import javax.annotation.Nullable;
 import java.util.Collections;
@@ -62,6 +63,7 @@ public class SingleDimensionShardSpec extends DimensionRangeShardSpec
   {
     super(
         dimension == null ? Collections.emptyList() : Collections.singletonList(dimension),
+        VirtualColumns.EMPTY,
         start == null ? null : StringTuple.create(start),
         end == null ? null : StringTuple.create(end),
         partitionNum,
diff --git a/processing/src/test/java/org/apache/druid/query/filter/FilterSegmentPrunerTest.java b/processing/src/test/java/org/apache/druid/query/filter/FilterSegmentPrunerTest.java
index 131337ad468..a1a1dba0c28 100644
--- a/processing/src/test/java/org/apache/druid/query/filter/FilterSegmentPrunerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/filter/FilterSegmentPrunerTest.java
@@ -24,7 +24,9 @@ import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
@@ -46,7 +48,7 @@ class FilterSegmentPrunerTest
   {
     Throwable t = Assertions.assertThrows(
         DruidException.class,
-        () -> new FilterSegmentPruner(null, null)
+        () -> new FilterSegmentPruner(null, null, null)
     );
     Assertions.assertEquals("filter must not be null", t.getMessage());
   }
@@ -70,13 +72,73 @@ class FilterSegmentPrunerTest
 
     List<DataSegment> segs = List.of(seg1, seg2, seg3, seg4, seg5, seg6, seg7);
 
-    FilterSegmentPruner prunerRange = new FilterSegmentPruner(range_a, null);
-    FilterSegmentPruner prunerEmptyFields = new FilterSegmentPruner(range_a, Collections.emptySet());
-    FilterSegmentPruner prunerExpression = new FilterSegmentPruner(expression_b, null);
+    FilterSegmentPruner prunerRange = new FilterSegmentPruner(range_a, null, null);
+    FilterSegmentPruner prunerEmptyFields = new FilterSegmentPruner(range_a, Collections.emptySet(), null);
+    FilterSegmentPruner prunerExpression = new FilterSegmentPruner(expression_b, null, null);
 
+    // prune twice to exercise cache
     Assertions.assertEquals(Set.of(seg1, seg4, seg5, seg6, seg7), prunerRange.prune(segs, Function.identity()));
+    Assertions.assertEquals(Set.of(seg1, seg4, seg5, seg6, seg7), prunerRange.prune(segs, Function.identity()));
+    Assertions.assertEquals(Set.copyOf(segs), prunerExpression.prune(segs, Function.identity()));
     Assertions.assertEquals(Set.copyOf(segs), prunerExpression.prune(segs, Function.identity()));
     Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, Function.identity()));
+    Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, Function.identity()));
+  }
+
+  @Test
+  void testPruneVirtualColumn()
+  {
+    VirtualColumns shardVirtualColumns = VirtualColumns.create(
+        new ExpressionVirtualColumn("vdim1", "concat(dim1, 'foo')", 
ColumnType.STRING, TestExprMacroTable.INSTANCE)
+    );
+    VirtualColumns shardVirtualColumnsDifferentName = VirtualColumns.create(
+        new ExpressionVirtualColumn("vdifferentname", "concat(dim1, 'foo')", 
ColumnType.STRING, TestExprMacroTable.INSTANCE)
+    );
+
+    String interval1 = "2026-02-18T00:00:00Z/2026-02-19T00:00:00Z";
+
+    DataSegment seg1 = makeDataSegment(
+        interval1,
+        makeRange(List.of("vdim1"), shardVirtualColumns, 0, null, 
StringTuple.create("abcfoo"))
+    );
+    DataSegment seg2 = makeDataSegment(
+        interval1,
+        makeRange(List.of("vdim1"), shardVirtualColumns, 1, 
StringTuple.create("abcfoo"), StringTuple.create("lmnfoo"))
+    );
+    // same virtual column with a different name in this segment
+    DataSegment seg3 = makeDataSegment(
+        interval1,
+        makeRange(List.of("vdifferentname"), shardVirtualColumnsDifferentName, 
2, StringTuple.create("lmnfoo"), null)
+    );
+
+    List<DataSegment> segs = List.of(seg1, seg2, seg3);
+
+    // same expression, same name
+    VirtualColumns queryVirtualColumns = VirtualColumns.create(
+        new ExpressionVirtualColumn("vdim1", "concat(dim1, 'foo')", 
ColumnType.STRING, TestExprMacroTable.INSTANCE)
+    );
+    DimFilter range_a = new RangeFilter("vdim1", ColumnType.STRING, null, 
"aaa", null, null, null);
+    FilterSegmentPruner prunerRange = new FilterSegmentPruner(range_a, null, 
queryVirtualColumns);
+    FilterSegmentPruner prunerEmptyFields = new FilterSegmentPruner(range_a, 
Collections.emptySet(), queryVirtualColumns);
+    // prune twice to exercise cache
+    Assertions.assertEquals(Set.of(seg1), prunerRange.prune(segs, 
Function.identity()));
+    Assertions.assertEquals(Set.of(seg1), prunerRange.prune(segs, 
Function.identity()));
+    Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, 
Function.identity()));
+    Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, 
Function.identity()));
+
+    // same expression, different name
+    queryVirtualColumns = VirtualColumns.create(
+        new ExpressionVirtualColumn("v0", "concat(dim1, 'foo')", 
ColumnType.STRING, TestExprMacroTable.INSTANCE)
+    );
+    range_a = new RangeFilter("v0", ColumnType.STRING, null, "aaa", null, 
null, null);
+    prunerRange = new FilterSegmentPruner(range_a, null, queryVirtualColumns);
+    prunerEmptyFields = new FilterSegmentPruner(range_a, 
Collections.emptySet(), queryVirtualColumns);
+
+    // prune twice to exercise cache
+    Assertions.assertEquals(Set.of(seg1), prunerRange.prune(segs, 
Function.identity()));
+    Assertions.assertEquals(Set.of(seg1), prunerRange.prune(segs, 
Function.identity()));
+    Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, 
Function.identity()));
+    Assertions.assertEquals(Set.copyOf(segs), prunerEmptyFields.prune(segs, 
Function.identity()));
   }
 
   @Test
@@ -106,9 +168,27 @@ class FilterSegmentPrunerTest
       @Nullable StringTuple start,
       @Nullable StringTuple end
   )
+  {
+    return makeRange(
+        columns,
+        null,
+        partitionNumber,
+        start,
+        end
+    );
+  }
+
+  private ShardSpec makeRange(
+      List<String> columns,
+      VirtualColumns virtualColumns,
+      int partitionNumber,
+      @Nullable StringTuple start,
+      @Nullable StringTuple end
+  )
   {
     return new DimensionRangeShardSpec(
         columns,
+        virtualColumns,
         start,
         end,
         partitionNumber,
diff --git a/processing/src/test/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpecTest.java b/processing/src/test/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpecTest.java
index b8099e227a4..6f7af3fecb8 100644
--- a/processing/src/test/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/timeline/partition/BuildingDimensionRangeShardSpecTest.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.segment.VirtualColumns;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -39,6 +40,7 @@ public class BuildingDimensionRangeShardSpecTest
     Assert.assertEquals(
         new DimensionRangeShardSpec(
             Arrays.asList("dim1", "dim2"),
+            VirtualColumns.EMPTY,
             StringTuple.create("start1", "start2"),
             StringTuple.create("end1", "end2"),
             5,
diff --git a/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpecTest.java b/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpecTest.java
index f4a2d88b5cd..7eb607ecffb 100644
--- a/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeBucketShardSpecTest.java
@@ -198,6 +198,9 @@ public class DimensionRangeBucketShardSpecTest
   @Test
   public void testEquals()
   {
-    EqualsVerifier.forClass(DimensionRangeBucketShardSpec.class).usingGetClass().verify();
+    EqualsVerifier.forClass(DimensionRangeBucketShardSpec.class)
+                  .usingGetClass()
+                  .withIgnoredFields("virtualColumns")
+                  .verify();
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java b/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java
index ade40e6ae38..0f62da3518b 100644
--- a/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java
@@ -27,6 +27,7 @@ import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.segment.VirtualColumns;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -50,15 +51,30 @@ public class DimensionRangeShardSpecTest
     setDimensions("dim1", "dim2");
 
     final List<ShardSpec> shardSpecs = ImmutableList.of(
-        new DimensionRangeShardSpec(dimensions, null, StringTuple.create("India", "Delhi"), 1, 1),
         new DimensionRangeShardSpec(
             dimensions,
+            VirtualColumns.EMPTY,
+            null,
+            StringTuple.create("India", "Delhi"),
+            1,
+            1
+        ),
+        new DimensionRangeShardSpec(
+            dimensions,
+            VirtualColumns.EMPTY,
             StringTuple.create("India", "Delhi"),
             StringTuple.create("Spain", "Valencia"),
             2,
             1
         ),
-        new DimensionRangeShardSpec(dimensions, StringTuple.create("Spain", "Valencia"), null, 3, 1)
+        new DimensionRangeShardSpec(
+            dimensions,
+            VirtualColumns.EMPTY,
+            StringTuple.create("Spain", "Valencia"),
+            null,
+            3,
+            1
+        )
     );
     final ShardSpecLookup lookup = shardSpecs.get(0).getLookup(shardSpecs);
     final long currentTime = DateTimes.nowUtc().getMillis();
@@ -143,6 +159,7 @@ public class DimensionRangeShardSpecTest
 
     final DimensionRangeShardSpec shard0 = new DimensionRangeShardSpec(
         dimensions,
+        VirtualColumns.EMPTY,
         null,
         StringTuple.create("India", null),
         1,
@@ -151,6 +168,7 @@ public class DimensionRangeShardSpecTest
 
     final DimensionRangeShardSpec shard1 = new DimensionRangeShardSpec(
         dimensions,
+        VirtualColumns.EMPTY,
         StringTuple.create("India", null),
         StringTuple.create("Spain", "Valencia"),
         10,
@@ -159,6 +177,7 @@ public class DimensionRangeShardSpecTest
 
     final DimensionRangeShardSpec shard2 = new DimensionRangeShardSpec(
         dimensions,
+        VirtualColumns.EMPTY,
         StringTuple.create("Spain", "Valencia"),
         StringTuple.create("Tokyo", null),
         10,
@@ -167,6 +186,7 @@ public class DimensionRangeShardSpecTest
 
     final DimensionRangeShardSpec shard3 = new DimensionRangeShardSpec(
         dimensions,
+        VirtualColumns.EMPTY,
         StringTuple.create("Tokyo", null),
         null,
         100,
@@ -202,7 +222,7 @@ public class DimensionRangeShardSpecTest
     final RangeSet<String> universalSet = TreeRangeSet.create();
     universalSet.add(Range.all());
 
-    ShardSpec shard = new DimensionRangeShardSpec(dimensions, start, end, 0, null);
+    ShardSpec shard = new DimensionRangeShardSpec(dimensions, VirtualColumns.EMPTY, start, end, 0, null);
     Map<String, RangeSet<String>> domain = new HashMap<>();
 
     // {Mars} * {Zoo, Zuu} * {Blah, Random}
@@ -265,7 +285,7 @@ public class DimensionRangeShardSpecTest
     final RangeSet<String> universalSet = TreeRangeSet.create();
     universalSet.add(Range.all());
 
-    ShardSpec shard = new DimensionRangeShardSpec(dimensions, start, end, 0, null);
+    ShardSpec shard = new DimensionRangeShardSpec(dimensions, VirtualColumns.EMPTY, start, end, 0, null);
     Map<String, RangeSet<String>> domain = new HashMap<>();
 
     // (-INF, INF) * (-INF, INF) * (-INF, INF)
@@ -345,7 +365,7 @@ public class DimensionRangeShardSpecTest
     final RangeSet<String> universalSet = TreeRangeSet.create();
     universalSet.add(Range.all());
 
-    ShardSpec shard = new DimensionRangeShardSpec(dimensions, start, end, 0, null);
+    ShardSpec shard = new DimensionRangeShardSpec(dimensions, VirtualColumns.EMPTY, start, end, 0, null);
     Map<String, RangeSet<String>> domain = new HashMap<>();
 
     // (-INF, INF) * (-INF, INF) * (-INF, INF)
@@ -442,7 +462,7 @@ public class DimensionRangeShardSpecTest
     final RangeSet<String> universalSet = TreeRangeSet.create();
     universalSet.add(Range.all());
 
-    ShardSpec shard = new DimensionRangeShardSpec(dimensions, start, end, 0, null);
+    ShardSpec shard = new DimensionRangeShardSpec(dimensions, VirtualColumns.EMPTY, start, end, 0, null);
     Map<String, RangeSet<String>> domain = new HashMap<>();
 
     // (-INF, Earth) U (Earth, INF) * (-INF, INF) * (-INF, INF)
@@ -504,7 +524,7 @@ public class DimensionRangeShardSpecTest
     final RangeSet<String> universalSet = TreeRangeSet.create();
     universalSet.add(Range.all());
 
-    ShardSpec shard = new DimensionRangeShardSpec(dimensions, start, end, 0, null);
+    ShardSpec shard = new DimensionRangeShardSpec(dimensions, VirtualColumns.EMPTY, start, end, 0, null);
     Map<String, RangeSet<String>> domain = new HashMap<>();
 
     // {Earth} U {Mars} * (USA, INF) * (-INF, INF)
diff --git a/processing/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java b/processing/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java
index 2c3442645d2..daad7aafc94 100644
--- a/processing/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java
+++ b/processing/src/test/java/org/apache/druid/timeline/partition/PartitionHolderCompletenessTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableList;
 import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.timeline.DataSegment;
 import org.junit.Assert;
 import org.junit.Test;
@@ -86,6 +87,7 @@ public class PartitionHolderCompletenessTest
             ImmutableList.of(
                 new DimensionRangeShardSpec(
                     Collections.singletonList("dim"),
+                    VirtualColumns.EMPTY,
                     null,
                     StringTuple.create("aaa"),
                     0,
@@ -93,6 +95,7 @@ public class PartitionHolderCompletenessTest
                 ),
                 new DimensionRangeShardSpec(
                     Collections.singletonList("dim"),
+                    VirtualColumns.EMPTY,
                     StringTuple.create("ttt"),
                     StringTuple.create("zzz"),
                     2,
@@ -100,6 +103,7 @@ public class PartitionHolderCompletenessTest
                 ),
                 new DimensionRangeShardSpec(
                     Collections.singletonList("dim"),
+                    VirtualColumns.EMPTY,
                     StringTuple.create("bbb"),
                     StringTuple.create("fff"),
                     1,
@@ -116,6 +120,7 @@ public class PartitionHolderCompletenessTest
             ImmutableList.of(
                 new DimensionRangeShardSpec(
                     Collections.singletonList("dim"),
+                    VirtualColumns.EMPTY,
                     StringTuple.create("bbb"),
                     StringTuple.create("fff"),
                     1,
@@ -123,6 +128,7 @@ public class PartitionHolderCompletenessTest
                 ),
                 new DimensionRangeShardSpec(
                     Collections.singletonList("dim"),
+                    VirtualColumns.EMPTY,
                     StringTuple.create("fff"),
                     null,
                     2,
@@ -130,6 +136,7 @@ public class PartitionHolderCompletenessTest
                 ),
                 new DimensionRangeShardSpec(
                     Collections.singletonList("dim"),
+                    VirtualColumns.EMPTY,
                     null,
                     StringTuple.create("bbb"),
                     0,
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
index 532c7197718..c66def98793 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfo.java
@@ -28,7 +28,12 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.VirtualColumn;
+import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.virtual.VirtualizedColumnInspector;
 import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
 
@@ -38,9 +43,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
 
 /**
@@ -119,7 +121,8 @@ public class ClientCompactionRunnerInfo
     if (newConfig.getTuningConfig() != null) {
       validationResults.add(validatePartitionsSpecForMSQ(
           newConfig.getTuningConfig().getPartitionsSpec(),
-          newConfig.getDimensionsSpec() == null ? null : newConfig.getDimensionsSpec().getDimensions()
+          newConfig.getDimensionsSpec() == null ? null : newConfig.getDimensionsSpec().getDimensions(),
+          newConfig.getTransformSpec() == null ? VirtualColumns.EMPTY : newConfig.getTransformSpec().getVirtualColumns()
       ));
     }
     if (newConfig.getGranularitySpec() != null) {
@@ -142,7 +145,8 @@ public class ClientCompactionRunnerInfo
    */
   public static CompactionConfigValidationResult validatePartitionsSpecForMSQ(
       @Nullable PartitionsSpec partitionsSpec,
-      @Nullable List<DimensionSchema> dimensionSchemas
+      @Nullable List<DimensionSchema> dimensionSchemas,
+      VirtualColumns virtualColumns
   )
   {
     if (partitionsSpec == null) {
@@ -165,19 +169,36 @@ public class ClientCompactionRunnerInfo
       );
     }
     if (partitionsSpec instanceof DimensionRangePartitionsSpec && dimensionSchemas != null) {
-      Map<String, DimensionSchema> dimensionSchemaMap = dimensionSchemas.stream().collect(
-          Collectors.toMap(DimensionSchema::getName, Function.identity())
-      );
-      Optional<String> nonStringDimension = ((DimensionRangePartitionsSpec) partitionsSpec)
-          .getPartitionDimensions()
-          .stream()
-          .filter(dim -> !ColumnType.STRING.equals(dimensionSchemaMap.get(dim).getColumnType()))
-          .findAny();
-      if (nonStringDimension.isPresent()) {
+      RowSignature.Builder baseSignatureBuilder = RowSignature.builder().addTimeColumn();
+      for (DimensionSchema dimensionSchema : dimensionSchemas) {
+        baseSignatureBuilder.add(dimensionSchema.getName(), dimensionSchema.getColumnType());
+      }
+      final RowSignature baseSignature = baseSignatureBuilder.build();
+      final ColumnInspector inspector = new VirtualizedColumnInspector(baseSignature, virtualColumns);
+
+      String nonString = null;
+      ColumnType nonStringType = null;
+      for (String dim : ((DimensionRangePartitionsSpec) partitionsSpec).getPartitionDimensions()) {
+        ColumnType partitionType = baseSignature.getColumnType(dim).orElse(null);
+        if (partitionType == null) {
+          VirtualColumn virtualColumn = virtualColumns.getVirtualColumn(dim);
+          if (virtualColumn != null) {
+            partitionType = ColumnType.fromCapabilities(
+                virtualColumn.capabilities(inspector, virtualColumn.getOutputName())
+            );
+          }
+        }
+        if (!ColumnType.STRING.equals(partitionType)) {
+          nonString = dim;
+          nonStringType = partitionType;
+          break;
+        }
+      }
+      if (nonString != null) {
         return CompactionConfigValidationResult.failure(
             "MSQ: Non-string partition dimension[%s] of type[%s] not supported 
with 'range' partition spec",
-            nonStringDimension.get(),
-            dimensionSchemaMap.get(nonStringDimension.get()).getTypeName()
+            nonString,
+            nonStringType
         );
       }
     }
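
Correspondingly, `validatePartitionsSpecForMSQ` grows a `VirtualColumns` parameter so that range partitioning on a non-physical column can still be type-checked. A rough usage sketch of the new overload; the spec values, the `country`/`v0` names, and the expression are illustrative assumptions, not taken from this patch:

    // Illustrative call into the three-argument overload shown above.
    import org.apache.druid.client.indexing.ClientCompactionRunnerInfo;
    import org.apache.druid.data.input.impl.DimensionSchema;
    import org.apache.druid.data.input.impl.StringDimensionSchema;
    import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
    import org.apache.druid.math.expr.ExprMacroTable;
    import org.apache.druid.segment.VirtualColumns;
    import org.apache.druid.segment.column.ColumnType;
    import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
    import org.apache.druid.server.coordinator.CompactionConfigValidationResult;

    import java.util.Collections;
    import java.util.List;

    class ValidateVirtualRangeSketch
    {
      static CompactionConfigValidationResult demo()
      {
        // Partition on "v0", which is not a physical dimension but a virtual column.
        final DimensionRangePartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec(
            null,                             // targetRowsPerSegment
            5_000_000,                        // maxRowsPerSegment
            Collections.singletonList("v0"),
            false                             // assumeGrouped
        );
        final List<DimensionSchema> dimensions =
            Collections.singletonList(new StringDimensionSchema("country"));
        final VirtualColumns virtualColumns = VirtualColumns.create(Collections.singletonList(
            new ExpressionVirtualColumn("v0", "upper(\"country\")", ColumnType.STRING, ExprMacroTable.nil())
        ));

        // "v0" is absent from the physical signature, so its type is resolved
        // through the VirtualizedColumnInspector path in the hunk above: a
        // STRING-typed expression should validate, while a LONG-typed one
        // should trip the "Non-string partition dimension" failure.
        return ClientCompactionRunnerInfo.validatePartitionsSpecForMSQ(partitionsSpec, dimensions, virtualColumns);
      }
    }
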
diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
index f0d35ef324b..2730c98899e 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionRunnerInfoTest.java
@@ -161,7 +161,7 @@ public class ClientCompactionRunnerInfoTest
     );
     Assert.assertFalse(validationResult.isValid());
     Assert.assertEquals(
-        "MSQ: Non-string partition dimension[partitionDim] of type[long] not 
supported with 'range' partition spec",
+        "MSQ: Non-string partition dimension[partitionDim] of type[LONG] not 
supported with 'range' partition spec",
         validationResult.getReason()
     );
   }
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 8e166dbfcf9..91963e3a96d 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -46,6 +46,7 @@ import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.SegmentSchemaMapping;
 import org.apache.druid.segment.TestDataSource;
+import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
 import org.apache.druid.segment.metadata.FingerprintGenerator;
 import org.apache.druid.segment.metadata.HeapMemoryIndexingStateStorage;
@@ -670,7 +671,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
           ImmutableMap.of("path", "a-" + i),
           ImmutableList.of("dim1"),
           ImmutableList.of("m1"),
-          new DimensionRangeShardSpec(List.of("dim1"), null, null, i - 1, 8),
+          new DimensionRangeShardSpec(List.of("dim1"), null, null, null, i - 1, 8),
           9,
           100
       );
@@ -695,7 +696,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
           ImmutableMap.of("path", "b-" + i),
           ImmutableList.of("dim1"),
           ImmutableList.of("m1"),
-          new DimensionRangeShardSpec(List.of("dim1"), null, null, i - 1, 8),
+          new DimensionRangeShardSpec(List.of("dim1"), null, null, null, i - 1, 8),
           9,
           100
       );
@@ -3622,6 +3623,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
               metrics,
               new DimensionRangeShardSpec(
                   Collections.singletonList("dim"),
+                  VirtualColumns.EMPTY,
                   i == 0 ? null : StringTuple.create(String.valueOf(i - 1)),
                   i == 5 ? null : StringTuple.create(String.valueOf(i)),
                   i,
@@ -4316,7 +4318,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
           loadspec,
           dimensions,
           metrics,
-          new DimensionRangeShardSpec(dimensions, null, null, 0, 1),
+          new DimensionRangeShardSpec(dimensions, VirtualColumns.EMPTY, null, null, 0, 1),
           0,
           100
       );


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
