This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c184b5250f Unnest now works on MSQ (#14886)
c184b5250f is described below
commit c184b5250f1f8fca64ffe1d34dc1157fbe96c271
Author: Soumyava <[email protected]>
AuthorDate: Sun Sep 24 20:49:21 2023 -0700
Unnest now works on MSQ (#14886)
This entails:
Removing the enableUnnest flag and additional machinery
Updating the datasource plan and frame processors to support unnest
Adding support in MSQ for UnnestDataSource and FilteredDataSource
CalciteArrayTest now has a MSQ test component
Additional tests for Unnest on MSQ
---
.../druid/msq/querykit/BaseLeafFrameProcessor.java | 20 +-
.../apache/druid/msq/querykit/DataSourcePlan.java | 108 ++++++++++-
.../org/apache/druid/msq/sql/MSQTaskSqlEngine.java | 2 +-
.../org/apache/druid/msq/exec/MSQInsertTest.java | 89 +++++++++
.../org/apache/druid/msq/exec/MSQReplaceTest.java | 180 +++++++++++++++++-
.../org/apache/druid/msq/exec/MSQSelectTest.java | 203 +++++++++++++++++++++
.../msq/test/CalciteArraysSelectQueryMSQTest.java | 98 ++++++++++
.../druid/msq/test/CalciteMSQTestsHelper.java | 12 ++
.../druid/msq/test/MSQTestWorkerContext.java | 3 +-
.../druid/sql/calcite/planner/PlannerContext.java | 12 --
.../apache/druid/sql/calcite/rule/DruidRules.java | 1 +
.../druid/sql/calcite/view/ViewSqlEngine.java | 1 -
.../druid/sql/calcite/BaseCalciteQueryTest.java | 1 -
.../druid/sql/calcite/CalciteArraysQueryTest.java | 18 +-
.../druid/sql/calcite/IngestionTestSqlEngine.java | 1 +
.../calcite/planner/CalcitePlannerModuleTest.java | 8 +-
.../druid/sql/calcite/util/TestDataBuilder.java | 2 +-
17 files changed, 732 insertions(+), 27 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
index d5b31328b0..d0a12b3ae7 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java
@@ -36,8 +36,10 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.msq.input.ReadableInput;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.query.DataSource;
+import org.apache.druid.query.FilteredDataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.Query;
+import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentReference;
@@ -96,7 +98,17 @@ public abstract class BaseLeafFrameProcessor implements
FrameProcessor<Long>
final long memoryReservedForBroadcastJoin
)
{
- if (!(dataSource instanceof JoinDataSource) && !sideChannels.isEmpty()) {
+ // An UnnestDataSource or FilteredDataSource can have a join as a base
+ // In such a case a side channel is expected to be there
+ final DataSource baseDataSource;
+ if (dataSource instanceof UnnestDataSource) {
+ baseDataSource = ((UnnestDataSource) dataSource).getBase();
+ } else if (dataSource instanceof FilteredDataSource) {
+ baseDataSource = ((FilteredDataSource) dataSource).getBase();
+ } else {
+ baseDataSource = dataSource;
+ }
+ if (!(baseDataSource instanceof JoinDataSource) &&
!sideChannels.isEmpty()) {
throw new ISE("Did not expect side channels for dataSource [%s]",
dataSource);
}
@@ -106,8 +118,8 @@ public abstract class BaseLeafFrameProcessor implements
FrameProcessor<Long>
if (baseInput.hasChannel()) {
inputChannels.add(baseInput.getChannel());
}
-
- if (dataSource instanceof JoinDataSource) {
+
+ if (baseDataSource instanceof JoinDataSource) {
final Int2IntMap inputNumberToProcessorChannelMap = new
Int2IntOpenHashMap();
final List<FrameReader> channelReaders = new ArrayList<>();
@@ -196,7 +208,7 @@ public abstract class BaseLeafFrameProcessor implements
FrameProcessor<Long>
if (segmentMapFn != null) {
return true;
} else if (broadcastJoinHelper == null) {
- segmentMapFn = Function.identity();
+ segmentMapFn = query.getDataSource().createSegmentMapFunction(query,
cpuAccumulator);
return true;
} else {
final boolean retVal =
broadcastJoinHelper.buildBroadcastTablesIncrementally(readableInputs);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
index a826f1928e..d8481bf7a0 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java
@@ -44,12 +44,14 @@ import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageDefinitionBuilder;
import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory;
import org.apache.druid.query.DataSource;
+import org.apache.druid.query.FilteredDataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.planning.PreJoinableClause;
@@ -135,8 +137,29 @@ public class DataSourcePlan
checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
return forInline((InlineDataSource) dataSource, broadcast);
} else if (dataSource instanceof LookupDataSource) {
- checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
return forLookup((LookupDataSource) dataSource, broadcast);
+ } else if (dataSource instanceof FilteredDataSource) {
+ return forFilteredDataSource(
+ queryKit,
+ queryId,
+ queryContext,
+ (FilteredDataSource) dataSource,
+ querySegmentSpec,
+ maxWorkerCount,
+ minStageNumber,
+ broadcast
+ );
+ } else if (dataSource instanceof UnnestDataSource) {
+ return forUnnest(
+ queryKit,
+ queryId,
+ queryContext,
+ (UnnestDataSource) dataSource,
+ querySegmentSpec,
+ maxWorkerCount,
+ minStageNumber,
+ broadcast
+ );
} else if (dataSource instanceof QueryDataSource) {
checkQuerySegmentSpecIsEternity(dataSource, querySegmentSpec);
return forQuery(
@@ -353,6 +376,88 @@ public class DataSourcePlan
);
}
+ private static DataSourcePlan forFilteredDataSource(
+ final QueryKit queryKit,
+ final String queryId,
+ final QueryContext queryContext,
+ final FilteredDataSource dataSource,
+ final QuerySegmentSpec querySegmentSpec,
+ final int maxWorkerCount,
+ final int minStageNumber,
+ final boolean broadcast
+ )
+ {
+ final DataSourcePlan basePlan = forDataSource(
+ queryKit,
+ queryId,
+ queryContext,
+ dataSource.getBase(),
+ querySegmentSpec,
+ null,
+ maxWorkerCount,
+ minStageNumber,
+ broadcast
+ );
+
+ DataSource newDataSource = basePlan.getNewDataSource();
+
+ final List<InputSpec> inputSpecs = new
ArrayList<>(basePlan.getInputSpecs());
+ newDataSource = FilteredDataSource.create(newDataSource,
dataSource.getFilter());
+ return new DataSourcePlan(
+ newDataSource,
+ inputSpecs,
+ basePlan.getBroadcastInputs(),
+ basePlan.getSubQueryDefBuilder().orElse(null)
+ );
+
+ }
+
+ /**
+ * Build a plan for Unnest data source
+ */
+ private static DataSourcePlan forUnnest(
+ final QueryKit queryKit,
+ final String queryId,
+ final QueryContext queryContext,
+ final UnnestDataSource dataSource,
+ final QuerySegmentSpec querySegmentSpec,
+ final int maxWorkerCount,
+ final int minStageNumber,
+ final boolean broadcast
+ )
+ {
+ // Find the plan for base data source by recursing
+ final DataSourcePlan basePlan = forDataSource(
+ queryKit,
+ queryId,
+ queryContext,
+ dataSource.getBase(),
+ querySegmentSpec,
+ null,
+ maxWorkerCount,
+ minStageNumber,
+ broadcast
+ );
+ DataSource newDataSource = basePlan.getNewDataSource();
+
+ final List<InputSpec> inputSpecs = new
ArrayList<>(basePlan.getInputSpecs());
+
+ // Create the new data source using the data source from the base plan
+ newDataSource = UnnestDataSource.create(
+ newDataSource,
+ dataSource.getVirtualColumn(),
+ dataSource.getUnnestFilter()
+ );
+ // The base data source can be a join and might already have broadcast
inputs
+ // Need to set the broadcast inputs from the basePlan
+ return new DataSourcePlan(
+ newDataSource,
+ inputSpecs,
+ basePlan.getBroadcastInputs(),
+ basePlan.getSubQueryDefBuilder().orElse(null)
+ );
+ }
+
/**
* Build a plan for broadcast hash-join.
*/
@@ -379,7 +484,6 @@ public class DataSourcePlan
null, // Don't push query filters down through a join: this needs some
work to ensure pruning works properly.
maxWorkerCount,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
-
broadcast
);
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
index ee7a590e1f..e6578388a4 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskSqlEngine.java
@@ -114,8 +114,8 @@ public class MSQTaskSqlEngine implements SqlEngine
case TIME_BOUNDARY_QUERY:
case GROUPING_SETS:
case WINDOW_FUNCTIONS:
- case UNNEST:
return false;
+ case UNNEST:
case CAN_SELECT:
case CAN_INSERT:
case CAN_REPLACE:
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index 009f595bf3..81ba53f755 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -185,6 +185,95 @@ public class MSQInsertTest extends MSQTestBase
}
+ @Test
+ public void testInsertWithUnnestInline()
+ {
+ List<Object[]> expectedRows = ImmutableList.of(
+ new Object[]{1692226800000L, 1L},
+ new Object[]{1692226800000L, 2L},
+ new Object[]{1692226800000L, 3L}
+ );
+
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("d", ColumnType.LONG)
+ .build();
+
+
+ testIngestQuery().setSql(
+ "insert into foo1 select
TIME_PARSE('2023-08-16T23:00') as __time, d from UNNEST(ARRAY[1,2,3]) as
unnested(d) PARTITIONED BY ALL")
+ .setQueryContext(context)
+ .setExpectedResultRows(expectedRows)
+ .setExpectedDataSource("foo1")
+ .setExpectedRowSignature(rowSignature)
+ .verifyResults();
+
+ }
+
+ @Test
+ public void testInsertWithUnnest()
+ {
+ List<Object[]> expectedRows = ImmutableList.of(
+ new Object[]{946684800000L, "a"},
+ new Object[]{946684800000L, "b"},
+ new Object[]{946771200000L, "b"},
+ new Object[]{946771200000L, "c"},
+ new Object[]{946857600000L, "d"},
+ new Object[]{978307200000L, NullHandling.sqlCompatible() ? "" : null},
+ new Object[]{978393600000L, null},
+ new Object[]{978480000000L, null}
+ );
+
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("d", ColumnType.STRING)
+ .build();
+
+
+ testIngestQuery().setSql(
+ "insert into foo1 select __time, d from
foo,UNNEST(MV_TO_ARRAY(dim3)) as unnested(d) PARTITIONED BY ALL")
+ .setQueryContext(context)
+ .setExpectedResultRows(expectedRows)
+ .setExpectedDataSource("foo1")
+ .setExpectedRowSignature(rowSignature)
+ .verifyResults();
+
+ }
+
+ @Test
+ public void testInsertWithUnnestWithVirtualColumns()
+ {
+ List<Object[]> expectedRows = ImmutableList.of(
+ new Object[]{946684800000L, 1.0f},
+ new Object[]{946684800000L, 1.0f},
+ new Object[]{946771200000L, 2.0f},
+ new Object[]{946771200000L, 2.0f},
+ new Object[]{946857600000L, 3.0f},
+ new Object[]{946857600000L, 3.0f},
+ new Object[]{978307200000L, 4.0f},
+ new Object[]{978307200000L, 4.0f},
+ new Object[]{978393600000L, 5.0f},
+ new Object[]{978393600000L, 5.0f},
+ new Object[]{978480000000L, 6.0f},
+ new Object[]{978480000000L, 6.0f}
+ );
+
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("d", ColumnType.FLOAT)
+ .build();
+
+
+ testIngestQuery().setSql(
+ "insert into foo1 select __time, d from
foo,UNNEST(ARRAY[m1,m2]) as unnested(d) PARTITIONED BY ALL")
+ .setQueryContext(context)
+ .setExpectedResultRows(expectedRows)
+ .setExpectedDataSource("foo1")
+ .setExpectedRowSignature(rowSignature)
+ .verifyResults();
+
+ }
+
@Test
public void testInsertOnExternalDataSource() throws IOException
{
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
index 80d6719f12..0a43fdaea7 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java
@@ -739,6 +739,182 @@ public class MSQReplaceTest extends MSQTestBase
.verifyPlanningErrors();
}
+ @Test
+ public void testReplaceUnnestSegmentEntireTable()
+ {
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("d", ColumnType.STRING)
+ .build();
+
+ testIngestQuery().setSql(" REPLACE INTO foo "
+ + "OVERWRITE ALL "
+ + "SELECT __time, d "
+ + "FROM foo, UNNEST(MV_TO_ARRAY(dim3)) as
unnested(d) "
+ + "PARTITIONED BY ALL TIME ")
+ .setExpectedDataSource("foo")
+ .setExpectedRowSignature(rowSignature)
+ .setQueryContext(context)
+ .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
+ .setExpectedSegment(ImmutableSet.of(SegmentId.of(
+ "foo",
+ Intervals.of("2000-01-01T/P1M"),
+ "test",
+ 0
+ )))
+ .setExpectedResultRows(
+ ImmutableList.of(
+ new Object[]{946684800000L, "a"},
+ new Object[]{946684800000L, "b"},
+ new Object[]{946771200000L, "b"},
+ new Object[]{946771200000L, "c"},
+ new Object[]{946857600000L, "d"},
+ new Object[]{978307200000L,
NullHandling.sqlCompatible() ? "" : null},
+ new Object[]{978393600000L, null},
+ new Object[]{978480000000L, null}
+ )
+ )
+ .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo",
Intervals.ETERNITY, "test", 0)))
+ .setExpectedCountersForStageWorkerChannel(
+ CounterSnapshotMatcher
+ .with().totalFiles(1),
+ 0, 0, "input0"
+ )
+ .setExpectedCountersForStageWorkerChannel(
+ CounterSnapshotMatcher
+ .with().rows(8).frames(1),
+ 0, 0, "shuffle"
+ )
+ .setExpectedCountersForStageWorkerChannel(
+ CounterSnapshotMatcher
+ .with().rows(8).frames(1),
+ 1, 0, "input0"
+ )
+
.setExpectedSegmentGenerationProgressCountersForStageWorker(
+ CounterSnapshotMatcher
+ .with().segmentRowsProcessed(8),
+ 1, 0
+ )
+ .verifyResults();
+ }
+
+ @Test
+ public void testReplaceUnnestWithVirtualColumnSegmentEntireTable()
+ {
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("d", ColumnType.FLOAT)
+ .build();
+
+ testIngestQuery().setSql(" REPLACE INTO foo "
+ + "OVERWRITE ALL "
+ + "SELECT __time, d "
+ + "FROM foo, UNNEST(ARRAY[m1, m2]) as unnested(d)
"
+ + "PARTITIONED BY ALL TIME ")
+ .setExpectedDataSource("foo")
+ .setExpectedRowSignature(rowSignature)
+ .setQueryContext(context)
+ .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
+ .setExpectedSegment(ImmutableSet.of(SegmentId.of(
+ "foo",
+ Intervals.of("2000-01-01T/P1M"),
+ "test",
+ 0
+ )))
+ .setExpectedResultRows(
+ ImmutableList.of(
+ new Object[]{946684800000L, 1.0f},
+ new Object[]{946684800000L, 1.0f},
+ new Object[]{946771200000L, 2.0f},
+ new Object[]{946771200000L, 2.0f},
+ new Object[]{946857600000L, 3.0f},
+ new Object[]{946857600000L, 3.0f},
+ new Object[]{978307200000L, 4.0f},
+ new Object[]{978307200000L, 4.0f},
+ new Object[]{978393600000L, 5.0f},
+ new Object[]{978393600000L, 5.0f},
+ new Object[]{978480000000L, 6.0f},
+ new Object[]{978480000000L, 6.0f}
+ )
+ )
+ .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo",
Intervals.ETERNITY, "test", 0)))
+ .setExpectedCountersForStageWorkerChannel(
+ CounterSnapshotMatcher
+ .with().totalFiles(1),
+ 0, 0, "input0"
+ )
+ .setExpectedCountersForStageWorkerChannel(
+ CounterSnapshotMatcher
+ .with().rows(12).frames(1),
+ 0, 0, "shuffle"
+ )
+ .setExpectedCountersForStageWorkerChannel(
+ CounterSnapshotMatcher
+ .with().rows(12).frames(1),
+ 1, 0, "input0"
+ )
+
.setExpectedSegmentGenerationProgressCountersForStageWorker(
+ CounterSnapshotMatcher
+ .with().segmentRowsProcessed(12),
+ 1, 0
+ )
+ .verifyResults();
+ }
+
+ @Test
+ public void testReplaceUnnestSegmentWithTimeFilter()
+ {
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .add("d", ColumnType.STRING)
+ .build();
+
+ testIngestQuery().setSql(" REPLACE INTO foo "
+ + "OVERWRITE WHERE __time >= TIMESTAMP
'1999-01-01 00:00:00' and __time < TIMESTAMP '2002-01-01 00:00:00'"
+ + "SELECT __time, d "
+ + "FROM foo, UNNEST(MV_TO_ARRAY(dim3)) as
unnested(d) "
+ + "PARTITIONED BY DAY CLUSTERED BY d ")
+ .setExpectedDataSource("foo")
+ .setExpectedRowSignature(rowSignature)
+ .setQueryContext(context)
+
.setExpectedDestinationIntervals(ImmutableList.of(Intervals.of(
+ "1999-01-01T00:00:00.000Z/2002-01-01T00:00:00.000Z")))
+ .setExpectedShardSpec(DimensionRangeShardSpec.class)
+ .setExpectedResultRows(
+ ImmutableList.of(
+ new Object[]{946684800000L, "a"},
+ new Object[]{946684800000L, "b"},
+ new Object[]{946771200000L, "b"},
+ new Object[]{946771200000L, "c"},
+ new Object[]{946857600000L, "d"},
+ new Object[]{978307200000L,
NullHandling.sqlCompatible() ? "" : null},
+ new Object[]{978393600000L, null},
+ new Object[]{978480000000L, null}
+ )
+ )
+ .setExpectedCountersForStageWorkerChannel(
+ CounterSnapshotMatcher
+ .with().totalFiles(1),
+ 0, 0, "input0"
+ )
+ .setExpectedCountersForStageWorkerChannel(
+ CounterSnapshotMatcher
+ .with().rows(2, 2, 1, 1, 1, 1).frames(1, 1, 1, 1,
1, 1),
+ 0, 0, "shuffle"
+ )
+ .setExpectedCountersForStageWorkerChannel(
+ CounterSnapshotMatcher
+ .with().rows(2, 2, 1, 1, 1, 1).frames(1, 1, 1, 1,
1, 1),
+ 1, 0, "input0"
+ )
+
.setExpectedSegmentGenerationProgressCountersForStageWorker(
+ CounterSnapshotMatcher
+ .with().segmentRowsProcessed(8),
+ 1, 0
+ )
+ .verifyResults();
+ }
+
@Test
public void testReplaceTombstonesOverPartiallyOverlappingSegments()
{
@@ -755,7 +931,9 @@ public class MSQReplaceTest extends MSQTestBase
.dataSource("foo1")
.build();
-
Mockito.doReturn(ImmutableSet.of(existingDataSegment)).when(testTaskActionClient).submit(ArgumentMatchers.isA(RetrieveUsedSegmentsAction.class));
+ Mockito.doReturn(ImmutableSet.of(existingDataSegment))
+ .when(testTaskActionClient)
+ .submit(ArgumentMatchers.isA(RetrieveUsedSegmentsAction.class));
List<Object[]> expectedResults;
if (NullHandling.sqlCompatible()) {
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
index ae0bfa71f1..6b6f8ff356 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java
@@ -51,6 +51,7 @@ import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.UnnestDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
@@ -136,6 +137,7 @@ public class MSQSelectTest extends MSQTestBase
{QUERY_RESULTS_WITH_DURABLE_STORAGE,
QUERY_RESULTS_WITH_DURABLE_STORAGE_CONTEXT},
{QUERY_RESULTS_WITH_DEFAULT, QUERY_RESULTS_WITH_DEFAULT_CONTEXT}
};
+
return Arrays.asList(data);
}
@@ -2119,6 +2121,207 @@ public class MSQSelectTest extends MSQTestBase
.verifyResults();
}
+ @Test
+ public void testSelectUnnestOnInlineFoo()
+ {
+ RowSignature resultSignature = RowSignature.builder()
+ .add("EXPR$0", ColumnType.LONG)
+ .build();
+ RowSignature outputSignature = RowSignature.builder()
+ .add("d", ColumnType.LONG)
+ .build();
+
+ final ColumnMappings expectedColumnMappings = new ColumnMappings(
+ ImmutableList.of(
+ new ColumnMapping("EXPR$0", "d")
+ )
+ );
+
+ testSelectQuery()
+ .setSql("select d from UNNEST(ARRAY[1,2,3]) as unnested(d)")
+ .setExpectedMSQSpec(
+ MSQSpec.builder()
+ .query(newScanQueryBuilder()
+ .dataSource(
+ InlineDataSource.fromIterable(
+ ImmutableList.of(
+ new Object[]{1L},
+ new Object[]{2L},
+ new Object[]{3L}
+ ),
+ resultSignature
+ )
+ )
+
.intervals(querySegmentSpec(Filtration.eternity()))
+ .columns("EXPR$0")
+ .context(defaultScanQueryContext(
+ context,
+ resultSignature
+ ))
+ .build())
+ .columnMappings(expectedColumnMappings)
+ .tuningConfig(MSQTuningConfig.defaultConfig())
+ .destination(isDurableStorageDestination()
+ ? DurableStorageMSQDestination.INSTANCE
+ : TaskReportMSQDestination.INSTANCE)
+ .build()
+ )
+ .setExpectedRowSignature(outputSignature)
+ .setQueryContext(context)
+ .setExpectedResultRows(ImmutableList.of(
+ new Object[]{1},
+ new Object[]{2},
+ new Object[]{3}
+ ))
+ .verifyResults();
+ }
+
+
+ @Test
+ public void testSelectUnnestOnFoo()
+ {
+ RowSignature resultSignature = RowSignature.builder()
+ .add("j0.unnest",
ColumnType.STRING)
+ .build();
+
+ RowSignature outputSignature = RowSignature.builder()
+ .add("d3", ColumnType.STRING)
+ .build();
+
+ final ColumnMappings expectedColumnMappings = new ColumnMappings(
+ ImmutableList.of(
+ new ColumnMapping("j0.unnest", "d3")
+ )
+ );
+
+ testSelectQuery()
+ .setSql("SELECT d3 FROM foo, UNNEST(MV_TO_ARRAY(dim3)) as unnested
(d3)")
+ .setExpectedMSQSpec(
+ MSQSpec.builder()
+ .query(newScanQueryBuilder()
+ .dataSource(UnnestDataSource.create(
+ new
TableDataSource(CalciteTests.DATASOURCE1),
+ expressionVirtualColumn("j0.unnest",
"\"dim3\"", ColumnType.STRING),
+ null
+ ))
+
.intervals(querySegmentSpec(Filtration.eternity()))
+
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+ .legacy(false)
+ .context(defaultScanQueryContext(
+ context,
+ resultSignature
+ ))
+ .columns(ImmutableList.of("j0.unnest"))
+ .build())
+ .columnMappings(expectedColumnMappings)
+ .tuningConfig(MSQTuningConfig.defaultConfig())
+ .destination(isDurableStorageDestination()
+ ? DurableStorageMSQDestination.INSTANCE
+ : TaskReportMSQDestination.INSTANCE)
+ .build()
+ )
+ .setExpectedRowSignature(outputSignature)
+ .setQueryContext(context)
+ .setExpectedResultRows(
+ useDefault ? ImmutableList.of(
+ new Object[]{"a"},
+ new Object[]{"b"},
+ new Object[]{"b"},
+ new Object[]{"c"},
+ new Object[]{"d"},
+ new Object[]{""},
+ new Object[]{""},
+ new Object[]{""}
+ ) : ImmutableList.of(
+ new Object[]{"a"},
+ new Object[]{"b"},
+ new Object[]{"b"},
+ new Object[]{"c"},
+ new Object[]{"d"},
+ new Object[]{""},
+ new Object[]{null},
+ new Object[]{null}
+ ))
+ .verifyResults();
+ }
+
+ @Test
+ public void testSelectUnnestOnQueryFoo()
+ {
+ RowSignature resultSignature = RowSignature.builder()
+ .add("j0.unnest",
ColumnType.STRING)
+ .build();
+
+ RowSignature resultSignature1 = RowSignature.builder()
+ .add("dim3", ColumnType.STRING)
+ .build();
+
+ RowSignature outputSignature = RowSignature.builder()
+ .add("d3", ColumnType.STRING)
+ .build();
+
+ final ColumnMappings expectedColumnMappings = new ColumnMappings(
+ ImmutableList.of(
+ new ColumnMapping("j0.unnest", "d3")
+ )
+ );
+
+ testSelectQuery()
+ .setSql("SELECT d3 FROM (select * from druid.foo where dim2='a' LIMIT
10), UNNEST(MV_TO_ARRAY(dim3)) as unnested (d3)")
+ .setExpectedMSQSpec(
+ MSQSpec.builder()
+ .query(newScanQueryBuilder()
+ .dataSource(UnnestDataSource.create(
+ new QueryDataSource(
+ newScanQueryBuilder()
+ .dataSource(
+ new
TableDataSource(CalciteTests.DATASOURCE1)
+ )
+
.intervals(querySegmentSpec(Filtration.eternity()))
+
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+ .legacy(false)
+ .filters(equality("dim2", "a",
ColumnType.STRING))
+ .columns("dim3")
+ .context(defaultScanQueryContext(
+ context,
+ resultSignature1
+ ))
+ .limit(10)
+ .build()
+ ),
+ expressionVirtualColumn("j0.unnest",
"\"dim3\"", ColumnType.STRING),
+ null
+ ))
+
.intervals(querySegmentSpec(Filtration.eternity()))
+
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+ .legacy(false)
+ .context(defaultScanQueryContext(
+ context,
+ resultSignature
+ ))
+ .columns(ImmutableList.of("j0.unnest"))
+ .build())
+ .columnMappings(expectedColumnMappings)
+ .tuningConfig(MSQTuningConfig.defaultConfig())
+ .destination(isDurableStorageDestination()
+ ? DurableStorageMSQDestination.INSTANCE
+ : TaskReportMSQDestination.INSTANCE)
+ .build()
+ )
+ .setExpectedRowSignature(outputSignature)
+ .setQueryContext(context)
+ .setExpectedResultRows(
+ useDefault ? ImmutableList.of(
+ new Object[]{"a"},
+ new Object[]{"b"}
+ ) : ImmutableList.of(
+ new Object[]{"a"},
+ new Object[]{"b"},
+ new Object[]{""}
+ ))
+ .verifyResults();
+ }
+
@Nonnull
private List<Object[]> expectedMultiValueFooRowsGroup()
{
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysSelectQueryMSQTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysSelectQueryMSQTest.java
new file mode 100644
index 0000000000..16aef6bdca
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteArraysSelectQueryMSQTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import org.apache.druid.guice.DruidInjectorBuilder;
+import org.apache.druid.msq.exec.WorkerMemoryParameters;
+import org.apache.druid.msq.sql.MSQTaskSqlEngine;
+import org.apache.druid.query.groupby.TestGroupByBuffers;
+import org.apache.druid.server.QueryLifecycleFactory;
+import org.apache.druid.sql.calcite.CalciteArraysQueryTest;
+import org.apache.druid.sql.calcite.QueryTestBuilder;
+import org.apache.druid.sql.calcite.run.SqlEngine;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Runs {@link CalciteArraysQueryTest} but with MSQ engine
+ */
+public class CalciteArraysSelectQueryMSQTest extends CalciteArraysQueryTest
+{
+ private TestGroupByBuffers groupByBuffers;
+
+ @Before
+ public void setup2()
+ {
+ groupByBuffers = TestGroupByBuffers.createDefault();
+ }
+
+ @After
+ public void teardown2()
+ {
+ groupByBuffers.close();
+ }
+
+ @Override
+ public void configureGuice(DruidInjectorBuilder builder)
+ {
+ super.configureGuice(builder);
+ builder.addModules(CalciteMSQTestsHelper.fetchModules(temporaryFolder,
groupByBuffers).toArray(new Module[0]));
+ }
+
+
+ @Override
+ public SqlEngine createEngine(
+ QueryLifecycleFactory qlf,
+ ObjectMapper queryJsonMapper,
+ Injector injector
+ )
+ {
+ final WorkerMemoryParameters workerMemoryParameters =
+ WorkerMemoryParameters.createInstance(
+ WorkerMemoryParameters.PROCESSING_MINIMUM_BYTES * 50,
+ 2,
+ 10,
+ 2,
+ 0,
+ 0
+ );
+ final MSQTestOverlordServiceClient indexingServiceClient = new
MSQTestOverlordServiceClient(
+ queryJsonMapper,
+ injector,
+ new MSQTestTaskActionClient(queryJsonMapper),
+ workerMemoryParameters
+ );
+ return new MSQTaskSqlEngine(indexingServiceClient, queryJsonMapper);
+ }
+
+ @Override
+ protected QueryTestBuilder testBuilder()
+ {
+ return new QueryTestBuilder(new CalciteTestConfig(true))
+ .addCustomRunner(new ExtractResultsFactory(() ->
(MSQTestOverlordServiceClient) ((MSQTaskSqlEngine)
queryFramework().engine()).overlordClient()))
+ .skipVectorize(true)
+ .verifyNativeQueries(new VerifyMSQSupportedNativeQueriesPredicate())
+ .msqCompatible(msqCompatible);
+ }
+}
+
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
index 0c840b14a8..d7c0ea1f2d 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
@@ -89,10 +89,13 @@ import java.util.function.Supplier;
import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE1;
import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE2;
import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE3;
+import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE5;
+import static
org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_LOTS_O_COLUMNS;
import static
org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_NUMERIC_DIMS;
import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1;
import static
org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1_WITH_NUMERIC_DIMS;
import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS2;
+import static
org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS_LOTS_OF_COLUMNS;
/**
* Helper class aiding in wiring up the Guice bindings required for MSQ engine
to work with the Calcite's tests
@@ -246,6 +249,15 @@ public class CalciteMSQTestsHelper
.rows(ROWS1_WITH_NUMERIC_DIMS)
.buildMMappedIndex();
break;
+ case DATASOURCE5:
+ index = IndexBuilder
+ .create()
+ .tmpDir(new File(temporaryFolder.newFolder(), "5"))
+
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(INDEX_SCHEMA_LOTS_O_COLUMNS)
+ .rows(ROWS_LOTS_OF_COLUMNS)
+ .buildMMappedIndex();
+ break;
default:
throw new ISE("Cannot query segment %s in test runner", segmentId);
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
index a6f98b3ba8..a478d1c3c1 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
@@ -119,7 +119,8 @@ public class MSQTestWorkerContext implements WorkerContext
IndexMergerV9 indexMerger = new IndexMergerV9(
mapper,
indexIO,
- OffHeapMemorySegmentWriteOutMediumFactory.instance()
+ OffHeapMemorySegmentWriteOutMediumFactory.instance(),
+ true
);
final TaskReportFileWriter reportFileWriter = new TaskReportFileWriter()
{
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
index d72b577ef3..9141c4db09 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerContext.java
@@ -82,11 +82,6 @@ public class PlannerContext
*/
public static final String CTX_ENABLE_WINDOW_FNS = "windowsAreForClosers";
- /**
- * Undocumented context key, used to enable {@link
org.apache.calcite.sql.fun.SqlStdOperatorTable#UNNEST}.
- */
- public static final String CTX_ENABLE_UNNEST = "enableUnnest";
-
public static final String CTX_SQL_USE_BOUNDS_AND_SELECTORS =
"sqlUseBoundAndSelectors";
public static final boolean DEFAULT_SQL_USE_BOUNDS_AND_SELECTORS =
NullHandling.replaceWithDefault();
@@ -527,13 +522,6 @@ public class PlannerContext
// Short-circuit: feature requires context flag.
return false;
}
-
- if (feature == EngineFeature.UNNEST &&
- !QueryContexts.getAsBoolean(CTX_ENABLE_UNNEST,
queryContext.get(CTX_ENABLE_UNNEST), false)) {
- // Short-circuit: feature requires context flag.
- return false;
- }
-
return engine.featureAvailable(feature, this);
}
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
index 526eb6a976..8ca4ab076d 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rule/DruidRules.java
@@ -115,6 +115,7 @@ public class DruidRules
retVal.add(DruidOuterQueryRule.WINDOW);
}
+ // Adding unnest specific rules
if (plannerContext.featureAvailable(EngineFeature.UNNEST)) {
retVal.add(new DruidUnnestRule(plannerContext));
retVal.add(new DruidCorrelateUnnestRule(plannerContext));
diff --git
a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java
b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java
index 47dae5c4d9..cd719d7f29 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/view/ViewSqlEngine.java
@@ -64,7 +64,6 @@ public class ViewSqlEngine implements SqlEngine
case WINDOW_FUNCTIONS:
case UNNEST:
return true;
-
// Views can't sit on top of INSERT or REPLACE.
case CAN_INSERT:
case CAN_REPLACE:
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index e72537c7da..84fd4217c7 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -203,7 +203,6 @@ public class BaseCalciteQueryTest extends CalciteTestBase
ImmutableMap.<String, Object>builder()
.putAll(QUERY_CONTEXT_DEFAULT)
.put(QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false)
- .put(PlannerContext.CTX_ENABLE_UNNEST, true)
.build();
public static final Map<String, Object>
QUERY_CONTEXT_NO_STRINGIFY_ARRAY_USE_EQUALITY =
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
index df4e9b62cc..ae4437faf6 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteArraysQueryTest.java
@@ -60,7 +60,6 @@ import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.sql.calcite.filtration.Filtration;
-import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.junit.Assert;
import org.junit.Test;
@@ -78,7 +77,6 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
private static final Map<String, Object> QUERY_CONTEXT_UNNEST =
ImmutableMap.<String, Object>builder()
.putAll(QUERY_CONTEXT_DEFAULT)
- .put(PlannerContext.CTX_ENABLE_UNNEST, true)
.put(QueryContexts.CTX_SQL_STRINGIFY_ARRAYS, false)
.build();
@@ -87,6 +85,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
@Test
public void testSelectConstantArrayExpressionFromTable()
{
+ notMsqCompatible();
testQuery(
"SELECT ARRAY[1,2] as arr, dim1 FROM foo LIMIT 1",
ImmutableList.of(
@@ -168,6 +167,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
@Test
public void testSelectNonConstantArrayExpressionFromTableForMultival()
{
+ notMsqCompatible();
final String sql = "SELECT ARRAY[CONCAT(dim3, 'word'),'up'] as arr, dim1
FROM foo LIMIT 5";
final Query<?> scanQuery = newScanQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
@@ -207,6 +207,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
// Yes these outputs are strange sometimes, arrays are in a partial state
of existence so end up a bit
// stringy for now this is because virtual column selectors are coercing
values back to stringish so that
// multi-valued string dimensions can be grouped on.
+ notMsqCompatible();
List<Object[]> expectedResults;
if (useDefault) {
expectedResults = ImmutableList.of(
@@ -386,6 +387,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
// which will still always be stringified to ultimately adhere to the
varchar type
// as array support increases in the engine this will likely change since
using explict array functions should
// probably kick it into an array
+ notMsqCompatible();
List<Object[]> expectedResults;
if (useDefault) {
expectedResults = ImmutableList.of(
@@ -1017,6 +1019,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
@Test
public void testArrayGroupAsLongArray()
{
+ notMsqCompatible();
// Cannot vectorize as we donot have support in native query subsytem for
grouping on arrays
cannotVectorize();
testQuery(
@@ -1068,6 +1071,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
{
// Cannot vectorize as we donot have support in native query subsytem for
grouping on arrays as keys
cannotVectorize();
+ notMsqCompatible();
testQuery(
"SELECT ARRAY[d1], SUM(cnt) FROM druid.numfoo GROUP BY 1 ORDER BY 2
DESC",
QUERY_CONTEXT_NO_STRINGIFY_ARRAY,
@@ -1115,6 +1119,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
@Test
public void testArrayGroupAsFloatArray()
{
+ notMsqCompatible();
// Cannot vectorize as we donot have support in native query subsytem for
grouping on arrays as keys
cannotVectorize();
testQuery(
@@ -1605,6 +1610,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
@Test
public void testArrayAggNumeric()
{
+ notMsqCompatible();
cannotVectorize();
testQuery(
"SELECT ARRAY_AGG(l1), ARRAY_AGG(DISTINCT l1), ARRAY_AGG(d1),
ARRAY_AGG(DISTINCT d1), ARRAY_AGG(f1), ARRAY_AGG(DISTINCT f1) FROM numfoo",
@@ -1741,6 +1747,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
@Test
public void testArrayAggQuantile()
{
+ notMsqCompatible();
cannotVectorize();
testQuery(
"SELECT ARRAY_QUANTILE(ARRAY_AGG(l1), 0.9) FROM numfoo",
@@ -1784,6 +1791,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
@Test
public void testArrayAggArrays()
{
+ notMsqCompatible();
cannotVectorize();
testQuery(
"SELECT ARRAY_AGG(ARRAY[l1, l2]), ARRAY_AGG(DISTINCT ARRAY[l1, l2])
FROM numfoo",
@@ -1880,6 +1888,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
@Test
public void testArrayConcatAggArrays()
{
+ notMsqCompatible();
cannotVectorize();
testQuery(
"SELECT ARRAY_CONCAT_AGG(ARRAY[l1, l2]), ARRAY_CONCAT_AGG(DISTINCT
ARRAY[l1, l2]) FROM numfoo",
@@ -2028,6 +2037,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
public void testArrayAggMaxBytes()
{
cannotVectorize();
+ notMsqCompatible();
testQuery(
"SELECT ARRAY_AGG(l1, 128), ARRAY_AGG(DISTINCT l1, 128) FROM numfoo",
ImmutableList.of(
@@ -2227,6 +2237,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
@Test
public void testArrayAggGroupByArrayAggOfLongsFromSubquery()
{
+ notMsqCompatible();
requireMergeBuffers(3);
cannotVectorize();
testQuery(
@@ -2366,6 +2377,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
@Test
public void testArrayAggGroupByArrayAggOfDoubleFromSubquery()
{
+ notMsqCompatible();
requireMergeBuffers(3);
cannotVectorize();
testQuery(
@@ -2883,6 +2895,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
@Test
public void testUnnestThriceWithFiltersOnDimAndUnnestCol()
{
+ notMsqCompatible();
cannotVectorize();
String sql = " SELECT dimZipf, dim3_unnest1, dim3_unnest2, dim3_unnest3
FROM \n"
+ " ( SELECT * FROM \n"
@@ -2981,6 +2994,7 @@ public class CalciteArraysQueryTest extends
BaseCalciteQueryTest
@Test
public void testUnnestThriceWithFiltersOnDimAndAllUnnestColumns()
{
+ notMsqCompatible();
cannotVectorize();
String sql = " SELECT dimZipf, dim3_unnest1, dim3_unnest2, dim3_unnest3
FROM \n"
+ " ( SELECT * FROM \n"
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java
b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java
index 272fddbd8a..46fb40fdda 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/IngestionTestSqlEngine.java
@@ -83,6 +83,7 @@ public class IngestionTestSqlEngine implements SqlEngine
case TOPN_QUERY:
case TIME_BOUNDARY_QUERY:
case SCAN_NEEDS_SIGNATURE:
+ case UNNEST:
return false;
case CAN_INSERT:
case CAN_REPLACE:
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
index 52e52ec7f8..48e7ee2423 100644
---
a/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
+++
b/sql/src/test/java/org/apache/druid/sql/calcite/planner/CalcitePlannerModuleTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
import org.apache.druid.sql.calcite.rule.ExtensionCalciteRuleProvider;
+import org.apache.druid.sql.calcite.run.SqlEngine;
import org.apache.druid.sql.calcite.schema.DruidSchemaCatalog;
import org.apache.druid.sql.calcite.schema.DruidSchemaName;
import org.apache.druid.sql.calcite.schema.NamedSchema;
@@ -89,6 +90,8 @@ public class CalcitePlannerModuleTest extends CalciteTestBase
@Mock
private DruidSchemaCatalog rootSchema;
+ @Mock
+ private SqlEngine engine;
private Set<SqlAggregator> aggregators;
private Set<SqlOperatorConversion> operatorConversions;
@@ -185,13 +188,16 @@ public class CalcitePlannerModuleTest extends
CalciteTestBase
CalciteTests.TEST_AUTHORIZER_MAPPER,
AuthConfig.newBuilder().build()
);
+
+
PlannerContext context = PlannerContext.create(
toolbox,
"SELECT 1",
- null,
+ engine,
Collections.emptyMap(),
null
);
+
boolean containsCustomRule =
injector.getInstance(CalciteRulesManager.class)
.druidConventionRuleSet(context)
.contains(customRule);
diff --git
a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
index b2f93340db..c6f0569702 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestDataBuilder.java
@@ -222,7 +222,7 @@ public class TestDataBuilder
.withRollup(false)
.build();
- private static final IncrementalIndexSchema INDEX_SCHEMA_LOTS_O_COLUMNS =
new IncrementalIndexSchema.Builder()
+ public static final IncrementalIndexSchema INDEX_SCHEMA_LOTS_O_COLUMNS = new
IncrementalIndexSchema.Builder()
.withMetrics(
new CountAggregatorFactory("count")
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]