anuragmantri opened a new pull request, #14948:
URL: https://github.com/apache/iceberg/pull/14948

   Note: This PR builds on top of https://github.com/apache/iceberg/pull/14683 
which is still in review. Reviewers can look at the changes in this commit 
https://github.com/apache/iceberg/commit/cc08ff23834743b72bb64109c646f72023d2752a
   
   This PR implements the Spark DSv2 SupportsReportOrdering API to enable 
Spark's sort elimination optimization for partitioned tables when reading 
sorted Iceberg tables when they have a defined sort order and files are written 
respecting that order.  
   
   **Implementation summary:**
   
   1.  **Ordering Validation**: `SparkPartitioningAwareScan.outputOrdering()` 
validates all files have the current table's sort order ID before reporting 
ordering to Spark. If validation fails, no ordering is reported.
   
   2. **Merging Sorted Files**: Since sorted files within a partition may have 
overlapping ranges, this PR introduces  MergingSortedRowDataReader that merges 
rows from multiple sorted files using k-way merge with a min-heap.
   
   3. **Row Comparison**: InternalRowComparator compares Spark InternalRows 
based on Iceberg sort order.
   
   
    **Constraints**
   
     1. Bin-packing of file scan tasks is disabled when ordering is required 
since [Spark will discard 
](https://github.com/apache/spark/blob/2fc65e1c98ed53641f5204215b840e33463df987/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala#L163)ordering
 if multiple input partitions exist with the same grouping key.
     2. Row-by-row comparison is required for merging, so vectorized reader is 
disabled when ordering is reported.
     3. Currently only reports sort order if files are sorted in the table sort 
order, we could extend this to report any historical sort order if all files 
have that order.
   
   ---
   
   Sort elimination examples 
   
   1. For MERGE INTO 
   ---------------------
   
   Without reporting sort order
   ```
   CommandResult <empty>
      +- WriteDelta
         +- *(4) Sort [_spec_id#287 ASC NULLS FIRST, _partition#288 ASC NULLS 
FIRST, _file#285 ASC NULLS FIRST, _pos#286L ASC NULLS FIRST, 
static_invoke(org.apache.iceberg.spark.functions.BucketFunction$BucketInt.invoke(4,
 c1#282)) ASC NULLS FIRST, c1#282 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(_spec_id#287, _partition#288, 
static_invoke(org.apache.iceberg.spark.functions.BucketFunction$BucketInt.invoke(4,
 c1#282)), 200), REBALANCE_PARTITIONS_BY_COL, 402653184, [plan_id=394]
               +- MergeRowsExec[__row_operation#281, c1#282, c2#283, c3#284, 
_file#285, _pos#286L, _spec_id#287, _partition#288]
                  +- *(3) SortMergeJoin [c1#257], [c1#260], RightOuter
                     :- *(1) Sort [c1#257 ASC NULLS FIRST], false, 0
                     :  +- *(1) Filter isnotnull(c1#257)
                     :     +- *(1) Project [c1#257, _file#265, _pos#266L, 
_spec_id#263, _partition#264, true AS __row_from_target#273, 
monotonically_increasing_id() AS __row_id#274L]
                     :        +- *(1) ColumnarToRow
                     :           +- BatchScan testhadoop.default.table[c1#257, 
_file#265, _pos#266L, _spec_id#263, _partition#264] testhadoop.default.table 
(branch=null) [filters=, groupedBy=c1_bucket] RuntimeFilters: []
                     +- *(2) Sort [c1#260 ASC NULLS FIRST], false, 0
                        +- *(2) ColumnarToRow
                           +- BatchScan testhadoop.default.table_source[c1#260, 
c2#261, c3#262] testhadoop.default.table_source (branch=null) [filters=, 
groupedBy=c1_bucket] RuntimeFilters: []
   ```
   
   With sort order reporting:
   ```
   CommandResult <empty>
      +- WriteDelta
         +- *(4) Sort [_spec_id#80 ASC NULLS FIRST, _partition#81 ASC NULLS 
FIRST, _file#78 ASC NULLS FIRST, _pos#79L ASC NULLS FIRST, 
static_invoke(org.apache.iceberg.spark.functions.BucketFunction$BucketInt.invoke(4,
 c1#75)) ASC NULLS FIRST, c1#75 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(_spec_id#80, _partition#81, 
static_invoke(org.apache.iceberg.spark.functions.BucketFunction$BucketInt.invoke(4,
 c1#75)), 200), REBALANCE_PARTITIONS_BY_COL, 402653184, [plan_id=255]
               +- MergeRowsExec[__row_operation#74, c1#75, c2#76, c3#77, 
_file#78, _pos#79L, _spec_id#80, _partition#81]
                  +- *(3) SortMergeJoin [c1#50], [c1#53], RightOuter
                     :- *(1) Filter isnotnull(c1#50)
                     :  +- *(1) Project [c1#50, _file#58, _pos#59L, 
_spec_id#56, _partition#57, true AS __row_from_target#66, 
monotonically_increasing_id() AS __row_id#67L]
                     :     +- *(1) ColumnarToRow
                     :        +- BatchScan testhadoop.default.table[c1#50, 
_file#58, _pos#59L, _spec_id#56, _partition#57] testhadoop.default.table 
(branch=null) [filters=, groupedBy=c1_bucket] RuntimeFilters: []
                     +- *(2) Project [c1#53, c2#54, c3#55]
                        +- BatchScan testhadoop.default.table_source[c1#53, 
c2#54, c3#55] testhadoop.default.table_source (branch=null) [filters=, 
groupedBy=c1_bucket] RuntimeFilters: []
   ```
   
   2. For JOIN 
   -------------
   
   Without reporting sort order
   
   ```
   *(3) Project [c1#118, c2#119, c2#122]
   +- *(3) SortMergeJoin [c1#118], [c1#121], Inner
      :- *(1) Sort [c1#118 ASC NULLS FIRST], false, 0
      :  +- *(1) ColumnarToRow
      :     +- BatchScan testhadoop.default.table[c1#118, c2#119] 
testhadoop.default.table (branch=null) [filters=c1 IS NOT NULL, 
groupedBy=c1_bucket] RuntimeFilters: []
      +- *(2) Sort [c1#121 ASC NULLS FIRST], false, 0
         +- *(2) ColumnarToRow
            +- BatchScan testhadoop.default.table_source[c1#121, c2#122] 
testhadoop.default.table_source (branch=null) [filters=c1 IS NOT NULL, 
groupedBy=c1_bucket] RuntimeFilters: []
   
   ```
   
   With sort order reporting:
   ```
   *(3) Project [c1#36, c2#37, c2#40]
   +- *(3) SortMergeJoin [c1#36], [c1#39], Inner
      :- *(1) ColumnarToRow
      :  +- BatchScan testhadoop.default.table[c1#36, c2#37] 
testhadoop.default.table (branch=null) [filters=c1 IS NOT NULL, 
groupedBy=c1_bucket] RuntimeFilters: []
      +- *(2) ColumnarToRow
         +- BatchScan testhadoop.default.table_source[c1#39, c2#40] 
testhadoop.default.table_source (branch=null) [filters=c1 IS NOT NULL, 
groupedBy=c1_bucket] RuntimeFilters: []
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to