jayzhan211 commented on issue #13449:
URL: https://github.com/apache/datafusion/issues/13449#issuecomment-2480920752
It seems `GroupedTopKAggregateStream` is not used to handle the TopK in these
queries; the plans use `SortExec` with a fetch (`TopK(fetch=10)`) instead 🤔
```
query TT
explain SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
----
logical_plan
01)Sort: count(*) DESC NULLS FIRST, fetch=10
02)--Projection: hits.UserID, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)) AS m, hits.SearchPhrase, count(*)
03)----Aggregate: groupBy=[[hits.UserID, date_part(Utf8("MINUTE"), to_timestamp_seconds(hits.EventTime)), hits.SearchPhrase]], aggr=[[count(Int64(1)) AS count(*)]]
04)------TableScan: hits projection=[EventTime, UserID, SearchPhrase]
physical_plan
01)SortPreservingMergeExec: [count(*)@3 DESC], fetch=10
02)--SortExec: TopK(fetch=10), expr=[count(*)@3 DESC], preserve_partitioning=[true]
03)----ProjectionExec: expr=[UserID@0 as UserID, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1 as m, SearchPhrase@2 as SearchPhrase, count(*)@3 as count(*)]
04)------AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1 as date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)), SearchPhrase@2 as SearchPhrase], aggr=[count(*)]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([UserID@0, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1, SearchPhrase@2], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[UserID@1 as UserID, date_part(MINUTE, to_timestamp_seconds(EventTime@0)) as date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)), SearchPhrase@2 as SearchPhrase], aggr=[count(*)]
08)--------------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/benchmarks/data/hits.parquet:0..3694994112], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:3694994112..7389988224], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:7389988224..11084982336], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:11084982336..14779976446]]}, projection=[EventTime, UserID, SearchPhrase]
```
```
query TT
explain SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10;
----
logical_plan
01)Sort: c DESC NULLS FIRST, fetch=10
02)--Projection: hits.ClientIP, hits.ClientIP - Int64(1), hits.ClientIP - Int64(2), hits.ClientIP - Int64(3), count(*) AS c
03)----Aggregate: groupBy=[[hits.ClientIP, __common_expr_1 AS hits.ClientIP - Int64(1), __common_expr_1 AS hits.ClientIP - Int64(2), __common_expr_1 AS hits.ClientIP - Int64(3)]], aggr=[[count(Int64(1)) AS count(*)]]
04)------Projection: CAST(hits.ClientIP AS Int64) AS __common_expr_1, hits.ClientIP
05)--------TableScan: hits projection=[ClientIP]
physical_plan
01)SortPreservingMergeExec: [c@4 DESC], fetch=10
02)--SortExec: TopK(fetch=10), expr=[c@4 DESC], preserve_partitioning=[true]
03)----ProjectionExec: expr=[ClientIP@0 as ClientIP, hits.ClientIP - Int64(1)@1 as hits.ClientIP - Int64(1), hits.ClientIP - Int64(2)@2 as hits.ClientIP - Int64(2), hits.ClientIP - Int64(3)@3 as hits.ClientIP - Int64(3), count(*)@4 as c]
04)------AggregateExec: mode=FinalPartitioned, gby=[ClientIP@0 as ClientIP, hits.ClientIP - Int64(1)@1 as hits.ClientIP - Int64(1), hits.ClientIP - Int64(2)@2 as hits.ClientIP - Int64(2), hits.ClientIP - Int64(3)@3 as hits.ClientIP - Int64(3)], aggr=[count(*)]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([ClientIP@0, hits.ClientIP - Int64(1)@1, hits.ClientIP - Int64(2)@2, hits.ClientIP - Int64(3)@3], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[ClientIP@1 as ClientIP, __common_expr_1@0 - 1 as hits.ClientIP - Int64(1), __common_expr_1@0 - 2 as hits.ClientIP - Int64(2), __common_expr_1@0 - 3 as hits.ClientIP - Int64(3)], aggr=[count(*)]
08)--------------ProjectionExec: expr=[CAST(ClientIP@0 AS Int64) as __common_expr_1, ClientIP@0 as ClientIP]
09)----------------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/benchmarks/data/hits.parquet:0..3694994112], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:3694994112..7389988224], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:7389988224..11084982336], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:11084982336..14779976446]]}, projection=[ClientIP]
```
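
Both physical plans have the same shape: an `AggregateExec` computes `count(*)` for every group, and only afterwards does `SortExec: TopK(fetch=10)` pick the ten largest counts. The Python below is a rough sketch of that two-stage shape only; it is not DataFusion code, and the function name `count_then_topk` is made up for illustration. It assumes rows are hashable group keys.

```python
import heapq
from collections import Counter


def count_then_topk(rows, k):
    """Count every group first, then keep the k largest counts."""
    # Stage 1 (the AggregateExec role): full COUNT(*) per group key.
    counts = Counter(rows)

    # Stage 2 (the SortExec TopK(fetch=k) role): a min-heap that never
    # holds more than k entries. The index i only breaks ties so that
    # group keys themselves are never compared.
    heap = []
    for i, (key, c) in enumerate(counts.items()):
        if len(heap) < k:
            heapq.heappush(heap, (c, i, key))
        elif c > heap[0][0]:
            heapq.heapreplace(heap, (c, i, key))

    # Emit in descending count order, like ORDER BY count DESC LIMIT k.
    return [(key, c) for c, _, key in sorted(heap, reverse=True)]
```

In this sketch the heap bounds only the sort stage; the `Counter` still holds one entry per distinct group, which matches what the plans show: the limit is applied after the aggregation, not inside it.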