This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9d323afa47e Decoupled dart: support unnest/join (#18232)
9d323afa47e is described below
commit 9d323afa47e43b4772ad1cadf0a6e8841da9d0cf
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Wed Sep 17 09:43:19 2025 +0200
Decoupled dart: support unnest/join (#18232)
---
.../druid/msq/kernel/StageDefinitionBuilder.java | 3 +-
.../DruidLogicalToQueryDefinitionTranslator.java | 52 ++---
.../apache/druid/msq/logical/LogicalInputSpec.java | 70 ++++++-
.../org/apache/druid/msq/logical/StageMaker.java | 39 ++--
.../druid/msq/logical/stages/GroupByStages.java | 12 +-
.../apache/druid/msq/logical/stages/JoinStage.java | 211 +++++++++++++++++++++
.../apache/druid/msq/logical/stages/ReadStage.java | 56 ++++++
.../druid/msq/logical/stages/SegmentMapStage.java | 51 +++++
.../druid/msq/logical/stages/UnnestStage.java | 41 ++++
.../msq/querykit/scan/ScanQueryStageProcessor.java | 12 ++
multi-stage-query/src/main/resources/log4j2.xml | 28 ++-
...ier.java => AbstractDartComponentSupplier.java} | 10 +-
.../org/apache/druid/msq/test/CalciteDartTest.java | 18 ++
.../druid/msq/test/DartComponentSupplier.java | 135 +------------
.../test/DecoupledDartCalciteArraysQueryTest.java | 69 +++++++
.../test/DecoupledDartCalciteJoinQueryTest.java | 124 ++++++++++++
.../DecoupledDartCalciteNestedDataQueryTest.java | 60 ++++++
.../druid/msq/test/DecoupledDartExtension.java | 2 +-
.../sql/calcite/NotYetSupportedUsageTest.java | 36 ++--
.../apache/druid/segment/column/RowSignature.java | 10 +
.../druid/sql/calcite/util/datasets/Larry.java | 104 ++++++++++
.../sql/calcite/util/datasets/TestDataSet.java | 1 +
.../sql/calcite/planner/CalciteRulesManager.java | 15 +-
.../druid/sql/calcite/rel/logical/DruidJoin.java | 7 +
.../druid/sql/avatica/DruidAvaticaHandlerTest.java | 13 ++
.../druid/sql/calcite/BaseCalciteQueryTest.java | 20 +-
.../druid/sql/calcite/CalciteArraysQueryTest.java | 86 +++++++--
.../druid/sql/calcite/CalciteJoinQueryTest.java | 71 +++++--
.../sql/calcite/CalciteNestedDataQueryTest.java | 16 +-
.../apache/druid/sql/calcite/CalciteQueryTest.java | 10 +-
.../druid/sql/calcite/DrillWindowQueryTest.java | 2 +-
.../apache/druid/sql/calcite/NotYetSupported.java | 37 +++-
.../apache/druid/sql/calcite/QueryTestBuilder.java | 2 +-
.../calcite/planner/CalcitePlannerModuleTest.java | 2 +-
.../druid/sql/calcite/util/TestDataBuilder.java | 4 +
.../sql/calcite/util/TestTimelineServerView.java | 18 +-
...estThriceWithFiltersOnDimAndAllUnnestColumns.iq | 4 +-
37 files changed, 1145 insertions(+), 306 deletions(-)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
index a0edc224f8c..8b2815939f1 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
@@ -28,6 +28,7 @@ import org.apache.druid.segment.column.RowSignature;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Set;
/**
* Builder for {@link StageDefinition}. See class-level javadoc for that class
for a description of the parameters.
@@ -76,7 +77,7 @@ public class StageDefinitionBuilder
return inputs(Arrays.asList(inputSpecs));
}
- public StageDefinitionBuilder broadcastInputs(final IntSet
broadcastInputNumbers)
+ public StageDefinitionBuilder broadcastInputs(final Set<Integer>
broadcastInputNumbers)
{
this.broadcastInputNumbers.clear();
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
index 5df11c6f4f8..138bc1f2995 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
@@ -23,27 +23,22 @@ import com.google.common.collect.Lists;
import org.apache.calcite.rel.RelNode;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.key.KeyColumn;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.msq.input.inline.InlineInputSpec;
-import org.apache.druid.msq.input.table.TableInputSpec;
+import org.apache.druid.msq.logical.stages.JoinStage;
import org.apache.druid.msq.logical.stages.LogicalStage;
import org.apache.druid.msq.logical.stages.OffsetLimitStage;
import org.apache.druid.msq.logical.stages.ReadStage;
import org.apache.druid.msq.logical.stages.SortStage;
-import org.apache.druid.query.InlineDataSource;
-import org.apache.druid.query.TableDataSource;
+import org.apache.druid.msq.logical.stages.UnnestStage;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
-import
org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer.SourceDesc;
import org.apache.druid.sql.calcite.rel.DruidQuery;
+import org.apache.druid.sql.calcite.rel.logical.DruidJoin;
import org.apache.druid.sql.calcite.rel.logical.DruidLogicalNode;
import org.apache.druid.sql.calcite.rel.logical.DruidSort;
-import org.apache.druid.sql.calcite.rel.logical.DruidTableScan;
-import org.apache.druid.sql.calcite.rel.logical.DruidValues;
+import org.apache.druid.sql.calcite.rule.logical.DruidUnnest;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -89,7 +84,7 @@ public class DruidLogicalToQueryDefinitionTranslator
List<LogicalStage> inputStages = buildInputStages(stack);
DruidLogicalNode node = stack.getNode();
if (inputStages.size() == 0) {
- Optional<ReadStage> stage = buildReadStage(node);
+ Optional<ReadStage> stage = ReadStage.buildReadStage(stack);
if (stage.isPresent()) {
return stage.get();
}
@@ -104,19 +99,21 @@ public class DruidLogicalToQueryDefinitionTranslator
if (newStage != null) {
return newStage;
}
+ } else {
+ LogicalStage newStage = buildMultiInputStage(inputStages, stack);
+ if (newStage != null) {
+ return newStage;
+ }
}
throw DruidException.defensive().build("Unable to process relNode[%s]",
node);
}
- private Optional<ReadStage> buildReadStage(DruidLogicalNode node)
+ private LogicalStage buildMultiInputStage(List<LogicalStage> inputStages,
DruidNodeStack stack)
{
- if (node instanceof DruidValues) {
- return translateValues((DruidValues) node);
+ if (stack.getNode() instanceof DruidJoin) {
+ return JoinStage.buildJoinStage(inputStages, stack);
}
- if (node instanceof DruidTableScan) {
- return translateTableScan((DruidTableScan) node);
- }
- return Optional.empty();
+ return null;
}
private LogicalStage makeSequenceStage(LogicalStage inputStage,
DruidNodeStack stack)
@@ -133,6 +130,9 @@ public class DruidLogicalToQueryDefinitionTranslator
return sortStage;
}
}
+ if (stack.getNode() instanceof DruidUnnest) {
+ return UnnestStage.buildUnnestStage(inputStage, stack);
+ }
return new ReadStage(inputStage.getLogicalRowSignature(),
LogicalInputSpec.of(inputStage)).extendWith(stack);
}
@@ -148,22 +148,4 @@ public class DruidLogicalToQueryDefinitionTranslator
}
return inputStages;
}
-
- private Optional<ReadStage> translateTableScan(DruidTableScan node)
- {
- SourceDesc sd = node.getSourceDesc(plannerContext,
Collections.emptyList());
- TableDataSource ids = (TableDataSource) sd.dataSource;
- TableInputSpec inputSpec = new TableInputSpec(ids.getName(),
Intervals.ONLY_ETERNITY, null, null);
- ReadStage stage = new ReadStage(sd.rowSignature,
LogicalInputSpec.of(inputSpec));
- return Optional.of(stage);
- }
-
- private Optional<ReadStage> translateValues(DruidValues node)
- {
- SourceDesc sd = node.getSourceDesc(plannerContext,
Collections.emptyList());
- InlineDataSource ids = (InlineDataSource) sd.dataSource;
- InlineInputSpec inputSpec = new InlineInputSpec(ids);
- ReadStage stage = new ReadStage(sd.rowSignature,
LogicalInputSpec.of(inputSpec));
- return Optional.of(stage);
- }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalInputSpec.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalInputSpec.java
index f77e41d0aa6..2a3243c43e9 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalInputSpec.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalInputSpec.java
@@ -19,39 +19,91 @@
package org.apache.druid.msq.logical;
-import org.apache.druid.error.NotYetImplemented;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.stage.StageInputSpec;
import org.apache.druid.msq.kernel.StageDefinitionBuilder;
import org.apache.druid.msq.logical.stages.LogicalStage;
+import org.apache.druid.msq.querykit.InputNumberDataSource;
+import org.apache.druid.query.DataSource;
import org.apache.druid.segment.column.RowSignature;
+import
org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer.SourceDesc;
+
+import java.util.Collections;
+import java.util.Set;
/**
* Represents an {@link InputSpec} for {@link LogicalStage}-s.
*/
public abstract class LogicalInputSpec
{
+ public enum InputProperty
+ {
+ BROADCAST
+ }
+
+ final int inputIndex;
+ final Set<InputProperty> props;
+
+ public LogicalInputSpec(int inputIndex, Set<InputProperty> props)
+ {
+ this.inputIndex = inputIndex;
+ this.props = props;
+ }
+
public abstract InputSpec toInputSpec(StageMaker maker);
public abstract RowSignature getRowSignature();
+ /**
+ * Provides the {@link SourceDesc} for this input spec.
+ *
+ * Supplied to make it more easily interoperable with {@link DataSource}
+ * backed features like {@link DataSource#createSegmentMapFunction}.
+ */
+ public final SourceDesc getSourceDesc()
+ {
+ InputNumberDataSource ds = new InputNumberDataSource(inputIndex);
+ return new SourceDesc(ds, getRowSignature());
+ }
+
+ public final boolean hasProperty(InputProperty prop)
+ {
+ return props.contains(prop);
+ }
+
public static LogicalInputSpec of(LogicalStage inputStage)
{
- return new DagStageInputSpec(inputStage);
+ return of(inputStage, 0, Collections.emptySet());
+ }
+
+ public static LogicalInputSpec of(InputSpec inputSpec, RowSignature
rowSignature)
+ {
+ return new PhysicalInputSpec(inputSpec, 0, rowSignature,
Collections.emptySet());
}
- public static LogicalInputSpec of(InputSpec inputSpec)
+ public static LogicalInputSpec of(LogicalStage logicalStage, int inputIndex,
InputProperty prop)
{
- return new PhysicalInputSpec(inputSpec);
+ return of(logicalStage, inputIndex, Collections.singleton(prop));
+ }
+
+ public static LogicalInputSpec of(LogicalStage logicalStage, int inputIndex,
Set<InputProperty> props)
+ {
+ // could potentially unwrap LogicalStage if some conditions are met
+ // logicalStage.unwrap(InputSpec.class);
+ // partial:
https://github.com/kgyrtkirk/druid/commit/9a541f69361f341c537ee196514d2d6a00ae3feb
+ return new DagStageInputSpec(logicalStage, inputIndex, props);
}
static class PhysicalInputSpec extends LogicalInputSpec
{
private InputSpec inputSpec;
+ private RowSignature rowSignature;
- public PhysicalInputSpec(InputSpec inputSpec)
+ public PhysicalInputSpec(InputSpec inputSpec, int inputIndex, RowSignature
rowSignature, Set<InputProperty> props)
{
+ super(inputIndex, props);
this.inputSpec = inputSpec;
+ this.rowSignature = rowSignature;
}
@Override
@@ -63,17 +115,17 @@ public abstract class LogicalInputSpec
@Override
public RowSignature getRowSignature()
{
- throw NotYetImplemented.ex(null, "Not supported for this type");
+ return rowSignature;
}
}
static class DagStageInputSpec extends LogicalInputSpec
{
+ protected LogicalStage inputStage;
- private LogicalStage inputStage;
-
- public DagStageInputSpec(LogicalStage inputStage)
+ public DagStageInputSpec(LogicalStage inputStage, int inputIndex,
Set<InputProperty> props)
{
+ super(inputIndex, props);
this.inputStage = inputStage;
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
index c957ca21773..4885a7d3e7d 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
@@ -26,10 +26,12 @@ import org.apache.druid.msq.kernel.MixShuffleSpec;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageDefinitionBuilder;
+import org.apache.druid.msq.logical.LogicalInputSpec.InputProperty;
import org.apache.druid.msq.logical.stages.AbstractFrameProcessorStage;
import org.apache.druid.msq.logical.stages.AbstractShuffleStage;
import org.apache.druid.msq.logical.stages.LogicalStage;
import org.apache.druid.msq.querykit.scan.ScanQueryStageProcessor;
+import org.apache.druid.query.DataSource;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.segment.VirtualColumns;
@@ -38,9 +40,11 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* Builds {@link QueryDefinition} from {@link LogicalStage}-s.
@@ -91,13 +95,8 @@ public class StageMaker
private StageDefinitionBuilder
buildFrameProcessorStage(AbstractFrameProcessorStage frameProcessorStage)
{
List<LogicalInputSpec> inputs = frameProcessorStage.getInputSpecs();
- List<InputSpec> inputSpecs = new ArrayList<>();
- for (LogicalInputSpec dagInputSpec : inputs) {
- inputSpecs.add(dagInputSpec.toInputSpec(this));
- }
+ StageDefinitionBuilder sdb = newStageDefinitionBuilder(inputs);
StageProcessor<?, ?> stageProcessor =
frameProcessorStage.buildStageProcessor(this);
- StageDefinitionBuilder sdb = newStageDefinitionBuilder();
- sdb.inputs(inputSpecs);
sdb.signature(frameProcessorStage.getLogicalRowSignature());
sdb.processor(stageProcessor);
sdb.shuffleSpec(MixShuffleSpec.instance());
@@ -107,21 +106,28 @@ public class StageMaker
private StageDefinitionBuilder buildShuffleStage(AbstractShuffleStage stage)
{
List<LogicalInputSpec> inputs = stage.getInputSpecs();
- List<InputSpec> inputSpecs = new ArrayList<>();
- for (LogicalInputSpec dagInputSpec : inputs) {
- inputSpecs.add(dagInputSpec.toInputSpec(this));
- }
- StageDefinitionBuilder sdb = newStageDefinitionBuilder();
- sdb.inputs(inputSpecs);
+ StageDefinitionBuilder sdb = newStageDefinitionBuilder(inputs);
sdb.signature(stage.getRowSignature());
sdb.processor(makeScanStageProcessor(VirtualColumns.EMPTY,
stage.getRowSignature(), null));
sdb.shuffleSpec(stage.buildShuffleSpec());
return sdb;
}
- private StageDefinitionBuilder newStageDefinitionBuilder()
+ private StageDefinitionBuilder
newStageDefinitionBuilder(List<LogicalInputSpec> inputs)
{
- return StageDefinition.builder(getNextStageId());
+ List<InputSpec> inputSpecs = new ArrayList<>();
+ Set<Integer> broadcastInputs = new HashSet<>();
+ for (int i = 0; i < inputs.size(); i++) {
+ LogicalInputSpec dagInputSpec = inputs.get(i);
+ inputSpecs.add(dagInputSpec.toInputSpec(this));
+ if (dagInputSpec.hasProperty(InputProperty.BROADCAST)) {
+ broadcastInputs.add(i);
+ }
+ }
+ StageDefinitionBuilder sdb = StageDefinition.builder(getNextStageId());
+ sdb.broadcastInputs(broadcastInputs);
+ sdb.inputs(inputSpecs);
+ return sdb;
}
private int getNextStageId()
@@ -152,4 +158,9 @@ public class StageMaker
}
return plannerContext.getSqlQueryId();
}
+
+ public StageProcessor<?, ?> makeSegmentMapProcessor(RowSignature signature,
DataSource dataSource)
+ {
+ return ScanQueryStageProcessor.makeSegmentMapFnProcessor(signature,
dataSource);
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java
index da992c3e1fa..e955c6a4052 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java
@@ -27,8 +27,6 @@ import org.apache.druid.msq.logical.LogicalInputSpec;
import org.apache.druid.msq.logical.StageMaker;
import org.apache.druid.msq.querykit.groupby.GroupByPostShuffleStageProcessor;
import org.apache.druid.msq.querykit.groupby.GroupByPreShuffleStageProcessor;
-import org.apache.druid.query.DataSource;
-import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.column.RowSignature;
@@ -43,14 +41,6 @@ import java.util.List;
public class GroupByStages
{
- /**
- * An input datasource is needed to construct a valid {@link GroupByQuery}.
- *
- * During staged execution the data is coming in thru channels - so this is
- * just a placeholder.
- */
- private static final DataSource DUMMY_INPUT_DATASOURCE = new
TableDataSource("__input__");
-
public static class PreShuffleStage extends ProjectStage
{
private GroupByQuery groupByQuery;
@@ -116,7 +106,7 @@ public class GroupByStages
builder.setDimFilter(projectStage.getDimFilter());
builder.setVirtualColumns(projectStage.getVirtualColumns());
builder.setPostAggregatorSpecs(grouping.getPostAggregators());
- builder.setDataSource(DUMMY_INPUT_DATASOURCE);
+
builder.setDataSource(LogicalInputSpec.of(projectStage).getSourceDesc().dataSource);
return builder.build();
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/JoinStage.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/JoinStage.java
new file mode 100644
index 00000000000..96cf3d3510b
--- /dev/null
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/JoinStage.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.msq.exec.StageProcessor;
+import org.apache.druid.msq.kernel.HashShuffleSpec;
+import org.apache.druid.msq.kernel.ShuffleSpec;
+import org.apache.druid.msq.logical.LogicalInputSpec;
+import org.apache.druid.msq.logical.StageMaker;
+import org.apache.druid.msq.querykit.QueryKitUtils;
+import org.apache.druid.msq.querykit.common.SortMergeJoinStageProcessor;
+import org.apache.druid.query.JoinAlgorithm;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.join.JoinConditionAnalysis;
+import org.apache.druid.segment.join.JoinType;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.expression.Expressions;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
+import
org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer.SourceDesc;
+import org.apache.druid.sql.calcite.rel.DruidJoinQueryRel;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+import org.apache.druid.sql.calcite.rel.logical.DruidJoin;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Represents a stage that reads data from input sources.
+ */
+public class JoinStage
+{
+
+ static class ShuffleStage extends AbstractShuffleStage
+ {
+ protected final List<KeyColumn> keyColumns;
+
+ public ShuffleStage(LogicalStage inputStage, List<KeyColumn> keyColumns)
+ {
+ super(
+ QueryKitUtils.sortableSignature(inputStage.getLogicalRowSignature(),
keyColumns),
+ LogicalInputSpec.of(inputStage)
+ );
+ this.keyColumns = keyColumns;
+ }
+
+ @Override
+ public RowSignature getLogicalRowSignature()
+ {
+ return inputSpecs.get(0).getRowSignature();
+ }
+
+ @Override
+ public ShuffleSpec buildShuffleSpec()
+ {
+ final ClusterBy clusterBy = new ClusterBy(keyColumns, 0);
+ return new HashShuffleSpec(clusterBy, 1);
+ }
+
+ @Override
+ public LogicalStage extendWith(DruidNodeStack stack)
+ {
+ return null;
+ }
+ }
+
+ public static class SortMergeStage extends AbstractFrameProcessorStage
+ {
+
+ private String rightPrefix;
+ private JoinConditionAnalysis conditionAnalysis;
+ private JoinType joinType;
+
+ public SortMergeStage(RowSignature signature, List<LogicalInputSpec>
inputs, String rightPrefix,
+ JoinConditionAnalysis conditionAnalysis, JoinType joinType)
+ {
+ super(signature, inputs);
+ this.rightPrefix = rightPrefix;
+ this.conditionAnalysis = conditionAnalysis;
+ this.joinType = joinType;
+ }
+
+ @Override
+ public LogicalStage extendWith(DruidNodeStack stack)
+ {
+ return null;
+ }
+
+ @Override
+ public StageProcessor<?, ?> buildStageProcessor(StageMaker stageMaker)
+ {
+ return new SortMergeJoinStageProcessor(
+ rightPrefix,
+ conditionAnalysis,
+ joinType
+ );
+ }
+ }
+
+ /** similar to {@link DruidJoinQueryRel#buildJoinSourceDesc} */
+ public static LogicalStage buildJoinStage(List<LogicalStage> inputStages,
DruidNodeStack stack)
+ {
+ DruidJoin join = (DruidJoin) stack.getNode();
+ if (join.getJoinAlgorithm(stack.getPlannerContext()) ==
JoinAlgorithm.SORT_MERGE) {
+ return buildMergeJoin(inputStages, stack, join);
+ } else {
+ return buildBroadcastJoin(inputStages, stack, join);
+ }
+ }
+
+ private static LogicalStage buildBroadcastJoin(List<LogicalStage>
inputStages, DruidNodeStack stack, DruidJoin join)
+ {
+ PlannerContext plannerContext = stack.getPlannerContext();
+ List<LogicalInputSpec> inputDescs = new ArrayList<>();
+ inputDescs.add(LogicalInputSpec.of(inputStages.get(0)));
+ for (int i = 1; i < inputStages.size(); i++) {
+ inputDescs.add(LogicalInputSpec.of(inputStages.get(i), i,
LogicalInputSpec.InputProperty.BROADCAST));
+ }
+ SourceDesc unnestSD = join.getSourceDesc(plannerContext,
Lists.transform(inputDescs, LogicalInputSpec::getSourceDesc));
+ return new SegmentMapStage(unnestSD, inputDescs);
+ }
+
+ private static LogicalStage buildMergeJoin(List<LogicalStage> inputStages,
DruidNodeStack stack, DruidJoin join)
+ {
+ String prefix = findUnusedJoinPrefix(inputStages.get(0).getRowSignature());
+
+ RowSignature signature = RowSignature.builder()
+ .addAll(inputStages.get(0).getLogicalRowSignature())
+ .addAll(inputStages.get(1).getLogicalRowSignature().withPrefix(prefix))
+ .build();
+
+ PlannerContext plannerContext = stack.getPlannerContext();
+ VirtualColumnRegistry virtualColumnRegistry = VirtualColumnRegistry.create(
+ signature,
+ plannerContext.getExpressionParser(),
+ plannerContext.getPlannerConfig().isForceExpressionVirtualColumns()
+ );
+
plannerContext.setJoinExpressionVirtualColumnRegistry(virtualColumnRegistry);
+
+ // Generate the condition for this join as a Druid expression.
+ final DruidExpression condition = Expressions.toDruidExpression(
+ plannerContext,
+ signature,
+ join.getCondition()
+ );
+
+ // Unsetting it to avoid any VC Registry leaks incase there are multiple
+ // druid quries for the SQL
+ // It should be fixed soon with changes in interface for
+ // SqlOperatorConversion and Expressions bridge class
+ plannerContext.setJoinExpressionVirtualColumnRegistry(null);
+
+ if (!virtualColumnRegistry.isEmpty()) {
+ throw DruidException.defensive("Not sure how to handle this right now -
it should be fixed");
+ }
+
+ JoinConditionAnalysis analysis = JoinConditionAnalysis.forExpression(
+ condition.getExpression(),
+ plannerContext.parseExpression(condition.getExpression()),
+ prefix
+ );
+
+ // Partition by keys given by the join condition.
+ final List<List<KeyColumn>> partitionKeys =
SortMergeJoinStageProcessor.toKeyColumns(
+ SortMergeJoinStageProcessor.validateCondition(analysis)
+ );
+
+ List<LogicalStage> shuffleStages = new ArrayList<>();
+ for (int i = 0; i < inputStages.size(); i++) {
+ LogicalStage inputStage = inputStages.get(i);
+ shuffleStages.add(new ShuffleStage(inputStage, partitionKeys.get(i)));
+
+ }
+
+ return new SortMergeStage(
+ signature,
+ Lists.transform(shuffleStages, LogicalInputSpec::of),
+ prefix,
+ analysis,
+ DruidJoinQueryRel.toDruidJoinType(join.getJoinType())
+ );
+ }
+
+ private static String findUnusedJoinPrefix(RowSignature rowSignature)
+ {
+ List<String> leftColumnNames = rowSignature.getColumnNames();
+ return Calcites.findUnusedPrefixForDigits("j", leftColumnNames) + "0";
+ }
+}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
index b28d2cb2914..09ee864f582 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
@@ -19,20 +19,39 @@
package org.apache.druid.msq.logical.stages;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.msq.exec.StageProcessor;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.inline.InlineInputSpec;
+import org.apache.druid.msq.input.lookup.LookupInputSpec;
+import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.msq.logical.LogicalInputSpec;
import org.apache.druid.msq.logical.StageMaker;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.InlineDataSource;
+import org.apache.druid.query.LookupDataSource;
+import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
+import org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer;
+import
org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer.SourceDesc;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
import org.apache.druid.sql.calcite.rel.logical.DruidAggregate;
import org.apache.druid.sql.calcite.rel.logical.DruidFilter;
import org.apache.druid.sql.calcite.rel.logical.DruidLogicalNode;
import org.apache.druid.sql.calcite.rel.logical.DruidProject;
+import org.apache.druid.sql.calcite.rel.logical.DruidTableScan;
+import org.apache.druid.sql.calcite.rel.logical.DruidValues;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.Optional;
/**
* Represents a stage that reads data from input sources.
@@ -98,4 +117,41 @@ public class ReadStage extends AbstractFrameProcessorStage
{
return StageMaker.makeScanStageProcessor(VirtualColumns.EMPTY, signature,
null);
}
+
+ public static Optional<ReadStage> buildReadStage(DruidNodeStack stack)
+ {
+ DruidLogicalNode node = stack.getNode();
+ if (node instanceof DruidValues || node instanceof DruidTableScan) {
+ return translateViaSourceDesc(stack.getPlannerContext(),
(SourceDescProducer) node);
+ }
+ return Optional.empty();
+ }
+
+ private static Optional<ReadStage> translateViaSourceDesc(PlannerContext
plannerContext, SourceDescProducer node)
+ {
+ SourceDesc sd = node.getSourceDesc(plannerContext,
Collections.emptyList());
+ InputSpec inputSpec = translateDataSource(sd.dataSource);
+ ReadStage stage = new ReadStage(sd.rowSignature,
LogicalInputSpec.of(inputSpec, sd.rowSignature));
+ return Optional.of(stage);
+ }
+
+ @Nonnull
+ private static InputSpec translateDataSource(DataSource dataSource)
+ {
+ if (dataSource instanceof TableDataSource) {
+ TableDataSource ids = (TableDataSource) dataSource;
+ TableInputSpec inputSpec = new TableInputSpec(ids.getName(),
Intervals.ONLY_ETERNITY, null, null);
+ return inputSpec;
+ }
+ if (dataSource instanceof InlineDataSource) {
+ InlineDataSource ids = (InlineDataSource) dataSource;
+ InlineInputSpec inputSpec = new InlineInputSpec(ids);
+ return inputSpec;
+ }
+ if (dataSource instanceof LookupDataSource) {
+ LookupDataSource lookupDataSource = (LookupDataSource) dataSource;
+ return new LookupInputSpec(lookupDataSource.getLookupName());
+ }
+ throw DruidException.defensive("This type of data source [%s] is not
currently supported.", dataSource.getClass());
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SegmentMapStage.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SegmentMapStage.java
new file mode 100644
index 00000000000..a1b2a80b4ac
--- /dev/null
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SegmentMapStage.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import org.apache.druid.msq.exec.StageProcessor;
+import org.apache.druid.msq.logical.LogicalInputSpec;
+import org.apache.druid.msq.logical.StageMaker;
+import
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
+import
org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer.SourceDesc;
+
+import java.util.List;
+
+public class SegmentMapStage extends AbstractFrameProcessorStage
+{
+ private SourceDesc sourceDesc;
+
+ public SegmentMapStage(SourceDesc sourceDesc, List<LogicalInputSpec> inputs)
+ {
+ super(sourceDesc.rowSignature, inputs);
+ this.sourceDesc = sourceDesc;
+ }
+
+ @Override
+ public LogicalStage extendWith(DruidNodeStack stack)
+ {
+ return null;
+ }
+
+ @Override
+ public StageProcessor<?, ?> buildStageProcessor(StageMaker stageMaker)
+ {
+ return stageMaker.makeSegmentMapProcessor(signature,
sourceDesc.dataSource);
+ }
+}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/UnnestStage.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/UnnestStage.java
new file mode 100644
index 00000000000..a69ff0dc0d9
--- /dev/null
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/UnnestStage.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import org.apache.druid.msq.logical.LogicalInputSpec;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
+import
org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer.SourceDesc;
+import org.apache.druid.sql.calcite.rule.logical.DruidUnnest;
+
+import java.util.Collections;
+
+public class UnnestStage
+{
+ public static LogicalStage buildUnnestStage(LogicalStage inputStage,
DruidNodeStack stack)
+ {
+ DruidUnnest unnest = (DruidUnnest) stack.getNode();
+ PlannerContext plannerContext = stack.getPlannerContext();
+ LogicalInputSpec logicalInputSpec = LogicalInputSpec.of(inputStage);
+ SourceDesc sourceDesc = logicalInputSpec.getSourceDesc();
+ SourceDesc unnestSD = unnest.getSourceDesc(plannerContext,
Collections.singletonList(sourceDesc));
+ return new SegmentMapStage(unnestSD,
Collections.singletonList(logicalInputSpec));
+ }
+}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryStageProcessor.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryStageProcessor.java
index 5be30fe72c6..61dcc8631a3 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryStageProcessor.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryStageProcessor.java
@@ -30,6 +30,7 @@ import org.apache.druid.frame.write.FrameWriterFactory;
import org.apache.druid.msq.exec.FrameContext;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.querykit.BaseLeafStageProcessor;
+import org.apache.druid.query.DataSource;
import org.apache.druid.query.Druids;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.scan.ScanQuery;
@@ -85,6 +86,17 @@ public class ScanQueryStageProcessor extends
BaseLeafStageProcessor
return new ScanQueryStageProcessor(scanQuery);
}
+ public static ScanQueryStageProcessor makeSegmentMapFnProcessor(RowSignature
signature, DataSource dataSource)
+ {
+ ScanQuery scanQuery = Druids.newScanQueryBuilder()
+ .dataSource(dataSource)
+ .intervals(QuerySegmentSpec.ETERNITY)
+ .columns(signature.getColumnNames())
+ .columnTypes(signature.getColumnTypes())
+ .build();
+ return new ScanQueryStageProcessor(scanQuery);
+ }
+
@JsonProperty
public ScanQuery getQuery()
{
diff --git a/multi-stage-query/src/main/resources/log4j2.xml
b/multi-stage-query/src/main/resources/log4j2.xml
index d98bb05ef6c..67d1e66244b 100644
--- a/multi-stage-query/src/main/resources/log4j2.xml
+++ b/multi-stage-query/src/main/resources/log4j2.xml
@@ -23,19 +23,31 @@
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
</Console>
+
+ <Routing name="RoutingAppender">
+ <Routes pattern="$${ctx:task.log.id}">
+ <!-- Task logs on CliIndexer should go to dedicated file -->
+ <Route>
+ <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+ <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{
[%markerSimpleName]} %m%n"/>
+ </File>
+ </Route>
+
+ <!-- Default route to send non-task logs to the Console -->
+ <Route key="$${ctx:task.log.id}" ref="Console"/>
+ </Routes>
+ </Routing>
+
</Appenders>
<Loggers>
<Root level="info">
- <AppenderRef ref="Console"/>
+ <AppenderRef ref="RoutingAppender"/>
</Root>
- <Logger level="info" name="org.apache.druid.frame" additivity="false">
- <AppenderRef ref="Console"/>
- </Logger>
- <Logger level="info" name="org.apache.druid.msq" additivity="false">
- <AppenderRef ref="Console"/>
+ <Logger level="info" name="org.apache.druid" additivity="false">
+ <AppenderRef ref="RoutingAppender"/>
</Logger>
- <Logger level="info" name="org.apache.calcite" additivity="false">
- <AppenderRef ref="Console"/>
+ <Logger level="debug"
name="org.apache.calcite.plan.AbstractRelOptPlanner.rule_execution_summary"
additivity="false">
+ <AppenderRef ref="RoutingAppender"/>
</Logger>
</Loggers>
</Configuration>
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java
similarity index 93%
copy from
multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
copy to
multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java
index 249a17c07fd..c7313c4c098 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java
@@ -45,11 +45,10 @@ import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.sql.avatica.DartDruidMeta;
import org.apache.druid.sql.avatica.DruidMeta;
-import org.apache.druid.sql.calcite.TempDirProducer;
import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.DruidModuleCollection;
-import
org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier;
+import
org.apache.druid.sql.calcite.util.SqlTestFramework.QueryComponentSupplier;
import org.apache.druid.sql.calcite.util.datasets.TestDataSet;
import java.nio.ByteBuffer;
@@ -58,11 +57,11 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-public class DartComponentSupplier extends AbstractMSQComponentSupplierDelegate
+public abstract class AbstractDartComponentSupplier extends
AbstractMSQComponentSupplierDelegate
{
- public DartComponentSupplier(TempDirProducer tempFolderProducer)
+ public AbstractDartComponentSupplier(QueryComponentSupplier
componentSupplier)
{
- super(new StandardComponentSupplier(tempFolderProducer));
+ super(componentSupplier);
}
@Override
@@ -78,7 +77,6 @@ public class DartComponentSupplier extends
AbstractMSQComponentSupplierDelegate
walker.add(TestDataSet.NUMBERS, TestHelper.JSON_MAPPER,
getTempDirProducer().newTempFolder("tmp_numbers"));
return super.addSegmentsToWalker(walker);
}
-
@Override
public DruidModule getCoreModule()
{
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
index 4ffd0f8db98..3623983fa66 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
@@ -259,6 +259,24 @@ public class CalciteDartTest extends BaseCalciteQueryTest
.run();
}
+ @Test
+ public void testJoin()
+ {
+ testBuilder()
+ .sql("SELECT f1.dim2 from foo f1 join foo f2 on f1.dim1 = f2.dim1
order by f1.dim2")
+ .expectedResults(
+ ImmutableList.of(
+ new Object[]{null},
+ new Object[]{null},
+ new Object[]{""},
+ new Object[]{"a"},
+ new Object[]{"a"},
+ new Object[]{"abc"}
+ )
+ )
+ .run();
+ }
+
@Test
public void testSubQuery()
{
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
index 249a17c07fd..409bd73ae10 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
@@ -19,146 +19,13 @@
package org.apache.druid.msq.test;
-import com.google.inject.Binder;
-import com.google.inject.Provides;
-import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.client.coordinator.NoopCoordinatorClient;
-import org.apache.druid.collections.NonBlockingPool;
-import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.guice.LazySingleton;
-import org.apache.druid.guice.annotations.EscalatedGlobal;
-import org.apache.druid.guice.annotations.Merging;
-import org.apache.druid.initialization.DruidModule;
-import org.apache.druid.java.util.http.client.HttpClient;
-import org.apache.druid.msq.dart.Dart;
-import org.apache.druid.msq.dart.controller.DartControllerContextFactory;
-import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
-import org.apache.druid.msq.dart.guice.DartControllerModule;
-import org.apache.druid.msq.dart.guice.DartModules;
-import org.apache.druid.msq.dart.guice.DartWorkerMemoryManagementModule;
-import org.apache.druid.msq.dart.guice.DartWorkerModule;
-import org.apache.druid.msq.exec.WorkerRunRef;
-import org.apache.druid.query.TestBufferPool;
-import org.apache.druid.rpc.ServiceClientFactory;
-import org.apache.druid.rpc.guice.ServiceClientModule;
-import org.apache.druid.segment.TestHelper;
-import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
-import org.apache.druid.sql.avatica.DartDruidMeta;
-import org.apache.druid.sql.avatica.DruidMeta;
import org.apache.druid.sql.calcite.TempDirProducer;
-import org.apache.druid.sql.calcite.run.SqlEngine;
-import org.apache.druid.sql.calcite.util.CalciteTests;
-import org.apache.druid.sql.calcite.util.DruidModuleCollection;
import
org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier;
-import org.apache.druid.sql.calcite.util.datasets.TestDataSet;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class DartComponentSupplier extends AbstractMSQComponentSupplierDelegate
+public class DartComponentSupplier extends AbstractDartComponentSupplier
{
public DartComponentSupplier(TempDirProducer tempFolderProducer)
{
super(new StandardComponentSupplier(tempFolderProducer));
}
-
- @Override
- public void gatherProperties(Properties properties)
- {
- super.gatherProperties(properties);
- properties.put(DartModules.DART_ENABLED_PROPERTY, "true");
- }
-
- @Override
- public SpecificSegmentsQuerySegmentWalker
addSegmentsToWalker(SpecificSegmentsQuerySegmentWalker walker)
- {
- walker.add(TestDataSet.NUMBERS, TestHelper.JSON_MAPPER,
getTempDirProducer().newTempFolder("tmp_numbers"));
- return super.addSegmentsToWalker(walker);
- }
-
- @Override
- public DruidModule getCoreModule()
- {
- return DruidModuleCollection.of(
- super.getCoreModule(),
- new DartControllerModule(),
- new DartWorkerModule(),
- new DartWorkerMemoryManagementModule(),
- new DartTestCoreModule()
- );
- }
-
- @Override
- public DruidModule getOverrideModule()
- {
- return DruidModuleCollection.of(
- super.getOverrideModule(),
- new DartTestOverrideModule()
- );
- }
-
- @Override
- public Class<? extends SqlEngine> getSqlEngineClass()
- {
- return DartSqlEngine.class;
- }
-
- static class DartTestCoreModule implements DruidModule
- {
- @Provides
- @EscalatedGlobal
- final ServiceClientFactory getServiceClientFactory(HttpClient ht)
- {
- return ServiceClientModule.makeServiceClientFactory(ht);
-
- }
-
- @Provides
- final DruidNodeDiscoveryProvider getDiscoveryProvider()
- {
- return new
CalciteTests.FakeDruidNodeDiscoveryProvider(Collections.emptyMap());
- }
-
- @Override
- public void configure(Binder binder)
- {
- binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class);
- }
- }
-
- static class DartTestOverrideModule implements DruidModule
- {
- @Provides
- @LazySingleton
- public DruidMeta createMeta(DartDruidMeta druidMeta)
- {
- return druidMeta;
- }
-
- @Override
- public void configure(Binder binder)
- {
- binder.bind(DartControllerContextFactory.class)
- .to(TestDartControllerContextFactoryImpl.class)
- .in(LazySingleton.class);
- }
-
- @Provides
- @Merging
- NonBlockingPool<ByteBuffer> makeMergingBuffer(TestBufferPool bufferPool)
- {
- return bufferPool;
- }
-
- @Provides
- @LazySingleton
- @Dart
- Map<String, WorkerRunRef> workerMap()
- {
- return new ConcurrentHashMap<>();
- }
- }
}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartCalciteArraysQueryTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartCalciteArraysQueryTest.java
new file mode 100644
index 00000000000..3a55e10745c
--- /dev/null
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartCalciteArraysQueryTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.test;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.sql.calcite.CalciteArraysQueryTest;
+import org.apache.druid.sql.calcite.NotYetSupported;
+import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor;
+import org.apache.druid.sql.calcite.QueryTestBuilder;
+import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
+import org.junit.AssumptionViolatedException;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
[email protected](DartComponentSupplier.class)
+public class DecoupledDartCalciteArraysQueryTest extends CalciteArraysQueryTest
+{
+ @RegisterExtension
+ NotYetSupportedProcessor notYetSupportedProcessor = new
NotYetSupportedProcessor(NotYetSupported.Scope.DECOUPLED_DART);
+
+ @RegisterExtension
+ DecoupledDartExtension decoupledExtension = new DecoupledDartExtension(this);
+
+ @Override
+ protected QueryTestBuilder testBuilder()
+ {
+ return decoupledExtension.testBuilder()
+ .queryContext(
+ ImmutableMap.<String, Object>builder()
+ .put(QueryContexts.CTX_PREPLANNED, true)
+ .put(QueryContexts.CTX_NATIVE_QUERY_SQL_PLANNING_MODE,
QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED)
+ .put(QueryContexts.ENABLE_DEBUG, true)
+ .build()
+ );
+ }
+
+ @Override
+ protected void cannotVectorize()
+ {
+ }
+
+ @Override
+ protected void cannotVectorizeUnlessFallback()
+ {
+ }
+
+ @Override
+ protected void msqIncompatible()
+ {
+ throw new AssumptionViolatedException("Case marked as msqIncompatible; not
trying dart right now");
+ }
+}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartCalciteJoinQueryTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartCalciteJoinQueryTest.java
new file mode 100644
index 00000000000..76124a7b3f4
--- /dev/null
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartCalciteJoinQueryTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.test;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.query.JoinAlgorithm;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.sql.calcite.CalciteJoinQueryTest;
+import org.apache.druid.sql.calcite.NotYetSupported;
+import org.apache.druid.sql.calcite.NotYetSupported.Modes;
+import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor;
+import org.apache.druid.sql.calcite.QueryTestBuilder;
+import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.junit.AssumptionViolatedException;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
[email protected](DartComponentSupplier.class)
+public abstract class DecoupledDartCalciteJoinQueryTest extends
CalciteJoinQueryTest
+{
+
+ @Nested
+ public static class BroadcastTest extends DecoupledDartCalciteJoinQueryTest
+ {
+ @Override
+ protected JoinAlgorithm joinAlgorithm()
+ {
+ return JoinAlgorithm.BROADCAST;
+ }
+ }
+
+ @Nested
+ public static class SortMergeTest extends DecoupledDartCalciteJoinQueryTest
+ {
+ @Override
+ protected JoinAlgorithm joinAlgorithm()
+ {
+ return JoinAlgorithm.SORT_MERGE;
+ }
+
+ @NotYetSupported(Modes.DD_JOIN_CONDITION_NORMALIZATION)
+ @Test
+ @Override
+ public void testJoinWithInputRefCondition()
+ {
+ super.testJoinWithInputRefCondition();
+ }
+ }
+
+ @RegisterExtension
+ NotYetSupportedProcessor notYetSupportedProcessor = new
NotYetSupportedProcessor(
+ NotYetSupported.Scope.DECOUPLED_DART
+ );
+
+ @RegisterExtension
+ DecoupledDartExtension decoupledExtension = new DecoupledDartExtension(this);
+
+ @Override
+ protected QueryTestBuilder testBuilder()
+ {
+ return decoupledExtension.testBuilder()
+ .queryContext(
+ ImmutableMap.<String, Object>builder()
+ .put(QueryContexts.CTX_PREPLANNED, true)
+ .put(
+ QueryContexts.CTX_NATIVE_QUERY_SQL_PLANNING_MODE,
+ QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED
+ )
+ .put(QueryContexts.REWRITE_JOIN_TO_FILTER_ENABLE_KEY,
decoupledExtension)
+ .put(PlannerContext.CTX_SQL_JOIN_ALGORITHM,
joinAlgorithm().toString())
+ .put(QueryContexts.ENABLE_DEBUG, true)
+ .build()
+ );
+ }
+
+ protected abstract JoinAlgorithm joinAlgorithm();
+
+ @Override
+ protected void cannotVectorize()
+ {
+ }
+
+ @Override
+ protected void cannotVectorizeUnlessFallback()
+ {
+ }
+
+ @Override
+ protected void msqIncompatible()
+ {
+ throw new AssumptionViolatedException("Case marked as msqIncompatible; not
trying dart right now");
+ }
+
+ @Override
+ public boolean isSortBasedJoin()
+ {
+ return joinAlgorithm() == JoinAlgorithm.SORT_MERGE;
+ }
+
+ @Override
+ protected boolean isRunningMSQ()
+ {
+ return true;
+ }
+}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartCalciteNestedDataQueryTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartCalciteNestedDataQueryTest.java
new file mode 100644
index 00000000000..0cf97cb5d13
--- /dev/null
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartCalciteNestedDataQueryTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.test;
+
+import org.apache.druid.sql.calcite.CalciteNestedDataQueryTest;
+import org.apache.druid.sql.calcite.NotYetSupported;
+import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor;
+import org.apache.druid.sql.calcite.QueryTestBuilder;
+import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
+import org.apache.druid.sql.calcite.TempDirProducer;
+import org.junit.AssumptionViolatedException;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
[email protected](DecoupledDartCalciteNestedDataQueryTest.NestedDataQueryMSQComponentSupplier.class)
+public class DecoupledDartCalciteNestedDataQueryTest extends
CalciteNestedDataQueryTest
+{
+
+ public static class NestedDataQueryMSQComponentSupplier extends
AbstractDartComponentSupplier
+ {
+ public NestedDataQueryMSQComponentSupplier(TempDirProducer
tempFolderProducer)
+ {
+ super(new NestedComponentSupplier(tempFolderProducer));
+ }
+ }
+
+ @RegisterExtension
+ NotYetSupportedProcessor notYetSupportedProcessor = new
NotYetSupportedProcessor(NotYetSupported.Scope.DECOUPLED_DART);
+
+ @RegisterExtension
+ DecoupledDartExtension decoupledExtension = new DecoupledDartExtension(this);
+
+ @Override
+ protected QueryTestBuilder testBuilder()
+ {
+ return decoupledExtension.testBuilder();
+ }
+
+ @Override
+ protected void msqIncompatible()
+ {
+ throw new AssumptionViolatedException("Case marked as msqIncompatible; not
trying dart right now");
+ }
+}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartExtension.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartExtension.java
index 32be6a06078..5cbc27a6c97 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartExtension.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartExtension.java
@@ -57,7 +57,7 @@ public class DecoupledDartExtension implements
BeforeEachCallback
public QueryTestBuilder testBuilder()
{
- CalciteTestConfig testConfig = baseTest.new
CalciteTestConfig(CONTEXT_OVERRIDES)
+ CalciteTestConfig testConfig = baseTest.new
CalciteTestConfig(CONTEXT_OVERRIDES, true)
{
@Override
public SqlTestFramework.PlannerFixture plannerFixture(PlannerConfig
plannerConfig, AuthConfig authConfig)
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/sql/calcite/NotYetSupportedUsageTest.java
b/multi-stage-query/src/test/java/org/apache/druid/sql/calcite/NotYetSupportedUsageTest.java
index 2076ae1a5f7..ae253cb3087 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/sql/calcite/NotYetSupportedUsageTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/sql/calcite/NotYetSupportedUsageTest.java
@@ -48,7 +48,9 @@ public class NotYetSupportedUsageTest
Set<NotYetSupported.Modes> modes = new
HashSet<>(Arrays.asList(NotYetSupported.Modes.values()));
for (Method method : methodsAnnotatedWith) {
NotYetSupported annot = method.getAnnotation(NotYetSupported.class);
- modes.remove(annot.value());
+ for (Modes m : annot.value()) {
+ modes.remove(m);
+ }
}
assertEquals("There are unused modes which should be removed",
Collections.emptySet(), modes);
@@ -67,7 +69,11 @@ public class NotYetSupportedUsageTest
@Override
public int compare(ReportEntry l, ReportEntry r)
{
- int res = l.className.compareTo(r.className);
+ int res = l.mode.scope.compareTo(r.mode.scope);
+ if (res != 0) {
+ return res;
+ }
+ res = l.className.compareTo(r.className);
if (res != 0) {
return res;
}
@@ -104,7 +110,7 @@ public class NotYetSupportedUsageTest
@Override
public String toString()
{
- return " | " + className + " | " + methodNames.size() + " | " + mode + "
| " + mode.regex + " | ";
+ return " | " + mode.scope + " | " + className + " | " +
methodNames.size() + " | " + mode.name() + " | " + mode.regex + " | ";
}
}
@@ -115,16 +121,18 @@ public class NotYetSupportedUsageTest
Map<List<Object>, ReportEntry> mentryMap = new HashMap<>();
for (Method method : methodsAnnotatedWith) {
- ReportEntry entry = new ReportEntry(
- method.getDeclaringClass().getSimpleName(),
- method.getName(),
- getAnnotation(method)
- );
- ReportEntry existing = mentryMap.get(entry.getKey());
- if (existing != null) {
- existing.merge(entry);
- } else {
- mentryMap.put(entry.getKey(), entry);
+ for (Modes mode : getAnnotation(method)) {
+ ReportEntry entry = new ReportEntry(
+ method.getDeclaringClass().getSimpleName(),
+ method.getName(),
+ mode
+ );
+ ReportEntry existing = mentryMap.get(entry.getKey());
+ if (existing != null) {
+ existing.merge(entry);
+ } else {
+ mentryMap.put(entry.getKey(), entry);
+ }
}
}
@@ -136,7 +144,7 @@ public class NotYetSupportedUsageTest
}
- private Modes getAnnotation(Method method)
+ private Modes[] getAnnotation(Method method)
{
NotYetSupported annotation = method.getAnnotation(NotYetSupported.class);
if (annotation == null) {
diff --git
a/processing/src/main/java/org/apache/druid/segment/column/RowSignature.java
b/processing/src/main/java/org/apache/druid/segment/column/RowSignature.java
index 26f6f74a51a..c7f43fa653a 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/RowSignature.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/RowSignature.java
@@ -422,4 +422,14 @@ public class RowSignature implements ColumnInspector
}
return ret;
}
+
+ public RowSignature withPrefix(String prefix)
+ {
+ Builder builder = new Builder();
+ for (String columnName : columnNames) {
+ ColumnType columnType = columnTypes.get(columnName);
+ builder.add(prefix + columnName, columnType);
+ }
+ return builder.build();
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/sql/calcite/util/datasets/Larry.java
b/server/src/test/java/org/apache/druid/sql/calcite/util/datasets/Larry.java
new file mode 100644
index 00000000000..76b2b6f48a3
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/sql/calcite/util/datasets/Larry.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.util.datasets;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.segment.AutoTypeColumnSchema;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Larry extends MapBasedTestDataset
+{
+ protected Larry()
+ {
+ this("larry");
+ }
+
+ public Larry(String name)
+ {
+ super(name);
+ }
+
+ @Override
+ public final InputRowSchema getInputRowSchema()
+ {
+ return new InputRowSchema(
+ new TimestampSpec(TIMESTAMP_COLUMN, "iso", null),
+ new DimensionsSpec(
+ ImmutableList.<DimensionSchema>builder()
+ .addAll(
+ DimensionsSpec.getDefaultSchemas(
+ ImmutableList.of(
+ "label",
+ "mv"
+ )
+ )
+ )
+ .add(new LongDimensionSchema("l1"))
+ .add(new AutoTypeColumnSchema("l_arr", null))
+ .build()
+ ),
+ null
+ );
+ }
+
+ @Override
+ public List<AggregatorFactory> getMetrics()
+ {
+ return ImmutableList.of(
+ new CountAggregatorFactory("cnt")
+ );
+ }
+
+ @Override
+ public List<Map<String, Object>> getRawRows()
+ {
+ return ImmutableList.of(
+ makeRow("[]", ImmutableList.of()),
+ makeRow("[null]", Collections.singletonList(null)),
+ makeRow("[1]", ImmutableList.of(1)),
+ makeRow("[2,3]", ImmutableList.of(2, 3)),
+ makeRow("null", null)
+ );
+ }
+
+ private Map<String, Object> makeRow(String label, Object object)
+ {
+ Map<String, Object> ret = new HashMap<String, Object>();
+
+ ret.put("t", "2000-01-01");
+ ret.put("label", label);
+ ret.put("l1", 11);
+ ret.put("mv", object);
+ ret.put("l_arr", object);
+ return ret;
+
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/sql/calcite/util/datasets/TestDataSet.java
b/server/src/test/java/org/apache/druid/sql/calcite/util/datasets/TestDataSet.java
index c7b688499a9..b861d9b248e 100644
---
a/server/src/test/java/org/apache/druid/sql/calcite/util/datasets/TestDataSet.java
+++
b/server/src/test/java/org/apache/druid/sql/calcite/util/datasets/TestDataSet.java
@@ -30,6 +30,7 @@ public interface TestDataSet
String TIMESTAMP_COLUMN = "t";
MapBasedTestDataset NUMFOO = new NumFoo();
+ MapBasedTestDataset LARRY = new Larry();
MapBasedTestDataset BROADCAST = new NumFoo("broadcast");
MapBasedTestDataset RESTRICTED_BROADCAST = new
NumFoo("restrictedBroadcastDatasource_m1_is_6");
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalciteRulesManager.java
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalciteRulesManager.java
index 837d951dc63..fa48e04e41e 100644
---
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalciteRulesManager.java
+++
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalciteRulesManager.java
@@ -28,6 +28,7 @@ import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.hep.HepMatchOrder;
import org.apache.calcite.plan.hep.HepProgram;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.plan.volcano.AbstractConverter;
@@ -44,6 +45,7 @@ import
org.apache.calcite.rel.rules.FilterProjectTransposeRule;
import org.apache.calcite.rel.rules.JoinExtractFilterRule;
import org.apache.calcite.rel.rules.JoinPushThroughJoinRule;
import org.apache.calcite.rel.rules.ProjectMergeRule;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
import org.apache.calcite.rel.rules.PruneEmptyRules;
import org.apache.calcite.sql.SqlExplainFormat;
import org.apache.calcite.sql.SqlExplainLevel;
@@ -280,8 +282,9 @@ public class CalciteRulesManager
private Program buildDecoupledLogicalOptimizationProgram(PlannerContext
plannerContext)
{
final HepProgramBuilder builder = HepProgram.builder();
+ builder.addMatchOrder(HepMatchOrder.BOTTOM_UP);
builder.addMatchLimit(CalciteRulesManager.HEP_DEFAULT_MATCH_LIMIT);
- builder.addRuleCollection(baseRuleSet(plannerContext));
+ builder.addRuleCollection(baseRuleSet(plannerContext, false));
builder.addRuleInstance(CoreRules.UNION_MERGE);
builder.addRuleInstance(FilterCorrelateRule.Config.DEFAULT.toRule());
builder.addRuleInstance(FilterProjectTransposeRule.Config.DEFAULT.toRule());
@@ -290,6 +293,8 @@ public class CalciteRulesManager
builder.addRuleInstance(FilterProjectTransposeRule.Config.DEFAULT.toRule());
builder.addRuleInstance(new LogicalUnnestRule());
builder.addRuleInstance(new UnnestInputCleanupRule());
+ builder.addRuleInstance(ProjectMergeRule.Config.DEFAULT.toRule());
+ builder.addRuleInstance(ProjectRemoveRule.Config.DEFAULT.toRule());
final HepProgramBuilder cleanupRules = HepProgram.builder();
cleanupRules.addRuleInstance(FilterProjectTransposeRule.Config.DEFAULT.toRule());
@@ -458,7 +463,7 @@ public class CalciteRulesManager
{
final ImmutableList.Builder<RelOptRule> retVal = ImmutableList
.<RelOptRule>builder()
- .addAll(baseRuleSet(plannerContext))
+ .addAll(baseRuleSet(plannerContext,
plannerContext.getJoinAlgorithm().requiresSubquery()))
.add(DruidRelToDruidRule.instance())
.add(new DruidTableScanRule(plannerContext))
.add(new DruidLogicalValuesRule(plannerContext))
@@ -483,7 +488,7 @@ public class CalciteRulesManager
public List<RelOptRule> bindableConventionRuleSet(final PlannerContext
plannerContext)
{
return ImmutableList.<RelOptRule>builder()
- .addAll(baseRuleSet(plannerContext))
+ .addAll(baseRuleSet(plannerContext, false))
.addAll(Bindables.RULES)
.addAll(DEFAULT_BINDABLE_RULES)
.add(CoreRules.AGGREGATE_REDUCE_FUNCTIONS)
@@ -501,7 +506,7 @@ public class CalciteRulesManager
return (bloat != null) ? bloat : DEFAULT_BLOAT;
}
- public List<RelOptRule> baseRuleSet(final PlannerContext plannerContext)
+ public List<RelOptRule> baseRuleSet(final PlannerContext plannerContext,
boolean withJoinRules)
{
final PlannerConfig plannerConfig = plannerContext.getPlannerConfig();
final ImmutableList.Builder<RelOptRule> rules = ImmutableList.builder();
@@ -513,7 +518,7 @@ public class CalciteRulesManager
rules.add(new
DruidAggregateCaseToFilterRule(plannerContext.queryContext().isExtendedFilteredSumRewrite()));
rules.addAll(configurableRuleSet(plannerContext));
- if (plannerContext.getJoinAlgorithm().requiresSubquery()) {
+ if (withJoinRules) {
rules.addAll(FANCY_JOIN_RULES);
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/rel/logical/DruidJoin.java
b/sql/src/main/java/org/apache/druid/sql/calcite/rel/logical/DruidJoin.java
index 5d531bfde80..3ea022fcc1f 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/logical/DruidJoin.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/logical/DruidJoin.java
@@ -30,7 +30,9 @@ import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexNode;
+import org.apache.druid.query.JoinAlgorithm;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.planner.QueryUtils;
import org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer;
import org.apache.druid.sql.calcite.rel.DruidJoinQueryRel;
@@ -76,4 +78,9 @@ public class DruidJoin extends Join implements
DruidLogicalNode, SourceDescProdu
SourceDesc rightDesc = sources.get(1);
return DruidJoinQueryRel.buildJoinSourceDesc(leftDesc, rightDesc,
plannerContext, this, null);
}
+
+ public JoinAlgorithm getJoinAlgorithm(PlannerContext plannerContext)
+ {
+ return QueryUtils.getJoinAlgorithm(this, plannerContext);
+ }
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index 287f216d3aa..4ef05973389 100644
---
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -91,6 +91,7 @@ import org.apache.druid.sql.calcite.schema.NamedSchema;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.QueryFrameworkUtils;
+import org.apache.druid.sql.calcite.util.datasets.TestDataSet;
import org.apache.druid.sql.guice.SqlModule;
import org.apache.druid.sql.hook.DruidHookDispatcher;
import org.eclipse.jetty.server.Server;
@@ -572,6 +573,12 @@ public class DruidAvaticaHandlerTest extends
CalciteTestBase
Pair.of("TABLE_TYPE", "TABLE")
),
+ row(
+ Pair.of("TABLE_CAT", "druid"),
+ Pair.of("TABLE_NAME", TestDataSet.LARRY.getName()),
+ Pair.of("TABLE_SCHEM", "druid"),
+ Pair.of("TABLE_TYPE", "TABLE")
+ ),
row(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", CalciteTests.DATASOURCE5),
@@ -676,6 +683,12 @@ public class DruidAvaticaHandlerTest extends
CalciteTestBase
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
),
+ row(
+ Pair.of("TABLE_CAT", "druid"),
+ Pair.of("TABLE_NAME", TestDataSet.LARRY.getName()),
+ Pair.of("TABLE_SCHEM", "druid"),
+ Pair.of("TABLE_TYPE", "TABLE")
+ ),
row(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", CalciteTests.DATASOURCE5),
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index afae66b062d..8f65b432843 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -84,6 +84,7 @@ import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.calcite.DrillWindowQueryTest.ArrayRowCmp;
import org.apache.druid.sql.calcite.QueryTestRunner.QueryResults;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.planner.Calcites;
@@ -1019,7 +1020,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
}
}
- public void assertResultsValid(final ResultMatchMode matchMode, final
List<Object[]> expected, final QueryResults queryResults)
+ public static void assertResultsValid(final ResultMatchMode matchMode, final
List<Object[]> expected, final QueryResults queryResults)
{
final List<Object[]> results = queryResults.results;
Assert.assertEquals("Result count mismatch", expected.size(),
results.size());
@@ -1388,7 +1389,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
return new DefaultResultsVerifier(expectedResults,
expectedResultMatchMode, expectedSignature);
}
- public class DefaultResultsVerifier implements ResultsVerifier
+ public static class DefaultResultsVerifier implements ResultsVerifier
{
protected final List<Object[]> expectedResults;
@Nullable
@@ -1428,7 +1429,22 @@ public class BaseCalciteQueryTest extends CalciteTestBase
throw e;
}
}
+ }
+
+ public static class UnorderedResultsVerifier extends DefaultResultsVerifier
+ {
+ public UnorderedResultsVerifier(List<Object[]> expectedResults,
ResultMatchMode expectedResultMatchMode,
+ RowSignature expectedSignature)
+ {
+ super(ImmutableList.sortedCopyOf(new ArrayRowCmp(), expectedResults),
expectedResultMatchMode, expectedSignature);
+ }
+ @Override
+ public void verify(String sql, QueryResults queryResults)
+ {
+ queryResults.results.sort(new ArrayRowCmp());
+ super.verify(sql, queryResults);
+ }
}
/**
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
index 1d754142555..a4b99952fd0 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
@@ -1612,7 +1612,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
// the Scan query in native reads this makeColumnValueSelector. Behavior
of those selectors is inconsistent.
// The DimensionSelector returns an empty list; the ColumnValueSelector
returns a list containing a single null.
final String expectedValueForEmptyMvd =
- queryFramework().engine().name().equals("msq-task")
+ isRunningMSQ()
? null
: "not abd";
@@ -2664,6 +2664,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
);
}
+ @NotYetSupported(Modes.DD_RESULT_MISMATCH_FLOAT_DOUBLE)
@Test
public void testArrayAggNumeric()
{
@@ -3888,7 +3889,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
}
- @NotYetSupported(Modes.UNNEST_INLINED)
+ @NotYetSupported({Modes.UNNEST_INLINED, Modes.DD_UNNEST_INLINED})
@Test
public void testUnnestInline()
{
@@ -3922,7 +3923,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
);
}
- @NotYetSupported(Modes.UNNEST_INLINED)
+ @NotYetSupported({Modes.UNNEST_INLINED, Modes.DD_UNNEST_INLINED})
@Test
public void testUnnestInlineWithCount()
{
@@ -3953,7 +3954,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
);
}
- @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+ @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH,
Modes.DD_UNNEST_RESULT_MISMATCH})
@Test
public void testUnnest()
{
@@ -4335,7 +4336,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
);
}
- @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+ @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH,
Modes.DD_UNNEST_RESULT_MISMATCH})
@Test
public void testUnnestTwice()
{
@@ -4892,7 +4893,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
);
}
- @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+ @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH,
Modes.DD_UNNEST_RESULT_MISMATCH})
@Test
public void testUnnestWithGroupBy()
{
@@ -4955,7 +4956,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
);
}
- @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+ @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH,
Modes.DD_UNNEST_RESULT_MISMATCH})
@Test
public void testUnnestWithGroupByOrderBy()
{
@@ -4999,7 +5000,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
);
}
- @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+ @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH,
Modes.DD_UNNEST_RESULT_MISMATCH})
@Test
public void testUnnestWithGroupByOrderByWithLimit()
{
@@ -5093,7 +5094,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
);
}
- @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+ @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH,
Modes.DD_UNNEST_RESULT_MISMATCH})
@Test
public void testUnnestFirstQueryOnSelect()
{
@@ -5449,7 +5450,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
);
}
- @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+ @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH,
Modes.DD_UNNEST_RESULT_MISMATCH})
@Test
public void testUnnestWithInFilters()
{
@@ -5567,7 +5568,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
);
}
- @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+ @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH,
Modes.DD_UNNEST_RESULT_MISMATCH})
@DecoupledTestConfig(ignoreExpectedQueriesReason =
IgnoreQueriesReason.UNNEST_EXTRA_SCANQUERY)
@Test
public void testUnnestWithJoinOnTheLeft()
@@ -5619,7 +5620,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
);
}
- @NotYetSupported(Modes.UNNEST_INLINED)
+ @NotYetSupported({Modes.UNNEST_INLINED, Modes.DD_UNNEST_INLINED})
@Test
public void testUnnestWithConstant()
{
@@ -5677,7 +5678,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
);
}
- @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+ @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH,
Modes.DD_UNNEST_RESULT_MISMATCH})
@Test
public void testUnnestWithSQLFunctionOnUnnestedColumn()
{
@@ -6209,7 +6210,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
);
}
- @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+ @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH,
Modes.DD_UNNEST_RESULT_MISMATCH})
@Test
public void testUnnestWithCountOnColumn()
{
@@ -7413,4 +7414,61 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
)
);
}
+
+
+ @Test
+ public void testSimpleArraysUnnest()
+ {
+ skipVectorize();
+ testBuilder()
+ .sql("SELECT label,l_arr,val from larry,unnest(l_arr) as u(val)")
+ .expectedResults(
+ ImmutableList.of(
+ new Object[] {"[1]", "[1]", 1L},
+ new Object[] {"[2,3]", "[2,3]", 2L},
+ new Object[] {"[2,3]", "[2,3]", 3L},
+ new Object[] {"[null]", "[null]", null}
+ )
+ )
+ .run();
+ }
+
+ @Test
+ public void testMvToArrayResults()
+ {
+ skipVectorize();
+ testBuilder()
+ .sql("SELECT label,l_arr,mv_to_array(mv) from larry")
+ .expectedResults(
+ ImmutableList.of(
+ new Object[]{"[1]", "[1]", "[\"1\"]"},
+ new Object[]{"[2,3]", "[2,3]", "[\"2\",\"3\"]"},
+ new Object[]{"[]", "[]", null},
+ new Object[]{"[null]", "[null]", null},
+ new Object[]{"null", null, null}
+ )
+ )
+ .run();
+ }
+
+ @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH,
Modes.DD_UNNEST_RESULT_MISMATCH})
+ @Test
+ public void testMvToArrayUnnest()
+ {
+ skipVectorize();
+ testBuilder()
+ .sql("SELECT label,l_arr,mv_to_array(mv),val from
larry,unnest(mv_to_array(mv)) as u(val)")
+ .expectedResults(
+ ImmutableList.of(
+ new Object[]{"[1]", "[1]", "[\"1\"]", "1"},
+ new Object[]{"[2,3]", "[2,3]", "[\"2\",\"3\"]", "2"},
+ new Object[]{"[2,3]", "[2,3]", "[\"2\",\"3\"]", "3"},
+ // below results will be missing in decoupled mode
+ new Object[]{"[]", "[]", null, null},
+ new Object[]{"[null]", "[null]", null, null},
+ new Object[]{"null", null, null, null}
+ )
+ )
+ .run();
+ }
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
index b683b8bce91..06c2241643c 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
@@ -91,6 +91,7 @@ import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.hamcrest.CoreMatchers;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;
import org.junit.Assert;
@@ -2466,6 +2467,26 @@ public class CalciteJoinQueryTest extends
BaseCalciteQueryTest
);
}
+ @Test
+ public void testJoinOfTwoJoinsWithSubQueries()
+ {
+ skipVectorize();
+
+ String sql = "with\n"
+ + "l1 as (SELECT f.dim1, sum(n.dbl1) s1 from foo f join numfoo n on
n.dim2=f.dim2 group by f.dim1),\n"
+ + "r1 as (SELECT f.dim2, sum(n.dbl2) s2 from foo f join numfoo n on
n.dim1=f.dim1 group by f.dim2)\n"
+ + "select dim1, s1+s2 FROM l1 join r1 on dim1=dim2";
+
+ testBuilder()
+ .sql(sql)
+ .expectedResults(
+ ImmutableList.of(
+ new Object[]{"", 1.0D}
+ )
+ )
+ .run();
+ }
+
@MethodSource("provideQueryContexts")
@ParameterizedTest(name = "{0}")
public void testSelectOnLookupUsingLeftJoinOperator(Map<String, Object>
queryContext)
@@ -5124,6 +5145,7 @@ public class CalciteJoinQueryTest extends
BaseCalciteQueryTest
);
}
+ @NotYetSupported(Modes.DD_RESTRICTED_DATASOURCE_SUPPORT2)
@MethodSource("provideQueryContexts")
@ParameterizedTest(name = "{0}")
public void testJoinOnRestrictedBroadcast(Map<String, Object> queryContext)
@@ -5174,9 +5196,13 @@ public class CalciteJoinQueryTest extends
BaseCalciteQueryTest
ImmutableList.of()
)
);
- Assert.assertTrue(e.getMessage()
- .contains(
- "Restricted data source
[GlobalTableDataSource{name='restrictedBroadcastDatasource_m1_is_6'}] with
policy [RowFilterPolicy{rowFilter=m1 = 6 (LONG)}] is not supported"));
+
+ assertThat(
+ e.getMessage(),
+ CoreMatchers.containsString(
+ "Restricted data source
[GlobalTableDataSource{name='restrictedBroadcastDatasource_m1_is_6'}] with
policy [RowFilterPolicy{rowFilter=m1 = 6 (LONG)}] is not supported"
+ )
+ );
}
@MethodSource("provideQueryContexts")
@@ -5686,7 +5712,7 @@ public class CalciteJoinQueryTest extends
BaseCalciteQueryTest
);
}
- @NotYetSupported(Modes.SORT_REMOVE_TROUBLE)
+ @NotYetSupported({Modes.SORT_REMOVE_TROUBLE, Modes.DD_SORT_REMOVE_TROUBLE})
@MethodSource("provideQueryContexts")
@ParameterizedTest(name = "{0}")
public void testRegressionFilteredAggregatorsSubqueryJoins(Map<String,
Object> queryContext)
@@ -6230,13 +6256,12 @@ public class CalciteJoinQueryTest extends
BaseCalciteQueryTest
public void testSelfJoinsWithUnnestOnLeftAndRight()
{
Map<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
- testQuery(
- "with t1 as (\n"
+ String sql = "with t1 as (\n"
+ "select * from foo, unnest(MV_TO_ARRAY(\"dim3\")) as u(d3)\n"
+ ")\n"
+ "select t1.dim3, t1.d3, t2.dim2 from t1 JOIN t1 as t2\n"
- + "ON t1.d3 = t2.d3",
- context,
+ + "ON t1.d3 = t2.d3";
+ ImmutableList<Query<?>> expectedQueries =
ImmutableList.of(
newScanQueryBuilder()
.dataSource(
@@ -6269,20 +6294,24 @@ public class CalciteJoinQueryTest extends
BaseCalciteQueryTest
.columnTypes(ColumnType.STRING, ColumnType.STRING,
ColumnType.STRING)
.context(context)
.build()
- ),
- sortIfSortBased(
- ImmutableList.of(
- new Object[]{"[\"a\",\"b\"]", "a", "a"},
- new Object[]{"[\"a\",\"b\"]", "b", "a"},
- new Object[]{"[\"a\",\"b\"]", "b", null},
- new Object[]{"[\"b\",\"c\"]", "b", "a"},
- new Object[]{"[\"b\",\"c\"]", "b", null},
- new Object[]{"[\"b\",\"c\"]", "c", null},
- new Object[]{"d", "d", ""},
- new Object[]{"", "", "a"}
- ), 0
- )
+ );
+ ImmutableList<Object[]> expectedResults = ImmutableList.of(
+ new Object[] {"[\"a\",\"b\"]", "a", "a"},
+ new Object[] {"[\"a\",\"b\"]", "b", "a"},
+ new Object[] {"[\"a\",\"b\"]", "b", null},
+ new Object[] {"[\"b\",\"c\"]", "b", "a"},
+ new Object[] {"[\"b\",\"c\"]", "b", null},
+ new Object[] {"[\"b\",\"c\"]", "c", null},
+ new Object[] {"d", "d", ""},
+ new Object[] {"", "", "a"}
);
+
+ testBuilder()
+ .queryContext(context)
+ .sql(sql)
+ .expectedQueries(expectedQueries)
+ .expectedResults(new UnorderedResultsVerifier(expectedResults,
ResultMatchMode.RELAX_NULLS, null))
+ .run();
}
@DecoupledTestConfig(ignoreExpectedQueriesReason =
IgnoreQueriesReason.UNNEST_EXTRA_SCANQUERY)
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java
index e2bc176068b..e43bc08c111 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java
@@ -86,6 +86,8 @@ import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import static org.hamcrest.MatcherAssert.assertThat;
+
@SqlTestFrameworkConfig.ComponentSupplier(NestedComponentSupplier.class)
public class CalciteNestedDataQueryTest extends BaseCalciteQueryTest
{
@@ -5452,13 +5454,15 @@ public class CalciteNestedDataQueryTest extends
BaseCalciteQueryTest
public void testJoinOnNestedColumnThrows()
{
DruidException e = Assertions.assertThrows(DruidException.class, () -> {
- testQuery(
- "SELECT * FROM druid.nested a INNER JOIN druid.nested b ON a.nester
= b.nester",
- ImmutableList.of(),
- ImmutableList.of()
- );
+ testBuilder()
+ .sql("SELECT * FROM druid.nested a INNER JOIN druid.nested b ON
a.nester = b.nester")
+ .run();
});
- Assertions.assertEquals("Cannot join when the join condition has column of
type [COMPLEX<json>]", e.getMessage());
+
+ assertThat(
+ e.getMessage(),
+ CoreMatchers.containsString("Cannot join when the join condition has
column of type [COMPLEX<json>]")
+ );
}
@Test
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 0ff94fb020a..bf1ba524072 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -132,6 +132,7 @@ import
org.apache.druid.sql.calcite.rel.CannotBuildQueryException;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.TestDataBuilder;
+import org.apache.druid.sql.calcite.util.datasets.TestDataSet;
import org.hamcrest.CoreMatchers;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -189,6 +190,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.add(new Object[]{"druid", CalciteTests.DATASOURCE1,
"TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE2,
"TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE4,
"TABLE", "NO", "NO"})
+ .add(new Object[]{"druid", TestDataSet.LARRY.getName(),
"TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE5,
"TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE3,
"TABLE", "NO", "NO"})
.add(new Object[]{"druid",
CalciteTests.RESTRICTED_BROADCAST_DATASOURCE, "TABLE", "YES", "YES"})
@@ -233,6 +235,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.add(new Object[]{"druid", CalciteTests.DATASOURCE2,
"TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE4,
"TABLE", "NO", "NO"})
.add(new Object[]{"druid",
CalciteTests.FORBIDDEN_DATASOURCE, "TABLE", "NO", "NO"})
+ .add(new Object[]{"druid", TestDataSet.LARRY.getName(),
"TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE5,
"TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE3,
"TABLE", "NO", "NO"})
.add(new Object[]{"druid",
CalciteTests.RESTRICTED_BROADCAST_DATASOURCE, "TABLE", "YES", "YES"})
@@ -2584,7 +2587,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.run();
}
- @NotYetSupported(Modes.DD_JOIN)
@SqlTestFrameworkConfig.NumMergeBuffers(3)
@Test
public void testExactCountDistinctWithFilter2()
@@ -7258,7 +7260,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
);
}
- @NotYetSupported(Modes.DD_JOIN)
@DecoupledTestConfig(quidemReason = QuidemTestCaseReason.AGG_COL_EXCHANGE)
@Test
public void
testMultipleExactCountDistinctWithGroupingAndOtherAggregatorsUsingJoin()
@@ -12665,7 +12666,6 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
}
// __time >= x remains in the join condition
- @NotYetSupported(Modes.DD_JOIN)
@DecoupledTestConfig(quidemReason =
QuidemTestCaseReason.JOIN_FILTER_LOCATIONS)
@Test
public void testRequireTimeConditionPositive3()
@@ -14888,7 +14888,6 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
);
}
- @NotYetSupported(Modes.DD_JOIN)
@Test
public void testOrderByAlongWithInternalScanQuery()
{
@@ -14933,7 +14932,6 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
);
}
- @NotYetSupported(Modes.DD_JOIN)
@Test
public void testOrderByAlongWithInternalScanQueryNoDistinct()
{
@@ -15606,7 +15604,7 @@ public class CalciteQueryTest extends
BaseCalciteQueryTest
.run();
}
- @NotYetSupported(Modes.DD_JOIN)
+ @NotYetSupported(Modes.DD_WINDOW)
@Test
public void testWindowingOverJoin()
{
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
index d413a0147bf..5ef5eac695f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
@@ -232,7 +232,7 @@ public class DrillWindowQueryTest extends
BaseCalciteQueryTest
}
}
- public class TextualResultsVerifier implements ResultsVerifier
+ public static class TextualResultsVerifier implements ResultsVerifier
{
protected final List<String[]> expectedResultsText;
@Nullable
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
index 7767025399e..25972ae556c 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
@@ -27,6 +27,8 @@ import org.junit.jupiter.api.extension.InvocationInterceptor;
import org.junit.jupiter.api.extension.ReflectiveInvocationContext;
import org.opentest4j.IncompleteExecutionException;
+import javax.annotation.Nullable;
+
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@@ -73,7 +75,7 @@ import static org.junit.Assert.assertThrows;
@Target({ElementType.METHOD})
public @interface NotYetSupported
{
- Modes value();
+ Modes[] value();
enum Scope
{
@@ -105,11 +107,17 @@ public @interface NotYetSupported
UNNEST_RESULT_MISMATCH(Scope.DECOUPLED, AssertionError.class, "(Result
count mismatch|column content mismatch)"),
DD_RESTRICTED_DATASOURCE_SUPPORT(Scope.DECOUPLED_DART,
DruidException.class, "ForbiddenException: Unauthorized"),
+ DD_RESTRICTED_DATASOURCE_SUPPORT2(Scope.DECOUPLED_DART,
AssertionError.class, "Unauthorized"),
DD_INCORRECT_RESULTS_EMPTY_STRING(Scope.DECOUPLED_DART,
AssertionError.class, "column content mismatch at"),
NO_INFORMATION_SCHEMA_SUPPORT(Scope.DECOUPLED_DART, DruidException.class,
"INFORMATION_SCHEMA"),
- DD_JOIN(Scope.DECOUPLED_DART, DruidException.class,
"DruidJoin.DRUID_LOGICAL"),
DD_NULL_COLUMN_ORDER(Scope.DECOUPLED_DART, DruidException.class, "sort:
\\[\\] -> \\[1\\]"),
- DD_UNION(Scope.DECOUPLED_DART, DruidException.class,
"DruidUnion.DRUID_LOGICAL");
+ DD_UNION(Scope.DECOUPLED_DART, DruidException.class,
"DruidUnion.DRUID_LOGICAL"),
+ DD_WINDOW(Scope.DECOUPLED_DART, DruidException.class,
"DruidWindow.DRUID_LOGICAL"),
+ DD_UNNEST_RESULT_MISMATCH(Scope.DECOUPLED_DART, AssertionError.class,
"(Result count mismatch|column content mismatch)"),
+ DD_UNNEST_INLINED(Scope.DECOUPLED_DART, Exception.class, "Missing
conversion is Uncollect"),
+ DD_SORT_REMOVE_TROUBLE(Scope.DECOUPLED_DART, DruidException.class,
"Calcite assertion violated.*Sort\\.<init>"),
+ DD_JOIN_CONDITION_NORMALIZATION(Scope.DECOUPLED_DART,
DruidException.class, "Cannot handle equality"),
+ DD_RESULT_MISMATCH_FLOAT_DOUBLE(Scope.DECOUPLED_DART,
AssertionError.class, "column content mismatch");
// @formatter:on
public Scope scope;
@@ -162,14 +170,15 @@ public @interface NotYetSupported
{
Method method = extensionContext.getTestMethod().get();
NotYetSupported annotation = method.getAnnotation(NotYetSupported.class);
+ Modes ignoreMode = getModeForScope(annotation);
+
- if (annotation == null || annotation.value().scope != scope) {
+ if (ignoreMode == null) {
invocation.proceed();
return;
}
{
{
- Modes ignoreMode = annotation.value();
Throwable e = null;
try {
invocation.proceed();
@@ -197,16 +206,30 @@ public @interface NotYetSupported
);
String trace = Throwables.getStackTraceAsString(e);
- Matcher m = annotation.value().getPattern().matcher(trace);
+ Matcher m = ignoreMode.getPattern().matcher(trace);
if (!m.find()) {
- throw new AssertionError("Exception stacktrace doesn't match
regex: " + annotation.value().regex, e);
+ throw new AssertionError("Exception stacktrace doesn't match
regex: " + ignoreMode.regex, e);
}
throw new AssumptionViolatedException("Test is not-yet supported;
ignored with:" + annotation);
}
}
}
+ private Modes getModeForScope(@Nullable NotYetSupported annotation)
+ {
+ if (annotation == null) {
+ return null;
+ }
+ for (Modes mode : annotation.value()) {
+ if (mode.scope == scope) {
+ return mode;
+ }
+
+ }
+ return null;
+ }
+
@Override
public void interceptTestTemplateMethod(Invocation<Void> invocation,
ReflectiveInvocationContext<Method> invocationContext,
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestBuilder.java
b/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestBuilder.java
index c35003e0e76..d110f3200d7 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestBuilder.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestBuilder.java
@@ -127,7 +127,7 @@ public class QueryTestBuilder
public QueryTestBuilder queryContext(Map<String, Object> queryContext)
{
- this.queryContext = QueryContexts.override(config.baseQueryContext(),
queryContext);
+ this.queryContext = QueryContexts.override(this.queryContext,
queryContext);
return this;
}
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
index d6d544af522..c283a364d1a 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
@@ -235,7 +235,7 @@ public class CalcitePlannerModuleTest extends
CalciteTestBase
private void assertBloat(PlannerContext context, int expectedBloat)
{
- Optional<ProjectMergeRule> firstProjectMergeRule =
injector.getInstance(CalciteRulesManager.class).baseRuleSet(context).stream()
+ Optional<ProjectMergeRule> firstProjectMergeRule =
injector.getInstance(CalciteRulesManager.class).baseRuleSet(context,
false).stream()
.filter(rule -> rule instanceof ProjectMergeRule)
.map(rule -> (ProjectMergeRule) rule)
.findAny();
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
index f455b0a0d4b..2625da9044b 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
@@ -860,6 +860,10 @@ public class TestDataBuilder
TestDataSet.NUMFOO,
TestHelper.JSON_MAPPER,
new File(tmpDir, "3")
+ ).add(
+ TestDataSet.LARRY,
+ TestHelper.JSON_MAPPER,
+ new File(tmpDir, "larry")
).add(
DataSegment.builder()
.dataSource(CalciteTests.DATASOURCE4)
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
index 8f0c7eec4b3..55443bcaa30 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
@@ -39,7 +39,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineLookup;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
-import org.apache.druid.timeline.partition.SingleElementPartitionChunk;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -100,22 +100,26 @@ public class TestTimelineServerView implements
TimelineServerView
@Override
public Optional<? extends TimelineLookup<String, ServerSelector>>
getTimeline(TableDataSource table)
{
+ final VersionedIntervalTimeline<String, ServerSelector> timelineLookup =
+ new VersionedIntervalTimeline<>(Comparator.naturalOrder());
+
for (DataSegment segment : segments) {
if (!segment.getDataSource().equals(table.getName())) {
continue;
}
- VersionedIntervalTimeline<String, ServerSelector> timelineLookup = new
VersionedIntervalTimeline<String, ServerSelector>(
- Comparator.naturalOrder()
- );
TierSelectorStrategy st = new HighestPriorityTierSelectorStrategy(new
RandomServerSelectorStrategy());
- ServerSelector sss = new ServerSelector(segment, st,
HistoricalFilter.IDENTITY_FILTER);
+ ServerSelector serverSelector = new ServerSelector(segment, st,
HistoricalFilter.IDENTITY_FILTER);
- PartitionChunk<ServerSelector> partitionChunk = new
SingleElementPartitionChunk(sss);
+ PartitionChunk<ServerSelector> partitionChunk =
segment.getShardSpec().createChunk(serverSelector);
timelineLookup.add(segment.getInterval(), segment.getVersion(),
partitionChunk);
+ }
+
+ if (timelineLookup.isEmpty()) {
+ return Optional.empty();
+ } else {
return Optional.of(timelineLookup);
}
- return Optional.empty();
}
@Override
diff --git
a/sql/src/test/quidem/org.apache.druid.sql.calcite.DecoupledPlanningCalciteArraysQueryTest/testUnnestThriceWithFiltersOnDimAndAllUnnestColumns.iq
b/sql/src/test/quidem/org.apache.druid.sql.calcite.DecoupledPlanningCalciteArraysQueryTest/testUnnestThriceWithFiltersOnDimAndAllUnnestColumns.iq
index 23b89e6e323..5eb85f3a09d 100644
---
a/sql/src/test/quidem/org.apache.druid.sql.calcite.DecoupledPlanningCalciteArraysQueryTest/testUnnestThriceWithFiltersOnDimAndAllUnnestColumns.iq
+++
b/sql/src/test/quidem/org.apache.druid.sql.calcite.DecoupledPlanningCalciteArraysQueryTest/testUnnestThriceWithFiltersOnDimAndAllUnnestColumns.iq
@@ -22,7 +22,7 @@
!ok
LogicalProject(dimZipf=[$0], dim3_unnest1=[CAST('Baz':VARCHAR):VARCHAR],
dim3_unnest2=[$1], dim3_unnest3=[$3])
LogicalUnnest(unnestExpr=[MV_TO_ARRAY($2)], filter=[=($0, 'World')])
- LogicalProject(dimZipf=[$1], EXPR$0=[$2], dimMultivalEnumerated0=[$0])
+ LogicalProject(dimZipf=[$1], EXPR$00=[$2], dimMultivalEnumerated0=[$0])
LogicalUnnest(unnestExpr=[MV_TO_ARRAY($0)], filter=[=($0, 'Hello')])
LogicalProject(dimMultivalEnumerated=[$0], dimZipf=[$1])
LogicalUnnest(unnestExpr=[MV_TO_ARRAY($0)], filter=[=($0, 'Baz')])
@@ -33,7 +33,7 @@ LogicalProject(dimZipf=[$0],
dim3_unnest1=[CAST('Baz':VARCHAR):VARCHAR], dim3_un
!logicalPlan
DruidProject(dimZipf=[$0], dim3_unnest1=[CAST('Baz':VARCHAR):VARCHAR],
dim3_unnest2=[$1], dim3_unnest3=[$3], druid=[logical])
DruidUnnest(unnestExpr=[MV_TO_ARRAY($2)], filter=[=($0, 'World')])
- DruidProject(dimZipf=[$1], EXPR$0=[$2], dimMultivalEnumerated0=[$0],
druid=[logical])
+ DruidProject(dimZipf=[$1], EXPR$00=[$2], dimMultivalEnumerated0=[$0],
druid=[logical])
DruidUnnest(unnestExpr=[MV_TO_ARRAY($0)], filter=[=($0, 'Hello')])
DruidProject(dimMultivalEnumerated=[$0], dimZipf=[$1], druid=[logical])
DruidUnnest(unnestExpr=[MV_TO_ARRAY($0)], filter=[=($0, 'Baz')])
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]