anuragmantri opened a new pull request, #16305:
URL: https://github.com/apache/iceberg/pull/16305

   This PR adds a K-way merge compaction strategy to the RewriteDataFiles 
action. K-way merge rewrites pre-sorted data files by stream-merging them in 
sort-key order without a shuffle, preserving the table's sort order in the 
output files.
                                                                                
                                                                             
   K-way merge is intended for tables that are already sorted but have 
accumulated multiple overlapping files per partition (e.g., after daily 
ingestion into a previously sort-compacted table). It achieves the same output 
as SORT but eliminates shuffle and spill entirely.
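
For readers unfamiliar with the technique, the core of a K-way merge can be sketched in a few lines of plain Java. This is an illustration only, not the code in this PR: the class name KWayMergeSketch and the Head helper are hypothetical, plain in-memory iterators stand in for data file readers, and the result is collected into a list, whereas a streaming runner would write each row as soon as it is polled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: merges K already-sorted inputs into one sorted output.
final class KWayMergeSketch {
  // One heap entry: the current head row of an input plus the iterator it came from.
  private static final class Head<T> {
    final T value;
    final Iterator<T> source;

    Head(T value, Iterator<T> source) {
      this.value = value;
      this.source = source;
    }
  }

  static <T> List<T> merge(List<Iterator<T>> sortedInputs, Comparator<T> order) {
    PriorityQueue<Head<T>> heap =
        new PriorityQueue<>((a, b) -> order.compare(a.value, b.value));

    // Seed the heap with the first row of every non-empty input.
    for (Iterator<T> input : sortedInputs) {
      if (input.hasNext()) {
        heap.add(new Head<>(input.next(), input));
      }
    }

    List<T> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      // The smallest head across all inputs is the next output row.
      Head<T> smallest = heap.poll();
      out.add(smallest.value);
      // Refill the heap from the input that just produced a row.
      if (smallest.source.hasNext()) {
        heap.add(new Head<>(smallest.source.next(), smallest.source));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Iterator<Integer>> files = List.of(
        List.of(1, 4, 9).iterator(),
        List.of(2, 3, 10).iterator(),
        List.of(5, 6).iterator());
    System.out.println(merge(files, Comparator.naturalOrder()));
  }
}
```

At most one row per input sits in the heap at any time, so memory grows with the number of files in a group rather than with the number of rows, which is why no shuffle or spill is needed.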
                                                                                
                                                                             
   **Implementation summary:**                                                  
                                                                               
   1. API: New kWayMerge() default method on RewriteDataFiles interface. The 
table must have a defined sort order.                                         
   2. Planner (KWayMergeRewriteFilePlanner): Extends 
SizeBasedFileRewritePlanner with sort-key-ordered file grouping. Files are 
sorted by their lower bounds on the first sort field before bin-packing, so 
that each group covers a contiguous key range. The table scan calls 
includeColumnStats() so that file bounds are available for planning.
   3. Runner (SparkKWayMergeFileRewriteRunner): Opens all files in a group as 
streaming iterators, applies GenericDeleteFilter for position/equality  
deletes, then merges using SortedMerge (priority queue). Output is written via 
GenericAppenderFactory with size-based file rotation and partition-change 
detection.                         
   4. Range parallelism: For large groups, the runner splits files into ranges 
by total size and processes ranges in parallel via jsc.parallelize(). Range 
assignments are broadcast to executors. Controlled by range-parallelism-enabled 
(default true), ranges-per-group (default 25), and  
min-files-for-range-parallelism (default 10).                                   
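
The planner's grouping step (item 2) can be illustrated with a small standalone sketch. It is not the KWayMergeRewriteFilePlanner code: the FileInfo type, the use of a long as the lower bound, and the simple greedy packing are assumptions made for illustration, and the real planner builds on SizeBasedFileRewritePlanner's own bin-packing.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: order files by lower bound, then pack neighbours into groups.
final class SortedGroupingSketch {
  static final class FileInfo {
    final String path;
    final long lowerBound; // lower bound of the first sort field (assumed numeric here)
    final long sizeBytes;

    FileInfo(String path, long lowerBound, long sizeBytes) {
      this.path = path;
      this.lowerBound = lowerBound;
      this.sizeBytes = sizeBytes;
    }
  }

  static List<List<FileInfo>> plan(List<FileInfo> files, long maxGroupBytes) {
    // Sort a copy so the caller's list is left untouched.
    List<FileInfo> sorted = new ArrayList<>(files);
    sorted.sort(Comparator.comparingLong((FileInfo f) -> f.lowerBound));

    List<List<FileInfo>> groups = new ArrayList<>();
    List<FileInfo> current = new ArrayList<>();
    long currentBytes = 0;
    for (FileInfo file : sorted) {
      // Close the current group when the next file would push it over the limit.
      if (!current.isEmpty() && currentBytes + file.sizeBytes > maxGroupBytes) {
        groups.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(file);
      currentBytes += file.sizeBytes;
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups;
  }
}
```

Because files are only ever packed with their neighbours in lower-bound order, each group in this sketch covers a contiguous run of the sorted file list.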
                                                                   
   
   **Constraints:**                                                             
                                                                                
                                      
   1. All input files must have a valid sort_order_id > 0 matching the table's 
sort order. Files without sort metadata are rejected with a clear error.
   2. Uses Iceberg's generic reader/writer stack (row-by-row), not Spark's 
vectorized path. This means higher per-record overhead than SORT, but zero 
shuffle I/O.
   3. Output files are individually sorted. When range parallelism is enabled, 
files from different ranges may have overlapping key ranges (consistent with 
how SORT behaves). 
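
As a rough illustration of the first constraint, the check could look like the sketch below. The class name, method signature, and error message are hypothetical; the sketch takes a plain map from file path to sort order ID instead of Iceberg's file metadata types, with null standing in for a file that carries no sort metadata.

```java
import java.util.Map;

// Hypothetical sketch: reject any input file whose sort order does not match the table's.
final class SortOrderCheckSketch {
  static void validate(int tableSortOrderId, Map<String, Integer> sortOrderIdByFile) {
    for (Map.Entry<String, Integer> entry : sortOrderIdByFile.entrySet()) {
      Integer id = entry.getValue();
      // null: no sort metadata; <= 0: not a valid sort order; otherwise it must match the table.
      if (id == null || id <= 0 || id != tableSortOrderId) {
        throw new IllegalArgumentException(
            "Cannot k-way merge " + entry.getKey() + ": sort_order_id " + id
                + " does not match table sort order " + tableSortOrderId);
      }
    }
  }
}
```

Failing fast here matters because a streaming merge silently produces unsorted output if even one input is not sorted as expected.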
   
   **Usage:**                                                                   
                                                                               
    
   ```sql
   -- Via procedure
   CALL catalog.system.rewrite_data_files(
       table => 'db.my_table',
       strategy => 'k-way-merge',
       options => map('max-concurrent-file-group-rewrites', '100')
   )
   ```
                                                                            
      
   ```java
   // Via action API
   SparkActions.get()
       .rewriteDataFiles(table)
       .kWayMerge()
       .option("max-concurrent-file-group-rewrites", "100")
       .execute();
   ```
   
   AI Usage: I used Claude Opus 4.7 for code generation, test writing, and 
review. I manually reviewed and validated all generated code.                   
                     



