huaxingao opened a new pull request, #5872:
URL: https://github.com/apache/iceberg/pull/5872

   This PR pushes down MIN/MAX/COUNT aggregates to Iceberg.
   
   For `SELECT MIN(col), MAX(col), COUNT(col), COUNT(*) FROM table`, without 
this PR, Iceberg does `SELECT col FROM table` and Spark computes MIN(col), 
MAX(col), COUNT(col), and COUNT(*). With this PR, Iceberg does `SELECT 
MIN(col), MAX(col), COUNT(col), COUNT(*) FROM table`: MIN, MAX, and COUNT are 
calculated on the Iceberg side using the statistics in the manifest files.
   
   This PR makes the following changes:
   
   1. Add a table property `AGGREGATE_PUSHDOWN_ENABLED`, which defaults to false.
   2. Make `SparkScanBuilder` implement `SupportsPushDownAggregates` so that 
MIN/MAX/COUNT can be pushed down to Iceberg. Iceberg then reads the 
statistics (upper_bound, lower_bound, record_count) from the manifest 
files, calculates MIN/MAX/COUNT, builds a Spark `InternalRow`, and passes the 
`InternalRow` to Spark.
   3. Add a `SparkLocalScan`, a special `Scan` that runs locally on the Spark 
driver instead of on executors. If MIN/MAX/COUNT are pushed down, Iceberg 
creates a `SparkLocalScan` and no longer needs to plan files, create 
`FileScanTask`s, and send the tasks to executors; it can simply perform a 
local scan on the Spark driver.
   4. `SparkTableUtil.loadMetadataTable(spark, table, 
MetadataTableType.DATA_FILES)` is used to get the statistics (upper_bound, 
lower_bound, record_count); max, min, or count is then calculated from the 
statistics, and an `InternalRow` is built and returned to Spark.
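
   The per-file aggregation described in steps 2 and 4 can be sketched as 
follows. This is a simplified stand-in, not the actual implementation: the 
`FileStats` class and the already-decoded `long` bounds are hypothetical 
substitutes for Iceberg's real manifest-entry types and serialized bound 
values.

   ```java
   import java.util.Arrays;
   import java.util.List;

   public class AggregateFromStats {
     // Hypothetical per-file statistics mirroring the manifest fields the PR
     // reads: lower_bound, upper_bound, record_count (bounds already decoded).
     static class FileStats {
       final long lowerBound;
       final long upperBound;
       final long recordCount;

       FileStats(long lower, long upper, long count) {
         this.lowerBound = lower;
         this.upperBound = upper;
         this.recordCount = count;
       }
     }

     // Combine stats across all data files: MIN is the smallest lower bound,
     // MAX is the largest upper bound, COUNT is the sum of record counts.
     static long[] aggregate(List<FileStats> files) {
       long min = Long.MAX_VALUE;
       long max = Long.MIN_VALUE;
       long count = 0;
       for (FileStats f : files) {
         min = Math.min(min, f.lowerBound);
         max = Math.max(max, f.upperBound);
         count += f.recordCount;
       }
       return new long[] {min, max, count};
     }

     public static void main(String[] args) {
       List<FileStats> files = Arrays.asList(
           new FileStats(3, 40, 100),
           new FileStats(-5, 17, 250));
       long[] result = aggregate(files);
       // MIN = -5, MAX = 40, COUNT = 350
       System.out.println(result[0] + "," + result[1] + "," + result[2]);
     }
   }
   ```

   The three values computed this way would populate the single `InternalRow` 
that the local scan returns to Spark.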
   
   
   In the tests, I inspect the explain output of the physical plan to check 
whether MIN/MAX/COUNT are pushed down.
   
   For example, for `SELECT max(data), min(data), count(data) FROM table`:
   If MIN/MAX/COUNT are not pushed down:
   ```
   == Optimized Logical Plan ==
   Aggregate [max(data#146) AS max(data)#150, min(data#146) AS min(data)#151, 
count(data#146) AS count(data)#152L]
   +- RelationV2[data#146] spark_catalog.default.table
   
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- SortAggregate(key=[], functions=[max(data#146), min(data#146), 
count(data#146)], output=[max(data)#150, min(data)#151, count(data)#152L])
      +- SortAggregate(key=[], functions=[partial_max(data#146), 
partial_min(data#146), partial_count(data#146)], output=[max#165, min#166, 
count#167L])
         +- BatchScan[data#146] spark_catalog.default.table [filters=] 
RuntimeFilters: []
   ```
   If MIN/MAX/COUNT are pushed down:
   ```
   == Optimized Logical Plan ==
   Aggregate [max(MAX(data)#440) AS max(data)#429, min(MIN(data)#441) AS 
min(data)#430, sum(COUNT(data)#442L) AS count(data)#431L]
   +- RelationV2[MAX(data)#440, MIN(data)#441, COUNT(data)#442L] 
spark_catalog.default.table
   
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- SortAggregate(key=[], functions=[max(MAX(data)#440), min(MIN(data)#441), 
sum(COUNT(data)#442L)], output=[max(data)#429, min(data)#430, count(data)#431L])
      +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#297]
         +- SortAggregate(key=[], functions=[partial_max(MAX(data)#440), 
partial_min(MIN(data)#441), partial_sum(COUNT(data)#442L)], output=[max#446, 
min#447, sum#448L])
            +- LocalTableScan [MAX(data)#440, MIN(data)#441, COUNT(data)#442L]
   ```
   I check the physical plan to see whether it contains 
MAX(data)/MIN(data)/COUNT(data).

