wombatu-kun opened a new pull request, #16384:
URL: https://github.com/apache/iceberg/pull/16384

   ## What
   
   Enables Spark aggregate push-down for `GROUP BY` queries whose grouping 
columns are identity-partition columns, e.g. `SELECT p, COUNT(*), MIN(x), 
MAX(x) FROM t GROUP BY p` where `p` is an identity-partitioned column. These 
are now answered from manifest metadata instead of scanning data files, 
producing one row per partition group via a local scan.
   
   Previously, `SparkScanBuilder.pushAggregation` rejected any aggregation with 
a `GROUP BY` clause outright (as noted by the existing comment `// TODO: enable 
aggregate push down for partition col group by expression`), so only ungrouped 
aggregates were optimized.
   
   ## Why
   
   For a table partitioned by an identity transform, every row in a data file 
shares the same value for that partition column, so the file's manifest stats 
(`record_count`, `lower_bounds`, `upper_bounds`, `value_counts`) belong 
entirely to a single group. `COUNT`/`MIN`/`MAX` per group can therefore be 
computed from manifests alone, turning a full table scan into a metadata-only 
operation for a common reporting pattern.
   
   ## How
   
   `pushAggregation` now resolves the grouping expressions to schema fields 
and, when they map to identity partition columns, accumulates a per-group 
`AggregateEvaluator` keyed by the file's partition tuple, then emits the 
grouped result as a local scan. The output schema follows the Spark 
`SupportsPushDownAggregates` contract: group-by columns first (in order), then 
the aggregate expressions (in order).
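
The accumulation step described above can be illustrated with a minimal, self-contained sketch. It is not the code in this PR: `FileStats`, `GroupAccumulator`, and `aggregate` are hypothetical stand-ins for the per-file manifest stats and the per-group `AggregateEvaluator`, and the sketch assumes every file carries integer lower and upper bounds for the aggregated column `x` (the fallback for missing bounds is not modeled here).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupedMetadataAggregation {

  /** Hypothetical per-file manifest stats; not an Iceberg class. */
  public static final class FileStats {
    final List<Object> partition; // identity-partition tuple of the file
    final long recordCount;       // rows in the file
    final int lowerBound;         // min of x in the file (assumed present)
    final int upperBound;         // max of x in the file (assumed present)

    public FileStats(List<Object> partition, long recordCount, int lowerBound, int upperBound) {
      this.partition = partition;
      this.recordCount = recordCount;
      this.lowerBound = lowerBound;
      this.upperBound = upperBound;
    }
  }

  /** Running COUNT(*), MIN(x), MAX(x) for a single partition group. */
  static final class GroupAccumulator {
    long count = 0;
    int min = Integer.MAX_VALUE;
    int max = Integer.MIN_VALUE;

    void update(FileStats file) {
      count += file.recordCount;
      min = Math.min(min, file.lowerBound);
      max = Math.max(max, file.upperBound);
    }
  }

  /** Returns one row per group: grouping values first, then COUNT, MIN, MAX. */
  public static List<List<Object>> aggregate(List<FileStats> files) {
    // Keyed by the partition tuple; insertion order is kept for stable output.
    Map<List<Object>, GroupAccumulator> groups = new LinkedHashMap<>();
    for (FileStats file : files) {
      groups.computeIfAbsent(file.partition, key -> new GroupAccumulator()).update(file);
    }

    List<List<Object>> rows = new ArrayList<>();
    for (Map.Entry<List<Object>, GroupAccumulator> entry : groups.entrySet()) {
      List<Object> row = new ArrayList<>(entry.getKey()); // group-by columns first
      GroupAccumulator acc = entry.getValue();
      row.add(acc.count);                                 // then aggregates, in order
      row.add(acc.min);
      row.add(acc.max);
      rows.add(row);
    }
    return rows;
  }
}
```

Because each file belongs to exactly one partition tuple, a single pass over the file stats is enough and no data file is opened.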
   
   Push-down is conservatively skipped, and the query falls back to a normal 
scan with identical results, when any of the following holds:
   
   - A grouping column is not an identity partition column in the spec of every 
scanned data file (covers non-identity transforms such as 
bucket/truncate/temporal, non-partition columns, and partition-spec evolution).
   - Any scanned file has delete files (position or equality deletes), which 
would invalidate metadata-derived counts.
   - Any group's aggregate cannot be satisfied from metrics (e.g. NaN bounds, 
or a metrics mode that does not retain the needed bounds).
   - A non-partition data filter remains (Spark keeps the filter above the 
scan, so the aggregate is not pushed).
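
The fallback conditions above amount to a single eligibility predicate. The sketch below is only an illustration under simplifying assumptions, not the PR's implementation: `ScannedFile` and `canPushDown` are hypothetical names, and the "metrics cannot satisfy the aggregate" condition is reduced to one boolean per file.

```java
import java.util.List;
import java.util.Set;

public class GroupedPushDownEligibility {

  /** Hypothetical summary of one scanned data file; not an Iceberg class. */
  public static final class ScannedFile {
    final Set<String> identityPartitionColumns; // identity-partitioned columns in this file's spec
    final boolean hasDeleteFiles;               // position or equality deletes attached
    final boolean hasUsableBounds;              // metrics retain the needed bounds, no NaN

    public ScannedFile(
        Set<String> identityPartitionColumns, boolean hasDeleteFiles, boolean hasUsableBounds) {
      this.identityPartitionColumns = identityPartitionColumns;
      this.hasDeleteFiles = hasDeleteFiles;
      this.hasUsableBounds = hasUsableBounds;
    }
  }

  /** Returns false as soon as any condition forces a fallback to a normal scan. */
  public static boolean canPushDown(
      List<String> groupingColumns, List<ScannedFile> files, boolean hasResidualDataFilter) {
    if (hasResidualDataFilter) {
      return false; // a non-partition data filter remains above the scan
    }
    for (ScannedFile file : files) {
      if (!file.identityPartitionColumns.containsAll(groupingColumns)) {
        return false; // grouping column is not identity-partitioned in this file's spec
      }
      if (file.hasDeleteFiles) {
        return false; // deletes would invalidate metadata-derived counts
      }
      if (!file.hasUsableBounds) {
        return false; // aggregate cannot be satisfied from metrics
      }
    }
    return true;
  }
}
```

Checking the spec per file rather than once per table is what makes the predicate cover partition-spec evolution: a column that is identity-partitioned in only some specs fails the check for the files written under the others.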
   
   ## Scope
   
   Applied identically to all four supported Spark trees: `spark/v3.4`, 
`spark/v3.5`, `spark/v4.0`, `spark/v4.1`.
   
   ## Testing
   
   New cases in `TestAggregatePushDown` (all four Spark versions): single and 
multiple identity-partition GROUP BY, non-partition and non-identity-transform 
fallback, partition-spec-evolution fallback and the stays-identity-across-specs 
positive case, null partition value as its own group, row-level deletes 
fallback, partition filter, empty table, non-partition data-filter fallback, 
NaN fallback, and time-travel / incremental-scan grouped push-down. A 
differential oracle re-runs each positive query with 
`spark.sql.iceberg.aggregate-push-down.enabled=false` and asserts the metadata 
fast-path result equals the data-scan result. All `TestAggregatePushDown` tests 
pass on Spark 3.4, 3.5, 4.0, and 4.1; `spotlessCheck` is clean.
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

