waruto210 commented on issue #13449:
URL: https://github.com/apache/datafusion/issues/13449#issuecomment-2534052648
> It seems `GroupedTopKAggregateStream` is not used for handling topK but
SortExec with fetch 🤔
>
> ```
> query TT
> explain SELECT "UserID", extract(minute FROM
to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits
GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
> ----
> logical_plan
> 01)Sort: count(*) DESC NULLS FIRST, fetch=10
> 02)--Projection: hits.UserID,
date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)) AS m,
hits.SearchPhrase, count(*)
> 03)----Aggregate: groupBy=[[hits.UserID, date_part(Utf8("MINUTE"),
to_timestamp_seconds(hits.EventTime)), hits.SearchPhrase]],
aggr=[[count(Int64(1)) AS count(*)]]
> 04)------TableScan: hits projection=[EventTime, UserID, SearchPhrase]
> physical_plan
> 01)SortPreservingMergeExec: [count(*)@3 DESC], fetch=10
> 02)--SortExec: TopK(fetch=10), expr=[count(*)@3 DESC],
preserve_partitioning=[true]
> 03)----ProjectionExec: expr=[UserID@0 as UserID,
date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1 as m,
SearchPhrase@2 as SearchPhrase, count(*)@3 as count(*)]
> 04)------AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID,
date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1 as
date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)), SearchPhrase@2
as SearchPhrase], aggr=[count(*)]
> 05)--------CoalesceBatchesExec: target_batch_size=8192
> 06)----------RepartitionExec: partitioning=Hash([UserID@0,
date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime))@1,
SearchPhrase@2], 4), input_partitions=4
> 07)------------AggregateExec: mode=Partial, gby=[UserID@1 as UserID,
date_part(MINUTE, to_timestamp_seconds(EventTime@0)) as
date_part(Utf8("MINUTE"),to_timestamp_seconds(hits.EventTime)), SearchPhrase@2
as SearchPhrase], aggr=[count(*)]
> 08)--------------ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/benchmarks/data/hits.parquet:0..3694994112],
[WORKSPACE_ROOT/benchmarks/data/hits.parquet:3694994112..7389988224],
[WORKSPACE_ROOT/benchmarks/data/hits.parquet:7389988224..11084982336],
[WORKSPACE_ROOT/benchmarks/data/hits.parquet:11084982336..14779976446]]},
projection=[EventTime, UserID, SearchPhrase]
> ```
>
> ```
> query TT
> explain SELECT "ClientIP", "ClientIP" - 1, "ClientIP" - 2, "ClientIP" - 3,
COUNT(*) AS c FROM hits GROUP BY "ClientIP", "ClientIP" - 1, "ClientIP" - 2,
"ClientIP" - 3 ORDER BY c DESC LIMIT 10;
> ----
> logical_plan
> 01)Sort: c DESC NULLS FIRST, fetch=10
> 02)--Projection: hits.ClientIP, hits.ClientIP - Int64(1), hits.ClientIP -
Int64(2), hits.ClientIP - Int64(3), count(*) AS c
> 03)----Aggregate: groupBy=[[hits.ClientIP, __common_expr_1 AS
hits.ClientIP - Int64(1), __common_expr_1 AS hits.ClientIP - Int64(2),
__common_expr_1 AS hits.ClientIP - Int64(3)]], aggr=[[count(Int64(1)) AS
count(*)]]
> 04)------Projection: CAST(hits.ClientIP AS Int64) AS __common_expr_1,
hits.ClientIP
> 05)--------TableScan: hits projection=[ClientIP]
> physical_plan
> 01)SortPreservingMergeExec: [c@4 DESC], fetch=10
> 02)--SortExec: TopK(fetch=10), expr=[c@4 DESC],
preserve_partitioning=[true]
> 03)----ProjectionExec: expr=[ClientIP@0 as ClientIP, hits.ClientIP -
Int64(1)@1 as hits.ClientIP - Int64(1), hits.ClientIP - Int64(2)@2 as
hits.ClientIP - Int64(2), hits.ClientIP - Int64(3)@3 as hits.ClientIP -
Int64(3), count(*)@4 as c]
> 04)------AggregateExec: mode=FinalPartitioned, gby=[ClientIP@0 as
ClientIP, hits.ClientIP - Int64(1)@1 as hits.ClientIP - Int64(1), hits.ClientIP
- Int64(2)@2 as hits.ClientIP - Int64(2), hits.ClientIP - Int64(3)@3 as
hits.ClientIP - Int64(3)], aggr=[count(*)]
> 05)--------CoalesceBatchesExec: target_batch_size=8192
> 06)----------RepartitionExec: partitioning=Hash([ClientIP@0, hits.ClientIP
- Int64(1)@1, hits.ClientIP - Int64(2)@2, hits.ClientIP - Int64(3)@3], 4),
input_partitions=4
> 07)------------AggregateExec: mode=Partial, gby=[ClientIP@1 as ClientIP,
__common_expr_1@0 - 1 as hits.ClientIP - Int64(1), __common_expr_1@0 - 2 as
hits.ClientIP - Int64(2), __common_expr_1@0 - 3 as hits.ClientIP - Int64(3)],
aggr=[count(*)]
> 08)--------------ProjectionExec: expr=[CAST(ClientIP@0 AS Int64) as
__common_expr_1, ClientIP@0 as ClientIP]
> 09)----------------ParquetExec: file_groups={4 groups:
[[WORKSPACE_ROOT/benchmarks/data/hits.parquet:0..3694994112],
[WORKSPACE_ROOT/benchmarks/data/hits.parquet:3694994112..7389988224],
[WORKSPACE_ROOT/benchmarks/data/hits.parquet:7389988224..11084982336],
[WORKSPACE_ROOT/benchmarks/data/hits.parquet:11084982336..14779976446]]},
projection=[ClientIP]
> ```
`count(*)` needs to traverse all data, top k aggregation is not applicable
here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]