2010YOUY01 opened a new pull request, #22712:
URL: https://github.com/apache/datafusion/pull/22712

   ## Which issue does this PR close?
   
   
   Another attempt at #7065.
   
   ## Rationale for this change
   
   There have been several attempts to address this issue, such as 
https://github.com/apache/datafusion/pull/15591, but they have stalled.
   
   My interpretation is that the current aggregation logic is too complex, 
making it hard to reason about the changes and review the patch.
   
   This PR uses a different implementation strategy: incrementally rewriting 
the hash aggregation executor.
   
   I created an issue to analyze the root cause of the existing code complexity 
and how to solve it by incrementally splitting the logic:
   - https://github.com/apache/datafusion/issues/22710
   
   These issues explain the motivation and background well:
   - #7065
   - https://github.com/apache/datafusion/issues/19649
   
   I think the main motivation is memory efficiency. Performance (~10% faster 
for high-cardinality cases in this PoC) is only a nice by-product.
   
   Suppose we have buffered 1GB of state in the partial aggregation stage. If 
the internal states are stored in a contiguous `Vec`, they cannot be freed 
until repartitioning is done, which is approximately when the final-stage 
aggregation finishes. That means peak memory usage can become `all partial 
states + all final states`; in the worst case, this can reach 2GB.
   
   Ideally, we should be able to stay closer to 1GB by managing memory with 
fixed-size blocks. Once final aggregation starts consuming partial state, the 
corresponding partial blocks can be freed incrementally.
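
To make the memory argument concrete, here is a minimal sketch of the idea, not the code in this PR. `BlockedStates` and `drain_into_final` are hypothetical names, and a plain `f64` stands in for an accumulator state. Because the consumer takes one block at a time, the combined buffered size stays at the partial-stage size (plus at most one in-flight block) instead of doubling.

```rust
use std::collections::VecDeque;

/// Hypothetical fixed-size block store (illustrative only, not a DataFusion type).
struct BlockedStates {
    block_size: usize,
    blocks: VecDeque<Vec<f64>>,
}

impl BlockedStates {
    fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block_size must be positive");
        Self { block_size, blocks: VecDeque::new() }
    }

    /// Append one state value, opening a new block when the last one is full.
    fn push(&mut self, value: f64) {
        let last_is_full = self
            .blocks
            .back()
            .map_or(true, |b| b.len() == self.block_size);
        if last_is_full {
            self.blocks.push_back(Vec::with_capacity(self.block_size));
        }
        self.blocks.back_mut().expect("a block exists").push(value);
    }

    /// Hand the oldest block to the consumer; this store no longer owns it.
    fn pop_block(&mut self) -> Option<Vec<f64>> {
        self.blocks.pop_front()
    }

    /// Number of state values still buffered in this store.
    fn buffered_len(&self) -> usize {
        self.blocks.iter().map(Vec::len).sum()
    }
}

/// Move every partial block into `final_states` and return the largest
/// combined buffered length observed between blocks.
fn drain_into_final(
    partial: &mut BlockedStates,
    final_states: &mut BlockedStates,
) -> usize {
    let mut peak = partial.buffered_len() + final_states.buffered_len();
    while let Some(block) = partial.pop_block() {
        for value in block {
            final_states.push(value);
        }
        // The consumed block has been dropped here, so its memory is free again.
        peak = peak.max(partial.buffered_len() + final_states.buffered_len());
    }
    peak
}
```

With a single contiguous `Vec` on the partial side, the same drain would keep all partial values alive until the end, so the combined count would reach twice the input size.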
   
   ### Benchmark result
   ```text
   Query(cardinality)      PR       main      Δ
   Q1(~100)                0.165s   0.144s   +14.6%
   Q2(~100)                0.116s   0.139s   -16.5%
   Q3(~9K)                 0.119s   0.139s   -14.4%
   Q4(~18M)                0.389s   0.433s   -10.2%
   Q5(~100M)               1.247s   0.772s   +61.5%
   
   * MacBook Pro (M4 Pro), 1 warmup round, measured 2nd run
   ```
   
   **Summary**: medium and high cardinality are faster; low cardinality can be 
slower, which I think is acceptable; near-unique cardinality (Q5) is slower due 
to a missing fast path, see below.
   
   * The ClickBench `hits` table has ~100M rows.
   * For low cardinality, the blocked approach might add slight execution 
overhead. Since those queries are already very efficient, I think we can live 
with that.
   * For near-unique cardinality (Q5), the blocked aggregation PoC is missing 
the partial aggregation skip optimization 
(`datafusion.execution.skip_partial_aggregation_probe_ratio_threshold`). Once 
that is implemented, Q5 is also likely to get faster, judging by the current 
number for the high-cardinality Q4.
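
For readers unfamiliar with that optimization, the following is a rough sketch of the kind of check it performs, under my own assumptions. `SkipProbe`, its fields, and the row threshold are hypothetical names for illustration; only the ratio option named above comes from this PR description. The idea is that once enough input rows have been seen, a groups-to-rows ratio close to 1 means partial aggregation is barely reducing the data, so it can be skipped.

```rust
/// Hypothetical probe for skipping partial aggregation (illustrative only;
/// not DataFusion's internal implementation).
struct SkipProbe {
    /// Assumed: minimum number of input rows before a decision is made.
    rows_threshold: usize,
    /// Skip when `num_groups / input_rows` reaches this ratio.
    ratio_threshold: f64,
    input_rows: usize,
    num_groups: usize,
}

impl SkipProbe {
    fn new(rows_threshold: usize, ratio_threshold: f64) -> Self {
        Self { rows_threshold, ratio_threshold, input_rows: 0, num_groups: 0 }
    }

    /// Record one input batch and the total number of groups seen so far.
    fn observe(&mut self, batch_rows: usize, total_groups: usize) {
        self.input_rows += batch_rows;
        self.num_groups = total_groups;
    }

    /// True when partial aggregation is unlikely to reduce the data much.
    fn should_skip(&self) -> bool {
        self.input_rows >= self.rows_threshold
            && (self.num_groups as f64) / (self.input_rows as f64)
                >= self.ratio_threshold
    }
}
```

The thresholds passed to `SkipProbe::new` are up to the caller in this sketch; a near-unique key such as `WatchID` would trip the check quickly, while a key with 91 groups never would.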
   
   
   #### Memory usage for Q4
   <img width="1650" height="975" alt="memcurve" 
src="https://github.com/user-attachments/assets/975d17b6-04c5-442f-9a69-4e17ccbb53d4" 
/>
   Memory usage becomes more efficient, as expected. In the blocked approach 
the curve should ideally be bell-shaped; however, the memory allocator (e.g. 
`mimalloc`) caches freed memory for reuse, so the curve looks like a rise 
followed by a plateau. I suppose the allocator can give that memory back to the 
OS efficiently.
   
   <details>
   <summary>microbench.sql</summary>
   
   ```sql
   -- Generated from datafusion/benchmarks
   CREATE EXTERNAL TABLE hits
   STORED AS PARQUET
   LOCATION '/Users/yongting/Code/datafusion/benchmarks/data/hits_partitioned/';
   
   set datafusion.execution.target_partitions=8;
   
   -- ClickBench hits_partitioned row count: 99,997,497 rows.
   --
   -- Verify with EXPLAIN VERBOSE: each query should show both
   -- stream=RawPartialHashAggregateStream, blocked=true and
   -- stream=PartialFinalHashAggregateStream, blocked=true.
   
   -- One group over the full table: cardinality 1 over 99,997,497 rows.
   -- Plain no-GROUP-BY avg() does not use the grouped blocked path, so keep a
   -- derived Int64 key that is one value for all rows.
   SELECT
     g,
     avg(v) AS avg_width
   FROM (
     SELECT
       CAST("OS" * 0 AS BIGINT) AS g,
       CAST("ResolutionWidth" AS DOUBLE) AS v
     FROM "hits"
   )
   GROUP BY g;
   
   -- Low cardinality group key: OS has 91 groups.
   -- Cast to BIGINT because the current blocked group-values path supports
   -- only a single Int64 key.
   SELECT
     g,
     avg(v) AS avg_width
   FROM (
     SELECT
       CAST("OS" AS BIGINT) AS g,
       CAST("ResolutionWidth" AS DOUBLE) AS v
     FROM "hits"
   )
   GROUP BY g
   LIMIT 20;
   
   -- Low/medium cardinality group key: SearchEngineID has 96 groups.
   SELECT
     g,
     avg(v) AS avg_width
   FROM (
     SELECT
       CAST("SearchEngineID" AS BIGINT) AS g,
       CAST("ResolutionWidth" AS DOUBLE) AS v
     FROM "hits"
   )
   GROUP BY g
   LIMIT 20;
   
   -- Medium cardinality group key: RegionID has 9,040 groups.
   SELECT
     g,
     avg(v) AS avg_width
   FROM (
     SELECT
       CAST("RegionID" AS BIGINT) AS g,
       CAST("ResolutionWidth" AS DOUBLE) AS v
     FROM "hits"
   )
   GROUP BY g
   LIMIT 20;
   
   -- High cardinality group key: UserID has 17,630,976 groups.
   SELECT
     "UserID",
     avg("ResolutionWidth") AS avg_width
   FROM "hits"
   GROUP BY "UserID"
   LIMIT 20;
   
   -- Near-unique group key: WatchID has 99,997,493 groups.
   SELECT
     g,
     avg(v) AS avg_width
   FROM (
     SELECT
       CAST("WatchID" AS BIGINT) AS g,
       CAST("ResolutionWidth" AS DOUBLE) AS v
     FROM "hits"
   )
   GROUP BY g
   LIMIT 20;
   ```
   
   </details>
   
   ### Implementation plan
   This PR is just a PoC; it can be split into smaller patches for review.
   
   ## What changes are included in this PR?
   
   ### Refresher for related internal data structures
   The simplified mental model for hash aggregation is `HashTable: group_key -> 
group_state`. In reality, group values and group states are each stored as 
contiguous vectors for efficiency.
   <img width="1009" height="700" alt="image" 
src="https://github.com/user-attachments/assets/e15ad434-aa53-40b9-9764-a6991382727b" 
/>
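
As a rough illustration of that layout (a simplified sketch, not the actual DataFusion structures), the hash table can map each key to a dense group index, while keys and `avg` states live in parallel contiguous vectors addressed by that index. `AvgByKey` and its field names are hypothetical.

```rust
use std::collections::HashMap;

/// Hypothetical contiguous layout for `avg` grouped by an Int64 key.
struct AvgByKey {
    /// group key -> dense group index
    index_of: HashMap<i64, usize>,
    /// group values, in order of first appearance
    group_values: Vec<i64>,
    /// per-group running sum, addressed by group index
    sums: Vec<f64>,
    /// per-group row count, addressed by group index
    counts: Vec<u64>,
}

impl AvgByKey {
    fn new() -> Self {
        Self {
            index_of: HashMap::new(),
            group_values: Vec::new(),
            sums: Vec::new(),
            counts: Vec::new(),
        }
    }

    /// Fold one `(key, value)` row into the state of its group.
    fn update(&mut self, key: i64, value: f64) {
        let next = self.group_values.len();
        let idx = *self.index_of.entry(key).or_insert(next);
        if idx == next {
            // First time this key is seen: append a fresh state slot.
            self.group_values.push(key);
            self.sums.push(0.0);
            self.counts.push(0);
        }
        self.sums[idx] += value;
        self.counts[idx] += 1;
    }

    /// Produce `(key, avg)` pairs in group-index order.
    fn averages(&self) -> Vec<(i64, f64)> {
        self.group_values
            .iter()
            .zip(self.sums.iter().zip(&self.counts))
            .map(|(key, (sum, count))| (*key, *sum / *count as f64))
            .collect()
    }
}
```

The three vectors grow together, which is why none of them can be released piecemeal while downstream operators are still reading from them.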
   
   ### Key Changes
   #### Split out the partial and final aggregation logic
   See https://github.com/apache/datafusion/issues/22710 for the idea. This PR 
splits out two execution paths, enough to run the microbenchmark queries above.
   
   #### Support blocked memory management for states
   This PoC only aims to make the following workload work with blocked memory 
management:
   ```sql
   -- primitive key + avg accumulator
   select v1%10 as g, avg(v1)
   from generate_series(1000000) as t1(v1)
   group by g;
   ```
   
   To support blocked state management, this PR adds:
   - `impl<T> GroupValues for GroupValuesPrimitiveBlock<T>`
   - `impl<T, F> GroupsAccumulator for AvgGroupsAccumulator<T, F>`
   
   The idea is to replace the internal contiguous vector with fixed-size blocks 
(see the figure above).
   They are implemented as new structs just to keep the PoC simpler; it is 
possible to replace the existing implementation with this blocked approach.
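
A minimal sketch of what a blocked `avg` state could look like, assuming a dense group index that is split into a block id and an offset. `BlockedAvgState` and its methods are hypothetical and much simpler than the `GroupsAccumulator` implementation in this PR; the point is only the addressing scheme and the fact that results can be emitted one block at a time.

```rust
/// Hypothetical blocked state for `avg` (illustrative only).
struct BlockedAvgState {
    block_size: usize,
    /// `sums[block_id][offset]` is the running sum of one group.
    sums: Vec<Vec<f64>>,
    /// `counts[block_id][offset]` is the row count of one group.
    counts: Vec<Vec<u64>>,
}

impl BlockedAvgState {
    fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block_size must be positive");
        Self { block_size, sums: Vec::new(), counts: Vec::new() }
    }

    /// Split a dense group index into `(block_id, offset)`.
    fn locate(&self, group_index: usize) -> (usize, usize) {
        (group_index / self.block_size, group_index % self.block_size)
    }

    /// Fold one value into the group at `group_index`, allocating
    /// zero-filled blocks on demand instead of growing one big vector.
    fn update(&mut self, group_index: usize, value: f64) {
        let (block_id, offset) = self.locate(group_index);
        while self.sums.len() <= block_id {
            self.sums.push(vec![0.0; self.block_size]);
            self.counts.push(vec![0; self.block_size]);
        }
        self.sums[block_id][offset] += value;
        self.counts[block_id][offset] += 1;
    }

    /// Emit averages block by block. Each block's vectors are dropped as
    /// soon as that block has been converted; untouched slots yield `None`.
    fn into_block_averages(self) -> impl Iterator<Item = Vec<Option<f64>>> {
        self.sums.into_iter().zip(self.counts).map(|(sums, counts)| {
            sums.iter()
                .zip(&counts)
                .map(|(sum, count)| {
                    if *count == 0 { None } else { Some(*sum / *count as f64) }
                })
                .collect()
        })
    }
}
```

In this sketch the group index is supplied by the caller; in a real executor it would come from the group-values side, which resolves each key to its index first.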
   
   ## Are these changes tested?
   
   
   ## Are there any user-facing changes?
   
   

