This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c184b5250f Unnest now works on MSQ (#14886)
c184b5250f is described below

commit c184b5250f1f8fca64ffe1d34dc1157fbe96c271
Author: Soumyava <[email protected]>
AuthorDate: Sun Sep 24 20:49:21 2023 -0700

    Unnest now works on MSQ (#14886)

    This entails:
        Removing the enableUnnest flag and its associated machinery
        Updating the datasource plan and frame processors to support unnest
        Adding support in MSQ for UnnestDataSource and FilteredDataSource
        Adding an MSQ test component to CalciteArraysQueryTest
        Adding tests for unnest on MSQ
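
    For context, the shape of query this enables on the MSQ task engine,
    patterned directly on the new tests in this commit (foo and dim3 are the
    standard Calcite test fixtures; a sketch, not a complete test):

        // SQL now accepted by MSQTaskSqlEngine with no context flag required,
        // mirroring testSelectUnnestOnFoo in MSQSelectTest.
        String sql = "SELECT d3 FROM foo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)";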
---
 .../druid/msq/querykit/BaseLeafFrameProcessor.java |  20 +-
 .../apache/druid/msq/querykit/DataSourcePlan.java  | 108 ++++++++++-
 .../org/apache/druid/msq/sql/MSQTaskSqlEngine.java |   2 +-
 .../org/apache/druid/msq/exec/MSQInsertTest.java   |  89 +++++++++
 .../org/apache/druid/msq/exec/MSQReplaceTest.java  | 180 +++++++++++++++++-
 .../org/apache/druid/msq/exec/MSQSelectTest.java   | 203 +++++++++++++++++++++
 .../msq/test/CalciteArraysSelectQueryMSQTest.java  |  98 ++++++++++
 .../druid/msq/test/CalciteMSQTestsHelper.java      |  12 ++
 .../druid/msq/test/MSQTestWorkerContext.java       |   3 +-
 .../druid/sql/calcite/planner/PlannerContext.java  |  12 --
 .../apache/druid/sql/calcite/rule/DruidRules.java  |   1 +
 .../druid/sql/calcite/view/ViewSqlEngine.java      |   1 -
 .../druid/sql/calcite/BaseCalciteQueryTest.java    |   1 -
 .../druid/sql/calcite/CalciteArraysQueryTest.java  |  18 +-
 .../druid/sql/calcite/IngestionTestSqlEngine.java  |   1 +
 .../calcite/planner/CalcitePlannerModuleTest.java  |   8 +-
 .../druid/sql/calcite/util/TestDataBuilder.java    |   2 +-
 17 files changed, 732 insertions(+), 27 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
index d5b31328b0..d0a12b3ae7 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
@@ -36,8 +36,10 @@ import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.msq.input.ReadableInput;
 import org.apache.druid.msq.input.table.SegmentWithDescriptor;
 import org.apache.druid.query.DataSource;
+import org.apache.druid.query.FilteredDataSource;
 import org.apache.druid.query.JoinDataSource;
 import org.apache.druid.query.Query;
+import org.apache.druid.query.UnnestDataSource;
 import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentReference;
@@ -96,7 +98,17 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor<Long>
       final long memoryReservedForBroadcastJoin
   )
   {
-    if (!(dataSource instanceof JoinDataSource) && !sideChannels.isEmpty()) {
+    // An UnnestDataSource or FilteredDataSource can have a join as a base
+    // In such a case a side channel is expected to be there
+    final DataSource baseDataSource;
+    if (dataSource instanceof UnnestDataSource) {
+      baseDataSource = ((UnnestDataSource) dataSource).getBase();
+    } else if (dataSource instanceof FilteredDataSource) {
+      baseDataSource = ((FilteredDataSource) dataSource).getBase();
+    } else {
+      baseDataSource = dataSource;
+    }
+    if (!(baseDataSource instanceof JoinDataSource) && !sideChannels.isEmpty()) {
       throw new ISE("Did not expect side channels for dataSource [%s]", dataSource);
     }
 
@@ -106,8 +118,8 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor<Long>
     if (baseInput.hasChannel()) {
       inputChannels.add(baseInput.getChannel());
     }
-
-    if (dataSource instanceof JoinDataSource) {
+    
+    if (baseDataSource instanceof JoinDataSource) {
       final Int2IntMap inputNumberToProcessorChannelMap = new Int2IntOpenHashMap();
       final List<FrameReader> channelReaders = new ArrayList<>();
 
@@ -196,7 +208,7 @@ public abstract class BaseLeafFrameProcessor implements FrameProcessor<Long>
     if (segmentMapFn != null) {
       return true;
     } else if (broadcastJoinHelper == null) {
-      segmentMapFn = Function.identity();
+      segmentMapFn = query.getDataSource().createSegmentMapFunction(query, cpuAccumulator);
       return true;
     } else {
       final boolean retVal = broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
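
The hunk above peels one wrapper off the data source before validating side
channels: an UnnestDataSource or FilteredDataSource may have a JoinDataSource
as its base, and it is the join underneath that legitimately carries broadcast
side channels. A minimal sketch of the shapes involved, using only the
create()/getBase() calls visible in this patch (the table name and virtual
column are illustrative, not from the commit):

    import org.apache.druid.math.expr.ExprMacroTable;
    import org.apache.druid.query.DataSource;
    import org.apache.druid.query.TableDataSource;
    import org.apache.druid.query.UnnestDataSource;
    import org.apache.druid.segment.column.ColumnType;
    import org.apache.druid.segment.virtual.ExpressionVirtualColumn;

    // An unnest wrapping a base data source. In the case the new code is
    // written for, the base is a JoinDataSource rather than a plain table.
    DataSource unnest = UnnestDataSource.create(
        new TableDataSource("foo"),
        new ExpressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING, ExprMacroTable.nil()),
        null  // no unnest filter
    );

    // Same unwrapping as the hunk above: look through the wrapper, then
    // decide whether side channels are expected.
    DataSource baseDataSource = unnest instanceof UnnestDataSource
                                ? ((UnnestDataSource) unnest).getBase()
                                : unnest;
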
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
index a826f1928e..d8481bf7a0 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
@@ -44,12 +44,14 @@ import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.kernel.StageDefinitionBuilder;
 import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory;
 import org.apache.druid.query.DataSource;
+import org.apache.druid.query.FilteredDataSource;
 import org.apache.druid.query.InlineDataSource;
 import org.apache.druid.query.JoinDataSource;
 import org.apache.druid.query.LookupDataSource;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnnestDataSource;
 import org.apache.druid.query.filter.DimFilter;
 import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.query.planning.PreJoinableClause;
@@ -135,8 +137,29 @@ public class DataSourcePlan
       checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
       return forInline((InlineDataSource) dataSource, broadcast);
     } else if (dataSource instanceof LookupDataSource) {
-      checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
       return forLookup((LookupDataSource) dataSource, broadcast);
+    } else if (dataSource instanceof FilteredDataSource) {
+      return forFilteredDataSource(
+          queryKit,
+          queryId,
+          queryContext,
+          (FilteredDataSource) dataSource,
+          querySegmentSpec,
+          maxWorkerCount,
+          minStageNumber,
+          broadcast
+      );
+    } else if (dataSource instanceof UnnestDataSource) {
+      return forUnnest(
+          queryKit,
+          queryId,
+          queryContext,
+          (UnnestDataSource) dataSource,
+          querySegmentSpec,
+          maxWorkerCount,
+          minStageNumber,
+          broadcast
+      );
     } else if (dataSource instanceof QueryDataSource) {
       checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
       return forQuery(
@@ -353,6 +376,88 @@ public class DataSourcePlan
     );
   }
 
+  private static DataSourcePlan forFilteredDataSource(
+      final QueryKit queryKit,
+      final String queryId,
+      final QueryContext queryContext,
+      final FilteredDataSource dataSource,
+      final QuerySegmentSpec querySegmentSpec,
+      final int maxWorkerCount,
+      final int minStageNumber,
+      final boolean broadcast
+  )
+  {
+    final DataSourcePlan basePlan = forDataSource(
+        queryKit,
+        queryId,
+        queryContext,
+        dataSource.getBase(),
+        querySegmentSpec,
+        null,
+        maxWorkerCount,
+        minStageNumber,
+        broadcast
+    );
+
+    DataSource newDataSource = basePlan.getNewDataSource();
+
+    final List<InputSpec> inputSpecs = new ArrayList<>(basePlan.getInputSpecs());
+    newDataSource = FilteredDataSource.create(newDataSource, dataSource.getFilter());
+    return new DataSourcePlan(
+        newDataSource,
+        inputSpecs,
+        basePlan.getBroadcastInputs(),
+        basePlan.getSubQueryDefBuilder().orElse(null)
+    );
+
+  }
+
+  /**
+   * Build a plan for Unnest data source
+   */
+  private static DataSourcePlan forUnnest(
+      final QueryKit queryKit,
+      final String queryId,
+      final QueryContext queryContext,
+      final UnnestDataSource dataSource,
+      final QuerySegmentSpec querySegmentSpec,
+      final int maxWorkerCount,
+      final int minStageNumber,
+      final boolean broadcast
+  )
+  {
+    // Find the plan for base data source by recursing
+    final DataSourcePlan basePlan = forDataSource(
+        queryKit,
+        queryId,
+        queryContext,
+        dataSource.getBase(),
+        querySegmentSpec,
+        null,
+        maxWorkerCount,
+        minStageNumber,
+        broadcast
+    );
+    DataSource newDataSource = basePlan.getNewDataSource();
+
+    final List<InputSpec> inputSpecs = new ArrayList<>(basePlan.getInputSpecs());
+
+    // Create the new data source using the data source from the base plan
+    newDataSource = UnnestDataSource.create(
+        newDataSource,
+        dataSource.getVirtualColumn(),
+        dataSource.getUnnestFilter()
+    );
+    // The base data source can be a join and might already have broadcast inputs
+    // Need to set the broadcast inputs from the basePlan
+    return new DataSourcePlan(
+        newDataSource,
+        inputSpecs,
+        basePlan.getBroadcastInputs(),
+        basePlan.getSubQueryDefBuilder().orElse(null)
+    );
+  }
+
   /**
    * Build a plan for broadcast hash-join.
    */
@@ -379,7 +484,6 @@ public class DataSourcePlan
         null, // Don't push query filters down through a join: this needs some work to ensure pruning works properly.
         maxWorkerCount,
         Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
-
         broadcast
     );
 
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
index ee7a590e1f..e6578388a4 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
@@ -114,8 +114,8 @@ public class MSQTaskSqlEngine implements SqlEngine
       case TIME_BOUNDARY_QUERY:
       case GROUPING_SETS:
       case WINDOW_FUNCTIONS:
-      case UNNEST:
         return false;
+      case UNNEST:
       case CAN_SELECT:
       case CAN_INSERT:
       case CAN_REPLACE:
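
The one-line move above is easy to misread: `case UNNEST:` is not deleted, it
is relocated below the `return false;`, so it now falls through to the same
branch as CAN_SELECT, CAN_INSERT, and CAN_REPLACE. A sketch of the resulting
control flow; the trailing `return true;` is assumed from context the hunk
does not show:

    switch (feature) {
      case TIME_BOUNDARY_QUERY:
      case GROUPING_SETS:
      case WINDOW_FUNCTIONS:
        return false;   // still unsupported on the MSQ task engine
      case UNNEST:      // relocated: now falls through to the supported group
      case CAN_SELECT:
      case CAN_INSERT:
      case CAN_REPLACE:
        return true;    // assumed; the hunk ends before this line
      // ... remaining features elided ...
    }
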
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index 009f595bf3..81ba53f755 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -185,6 +185,95 @@ public class MSQInsertTest extends MSQTestBase
 
   }
 
+  @Test
+  public void testInsertWithUnnestInline()
+  {
+    List<Object[]> expectedRows = ImmutableList.of(
+        new Object[]{1692226800000L, 1L},
+        new Object[]{1692226800000L, 2L},
+        new Object[]{1692226800000L, 3L}
+    );
+
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("d", ColumnType.LONG)
+                                            .build();
+
+
+    testIngestQuery().setSql(
+                         "insert into foo1 select TIME_PARSE('2023-08-16T23:00') as __time, d from UNNEST(ARRAY[1,2,3]) as unnested(d) PARTITIONED BY ALL")
+                     .setQueryContext(context)
+                     .setExpectedResultRows(expectedRows)
+                     .setExpectedDataSource("foo1")
+                     .setExpectedRowSignature(rowSignature)
+                     .verifyResults();
+
+  }
+
+  @Test
+  public void testInsertWithUnnest()
+  {
+    List<Object[]> expectedRows = ImmutableList.of(
+        new Object[]{946684800000L, "a"},
+        new Object[]{946684800000L, "b"},
+        new Object[]{946771200000L, "b"},
+        new Object[]{946771200000L, "c"},
+        new Object[]{946857600000L, "d"},
+        new Object[]{978307200000L, NullHandling.sqlCompatible() ? "" : null},
+        new Object[]{978393600000L, null},
+        new Object[]{978480000000L, null}
+    );
+
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("d", ColumnType.STRING)
+                                            .build();
+
+
+    testIngestQuery().setSql(
+                         "insert into foo1 select __time, d from foo,UNNEST(MV_TO_ARRAY(dim3)) as unnested(d) PARTITIONED BY ALL")
+                     .setQueryContext(context)
+                     .setExpectedResultRows(expectedRows)
+                     .setExpectedDataSource("foo1")
+                     .setExpectedRowSignature(rowSignature)
+                     .verifyResults();
+
+  }
+
+  @Test
+  public void testInsertWithUnnestWithVirtualColumns()
+  {
+    List<Object[]> expectedRows = ImmutableList.of(
+        new Object[]{946684800000L, 1.0f},
+        new Object[]{946684800000L, 1.0f},
+        new Object[]{946771200000L, 2.0f},
+        new Object[]{946771200000L, 2.0f},
+        new Object[]{946857600000L, 3.0f},
+        new Object[]{946857600000L, 3.0f},
+        new Object[]{978307200000L, 4.0f},
+        new Object[]{978307200000L, 4.0f},
+        new Object[]{978393600000L, 5.0f},
+        new Object[]{978393600000L, 5.0f},
+        new Object[]{978480000000L, 6.0f},
+        new Object[]{978480000000L, 6.0f}
+    );
+
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("d", ColumnType.FLOAT)
+                                            .build();
+
+
+    testIngestQuery().setSql(
+                         "insert into foo1 select __time, d from foo,UNNEST(ARRAY[m1,m2]) as unnested(d) PARTITIONED BY ALL")
+                     .setQueryContext(context)
+                     .setExpectedResultRows(expectedRows)
+                     .setExpectedDataSource("foo1")
+                     .setExpectedRowSignature(rowSignature)
+                     .verifyResults();
+
+  }
+
   @Test
   public void testInsertOnExternalDataSource() throws IOException
   {
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 80d6719f12..0a43fdaea7 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -739,6 +739,182 @@ public class MSQReplaceTest extends MSQTestBase
                      .verifyPlanningErrors();
   }
 
+  @Test
+  public void testReplaceUnnestSegmentEntireTable()
+  {
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("d", ColumnType.STRING)
+                                            .build();
+
+    testIngestQuery().setSql(" REPLACE INTO foo "
+                             + "OVERWRITE ALL "
+                             + "SELECT __time, d "
+                             + "FROM foo, UNNEST(MV_TO_ARRAY(dim3)) as unnested(d) "
+                             + "PARTITIONED BY ALL TIME ")
+                     .setExpectedDataSource("foo")
+                     .setExpectedRowSignature(rowSignature)
+                     .setQueryContext(context)
+                     .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
+                     .setExpectedSegment(ImmutableSet.of(SegmentId.of(
+                         "foo",
+                         Intervals.of("2000-01-01T/P1M"),
+                         "test",
+                         0
+                     )))
+                     .setExpectedResultRows(
+                         ImmutableList.of(
+                             new Object[]{946684800000L, "a"},
+                             new Object[]{946684800000L, "b"},
+                             new Object[]{946771200000L, "b"},
+                             new Object[]{946771200000L, "c"},
+                             new Object[]{946857600000L, "d"},
+                             new Object[]{978307200000L, NullHandling.sqlCompatible() ? "" : null},
+                             new Object[]{978393600000L, null},
+                             new Object[]{978480000000L, null}
+                         )
+                     )
+                     .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo", Intervals.ETERNITY, "test", 0)))
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher
+                             .with().totalFiles(1),
+                         0, 0, "input0"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher
+                             .with().rows(8).frames(1),
+                         0, 0, "shuffle"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher
+                             .with().rows(8).frames(1),
+                         1, 0, "input0"
+                     )
+                     .setExpectedSegmentGenerationProgressCountersForStageWorker(
+                         CounterSnapshotMatcher
+                             .with().segmentRowsProcessed(8),
+                         1, 0
+                     )
+                     .verifyResults();
+  }
+
+  @Test
+  public void testReplaceUnnestWithVirtualColumnSegmentEntireTable()
+  {
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("d", ColumnType.FLOAT)
+                                            .build();
+
+    testIngestQuery().setSql(" REPLACE INTO foo "
+                             + "OVERWRITE ALL "
+                             + "SELECT __time, d "
+                             + "FROM foo, UNNEST(ARRAY[m1, m2]) as unnested(d) "
+                             + "PARTITIONED BY ALL TIME ")
+                     .setExpectedDataSource("foo")
+                     .setExpectedRowSignature(rowSignature)
+                     .setQueryContext(context)
+                     .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
+                     .setExpectedSegment(ImmutableSet.of(SegmentId.of(
+                         "foo",
+                         Intervals.of("2000-01-01T/P1M"),
+                         "test",
+                         0
+                     )))
+                     .setExpectedResultRows(
+                         ImmutableList.of(
+                             new Object[]{946684800000L, 1.0f},
+                             new Object[]{946684800000L, 1.0f},
+                             new Object[]{946771200000L, 2.0f},
+                             new Object[]{946771200000L, 2.0f},
+                             new Object[]{946857600000L, 3.0f},
+                             new Object[]{946857600000L, 3.0f},
+                             new Object[]{978307200000L, 4.0f},
+                             new Object[]{978307200000L, 4.0f},
+                             new Object[]{978393600000L, 5.0f},
+                             new Object[]{978393600000L, 5.0f},
+                             new Object[]{978480000000L, 6.0f},
+                             new Object[]{978480000000L, 6.0f}
+                         )
+                     )
+                     .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo", Intervals.ETERNITY, "test", 0)))
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher
+                             .with().totalFiles(1),
+                         0, 0, "input0"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher
+                             .with().rows(12).frames(1),
+                         0, 0, "shuffle"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher
+                             .with().rows(12).frames(1),
+                         1, 0, "input0"
+                     )
+                     .setExpectedSegmentGenerationProgressCountersForStageWorker(
+                         CounterSnapshotMatcher
+                             .with().segmentRowsProcessed(12),
+                         1, 0
+                     )
+                     .verifyResults();
+  }
+
+  @Test
+  public void testReplaceUnnestSegmentWithTimeFilter()
+  {
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("d", ColumnType.STRING)
+                                            .build();
+
+    testIngestQuery().setSql(" REPLACE INTO foo "
+                             + "OVERWRITE WHERE __time >= TIMESTAMP '1999-01-01 00:00:00' and __time < TIMESTAMP '2002-01-01 00:00:00'"
+                             + "SELECT __time, d "
+                             + "FROM foo, UNNEST(MV_TO_ARRAY(dim3)) as unnested(d) "
+                             + "PARTITIONED BY DAY CLUSTERED BY d ")
+                     .setExpectedDataSource("foo")
+                     .setExpectedRowSignature(rowSignature)
+                     .setQueryContext(context)
+                     .setExpectedDestinationIntervals(ImmutableList.of(Intervals.of(
+                         "1999-01-01T00:00:00.000Z/2002-01-01T00:00:00.000Z")))
+                     .setExpectedShardSpec(DimensionRangeShardSpec.class)
+                     .setExpectedResultRows(
+                         ImmutableList.of(
+                             new Object[]{946684800000L, "a"},
+                             new Object[]{946684800000L, "b"},
+                             new Object[]{946771200000L, "b"},
+                             new Object[]{946771200000L, "c"},
+                             new Object[]{946857600000L, "d"},
+                             new Object[]{978307200000L, NullHandling.sqlCompatible() ? "" : null},
+                             new Object[]{978393600000L, null},
+                             new Object[]{978480000000L, null}
+                         )
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher
+                             .with().totalFiles(1),
+                         0, 0, "input0"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher
+                             .with().rows(2, 2, 1, 1, 1, 1).frames(1, 1, 1, 1, 1, 1),
+                         0, 0, "shuffle"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher
+                             .with().rows(2, 2, 1, 1, 1, 1).frames(1, 1, 1, 1, 1, 1),
+                         1, 0, "input0"
+                     )
+                     .setExpectedSegmentGenerationProgressCountersForStageWorker(
+                         CounterSnapshotMatcher
+                             .with().segmentRowsProcessed(8),
+                         1, 0
+                     )
+                     .verifyResults();
+  }
+
   @Test
   public void testReplaceTombstonesOverPartiallyOverlappingSegments()
   {
@@ -755,7 +931,9 @@ public class MSQReplaceTest extends MSQTestBase
                                                  .dataSource("foo1")
                                                  .build();
 
-    Mockito.doReturn(ImmutableSet.of(existingDataSegment)).when(testTaskActionClient).submit(ArgumentMatchers.isA(RetrieveUsedSegmentsAction.class));
+    Mockito.doReturn(ImmutableSet.of(existingDataSegment))
+           .when(testTaskActionClient)
+           .submit(ArgumentMatchers.isA(RetrieveUsedSegmentsAction.class));
 
     List<Object[]> expectedResults;
     if (NullHandling.sqlCompatible()) {
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index ae0bfa71f1..6b6f8ff356 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -51,6 +51,7 @@ import org.apache.druid.query.LookupDataSource;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryDataSource;
 import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnnestDataSource;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
@@ -136,6 +137,7 @@ public class MSQSelectTest extends MSQTestBase
         {QUERY_RESULTS_WITH_DURABLE_STORAGE, QUERY_RESULTS_WITH_DURABLE_STORAGE_CONTEXT},
         {QUERY_RESULTS_WITH_DEFAULT, QUERY_RESULTS_WITH_DEFAULT_CONTEXT}
     };
+
     return Arrays.asList(data);
   }
 
@@ -2119,6 +2121,207 @@ public class MSQSelectTest extends MSQTestBase
         .verifyResults();
   }
 
+  @Test
+  public void testSelectUnnestOnInlineFoo()
+  {
+    RowSignature resultSignature = RowSignature.builder()
+                                               .add("EXPR$0", ColumnType.LONG)
+                                               .build();
+    RowSignature outputSignature = RowSignature.builder()
+                                               .add("d", ColumnType.LONG)
+                                               .build();
+
+    final ColumnMappings expectedColumnMappings = new ColumnMappings(
+        ImmutableList.of(
+            new ColumnMapping("EXPR$0", "d")
+        )
+    );
+
+    testSelectQuery()
+        .setSql("select d from UNNEST(ARRAY[1,2,3]) as unnested(d)")
+        .setExpectedMSQSpec(
+            MSQSpec.builder()
+                   .query(newScanQueryBuilder()
+                              .dataSource(
+                                  InlineDataSource.fromIterable(
+                                      ImmutableList.of(
+                                          new Object[]{1L},
+                                          new Object[]{2L},
+                                          new Object[]{3L}
+                                      ),
+                                      resultSignature
+                                  )
+                              )
+                              .intervals(querySegmentSpec(Filtration.eternity()))
+                              .columns("EXPR$0")
+                              .context(defaultScanQueryContext(
+                                  context,
+                                  resultSignature
+                              ))
+                              .build())
+                   .columnMappings(expectedColumnMappings)
+                   .tuningConfig(MSQTuningConfig.defaultConfig())
+                   .destination(isDurableStorageDestination()
+                                ? DurableStorageMSQDestination.INSTANCE
+                                : TaskReportMSQDestination.INSTANCE)
+                   .build()
+        )
+        .setExpectedRowSignature(outputSignature)
+        .setQueryContext(context)
+        .setExpectedResultRows(ImmutableList.of(
+            new Object[]{1},
+            new Object[]{2},
+            new Object[]{3}
+        ))
+        .verifyResults();
+  }
+
+
+  @Test
+  public void testSelectUnnestOnFoo()
+  {
+    RowSignature resultSignature = RowSignature.builder()
+                                               .add("j0.unnest", ColumnType.STRING)
+                                               .build();
+
+    RowSignature outputSignature = RowSignature.builder()
+                                               .add("d3", ColumnType.STRING)
+                                               .build();
+
+    final ColumnMappings expectedColumnMappings = new ColumnMappings(
+        ImmutableList.of(
+            new ColumnMapping("j0.unnest", "d3")
+        )
+    );
+
+    testSelectQuery()
+        .setSql("SELECT d3 FROM foo, UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)")
+        .setExpectedMSQSpec(
+            MSQSpec.builder()
+                   .query(newScanQueryBuilder()
+                              .dataSource(UnnestDataSource.create(
+                                  new TableDataSource(CalciteTests.DATASOURCE1),
+                                  expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                                  null
+                              ))
+                              .intervals(querySegmentSpec(Filtration.eternity()))
+                              .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                              .legacy(false)
+                              .context(defaultScanQueryContext(
+                                  context,
+                                  resultSignature
+                              ))
+                              .columns(ImmutableList.of("j0.unnest"))
+                              .build())
+                   .columnMappings(expectedColumnMappings)
+                   .tuningConfig(MSQTuningConfig.defaultConfig())
+                   .destination(isDurableStorageDestination()
+                                ? DurableStorageMSQDestination.INSTANCE
+                                : TaskReportMSQDestination.INSTANCE)
+                   .build()
+        )
+        .setExpectedRowSignature(outputSignature)
+        .setQueryContext(context)
+        .setExpectedResultRows(
+            useDefault ? ImmutableList.of(
+                new Object[]{"a"},
+                new Object[]{"b"},
+                new Object[]{"b"},
+                new Object[]{"c"},
+                new Object[]{"d"},
+                new Object[]{""},
+                new Object[]{""},
+                new Object[]{""}
+            ) : ImmutableList.of(
+                new Object[]{"a"},
+                new Object[]{"b"},
+                new Object[]{"b"},
+                new Object[]{"c"},
+                new Object[]{"d"},
+                new Object[]{""},
+                new Object[]{null},
+                new Object[]{null}
+            ))
+        .verifyResults();
+  }
+
+  @Test
+  public void testSelectUnnestOnQueryFoo()
+  {
+    RowSignature resultSignature = RowSignature.builder()
+                                               .add("j0.unnest", ColumnType.STRING)
+                                               .build();
+
+    RowSignature resultSignature1 = RowSignature.builder()
+                                               .add("dim3", ColumnType.STRING)
+                                               .build();
+
+    RowSignature outputSignature = RowSignature.builder()
+                                               .add("d3", ColumnType.STRING)
+                                               .build();
+
+    final ColumnMappings expectedColumnMappings = new ColumnMappings(
+        ImmutableList.of(
+            new ColumnMapping("j0.unnest", "d3")
+        )
+    );
+
+    testSelectQuery()
+        .setSql("SELECT d3 FROM (select * from druid.foo where dim2='a' LIMIT 10), UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)")
+        .setExpectedMSQSpec(
+            MSQSpec.builder()
+                   .query(newScanQueryBuilder()
+                              .dataSource(UnnestDataSource.create(
+                                  new QueryDataSource(
+                                      newScanQueryBuilder()
+                                          .dataSource(
+                                              new TableDataSource(CalciteTests.DATASOURCE1)
+                                          )
+                                          .intervals(querySegmentSpec(Filtration.eternity()))
+                                          .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                                          .legacy(false)
+                                          .filters(equality("dim2", "a", ColumnType.STRING))
+                                          .columns("dim3")
+                                          .context(defaultScanQueryContext(
+                                              context,
+                                              resultSignature1
+                                          ))
+                                          .limit(10)
+                                          .build()
+                                  ),
+                                  expressionVirtualColumn("j0.unnest", "\"dim3\"", ColumnType.STRING),
+                                  null
+                              ))
+                              .intervals(querySegmentSpec(Filtration.eternity()))
+                              .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                              .legacy(false)
+                              .context(defaultScanQueryContext(
+                                  context,
+                                  resultSignature
+                              ))
+                              .columns(ImmutableList.of("j0.unnest"))
+                              .build())
+                   .columnMappings(expectedColumnMappings)
+                   .tuningConfig(MSQTuningConfig.defaultConfig())
+                   .destination(isDurableStorageDestination()
+                                ? DurableStorageMSQDestination.INSTANCE
+                                : TaskReportMSQDestination.INSTANCE)
+                   .build()
+        )
+        .setExpectedRowSignature(outputSignature)
+        .setQueryContext(context)
+        .setExpectedResultRows(
+            useDefault ? ImmutableList.of(
+                new Object[]{"a"},
+                new Object[]{"b"}
+            ) : ImmutableList.of(
+                new Object[]{"a"},
+                new Object[]{"b"},
+                new Object[]{""}
+            ))
+        .verifyResults();
+  }
+
   @Nonnull
   private List<Object[]> expectedMultiValueFooRowsGroup()
   {
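
One detail worth noting in the select tests above: the planner names the
unnested column j0.unnest internally, and it only surfaces under the SQL alias
through the column mappings. Restating the mapping that testSelectUnnestOnFoo
asserts (nothing new here; both classes appear in the test above):

    // Internal plan name -> user-facing alias ("d3" in the test's SQL).
    final ColumnMappings mappings = new ColumnMappings(
        ImmutableList.of(new ColumnMapping("j0.unnest", "d3"))
    );
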
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysSelectQueryMSQTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysSelectQueryMSQTest.java
new file mode 100644
index 0000000000..16aef6bdca
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysSelectQueryMSQTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import org.apache.druid.guice.DruidInjectorBuilder;
+import org.apache.druid.msq.exec.WorkerMemoryParameters;
+import org.apache.druid.msq.sql.MSQTaskSqlEngine;
+import org.apache.druid.query.groupby.TestGroupByBuffers;
+import org.apache.druid.server.QueryLifecycleFactory;
+import org.apache.druid.sql.calcite.CalciteArraysQueryTest;
+import org.apache.druid.sql.calcite.QueryTestBuilder;
+import org.apache.druid.sql.calcite.run.SqlEngine;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Runs {@link CalciteArraysQueryTest} but with MSQ engine
+ */
+public class CalciteArraysSelectQueryMSQTest extends CalciteArraysQueryTest
+{
+  private TestGroupByBuffers groupByBuffers;
+
+  @Before
+  public void setup2()
+  {
+    groupByBuffers = TestGroupByBuffers.createDefault();
+  }
+
+  @After
+  public void teardown2()
+  {
+    groupByBuffers.close();
+  }
+
+  @Override
+  public void configureGuice(DruidInjectorBuilder builder)
+  {
+    super.configureGuice(builder);
+    builder.addModules(CalciteMSQTestsHelper.fetchModules(temporaryFolder, groupByBuffers).toArray(new Module[0]));
+  }
+
+
+  @Override
+  public SqlEngine createEngine(
+      QueryLifecycleFactory qlf,
+      ObjectMapper queryJsonMapper,
+      Injector injector
+  )
+  {
+    final WorkerMemoryParameters workerMemoryParameters =
+        WorkerMemoryParameters.createInstance(
+            WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
+            2,
+            10,
+            2,
+            0,
+            0
+        );
+    final MSQTestOverlordServiceClient indexingServiceClient = new MSQTestOverlordServiceClient(
+        queryJsonMapper,
+        injector,
+        new MSQTestTaskActionClient(queryJsonMapper),
+        workerMemoryParameters
+    );
+    return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper);
+  }
+
+  @Override
+  protected QueryTestBuilder testBuilder()
+  {
+    return new QueryTestBuilder(new CalciteTestConfig(true))
+        .addCustomRunner(new ExtractResultsFactory(() -> (MSQTestOverlordServiceClient) ((MSQTaskSqlEngine) queryFramework().engine()).overlordClient()))
+        .skipVectorize(true)
+        .verifyNativeQueries(new VerifyMSQSupportedNativeQueriesPredicate())
+        .msqCompatible(msqCompatible);
+  }
+}
+
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
index 0c840b14a8..d7c0ea1f2d 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
@@ -89,10 +89,13 @@ import java.util.function.Supplier;
 import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE1;
 import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE2;
 import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE3;
+import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE5;
+import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_LOTS_O_COLUMNS;
 import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_NUMERIC_DIMS;
 import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1;
 import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1_WITH_NUMERIC_DIMS;
 import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS2;
+import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS_LOTS_OF_COLUMNS;
 
 /**
  * Helper class aiding in wiring up the Guice bindings required for MSQ engine to work with the Calcite's tests
@@ -246,6 +249,15 @@ public class CalciteMSQTestsHelper
               .rows(ROWS1_WITH_NUMERIC_DIMS)
               .buildMMappedIndex();
           break;
+        case DATASOURCE5:
+          index = IndexBuilder
+              .create()
+              .tmpDir(new File(temporaryFolder.newFolder(), "5"))
+              .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+              .schema(INDEX_SCHEMA_LOTS_O_COLUMNS)
+              .rows(ROWS_LOTS_OF_COLUMNS)
+              .buildMMappedIndex();
+          break;
         default:
           throw new ISE("Cannot query segment %s in test runner", segmentId);
 
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
index a6f98b3ba8..a478d1c3c1 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
@@ -119,7 +119,8 @@ public class MSQTestWorkerContext implements WorkerContext
     IndexMergerV9 indexMerger = new IndexMergerV9(
         mapper,
         indexIO,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance()
+        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
+        true
     );
     final TaskReportFileWriter reportFileWriter = new TaskReportFileWriter()
     {
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
index d72b577ef3..9141c4db09 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
@@ -82,11 +82,6 @@ public class PlannerContext
    */
   public static final String CTX_ENABLE_WINDOW_FNS = "windowsAreForClosers";
 
-  /**
-   * Undocumented context key, used to enable {@link org.apache.calcite.sql.fun.SqlStdOperatorTable#UNNEST}.
-   */
-  public static final String CTX_ENABLE_UNNEST = "enableUnnest";
-
   public static final String CTX_SQL_USE_BOUNDS_AND_SELECTORS = "sqlUseBoundAndSelectors";
   public static final boolean DEFAULT_SQL_USE_BOUNDS_AND_SELECTORS = NullHandling.replaceWithDefault();
 
@@ -527,13 +522,6 @@ public class PlannerContext
       // Short-circuit: feature requires context flag.
       return false;
     }
-
-    if (feature == EngineFeature.UNNEST &&
-        !QueryContexts.getAsBoolean(CTX_ENABLE_UNNEST, queryContext.get(CTX_ENABLE_UNNEST), false)) {
-      // Short-circuit: feature requires context flag.
-      return false;
-    }
-
     return engine.featureAvailable(feature, this);
   }
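
With CTX_ENABLE_UNNEST removed, no query-context flag gates unnest anymore:
featureAvailable() falls straight through to the engine, and each engine opts
in or out via its EngineFeature switch (MSQTaskSqlEngine above, ViewSqlEngine
and IngestionTestSqlEngine below). The resulting check, using only the call
that survives in this hunk (the plannerContext variable is a stand-in for the
`this` argument):

    // The engine alone now decides whether EngineFeature.UNNEST is available.
    boolean unnestAvailable = engine.featureAvailable(EngineFeature.UNNEST, plannerContext);
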
 
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
index 526eb6a976..8ca4ab076d 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
@@ -115,6 +115,7 @@ public class DruidRules
       retVal.add(DruidOuterQueryRule.WINDOW);
     }
 
+    // Adding unnest specific rules
     if (plannerContext.featureAvailable(EngineFeature.UNNEST)) {
       retVal.add(new DruidUnnestRule(plannerContext));
       retVal.add(new DruidCorrelateUnnestRule(plannerContext));
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java
index 47dae5c4d9..cd719d7f29 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java
@@ -64,7 +64,6 @@ public class ViewSqlEngine implements SqlEngine
       case WINDOW_FUNCTIONS:
       case UNNEST:
         return true;
-
       // Views can't sit on top of INSERT or REPLACE.
       case CAN_INSERT:
       case CAN_REPLACE:
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index e72537c7da..84fd4217c7 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -203,7 +203,6 @@ public class BaseCalciteQueryTest extends CalciteTestBase
       ImmutableMap.<String, Object>builder()
                   .putAll(QUERY_CONTEXT_DEFAULT)
                   .put(QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false)
-                  .put(PlannerContext.CTX_ENABLE_UNNEST, true)
                   .build();
 
   public static final Map<String, Object> QUERY_CONTEXT_NO_STRINGIFY_ARRAY_USE_EQUALITY =
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
index df4e9b62cc..ae4437faf6 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
@@ -60,7 +60,6 @@ import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.join.JoinType;
 import org.apache.druid.sql.calcite.filtration.Filtration;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.junit.Assert;
 import org.junit.Test;
@@ -78,7 +77,6 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
   private static final Map<String, Object> QUERY_CONTEXT_UNNEST =
       ImmutableMap.<String, Object>builder()
                   .putAll(QUERY_CONTEXT_DEFAULT)
-                  .put(PlannerContext.CTX_ENABLE_UNNEST, true)
                   .put(QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false)
                   .build();
 
@@ -87,6 +85,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
   @Test
   public void testSelectConstantArrayExpressionFromTable()
   {
+    notMsqCompatible();
     testQuery(
         "SELECT ARRAY[1,2] as arr, dim1 FROM foo LIMIT 1",
         ImmutableList.of(
@@ -168,6 +167,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
   @Test
   public void testSelectNonConstantArrayExpressionFromTableForMultival()
   {
+    notMsqCompatible();
     final String sql = "SELECT ARRAY[CONCAT(dim3, 'word'),'up'] as arr, dim1 FROM foo LIMIT 5";
     final Query<?> scanQuery = newScanQueryBuilder()
         .dataSource(CalciteTests.DATASOURCE1)
@@ -207,6 +207,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
     // Yes these outputs are strange sometimes, arrays are in a partial state of existence so end up a bit
     // stringy for now this is because virtual column selectors are coercing values back to stringish so that
     // multi-valued string dimensions can be grouped on.
+    notMsqCompatible();
     List<Object[]> expectedResults;
     if (useDefault) {
       expectedResults = ImmutableList.of(
@@ -386,6 +387,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
     // which will still always be stringified to ultimately adhere to the varchar type
     // as array support increases in the engine this will likely change since using explict array functions should
     // probably kick it into an array
+    notMsqCompatible();
     List<Object[]> expectedResults;
     if (useDefault) {
       expectedResults = ImmutableList.of(
@@ -1017,6 +1019,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
   @Test
   public void testArrayGroupAsLongArray()
   {
+    notMsqCompatible();
     // Cannot vectorize as we donot have support in native query subsytem for grouping on arrays
     cannotVectorize();
     testQuery(
@@ -1068,6 +1071,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
   {
     // Cannot vectorize as we donot have support in native query subsytem for grouping on arrays as keys
     cannotVectorize();
+    notMsqCompatible();
     testQuery(
         "SELECT ARRAY[d1], SUM(cnt) FROM druid.numfoo GROUP BY 1 ORDER BY 2 
DESC",
         QUERY_CONTEXT_NO_STRINGIFY_ARRAY,
@@ -1115,6 +1119,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
   @Test
   public void testArrayGroupAsFloatArray()
   {
+    notMsqCompatible();
     // Cannot vectorize as we donot have support in native query subsytem for grouping on arrays as keys
     cannotVectorize();
     testQuery(
@@ -1605,6 +1610,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
   @Test
   public void testArrayAggNumeric()
   {
+    notMsqCompatible();
     cannotVectorize();
     testQuery(
         "SELECT ARRAY_AGG(l1), ARRAY_AGG(DISTINCT l1), ARRAY_AGG(d1), 
ARRAY_AGG(DISTINCT d1), ARRAY_AGG(f1), ARRAY_AGG(DISTINCT f1) FROM numfoo",
@@ -1741,6 +1747,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
   @Test
   public void testArrayAggQuantile()
   {
+    notMsqCompatible();
     cannotVectorize();
     testQuery(
         "SELECT ARRAY_QUANTILE(ARRAY_AGG(l1), 0.9) FROM numfoo",
@@ -1784,6 +1791,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
   @Test
   public void testArrayAggArrays()
   {
+    notMsqCompatible();
     cannotVectorize();
     testQuery(
         "SELECT ARRAY_AGG(ARRAY[l1, l2]), ARRAY_AGG(DISTINCT ARRAY[l1, l2]) 
FROM numfoo",
@@ -1880,6 +1888,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
   @Test
   public void testArrayConcatAggArrays()
   {
+    notMsqCompatible();
     cannotVectorize();
     testQuery(
         "SELECT ARRAY_CONCAT_AGG(ARRAY[l1, l2]), ARRAY_CONCAT_AGG(DISTINCT 
ARRAY[l1, l2]) FROM numfoo",
@@ -2028,6 +2037,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
   public void testArrayAggMaxBytes()
   {
     cannotVectorize();
+    notMsqCompatible();
     testQuery(
         "SELECT ARRAY_AGG(l1, 128), ARRAY_AGG(DISTINCT l1, 128) FROM numfoo",
         ImmutableList.of(
@@ -2227,6 +2237,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
   @Test
   public void testArrayAggGroupByArrayAggOfLongsFromSubquery()
   {
+    notMsqCompatible();
     requireMergeBuffers(3);
     cannotVectorize();
     testQuery(
@@ -2366,6 +2377,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
   @Test
   public void testArrayAggGroupByArrayAggOfDoubleFromSubquery()
   {
+    notMsqCompatible();
     requireMergeBuffers(3);
     cannotVectorize();
     testQuery(
@@ -2883,6 +2895,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
   @Test
   public void testUnnestThriceWithFiltersOnDimAndUnnestCol()
   {
+    notMsqCompatible();
     cannotVectorize();
     String sql = "    SELECT dimZipf, dim3_unnest1, dim3_unnest2, dim3_unnest3 
FROM \n"
                  + "      ( SELECT * FROM \n"
@@ -2981,6 +2994,7 @@ public class CalciteArraysQueryTest extends BaseCalciteQueryTest
   @Test
   public void testUnnestThriceWithFiltersOnDimAndAllUnnestColumns()
   {
+    notMsqCompatible();
     cannotVectorize();
     String sql = "    SELECT dimZipf, dim3_unnest1, dim3_unnest2, dim3_unnest3 
FROM \n"
                  + "      ( SELECT * FROM \n"
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java
index 272fddbd8a..46fb40fdda 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java
@@ -83,6 +83,7 @@ public class IngestionTestSqlEngine implements SqlEngine
       case TOPN_QUERY:
       case TIME_BOUNDARY_QUERY:
       case SCAN_NEEDS_SIGNATURE:
+      case UNNEST:
         return false;
       case CAN_INSERT:
       case CAN_REPLACE:
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
index 52e52ec7f8..48e7ee2423 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.server.security.ResourceType;
 import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
 import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
 import org.apache.druid.sql.calcite.rule.ExtensionCalciteRuleProvider;
+import org.apache.druid.sql.calcite.run.SqlEngine;
 import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
 import org.apache.druid.sql.calcite.schema.DruidSchemaName;
 import org.apache.druid.sql.calcite.schema.NamedSchema;
@@ -89,6 +90,8 @@ public class CalcitePlannerModuleTest extends CalciteTestBase
   @Mock
   private DruidSchemaCatalog rootSchema;
 
+  @Mock
+  private SqlEngine engine;
   private Set<SqlAggregator> aggregators;
   private Set<SqlOperatorConversion> operatorConversions;
 
@@ -185,13 +188,16 @@ public class CalcitePlannerModuleTest extends CalciteTestBase
         CalciteTests.TEST_AUTHORIZER_MAPPER,
         AuthConfig.newBuilder().build()
     );
+
+
     PlannerContext context = PlannerContext.create(
         toolbox,
         "SELECT 1",
-        null,
+        engine,
         Collections.emptyMap(),
         null
     );
+
     boolean containsCustomRule = injector.getInstance(CalciteRulesManager.class)
                                          .druidConventionRuleSet(context)
                                          .contains(customRule);
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
index b2f93340db..c6f0569702 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
@@ -222,7 +222,7 @@ public class TestDataBuilder
       .withRollup(false)
       .build();
 
-  private static final IncrementalIndexSchema INDEX_SCHEMA_LOTS_O_COLUMNS = new IncrementalIndexSchema.Builder()
+  public static final IncrementalIndexSchema INDEX_SCHEMA_LOTS_O_COLUMNS = new IncrementalIndexSchema.Builder()
       .withMetrics(
           new CountAggregatorFactory("count")
       )

