This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ec8d527cec Support Sort in decoupled Dart plans (#18123)
7ec8d527cec is described below

commit 7ec8d527cecbb8b0ba753dd1c9173278c794c177
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Wed Jun 18 15:29:25 2025 +0200

    Support Sort in decoupled Dart plans (#18123)
    
    * 🔁 runs `Sort`-s with the use of a separate stage - which makes this plan 
not entirely the same as the one executed in the end - but it's easier to 
comprehend, and when I started working on adding real grouping it seemed like 
a good fit
    * 📄 the translation to `QueryDefinition` has its own place now; it 
currently adds surplus stages which could be collapsed - but that could be fixed 
later
    * 📦 moves the `*Stage` classes outside of `LogicalStageBuilder` (which was 
never really a builder)
---
 .../controller/sql/PrePlannedDartQueryMaker.java   |  11 +-
 .../druid/msq/kernel/StageDefinitionBuilder.java   |   2 +-
 .../DruidLogicalToQueryDefinitionTranslator.java   |  74 ++++--
 .../apache/druid/msq/logical/LogicalInputSpec.java |  98 +++++++
 .../druid/msq/logical/LogicalStageBuilder.java     | 287 ---------------------
 .../org/apache/druid/msq/logical/StageMaker.java   | 155 +++++++++++
 .../stages/AbstractFrameProcessorStage.java        |  52 ++++
 .../msq/logical/stages/AbstractLogicalStage.java   |  64 +++++
 .../msq/logical/stages/AbstractShuffleStage.java   |  43 +++
 .../druid/msq/logical/stages/FilterStage.java      |  78 ++++++
 .../msq/logical/{ => stages}/LogicalStage.java     |  34 ++-
 .../druid/msq/logical/stages/ProjectStage.java     |  38 +++
 .../apache/druid/msq/logical/stages/ReadStage.java |  98 +++++++
 .../apache/druid/msq/logical/stages/SortStage.java |  65 +++++
 .../scan/ScanQueryFrameProcessorFactory.java       |  23 ++
 .../org/apache/druid/msq/test/CalciteDartTest.java |  40 ++-
 .../java/org/apache/druid/frame/key/KeyColumn.java |   9 +
 .../java/org/apache/druid/frame/key/KeyOrder.java  |  14 +
 .../planner/querygen/DruidQueryGenerator.java      |  22 +-
 .../apache/druid/sql/calcite/rel/DruidQuery.java   |  43 +--
 .../druid/sql/calcite/rel/logical/DruidSort.java   |   5 +
 .../apache/druid/sql/calcite/NotYetSupported.java  |   2 +-
 22 files changed, 901 insertions(+), 356 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
index 68df90b6867..d22b7ebe028 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
@@ -25,6 +25,8 @@ import org.apache.druid.msq.indexing.LegacyMSQSpec;
 import org.apache.druid.msq.indexing.QueryDefMSQSpec;
 import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.logical.DruidLogicalToQueryDefinitionTranslator;
+import org.apache.druid.msq.logical.StageMaker;
+import org.apache.druid.msq.logical.stages.LogicalStage;
 import org.apache.druid.msq.sql.MSQTaskQueryMaker;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.QueryContexts;
@@ -65,9 +67,14 @@ class PrePlannedDartQueryMaker implements QueryMaker, 
QueryMaker.FromDruidLogica
       throw new 
ForbiddenException(plannerContext.getAuthorizationResult().getErrorMessage());
     }
     DruidLogicalToQueryDefinitionTranslator qdt = new 
DruidLogicalToQueryDefinitionTranslator(plannerContext);
-    QueryDefinition queryDef = qdt.translate(rootRel);
+    LogicalStage logicalStage = qdt.translate(rootRel);
+
+    StageMaker maker = new StageMaker(plannerContext);
+    maker.buildStage(logicalStage);
+    QueryDefinition queryDef = maker.buildQueryDefinition();
+
     QueryContext context = plannerContext.queryContext();
-    ColumnMappings columnMappings = 
QueryUtils.buildColumnMappings(dartQueryMaker.fieldMapping, 
queryDef.getOutputRowSignature());
+    ColumnMappings columnMappings = 
QueryUtils.buildColumnMappings(dartQueryMaker.fieldMapping, 
logicalStage.getLogicalRowSignature());
     QueryDefMSQSpec querySpec = MSQTaskQueryMaker.makeQueryDefMSQSpec(
         null,
         context,
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
index 6b2090056c9..91bedaada08 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
@@ -117,7 +117,7 @@ public class StageDefinitionBuilder
     return this;
   }
 
-  int getStageNumber()
+  public int getStageNumber()
   {
     return stageNumber;
   }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
index d782e538afb..66282b95c24 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
@@ -19,20 +19,25 @@
 
 package org.apache.druid.msq.logical;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import org.apache.calcite.rel.RelNode;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.key.KeyColumn;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.msq.input.inline.InlineInputSpec;
 import org.apache.druid.msq.input.table.TableInputSpec;
-import org.apache.druid.msq.kernel.QueryDefinition;
-import org.apache.druid.msq.logical.LogicalStageBuilder.ReadStage;
+import org.apache.druid.msq.logical.stages.LogicalStage;
+import org.apache.druid.msq.logical.stages.ReadStage;
+import org.apache.druid.msq.logical.stages.SortStage;
 import org.apache.druid.query.InlineDataSource;
 import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 import 
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
 import 
org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer.SourceDesc;
+import org.apache.druid.sql.calcite.rel.DruidQuery;
 import org.apache.druid.sql.calcite.rel.logical.DruidLogicalNode;
+import org.apache.druid.sql.calcite.rel.logical.DruidSort;
 import org.apache.druid.sql.calcite.rel.logical.DruidTableScan;
 import org.apache.druid.sql.calcite.rel.logical.DruidValues;
 
@@ -43,7 +48,7 @@ import java.util.Optional;
 
 /**
  * Translates the logical plan defined by the {@link DruidLogicalNode} into a
- * {@link QueryDefinition}.
+ * {@link LogicalStage} nodes.
  *
  * The translation should be executed as a single pass over the logical plan.
  *
@@ -53,23 +58,21 @@ import java.util.Optional;
 public class DruidLogicalToQueryDefinitionTranslator
 {
   private PlannerContext plannerContext;
-  private LogicalStageBuilder stageBuilder;
 
   public DruidLogicalToQueryDefinitionTranslator(PlannerContext plannerContext)
   {
     this.plannerContext = plannerContext;
-    this.stageBuilder = new LogicalStageBuilder(plannerContext);
   }
 
   /**
    * Executes the translation of the logical plan into a query definition.
    */
-  public QueryDefinition translate(DruidLogicalNode relRoot)
+  public LogicalStage translate(DruidLogicalNode relRoot)
   {
-    DruidNodeStack stack = new DruidNodeStack();
+    DruidNodeStack stack = new DruidNodeStack(plannerContext);
     stack.push(relRoot);
     LogicalStage logicalStage = buildStageFor(stack);
-    return logicalStage.build();
+    return logicalStage;
   }
 
   /**
@@ -83,11 +86,12 @@ public class DruidLogicalToQueryDefinitionTranslator
   private LogicalStage buildStageFor(DruidNodeStack stack)
   {
     List<LogicalStage> inputStages = buildInputStages(stack);
-
     DruidLogicalNode node = stack.getNode();
-    Optional<ReadStage> stage = buildReadStage(node);
-    if (stage.isPresent()) {
-      return stage.get();
+    if (inputStages.size() == 0) {
+      Optional<ReadStage> stage = buildReadStage(node);
+      if (stage.isPresent()) {
+        return stage.get();
+      }
     }
     if (inputStages.size() == 1) {
       LogicalStage inputStage = inputStages.get(0);
@@ -95,10 +99,39 @@ public class DruidLogicalToQueryDefinitionTranslator
       if (newStage != null) {
         return newStage;
       }
+      newStage = makeSequenceStage(inputStage, stack);
+      if (newStage != null) {
+        return newStage;
+      }
     }
     throw DruidException.defensive().build("Unable to process relNode[%s]", 
node);
   }
 
+  private Optional<ReadStage> buildReadStage(DruidLogicalNode node)
+  {
+    if (node instanceof DruidValues) {
+      return translateValues((DruidValues) node);
+    }
+    if (node instanceof DruidTableScan) {
+      return translateTableScan((DruidTableScan) node);
+    }
+    return Optional.empty();
+  }
+
+  private LogicalStage makeSequenceStage(LogicalStage inputStage, 
DruidNodeStack stack)
+  {
+    if (stack.getNode() instanceof DruidSort) {
+      DruidSort sort = (DruidSort) stack.getNode();
+      if (sort.hasLimitOrOffset()) {
+        throw DruidException.defensive("Sort with limit or offset is not 
supported in MSQ logical stage builder");
+      }
+      List<OrderByColumnSpec> orderBySpecs = 
DruidQuery.buildOrderByColumnSpecs(inputStage.getLogicalRowSignature(), sort);
+      List<KeyColumn> keyColumns = Lists.transform(orderBySpecs, 
KeyColumn::fromOrderByColumnSpec);
+      return new SortStage(inputStage, keyColumns);
+    }
+    return new ReadStage(inputStage.getLogicalRowSignature(), 
LogicalInputSpec.of(inputStage)).extendWith(stack);
+  }
+
   private List<LogicalStage> buildInputStages(DruidNodeStack stack)
   {
     List<LogicalStage> inputStages = new ArrayList<>();
@@ -111,23 +144,12 @@ public class DruidLogicalToQueryDefinitionTranslator
     return inputStages;
   }
 
-  private Optional<ReadStage> buildReadStage(DruidLogicalNode node)
-  {
-    if (node instanceof DruidValues) {
-      return translateValues((DruidValues) node);
-    }
-    if (node instanceof DruidTableScan) {
-      return translateTableScan((DruidTableScan) node);
-    }
-    return Optional.empty();
-  }
-
   private Optional<ReadStage> translateTableScan(DruidTableScan node)
   {
     SourceDesc sd = node.getSourceDesc(plannerContext, 
Collections.emptyList());
     TableDataSource ids = (TableDataSource) sd.dataSource;
     TableInputSpec inputSpec = new TableInputSpec(ids.getName(), 
Intervals.ONLY_ETERNITY, null, null);
-    ReadStage stage = stageBuilder.makeReadStage(sd.rowSignature, 
ImmutableList.of(inputSpec));
+    ReadStage stage = new ReadStage(sd.rowSignature, 
LogicalInputSpec.of(inputSpec));
     return Optional.of(stage);
   }
 
@@ -136,7 +158,7 @@ public class DruidLogicalToQueryDefinitionTranslator
     SourceDesc sd = node.getSourceDesc(plannerContext, 
Collections.emptyList());
     InlineDataSource ids = (InlineDataSource) sd.dataSource;
     InlineInputSpec inputSpec = new InlineInputSpec(ids);
-    ReadStage stage = stageBuilder.makeReadStage(sd.rowSignature, 
ImmutableList.of(inputSpec));
+    ReadStage stage = new ReadStage(sd.rowSignature, 
LogicalInputSpec.of(inputSpec));
     return Optional.of(stage);
   }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalInputSpec.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalInputSpec.java
new file mode 100644
index 00000000000..106537b98a7
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalInputSpec.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical;
+
+import org.apache.druid.error.NotYetImplemented;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.stage.StageInputSpec;
+import org.apache.druid.msq.kernel.StageDefinitionBuilder;
+import org.apache.druid.msq.logical.stages.LogicalStage;
+import org.apache.druid.segment.column.RowSignature;
+
+/**
+ * Represents an {@link InputSpec} for {@link LogicalStage}-s.
+ */
+public abstract class LogicalInputSpec
+{
+  public abstract InputSpec toInputSpec(StageMaker maker);
+
+  public abstract RowSignature getRowSignature();
+
+  public static LogicalInputSpec of(LogicalStage inputStage)
+  {
+    return new DagStageInputSpec(inputStage);
+  }
+
+  public static LogicalInputSpec of(InputSpec inputSpec)
+  {
+    return new PhysicalInputSpec(inputSpec);
+  }
+
+  static class PhysicalInputSpec extends LogicalInputSpec
+  {
+    private InputSpec inputSpec;
+
+    public PhysicalInputSpec(InputSpec inputSpec)
+    {
+      this.inputSpec = inputSpec;
+    }
+
+    @Override
+    public InputSpec toInputSpec(StageMaker maker)
+    {
+      return inputSpec;
+    }
+
+    @Override
+    public RowSignature getRowSignature()
+    {
+      throw NotYetImplemented.ex(null, "Not supported for this type");
+    }
+  }
+
+  static class DagStageInputSpec extends LogicalInputSpec
+  {
+
+    private LogicalStage inputStage;
+
+    public DagStageInputSpec(LogicalStage inputStage)
+    {
+      this.inputStage = inputStage;
+    }
+
+    @Override
+    public InputSpec toInputSpec(StageMaker maker)
+    {
+      StageDefinitionBuilder stage = maker.buildStage(inputStage);
+      return new StageInputSpec(stage.getStageNumber());
+    }
+
+    public LogicalStage getStage()
+    {
+      return inputStage;
+    }
+
+    @Override
+    public RowSignature getRowSignature()
+    {
+      return inputStage.getRowSignature();
+    }
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalStageBuilder.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalStageBuilder.java
deleted file mode 100644
index 756b558f86e..00000000000
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalStageBuilder.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.logical;
-
-import org.apache.druid.error.DruidException;
-import org.apache.druid.msq.input.InputSpec;
-import org.apache.druid.msq.kernel.MixShuffleSpec;
-import org.apache.druid.msq.kernel.QueryDefinition;
-import org.apache.druid.msq.kernel.StageDefinition;
-import org.apache.druid.msq.kernel.StageDefinitionBuilder;
-import org.apache.druid.msq.querykit.scan.ScanQueryFrameProcessorFactory;
-import org.apache.druid.query.Druids;
-import org.apache.druid.query.QueryContexts;
-import org.apache.druid.query.filter.DimFilter;
-import org.apache.druid.query.scan.ScanQuery;
-import org.apache.druid.query.spec.QuerySegmentSpec;
-import org.apache.druid.segment.VirtualColumns;
-import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
-import 
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
-import org.apache.druid.sql.calcite.rel.DruidQuery;
-import org.apache.druid.sql.calcite.rel.Projection;
-import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
-import org.apache.druid.sql.calcite.rel.logical.DruidFilter;
-import org.apache.druid.sql.calcite.rel.logical.DruidProject;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Helper class to build a {@link LogicalStage} tree.
- *
- * Tightly coupled to {@link DruidLogicalToQueryDefinitionTranslator}.
- * Currently its just a context to hold all the {@link LogicalStage} classes.
- */
-public class LogicalStageBuilder
-{
-  private PlannerContext plannerContext;
-
-  public LogicalStageBuilder(PlannerContext plannerContext)
-  {
-    this.plannerContext = plannerContext;
-  }
-
-  class StageMaker
-  {
-    /** Provides ids for the stages. */
-    private int stageIdSeq = 0;
-
-    StageDefinition makeScanStage(
-        VirtualColumns virtualColumns,
-        RowSignature signature,
-        List<InputSpec> inputs,
-        DimFilter dimFilter)
-    {
-      ScanQuery s = Druids.newScanQueryBuilder()
-          .dataSource(IRRELEVANT)
-          .intervals(QuerySegmentSpec.ETERNITY)
-          .filters(dimFilter)
-          .virtualColumns(virtualColumns)
-          .columns(signature.getColumnNames())
-          .columnTypes(signature.getColumnTypes())
-          .build();
-      ScanQueryFrameProcessorFactory scanProcessorFactory = new 
ScanQueryFrameProcessorFactory(s);
-      StageDefinitionBuilder sdb = StageDefinition.builder(getNextStageId())
-          .inputs(inputs)
-          .signature(signature)
-          .shuffleSpec(MixShuffleSpec.instance())
-          .processorFactory(scanProcessorFactory);
-      return sdb.build(getIdForBuilder());
-    }
-
-    private int getNextStageId()
-    {
-      return stageIdSeq++;
-    }
-
-    private String getIdForBuilder()
-    {
-      String dartQueryId = 
plannerContext.queryContext().getString(QueryContexts.CTX_DART_QUERY_ID);
-      if (dartQueryId != null) {
-        return dartQueryId;
-      }
-      return plannerContext.getSqlQueryId();
-    }
-  }
-
-  public abstract class AbstractLogicalStage implements LogicalStage
-  {
-    protected final List<InputSpec> inputSpecs;
-    protected final RowSignature signature;
-    protected final List<LogicalStage> inputStages;
-
-    public AbstractLogicalStage(RowSignature signature, List<InputSpec> 
inputs, List<LogicalStage> inputVertices)
-    {
-      this.inputSpecs = inputs;
-      this.signature = signature;
-      this.inputStages = inputVertices;
-    }
-
-    @Override
-    public StageDefinition buildCurrentStage(StageMaker stageMaker)
-    {
-      throw DruidException.defensive("This should have been implemented - or 
not reach this point!");
-    }
-
-    @Override
-    public final QueryDefinition build()
-    {
-      return QueryDefinition.create(buildStageDefinitions(new StageMaker()), 
plannerContext.queryContext());
-    }
-
-    @Override
-    public final List<StageDefinition> buildStageDefinitions(StageMaker 
stageMaker)
-    {
-      List<StageDefinition> ret = new ArrayList<>();
-      if (!inputStages.isEmpty()) {
-        throw DruidException.defensive("Not yet supported");
-      }
-      ret.add(buildCurrentStage(stageMaker));
-      return ret;
-    }
-  }
-
-  /**
-   * Represents a stage that reads data from input sources.
-   */
-  public class ReadStage extends AbstractLogicalStage
-  {
-    public ReadStage(RowSignature signature, List<InputSpec> inputSpecs)
-    {
-      super(signature, inputSpecs, Collections.emptyList());
-    }
-
-    /**
-     * Copy constructor.
-     */
-    public ReadStage(ReadStage readStage, RowSignature newSignature)
-    {
-      super(newSignature, readStage.inputSpecs, readStage.inputStages);
-    }
-
-    @Override
-    public StageDefinition buildCurrentStage(StageMaker stageMaker)
-    {
-      return stageMaker.makeScanStage(VirtualColumns.EMPTY, signature, 
inputSpecs, null);
-    }
-
-    @Override
-    public LogicalStage extendWith(DruidNodeStack stack)
-    {
-      if (stack.getNode() instanceof DruidFilter) {
-        DruidFilter filter = (DruidFilter) stack.getNode();
-        return makeFilterStage(filter);
-      }
-
-      if (stack.getNode() instanceof DruidProject) {
-
-        DruidProject project = (DruidProject) stack.getNode();
-        DruidFilter dummyFilter = new DruidFilter(
-            project.getCluster(), project.getTraitSet(), project,
-            project.getCluster().getRexBuilder().makeLiteral(true)
-        );
-        return makeFilterStage(dummyFilter).extendWith(stack);
-      }
-      return null;
-    }
-
-    private LogicalStage makeFilterStage(DruidFilter filter)
-    {
-      VirtualColumnRegistry virtualColumnRegistry = 
VirtualColumnRegistry.create(
-          signature,
-          plannerContext.getExpressionParser(),
-          plannerContext.getPlannerConfig().isForceExpressionVirtualColumns()
-      );
-
-      DimFilter dimFilter = DruidQuery.getDimFilter(
-          plannerContext,
-          signature, virtualColumnRegistry, filter
-      );
-
-      return new FilterStage(
-          this,
-          virtualColumnRegistry,
-          dimFilter
-      );
-    }
-  }
-
-  public FilterStage create(ReadStage inputStage, DruidFilter filter)
-  {
-    VirtualColumnRegistry virtualColumnRegistry = VirtualColumnRegistry.create(
-        inputStage.signature,
-        plannerContext.getExpressionParser(),
-        plannerContext.getPlannerConfig().isForceExpressionVirtualColumns()
-    );
-    DimFilter dimFilter = DruidQuery.getDimFilter(plannerContext, 
inputStage.signature, virtualColumnRegistry, filter);
-    return new FilterStage(inputStage, virtualColumnRegistry, dimFilter);
-  }
-
-  class FilterStage extends ReadStage
-  {
-    protected final VirtualColumnRegistry virtualColumnRegistry;
-    private DimFilter dimFilter;
-
-    public FilterStage(ReadStage inputStage, VirtualColumnRegistry 
virtualColumnRegistry, DimFilter dimFilter)
-    {
-      super(inputStage, inputStage.signature);
-      this.virtualColumnRegistry = virtualColumnRegistry;
-      this.dimFilter = dimFilter;
-    }
-
-    /**
-     * Copy constructor.
-     */
-    public FilterStage(FilterStage stage, VirtualColumnRegistry 
newVirtualColumnRegistry, RowSignature rowSignature)
-    {
-      super(stage, rowSignature);
-      this.dimFilter = stage.dimFilter;
-      this.virtualColumnRegistry = newVirtualColumnRegistry;
-    }
-
-    @Override
-    public StageDefinition buildCurrentStage(StageMaker stageMaker)
-    {
-      VirtualColumns output = 
virtualColumnRegistry.build(Collections.emptySet());
-      return stageMaker.makeScanStage(output, signature, inputSpecs, 
dimFilter);
-    }
-
-    @Override
-    public LogicalStage extendWith(DruidNodeStack stack)
-    {
-      if (stack.getNode() instanceof DruidProject) {
-        DruidProject project = (DruidProject) stack.getNode();
-        Projection preAggregation = Projection
-            .preAggregation(project, plannerContext, signature, 
virtualColumnRegistry);
-
-        return new ProjectStage(
-            this,
-            virtualColumnRegistry,
-            preAggregation.getOutputRowSignature()
-        );
-      }
-      return null;
-    }
-  }
-
-  class ProjectStage extends FilterStage
-  {
-    public ProjectStage(FilterStage root, VirtualColumnRegistry 
newVirtualColumnRegistry, RowSignature rowSignature)
-    {
-      super(root, newVirtualColumnRegistry, rowSignature);
-    }
-
-    @Override
-    public LogicalStage extendWith(DruidNodeStack stack)
-    {
-      return null;
-    }
-  }
-
-  private static final String IRRELEVANT = "irrelevant";
-
-  public ReadStage makeReadStage(RowSignature rowSignature, List<InputSpec> 
isp)
-  {
-    return new ReadStage(rowSignature, isp);
-  }
-
-}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
new file mode 100644
index 00000000000..d7d99ad8441
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.kernel.MixShuffleSpec;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.StageDefinitionBuilder;
+import org.apache.druid.msq.logical.stages.AbstractFrameProcessorStage;
+import org.apache.druid.msq.logical.stages.AbstractShuffleStage;
+import org.apache.druid.msq.logical.stages.LogicalStage;
+import org.apache.druid.msq.querykit.BaseFrameProcessorFactory;
+import org.apache.druid.msq.querykit.scan.ScanQueryFrameProcessorFactory;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Builds {@link QueryDefinition} from {@link LogicalStage}-s.
+ */
+public class StageMaker
+{
+  /** Provides ids for the stages. */
+  private int stageIdSeq = 0;
+
+  private final PlannerContext plannerContext;
+
+  private Map<LogicalStage, StageDefinitionBuilder> builtStages = new 
IdentityHashMap<>();
+
+  public StageMaker(PlannerContext plannerContext)
+  {
+    this.plannerContext = plannerContext;
+  }
+
+  public static ScanQueryFrameProcessorFactory makeScanFrameProcessor(
+      VirtualColumns virtualColumns,
+      RowSignature signature,
+      DimFilter dimFilter)
+  {
+    return 
ScanQueryFrameProcessorFactory.makeScanFrameProcessorFactory(virtualColumns, 
signature, dimFilter);
+  }
+
+  public StageDefinitionBuilder buildStage(LogicalStage stage)
+  {
+    if (builtStages.get(stage) != null) {
+      return builtStages.get(stage);
+    }
+    StageDefinitionBuilder stageDef = buildStageInternal(stage);
+    builtStages.put(stage, stageDef);
+    return stageDef;
+  }
+
+  private StageDefinitionBuilder buildStageInternal(LogicalStage stage)
+  {
+    if (stage instanceof AbstractFrameProcessorStage) {
+      return buildFrameProcessorStage((AbstractFrameProcessorStage) stage);
+    }
+    if (stage instanceof AbstractShuffleStage) {
+      return buildShuffleStage((AbstractShuffleStage) stage);
+    }
+    throw DruidException.defensive("Cannot build type [%s]", 
stage.getClass().getSimpleName());
+  }
+
+  private StageDefinitionBuilder 
buildFrameProcessorStage(AbstractFrameProcessorStage frameProcessorStage)
+  {
+    List<LogicalInputSpec> inputs = frameProcessorStage.getInputSpecs();
+    List<InputSpec> inputSpecs = new ArrayList<>();
+    for (LogicalInputSpec dagInputSpec : inputs) {
+      inputSpecs.add(dagInputSpec.toInputSpec(this));
+    }
+    BaseFrameProcessorFactory frameProcessor = 
frameProcessorStage.buildFrameProcessor(this);
+    StageDefinitionBuilder sdb = newStageDefinitionBuilder();
+    sdb.inputs(inputSpecs);
+    sdb.signature(frameProcessorStage.getLogicalRowSignature());
+    sdb.processorFactory(frameProcessor);
+    sdb.shuffleSpec(MixShuffleSpec.instance());
+    return sdb;
+  }
+
+  private StageDefinitionBuilder buildShuffleStage(AbstractShuffleStage stage)
+  {
+    List<LogicalInputSpec> inputs = stage.getInputSpecs();
+    List<InputSpec> inputSpecs = new ArrayList<>();
+    for (LogicalInputSpec dagInputSpec : inputs) {
+      inputSpecs.add(dagInputSpec.toInputSpec(this));
+    }
+    StageDefinitionBuilder sdb = newStageDefinitionBuilder();
+    sdb.inputs(inputSpecs);
+    sdb.signature(stage.getRowSignature());
+    sdb.processorFactory(makeScanFrameProcessor(VirtualColumns.EMPTY, 
stage.getRowSignature(), null));
+    sdb.shuffleSpec(stage.buildShuffleSpec());
+    return sdb;
+  }
+
+  private StageDefinitionBuilder newStageDefinitionBuilder()
+  {
+    return StageDefinition.builder(getNextStageId());
+  }
+
+  private int getNextStageId()
+  {
+    return stageIdSeq++;
+  }
+
+  public QueryDefinition buildQueryDefinition()
+  {
+    return QueryDefinition.create(makeStages(), plannerContext.queryContext());
+  }
+
+  private List<StageDefinition> makeStages()
+  {
+    List<StageDefinition> ret = new ArrayList<>();
+    for (StageDefinitionBuilder stageDefinitionBuilder : builtStages.values()) 
{
+      ret.add(stageDefinitionBuilder.build(getIdForBuilder()));
+    }
+    ret.sort(Comparator.comparing(StageDefinition::getStageNumber));
+    return ret;
+  }
+
+  private String getIdForBuilder()
+  {
+    String dartQueryId = 
plannerContext.queryContext().getString(QueryContexts.CTX_DART_QUERY_ID);
+    if (dartQueryId != null) {
+      return dartQueryId;
+    }
+    return plannerContext.getSqlQueryId();
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/AbstractFrameProcessorStage.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/AbstractFrameProcessorStage.java
new file mode 100644
index 00000000000..3eadd43f871
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/AbstractFrameProcessorStage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import org.apache.druid.msq.kernel.FrameProcessorFactory;
+import org.apache.druid.msq.logical.LogicalInputSpec;
+import org.apache.druid.msq.logical.StageMaker;
+import org.apache.druid.msq.querykit.BaseFrameProcessorFactory;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+/**
+ * Represents a {@link LogicalStage} which could be processed as a {@link 
FrameProcessorFactory}.
+ */
+public abstract class AbstractFrameProcessorStage extends AbstractLogicalStage
+{
+  public AbstractFrameProcessorStage(RowSignature signature, LogicalInputSpec 
input)
+  {
+    super(signature, input);
+  }
+
+  public AbstractFrameProcessorStage(RowSignature signature, 
List<LogicalInputSpec> inputs)
+  {
+    super(signature, inputs);
+  }
+
+  /**
+   * Builds the {@link FrameProcessorFactory} for this stage.
+   */
+  @Nonnull
+  public abstract BaseFrameProcessorFactory buildFrameProcessor(StageMaker 
stageMaker);
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/AbstractLogicalStage.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/AbstractLogicalStage.java
new file mode 100644
index 00000000000..8cdcfbef2e4
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/AbstractLogicalStage.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.msq.logical.LogicalInputSpec;
+import org.apache.druid.segment.column.RowSignature;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Common stage implementation.
+ */
+public abstract class AbstractLogicalStage implements LogicalStage
+{
+  protected final List<LogicalInputSpec> inputSpecs;
+  protected final RowSignature signature;
+
+  public AbstractLogicalStage(RowSignature signature, LogicalInputSpec input)
+  {
+    this(signature, Collections.singletonList(input));
+  }
+
+  public AbstractLogicalStage(RowSignature signature, List<LogicalInputSpec> 
inputs)
+  {
+    this.inputSpecs = ImmutableList.copyOf(inputs);
+    this.signature = signature;
+  }
+
+  @Override
+  public RowSignature getLogicalRowSignature()
+  {
+    return signature;
+  }
+
+  @Override
+  public final RowSignature getRowSignature()
+  {
+    return signature;
+  }
+
+  @Override
+  public final List<LogicalInputSpec> getInputSpecs()
+  {
+    return inputSpecs;
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/AbstractShuffleStage.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/AbstractShuffleStage.java
new file mode 100644
index 00000000000..4e1a500171a
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/AbstractShuffleStage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import org.apache.druid.msq.kernel.ShuffleSpec;
+import org.apache.druid.msq.logical.LogicalInputSpec;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Represents a Shuffle stage.
+ */
+public abstract class AbstractShuffleStage extends AbstractLogicalStage
+{
+  public AbstractShuffleStage(RowSignature signature, LogicalInputSpec input)
+  {
+    super(signature, input);
+  }
+
+  /**
+   * Builds the shuffle specification for this stage.
+   */
+  @Nonnull
+  public abstract ShuffleSpec buildShuffleSpec();
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/FilterStage.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/FilterStage.java
new file mode 100644
index 00000000000..b2327dc0006
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/FilterStage.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import org.apache.druid.msq.logical.StageMaker;
+import org.apache.druid.msq.querykit.BaseFrameProcessorFactory;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+import 
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
+import org.apache.druid.sql.calcite.rel.Projection;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+import org.apache.druid.sql.calcite.rel.logical.DruidProject;
+
+import java.util.Collections;
+
+class FilterStage extends ReadStage
+{
+  protected final VirtualColumnRegistry virtualColumnRegistry;
+  protected final DimFilter dimFilter;
+
+  public FilterStage(ReadStage inputStage, VirtualColumnRegistry 
virtualColumnRegistry, DimFilter dimFilter)
+  {
+    super(inputStage, inputStage.signature);
+    this.virtualColumnRegistry = virtualColumnRegistry;
+    this.dimFilter = dimFilter;
+  }
+
+  /**
+   * Copy constructor.
+   */
+  protected FilterStage(FilterStage stage, VirtualColumnRegistry 
newVirtualColumnRegistry, RowSignature rowSignature)
+  {
+    super(stage, rowSignature);
+    this.dimFilter = stage.dimFilter;
+    this.virtualColumnRegistry = newVirtualColumnRegistry;
+  }
+
+  @Override
+  public LogicalStage extendWith(DruidNodeStack stack)
+  {
+    if (stack.getNode() instanceof DruidProject) {
+      DruidProject project = (DruidProject) stack.getNode();
+      Projection projection = Projection.preAggregation(project, 
stack.getPlannerContext(), signature, virtualColumnRegistry);
+
+      return new ProjectStage(
+          this,
+          virtualColumnRegistry,
+          projection.getOutputRowSignature()
+      );
+    }
+    return null;
+  }
+
+  @Override
+  public BaseFrameProcessorFactory buildFrameProcessor(StageMaker stageMaker)
+  {
+    VirtualColumns virtualColumns = 
virtualColumnRegistry.build(Collections.emptySet());
+    return StageMaker.makeScanFrameProcessor(virtualColumns, signature, 
dimFilter);
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalStage.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/LogicalStage.java
similarity index 71%
rename from 
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalStage.java
rename to 
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/LogicalStage.java
index ce771d25c6c..bc681f5e066 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalStage.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/LogicalStage.java
@@ -17,11 +17,10 @@
  * under the License.
  */
 
-package org.apache.druid.msq.logical;
+package org.apache.druid.msq.logical.stages;
 
-import org.apache.druid.msq.kernel.QueryDefinition;
-import org.apache.druid.msq.kernel.StageDefinition;
-import org.apache.druid.msq.logical.LogicalStageBuilder.StageMaker;
+import org.apache.druid.msq.logical.LogicalInputSpec;
+import org.apache.druid.segment.column.RowSignature;
 import 
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
 
 import javax.annotation.Nullable;
@@ -36,18 +35,6 @@ import java.util.List;
  */
 public interface LogicalStage
 {
-  /**
-   * Builds the full {@link QueryDefinition}.
-   *
-   * This supposed to be called on the top level stage.
-   */
-  QueryDefinition build();
-
-  /**
-   * Builds the current stage.
-   */
-  StageDefinition buildCurrentStage(StageMaker stageMaker);
-
   /**
    * Attempts to extend the current stage with an additional node.
    *
@@ -57,9 +44,20 @@ public interface LogicalStage
   LogicalStage extendWith(DruidNodeStack stack);
 
   /**
-   * Internal method to build the stage definitions.
+   * Real row signature this stage will return.
+   *
+   * Compared to the logical signature, columns may be reordered and additional technical columns may be present.
+   */
+  RowSignature getRowSignature();
+
+  /**
+   * Logical row signature this node is supposed to produce.
    */
-  List<StageDefinition> buildStageDefinitions(StageMaker stageMaker);
+  RowSignature getLogicalRowSignature();
 
 
+  /**
+   * Returns the inputs of this stage.
+   */
+  List<LogicalInputSpec> getInputSpecs();
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ProjectStage.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ProjectStage.java
new file mode 100644
index 00000000000..9c9face4750
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ProjectStage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import org.apache.druid.segment.column.RowSignature;
+import 
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+
+class ProjectStage extends FilterStage
+{
+  public ProjectStage(FilterStage stage, VirtualColumnRegistry 
virtualColumnRegistry, RowSignature signature)
+  {
+    super(stage, virtualColumnRegistry, signature);
+  }
+
+  @Override
+  public LogicalStage extendWith(DruidNodeStack stack)
+  {
+    return null;
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
new file mode 100644
index 00000000000..c1ff5a3e277
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import org.apache.druid.msq.logical.LogicalInputSpec;
+import org.apache.druid.msq.logical.StageMaker;
+import org.apache.druid.msq.querykit.BaseFrameProcessorFactory;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import 
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
+import org.apache.druid.sql.calcite.rel.DruidQuery;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+import org.apache.druid.sql.calcite.rel.logical.DruidFilter;
+import org.apache.druid.sql.calcite.rel.logical.DruidProject;
+
+/**
+ * Represents a stage that reads data from input sources.
+ */
+public class ReadStage extends AbstractFrameProcessorStage
+{
+  public ReadStage(RowSignature signature, LogicalInputSpec inputSpec)
+  {
+    super(signature, inputSpec);
+  }
+
+  /**
+   * Copy constructor.
+   */
+  protected ReadStage(ReadStage readStage, RowSignature newSignature)
+  {
+    super(newSignature, readStage.inputSpecs);
+  }
+
+  @Override
+  public LogicalStage extendWith(DruidNodeStack stack)
+  {
+    if (stack.getNode() instanceof DruidFilter) {
+      DruidFilter filter = (DruidFilter) stack.getNode();
+      return makeFilterStage(stack.getPlannerContext(), filter);
+    }
+
+    if (stack.getNode() instanceof DruidProject) {
+
+      DruidProject project = (DruidProject) stack.getNode();
+      DruidFilter dummyFilter = new DruidFilter(
+          project.getCluster(), project.getTraitSet(), project,
+          project.getCluster().getRexBuilder().makeLiteral(true)
+      );
+      return makeFilterStage(stack.getPlannerContext(), 
dummyFilter).extendWith(stack);
+    }
+    return null;
+  }
+
+  private LogicalStage makeFilterStage(PlannerContext plannerContext, 
DruidFilter filter)
+  {
+    VirtualColumnRegistry virtualColumnRegistry = VirtualColumnRegistry.create(
+        signature,
+        plannerContext.getExpressionParser(),
+        plannerContext.getPlannerConfig().isForceExpressionVirtualColumns()
+    );
+
+    DimFilter dimFilter = DruidQuery.getDimFilter(
+        plannerContext,
+        signature, virtualColumnRegistry, filter
+    );
+
+    return new FilterStage(
+        this,
+        virtualColumnRegistry,
+        dimFilter
+    );
+  }
+
+  @Override
+  public BaseFrameProcessorFactory buildFrameProcessor(StageMaker stageMaker)
+  {
+    return StageMaker.makeScanFrameProcessor(VirtualColumns.EMPTY, signature, 
null);
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SortStage.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SortStage.java
new file mode 100644
index 00000000000..df8aad6c57b
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SortStage.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.msq.kernel.ShuffleSpec;
+import org.apache.druid.msq.logical.LogicalInputSpec;
+import org.apache.druid.msq.querykit.QueryKitUtils;
+import org.apache.druid.msq.querykit.ShuffleSpecFactories;
+import org.apache.druid.segment.column.RowSignature;
+import 
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
+
+import java.util.List;
+
+public class SortStage extends AbstractShuffleStage
+{
+  protected final List<KeyColumn> keyColumns;
+
+  public SortStage(LogicalStage inputStage, List<KeyColumn> keyColumns)
+  {
+    super(
+        QueryKitUtils.sortableSignature(inputStage.getLogicalRowSignature(), 
keyColumns),
+        LogicalInputSpec.of(inputStage)
+    );
+    this.keyColumns = keyColumns;
+  }
+
+  @Override
+  public RowSignature getLogicalRowSignature()
+  {
+    return inputSpecs.get(0).getRowSignature();
+  }
+
+  @Override
+  public ShuffleSpec buildShuffleSpec()
+  {
+    final ClusterBy clusterBy = new ClusterBy(keyColumns, 0);
+    return 
ShuffleSpecFactories.globalSortWithMaxPartitionCount(1).build(clusterBy, false);
+  }
+
+  @Override
+  public LogicalStage extendWith(DruidNodeStack stack)
+  {
+    return null;
+  }
+
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
index 39c311226c6..4deff368876 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
@@ -30,8 +30,13 @@ import org.apache.druid.frame.write.FrameWriterFactory;
 import org.apache.druid.msq.input.ReadableInput;
 import org.apache.druid.msq.kernel.FrameContext;
 import org.apache.druid.msq.querykit.BaseLeafFrameProcessorFactory;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.segment.SegmentMapFunction;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
 
 import javax.annotation.Nullable;
 import java.util.concurrent.atomic.AtomicLong;
@@ -39,6 +44,8 @@ import java.util.concurrent.atomic.AtomicLong;
 @JsonTypeName("scan")
 public class ScanQueryFrameProcessorFactory extends 
BaseLeafFrameProcessorFactory
 {
+  private static final String IRRELEVANT = "irrelevant";
+
   private final ScanQuery query;
 
   /**
@@ -58,6 +65,22 @@ public class ScanQueryFrameProcessorFactory extends 
BaseLeafFrameProcessorFactor
     this.runningCountForLimit = query.isLimited() && 
query.getOrderBys().isEmpty() ? new AtomicLong() : null;
   }
 
+  public static ScanQueryFrameProcessorFactory makeScanFrameProcessorFactory(
+      VirtualColumns virtualColumns,
+      RowSignature signature,
+      DimFilter dimFilter)
+  {
+    ScanQuery scanQuery = Druids.newScanQueryBuilder()
+        .dataSource(IRRELEVANT)
+        .intervals(QuerySegmentSpec.ETERNITY)
+        .filters(dimFilter)
+        .virtualColumns(virtualColumns)
+        .columns(signature.getColumnNames())
+        .columnTypes(signature.getColumnTypes())
+        .build();
+    return new ScanQueryFrameProcessorFactory(scanQuery);
+  }
+
   @JsonProperty
   public ScanQuery getQuery()
   {
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
index afda9e44628..9e71767e6d6 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
@@ -28,9 +28,12 @@ import org.apache.druid.sql.calcite.NotYetSupported.Modes;
 import org.apache.druid.sql.calcite.QueryTestBuilder;
 import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
 import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.junit.jupiter.api.MethodOrderer;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
 
 @SqlTestFrameworkConfig.ComponentSupplier(DartComponentSupplier.class)
+@TestMethodOrder(MethodOrderer.MethodName.class)
 public class CalciteDartTest extends BaseCalciteQueryTest
 {
   @Override
@@ -55,7 +58,6 @@ public class CalciteDartTest extends BaseCalciteQueryTest
         .run();
   }
 
-  @NotYetSupported(Modes.SUPPORT_SORT)
   @Test
   public void testOrderBy()
   {
@@ -74,6 +76,42 @@ public class CalciteDartTest extends BaseCalciteQueryTest
         .run();
   }
 
+  @Test
+  public void testOrderByVirtual()
+  {
+    testBuilder()
+        .sql("SELECT l1 from numfoo order by -l1")
+        .expectedResults(
+            ImmutableList.of(
+                new Object[]{null},
+                new Object[]{null},
+                new Object[]{null},
+                new Object[]{325323L},
+                new Object[]{7L},
+                new Object[]{0L}
+            )
+        )
+        .run();
+  }
+
+  @Test
+  public void testOrderByVirtualDesc()
+  {
+    testBuilder()
+        .sql("SELECT l1 from numfoo order by -l1 desc")
+        .expectedResults(
+            ImmutableList.of(
+                new Object[]{0L},
+                new Object[]{7L},
+                new Object[]{325323L},
+                new Object[]{null},
+                new Object[]{null},
+                new Object[]{null}
+            )
+        )
+        .run();
+  }
+
   @NotYetSupported(Modes.RESTRICTED_DATASOURCE_SUPPORT)
   @Test
   public void testSelectFromRestricted()
diff --git a/processing/src/main/java/org/apache/druid/frame/key/KeyColumn.java 
b/processing/src/main/java/org/apache/druid/frame/key/KeyColumn.java
index cf73c0ecddb..c84b8f85f64 100644
--- a/processing/src/main/java/org/apache/druid/frame/key/KeyColumn.java
+++ b/processing/src/main/java/org/apache/druid/frame/key/KeyColumn.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
 
 import java.util.Objects;
 
@@ -84,4 +85,12 @@ public class KeyColumn
   {
     return StringUtils.format("%s%s", columnName, order == KeyOrder.NONE ? "" 
: " " + order);
   }
+
+  public static KeyColumn fromOrderByColumnSpec(OrderByColumnSpec 
orderByColumnSpec)
+  {
+    return new KeyColumn(
+        orderByColumnSpec.getDimension(),
+        KeyOrder.fromDirection(orderByColumnSpec.getDirection())
+    );
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/frame/key/KeyOrder.java 
b/processing/src/main/java/org/apache/druid/frame/key/KeyOrder.java
index fe568483bf4..25f86bd7c32 100644
--- a/processing/src/main/java/org/apache/druid/frame/key/KeyOrder.java
+++ b/processing/src/main/java/org/apache/druid/frame/key/KeyOrder.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.frame.key;
 
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec.Direction;
+
 /**
  * Ordering associated with a {@link KeyColumn}.
  */
@@ -58,4 +60,16 @@ public enum KeyOrder
   {
     return sortable;
   }
+
+  static KeyOrder fromDirection(Direction direction)
+  {
+    switch (direction) {
+      case ASCENDING:
+        return ASCENDING;
+      case DESCENDING:
+        return DESCENDING;
+      default:
+        return NONE;
+    }
+  }
 }
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/querygen/DruidQueryGenerator.java
 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/querygen/DruidQueryGenerator.java
index 28ac96046e8..f59f1e3c6ff 100644
--- 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/querygen/DruidQueryGenerator.java
+++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/querygen/DruidQueryGenerator.java
@@ -29,6 +29,8 @@ import org.apache.calcite.rel.core.Window;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.query.DataSource;
 import org.apache.druid.query.FilteredDataSource;
@@ -85,9 +87,20 @@ public class DruidQueryGenerator
         this.node = node;
         this.operandIndex = operandIndex;
       }
-    }
 
+      @Override
+      public String toString()
+      {
+        return ToStringBuilder.reflectionToString(this, 
ToStringStyle.NO_CLASS_NAME_STYLE);
+      }
+    }
     Stack<Entry> stack = new Stack<>();
+    PlannerContext plannerContext;
+
+    public DruidNodeStack(PlannerContext plannerContext)
+    {
+      this.plannerContext = plannerContext;
+    }
 
     public void push(DruidLogicalNode item)
     {
@@ -128,11 +141,16 @@ public class DruidQueryGenerator
     {
       return stack.peek().operandIndex;
     }
+
+    public PlannerContext getPlannerContext()
+    {
+      return plannerContext;
+    }
   }
 
   public DruidQuery buildQuery()
   {
-    DruidNodeStack stack = new DruidNodeStack();
+    DruidNodeStack stack = new DruidNodeStack(vertexFactory.plannerContext);
     stack.push(relRoot);
     Vertex vertex = buildVertexFor(stack);
     return vertex.buildQuery(true);
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index aa88409c0b8..809c945edd7 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -631,7 +631,30 @@ public class DruidQuery
     final OffsetLimit offsetLimit = OffsetLimit.fromSort(sort);
 
     // Extract orderBy column specs.
-    final List<OrderByColumnSpec> orderBys = new 
ArrayList<>(sort.getSortExps().size());
+    final List<OrderByColumnSpec> orderBys = 
buildOrderByColumnSpecs(rowSignature, sort);
+
+    // Extract any post-sort Projection.
+    final Projection projection;
+
+    if (sortProject == null) {
+      projection = null;
+    } else if (partialQuery.getAggregate() == null) {
+      if (virtualColumnRegistry == null) {
+        throw new ISE("Must provide 'virtualColumnRegistry' for 
pre-aggregation Projection!");
+      }
+
+      projection = Projection.preAggregation(sortProject, plannerContext, 
rowSignature, virtualColumnRegistry);
+    } else {
+      projection = Projection.postAggregation(sortProject, plannerContext, 
rowSignature, "s");
+    }
+
+    return Sorting.create(orderBys, offsetLimit, projection);
+  }
+
+  public static List<OrderByColumnSpec> buildOrderByColumnSpecs(final 
RowSignature rowSignature, final Sort sort)
+  {
+    final List<OrderByColumnSpec> orderBys;
+    orderBys = new ArrayList<>(sort.getSortExps().size());
     for (int sortKey = 0; sortKey < sort.getSortExps().size(); sortKey++) {
       final RexNode sortExpression = sort.getSortExps().get(sortKey);
       final RelFieldCollation collation = 
sort.getCollation().getFieldCollations().get(sortKey);
@@ -657,23 +680,7 @@ public class DruidQuery
         throw new CannotBuildQueryException(sort, sortExpression);
       }
     }
-
-    // Extract any post-sort Projection.
-    final Projection projection;
-
-    if (sortProject == null) {
-      projection = null;
-    } else if (partialQuery.getAggregate() == null) {
-      if (virtualColumnRegistry == null) {
-        throw new ISE("Must provide 'virtualColumnRegistry' for 
pre-aggregation Projection!");
-      }
-
-      projection = Projection.preAggregation(sortProject, plannerContext, 
rowSignature, virtualColumnRegistry);
-    } else {
-      projection = Projection.postAggregation(sortProject, plannerContext, 
rowSignature, "s");
-    }
-
-    return Sorting.create(orderBys, offsetLimit, projection);
+    return orderBys;
   }
 
   /**
diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/rel/logical/DruidSort.java 
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/logical/DruidSort.java
index 4ad6091ad12..e3d569a5166 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/logical/DruidSort.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/logical/DruidSort.java
@@ -94,4 +94,9 @@ public class DruidSort extends Sort implements 
DruidLogicalNode
   {
     return super.explainTerms(pw).item("druid", "logical");
   }
+
+  public boolean hasLimitOrOffset()
+  {
+    return fetch != null || offset != null;
+  }
 }
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
index 8592a933c1e..2ba0a74283f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
@@ -102,7 +102,7 @@ public @interface NotYetSupported
     SORT_REMOVE_TROUBLE(Scope.DECOUPLED, DruidException.class, "Calcite 
assertion violated.*Sort\\.<init>"),
     UNNEST_INLINED(Scope.DECOUPLED, Exception.class, "Missing conversion is 
Uncollect"),
     UNNEST_RESULT_MISMATCH(Scope.DECOUPLED, AssertionError.class, "(Result 
count mismatch|column content mismatch)"),
-    SUPPORT_SORT(Scope.DECOUPLED, DruidException.class, "Unable to process 
relNode.*DruidSort"),
+    SUPPORT_SORT(Scope.DECOUPLED, DruidException.class, "Sort with limit"),
     SUPPORT_AGGREGATE(Scope.DECOUPLED, DruidException.class, "Unable to 
process relNode.*DruidAggregate"),
     RESTRICTED_DATASOURCE_SUPPORT(Scope.DECOUPLED, DruidException.class, 
"ForbiddenException: Unauthorized");
     // @formatter:on


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to