cryptoe commented on code in PR #14886:
URL: https://github.com/apache/druid/pull/14886#discussion_r1332329765


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -96,7 +98,18 @@ private static Pair<List<ReadableFrameChannel>, 
BroadcastJoinHelper> makeInputCh
       final long memoryReservedForBroadcastJoin
   )
   {
-    if (!(dataSource instanceof JoinDataSource) && !sideChannels.isEmpty()) {
+    // An UnnestDataSource or FilteredDataSource can have a join as a base
+    // In such a case a side channel is expected to be there
+    final DataSource baseDataSource;
+    if (dataSource instanceof UnnestDataSource) {
+      baseDataSource = ((UnnestDataSource) dataSource).getBase();
+    } else if (dataSource instanceof FilteredDataSource) {
+      baseDataSource = ((FilteredDataSource) dataSource).getBase();
+    } else {
+      baseDataSource = dataSource;
+    }
+    if (!(dataSource instanceof JoinDataSource
+          || baseDataSource instanceof JoinDataSource) && 
!sideChannels.isEmpty()) {

Review Comment:
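   Nit: Why do we still need the `dataSource instanceof JoinDataSource` check? `baseDataSource` already falls back to `dataSource` when it is neither an `UnnestDataSource` nor a `FilteredDataSource`, so `baseDataSource instanceof JoinDataSource` alone should cover that case.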



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java:
##########
@@ -739,6 +739,128 @@ public void 
testReplaceWithClusteredByDescendingThrowsException()
                      .verifyPlanningErrors();
   }
 
+  @Test
+  public void testReplaceUnnestSegmentEntireTable()
+  {
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("d", ColumnType.STRING)
+                                            .build();
+
+    testIngestQuery().setSql(" REPLACE INTO foo "
+                             + "OVERWRITE ALL "
+                             + "SELECT __time, d "
+                             + "FROM foo, UNNEST(MV_TO_ARRAY(dim3)) as 
unnested(d) "
+                             + "PARTITIONED BY ALL TIME ")
+                     .setExpectedDataSource("foo")
+                     .setExpectedRowSignature(rowSignature)
+                     .setQueryContext(context)
+                     .setExpectedDestinationIntervals(Intervals.ONLY_ETERNITY)
+                     .setExpectedSegment(ImmutableSet.of(SegmentId.of(
+                         "foo",
+                         Intervals.of("2000-01-01T/P1M"),
+                         "test",
+                         0
+                     )))
+                     .setExpectedResultRows(
+                         ImmutableList.of(
+                             new Object[]{946684800000L, "a"},
+                             new Object[]{946684800000L, "b"},
+                             new Object[]{946771200000L, "b"},
+                             new Object[]{946771200000L, "c"},
+                             new Object[]{946857600000L, "d"},
+                             new Object[]{978307200000L, 
NullHandling.sqlCompatible() ? "" : null},
+                             new Object[]{978393600000L, null},
+                             new Object[]{978480000000L, null}
+                         )
+                     )
+                     .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo", 
Intervals.ETERNITY, "test", 0)))
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher
+                             .with().totalFiles(1),
+                         0, 0, "input0"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher
+                             .with().rows(8).frames(1),
+                         0, 0, "shuffle"
+                     )
+                     .setExpectedCountersForStageWorkerChannel(
+                         CounterSnapshotMatcher
+                             .with().rows(8).frames(1),
+                         1, 0, "input0"
+                     )
+                     
.setExpectedSegmentGenerationProgressCountersForStageWorker(
+                         CounterSnapshotMatcher
+                             .with().segmentRowsProcessed(8),
+                         1, 0
+                     )
+                     .verifyResults();
+  }
+
+  @Test
+  public void testReplaceUnnestWithVirtualColumnSegmentEntireTable()
+  {
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .add("d", ColumnType.FLOAT)
+                                            .build();
+
+    testIngestQuery().setSql(" REPLACE INTO foo "
+                             + "OVERWRITE ALL "
+                             + "SELECT __time, d "
+                             + "FROM foo, UNNEST(ARRAY[m1, m2]) as unnested(d) 
"
+                             + "PARTITIONED BY ALL TIME ")

Review Comment:
   I would add a test case with a query shape like:
   `REPLACE INTO foo OVERWRITE WHERE __time >= TIMESTAMP '2000-01-01' AND __time < TIMESTAMP '2002-01-01' SELECT __time, d, bar FROM foo, UNNEST(ARRAY[m1, m2]) AS unnested(d) PARTITIONED BY DAY CLUSTERED BY d`
   so that unnest is also covered with a time-filtered OVERWRITE, DAY partitioning, and clustering on the unnested column.



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessor.java:
##########
@@ -107,7 +120,8 @@ private static Pair<List<ReadableFrameChannel>, 
BroadcastJoinHelper> makeInputCh
       inputChannels.add(baseInput.getChannel());
     }
 
-    if (dataSource instanceof JoinDataSource) {
+
+    if (dataSource instanceof JoinDataSource || baseDataSource instanceof 
JoinDataSource) {

Review Comment:
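   Same comment here: the `dataSource instanceof JoinDataSource` check looks redundant, since `baseDataSource` already equals `dataSource` for anything other than an unnest or filtered data source.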



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java:
##########
@@ -347,6 +370,92 @@ private static DataSourcePlan forQuery(
     );
   }
 
+  private static DataSourcePlan forFilteredDataSource(
+      final QueryKit queryKit,
+      final String queryId,
+      final QueryContext queryContext,
+      final FilteredDataSource dataSource,
+      final QuerySegmentSpec querySegmentSpec,
+      final int maxWorkerCount,
+      final int minStageNumber,
+      final boolean broadcast
+  )
+  {
+    final QueryDefinitionBuilder subQueryDefBuilder = 
QueryDefinition.builder();
+    final DataSourcePlan basePlan = forDataSource(
+        queryKit,
+        queryId,
+        queryContext,
+        dataSource.getBase(),
+        querySegmentSpec,
+        null,
+        maxWorkerCount,
+        Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
+        broadcast
+    );
+
+    DataSource newDataSource = basePlan.getNewDataSource();
+    basePlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll);
+
+    final List<InputSpec> inputSpecs = new 
ArrayList<>(basePlan.getInputSpecs());
+    newDataSource = FilteredDataSource.create(newDataSource, 
dataSource.getFilter());
+    return new DataSourcePlan(
+        newDataSource,
+        inputSpecs,
+        basePlan.getBroadcastInputs(),
+        subQueryDefBuilder
+    );
+
+  }
+
+  /**
+   * Build a plan for Unnest data source
+   */
+  private static DataSourcePlan forUnnest(
+      final QueryKit queryKit,
+      final String queryId,
+      final QueryContext queryContext,
+      final UnnestDataSource dataSource,
+      final QuerySegmentSpec querySegmentSpec,
+      final int maxWorkerCount,
+      final int minStageNumber,
+      final boolean broadcast
+  )
+  {
+    final QueryDefinitionBuilder subQueryDefBuilder = 
QueryDefinition.builder();
+    // Find the plan for base data source by recursing
+    final DataSourcePlan basePlan = forDataSource(
+        queryKit,
+        queryId,
+        queryContext,
+        dataSource.getBase(),
+        querySegmentSpec,
+        null,
+        maxWorkerCount,
+        Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),

Review Comment:
   Won't this line add a new stage?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to