This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 7ec8d527cec Support Sort in decoupled Dart plans (#18123)
7ec8d527cec is described below
commit 7ec8d527cecbb8b0ba753dd1c9173278c794c177
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Wed Jun 18 15:29:25 2025 +0200
Support Sort in decoupled Dart plans (#18123)
* 🔁 runs `Sort`-s with the use of a separate stage - which makes this plan
not entirely the same as the one executed in the end - but it's easier to
comprehend, and when I started working on adding real grouping it seemed like
a good fit
* 📄 the translation to `QueryDefinition` has its own place now; it right
now adds surplus stages which could be collapsed - but that could be fixed
later
* 📦 moves the `*Stage` classes outside of `LogicalStageBuilder` (which was
never really a builder)
---
.../controller/sql/PrePlannedDartQueryMaker.java | 11 +-
.../druid/msq/kernel/StageDefinitionBuilder.java | 2 +-
.../DruidLogicalToQueryDefinitionTranslator.java | 74 ++++--
.../apache/druid/msq/logical/LogicalInputSpec.java | 98 +++++++
.../druid/msq/logical/LogicalStageBuilder.java | 287 ---------------------
.../org/apache/druid/msq/logical/StageMaker.java | 155 +++++++++++
.../stages/AbstractFrameProcessorStage.java | 52 ++++
.../msq/logical/stages/AbstractLogicalStage.java | 64 +++++
.../msq/logical/stages/AbstractShuffleStage.java | 43 +++
.../druid/msq/logical/stages/FilterStage.java | 78 ++++++
.../msq/logical/{ => stages}/LogicalStage.java | 34 ++-
.../druid/msq/logical/stages/ProjectStage.java | 38 +++
.../apache/druid/msq/logical/stages/ReadStage.java | 98 +++++++
.../apache/druid/msq/logical/stages/SortStage.java | 65 +++++
.../scan/ScanQueryFrameProcessorFactory.java | 23 ++
.../org/apache/druid/msq/test/CalciteDartTest.java | 40 ++-
.../java/org/apache/druid/frame/key/KeyColumn.java | 9 +
.../java/org/apache/druid/frame/key/KeyOrder.java | 14 +
.../planner/querygen/DruidQueryGenerator.java | 22 +-
.../apache/druid/sql/calcite/rel/DruidQuery.java | 43 +--
.../druid/sql/calcite/rel/logical/DruidSort.java | 5 +
.../apache/druid/sql/calcite/NotYetSupported.java | 2 +-
22 files changed, 901 insertions(+), 356 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
index 68df90b6867..d22b7ebe028 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/sql/PrePlannedDartQueryMaker.java
@@ -25,6 +25,8 @@ import org.apache.druid.msq.indexing.LegacyMSQSpec;
import org.apache.druid.msq.indexing.QueryDefMSQSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.logical.DruidLogicalToQueryDefinitionTranslator;
+import org.apache.druid.msq.logical.StageMaker;
+import org.apache.druid.msq.logical.stages.LogicalStage;
import org.apache.druid.msq.sql.MSQTaskQueryMaker;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
@@ -65,9 +67,14 @@ class PrePlannedDartQueryMaker implements QueryMaker,
QueryMaker.FromDruidLogica
throw new
ForbiddenException(plannerContext.getAuthorizationResult().getErrorMessage());
}
DruidLogicalToQueryDefinitionTranslator qdt = new
DruidLogicalToQueryDefinitionTranslator(plannerContext);
- QueryDefinition queryDef = qdt.translate(rootRel);
+ LogicalStage logicalStage = qdt.translate(rootRel);
+
+ StageMaker maker = new StageMaker(plannerContext);
+ maker.buildStage(logicalStage);
+ QueryDefinition queryDef = maker.buildQueryDefinition();
+
QueryContext context = plannerContext.queryContext();
- ColumnMappings columnMappings =
QueryUtils.buildColumnMappings(dartQueryMaker.fieldMapping,
queryDef.getOutputRowSignature());
+ ColumnMappings columnMappings =
QueryUtils.buildColumnMappings(dartQueryMaker.fieldMapping,
logicalStage.getLogicalRowSignature());
QueryDefMSQSpec querySpec = MSQTaskQueryMaker.makeQueryDefMSQSpec(
null,
context,
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
index 6b2090056c9..91bedaada08 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
@@ -117,7 +117,7 @@ public class StageDefinitionBuilder
return this;
}
- int getStageNumber()
+ public int getStageNumber()
{
return stageNumber;
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
index d782e538afb..66282b95c24 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
@@ -19,20 +19,25 @@
package org.apache.druid.msq.logical;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import org.apache.calcite.rel.RelNode;
import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.msq.input.inline.InlineInputSpec;
import org.apache.druid.msq.input.table.TableInputSpec;
-import org.apache.druid.msq.kernel.QueryDefinition;
-import org.apache.druid.msq.logical.LogicalStageBuilder.ReadStage;
+import org.apache.druid.msq.logical.stages.LogicalStage;
+import org.apache.druid.msq.logical.stages.ReadStage;
+import org.apache.druid.msq.logical.stages.SortStage;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
import
org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer.SourceDesc;
+import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.rel.logical.DruidLogicalNode;
+import org.apache.druid.sql.calcite.rel.logical.DruidSort;
import org.apache.druid.sql.calcite.rel.logical.DruidTableScan;
import org.apache.druid.sql.calcite.rel.logical.DruidValues;
@@ -43,7 +48,7 @@ import java.util.Optional;
/**
* Translates the logical plan defined by the {@link DruidLogicalNode} into a
- * {@link QueryDefinition}.
+ * {@link LogicalStage} nodes.
*
* The translation should be executed as a single pass over the logical plan.
*
@@ -53,23 +58,21 @@ import java.util.Optional;
public class DruidLogicalToQueryDefinitionTranslator
{
private PlannerContext plannerContext;
- private LogicalStageBuilder stageBuilder;
public DruidLogicalToQueryDefinitionTranslator(PlannerContext plannerContext)
{
this.plannerContext = plannerContext;
- this.stageBuilder = new LogicalStageBuilder(plannerContext);
}
/**
* Executes the translation of the logical plan into a query definition.
*/
- public QueryDefinition translate(DruidLogicalNode relRoot)
+ public LogicalStage translate(DruidLogicalNode relRoot)
{
- DruidNodeStack stack = new DruidNodeStack();
+ DruidNodeStack stack = new DruidNodeStack(plannerContext);
stack.push(relRoot);
LogicalStage logicalStage = buildStageFor(stack);
- return logicalStage.build();
+ return logicalStage;
}
/**
@@ -83,11 +86,12 @@ public class DruidLogicalToQueryDefinitionTranslator
private LogicalStage buildStageFor(DruidNodeStack stack)
{
List<LogicalStage> inputStages = buildInputStages(stack);
-
DruidLogicalNode node = stack.getNode();
- Optional<ReadStage> stage = buildReadStage(node);
- if (stage.isPresent()) {
- return stage.get();
+ if (inputStages.size() == 0) {
+ Optional<ReadStage> stage = buildReadStage(node);
+ if (stage.isPresent()) {
+ return stage.get();
+ }
}
if (inputStages.size() == 1) {
LogicalStage inputStage = inputStages.get(0);
@@ -95,10 +99,39 @@ public class DruidLogicalToQueryDefinitionTranslator
if (newStage != null) {
return newStage;
}
+ newStage = makeSequenceStage(inputStage, stack);
+ if (newStage != null) {
+ return newStage;
+ }
}
throw DruidException.defensive().build("Unable to process relNode[%s]",
node);
}
+ private Optional<ReadStage> buildReadStage(DruidLogicalNode node)
+ {
+ if (node instanceof DruidValues) {
+ return translateValues((DruidValues) node);
+ }
+ if (node instanceof DruidTableScan) {
+ return translateTableScan((DruidTableScan) node);
+ }
+ return Optional.empty();
+ }
+
+ private LogicalStage makeSequenceStage(LogicalStage inputStage,
DruidNodeStack stack)
+ {
+ if (stack.getNode() instanceof DruidSort) {
+ DruidSort sort = (DruidSort) stack.getNode();
+ if (sort.hasLimitOrOffset()) {
+ throw DruidException.defensive("Sort with limit or offset is not
supported in MSQ logical stage builder");
+ }
+ List<OrderByColumnSpec> orderBySpecs =
DruidQuery.buildOrderByColumnSpecs(inputStage.getLogicalRowSignature(), sort);
+ List<KeyColumn> keyColumns = Lists.transform(orderBySpecs,
KeyColumn::fromOrderByColumnSpec);
+ return new SortStage(inputStage, keyColumns);
+ }
+ return new ReadStage(inputStage.getLogicalRowSignature(),
LogicalInputSpec.of(inputStage)).extendWith(stack);
+ }
+
private List<LogicalStage> buildInputStages(DruidNodeStack stack)
{
List<LogicalStage> inputStages = new ArrayList<>();
@@ -111,23 +144,12 @@ public class DruidLogicalToQueryDefinitionTranslator
return inputStages;
}
- private Optional<ReadStage> buildReadStage(DruidLogicalNode node)
- {
- if (node instanceof DruidValues) {
- return translateValues((DruidValues) node);
- }
- if (node instanceof DruidTableScan) {
- return translateTableScan((DruidTableScan) node);
- }
- return Optional.empty();
- }
-
private Optional<ReadStage> translateTableScan(DruidTableScan node)
{
SourceDesc sd = node.getSourceDesc(plannerContext,
Collections.emptyList());
TableDataSource ids = (TableDataSource) sd.dataSource;
TableInputSpec inputSpec = new TableInputSpec(ids.getName(),
Intervals.ONLY_ETERNITY, null, null);
- ReadStage stage = stageBuilder.makeReadStage(sd.rowSignature,
ImmutableList.of(inputSpec));
+ ReadStage stage = new ReadStage(sd.rowSignature,
LogicalInputSpec.of(inputSpec));
return Optional.of(stage);
}
@@ -136,7 +158,7 @@ public class DruidLogicalToQueryDefinitionTranslator
SourceDesc sd = node.getSourceDesc(plannerContext,
Collections.emptyList());
InlineDataSource ids = (InlineDataSource) sd.dataSource;
InlineInputSpec inputSpec = new InlineInputSpec(ids);
- ReadStage stage = stageBuilder.makeReadStage(sd.rowSignature,
ImmutableList.of(inputSpec));
+ ReadStage stage = new ReadStage(sd.rowSignature,
LogicalInputSpec.of(inputSpec));
return Optional.of(stage);
}
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalInputSpec.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalInputSpec.java
new file mode 100644
index 00000000000..106537b98a7
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalInputSpec.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical;
+
+import org.apache.druid.error.NotYetImplemented;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.stage.StageInputSpec;
+import org.apache.druid.msq.kernel.StageDefinitionBuilder;
+import org.apache.druid.msq.logical.stages.LogicalStage;
+import org.apache.druid.segment.column.RowSignature;
+
+/**
+ * Represents an {@link InputSpec} for {@link LogicalStage}-s.
+ */
+public abstract class LogicalInputSpec
+{
+ public abstract InputSpec toInputSpec(StageMaker maker);
+
+ public abstract RowSignature getRowSignature();
+
+ public static LogicalInputSpec of(LogicalStage inputStage)
+ {
+ return new DagStageInputSpec(inputStage);
+ }
+
+ public static LogicalInputSpec of(InputSpec inputSpec)
+ {
+ return new PhysicalInputSpec(inputSpec);
+ }
+
+ static class PhysicalInputSpec extends LogicalInputSpec
+ {
+ private InputSpec inputSpec;
+
+ public PhysicalInputSpec(InputSpec inputSpec)
+ {
+ this.inputSpec = inputSpec;
+ }
+
+ @Override
+ public InputSpec toInputSpec(StageMaker maker)
+ {
+ return inputSpec;
+ }
+
+ @Override
+ public RowSignature getRowSignature()
+ {
+ throw NotYetImplemented.ex(null, "Not supported for this type");
+ }
+ }
+
+ static class DagStageInputSpec extends LogicalInputSpec
+ {
+
+ private LogicalStage inputStage;
+
+ public DagStageInputSpec(LogicalStage inputStage)
+ {
+ this.inputStage = inputStage;
+ }
+
+ @Override
+ public InputSpec toInputSpec(StageMaker maker)
+ {
+ StageDefinitionBuilder stage = maker.buildStage(inputStage);
+ return new StageInputSpec(stage.getStageNumber());
+ }
+
+ public LogicalStage getStage()
+ {
+ return inputStage;
+ }
+
+ @Override
+ public RowSignature getRowSignature()
+ {
+ return inputStage.getRowSignature();
+ }
+ }
+}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalStageBuilder.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalStageBuilder.java
deleted file mode 100644
index 756b558f86e..00000000000
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalStageBuilder.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.logical;
-
-import org.apache.druid.error.DruidException;
-import org.apache.druid.msq.input.InputSpec;
-import org.apache.druid.msq.kernel.MixShuffleSpec;
-import org.apache.druid.msq.kernel.QueryDefinition;
-import org.apache.druid.msq.kernel.StageDefinition;
-import org.apache.druid.msq.kernel.StageDefinitionBuilder;
-import org.apache.druid.msq.querykit.scan.ScanQueryFrameProcessorFactory;
-import org.apache.druid.query.Druids;
-import org.apache.druid.query.QueryContexts;
-import org.apache.druid.query.filter.DimFilter;
-import org.apache.druid.query.scan.ScanQuery;
-import org.apache.druid.query.spec.QuerySegmentSpec;
-import org.apache.druid.segment.VirtualColumns;
-import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
-import
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
-import org.apache.druid.sql.calcite.rel.DruidQuery;
-import org.apache.druid.sql.calcite.rel.Projection;
-import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
-import org.apache.druid.sql.calcite.rel.logical.DruidFilter;
-import org.apache.druid.sql.calcite.rel.logical.DruidProject;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Helper class to build a {@link LogicalStage} tree.
- *
- * Tightly coupled to {@link DruidLogicalToQueryDefinitionTranslator}.
- * Currently its just a context to hold all the {@link LogicalStage} classes.
- */
-public class LogicalStageBuilder
-{
- private PlannerContext plannerContext;
-
- public LogicalStageBuilder(PlannerContext plannerContext)
- {
- this.plannerContext = plannerContext;
- }
-
- class StageMaker
- {
- /** Provides ids for the stages. */
- private int stageIdSeq = 0;
-
- StageDefinition makeScanStage(
- VirtualColumns virtualColumns,
- RowSignature signature,
- List<InputSpec> inputs,
- DimFilter dimFilter)
- {
- ScanQuery s = Druids.newScanQueryBuilder()
- .dataSource(IRRELEVANT)
- .intervals(QuerySegmentSpec.ETERNITY)
- .filters(dimFilter)
- .virtualColumns(virtualColumns)
- .columns(signature.getColumnNames())
- .columnTypes(signature.getColumnTypes())
- .build();
- ScanQueryFrameProcessorFactory scanProcessorFactory = new
ScanQueryFrameProcessorFactory(s);
- StageDefinitionBuilder sdb = StageDefinition.builder(getNextStageId())
- .inputs(inputs)
- .signature(signature)
- .shuffleSpec(MixShuffleSpec.instance())
- .processorFactory(scanProcessorFactory);
- return sdb.build(getIdForBuilder());
- }
-
- private int getNextStageId()
- {
- return stageIdSeq++;
- }
-
- private String getIdForBuilder()
- {
- String dartQueryId =
plannerContext.queryContext().getString(QueryContexts.CTX_DART_QUERY_ID);
- if (dartQueryId != null) {
- return dartQueryId;
- }
- return plannerContext.getSqlQueryId();
- }
- }
-
- public abstract class AbstractLogicalStage implements LogicalStage
- {
- protected final List<InputSpec> inputSpecs;
- protected final RowSignature signature;
- protected final List<LogicalStage> inputStages;
-
- public AbstractLogicalStage(RowSignature signature, List<InputSpec>
inputs, List<LogicalStage> inputVertices)
- {
- this.inputSpecs = inputs;
- this.signature = signature;
- this.inputStages = inputVertices;
- }
-
- @Override
- public StageDefinition buildCurrentStage(StageMaker stageMaker)
- {
- throw DruidException.defensive("This should have been implemented - or
not reach this point!");
- }
-
- @Override
- public final QueryDefinition build()
- {
- return QueryDefinition.create(buildStageDefinitions(new StageMaker()),
plannerContext.queryContext());
- }
-
- @Override
- public final List<StageDefinition> buildStageDefinitions(StageMaker
stageMaker)
- {
- List<StageDefinition> ret = new ArrayList<>();
- if (!inputStages.isEmpty()) {
- throw DruidException.defensive("Not yet supported");
- }
- ret.add(buildCurrentStage(stageMaker));
- return ret;
- }
- }
-
- /**
- * Represents a stage that reads data from input sources.
- */
- public class ReadStage extends AbstractLogicalStage
- {
- public ReadStage(RowSignature signature, List<InputSpec> inputSpecs)
- {
- super(signature, inputSpecs, Collections.emptyList());
- }
-
- /**
- * Copy constructor.
- */
- public ReadStage(ReadStage readStage, RowSignature newSignature)
- {
- super(newSignature, readStage.inputSpecs, readStage.inputStages);
- }
-
- @Override
- public StageDefinition buildCurrentStage(StageMaker stageMaker)
- {
- return stageMaker.makeScanStage(VirtualColumns.EMPTY, signature,
inputSpecs, null);
- }
-
- @Override
- public LogicalStage extendWith(DruidNodeStack stack)
- {
- if (stack.getNode() instanceof DruidFilter) {
- DruidFilter filter = (DruidFilter) stack.getNode();
- return makeFilterStage(filter);
- }
-
- if (stack.getNode() instanceof DruidProject) {
-
- DruidProject project = (DruidProject) stack.getNode();
- DruidFilter dummyFilter = new DruidFilter(
- project.getCluster(), project.getTraitSet(), project,
- project.getCluster().getRexBuilder().makeLiteral(true)
- );
- return makeFilterStage(dummyFilter).extendWith(stack);
- }
- return null;
- }
-
- private LogicalStage makeFilterStage(DruidFilter filter)
- {
- VirtualColumnRegistry virtualColumnRegistry =
VirtualColumnRegistry.create(
- signature,
- plannerContext.getExpressionParser(),
- plannerContext.getPlannerConfig().isForceExpressionVirtualColumns()
- );
-
- DimFilter dimFilter = DruidQuery.getDimFilter(
- plannerContext,
- signature, virtualColumnRegistry, filter
- );
-
- return new FilterStage(
- this,
- virtualColumnRegistry,
- dimFilter
- );
- }
- }
-
- public FilterStage create(ReadStage inputStage, DruidFilter filter)
- {
- VirtualColumnRegistry virtualColumnRegistry = VirtualColumnRegistry.create(
- inputStage.signature,
- plannerContext.getExpressionParser(),
- plannerContext.getPlannerConfig().isForceExpressionVirtualColumns()
- );
- DimFilter dimFilter = DruidQuery.getDimFilter(plannerContext,
inputStage.signature, virtualColumnRegistry, filter);
- return new FilterStage(inputStage, virtualColumnRegistry, dimFilter);
- }
-
- class FilterStage extends ReadStage
- {
- protected final VirtualColumnRegistry virtualColumnRegistry;
- private DimFilter dimFilter;
-
- public FilterStage(ReadStage inputStage, VirtualColumnRegistry
virtualColumnRegistry, DimFilter dimFilter)
- {
- super(inputStage, inputStage.signature);
- this.virtualColumnRegistry = virtualColumnRegistry;
- this.dimFilter = dimFilter;
- }
-
- /**
- * Copy constructor.
- */
- public FilterStage(FilterStage stage, VirtualColumnRegistry
newVirtualColumnRegistry, RowSignature rowSignature)
- {
- super(stage, rowSignature);
- this.dimFilter = stage.dimFilter;
- this.virtualColumnRegistry = newVirtualColumnRegistry;
- }
-
- @Override
- public StageDefinition buildCurrentStage(StageMaker stageMaker)
- {
- VirtualColumns output =
virtualColumnRegistry.build(Collections.emptySet());
- return stageMaker.makeScanStage(output, signature, inputSpecs,
dimFilter);
- }
-
- @Override
- public LogicalStage extendWith(DruidNodeStack stack)
- {
- if (stack.getNode() instanceof DruidProject) {
- DruidProject project = (DruidProject) stack.getNode();
- Projection preAggregation = Projection
- .preAggregation(project, plannerContext, signature,
virtualColumnRegistry);
-
- return new ProjectStage(
- this,
- virtualColumnRegistry,
- preAggregation.getOutputRowSignature()
- );
- }
- return null;
- }
- }
-
- class ProjectStage extends FilterStage
- {
- public ProjectStage(FilterStage root, VirtualColumnRegistry
newVirtualColumnRegistry, RowSignature rowSignature)
- {
- super(root, newVirtualColumnRegistry, rowSignature);
- }
-
- @Override
- public LogicalStage extendWith(DruidNodeStack stack)
- {
- return null;
- }
- }
-
- private static final String IRRELEVANT = "irrelevant";
-
- public ReadStage makeReadStage(RowSignature rowSignature, List<InputSpec>
isp)
- {
- return new ReadStage(rowSignature, isp);
- }
-
-}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
new file mode 100644
index 00000000000..d7d99ad8441
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical;
+
+import org.apache.druid.error.DruidException;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.kernel.MixShuffleSpec;
+import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.StageDefinitionBuilder;
+import org.apache.druid.msq.logical.stages.AbstractFrameProcessorStage;
+import org.apache.druid.msq.logical.stages.AbstractShuffleStage;
+import org.apache.druid.msq.logical.stages.LogicalStage;
+import org.apache.druid.msq.querykit.BaseFrameProcessorFactory;
+import org.apache.druid.msq.querykit.scan.ScanQueryFrameProcessorFactory;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Builds {@link QueryDefinition} from {@link LogicalStage}-s.
+ */
+public class StageMaker
+{
+ /** Provides ids for the stages. */
+ private int stageIdSeq = 0;
+
+ private final PlannerContext plannerContext;
+
+ private Map<LogicalStage, StageDefinitionBuilder> builtStages = new
IdentityHashMap<>();
+
+ public StageMaker(PlannerContext plannerContext)
+ {
+ this.plannerContext = plannerContext;
+ }
+
+ public static ScanQueryFrameProcessorFactory makeScanFrameProcessor(
+ VirtualColumns virtualColumns,
+ RowSignature signature,
+ DimFilter dimFilter)
+ {
+ return
ScanQueryFrameProcessorFactory.makeScanFrameProcessorFactory(virtualColumns,
signature, dimFilter);
+ }
+
+ public StageDefinitionBuilder buildStage(LogicalStage stage)
+ {
+ if (builtStages.get(stage) != null) {
+ return builtStages.get(stage);
+ }
+ StageDefinitionBuilder stageDef = buildStageInternal(stage);
+ builtStages.put(stage, stageDef);
+ return stageDef;
+ }
+
+ private StageDefinitionBuilder buildStageInternal(LogicalStage stage)
+ {
+ if (stage instanceof AbstractFrameProcessorStage) {
+ return buildFrameProcessorStage((AbstractFrameProcessorStage) stage);
+ }
+ if (stage instanceof AbstractShuffleStage) {
+ return buildShuffleStage((AbstractShuffleStage) stage);
+ }
+ throw DruidException.defensive("Cannot build type [%s]",
stage.getClass().getSimpleName());
+ }
+
+ private StageDefinitionBuilder
buildFrameProcessorStage(AbstractFrameProcessorStage frameProcessorStage)
+ {
+ List<LogicalInputSpec> inputs = frameProcessorStage.getInputSpecs();
+ List<InputSpec> inputSpecs = new ArrayList<>();
+ for (LogicalInputSpec dagInputSpec : inputs) {
+ inputSpecs.add(dagInputSpec.toInputSpec(this));
+ }
+ BaseFrameProcessorFactory frameProcessor =
frameProcessorStage.buildFrameProcessor(this);
+ StageDefinitionBuilder sdb = newStageDefinitionBuilder();
+ sdb.inputs(inputSpecs);
+ sdb.signature(frameProcessorStage.getLogicalRowSignature());
+ sdb.processorFactory(frameProcessor);
+ sdb.shuffleSpec(MixShuffleSpec.instance());
+ return sdb;
+ }
+
+ private StageDefinitionBuilder buildShuffleStage(AbstractShuffleStage stage)
+ {
+ List<LogicalInputSpec> inputs = stage.getInputSpecs();
+ List<InputSpec> inputSpecs = new ArrayList<>();
+ for (LogicalInputSpec dagInputSpec : inputs) {
+ inputSpecs.add(dagInputSpec.toInputSpec(this));
+ }
+ StageDefinitionBuilder sdb = newStageDefinitionBuilder();
+ sdb.inputs(inputSpecs);
+ sdb.signature(stage.getRowSignature());
+ sdb.processorFactory(makeScanFrameProcessor(VirtualColumns.EMPTY,
stage.getRowSignature(), null));
+ sdb.shuffleSpec(stage.buildShuffleSpec());
+ return sdb;
+ }
+
+ private StageDefinitionBuilder newStageDefinitionBuilder()
+ {
+ return StageDefinition.builder(getNextStageId());
+ }
+
+ private int getNextStageId()
+ {
+ return stageIdSeq++;
+ }
+
+ public QueryDefinition buildQueryDefinition()
+ {
+ return QueryDefinition.create(makeStages(), plannerContext.queryContext());
+ }
+
+ private List<StageDefinition> makeStages()
+ {
+ List<StageDefinition> ret = new ArrayList<>();
+ for (StageDefinitionBuilder stageDefinitionBuilder : builtStages.values())
{
+ ret.add(stageDefinitionBuilder.build(getIdForBuilder()));
+ }
+ ret.sort(Comparator.comparing(StageDefinition::getStageNumber));
+ return ret;
+ }
+
+ private String getIdForBuilder()
+ {
+ String dartQueryId =
plannerContext.queryContext().getString(QueryContexts.CTX_DART_QUERY_ID);
+ if (dartQueryId != null) {
+ return dartQueryId;
+ }
+ return plannerContext.getSqlQueryId();
+ }
+}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/AbstractFrameProcessorStage.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/AbstractFrameProcessorStage.java
new file mode 100644
index 00000000000..3eadd43f871
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/AbstractFrameProcessorStage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import org.apache.druid.msq.kernel.FrameProcessorFactory;
+import org.apache.druid.msq.logical.LogicalInputSpec;
+import org.apache.druid.msq.logical.StageMaker;
+import org.apache.druid.msq.querykit.BaseFrameProcessorFactory;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+/**
+ * Represents a {@link LogicalStage} which could be processed as a {@link
FrameProcessorFactory}.
+ */
+public abstract class AbstractFrameProcessorStage extends AbstractLogicalStage
+{
+ public AbstractFrameProcessorStage(RowSignature signature, LogicalInputSpec
input)
+ {
+ super(signature, input);
+ }
+
+ public AbstractFrameProcessorStage(RowSignature signature,
List<LogicalInputSpec> inputs)
+ {
+ super(signature, inputs);
+ }
+
+ /**
+ * Builds the {@link FrameProcessorFactory} for this stage.
+ */
+ @Nonnull
+ public abstract BaseFrameProcessorFactory buildFrameProcessor(StageMaker
stageMaker);
+}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/AbstractLogicalStage.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/AbstractLogicalStage.java
new file mode 100644
index 00000000000..8cdcfbef2e4
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/AbstractLogicalStage.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.msq.logical.LogicalInputSpec;
+import org.apache.druid.segment.column.RowSignature;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Common stage implementation.
+ */
+public abstract class AbstractLogicalStage implements LogicalStage
+{
+ protected final List<LogicalInputSpec> inputSpecs;
+ protected final RowSignature signature;
+
+ public AbstractLogicalStage(RowSignature signature, LogicalInputSpec input)
+ {
+ this(signature, Collections.singletonList(input));
+ }
+
+ public AbstractLogicalStage(RowSignature signature, List<LogicalInputSpec>
inputs)
+ {
+ this.inputSpecs = ImmutableList.copyOf(inputs);
+ this.signature = signature;
+ }
+
+ @Override
+ public RowSignature getLogicalRowSignature()
+ {
+ return signature;
+ }
+
+ @Override
+ public final RowSignature getRowSignature()
+ {
+ return signature;
+ }
+
+ @Override
+ public final List<LogicalInputSpec> getInputSpecs()
+ {
+ return inputSpecs;
+ }
+}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/AbstractShuffleStage.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/AbstractShuffleStage.java
new file mode 100644
index 00000000000..4e1a500171a
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/AbstractShuffleStage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import org.apache.druid.msq.kernel.ShuffleSpec;
+import org.apache.druid.msq.logical.LogicalInputSpec;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Represents a Shuffle stage.
+ */
+public abstract class AbstractShuffleStage extends AbstractLogicalStage
+{
+ public AbstractShuffleStage(RowSignature signature, LogicalInputSpec input)
+ {
+ super(signature, input);
+ }
+
+ /**
+ * Builds the shuffle specification for this stage.
+ */
+ @Nonnull
+ public abstract ShuffleSpec buildShuffleSpec();
+}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/FilterStage.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/FilterStage.java
new file mode 100644
index 00000000000..b2327dc0006
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/FilterStage.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import org.apache.druid.msq.logical.StageMaker;
+import org.apache.druid.msq.querykit.BaseFrameProcessorFactory;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+import
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
+import org.apache.druid.sql.calcite.rel.Projection;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+import org.apache.druid.sql.calcite.rel.logical.DruidProject;
+
+import java.util.Collections;
+
+class FilterStage extends ReadStage
+{
+ protected final VirtualColumnRegistry virtualColumnRegistry;
+ protected final DimFilter dimFilter;
+
+ public FilterStage(ReadStage inputStage, VirtualColumnRegistry
virtualColumnRegistry, DimFilter dimFilter)
+ {
+ super(inputStage, inputStage.signature);
+ this.virtualColumnRegistry = virtualColumnRegistry;
+ this.dimFilter = dimFilter;
+ }
+
+ /**
+ * Copy constructor.
+ */
+ protected FilterStage(FilterStage stage, VirtualColumnRegistry
newVirtualColumnRegistry, RowSignature rowSignature)
+ {
+ super(stage, rowSignature);
+ this.dimFilter = stage.dimFilter;
+ this.virtualColumnRegistry = newVirtualColumnRegistry;
+ }
+
+ @Override
+ public LogicalStage extendWith(DruidNodeStack stack)
+ {
+ if (stack.getNode() instanceof DruidProject) {
+ DruidProject project = (DruidProject) stack.getNode();
+ Projection projection = Projection.preAggregation(project,
stack.getPlannerContext(), signature, virtualColumnRegistry);
+
+ return new ProjectStage(
+ this,
+ virtualColumnRegistry,
+ projection.getOutputRowSignature()
+ );
+ }
+ return null;
+ }
+
+ @Override
+ public BaseFrameProcessorFactory buildFrameProcessor(StageMaker stageMaker)
+ {
+ VirtualColumns virtualColumns =
virtualColumnRegistry.build(Collections.emptySet());
+ return StageMaker.makeScanFrameProcessor(virtualColumns, signature,
dimFilter);
+ }
+}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalStage.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/LogicalStage.java
similarity index 71%
rename from
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalStage.java
rename to
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/LogicalStage.java
index ce771d25c6c..bc681f5e066 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalStage.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/LogicalStage.java
@@ -17,11 +17,10 @@
* under the License.
*/
-package org.apache.druid.msq.logical;
+package org.apache.druid.msq.logical.stages;
-import org.apache.druid.msq.kernel.QueryDefinition;
-import org.apache.druid.msq.kernel.StageDefinition;
-import org.apache.druid.msq.logical.LogicalStageBuilder.StageMaker;
+import org.apache.druid.msq.logical.LogicalInputSpec;
+import org.apache.druid.segment.column.RowSignature;
import
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
import javax.annotation.Nullable;
@@ -36,18 +35,6 @@ import java.util.List;
*/
public interface LogicalStage
{
- /**
- * Builds the full {@link QueryDefinition}.
- *
- * This supposed to be called on the top level stage.
- */
- QueryDefinition build();
-
- /**
- * Builds the current stage.
- */
- StageDefinition buildCurrentStage(StageMaker stageMaker);
-
/**
* Attempts to extend the current stage with an additional node.
*
@@ -57,9 +44,20 @@ public interface LogicalStage
LogicalStage extendWith(DruidNodeStack stack);
/**
- * Internal method to build the stage definitions.
+ * Real row signature this stage will return.
+ *
+ * Its columns might have been reordered, and it may contain additional technical columns.
+ */
+ RowSignature getRowSignature();
+
+ /**
+ * Logical row signature this node is supposed to produce.
*/
- List<StageDefinition> buildStageDefinitions(StageMaker stageMaker);
+ RowSignature getLogicalRowSignature();
+ /**
+ * Returns the inputs of this stage.
+ */
+ List<LogicalInputSpec> getInputSpecs();
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ProjectStage.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ProjectStage.java
new file mode 100644
index 00000000000..9c9face4750
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ProjectStage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import org.apache.druid.segment.column.RowSignature;
+import
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+
+class ProjectStage extends FilterStage
+{
+ public ProjectStage(FilterStage stage, VirtualColumnRegistry
virtualColumnRegistry, RowSignature signature)
+ {
+ super(stage, virtualColumnRegistry, signature);
+ }
+
+ @Override
+ public LogicalStage extendWith(DruidNodeStack stack)
+ {
+ return null;
+ }
+}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
new file mode 100644
index 00000000000..c1ff5a3e277
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import org.apache.druid.msq.logical.LogicalInputSpec;
+import org.apache.druid.msq.logical.StageMaker;
+import org.apache.druid.msq.querykit.BaseFrameProcessorFactory;
+import org.apache.druid.query.filter.DimFilter;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
+import org.apache.druid.sql.calcite.rel.DruidQuery;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+import org.apache.druid.sql.calcite.rel.logical.DruidFilter;
+import org.apache.druid.sql.calcite.rel.logical.DruidProject;
+
+/**
+ * Represents a stage that reads data from input sources.
+ */
+public class ReadStage extends AbstractFrameProcessorStage
+{
+ public ReadStage(RowSignature signature, LogicalInputSpec inputSpec)
+ {
+ super(signature, inputSpec);
+ }
+
+ /**
+ * Copy constructor.
+ */
+ protected ReadStage(ReadStage readStage, RowSignature newSignature)
+ {
+ super(newSignature, readStage.inputSpecs);
+ }
+
+ @Override
+ public LogicalStage extendWith(DruidNodeStack stack)
+ {
+ if (stack.getNode() instanceof DruidFilter) {
+ DruidFilter filter = (DruidFilter) stack.getNode();
+ return makeFilterStage(stack.getPlannerContext(), filter);
+ }
+
+ if (stack.getNode() instanceof DruidProject) {
+
+ DruidProject project = (DruidProject) stack.getNode();
+ DruidFilter dummyFilter = new DruidFilter(
+ project.getCluster(), project.getTraitSet(), project,
+ project.getCluster().getRexBuilder().makeLiteral(true)
+ );
+ return makeFilterStage(stack.getPlannerContext(),
dummyFilter).extendWith(stack);
+ }
+ return null;
+ }
+
+ private LogicalStage makeFilterStage(PlannerContext plannerContext,
DruidFilter filter)
+ {
+ VirtualColumnRegistry virtualColumnRegistry = VirtualColumnRegistry.create(
+ signature,
+ plannerContext.getExpressionParser(),
+ plannerContext.getPlannerConfig().isForceExpressionVirtualColumns()
+ );
+
+ DimFilter dimFilter = DruidQuery.getDimFilter(
+ plannerContext,
+ signature, virtualColumnRegistry, filter
+ );
+
+ return new FilterStage(
+ this,
+ virtualColumnRegistry,
+ dimFilter
+ );
+ }
+
+ @Override
+ public BaseFrameProcessorFactory buildFrameProcessor(StageMaker stageMaker)
+ {
+ return StageMaker.makeScanFrameProcessor(VirtualColumns.EMPTY, signature,
null);
+ }
+}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SortStage.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SortStage.java
new file mode 100644
index 00000000000..df8aad6c57b
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SortStage.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.msq.kernel.ShuffleSpec;
+import org.apache.druid.msq.logical.LogicalInputSpec;
+import org.apache.druid.msq.querykit.QueryKitUtils;
+import org.apache.druid.msq.querykit.ShuffleSpecFactories;
+import org.apache.druid.segment.column.RowSignature;
+import
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
+
+import java.util.List;
+
+public class SortStage extends AbstractShuffleStage
+{
+ protected final List<KeyColumn> keyColumns;
+
+ public SortStage(LogicalStage inputStage, List<KeyColumn> keyColumns)
+ {
+ super(
+ QueryKitUtils.sortableSignature(inputStage.getLogicalRowSignature(),
keyColumns),
+ LogicalInputSpec.of(inputStage)
+ );
+ this.keyColumns = keyColumns;
+ }
+
+ @Override
+ public RowSignature getLogicalRowSignature()
+ {
+ return inputSpecs.get(0).getRowSignature();
+ }
+
+ @Override
+ public ShuffleSpec buildShuffleSpec()
+ {
+ final ClusterBy clusterBy = new ClusterBy(keyColumns, 0);
+ return
ShuffleSpecFactories.globalSortWithMaxPartitionCount(1).build(clusterBy, false);
+ }
+
+ @Override
+ public LogicalStage extendWith(DruidNodeStack stack)
+ {
+ return null;
+ }
+
+}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
index 39c311226c6..4deff368876 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorFactory.java
@@ -30,8 +30,13 @@ import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.kernel.FrameContext;
import org.apache.druid.msq.querykit.BaseLeafFrameProcessorFactory;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.SegmentMapFunction;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
import java.util.concurrent.atomic.AtomicLong;
@@ -39,6 +44,8 @@ import java.util.concurrent.atomic.AtomicLong;
@JsonTypeName("scan")
public class ScanQueryFrameProcessorFactory extends
BaseLeafFrameProcessorFactory
{
+ private static final String IRRELEVANT = "irrelevant";
+
private final ScanQuery query;
/**
@@ -58,6 +65,22 @@ public class ScanQueryFrameProcessorFactory extends
BaseLeafFrameProcessorFactor
this.runningCountForLimit = query.isLimited() &&
query.getOrderBys().isEmpty() ? new AtomicLong() : null;
}
+ public static ScanQueryFrameProcessorFactory makeScanFrameProcessorFactory(
+ VirtualColumns virtualColumns,
+ RowSignature signature,
+ DimFilter dimFilter)
+ {
+ ScanQuery scanQuery = Druids.newScanQueryBuilder()
+ .dataSource(IRRELEVANT)
+ .intervals(QuerySegmentSpec.ETERNITY)
+ .filters(dimFilter)
+ .virtualColumns(virtualColumns)
+ .columns(signature.getColumnNames())
+ .columnTypes(signature.getColumnTypes())
+ .build();
+ return new ScanQueryFrameProcessorFactory(scanQuery);
+ }
+
@JsonProperty
public ScanQuery getQuery()
{
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
index afda9e44628..9e71767e6d6 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
@@ -28,9 +28,12 @@ import org.apache.druid.sql.calcite.NotYetSupported.Modes;
import org.apache.druid.sql.calcite.QueryTestBuilder;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
@SqlTestFrameworkConfig.ComponentSupplier(DartComponentSupplier.class)
+@TestMethodOrder(MethodOrderer.MethodName.class)
public class CalciteDartTest extends BaseCalciteQueryTest
{
@Override
@@ -55,7 +58,6 @@ public class CalciteDartTest extends BaseCalciteQueryTest
.run();
}
- @NotYetSupported(Modes.SUPPORT_SORT)
@Test
public void testOrderBy()
{
@@ -74,6 +76,42 @@ public class CalciteDartTest extends BaseCalciteQueryTest
.run();
}
+ @Test
+ public void testOrderByVirtual()
+ {
+ testBuilder()
+ .sql("SELECT l1 from numfoo order by -l1")
+ .expectedResults(
+ ImmutableList.of(
+ new Object[]{null},
+ new Object[]{null},
+ new Object[]{null},
+ new Object[]{325323L},
+ new Object[]{7L},
+ new Object[]{0L}
+ )
+ )
+ .run();
+ }
+
+ @Test
+ public void testOrderByVirtualDesc()
+ {
+ testBuilder()
+ .sql("SELECT l1 from numfoo order by -l1 desc")
+ .expectedResults(
+ ImmutableList.of(
+ new Object[]{0L},
+ new Object[]{7L},
+ new Object[]{325323L},
+ new Object[]{null},
+ new Object[]{null},
+ new Object[]{null}
+ )
+ )
+ .run();
+ }
+
@NotYetSupported(Modes.RESTRICTED_DATASOURCE_SUPPORT)
@Test
public void testSelectFromRestricted()
diff --git a/processing/src/main/java/org/apache/druid/frame/key/KeyColumn.java
b/processing/src/main/java/org/apache/druid/frame/key/KeyColumn.java
index cf73c0ecddb..c84b8f85f64 100644
--- a/processing/src/main/java/org/apache/druid/frame/key/KeyColumn.java
+++ b/processing/src/main/java/org/apache/druid/frame/key/KeyColumn.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import java.util.Objects;
@@ -84,4 +85,12 @@ public class KeyColumn
{
return StringUtils.format("%s%s", columnName, order == KeyOrder.NONE ? ""
: " " + order);
}
+
+ public static KeyColumn fromOrderByColumnSpec(OrderByColumnSpec
orderByColumnSpec)
+ {
+ return new KeyColumn(
+ orderByColumnSpec.getDimension(),
+ KeyOrder.fromDirection(orderByColumnSpec.getDirection())
+ );
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/frame/key/KeyOrder.java
b/processing/src/main/java/org/apache/druid/frame/key/KeyOrder.java
index fe568483bf4..25f86bd7c32 100644
--- a/processing/src/main/java/org/apache/druid/frame/key/KeyOrder.java
+++ b/processing/src/main/java/org/apache/druid/frame/key/KeyOrder.java
@@ -19,6 +19,8 @@
package org.apache.druid.frame.key;
+import org.apache.druid.query.groupby.orderby.OrderByColumnSpec.Direction;
+
/**
* Ordering associated with a {@link KeyColumn}.
*/
@@ -58,4 +60,16 @@ public enum KeyOrder
{
return sortable;
}
+
+ static KeyOrder fromDirection(Direction direction)
+ {
+ switch (direction) {
+ case ASCENDING:
+ return ASCENDING;
+ case DESCENDING:
+ return DESCENDING;
+ default:
+ return NONE;
+ }
+ }
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/querygen/DruidQueryGenerator.java
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/querygen/DruidQueryGenerator.java
index 28ac96046e8..f59f1e3c6ff 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/querygen/DruidQueryGenerator.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/querygen/DruidQueryGenerator.java
@@ -29,6 +29,8 @@ import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.druid.error.DruidException;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FilteredDataSource;
@@ -85,9 +87,20 @@ public class DruidQueryGenerator
this.node = node;
this.operandIndex = operandIndex;
}
- }
+ @Override
+ public String toString()
+ {
+ return ToStringBuilder.reflectionToString(this,
ToStringStyle.NO_CLASS_NAME_STYLE);
+ }
+ }
Stack<Entry> stack = new Stack<>();
+ PlannerContext plannerContext;
+
+ public DruidNodeStack(PlannerContext plannerContext)
+ {
+ this.plannerContext = plannerContext;
+ }
public void push(DruidLogicalNode item)
{
@@ -128,11 +141,16 @@ public class DruidQueryGenerator
{
return stack.peek().operandIndex;
}
+
+ public PlannerContext getPlannerContext()
+ {
+ return plannerContext;
+ }
}
public DruidQuery buildQuery()
{
- DruidNodeStack stack = new DruidNodeStack();
+ DruidNodeStack stack = new DruidNodeStack(vertexFactory.plannerContext);
stack.push(relRoot);
Vertex vertex = buildVertexFor(stack);
return vertex.buildQuery(true);
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index aa88409c0b8..809c945edd7 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -631,7 +631,30 @@ public class DruidQuery
final OffsetLimit offsetLimit = OffsetLimit.fromSort(sort);
// Extract orderBy column specs.
- final List<OrderByColumnSpec> orderBys = new
ArrayList<>(sort.getSortExps().size());
+ final List<OrderByColumnSpec> orderBys =
buildOrderByColumnSpecs(rowSignature, sort);
+
+ // Extract any post-sort Projection.
+ final Projection projection;
+
+ if (sortProject == null) {
+ projection = null;
+ } else if (partialQuery.getAggregate() == null) {
+ if (virtualColumnRegistry == null) {
+ throw new ISE("Must provide 'virtualColumnRegistry' for
pre-aggregation Projection!");
+ }
+
+ projection = Projection.preAggregation(sortProject, plannerContext,
rowSignature, virtualColumnRegistry);
+ } else {
+ projection = Projection.postAggregation(sortProject, plannerContext,
rowSignature, "s");
+ }
+
+ return Sorting.create(orderBys, offsetLimit, projection);
+ }
+
+ public static List<OrderByColumnSpec> buildOrderByColumnSpecs(final
RowSignature rowSignature, final Sort sort)
+ {
+ final List<OrderByColumnSpec> orderBys;
+ orderBys = new ArrayList<>(sort.getSortExps().size());
for (int sortKey = 0; sortKey < sort.getSortExps().size(); sortKey++) {
final RexNode sortExpression = sort.getSortExps().get(sortKey);
final RelFieldCollation collation =
sort.getCollation().getFieldCollations().get(sortKey);
@@ -657,23 +680,7 @@ public class DruidQuery
throw new CannotBuildQueryException(sort, sortExpression);
}
}
-
- // Extract any post-sort Projection.
- final Projection projection;
-
- if (sortProject == null) {
- projection = null;
- } else if (partialQuery.getAggregate() == null) {
- if (virtualColumnRegistry == null) {
- throw new ISE("Must provide 'virtualColumnRegistry' for
pre-aggregation Projection!");
- }
-
- projection = Projection.preAggregation(sortProject, plannerContext,
rowSignature, virtualColumnRegistry);
- } else {
- projection = Projection.postAggregation(sortProject, plannerContext,
rowSignature, "s");
- }
-
- return Sorting.create(orderBys, offsetLimit, projection);
+ return orderBys;
}
/**
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/rel/logical/DruidSort.java
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/logical/DruidSort.java
index 4ad6091ad12..e3d569a5166 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/logical/DruidSort.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/logical/DruidSort.java
@@ -94,4 +94,9 @@ public class DruidSort extends Sort implements
DruidLogicalNode
{
return super.explainTerms(pw).item("druid", "logical");
}
+
+ public boolean hasLimitOrOffset()
+ {
+ return fetch != null || offset != null;
+ }
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
index 8592a933c1e..2ba0a74283f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
@@ -102,7 +102,7 @@ public @interface NotYetSupported
SORT_REMOVE_TROUBLE(Scope.DECOUPLED, DruidException.class, "Calcite
assertion violated.*Sort\\.<init>"),
UNNEST_INLINED(Scope.DECOUPLED, Exception.class, "Missing conversion is
Uncollect"),
UNNEST_RESULT_MISMATCH(Scope.DECOUPLED, AssertionError.class, "(Result
count mismatch|column content mismatch)"),
- SUPPORT_SORT(Scope.DECOUPLED, DruidException.class, "Unable to process
relNode.*DruidSort"),
+ SUPPORT_SORT(Scope.DECOUPLED, DruidException.class, "Sort with limit"),
SUPPORT_AGGREGATE(Scope.DECOUPLED, DruidException.class, "Unable to
process relNode.*DruidAggregate"),
RESTRICTED_DATASOURCE_SUPPORT(Scope.DECOUPLED, DruidException.class,
"ForbiddenException: Unauthorized");
// @formatter:on
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]