This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8232c03667 [MSQ] Handle dimensionless group by queries with partitioning, and multiple workers (#14678)
8232c03667 is described below

commit 8232c0366750238b8c35321abd6f09bfbff10df0
Author: Laksh Singla <[email protected]>
AuthorDate: Sat Jul 29 01:45:17 2023 +0000

    [MSQ] Handle dimensionless group by queries with partitioning, and multiple workers (#14678)
    
    * fixup
    
    * add ut
    
    * review
---
 .../apache/druid/msq/kernel/StageDefinition.java   |  2 +-
 .../msq/querykit/groupby/GroupByQueryKit.java      |  6 ++--
 .../org/apache/druid/msq/exec/MSQInsertTest.java   | 34 ++++++++++++++++++++++
 3 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
index 39843db2ba..c81f58691f 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
@@ -130,7 +130,7 @@ public class StageDefinition
     this.shuffleCheckHasMultipleValues = shuffleCheckHasMultipleValues;
     this.frameReader = Suppliers.memoize(() -> 
FrameReader.create(signature))::get;
 
-    if (mustGatherResultKeyStatistics() && 
shuffleSpec.clusterBy().getColumns().isEmpty()) {
+    if (mustGatherResultKeyStatistics() && shuffleSpec.clusterBy().isEmpty()) {
       throw new IAE("Cannot shuffle with spec [%s] and nil clusterBy", 
shuffleSpec);
     }
 
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
index 3b4ea36eb0..4ea56f7388 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
@@ -120,12 +120,14 @@ public class GroupByQueryKit implements QueryKit<GroupByQuery>
     // (i.e. no GROUP BY clause)
     // __time in such queries is generated using either an aggregator (e.g. 
sum(metric) as __time) or using a
     // post-aggregator (e.g. TIMESTAMP '2000-01-01' as __time)
-    if (intermediateClusterBy.getColumns().isEmpty() && 
resultClusterBy.isEmpty()) {
+    if (intermediateClusterBy.isEmpty() && resultClusterBy.isEmpty()) {
       // Ignore shuffleSpecFactory, since we know only a single partition will 
come out, and we can save some effort.
       shuffleSpecFactoryPreAggregation = 
ShuffleSpecFactories.singlePartition();
       shuffleSpecFactoryPostAggregation = 
ShuffleSpecFactories.singlePartition();
     } else if (doOrderBy) {
-      shuffleSpecFactoryPreAggregation = 
ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount);
+      shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty()
+                                         ? 
ShuffleSpecFactories.singlePartition()
+                                         : 
ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount);
       shuffleSpecFactoryPostAggregation = doLimitOrOffset
                                           ? 
ShuffleSpecFactories.singlePartition()
                                           : resultShuffleSpecFactory;
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index 0f4d28a555..009f595bf3 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -437,6 +437,40 @@ public class MSQInsertTest extends MSQTestBase
 
   }
 
+  @Test
+  public void testInsertOnFoo1WithTimeAggregatorAndMultipleWorkers()
+  {
+    Map<String, Object> localContext = new HashMap<>(context);
+    localContext.put(MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY, 
WorkerAssignmentStrategy.MAX.name());
+    localContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 4);
+
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("__time", ColumnType.LONG)
+                                            .build();
+
+    testIngestQuery().setSql(
+                         "INSERT INTO foo1 "
+                         + "SELECT MILLIS_TO_TIMESTAMP((SUM(CAST(\"m1\" AS 
BIGINT)))) AS __time "
+                         + "FROM foo "
+                         + "PARTITIONED BY DAY"
+                     )
+                     .setExpectedDataSource("foo1")
+                     .setExpectedRowSignature(rowSignature)
+                     .setQueryContext(localContext)
+                     .setExpectedSegment(
+                         ImmutableSet.of(
+                             SegmentId.of("foo1", 
Intervals.of("1970-01-01/P1D"), "test", 0)
+                         )
+                     )
+                     .setExpectedResultRows(
+                         ImmutableList.of(
+                             new Object[]{21L}
+                         )
+                     )
+                     .verifyResults();
+  }
+
+
   @Test
   public void testInsertOnFoo1WithTimePostAggregator()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to