jayzhan211 commented on issue #13449:
URL: https://github.com/apache/datafusion/issues/13449#issuecomment-2480920752

   It seems `GroupedTopKAggregateStream` is not used to handle top-K in the 
plans below; `SortExec` with fetch handles it instead 🤔
   
   ```
   query TT
   explain SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
   ----
   logical_plan
   01)Sort: count(*) DESC NULLS FIRST, fetch=10
   02)--Projection: hits.UserID, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)) AS m, hits.SearchPhrase, count(*)
   03)----Aggregate: groupBy=[[hits.UserID, date_part(Utf8("MINUTE"), to_timestamp_seconds(hits.EventTime)), hits.SearchPhrase]], aggr=[[count(Int64(1)) AS count(*)]]
   04)------TableScan: hits projection=[EventTime, UserID, SearchPhrase]
   physical_plan
   01)SortPreservingMergeExec: [count(*)@3 DESC], fetch=10
   02)--SortExec: TopK(fetch=10), expr=[count(*)@3 DESC], preserve_partitioning=[true]
   03)----ProjectionExec: expr=[UserID@0 as UserID, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1 as m, SearchPhrase@2 as SearchPhrase, count(*)@3 as count(*)]
   04)------AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1 as date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)), SearchPhrase@2 as SearchPhrase], aggr=[count(*)]
   05)--------CoalesceBatchesExec: target_batch_size=8192
   06)----------RepartitionExec: partitioning=Hash([UserID@0, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1, SearchPhrase@2], 4), input_partitions=4
   07)------------AggregateExec: mode=Partial, gby=[UserID@1 as UserID, date_part(MINUTE, to_timestamp_seconds(EventTime@0)) as date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)), SearchPhrase@2 as SearchPhrase], aggr=[count(*)]
   08)--------------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/benchmarks/data/hits.parquet:0..3694994112], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:3694994112..7389988224], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:7389988224..11084982336], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:11084982336..14779976446]]}, projection=[EventTime, UserID, SearchPhrase]
   
   ```
   
   ```
   query TT
   explain SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3 ORDER BY c DESC LIMIT 10;
   ----
   logical_plan
   01)Sort: c DESC NULLS FIRST, fetch=10
   02)--Projection: hits.ClientIP, hits.ClientIP - Int64(1), hits.ClientIP - Int64(2), hits.ClientIP - Int64(3), count(*) AS c
   03)----Aggregate: groupBy=[[hits.ClientIP, __common_expr_1 AS hits.ClientIP - Int64(1), __common_expr_1 AS hits.ClientIP - Int64(2), __common_expr_1 AS hits.ClientIP - Int64(3)]], aggr=[[count(Int64(1)) AS count(*)]]
   04)------Projection: CAST(hits.ClientIP AS Int64) AS __common_expr_1, hits.ClientIP
   05)--------TableScan: hits projection=[ClientIP]
   physical_plan
   01)SortPreservingMergeExec: [c@4 DESC], fetch=10
   02)--SortExec: TopK(fetch=10), expr=[c@4 DESC], preserve_partitioning=[true]
   03)----ProjectionExec: expr=[ClientIP@0 as ClientIP, hits.ClientIP - Int64(1)@1 as hits.ClientIP - Int64(1), hits.ClientIP - Int64(2)@2 as hits.ClientIP - Int64(2), hits.ClientIP - Int64(3)@3 as hits.ClientIP - Int64(3), count(*)@4 as c]
   04)------AggregateExec: mode=FinalPartitioned, gby=[ClientIP@0 as ClientIP, hits.ClientIP - Int64(1)@1 as hits.ClientIP - Int64(1), hits.ClientIP - Int64(2)@2 as hits.ClientIP - Int64(2), hits.ClientIP - Int64(3)@3 as hits.ClientIP - Int64(3)], aggr=[count(*)]
   05)--------CoalesceBatchesExec: target_batch_size=8192
   06)----------RepartitionExec: partitioning=Hash([ClientIP@0, hits.ClientIP - Int64(1)@1, hits.ClientIP - Int64(2)@2, hits.ClientIP - Int64(3)@3], 4), input_partitions=4
   07)------------AggregateExec: mode=Partial, gby=[ClientIP@1 as ClientIP, __common_expr_1@0 - 1 as hits.ClientIP - Int64(1), __common_expr_1@0 - 2 as hits.ClientIP - Int64(2), __common_expr_1@0 - 3 as hits.ClientIP - Int64(3)], aggr=[count(*)]
   08)--------------ProjectionExec: expr=[CAST(ClientIP@0 AS Int64) as __common_expr_1, ClientIP@0 as ClientIP]
   09)----------------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/benchmarks/data/hits.parquet:0..3694994112], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:3694994112..7389988224], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:7389988224..11084982336], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:11084982336..14779976446]]}, projection=[ClientIP]
   
   ```
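
For illustration only, here is a rough Python sketch of the shape both physical plans share: the aggregation first produces a count for every group, and only then does a separate top-K step keep the 10 largest. This is not DataFusion code. The function name `top_k_groups` and the sample rows are made up for the example, and the bounded heap is just one common way to implement a `TopK(fetch=k)` step, assumed here rather than taken from the `SortExec` source.

```python
import heapq
from collections import Counter


def top_k_groups(rows, k):
    """Return the k (group_key, count) pairs with the largest counts.

    Hypothetical helper: mimics COUNT(*) ... GROUP BY ... ORDER BY COUNT(*)
    DESC LIMIT k, with the aggregation and the top-K as two separate steps.
    """
    if k <= 0:
        return []

    # Step 1 (aggregation): every distinct group is counted and kept,
    # regardless of the limit.
    counts = Counter(rows)

    # Step 2 (top-K): a min-heap bounded to k entries. The sequence number
    # breaks ties so that group keys themselves are never compared.
    heap = []
    for seq, (key, count) in enumerate(counts.items()):
        if len(heap) < k:
            heapq.heappush(heap, (count, seq, key))
        elif count > heap[0][0]:
            heapq.heapreplace(heap, (count, seq, key))

    # Emit in descending count order, like ORDER BY ... DESC.
    ordered = sorted(heap, key=lambda item: item[0], reverse=True)
    return [(key, count) for count, _seq, key in ordered]


# Made-up rows of (UserID, SearchPhrase) group keys.
sample_rows = [(1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c"), (3, "c")]
result = top_k_groups(sample_rows, 2)
```

In this sketch the limit only reduces the work of the final ordering step; the memory needed for `counts` still grows with the number of distinct groups.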


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

