adarshsanjeev commented on code in PR #16804:
URL: https://github.com/apache/druid/pull/16804#discussion_r1696274182


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java:
##########
@@ -132,17 +132,22 @@ public QueryDefinition makeQueryDefinition(
     // Update partition by of next window
     final RowSignature signatureSoFar = signatureBuilder.build();
     boolean addShuffle = true;
-    if 
(originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL))
 {
-      final ClusterBy windowClusterBy = (ClusterBy) originalQuery.getContext()
-                                                                 
.get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL);
-      for (KeyColumn c : windowClusterBy.getColumns()) {
-        if (!signatureSoFar.contains(c.columnName())) {
-          addShuffle = false;
-          break;
+    boolean windowHasEmptyOver = false;
+    if 
(originalQuery.getContext().containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_SPEC))
 {
+      if 
(originalQuery.getContext().get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_SPEC)
 instanceof MixShuffleSpec) {

Review Comment:
   I am trying to understand the code here. In windowOperatorQK, we check if 
the cluster by columns are empty, and if so, we set the shuffling to Mix.
   
   Do we need this check? If this was not present, wouldn't we add nothing to 
the cluster by, and if there are no cluster by columns, we would use a mix 
shuffle spec?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -309,12 +308,16 @@ private ShuffleSpec 
findShuffleSpecForNextWindow(List<OperatorFactory> operatorF
       }
     }
 
-    if (partition == null || partition.getPartitionColumns().isEmpty()) {
+    if (partition == null) {

Review Comment:
   Do we have any tests which cover `partition == null`? Would the same bug not 
be present here?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java:
##########
@@ -178,6 +183,11 @@ public QueryDefinition makeQueryDefinition(
       }
     }
 
+    // If window has an empty over, we want a single worker to process entire 
data for window function evaluation.
+    if (windowHasEmptyOver) {
+      scanShuffleSpec = MixShuffleSpec.instance();

Review Comment:
   We also do not respect the resultShuffleSpecFactory argument. This means 
that if we are adding a segment generator stage etc, we might not have 
correctly sized segments. Could we also add a test with an ingestion and see if 
the segments being generated are respecting any rowsPerSegment parameters?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryKit.java:
##########
@@ -178,6 +183,11 @@ public QueryDefinition makeQueryDefinition(
       }
     }
 
+    // If window has an empty over, we want a single worker to process entire 
data for window function evaluation.
+    if (windowHasEmptyOver) {
+      scanShuffleSpec = MixShuffleSpec.instance();

Review Comment:
   Setting the shuffle spec like this ignores the cluster by columns that were 
added previous to the window functions piece of code. This means that we do not 
have the sorting according to those columns, is this correct?



##########
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQWindowTest.java:
##########
@@ -2048,4 +2053,143 @@ public void testReplaceGroupByOnWikipedia(String 
contextName, Map<String, Object
                      .setExpectedSegment(ImmutableSet.of(SegmentId.of("foo1", 
Intervals.ETERNITY, "test", 0)))
                      .verifyResults();
   }
+
+  @MethodSource("data")
+  @ParameterizedTest(name = "{index}:with context {0}")
+  public void testWindowOnMixOfEmptyAndNonEmptyOverWithMultipleWorkers(String 
contextName, Map<String, Object> context)
+  {
+    final Map<String, Object> multipleWorkerContext = new HashMap<>(context);
+    multipleWorkerContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 5);
+
+    final RowSignature rowSignature = RowSignature.builder()
+                                            .add("countryName", 
ColumnType.STRING)
+                                            .add("cityName", ColumnType.STRING)
+                                            .add("channel", ColumnType.STRING)
+                                            .add("c1", ColumnType.LONG)
+                                            .add("c2", ColumnType.LONG)
+                                            .build();
+
+    final Map<String, Object> contextWithRowSignature =
+        ImmutableMap.<String, Object>builder()
+                    .putAll(multipleWorkerContext)
+                    .put(
+                        DruidQuery.CTX_SCAN_SIGNATURE,
+                        
"[{\"name\":\"d0\",\"type\":\"STRING\"},{\"name\":\"d1\",\"type\":\"STRING\"},{\"name\":\"d2\",\"type\":\"STRING\"},{\"name\":\"w0\",\"type\":\"LONG\"},{\"name\":\"w1\",\"type\":\"LONG\"}]"
+                    )
+                    .build();
+
+    final GroupByQuery groupByQuery = GroupByQuery.builder()
+                                           
.setDataSource(CalciteTests.WIKIPEDIA)
+                                           
.setInterval(querySegmentSpec(Filtration
+                                                                             
.eternity()))
+                                           .setGranularity(Granularities.ALL)
+                                           .setDimensions(dimensions(
+                                               new DefaultDimensionSpec(
+                                                   "countryName",
+                                                   "d0",
+                                                   ColumnType.STRING
+                                               ),
+                                               new DefaultDimensionSpec(
+                                                   "cityName",
+                                                   "d1",
+                                                   ColumnType.STRING
+                                               ),
+                                               new DefaultDimensionSpec(
+                                                   "channel",
+                                                   "d2",
+                                                   ColumnType.STRING
+                                               )
+                                           ))
+                                           .setDimFilter(in("countryName", 
ImmutableList.of("Austria", "Republic of Korea")))
+                                           .setContext(multipleWorkerContext)
+                                           .build();
+
+    final AggregatorFactory[] aggs = {
+        new FilteredAggregatorFactory(new CountAggregatorFactory("w1"), 
notNull("d2"), "w1")
+    };
+
+    final WindowOperatorQuery windowQuery = new WindowOperatorQuery(
+        new QueryDataSource(groupByQuery),
+        new LegacySegmentSpec(Intervals.ETERNITY),
+        multipleWorkerContext,
+        RowSignature.builder()
+                    .add("d0", ColumnType.STRING)
+                    .add("d1", ColumnType.STRING)
+                    .add("d2", ColumnType.STRING)
+                    .add("w0", ColumnType.LONG)
+                    .add("w1", ColumnType.LONG).build(),
+        ImmutableList.of(
+            new 
NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("d0"), 
ColumnWithDirection.ascending("d1"), ColumnWithDirection.ascending("d2"))),
+            new NaivePartitioningOperatorFactory(Collections.emptyList()),
+            new WindowOperatorFactory(new WindowRowNumberProcessor("w0")),
+            new 
NaiveSortOperatorFactory(ImmutableList.of(ColumnWithDirection.ascending("d1"), 
ColumnWithDirection.ascending("d0"), ColumnWithDirection.ascending("d2"))),
+            new 
NaivePartitioningOperatorFactory(Collections.singletonList("d1")),
+            new WindowOperatorFactory(new 
WindowFramedAggregateProcessor(WindowFrame.forOrderBy("d0", "d1", "d2"), aggs))
+        ),
+        ImmutableList.of()
+    );
+
+    final ScanQuery scanQuery = Druids.newScanQueryBuilder()
+                                  .dataSource(new QueryDataSource(windowQuery))
+                                  
.intervals(querySegmentSpec(Filtration.eternity()))
+                                  .columns("d0", "d1", "d2", "w0", "w1")
+                                  .orderBy(
+                                      ImmutableList.of(
+                                          new ScanQuery.OrderBy("d0", 
ScanQuery.Order.ASCENDING),
+                                          new ScanQuery.OrderBy("d1", 
ScanQuery.Order.ASCENDING),
+                                          new ScanQuery.OrderBy("d2", 
ScanQuery.Order.ASCENDING)
+                                      )
+                                  )
+                                  .columnTypes(ColumnType.STRING, 
ColumnType.STRING, ColumnType.STRING, ColumnType.LONG, ColumnType.LONG)
+                                  .limit(Long.MAX_VALUE)
+                                  
.resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
+                                  .context(contextWithRowSignature)
+                                  .build();
+
+    final String sql = "select countryName, cityName, channel, \n"
+                          + "row_number() over (order by countryName, 
cityName, channel) as c1, \n"
+                          + "count(channel) over (partition by cityName order 
by countryName, cityName, channel) as c2\n"
+                          + "from wikipedia\n"
+                          + "where countryName in ('Austria', 'Republic of 
Korea')\n"
+                          + "group by countryName, cityName, channel "
+                          + "order by countryName, cityName, channel";
+
+    final String nullValue = NullHandling.sqlCompatible() ? null : "";
+
+    testSelectQuery()
+        .setSql(sql)
+        .setExpectedMSQSpec(MSQSpec.builder()
+                                   .query(scanQuery)
+                                   .columnMappings(
+                                       new ColumnMappings(ImmutableList.of(
+                                           new ColumnMapping("d0", 
"countryName"),
+                                           new ColumnMapping("d1", "cityName"),
+                                           new ColumnMapping("d2", "channel"),
+                                           new ColumnMapping("w0", "c1"),
+                                           new ColumnMapping("w1", "c2")
+                                       )
+                                       ))
+                                   
.tuningConfig(MSQTuningConfig.defaultConfig())
+                                   .build())
+        .setExpectedRowSignature(rowSignature)
+        .setExpectedResultRows(
+            ImmutableList.<Object[]>of(
+                new Object[]{"Austria", nullValue, "#de.wikipedia", 1L, 1L},
+                new Object[]{"Austria", "Horsching", "#de.wikipedia", 2L, 1L},
+                new Object[]{"Austria", "Vienna", "#de.wikipedia", 3L, 1L},
+                new Object[]{"Austria", "Vienna", "#es.wikipedia", 4L, 2L},
+                new Object[]{"Austria", "Vienna", "#tr.wikipedia", 5L, 3L},
+                new Object[]{"Republic of Korea", nullValue, "#en.wikipedia", 
6L, 2L},
+                new Object[]{"Republic of Korea", nullValue, "#ja.wikipedia", 
7L, 3L},
+                new Object[]{"Republic of Korea", nullValue, "#ko.wikipedia", 
8L, 4L},
+                new Object[]{"Republic of Korea", "Jeonju", "#ko.wikipedia", 
9L, 1L},
+                new Object[]{"Republic of Korea", "Seongnam-si", 
"#ko.wikipedia", 10L, 1L},
+                new Object[]{"Republic of Korea", "Seoul", "#ko.wikipedia", 
11L, 1L},
+                new Object[]{"Republic of Korea", "Suwon-si", "#ko.wikipedia", 
12L, 1L},
+                new Object[]{"Republic of Korea", "Yongsan-dong", 
"#ko.wikipedia", 13L, 1L}
+            )
+        )
+        .setQueryContext(multipleWorkerContext)

Review Comment:
   There should also be an assert that we are using mix shuffle spec here. We 
can do this be using the shuffle counters.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to