This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8232c03667 [MSQ] Handle dimensionless group by queries with
partitioning, and multiple workers (#14678)
8232c03667 is described below
commit 8232c0366750238b8c35321abd6f09bfbff10df0
Author: Laksh Singla <[email protected]>
AuthorDate: Sat Jul 29 01:45:17 2023 +0000
[MSQ] Handle dimensionless group by queries with partitioning, and multiple
workers (#14678)
* fixup
* add ut
* review
---
.../apache/druid/msq/kernel/StageDefinition.java | 2 +-
.../msq/querykit/groupby/GroupByQueryKit.java | 6 ++--
.../org/apache/druid/msq/exec/MSQInsertTest.java | 34 ++++++++++++++++++++++
3 files changed, 39 insertions(+), 3 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
index 39843db2ba..c81f58691f 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
@@ -130,7 +130,7 @@ public class StageDefinition
this.shuffleCheckHasMultipleValues = shuffleCheckHasMultipleValues;
this.frameReader = Suppliers.memoize(() ->
FrameReader.create(signature))::get;
- if (mustGatherResultKeyStatistics() &&
shuffleSpec.clusterBy().getColumns().isEmpty()) {
+ if (mustGatherResultKeyStatistics() && shuffleSpec.clusterBy().isEmpty()) {
throw new IAE("Cannot shuffle with spec [%s] and nil clusterBy",
shuffleSpec);
}
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
index 3b4ea36eb0..4ea56f7388 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java
@@ -120,12 +120,14 @@ public class GroupByQueryKit implements
QueryKit<GroupByQuery>
// (i.e. no GROUP BY clause)
// __time in such queries is generated using either an aggregator (e.g.
sum(metric) as __time) or using a
// post-aggregator (e.g. TIMESTAMP '2000-01-01' as __time)
- if (intermediateClusterBy.getColumns().isEmpty() &&
resultClusterBy.isEmpty()) {
+ if (intermediateClusterBy.isEmpty() && resultClusterBy.isEmpty()) {
// Ignore shuffleSpecFactory, since we know only a single partition will
come out, and we can save some effort.
shuffleSpecFactoryPreAggregation =
ShuffleSpecFactories.singlePartition();
shuffleSpecFactoryPostAggregation =
ShuffleSpecFactories.singlePartition();
} else if (doOrderBy) {
- shuffleSpecFactoryPreAggregation =
ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount);
+ shuffleSpecFactoryPreAggregation = intermediateClusterBy.isEmpty()
+ ?
ShuffleSpecFactories.singlePartition()
+ :
ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount);
shuffleSpecFactoryPostAggregation = doLimitOrOffset
?
ShuffleSpecFactories.singlePartition()
: resultShuffleSpecFactory;
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index 0f4d28a555..009f595bf3 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -437,6 +437,40 @@ public class MSQInsertTest extends MSQTestBase
}
+ @Test
+ public void testInsertOnFoo1WithTimeAggregatorAndMultipleWorkers()
+ {
+ Map<String, Object> localContext = new HashMap<>(context);
+ localContext.put(MultiStageQueryContext.CTX_TASK_ASSIGNMENT_STRATEGY,
WorkerAssignmentStrategy.MAX.name());
+ localContext.put(MultiStageQueryContext.CTX_MAX_NUM_TASKS, 4);
+
+ RowSignature rowSignature = RowSignature.builder()
+ .add("__time", ColumnType.LONG)
+ .build();
+
+ testIngestQuery().setSql(
+ "INSERT INTO foo1 "
+ + "SELECT MILLIS_TO_TIMESTAMP((SUM(CAST(\"m1\" AS
BIGINT)))) AS __time "
+ + "FROM foo "
+ + "PARTITIONED BY DAY"
+ )
+ .setExpectedDataSource("foo1")
+ .setExpectedRowSignature(rowSignature)
+ .setQueryContext(localContext)
+ .setExpectedSegment(
+ ImmutableSet.of(
+ SegmentId.of("foo1",
Intervals.of("1970-01-01/P1D"), "test", 0)
+ )
+ )
+ .setExpectedResultRows(
+ ImmutableList.of(
+ new Object[]{21L}
+ )
+ )
+ .verifyResults();
+ }
+
+
@Test
public void testInsertOnFoo1WithTimePostAggregator()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org