This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d323afa47e Decoupled dart: support unnest/join (#18232)
9d323afa47e is described below

commit 9d323afa47e43b4772ad1cadf0a6e8841da9d0cf
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Wed Sep 17 09:43:19 2025 +0200

    Decoupled dart: support unnest/join (#18232)
---
 .../druid/msq/kernel/StageDefinitionBuilder.java   |   3 +-
 .../DruidLogicalToQueryDefinitionTranslator.java   |  52 ++---
 .../apache/druid/msq/logical/LogicalInputSpec.java |  70 ++++++-
 .../org/apache/druid/msq/logical/StageMaker.java   |  39 ++--
 .../druid/msq/logical/stages/GroupByStages.java    |  12 +-
 .../apache/druid/msq/logical/stages/JoinStage.java | 211 +++++++++++++++++++++
 .../apache/druid/msq/logical/stages/ReadStage.java |  56 ++++++
 .../druid/msq/logical/stages/SegmentMapStage.java  |  51 +++++
 .../druid/msq/logical/stages/UnnestStage.java      |  41 ++++
 .../msq/querykit/scan/ScanQueryStageProcessor.java |  12 ++
 multi-stage-query/src/main/resources/log4j2.xml    |  28 ++-
 ...ier.java => AbstractDartComponentSupplier.java} |  10 +-
 .../org/apache/druid/msq/test/CalciteDartTest.java |  18 ++
 .../druid/msq/test/DartComponentSupplier.java      | 135 +------------
 .../test/DecoupledDartCalciteArraysQueryTest.java  |  69 +++++++
 .../test/DecoupledDartCalciteJoinQueryTest.java    | 124 ++++++++++++
 .../DecoupledDartCalciteNestedDataQueryTest.java   |  60 ++++++
 .../druid/msq/test/DecoupledDartExtension.java     |   2 +-
 .../sql/calcite/NotYetSupportedUsageTest.java      |  36 ++--
 .../apache/druid/segment/column/RowSignature.java  |  10 +
 .../druid/sql/calcite/util/datasets/Larry.java     | 104 ++++++++++
 .../sql/calcite/util/datasets/TestDataSet.java     |   1 +
 .../sql/calcite/planner/CalciteRulesManager.java   |  15 +-
 .../druid/sql/calcite/rel/logical/DruidJoin.java   |   7 +
 .../druid/sql/avatica/DruidAvaticaHandlerTest.java |  13 ++
 .../druid/sql/calcite/BaseCalciteQueryTest.java    |  20 +-
 .../druid/sql/calcite/CalciteArraysQueryTest.java  |  86 +++++++--
 .../druid/sql/calcite/CalciteJoinQueryTest.java    |  71 +++++--
 .../sql/calcite/CalciteNestedDataQueryTest.java    |  16 +-
 .../apache/druid/sql/calcite/CalciteQueryTest.java |  10 +-
 .../druid/sql/calcite/DrillWindowQueryTest.java    |   2 +-
 .../apache/druid/sql/calcite/NotYetSupported.java  |  37 +++-
 .../apache/druid/sql/calcite/QueryTestBuilder.java |   2 +-
 .../calcite/planner/CalcitePlannerModuleTest.java  |   2 +-
 .../druid/sql/calcite/util/TestDataBuilder.java    |   4 +
 .../sql/calcite/util/TestTimelineServerView.java   |  18 +-
 ...estThriceWithFiltersOnDimAndAllUnnestColumns.iq |   4 +-
 37 files changed, 1145 insertions(+), 306 deletions(-)

diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
index a0edc224f8c..8b2815939f1 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinitionBuilder.java
@@ -28,6 +28,7 @@ import org.apache.druid.segment.column.RowSignature;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Builder for {@link StageDefinition}. See class-level javadoc for that class 
for a description of the parameters.
@@ -76,7 +77,7 @@ public class StageDefinitionBuilder
     return inputs(Arrays.asList(inputSpecs));
   }
 
-  public StageDefinitionBuilder broadcastInputs(final IntSet 
broadcastInputNumbers)
+  public StageDefinitionBuilder broadcastInputs(final Set<Integer> 
broadcastInputNumbers)
   {
     this.broadcastInputNumbers.clear();
 
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
index 5df11c6f4f8..138bc1f2995 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/DruidLogicalToQueryDefinitionTranslator.java
@@ -23,27 +23,22 @@ import com.google.common.collect.Lists;
 import org.apache.calcite.rel.RelNode;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.frame.key.KeyColumn;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.msq.input.inline.InlineInputSpec;
-import org.apache.druid.msq.input.table.TableInputSpec;
+import org.apache.druid.msq.logical.stages.JoinStage;
 import org.apache.druid.msq.logical.stages.LogicalStage;
 import org.apache.druid.msq.logical.stages.OffsetLimitStage;
 import org.apache.druid.msq.logical.stages.ReadStage;
 import org.apache.druid.msq.logical.stages.SortStage;
-import org.apache.druid.query.InlineDataSource;
-import org.apache.druid.query.TableDataSource;
+import org.apache.druid.msq.logical.stages.UnnestStage;
 import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 import 
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
-import 
org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer.SourceDesc;
 import org.apache.druid.sql.calcite.rel.DruidQuery;
+import org.apache.druid.sql.calcite.rel.logical.DruidJoin;
 import org.apache.druid.sql.calcite.rel.logical.DruidLogicalNode;
 import org.apache.druid.sql.calcite.rel.logical.DruidSort;
-import org.apache.druid.sql.calcite.rel.logical.DruidTableScan;
-import org.apache.druid.sql.calcite.rel.logical.DruidValues;
+import org.apache.druid.sql.calcite.rule.logical.DruidUnnest;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
@@ -89,7 +84,7 @@ public class DruidLogicalToQueryDefinitionTranslator
     List<LogicalStage> inputStages = buildInputStages(stack);
     DruidLogicalNode node = stack.getNode();
     if (inputStages.size() == 0) {
-      Optional<ReadStage> stage = buildReadStage(node);
+      Optional<ReadStage> stage = ReadStage.buildReadStage(stack);
       if (stage.isPresent()) {
         return stage.get();
       }
@@ -104,19 +99,21 @@ public class DruidLogicalToQueryDefinitionTranslator
       if (newStage != null) {
         return newStage;
       }
+    } else {
+      LogicalStage newStage = buildMultiInputStage(inputStages, stack);
+      if (newStage != null) {
+        return newStage;
+      }
     }
     throw DruidException.defensive().build("Unable to process relNode[%s]", 
node);
   }
 
-  private Optional<ReadStage> buildReadStage(DruidLogicalNode node)
+  private LogicalStage buildMultiInputStage(List<LogicalStage> inputStages, 
DruidNodeStack stack)
   {
-    if (node instanceof DruidValues) {
-      return translateValues((DruidValues) node);
+    if (stack.getNode() instanceof DruidJoin) {
+      return JoinStage.buildJoinStage(inputStages, stack);
     }
-    if (node instanceof DruidTableScan) {
-      return translateTableScan((DruidTableScan) node);
-    }
-    return Optional.empty();
+    return null;
   }
 
   private LogicalStage makeSequenceStage(LogicalStage inputStage, 
DruidNodeStack stack)
@@ -133,6 +130,9 @@ public class DruidLogicalToQueryDefinitionTranslator
         return sortStage;
       }
     }
+    if (stack.getNode() instanceof DruidUnnest) {
+      return UnnestStage.buildUnnestStage(inputStage, stack);
+    }
     return new ReadStage(inputStage.getLogicalRowSignature(), 
LogicalInputSpec.of(inputStage)).extendWith(stack);
   }
 
@@ -148,22 +148,4 @@ public class DruidLogicalToQueryDefinitionTranslator
     }
     return inputStages;
   }
