waruto210 commented on issue #13449:
URL: https://github.com/apache/datafusion/issues/13449#issuecomment-2534052648

   > It seems `GroupedTopKAggregateStream` is not used for handling the top-k here; instead, a `SortExec` with `fetch` is 🤔
   > 
   > ```
   > query TT
   > explain SELECT "UserID", extract(minute FROM 
to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits 
GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
   > ----
   > logical_plan
   > 01)Sort: count(*) DESC NULLS FIRST, fetch=10
   > 02)--Projection: hits.UserID, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)) AS m, hits.SearchPhrase, count(*)
   > 03)----Aggregate: groupBy=[[hits.UserID, date_part(Utf8("MINUTE"), to_timestamp_seconds(hits.EventTime)), hits.SearchPhrase]], aggr=[[count(Int64(1)) AS count(*)]]
   > 04)------TableScan: hits projection=[EventTime, UserID, SearchPhrase]
   > physical_plan
   > 01)SortPreservingMergeExec: [count(*)@3 DESC], fetch=10
   > 02)--SortExec: TopK(fetch=10), expr=[count(*)@3 DESC], preserve_partitioning=[true]
   > 03)----ProjectionExec: expr=[UserID@0 as UserID, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1 as m, SearchPhrase@2 as SearchPhrase, count(*)@3 as count(*)]
   > 04)------AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1 as date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)), SearchPhrase@2 as SearchPhrase], aggr=[count(*)]
   > 05)--------CoalesceBatchesExec: target_batch_size=8192
   > 06)----------RepartitionExec: partitioning=Hash([UserID@0, date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1, SearchPhrase@2], 4), input_partitions=4
   > 07)------------AggregateExec: mode=Partial, gby=[UserID@1 as UserID, date_part(MINUTE, to_timestamp_seconds(EventTime@0)) as date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)), SearchPhrase@2 as SearchPhrase], aggr=[count(*)]
   > 08)--------------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/benchmarks/data/hits.parquet:0..3694994112], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:3694994112..7389988224], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:7389988224..11084982336], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:11084982336..14779976446]]}, projection=[EventTime, UserID, SearchPhrase]
   > ```
   > 
   > ```
   > query TT
   > explain SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3, 
COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2, 
"ClientIP" - 3 ORDER BY c DESC LIMIT 10;
   > ----
   > logical_plan
   > 01)Sort: c DESC NULLS FIRST, fetch=10
   > 02)--Projection: hits.ClientIP, hits.ClientIP - Int64(1), hits.ClientIP - Int64(2), hits.ClientIP - Int64(3), count(*) AS c
   > 03)----Aggregate: groupBy=[[hits.ClientIP, __common_expr_1 AS hits.ClientIP - Int64(1), __common_expr_1 AS hits.ClientIP - Int64(2), __common_expr_1 AS hits.ClientIP - Int64(3)]], aggr=[[count(Int64(1)) AS count(*)]]
   > 04)------Projection: CAST(hits.ClientIP AS Int64) AS __common_expr_1, hits.ClientIP
   > 05)--------TableScan: hits projection=[ClientIP]
   > physical_plan
   > 01)SortPreservingMergeExec: [c@4 DESC], fetch=10
   > 02)--SortExec: TopK(fetch=10), expr=[c@4 DESC], preserve_partitioning=[true]
   > 03)----ProjectionExec: expr=[ClientIP@0 as ClientIP, hits.ClientIP - Int64(1)@1 as hits.ClientIP - Int64(1), hits.ClientIP - Int64(2)@2 as hits.ClientIP - Int64(2), hits.ClientIP - Int64(3)@3 as hits.ClientIP - Int64(3), count(*)@4 as c]
   > 04)------AggregateExec: mode=FinalPartitioned, gby=[ClientIP@0 as ClientIP, hits.ClientIP - Int64(1)@1 as hits.ClientIP - Int64(1), hits.ClientIP - Int64(2)@2 as hits.ClientIP - Int64(2), hits.ClientIP - Int64(3)@3 as hits.ClientIP - Int64(3)], aggr=[count(*)]
   > 05)--------CoalesceBatchesExec: target_batch_size=8192
   > 06)----------RepartitionExec: partitioning=Hash([ClientIP@0, hits.ClientIP - Int64(1)@1, hits.ClientIP - Int64(2)@2, hits.ClientIP - Int64(3)@3], 4), input_partitions=4
   > 07)------------AggregateExec: mode=Partial, gby=[ClientIP@1 as ClientIP, __common_expr_1@0 - 1 as hits.ClientIP - Int64(1), __common_expr_1@0 - 2 as hits.ClientIP - Int64(2), __common_expr_1@0 - 3 as hits.ClientIP - Int64(3)], aggr=[count(*)]
   > 08)--------------ProjectionExec: expr=[CAST(ClientIP@0 AS Int64) as __common_expr_1, ClientIP@0 as ClientIP]
   > 09)----------------ParquetExec: file_groups={4 groups: [[WORKSPACE_ROOT/benchmarks/data/hits.parquet:0..3694994112], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:3694994112..7389988224], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:7389988224..11084982336], [WORKSPACE_ROOT/benchmarks/data/hits.parquet:11084982336..14779976446]]}, projection=[ClientIP]
   > ```
   
   `count(*)` needs to traverse all of the data before the per-group counts are known, so top-k aggregation is not applicable here.
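
   For contrast, here is a minimal sketch of the query shape where the top-k aggregation path (`GroupedTopKAggregateStream`) should be considered, assuming the same ClickBench `hits` table and `datafusion.optimizer.enable_topk_aggregation` left at its default of `true`. As far as I understand, that path only applies when the `ORDER BY ... LIMIT` is on a MIN/MAX aggregate: a group evicted from the bounded group map can still re-enter correctly via a later row, whereas an evicted running `count(*)` would lose the rows already counted.
   
   ```
   -- Illustrative only (not run here); column names come from the ClickBench hits schema.
   -- Ordering by the MAX aggregate itself is what allows the aggregation to keep only ~10 groups.
   SELECT "UserID", MAX("EventTime") AS latest
   FROM hits
   GROUP BY "UserID"
   ORDER BY MAX("EventTime") DESC
   LIMIT 10;
   ```
   
   If that path is chosen, the `AggregateExec` should carry the limit in the explain output instead of the plan relying only on `SortExec: TopK`.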