-
-  private Optional<ReadStage> translateTableScan(DruidTableScan node)
-  {
-    SourceDesc sd = node.getSourceDesc(plannerContext, 
Collections.emptyList());
-    TableDataSource ids = (TableDataSource) sd.dataSource;
-    TableInputSpec inputSpec = new TableInputSpec(ids.getName(), 
Intervals.ONLY_ETERNITY, null, null);
-    ReadStage stage = new ReadStage(sd.rowSignature, 
LogicalInputSpec.of(inputSpec));
-    return Optional.of(stage);
-  }
-
-  private Optional<ReadStage> translateValues(DruidValues node)
-  {
-    SourceDesc sd = node.getSourceDesc(plannerContext, 
Collections.emptyList());
-    InlineDataSource ids = (InlineDataSource) sd.dataSource;
-    InlineInputSpec inputSpec = new InlineInputSpec(ids);
-    ReadStage stage = new ReadStage(sd.rowSignature, 
LogicalInputSpec.of(inputSpec));
-    return Optional.of(stage);
-  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalInputSpec.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalInputSpec.java
index f77e41d0aa6..2a3243c43e9 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalInputSpec.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/LogicalInputSpec.java
@@ -19,39 +19,91 @@
 
 package org.apache.druid.msq.logical;
 
-import org.apache.druid.error.NotYetImplemented;
 import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.msq.input.stage.StageInputSpec;
 import org.apache.druid.msq.kernel.StageDefinitionBuilder;
 import org.apache.druid.msq.logical.stages.LogicalStage;
+import org.apache.druid.msq.querykit.InputNumberDataSource;
+import org.apache.druid.query.DataSource;
 import org.apache.druid.segment.column.RowSignature;
+import 
org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer.SourceDesc;
+
+import java.util.Collections;
+import java.util.Set;
 
 /**
  * Represents an {@link InputSpec} for {@link LogicalStage}-s.
  */
 public abstract class LogicalInputSpec
 {
+  public enum InputProperty
+  {
+    BROADCAST
+  }
+
+  final int inputIndex;
+  final Set<InputProperty> props;
+
+  public LogicalInputSpec(int inputIndex, Set<InputProperty> props)
+  {
+    this.inputIndex = inputIndex;
+    this.props = props;
+  }
+
   public abstract InputSpec toInputSpec(StageMaker maker);
 
   public abstract RowSignature getRowSignature();
 
+  /**
+   * Provides the {@link SourceDesc} for this input spec.
+   *
+   * Supplied to make it more easily interoperable with {@link DataSource}
+   * backed features like {@link DataSource#createSegmentMapFunction}.
+   */
+  public final SourceDesc getSourceDesc()
+  {
+    InputNumberDataSource ds = new InputNumberDataSource(inputIndex);
+    return new SourceDesc(ds, getRowSignature());
+  }
+
+  public final boolean hasProperty(InputProperty prop)
+  {
+    return props.contains(prop);
+  }
+
   public static LogicalInputSpec of(LogicalStage inputStage)
   {
-    return new DagStageInputSpec(inputStage);
+    return of(inputStage, 0, Collections.emptySet());
+  }
+
+  public static LogicalInputSpec of(InputSpec inputSpec, RowSignature 
rowSignature)
+  {
+    return new PhysicalInputSpec(inputSpec, 0, rowSignature, 
Collections.emptySet());
   }
 
-  public static LogicalInputSpec of(InputSpec inputSpec)
+  public static LogicalInputSpec of(LogicalStage logicalStage, int inputIndex, 
InputProperty prop)
   {
-    return new PhysicalInputSpec(inputSpec);
+    return of(logicalStage, inputIndex, Collections.singleton(prop));
+  }
+
+  public static LogicalInputSpec of(LogicalStage logicalStage, int inputIndex, 
Set<InputProperty> props)
+  {
+    // could potentially unwrap LogicalStage if some conditions are met
+    // logicalStage.unwrap(InputSpec.class);
+    // partial: 
https://github.com/kgyrtkirk/druid/commit/9a541f69361f341c537ee196514d2d6a00ae3feb
+    return new DagStageInputSpec(logicalStage, inputIndex, props);
   }
 
   static class PhysicalInputSpec extends LogicalInputSpec
   {
     private InputSpec inputSpec;
+    private RowSignature rowSignature;
 
-    public PhysicalInputSpec(InputSpec inputSpec)
+    public PhysicalInputSpec(InputSpec inputSpec, int inputIndex, RowSignature 
rowSignature, Set<InputProperty> props)
     {
+      super(inputIndex, props);
       this.inputSpec = inputSpec;
+      this.rowSignature = rowSignature;
     }
 
     @Override
@@ -63,17 +115,17 @@ public abstract class LogicalInputSpec
     @Override
     public RowSignature getRowSignature()
     {
-      throw NotYetImplemented.ex(null, "Not supported for this type");
+      return rowSignature;
     }
   }
 
   static class DagStageInputSpec extends LogicalInputSpec
   {
+    protected LogicalStage inputStage;
 
-    private LogicalStage inputStage;
-
-    public DagStageInputSpec(LogicalStage inputStage)
+    public DagStageInputSpec(LogicalStage inputStage, int inputIndex, 
Set<InputProperty> props)
     {
+      super(inputIndex, props);
       this.inputStage = inputStage;
     }
 
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
index c957ca21773..4885a7d3e7d 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/StageMaker.java
@@ -26,10 +26,12 @@ import org.apache.druid.msq.kernel.MixShuffleSpec;
 import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.kernel.StageDefinitionBuilder;
+import org.apache.druid.msq.logical.LogicalInputSpec.InputProperty;
 import org.apache.druid.msq.logical.stages.AbstractFrameProcessorStage;
 import org.apache.druid.msq.logical.stages.AbstractShuffleStage;
 import org.apache.druid.msq.logical.stages.LogicalStage;
 import org.apache.druid.msq.querykit.scan.ScanQueryStageProcessor;
+import org.apache.druid.query.DataSource;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.segment.VirtualColumns;
@@ -38,9 +40,11 @@ import org.apache.druid.sql.calcite.planner.PlannerContext;
 
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Builds {@link QueryDefinition} from {@link LogicalStage}-s.
@@ -91,13 +95,8 @@ public class StageMaker
   private StageDefinitionBuilder 
buildFrameProcessorStage(AbstractFrameProcessorStage frameProcessorStage)
   {
     List<LogicalInputSpec> inputs = frameProcessorStage.getInputSpecs();
-    List<InputSpec> inputSpecs = new ArrayList<>();
-    for (LogicalInputSpec dagInputSpec : inputs) {
-      inputSpecs.add(dagInputSpec.toInputSpec(this));
-    }
+    StageDefinitionBuilder sdb = newStageDefinitionBuilder(inputs);
     StageProcessor<?, ?> stageProcessor = 
frameProcessorStage.buildStageProcessor(this);
-    StageDefinitionBuilder sdb = newStageDefinitionBuilder();
-    sdb.inputs(inputSpecs);
     sdb.signature(frameProcessorStage.getLogicalRowSignature());
     sdb.processor(stageProcessor);
     sdb.shuffleSpec(MixShuffleSpec.instance());
@@ -107,21 +106,28 @@ public class StageMaker
   private StageDefinitionBuilder buildShuffleStage(AbstractShuffleStage stage)
   {
     List<LogicalInputSpec> inputs = stage.getInputSpecs();
-    List<InputSpec> inputSpecs = new ArrayList<>();
-    for (LogicalInputSpec dagInputSpec : inputs) {
-      inputSpecs.add(dagInputSpec.toInputSpec(this));
-    }
-    StageDefinitionBuilder sdb = newStageDefinitionBuilder();
-    sdb.inputs(inputSpecs);
+    StageDefinitionBuilder sdb = newStageDefinitionBuilder(inputs);
     sdb.signature(stage.getRowSignature());
     sdb.processor(makeScanStageProcessor(VirtualColumns.EMPTY, 
stage.getRowSignature(), null));
     sdb.shuffleSpec(stage.buildShuffleSpec());
     return sdb;
   }
 
-  private StageDefinitionBuilder newStageDefinitionBuilder()
+  private StageDefinitionBuilder 
newStageDefinitionBuilder(List<LogicalInputSpec> inputs)
   {
-    return StageDefinition.builder(getNextStageId());
+    List<InputSpec> inputSpecs = new ArrayList<>();
+    Set<Integer> broadcastInputs = new HashSet<>();
+    for (int i = 0; i < inputs.size(); i++) {
+      LogicalInputSpec dagInputSpec = inputs.get(i);
+      inputSpecs.add(dagInputSpec.toInputSpec(this));
+      if (dagInputSpec.hasProperty(InputProperty.BROADCAST)) {
+        broadcastInputs.add(i);
+      }
+    }
+    StageDefinitionBuilder sdb = StageDefinition.builder(getNextStageId());
+    sdb.broadcastInputs(broadcastInputs);
+    sdb.inputs(inputSpecs);
+    return sdb;
   }
 
   private int getNextStageId()
@@ -152,4 +158,9 @@ public class StageMaker
     }
     return plannerContext.getSqlQueryId();
   }
+
+  public StageProcessor<?, ?> makeSegmentMapProcessor(RowSignature signature, 
DataSource dataSource)
+  {
+    return ScanQueryStageProcessor.makeSegmentMapFnProcessor(signature, 
dataSource);
+  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java
index da992c3e1fa..e955c6a4052 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/GroupByStages.java
@@ -27,8 +27,6 @@ import org.apache.druid.msq.logical.LogicalInputSpec;
 import org.apache.druid.msq.logical.StageMaker;
 import org.apache.druid.msq.querykit.groupby.GroupByPostShuffleStageProcessor;
 import org.apache.druid.msq.querykit.groupby.GroupByPreShuffleStageProcessor;
-import org.apache.druid.query.DataSource;
-import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.spec.QuerySegmentSpec;
 import org.apache.druid.segment.column.RowSignature;
@@ -43,14 +41,6 @@ import java.util.List;
 
 public class GroupByStages
 {
-  /**
-   * An input datasource is needed to construct a valid {@link GroupByQuery}.
-   *
-   * During staged execution the data is coming in thru channels - so this is
-   * just a placeholder.
-   */
-  private static final DataSource DUMMY_INPUT_DATASOURCE = new 
TableDataSource("__input__");
-
   public static class PreShuffleStage extends ProjectStage
   {
     private GroupByQuery groupByQuery;
@@ -116,7 +106,7 @@ public class GroupByStages
     builder.setDimFilter(projectStage.getDimFilter());
     builder.setVirtualColumns(projectStage.getVirtualColumns());
     builder.setPostAggregatorSpecs(grouping.getPostAggregators());
-    builder.setDataSource(DUMMY_INPUT_DATASOURCE);
+    
builder.setDataSource(LogicalInputSpec.of(projectStage).getSourceDesc().dataSource);
     return builder.build();
   }
 
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/JoinStage.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/JoinStage.java
new file mode 100644
index 00000000000..96cf3d3510b
--- /dev/null
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/JoinStage.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.key.ClusterBy;
+import org.apache.druid.frame.key.KeyColumn;
+import org.apache.druid.msq.exec.StageProcessor;
+import org.apache.druid.msq.kernel.HashShuffleSpec;
+import org.apache.druid.msq.kernel.ShuffleSpec;
+import org.apache.druid.msq.logical.LogicalInputSpec;
+import org.apache.druid.msq.logical.StageMaker;
+import org.apache.druid.msq.querykit.QueryKitUtils;
+import org.apache.druid.msq.querykit.common.SortMergeJoinStageProcessor;
+import org.apache.druid.query.JoinAlgorithm;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.join.JoinConditionAnalysis;
+import org.apache.druid.segment.join.JoinType;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.expression.Expressions;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import 
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
+import 
org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer.SourceDesc;
+import org.apache.druid.sql.calcite.rel.DruidJoinQueryRel;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+import org.apache.druid.sql.calcite.rel.logical.DruidJoin;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Builds the logical stages that execute a join (broadcast or sort-merge).
+ */
+public class JoinStage
+{
+
+  static class ShuffleStage extends AbstractShuffleStage
+  {
+    protected final List<KeyColumn> keyColumns;
+
+    public ShuffleStage(LogicalStage inputStage, List<KeyColumn> keyColumns)
+    {
+      super(
+          QueryKitUtils.sortableSignature(inputStage.getLogicalRowSignature(), 
keyColumns),
+          LogicalInputSpec.of(inputStage)
+      );
+      this.keyColumns = keyColumns;
+    }
+
+    @Override
+    public RowSignature getLogicalRowSignature()
+    {
+      return inputSpecs.get(0).getRowSignature();
+    }
+
+    @Override
+    public ShuffleSpec buildShuffleSpec()
+    {
+      final ClusterBy clusterBy = new ClusterBy(keyColumns, 0);
+      return new HashShuffleSpec(clusterBy, 1);
+    }
+
+    @Override
+    public LogicalStage extendWith(DruidNodeStack stack)
+    {
+      return null;
+    }
+  }
+
+  public static class SortMergeStage extends AbstractFrameProcessorStage
+  {
+
+    private String rightPrefix;
+    private JoinConditionAnalysis conditionAnalysis;
+    private JoinType joinType;
+
+    public SortMergeStage(RowSignature signature, List<LogicalInputSpec> 
inputs, String rightPrefix,
+        JoinConditionAnalysis conditionAnalysis, JoinType joinType)
+    {
+      super(signature, inputs);
+      this.rightPrefix = rightPrefix;
+      this.conditionAnalysis = conditionAnalysis;
+      this.joinType = joinType;
+    }
+
+    @Override
+    public LogicalStage extendWith(DruidNodeStack stack)
+    {
+      return null;
+    }
+
+    @Override
+    public StageProcessor<?, ?> buildStageProcessor(StageMaker stageMaker)
+    {
+      return new SortMergeJoinStageProcessor(
+          rightPrefix,
+          conditionAnalysis,
+          joinType
+      );
+    }
+  }
+
+  /** similar to {@link DruidJoinQueryRel#buildJoinSourceDesc} */
+  public static LogicalStage buildJoinStage(List<LogicalStage> inputStages, 
DruidNodeStack stack)
+  {
+    DruidJoin join = (DruidJoin) stack.getNode();
+    if (join.getJoinAlgorithm(stack.getPlannerContext()) == 
JoinAlgorithm.SORT_MERGE) {
+      return buildMergeJoin(inputStages, stack, join);
+    } else {
+      return buildBroadcastJoin(inputStages, stack, join);
+    }
+  }
+
+  private static LogicalStage buildBroadcastJoin(List<LogicalStage> 
inputStages, DruidNodeStack stack, DruidJoin join)
+  {
+    PlannerContext plannerContext = stack.getPlannerContext();
+    List<LogicalInputSpec> inputDescs = new ArrayList<>();
+    inputDescs.add(LogicalInputSpec.of(inputStages.get(0)));
+    for (int i = 1; i < inputStages.size(); i++) {
+      inputDescs.add(LogicalInputSpec.of(inputStages.get(i), i, 
LogicalInputSpec.InputProperty.BROADCAST));
+    }
+    SourceDesc unnestSD = join.getSourceDesc(plannerContext, 
Lists.transform(inputDescs, LogicalInputSpec::getSourceDesc));
+    return new SegmentMapStage(unnestSD, inputDescs);
+  }
+
+  private static LogicalStage buildMergeJoin(List<LogicalStage> inputStages, 
DruidNodeStack stack, DruidJoin join)
+  {
+    String prefix = findUnusedJoinPrefix(inputStages.get(0).getRowSignature());
+
+    RowSignature signature = RowSignature.builder()
+        .addAll(inputStages.get(0).getLogicalRowSignature())
+        .addAll(inputStages.get(1).getLogicalRowSignature().withPrefix(prefix))
+        .build();
+
+    PlannerContext plannerContext = stack.getPlannerContext();
+    VirtualColumnRegistry virtualColumnRegistry = VirtualColumnRegistry.create(
+        signature,
+        plannerContext.getExpressionParser(),
+        plannerContext.getPlannerConfig().isForceExpressionVirtualColumns()
+    );
+    
plannerContext.setJoinExpressionVirtualColumnRegistry(virtualColumnRegistry);
+
+    // Generate the condition for this join as a Druid expression.
+    final DruidExpression condition = Expressions.toDruidExpression(
+        plannerContext,
+        signature,
+        join.getCondition()
+    );
+
+    // Unsetting it to avoid any VC Registry leaks in case there are multiple
+    // druid queries for the SQL
+    // It should be fixed soon with changes in interface for
+    // SqlOperatorConversion and Expressions bridge class
+    plannerContext.setJoinExpressionVirtualColumnRegistry(null);
+
+    if (!virtualColumnRegistry.isEmpty()) {
+      throw DruidException.defensive("Not sure how to handle this right now - 
it should be fixed");
+    }
+
+    JoinConditionAnalysis analysis = JoinConditionAnalysis.forExpression(
+        condition.getExpression(),
+        plannerContext.parseExpression(condition.getExpression()),
+        prefix
+    );
+
+    // Partition by keys given by the join condition.
+    final List<List<KeyColumn>> partitionKeys = 
SortMergeJoinStageProcessor.toKeyColumns(
+        SortMergeJoinStageProcessor.validateCondition(analysis)
+    );
+
+    List<LogicalStage> shuffleStages = new ArrayList<>();
+    for (int i = 0; i < inputStages.size(); i++) {
+      LogicalStage inputStage = inputStages.get(i);
+      shuffleStages.add(new ShuffleStage(inputStage, partitionKeys.get(i)));
+
+    }
+
+    return new SortMergeStage(
+        signature,
+        Lists.transform(shuffleStages, LogicalInputSpec::of),
+        prefix,
+        analysis,
+        DruidJoinQueryRel.toDruidJoinType(join.getJoinType())
+    );
+  }
+
+  private static String findUnusedJoinPrefix(RowSignature rowSignature)
+  {
+    List<String> leftColumnNames = rowSignature.getColumnNames();
+    return Calcites.findUnusedPrefixForDigits("j", leftColumnNames) + "0";
+  }
+}
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
index b28d2cb2914..09ee864f582 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/ReadStage.java
@@ -19,20 +19,39 @@
 
 package org.apache.druid.msq.logical.stages;
 
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.msq.exec.StageProcessor;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.inline.InlineInputSpec;
+import org.apache.druid.msq.input.lookup.LookupInputSpec;
+import org.apache.druid.msq.input.table.TableInputSpec;
 import org.apache.druid.msq.logical.LogicalInputSpec;
 import org.apache.druid.msq.logical.StageMaker;
+import org.apache.druid.query.DataSource;
+import org.apache.druid.query.InlineDataSource;
+import org.apache.druid.query.LookupDataSource;
+import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
 import 
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
+import org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer;
+import 
org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer.SourceDesc;
 import org.apache.druid.sql.calcite.rel.DruidQuery;
 import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
 import org.apache.druid.sql.calcite.rel.logical.DruidAggregate;
 import org.apache.druid.sql.calcite.rel.logical.DruidFilter;
 import org.apache.druid.sql.calcite.rel.logical.DruidLogicalNode;
 import org.apache.druid.sql.calcite.rel.logical.DruidProject;
+import org.apache.druid.sql.calcite.rel.logical.DruidTableScan;
+import org.apache.druid.sql.calcite.rel.logical.DruidValues;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.Optional;
 
 /**
  * Represents a stage that reads data from input sources.
@@ -98,4 +117,41 @@ public class ReadStage extends AbstractFrameProcessorStage
   {
     return StageMaker.makeScanStageProcessor(VirtualColumns.EMPTY, signature, 
null);
   }
+
+  public static Optional<ReadStage> buildReadStage(DruidNodeStack stack)
+  {
+    DruidLogicalNode node = stack.getNode();
+    if (node instanceof DruidValues || node instanceof DruidTableScan) {
+      return translateViaSourceDesc(stack.getPlannerContext(), 
(SourceDescProducer) node);
+    }
+    return Optional.empty();
+  }
+
+  private static Optional<ReadStage> translateViaSourceDesc(PlannerContext 
plannerContext, SourceDescProducer node)
+  {
+    SourceDesc sd = node.getSourceDesc(plannerContext, 
Collections.emptyList());
+    InputSpec inputSpec = translateDataSource(sd.dataSource);
+    ReadStage stage = new ReadStage(sd.rowSignature, 
LogicalInputSpec.of(inputSpec, sd.rowSignature));
+    return Optional.of(stage);
+  }
+
+  @Nonnull
+  private static InputSpec translateDataSource(DataSource dataSource)
+  {
+    if (dataSource instanceof TableDataSource) {
+      TableDataSource ids = (TableDataSource) dataSource;
+      TableInputSpec inputSpec = new TableInputSpec(ids.getName(), 
Intervals.ONLY_ETERNITY, null, null);
+      return inputSpec;
+    }
+    if (dataSource instanceof InlineDataSource) {
+      InlineDataSource ids = (InlineDataSource) dataSource;
+      InlineInputSpec inputSpec = new InlineInputSpec(ids);
+      return inputSpec;
+    }
+    if (dataSource instanceof LookupDataSource) {
+      LookupDataSource lookupDataSource = (LookupDataSource) dataSource;
+      return new LookupInputSpec(lookupDataSource.getLookupName());
+    }
+    throw DruidException.defensive("This type of data source [%s] is not 
currently supported.", dataSource.getClass());
+  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SegmentMapStage.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SegmentMapStage.java
new file mode 100644
index 00000000000..a1b2a80b4ac
--- /dev/null
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/SegmentMapStage.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import org.apache.druid.msq.exec.StageProcessor;
+import org.apache.druid.msq.logical.LogicalInputSpec;
+import org.apache.druid.msq.logical.StageMaker;
+import 
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
+import 
org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer.SourceDesc;
+
+import java.util.List;
+
+public class SegmentMapStage extends AbstractFrameProcessorStage
+{
+  private SourceDesc sourceDesc;
+
+  public SegmentMapStage(SourceDesc sourceDesc, List<LogicalInputSpec> inputs)
+  {
+    super(sourceDesc.rowSignature, inputs);
+    this.sourceDesc = sourceDesc;
+  }
+
+  @Override
+  public LogicalStage extendWith(DruidNodeStack stack)
+  {
+    return null;
+  }
+
+  @Override
+  public StageProcessor<?, ?> buildStageProcessor(StageMaker stageMaker)
+  {
+    return stageMaker.makeSegmentMapProcessor(signature, 
sourceDesc.dataSource);
+  }
+}
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/UnnestStage.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/UnnestStage.java
new file mode 100644
index 00000000000..a69ff0dc0d9
--- /dev/null
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/logical/stages/UnnestStage.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.logical.stages;
+
+import org.apache.druid.msq.logical.LogicalInputSpec;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import 
org.apache.druid.sql.calcite.planner.querygen.DruidQueryGenerator.DruidNodeStack;
+import 
org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer.SourceDesc;
+import org.apache.druid.sql.calcite.rule.logical.DruidUnnest;
+
+import java.util.Collections;
+
+public class UnnestStage
+{
+  public static LogicalStage buildUnnestStage(LogicalStage inputStage, 
DruidNodeStack stack)
+  {
+    DruidUnnest unnest = (DruidUnnest) stack.getNode();
+    PlannerContext plannerContext = stack.getPlannerContext();
+    LogicalInputSpec logicalInputSpec = LogicalInputSpec.of(inputStage);
+    SourceDesc sourceDesc = logicalInputSpec.getSourceDesc();
+    SourceDesc unnestSD = unnest.getSourceDesc(plannerContext, 
Collections.singletonList(sourceDesc));
+    return new SegmentMapStage(unnestSD, 
Collections.singletonList(logicalInputSpec));
+  }
+}
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryStageProcessor.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryStageProcessor.java
index 5be30fe72c6..61dcc8631a3 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryStageProcessor.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryStageProcessor.java
@@ -30,6 +30,7 @@ import org.apache.druid.frame.write.FrameWriterFactory;
 import org.apache.druid.msq.exec.FrameContext;
 import org.apache.druid.msq.input.ReadableInput;
 import org.apache.druid.msq.querykit.BaseLeafStageProcessor;
+import org.apache.druid.query.DataSource;
 import org.apache.druid.query.Druids;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.scan.ScanQuery;
@@ -85,6 +86,17 @@ public class ScanQueryStageProcessor extends 
BaseLeafStageProcessor
     return new ScanQueryStageProcessor(scanQuery);
   }
 
+  public static ScanQueryStageProcessor makeSegmentMapFnProcessor(RowSignature 
signature, DataSource dataSource)
+  {
+    ScanQuery scanQuery = Druids.newScanQueryBuilder()
+        .dataSource(dataSource)
+        .intervals(QuerySegmentSpec.ETERNITY)
+        .columns(signature.getColumnNames())
+        .columnTypes(signature.getColumnTypes())
+        .build();
+    return new ScanQueryStageProcessor(scanQuery);
+  }
+
   @JsonProperty
   public ScanQuery getQuery()
   {
diff --git a/multi-stage-query/src/main/resources/log4j2.xml 
b/multi-stage-query/src/main/resources/log4j2.xml
index d98bb05ef6c..67d1e66244b 100644
--- a/multi-stage-query/src/main/resources/log4j2.xml
+++ b/multi-stage-query/src/main/resources/log4j2.xml
@@ -23,19 +23,31 @@
     <Console name="Console" target="SYSTEM_OUT">
       <PatternLayout pattern="%d{ISO8601} %p [%t] %c - %m%n"/>
     </Console>
+
+    <Routing name="RoutingAppender">
+      <Routes pattern="$${ctx:task.log.id}">
+        <!-- Task logs on CliIndexer should go to dedicated file -->
+        <Route>
+          <File name="task-${ctx:task.log.id}" fileName="${ctx:task.log.file}">
+            <PatternLayout pattern="%d{ISO8601} %p [%t] %c -%notEmpty{ 
[%markerSimpleName]} %m%n"/>
+          </File>
+        </Route>
+
+        <!-- Default route to send non-task logs to the Console -->
+        <Route key="$${ctx:task.log.id}" ref="Console"/>
+      </Routes>
+    </Routing>
+
   </Appenders>
   <Loggers>
     <Root level="info">
-      <AppenderRef ref="Console"/>
+      <AppenderRef ref="RoutingAppender"/>
     </Root>
-    <Logger level="info" name="org.apache.druid.frame" additivity="false">
-      <AppenderRef ref="Console"/>
-    </Logger>
-    <Logger level="info" name="org.apache.druid.msq" additivity="false">
-      <AppenderRef ref="Console"/>
+    <Logger level="info" name="org.apache.druid" additivity="false">
+      <AppenderRef ref="RoutingAppender"/>
     </Logger>
-    <Logger level="info" name="org.apache.calcite" additivity="false">
-      <AppenderRef ref="Console"/>
+    <Logger level="debug" 
name="org.apache.calcite.plan.AbstractRelOptPlanner.rule_execution_summary" 
additivity="false">
+      <AppenderRef ref="RoutingAppender"/>
     </Logger>
   </Loggers>
 </Configuration>
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java
similarity index 93%
copy from 
multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
copy to 
multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java
index 249a17c07fd..c7313c4c098 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java
@@ -45,11 +45,10 @@ import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
 import org.apache.druid.sql.avatica.DartDruidMeta;
 import org.apache.druid.sql.avatica.DruidMeta;
-import org.apache.druid.sql.calcite.TempDirProducer;
 import org.apache.druid.sql.calcite.run.SqlEngine;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.sql.calcite.util.DruidModuleCollection;
-import 
org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier;
+import 
org.apache.druid.sql.calcite.util.SqlTestFramework.QueryComponentSupplier;
 import org.apache.druid.sql.calcite.util.datasets.TestDataSet;
 
 import java.nio.ByteBuffer;
@@ -58,11 +57,11 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
-public class DartComponentSupplier extends AbstractMSQComponentSupplierDelegate
+public abstract class AbstractDartComponentSupplier extends 
AbstractMSQComponentSupplierDelegate
 {
-  public DartComponentSupplier(TempDirProducer tempFolderProducer)
+  public AbstractDartComponentSupplier(QueryComponentSupplier 
componentSupplier)
   {
-    super(new StandardComponentSupplier(tempFolderProducer));
+    super(componentSupplier);
   }
 
   @Override
@@ -78,7 +77,6 @@ public class DartComponentSupplier extends 
AbstractMSQComponentSupplierDelegate
     walker.add(TestDataSet.NUMBERS, TestHelper.JSON_MAPPER, 
getTempDirProducer().newTempFolder("tmp_numbers"));
     return super.addSegmentsToWalker(walker);
   }
-
   @Override
   public DruidModule getCoreModule()
   {
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
index 4ffd0f8db98..3623983fa66 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteDartTest.java
@@ -259,6 +259,24 @@ public class CalciteDartTest extends BaseCalciteQueryTest
         .run();
   }
 
+  @Test
+  public void testJoin()
+  {
+    testBuilder()
+        .sql("SELECT f1.dim2 from foo f1 join foo f2 on f1.dim1 = f2.dim1 
order by f1.dim2")
+        .expectedResults(
+            ImmutableList.of(
+                new Object[]{null},
+                new Object[]{null},
+                new Object[]{""},
+                new Object[]{"a"},
+                new Object[]{"a"},
+                new Object[]{"abc"}
+            )
+        )
+        .run();
+  }
+
   @Test
   public void testSubQuery()
   {
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
index 249a17c07fd..409bd73ae10 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DartComponentSupplier.java
@@ -19,146 +19,13 @@
 
 package org.apache.druid.msq.test;
 
-import com.google.inject.Binder;
-import com.google.inject.Provides;
-import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.client.coordinator.NoopCoordinatorClient;
-import org.apache.druid.collections.NonBlockingPool;
-import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
-import org.apache.druid.guice.LazySingleton;
-import org.apache.druid.guice.annotations.EscalatedGlobal;
-import org.apache.druid.guice.annotations.Merging;
-import org.apache.druid.initialization.DruidModule;
-import org.apache.druid.java.util.http.client.HttpClient;
-import org.apache.druid.msq.dart.Dart;
-import org.apache.druid.msq.dart.controller.DartControllerContextFactory;
-import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
-import org.apache.druid.msq.dart.guice.DartControllerModule;
-import org.apache.druid.msq.dart.guice.DartModules;
-import org.apache.druid.msq.dart.guice.DartWorkerMemoryManagementModule;
-import org.apache.druid.msq.dart.guice.DartWorkerModule;
-import org.apache.druid.msq.exec.WorkerRunRef;
-import org.apache.druid.query.TestBufferPool;
-import org.apache.druid.rpc.ServiceClientFactory;
-import org.apache.druid.rpc.guice.ServiceClientModule;
-import org.apache.druid.segment.TestHelper;
-import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
-import org.apache.druid.sql.avatica.DartDruidMeta;
-import org.apache.druid.sql.avatica.DruidMeta;
 import org.apache.druid.sql.calcite.TempDirProducer;
-import org.apache.druid.sql.calcite.run.SqlEngine;
-import org.apache.druid.sql.calcite.util.CalciteTests;
-import org.apache.druid.sql.calcite.util.DruidModuleCollection;
 import 
org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier;
-import org.apache.druid.sql.calcite.util.datasets.TestDataSet;
 
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class DartComponentSupplier extends AbstractMSQComponentSupplierDelegate
+public class DartComponentSupplier extends AbstractDartComponentSupplier
 {
   public DartComponentSupplier(TempDirProducer tempFolderProducer)
   {
     super(new StandardComponentSupplier(tempFolderProducer));
   }
-
-  @Override
-  public void gatherProperties(Properties properties)
-  {
-    super.gatherProperties(properties);
-    properties.put(DartModules.DART_ENABLED_PROPERTY, "true");
-  }
-
-  @Override
-  public SpecificSegmentsQuerySegmentWalker 
addSegmentsToWalker(SpecificSegmentsQuerySegmentWalker walker)
-  {
-    walker.add(TestDataSet.NUMBERS, TestHelper.JSON_MAPPER, 
getTempDirProducer().newTempFolder("tmp_numbers"));
-    return super.addSegmentsToWalker(walker);
-  }
-
-  @Override
-  public DruidModule getCoreModule()
-  {
-    return DruidModuleCollection.of(
-        super.getCoreModule(),
-        new DartControllerModule(),
-        new DartWorkerModule(),
-        new DartWorkerMemoryManagementModule(),
-        new DartTestCoreModule()
-    );
-  }
-
-  @Override
-  public DruidModule getOverrideModule()
-  {
-    return DruidModuleCollection.of(
-        super.getOverrideModule(),
-        new DartTestOverrideModule()
-    );
-  }
-
-  @Override
-  public Class<? extends SqlEngine> getSqlEngineClass()
-  {
-    return DartSqlEngine.class;
-  }
-
-  static class DartTestCoreModule implements DruidModule
-  {
-    @Provides
-    @EscalatedGlobal
-    final ServiceClientFactory getServiceClientFactory(HttpClient ht)
-    {
-      return ServiceClientModule.makeServiceClientFactory(ht);
-
-    }
-
-    @Provides
-    final DruidNodeDiscoveryProvider getDiscoveryProvider()
-    {
-      return new 
CalciteTests.FakeDruidNodeDiscoveryProvider(Collections.emptyMap());
-    }
-
-    @Override
-    public void configure(Binder binder)
-    {
-      binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class);
-    }
-  }
-
-  static class DartTestOverrideModule implements DruidModule
-  {
-    @Provides
-    @LazySingleton
-    public DruidMeta createMeta(DartDruidMeta druidMeta)
-    {
-      return druidMeta;
-    }
-
-    @Override
-    public void configure(Binder binder)
-    {
-      binder.bind(DartControllerContextFactory.class)
-          .to(TestDartControllerContextFactoryImpl.class)
-          .in(LazySingleton.class);
-    }
-
-    @Provides
-    @Merging
-    NonBlockingPool<ByteBuffer> makeMergingBuffer(TestBufferPool bufferPool)
-    {
-      return bufferPool;
-    }
-
-    @Provides
-    @LazySingleton
-    @Dart
-    Map<String, WorkerRunRef> workerMap()
-    {
-      return new ConcurrentHashMap<>();
-    }
-  }
 }
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartCalciteArraysQueryTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartCalciteArraysQueryTest.java
new file mode 100644
index 00000000000..3a55e10745c
--- /dev/null
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartCalciteArraysQueryTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.test;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.sql.calcite.CalciteArraysQueryTest;
+import org.apache.druid.sql.calcite.NotYetSupported;
+import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor;
+import org.apache.druid.sql.calcite.QueryTestBuilder;
+import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
+import org.junit.AssumptionViolatedException;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+@SqlTestFrameworkConfig.ComponentSupplier(DartComponentSupplier.class)
+public class DecoupledDartCalciteArraysQueryTest extends CalciteArraysQueryTest
+{
+  @RegisterExtension
+  NotYetSupportedProcessor notYetSupportedProcessor = new 
NotYetSupportedProcessor(NotYetSupported.Scope.DECOUPLED_DART);
+
+  @RegisterExtension
+  DecoupledDartExtension decoupledExtension = new DecoupledDartExtension(this);
+
+  @Override
+  protected QueryTestBuilder testBuilder()
+  {
+    return decoupledExtension.testBuilder()
+        .queryContext(
+            ImmutableMap.<String, Object>builder()
+                .put(QueryContexts.CTX_PREPLANNED, true)
+                .put(QueryContexts.CTX_NATIVE_QUERY_SQL_PLANNING_MODE, 
QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED)
+                .put(QueryContexts.ENABLE_DEBUG, true)
+                .build()
+        );
+  }
+
+  @Override
+  protected void cannotVectorize()
+  {
+  }
+
+  @Override
+  protected void cannotVectorizeUnlessFallback()
+  {
+  }
+
+  @Override
+  protected void msqIncompatible()
+  {
+    throw new AssumptionViolatedException("Case marked as msqIncompatible; not 
trying dart right now");
+  }
+}
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartCalciteJoinQueryTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartCalciteJoinQueryTest.java
new file mode 100644
index 00000000000..76124a7b3f4
--- /dev/null
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartCalciteJoinQueryTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.test;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.query.JoinAlgorithm;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.sql.calcite.CalciteJoinQueryTest;
+import org.apache.druid.sql.calcite.NotYetSupported;
+import org.apache.druid.sql.calcite.NotYetSupported.Modes;
+import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor;
+import org.apache.druid.sql.calcite.QueryTestBuilder;
+import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.junit.AssumptionViolatedException;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+@SqlTestFrameworkConfig.ComponentSupplier(DartComponentSupplier.class)
+public abstract class DecoupledDartCalciteJoinQueryTest extends 
CalciteJoinQueryTest
+{
+
+  @Nested
+  public static class BroadcastTest extends DecoupledDartCalciteJoinQueryTest
+  {
+    @Override
+    protected JoinAlgorithm joinAlgorithm()
+    {
+      return JoinAlgorithm.BROADCAST;
+    }
+  }
+
+  @Nested
+  public static class SortMergeTest extends DecoupledDartCalciteJoinQueryTest
+  {
+    @Override
+    protected JoinAlgorithm joinAlgorithm()
+    {
+      return JoinAlgorithm.SORT_MERGE;
+    }
+
+    @NotYetSupported(Modes.DD_JOIN_CONDITION_NORMALIZATION)
+    @Test
+    @Override
+    public void testJoinWithInputRefCondition()
+    {
+      super.testJoinWithInputRefCondition();
+    }
+  }
+
+  @RegisterExtension
+  NotYetSupportedProcessor notYetSupportedProcessor = new 
NotYetSupportedProcessor(
+      NotYetSupported.Scope.DECOUPLED_DART
+  );
+
+  @RegisterExtension
+  DecoupledDartExtension decoupledExtension = new DecoupledDartExtension(this);
+
+  @Override
+  protected QueryTestBuilder testBuilder()
+  {
+    return decoupledExtension.testBuilder()
+        .queryContext(
+            ImmutableMap.<String, Object>builder()
+                .put(QueryContexts.CTX_PREPLANNED, true)
+                .put(
+                    QueryContexts.CTX_NATIVE_QUERY_SQL_PLANNING_MODE,
+                    QueryContexts.NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED
+                )
+                .put(QueryContexts.REWRITE_JOIN_TO_FILTER_ENABLE_KEY, false)
+                .put(PlannerContext.CTX_SQL_JOIN_ALGORITHM, 
joinAlgorithm().toString())
+                .put(QueryContexts.ENABLE_DEBUG, true)
+                .build()
+        );
+  }
+
+  protected abstract JoinAlgorithm joinAlgorithm();
+
+  @Override
+  protected void cannotVectorize()
+  {
+  }
+
+  @Override
+  protected void cannotVectorizeUnlessFallback()
+  {
+  }
+
+  @Override
+  protected void msqIncompatible()
+  {
+    throw new AssumptionViolatedException("Case marked as msqIncompatible; not 
trying dart right now");
+  }
+
+  @Override
+  public boolean isSortBasedJoin()
+  {
+    return joinAlgorithm() == JoinAlgorithm.SORT_MERGE;
+  }
+
+  @Override
+  protected boolean isRunningMSQ()
+  {
+    return true;
+  }
+}
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartCalciteNestedDataQueryTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartCalciteNestedDataQueryTest.java
new file mode 100644
index 00000000000..0cf97cb5d13
--- /dev/null
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartCalciteNestedDataQueryTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.test;
+
+import org.apache.druid.sql.calcite.CalciteNestedDataQueryTest;
+import org.apache.druid.sql.calcite.NotYetSupported;
+import org.apache.druid.sql.calcite.NotYetSupported.NotYetSupportedProcessor;
+import org.apache.druid.sql.calcite.QueryTestBuilder;
+import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
+import org.apache.druid.sql.calcite.TempDirProducer;
+import org.junit.AssumptionViolatedException;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+@SqlTestFrameworkConfig.ComponentSupplier(DecoupledDartCalciteNestedDataQueryTest.NestedDataQueryMSQComponentSupplier.class)
+public class DecoupledDartCalciteNestedDataQueryTest extends 
CalciteNestedDataQueryTest
+{
+
+  public static class NestedDataQueryMSQComponentSupplier extends 
AbstractDartComponentSupplier
+  {
+    public NestedDataQueryMSQComponentSupplier(TempDirProducer 
tempFolderProducer)
+    {
+      super(new NestedComponentSupplier(tempFolderProducer));
+    }
+  }
+
+  @RegisterExtension
+  NotYetSupportedProcessor notYetSupportedProcessor = new 
NotYetSupportedProcessor(NotYetSupported.Scope.DECOUPLED_DART);
+
+  @RegisterExtension
+  DecoupledDartExtension decoupledExtension = new DecoupledDartExtension(this);
+
+  @Override
+  protected QueryTestBuilder testBuilder()
+  {
+    return decoupledExtension.testBuilder();
+  }
+
+  @Override
+  protected void msqIncompatible()
+  {
+    throw new AssumptionViolatedException("Case marked as msqIncompatible; not 
trying dart right now");
+  }
+}
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartExtension.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartExtension.java
index 32be6a06078..5cbc27a6c97 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartExtension.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/DecoupledDartExtension.java
@@ -57,7 +57,7 @@ public class DecoupledDartExtension implements 
BeforeEachCallback
 
   public QueryTestBuilder testBuilder()
   {
-    CalciteTestConfig testConfig = baseTest.new 
CalciteTestConfig(CONTEXT_OVERRIDES)
+    CalciteTestConfig testConfig = baseTest.new 
CalciteTestConfig(CONTEXT_OVERRIDES, true)
     {
       @Override
       public SqlTestFramework.PlannerFixture plannerFixture(PlannerConfig 
plannerConfig, AuthConfig authConfig)
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/sql/calcite/NotYetSupportedUsageTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/sql/calcite/NotYetSupportedUsageTest.java
index 2076ae1a5f7..ae253cb3087 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/sql/calcite/NotYetSupportedUsageTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/sql/calcite/NotYetSupportedUsageTest.java
@@ -48,7 +48,9 @@ public class NotYetSupportedUsageTest
     Set<NotYetSupported.Modes> modes = new 
HashSet<>(Arrays.asList(NotYetSupported.Modes.values()));
     for (Method method : methodsAnnotatedWith) {
       NotYetSupported annot = method.getAnnotation(NotYetSupported.class);
-      modes.remove(annot.value());
+      for (Modes m : annot.value()) {
+        modes.remove(m);
+      }
     }
 
     assertEquals("There are unused modes which should be removed", 
Collections.emptySet(), modes);
@@ -67,7 +69,11 @@ public class NotYetSupportedUsageTest
       @Override
       public int compare(ReportEntry l, ReportEntry r)
       {
-        int res = l.className.compareTo(r.className);
+        int res = l.mode.scope.compareTo(r.mode.scope);
+        if (res != 0) {
+          return res;
+        }
+        res = l.className.compareTo(r.className);
         if (res != 0) {
           return res;
         }
@@ -104,7 +110,7 @@ public class NotYetSupportedUsageTest
     @Override
     public String toString()
     {
-      return " | " + className + " | " + methodNames.size() + " | " + mode + " 
| " + mode.regex + " | ";
+      return " | " + mode.scope + " | " + className + " | " + 
methodNames.size() + " | " + mode.name() + " | " + mode.regex + " | ";
     }
   }
 
@@ -115,16 +121,18 @@ public class NotYetSupportedUsageTest
 
     Map<List<Object>, ReportEntry> mentryMap = new HashMap<>();
     for (Method method : methodsAnnotatedWith) {
-      ReportEntry entry = new ReportEntry(
-          method.getDeclaringClass().getSimpleName(),
-          method.getName(),
-          getAnnotation(method)
-      );
-      ReportEntry existing = mentryMap.get(entry.getKey());
-      if (existing != null) {
-        existing.merge(entry);
-      } else {
-        mentryMap.put(entry.getKey(), entry);
+      for (Modes mode : getAnnotation(method)) {
+        ReportEntry entry = new ReportEntry(
+            method.getDeclaringClass().getSimpleName(),
+            method.getName(),
+            mode
+        );
+        ReportEntry existing = mentryMap.get(entry.getKey());
+        if (existing != null) {
+          existing.merge(entry);
+        } else {
+          mentryMap.put(entry.getKey(), entry);
+        }
       }
     }
 
@@ -136,7 +144,7 @@ public class NotYetSupportedUsageTest
 
   }
 
-  private Modes getAnnotation(Method method)
+  private Modes[] getAnnotation(Method method)
   {
     NotYetSupported annotation = method.getAnnotation(NotYetSupported.class);
     if (annotation == null) {
diff --git 
a/processing/src/main/java/org/apache/druid/segment/column/RowSignature.java 
b/processing/src/main/java/org/apache/druid/segment/column/RowSignature.java
index 26f6f74a51a..c7f43fa653a 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/RowSignature.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/RowSignature.java
@@ -422,4 +422,14 @@ public class RowSignature implements ColumnInspector
     }
     return ret;
   }
+
+  public RowSignature withPrefix(String prefix)
+  {
+    Builder builder = new Builder();
+    for (String columnName : columnNames) {
+      ColumnType columnType = columnTypes.get(columnName);
+      builder.add(prefix + columnName, columnType);
+    }
+    return builder.build();
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/sql/calcite/util/datasets/Larry.java 
b/server/src/test/java/org/apache/druid/sql/calcite/util/datasets/Larry.java
new file mode 100644
index 00000000000..76b2b6f48a3
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/sql/calcite/util/datasets/Larry.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.sql.calcite.util.datasets;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.segment.AutoTypeColumnSchema;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class Larry extends MapBasedTestDataset
+{
+  protected Larry()
+  {
+    this("larry");
+  }
+
+  public Larry(String name)
+  {
+    super(name);
+  }
+
+  @Override
+  public final InputRowSchema getInputRowSchema()
+  {
+    return new InputRowSchema(
+        new TimestampSpec(TIMESTAMP_COLUMN, "iso", null),
+        new DimensionsSpec(
+            ImmutableList.<DimensionSchema>builder()
+                .addAll(
+                    DimensionsSpec.getDefaultSchemas(
+                        ImmutableList.of(
+                            "label",
+                            "mv"
+                        )
+                    )
+                )
+                .add(new LongDimensionSchema("l1"))
+                .add(new AutoTypeColumnSchema("l_arr", null))
+                .build()
+        ),
+        null
+    );
+  }
+
+  @Override
+  public List<AggregatorFactory> getMetrics()
+  {
+    return ImmutableList.of(
+        new CountAggregatorFactory("cnt")
+    );
+  }
+
+  @Override
+  public List<Map<String, Object>> getRawRows()
+  {
+    return ImmutableList.of(
+        makeRow("[]", ImmutableList.of()),
+        makeRow("[null]", Collections.singletonList(null)),
+        makeRow("[1]", ImmutableList.of(1)),
+        makeRow("[2,3]", ImmutableList.of(2, 3)),
+        makeRow("null", null)
+    );
+  }
+
+  private Map<String, Object> makeRow(String label, Object object)
+  {
+    Map<String, Object> ret = new HashMap<String, Object>();
+
+    ret.put("t", "2000-01-01");
+    ret.put("label", label);
+    ret.put("l1", 11);
+    ret.put("mv", object);
+    ret.put("l_arr", object);
+    return ret;
+
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/sql/calcite/util/datasets/TestDataSet.java b/server/src/test/java/org/apache/druid/sql/calcite/util/datasets/TestDataSet.java
index c7b688499a9..b861d9b248e 100644
--- a/server/src/test/java/org/apache/druid/sql/calcite/util/datasets/TestDataSet.java
+++ b/server/src/test/java/org/apache/druid/sql/calcite/util/datasets/TestDataSet.java
@@ -30,6 +30,7 @@ public interface TestDataSet
   String TIMESTAMP_COLUMN = "t";
 
   MapBasedTestDataset NUMFOO = new NumFoo();
+  MapBasedTestDataset LARRY = new Larry();
   MapBasedTestDataset BROADCAST = new NumFoo("broadcast");
  MapBasedTestDataset RESTRICTED_BROADCAST = new NumFoo("restrictedBroadcastDatasource_m1_is_6");
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalciteRulesManager.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalciteRulesManager.java
index 837d951dc63..fa48e04e41e 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalciteRulesManager.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/CalciteRulesManager.java
@@ -28,6 +28,7 @@ import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.plan.hep.HepMatchOrder;
 import org.apache.calcite.plan.hep.HepProgram;
 import org.apache.calcite.plan.hep.HepProgramBuilder;
 import org.apache.calcite.plan.volcano.AbstractConverter;
@@ -44,6 +45,7 @@ import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
 import org.apache.calcite.rel.rules.JoinExtractFilterRule;
 import org.apache.calcite.rel.rules.JoinPushThroughJoinRule;
 import org.apache.calcite.rel.rules.ProjectMergeRule;
+import org.apache.calcite.rel.rules.ProjectRemoveRule;
 import org.apache.calcite.rel.rules.PruneEmptyRules;
 import org.apache.calcite.sql.SqlExplainFormat;
 import org.apache.calcite.sql.SqlExplainLevel;
@@ -280,8 +282,9 @@ public class CalciteRulesManager
   private Program buildDecoupledLogicalOptimizationProgram(PlannerContext 
plannerContext)
   {
     final HepProgramBuilder builder = HepProgram.builder();
+    builder.addMatchOrder(HepMatchOrder.BOTTOM_UP);
     builder.addMatchLimit(CalciteRulesManager.HEP_DEFAULT_MATCH_LIMIT);
-    builder.addRuleCollection(baseRuleSet(plannerContext));
+    builder.addRuleCollection(baseRuleSet(plannerContext, false));
     builder.addRuleInstance(CoreRules.UNION_MERGE);
     builder.addRuleInstance(FilterCorrelateRule.Config.DEFAULT.toRule());
     
builder.addRuleInstance(FilterProjectTransposeRule.Config.DEFAULT.toRule());
@@ -290,6 +293,8 @@ public class CalciteRulesManager
     
builder.addRuleInstance(FilterProjectTransposeRule.Config.DEFAULT.toRule());
     builder.addRuleInstance(new LogicalUnnestRule());
     builder.addRuleInstance(new UnnestInputCleanupRule());
+    builder.addRuleInstance(ProjectMergeRule.Config.DEFAULT.toRule());
+    builder.addRuleInstance(ProjectRemoveRule.Config.DEFAULT.toRule());
 
     final HepProgramBuilder cleanupRules = HepProgram.builder();
     
cleanupRules.addRuleInstance(FilterProjectTransposeRule.Config.DEFAULT.toRule());
@@ -458,7 +463,7 @@ public class CalciteRulesManager
   {
     final ImmutableList.Builder<RelOptRule> retVal = ImmutableList
         .<RelOptRule>builder()
-        .addAll(baseRuleSet(plannerContext))
+        .addAll(baseRuleSet(plannerContext, 
plannerContext.getJoinAlgorithm().requiresSubquery()))
         .add(DruidRelToDruidRule.instance())
         .add(new DruidTableScanRule(plannerContext))
         .add(new DruidLogicalValuesRule(plannerContext))
@@ -483,7 +488,7 @@ public class CalciteRulesManager
   public List<RelOptRule> bindableConventionRuleSet(final PlannerContext 
plannerContext)
   {
     return ImmutableList.<RelOptRule>builder()
-                        .addAll(baseRuleSet(plannerContext))
+                        .addAll(baseRuleSet(plannerContext, false))
                         .addAll(Bindables.RULES)
                         .addAll(DEFAULT_BINDABLE_RULES)
                         .add(CoreRules.AGGREGATE_REDUCE_FUNCTIONS)
@@ -501,7 +506,7 @@ public class CalciteRulesManager
     return (bloat != null) ? bloat : DEFAULT_BLOAT;
   }
 
-  public List<RelOptRule> baseRuleSet(final PlannerContext plannerContext)
+  public List<RelOptRule> baseRuleSet(final PlannerContext plannerContext, 
boolean withJoinRules)
   {
     final PlannerConfig plannerConfig = plannerContext.getPlannerConfig();
     final ImmutableList.Builder<RelOptRule> rules = ImmutableList.builder();
@@ -513,7 +518,7 @@ public class CalciteRulesManager
     rules.add(new 
DruidAggregateCaseToFilterRule(plannerContext.queryContext().isExtendedFilteredSumRewrite()));
     rules.addAll(configurableRuleSet(plannerContext));
 
-    if (plannerContext.getJoinAlgorithm().requiresSubquery()) {
+    if (withJoinRules) {
       rules.addAll(FANCY_JOIN_RULES);
     }
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/logical/DruidJoin.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/logical/DruidJoin.java
index 5d531bfde80..3ea022fcc1f 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/logical/DruidJoin.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/logical/DruidJoin.java
@@ -30,7 +30,9 @@ import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
+import org.apache.druid.query.JoinAlgorithm;
 import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.planner.QueryUtils;
 import org.apache.druid.sql.calcite.planner.querygen.SourceDescProducer;
 import org.apache.druid.sql.calcite.rel.DruidJoinQueryRel;
 
@@ -76,4 +78,9 @@ public class DruidJoin extends Join implements 
DruidLogicalNode, SourceDescProdu
     SourceDesc rightDesc = sources.get(1);
     return DruidJoinQueryRel.buildJoinSourceDesc(leftDesc, rightDesc, 
plannerContext, this, null);
   }
+
+  public JoinAlgorithm getJoinAlgorithm(PlannerContext plannerContext)
+  {
+    return QueryUtils.getJoinAlgorithm(this, plannerContext);
+  }
 }
diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
index 287f216d3aa..4ef05973389 100644
--- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java
@@ -91,6 +91,7 @@ import org.apache.druid.sql.calcite.schema.NamedSchema;
 import org.apache.druid.sql.calcite.util.CalciteTestBase;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.sql.calcite.util.QueryFrameworkUtils;
+import org.apache.druid.sql.calcite.util.datasets.TestDataSet;
 import org.apache.druid.sql.guice.SqlModule;
 import org.apache.druid.sql.hook.DruidHookDispatcher;
 import org.eclipse.jetty.server.Server;
@@ -572,6 +573,12 @@ public class DruidAvaticaHandlerTest extends 
CalciteTestBase
                 Pair.of("TABLE_TYPE", "TABLE")
 
             ),
+            row(
+                Pair.of("TABLE_CAT", "druid"),
+                Pair.of("TABLE_NAME", TestDataSet.LARRY.getName()),
+                Pair.of("TABLE_SCHEM", "druid"),
+                Pair.of("TABLE_TYPE", "TABLE")
+            ),
             row(
                 Pair.of("TABLE_CAT", "druid"),
                 Pair.of("TABLE_NAME", CalciteTests.DATASOURCE5),
@@ -676,6 +683,12 @@ public class DruidAvaticaHandlerTest extends 
CalciteTestBase
                 Pair.of("TABLE_SCHEM", "druid"),
                 Pair.of("TABLE_TYPE", "TABLE")
             ),
+            row(
+                Pair.of("TABLE_CAT", "druid"),
+                Pair.of("TABLE_NAME", TestDataSet.LARRY.getName()),
+                Pair.of("TABLE_SCHEM", "druid"),
+                Pair.of("TABLE_TYPE", "TABLE")
+            ),
             row(
                 Pair.of("TABLE_CAT", "druid"),
                 Pair.of("TABLE_NAME", CalciteTests.DATASOURCE5),
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index afae66b062d..8f65b432843 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -84,6 +84,7 @@ import org.apache.druid.server.security.AuthenticationResult;
 import org.apache.druid.server.security.ForbiddenException;
 import org.apache.druid.server.security.ResourceAction;
 import org.apache.druid.sql.SqlStatementFactory;
+import org.apache.druid.sql.calcite.DrillWindowQueryTest.ArrayRowCmp;
 import org.apache.druid.sql.calcite.QueryTestRunner.QueryResults;
 import org.apache.druid.sql.calcite.expression.DruidExpression;
 import org.apache.druid.sql.calcite.planner.Calcites;
@@ -1019,7 +1020,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
     }
   }
 
-  public void assertResultsValid(final ResultMatchMode matchMode, final 
List<Object[]> expected, final QueryResults queryResults)
+  public static void assertResultsValid(final ResultMatchMode matchMode, final 
List<Object[]> expected, final QueryResults queryResults)
   {
     final List<Object[]> results = queryResults.results;
     Assert.assertEquals("Result count mismatch", expected.size(), 
results.size());
@@ -1388,7 +1389,7 @@ public class BaseCalciteQueryTest extends CalciteTestBase
     return new DefaultResultsVerifier(expectedResults, 
expectedResultMatchMode, expectedSignature);
   }
 
-  public class DefaultResultsVerifier implements ResultsVerifier
+  public static class DefaultResultsVerifier implements ResultsVerifier
   {
     protected final List<Object[]> expectedResults;
     @Nullable
@@ -1428,7 +1429,22 @@ public class BaseCalciteQueryTest extends CalciteTestBase
         throw e;
       }
     }
+  }
+
+  public static class UnorderedResultsVerifier extends DefaultResultsVerifier
+  {
+    public UnorderedResultsVerifier(List<Object[]> expectedResults, 
ResultMatchMode expectedResultMatchMode,
+        RowSignature expectedSignature)
+    {
+      super(ImmutableList.sortedCopyOf(new ArrayRowCmp(), expectedResults), 
expectedResultMatchMode, expectedSignature);
+    }
 
+    @Override
+    public void verify(String sql, QueryResults queryResults)
+    {
+      queryResults.results.sort(new ArrayRowCmp());
+      super.verify(sql, queryResults);
+    }
   }
 
   /**
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
index 1d754142555..a4b99952fd0 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
@@ -1612,7 +1612,7 @@ public class CalciteArraysQueryTest extends 
BaseCalciteQueryTest
     // the Scan query in native reads this makeColumnValueSelector. Behavior 
of those selectors is inconsistent.
     // The DimensionSelector returns an empty list; the ColumnValueSelector 
returns a list containing a single null.
     final String expectedValueForEmptyMvd =
-        queryFramework().engine().name().equals("msq-task")
+        isRunningMSQ()
         ? null
         : "not abd";
 
@@ -2664,6 +2664,7 @@ public class CalciteArraysQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
+  @NotYetSupported(Modes.DD_RESULT_MISMATCH_FLOAT_DOUBLE)
   @Test
   public void testArrayAggNumeric()
   {
@@ -3888,7 +3889,7 @@ public class CalciteArraysQueryTest extends 
BaseCalciteQueryTest
 
   }
 
-  @NotYetSupported(Modes.UNNEST_INLINED)
+  @NotYetSupported({Modes.UNNEST_INLINED, Modes.DD_UNNEST_INLINED})
   @Test
   public void testUnnestInline()
   {
@@ -3922,7 +3923,7 @@ public class CalciteArraysQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
-  @NotYetSupported(Modes.UNNEST_INLINED)
+  @NotYetSupported({Modes.UNNEST_INLINED, Modes.DD_UNNEST_INLINED})
   @Test
   public void testUnnestInlineWithCount()
   {
@@ -3953,7 +3954,7 @@ public class CalciteArraysQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
-  @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+  @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH, 
Modes.DD_UNNEST_RESULT_MISMATCH})
   @Test
   public void testUnnest()
   {
@@ -4335,7 +4336,7 @@ public class CalciteArraysQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
-  @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+  @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH, 
Modes.DD_UNNEST_RESULT_MISMATCH})
   @Test
   public void testUnnestTwice()
   {
@@ -4892,7 +4893,7 @@ public class CalciteArraysQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
-  @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+  @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH, 
Modes.DD_UNNEST_RESULT_MISMATCH})
   @Test
   public void testUnnestWithGroupBy()
   {
@@ -4955,7 +4956,7 @@ public class CalciteArraysQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
-  @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+  @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH, 
Modes.DD_UNNEST_RESULT_MISMATCH})
   @Test
   public void testUnnestWithGroupByOrderBy()
   {
@@ -4999,7 +5000,7 @@ public class CalciteArraysQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
-  @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+  @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH, 
Modes.DD_UNNEST_RESULT_MISMATCH})
   @Test
   public void testUnnestWithGroupByOrderByWithLimit()
   {
@@ -5093,7 +5094,7 @@ public class CalciteArraysQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
-  @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+  @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH, 
Modes.DD_UNNEST_RESULT_MISMATCH})
   @Test
   public void testUnnestFirstQueryOnSelect()
   {
@@ -5449,7 +5450,7 @@ public class CalciteArraysQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
-  @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+  @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH, 
Modes.DD_UNNEST_RESULT_MISMATCH})
   @Test
   public void testUnnestWithInFilters()
   {
@@ -5567,7 +5568,7 @@ public class CalciteArraysQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
-  @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+  @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH, 
Modes.DD_UNNEST_RESULT_MISMATCH})
   @DecoupledTestConfig(ignoreExpectedQueriesReason = 
IgnoreQueriesReason.UNNEST_EXTRA_SCANQUERY)
   @Test
   public void testUnnestWithJoinOnTheLeft()
@@ -5619,7 +5620,7 @@ public class CalciteArraysQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
-  @NotYetSupported(Modes.UNNEST_INLINED)
+  @NotYetSupported({Modes.UNNEST_INLINED, Modes.DD_UNNEST_INLINED})
   @Test
   public void testUnnestWithConstant()
   {
@@ -5677,7 +5678,7 @@ public class CalciteArraysQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
-  @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+  @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH, 
Modes.DD_UNNEST_RESULT_MISMATCH})
   @Test
   public void testUnnestWithSQLFunctionOnUnnestedColumn()
   {
@@ -6209,7 +6210,7 @@ public class CalciteArraysQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
-  @NotYetSupported(Modes.UNNEST_RESULT_MISMATCH)
+  @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH, 
Modes.DD_UNNEST_RESULT_MISMATCH})
   @Test
   public void testUnnestWithCountOnColumn()
   {
@@ -7413,4 +7414,61 @@ public class CalciteArraysQueryTest extends 
BaseCalciteQueryTest
         )
     );
   }
+
+
+  @Test
+  public void testSimpleArraysUnnest()
+  {
+    skipVectorize();
+    testBuilder()
+        .sql("SELECT label,l_arr,val from larry,unnest(l_arr) as u(val)")
+        .expectedResults(
+            ImmutableList.of(
+                new Object[] {"[1]", "[1]", 1L},
+                new Object[] {"[2,3]", "[2,3]", 2L},
+                new Object[] {"[2,3]", "[2,3]", 3L},
+                new Object[] {"[null]", "[null]", null}
+            )
+        )
+        .run();
+  }
+
+  @Test
+  public void testMvToArrayResults()
+  {
+    skipVectorize();
+    testBuilder()
+        .sql("SELECT label,l_arr,mv_to_array(mv) from larry")
+        .expectedResults(
+            ImmutableList.of(
+                new Object[]{"[1]", "[1]", "[\"1\"]"},
+                new Object[]{"[2,3]", "[2,3]", "[\"2\",\"3\"]"},
+                new Object[]{"[]", "[]", null},
+                new Object[]{"[null]", "[null]", null},
+                new Object[]{"null", null, null}
+            )
+        )
+        .run();
+  }
+
+  @NotYetSupported({Modes.UNNEST_RESULT_MISMATCH, 
Modes.DD_UNNEST_RESULT_MISMATCH})
+  @Test
+  public void testMvToArrayUnnest()
+  {
+    skipVectorize();
+    testBuilder()
+        .sql("SELECT label,l_arr,mv_to_array(mv),val from 
larry,unnest(mv_to_array(mv)) as u(val)")
+        .expectedResults(
+            ImmutableList.of(
+                new Object[]{"[1]", "[1]", "[\"1\"]", "1"},
+                new Object[]{"[2,3]", "[2,3]", "[\"2\",\"3\"]", "2"},
+                new Object[]{"[2,3]", "[2,3]", "[\"2\",\"3\"]", "3"},
+                // below results will be missing in decoupled mode
+                new Object[]{"[]", "[]", null, null},
+                new Object[]{"[null]", "[null]", null, null},
+                new Object[]{"null", null, null, null}
+            )
+        )
+        .run();
+  }
 }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
index b683b8bce91..06c2241643c 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteJoinQueryTest.java
@@ -91,6 +91,7 @@ import org.apache.druid.sql.calcite.filtration.Filtration;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.run.EngineFeature;
 import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.hamcrest.CoreMatchers;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
 import org.junit.Assert;
@@ -2466,6 +2467,26 @@ public class CalciteJoinQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
+  @Test
+  public void testJoinOfTwoJoinsWithSubQueries()
+  {
+    skipVectorize();
+
+    String sql = "with\n"
+        + "l1 as (SELECT f.dim1, sum(n.dbl1) s1 from foo f join numfoo n on 
n.dim2=f.dim2 group by f.dim1),\n"
+        + "r1 as (SELECT f.dim2, sum(n.dbl2) s2 from foo f join numfoo n on 
n.dim1=f.dim1 group by f.dim2)\n"
+        + "select dim1, s1+s2 FROM l1 join r1 on dim1=dim2";
+
+    testBuilder()
+        .sql(sql)
+        .expectedResults(
+            ImmutableList.of(
+                new Object[]{"", 1.0D}
+            )
+        )
+        .run();
+  }
+
   @MethodSource("provideQueryContexts")
   @ParameterizedTest(name = "{0}")
   public void testSelectOnLookupUsingLeftJoinOperator(Map<String, Object> 
queryContext)
@@ -5124,6 +5145,7 @@ public class CalciteJoinQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
+  @NotYetSupported(Modes.DD_RESTRICTED_DATASOURCE_SUPPORT2)
   @MethodSource("provideQueryContexts")
   @ParameterizedTest(name = "{0}")
   public void testJoinOnRestrictedBroadcast(Map<String, Object> queryContext)
@@ -5174,9 +5196,13 @@ public class CalciteJoinQueryTest extends 
BaseCalciteQueryTest
             ImmutableList.of()
         )
     );
-    Assert.assertTrue(e.getMessage()
-                       .contains(
-                           "Restricted data source 
[GlobalTableDataSource{name='restrictedBroadcastDatasource_m1_is_6'}] with 
policy [RowFilterPolicy{rowFilter=m1 = 6 (LONG)}] is not supported"));
+
+    assertThat(
+        e.getMessage(),
+        CoreMatchers.containsString(
+            "Restricted data source 
[GlobalTableDataSource{name='restrictedBroadcastDatasource_m1_is_6'}] with 
policy [RowFilterPolicy{rowFilter=m1 = 6 (LONG)}] is not supported"
+        )
+    );
   }
 
   @MethodSource("provideQueryContexts")
@@ -5686,7 +5712,7 @@ public class CalciteJoinQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
-  @NotYetSupported(Modes.SORT_REMOVE_TROUBLE)
+  @NotYetSupported({Modes.SORT_REMOVE_TROUBLE, Modes.DD_SORT_REMOVE_TROUBLE})
   @MethodSource("provideQueryContexts")
   @ParameterizedTest(name = "{0}")
   public void testRegressionFilteredAggregatorsSubqueryJoins(Map<String, 
Object> queryContext)
@@ -6230,13 +6256,12 @@ public class CalciteJoinQueryTest extends 
BaseCalciteQueryTest
   public void testSelfJoinsWithUnnestOnLeftAndRight()
   {
     Map<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
-    testQuery(
-        "with t1 as (\n"
+    String sql = "with t1 as (\n"
         + "select * from foo, unnest(MV_TO_ARRAY(\"dim3\")) as u(d3)\n"
         + ")\n"
         + "select t1.dim3, t1.d3, t2.dim2 from t1 JOIN t1 as t2\n"
-        + "ON t1.d3 = t2.d3",
-        context,
+        + "ON t1.d3 = t2.d3";
+    ImmutableList<Query<?>> expectedQueries =
         ImmutableList.of(
             newScanQueryBuilder()
                 .dataSource(
@@ -6269,20 +6294,24 @@ public class CalciteJoinQueryTest extends 
BaseCalciteQueryTest
                 .columnTypes(ColumnType.STRING, ColumnType.STRING, 
ColumnType.STRING)
                 .context(context)
                 .build()
-        ),
-        sortIfSortBased(
-            ImmutableList.of(
-                new Object[]{"[\"a\",\"b\"]", "a", "a"},
-                new Object[]{"[\"a\",\"b\"]", "b", "a"},
-                new Object[]{"[\"a\",\"b\"]", "b", null},
-                new Object[]{"[\"b\",\"c\"]", "b", "a"},
-                new Object[]{"[\"b\",\"c\"]", "b", null},
-                new Object[]{"[\"b\",\"c\"]", "c", null},
-                new Object[]{"d", "d", ""},
-                new Object[]{"", "", "a"}
-            ), 0
-        )
+        );
+    ImmutableList<Object[]> expectedResults = ImmutableList.of(
+        new Object[] {"[\"a\",\"b\"]", "a", "a"},
+        new Object[] {"[\"a\",\"b\"]", "b", "a"},
+        new Object[] {"[\"a\",\"b\"]", "b", null},
+        new Object[] {"[\"b\",\"c\"]", "b", "a"},
+        new Object[] {"[\"b\",\"c\"]", "b", null},
+        new Object[] {"[\"b\",\"c\"]", "c", null},
+        new Object[] {"d", "d", ""},
+        new Object[] {"", "", "a"}
     );
+
+    testBuilder()
+        .queryContext(context)
+        .sql(sql)
+        .expectedQueries(expectedQueries)
+        .expectedResults(new UnorderedResultsVerifier(expectedResults, 
ResultMatchMode.RELAX_NULLS, null))
+        .run();
   }
 
   @DecoupledTestConfig(ignoreExpectedQueriesReason = 
IgnoreQueriesReason.UNNEST_EXTRA_SCANQUERY)
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java
index e2bc176068b..e43bc08c111 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteNestedDataQueryTest.java
@@ -86,6 +86,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+
 @SqlTestFrameworkConfig.ComponentSupplier(NestedComponentSupplier.class)
 public class CalciteNestedDataQueryTest extends BaseCalciteQueryTest
 {
@@ -5452,13 +5454,15 @@ public class CalciteNestedDataQueryTest extends 
BaseCalciteQueryTest
   public void testJoinOnNestedColumnThrows()
   {
     DruidException e = Assertions.assertThrows(DruidException.class, () -> {
-      testQuery(
-          "SELECT * FROM druid.nested a INNER JOIN druid.nested b ON a.nester 
= b.nester",
-          ImmutableList.of(),
-          ImmutableList.of()
-      );
+      testBuilder()
+          .sql("SELECT * FROM druid.nested a INNER JOIN druid.nested b ON 
a.nester = b.nester")
+          .run();
     });
-    Assertions.assertEquals("Cannot join when the join condition has column of 
type [COMPLEX<json>]", e.getMessage());
+
+    assertThat(
+        e.getMessage(),
+        CoreMatchers.containsString("Cannot join when the join condition has 
column of type [COMPLEX<json>]")
+    );
   }
 
   @Test
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 0ff94fb020a..bf1ba524072 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -132,6 +132,7 @@ import 
org.apache.druid.sql.calcite.rel.CannotBuildQueryException;
 import org.apache.druid.sql.calcite.run.EngineFeature;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.sql.calcite.util.TestDataBuilder;
+import org.apache.druid.sql.calcite.util.datasets.TestDataSet;
 import org.hamcrest.CoreMatchers;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -189,6 +190,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                      .add(new Object[]{"druid", CalciteTests.DATASOURCE1, 
"TABLE", "NO", "NO"})
                      .add(new Object[]{"druid", CalciteTests.DATASOURCE2, 
"TABLE", "NO", "NO"})
                      .add(new Object[]{"druid", CalciteTests.DATASOURCE4, 
"TABLE", "NO", "NO"})
+                     .add(new Object[]{"druid", TestDataSet.LARRY.getName(), 
"TABLE", "NO", "NO"})
                      .add(new Object[]{"druid", CalciteTests.DATASOURCE5, 
"TABLE", "NO", "NO"})
                      .add(new Object[]{"druid", CalciteTests.DATASOURCE3, 
"TABLE", "NO", "NO"})
                      .add(new Object[]{"druid", 
CalciteTests.RESTRICTED_BROADCAST_DATASOURCE, "TABLE", "YES", "YES"})
@@ -233,6 +235,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
                      .add(new Object[]{"druid", CalciteTests.DATASOURCE2, 
"TABLE", "NO", "NO"})
                      .add(new Object[]{"druid", CalciteTests.DATASOURCE4, 
"TABLE", "NO", "NO"})
                      .add(new Object[]{"druid", 
CalciteTests.FORBIDDEN_DATASOURCE, "TABLE", "NO", "NO"})
+                     .add(new Object[]{"druid", TestDataSet.LARRY.getName(), 
"TABLE", "NO", "NO"})
                      .add(new Object[]{"druid", CalciteTests.DATASOURCE5, 
"TABLE", "NO", "NO"})
                      .add(new Object[]{"druid", CalciteTests.DATASOURCE3, 
"TABLE", "NO", "NO"})
                      .add(new Object[]{"druid", 
CalciteTests.RESTRICTED_BROADCAST_DATASOURCE, "TABLE", "YES", "YES"})
@@ -2584,7 +2587,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
         .run();
   }
 
-  @NotYetSupported(Modes.DD_JOIN)
   @SqlTestFrameworkConfig.NumMergeBuffers(3)
   @Test
   public void testExactCountDistinctWithFilter2()
@@ -7258,7 +7260,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
     );
   }
 
-  @NotYetSupported(Modes.DD_JOIN)
   @DecoupledTestConfig(quidemReason = QuidemTestCaseReason.AGG_COL_EXCHANGE)
   @Test
   public void 
testMultipleExactCountDistinctWithGroupingAndOtherAggregatorsUsingJoin()
@@ -12665,7 +12666,6 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
   }
 
   // __time >= x remains in the join condition
-  @NotYetSupported(Modes.DD_JOIN)
   @DecoupledTestConfig(quidemReason = 
QuidemTestCaseReason.JOIN_FILTER_LOCATIONS)
   @Test
   public void testRequireTimeConditionPositive3()
@@ -14888,7 +14888,6 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
-  @NotYetSupported(Modes.DD_JOIN)
   @Test
   public void testOrderByAlongWithInternalScanQuery()
   {
@@ -14933,7 +14932,6 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
     );
   }
 
-  @NotYetSupported(Modes.DD_JOIN)
   @Test
   public void testOrderByAlongWithInternalScanQueryNoDistinct()
   {
@@ -15606,7 +15604,7 @@ public class CalciteQueryTest extends 
BaseCalciteQueryTest
         .run();
   }
 
-  @NotYetSupported(Modes.DD_JOIN)
+  @NotYetSupported(Modes.DD_WINDOW)
   @Test
   public void testWindowingOverJoin()
   {
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
index d413a0147bf..5ef5eac695f 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
@@ -232,7 +232,7 @@ public class DrillWindowQueryTest extends 
BaseCalciteQueryTest
     }
   }
 
-  public class TextualResultsVerifier implements ResultsVerifier
+  public static class TextualResultsVerifier implements ResultsVerifier
   {
     protected final List<String[]> expectedResultsText;
     @Nullable
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
index 7767025399e..25972ae556c 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java
@@ -27,6 +27,8 @@ import org.junit.jupiter.api.extension.InvocationInterceptor;
 import org.junit.jupiter.api.extension.ReflectiveInvocationContext;
 import org.opentest4j.IncompleteExecutionException;
 
+import javax.annotation.Nullable;
+
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
@@ -73,7 +75,7 @@ import static org.junit.Assert.assertThrows;
 @Target({ElementType.METHOD})
 public @interface NotYetSupported
 {
-  Modes value();
+  Modes[] value();
 
   enum Scope
   {
@@ -105,11 +107,17 @@ public @interface NotYetSupported
     UNNEST_RESULT_MISMATCH(Scope.DECOUPLED, AssertionError.class, "(Result 
count mismatch|column content mismatch)"),
 
     DD_RESTRICTED_DATASOURCE_SUPPORT(Scope.DECOUPLED_DART, 
DruidException.class, "ForbiddenException: Unauthorized"),
+    DD_RESTRICTED_DATASOURCE_SUPPORT2(Scope.DECOUPLED_DART, 
AssertionError.class, "Unauthorized"),
     DD_INCORRECT_RESULTS_EMPTY_STRING(Scope.DECOUPLED_DART, 
AssertionError.class, "column content mismatch at"),
     NO_INFORMATION_SCHEMA_SUPPORT(Scope.DECOUPLED_DART, DruidException.class, 
"INFORMATION_SCHEMA"),
-    DD_JOIN(Scope.DECOUPLED_DART, DruidException.class, 
"DruidJoin.DRUID_LOGICAL"),
     DD_NULL_COLUMN_ORDER(Scope.DECOUPLED_DART, DruidException.class, "sort: 
\\[\\] -> \\[1\\]"),
-    DD_UNION(Scope.DECOUPLED_DART, DruidException.class, 
"DruidUnion.DRUID_LOGICAL");
+    DD_UNION(Scope.DECOUPLED_DART, DruidException.class, 
"DruidUnion.DRUID_LOGICAL"),
+    DD_WINDOW(Scope.DECOUPLED_DART, DruidException.class, 
"DruidWindow.DRUID_LOGICAL"),
+    DD_UNNEST_RESULT_MISMATCH(Scope.DECOUPLED_DART, AssertionError.class, 
"(Result count mismatch|column content mismatch)"),
+    DD_UNNEST_INLINED(Scope.DECOUPLED_DART, Exception.class, "Missing 
conversion is Uncollect"),
+    DD_SORT_REMOVE_TROUBLE(Scope.DECOUPLED_DART, DruidException.class, 
"Calcite assertion violated.*Sort\\.<init>"),
+    DD_JOIN_CONDITION_NORMALIZATION(Scope.DECOUPLED_DART, 
DruidException.class, "Cannot handle equality"),
+    DD_RESULT_MISMATCH_FLOAT_DOUBLE(Scope.DECOUPLED_DART, 
AssertionError.class, "column content mismatch");
     // @formatter:on
 
     public Scope scope;
@@ -162,14 +170,15 @@ public @interface NotYetSupported
     {
       Method method = extensionContext.getTestMethod().get();
       NotYetSupported annotation = method.getAnnotation(NotYetSupported.class);
+      Modes ignoreMode = getModeForScope(annotation);
+
 
-      if (annotation == null || annotation.value().scope != scope) {
+      if (ignoreMode == null) {
         invocation.proceed();
         return;
       }
       {
         {
-          Modes ignoreMode = annotation.value();
           Throwable e = null;
           try {
             invocation.proceed();
@@ -197,16 +206,30 @@ public @interface NotYetSupported
           );
 
           String trace = Throwables.getStackTraceAsString(e);
-          Matcher m = annotation.value().getPattern().matcher(trace);
+          Matcher m = ignoreMode.getPattern().matcher(trace);
 
           if (!m.find()) {
-            throw new AssertionError("Exception stacktrace doesn't match 
regex: " + annotation.value().regex, e);
+            throw new AssertionError("Exception stacktrace doesn't match 
regex: " + ignoreMode.regex, e);
           }
           throw new AssumptionViolatedException("Test is not-yet supported; 
ignored with:" + annotation);
         }
       }
     }
 
+    private Modes getModeForScope(@Nullable NotYetSupported annotation)
+    {
+      if (annotation == null) {
+        return null;
+      }
+      for (Modes mode : annotation.value()) {
+        if (mode.scope == scope) {
+          return mode;
+        }
+
+      }
+      return null;
+    }
+
     @Override
     public void interceptTestTemplateMethod(Invocation<Void> invocation,
         ReflectiveInvocationContext<Method> invocationContext,
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestBuilder.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestBuilder.java
index c35003e0e76..d110f3200d7 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestBuilder.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/QueryTestBuilder.java
@@ -127,7 +127,7 @@ public class QueryTestBuilder
 
   public QueryTestBuilder queryContext(Map<String, Object> queryContext)
   {
-    this.queryContext = QueryContexts.override(config.baseQueryContext(), 
queryContext);
+    this.queryContext = QueryContexts.override(this.queryContext, 
queryContext);
     return this;
   }
 
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
index d6d544af522..c283a364d1a 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
@@ -235,7 +235,7 @@ public class CalcitePlannerModuleTest extends 
CalciteTestBase
 
   private void assertBloat(PlannerContext context, int expectedBloat)
   {
-    Optional<ProjectMergeRule> firstProjectMergeRule = 
injector.getInstance(CalciteRulesManager.class).baseRuleSet(context).stream()
+    Optional<ProjectMergeRule> firstProjectMergeRule = 
injector.getInstance(CalciteRulesManager.class).baseRuleSet(context, 
false).stream()
             .filter(rule -> rule instanceof ProjectMergeRule)
             .map(rule -> (ProjectMergeRule) rule)
             .findAny();
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
index f455b0a0d4b..2625da9044b 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
@@ -860,6 +860,10 @@ public class TestDataBuilder
         TestDataSet.NUMFOO,
         TestHelper.JSON_MAPPER,
         new File(tmpDir, "3")
+    ).add(
+        TestDataSet.LARRY,
+        TestHelper.JSON_MAPPER,
+        new File(tmpDir, "larry")
     ).add(
         DataSegment.builder()
                    .dataSource(CalciteTests.DATASOURCE4)
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
 
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
index 8f0c7eec4b3..55443bcaa30 100644
--- 
a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
+++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java
@@ -39,7 +39,7 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineLookup;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
-import org.apache.druid.timeline.partition.SingleElementPartitionChunk;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -100,22 +100,26 @@ public class TestTimelineServerView implements 
TimelineServerView
   @Override
   public Optional<? extends TimelineLookup<String, ServerSelector>> 
getTimeline(TableDataSource table)
   {
+    final VersionedIntervalTimeline<String, ServerSelector> timelineLookup =
+        new VersionedIntervalTimeline<>(Comparator.naturalOrder());
+
     for (DataSegment segment : segments) {
       if (!segment.getDataSource().equals(table.getName())) {
         continue;
       }
 
-      VersionedIntervalTimeline<String, ServerSelector> timelineLookup = new 
VersionedIntervalTimeline<String, ServerSelector>(
-          Comparator.naturalOrder()
-      );
       TierSelectorStrategy st = new HighestPriorityTierSelectorStrategy(new 
RandomServerSelectorStrategy());
-      ServerSelector sss = new ServerSelector(segment, st, 
HistoricalFilter.IDENTITY_FILTER);
+      ServerSelector serverSelector = new ServerSelector(segment, st, 
HistoricalFilter.IDENTITY_FILTER);
 
-      PartitionChunk<ServerSelector> partitionChunk = new 
SingleElementPartitionChunk(sss);
+      PartitionChunk<ServerSelector> partitionChunk = 
segment.getShardSpec().createChunk(serverSelector);
       timelineLookup.add(segment.getInterval(), segment.getVersion(), 
partitionChunk);
+    }
+
+    if (timelineLookup.isEmpty()) {
+      return Optional.empty();
+    } else {
       return Optional.of(timelineLookup);
     }
-    return Optional.empty();
   }
 
   @Override
diff --git 
a/sql/src/test/quidem/org.apache.druid.sql.calcite.DecoupledPlanningCalciteArraysQueryTest/testUnnestThriceWithFiltersOnDimAndAllUnnestColumns.iq
 
b/sql/src/test/quidem/org.apache.druid.sql.calcite.DecoupledPlanningCalciteArraysQueryTest/testUnnestThriceWithFiltersOnDimAndAllUnnestColumns.iq
index 23b89e6e323..5eb85f3a09d 100644
--- 
a/sql/src/test/quidem/org.apache.druid.sql.calcite.DecoupledPlanningCalciteArraysQueryTest/testUnnestThriceWithFiltersOnDimAndAllUnnestColumns.iq
+++ 
b/sql/src/test/quidem/org.apache.druid.sql.calcite.DecoupledPlanningCalciteArraysQueryTest/testUnnestThriceWithFiltersOnDimAndAllUnnestColumns.iq
@@ -22,7 +22,7 @@
 !ok
 LogicalProject(dimZipf=[$0], dim3_unnest1=[CAST('Baz':VARCHAR):VARCHAR], 
dim3_unnest2=[$1], dim3_unnest3=[$3])
   LogicalUnnest(unnestExpr=[MV_TO_ARRAY($2)], filter=[=($0, 'World')])
-    LogicalProject(dimZipf=[$1], EXPR$0=[$2], dimMultivalEnumerated0=[$0])
+    LogicalProject(dimZipf=[$1], EXPR$00=[$2], dimMultivalEnumerated0=[$0])
       LogicalUnnest(unnestExpr=[MV_TO_ARRAY($0)], filter=[=($0, 'Hello')])
         LogicalProject(dimMultivalEnumerated=[$0], dimZipf=[$1])
           LogicalUnnest(unnestExpr=[MV_TO_ARRAY($0)], filter=[=($0, 'Baz')])
@@ -33,7 +33,7 @@ LogicalProject(dimZipf=[$0], 
dim3_unnest1=[CAST('Baz':VARCHAR):VARCHAR], dim3_un
 !logicalPlan
 DruidProject(dimZipf=[$0], dim3_unnest1=[CAST('Baz':VARCHAR):VARCHAR], 
dim3_unnest2=[$1], dim3_unnest3=[$3], druid=[logical])
   DruidUnnest(unnestExpr=[MV_TO_ARRAY($2)], filter=[=($0, 'World')])
-    DruidProject(dimZipf=[$1], EXPR$0=[$2], dimMultivalEnumerated0=[$0], 
druid=[logical])
+    DruidProject(dimZipf=[$1], EXPR$00=[$2], dimMultivalEnumerated0=[$0], 
druid=[logical])
       DruidUnnest(unnestExpr=[MV_TO_ARRAY($0)], filter=[=($0, 'Hello')])
         DruidProject(dimMultivalEnumerated=[$0], dimZipf=[$1], druid=[logical])
           DruidUnnest(unnestExpr=[MV_TO_ARRAY($0)], filter=[=($0, 'Baz')])


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to