andygrove opened a new pull request, #3295:
URL: https://github.com/apache/datafusion-comet/pull/3295

   ## Summary
   
   This PR implements split serialization for Iceberg native scans to reduce 
network transfer and deserialization overhead when scanning tables with many 
partitions.
   
   **⚠️ EXPERIMENTAL**: This is an experimental optimization that needs further 
testing and benchmarking before production use.
   
   ### Problem
   
   Currently, ALL partition metadata is serialized into a single `IcebergScan` 
protobuf message that is sent to every executor task. Each task uses only one 
partition's data but must deserialize all partitions.
   
   **Impact for 10,000 partitions** (see the sizing sketch below):
   - Each task receives ~10,000x more `file_partitions` data than needed
   - Network transfer overhead
   - Unnecessary deserialization of unused partition data
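
   To put a rough number on the first bullet, here is a back-of-envelope sizing 
sketch in Scala; the ~1 KiB of metadata per partition is an assumed illustrative 
figure, not a measurement from this PR:

   ```scala
   // Rough sizing with an ASSUMED ~1 KiB of metadata per partition; the real
   // size depends on file counts, column stats, and schema width.
   val numPartitions = 10000
   val bytesPerPartition = 1024L
   val perTaskToday = numPartitions * bytesPerPartition // ~10 MiB shipped to every task
   val perTaskNeeded = bytesPerPartition                // ~1 KiB actually used per task
   println(s"overhead factor: ${perTaskToday / perTaskNeeded}x") // 10000x
   ```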
   
   ### Solution
   
   Split the serialization into:
   - **Common data** (`IcebergScanCommon`): pools, metadata, catalog properties; serialized once and captured in the RDD closure
   - **Per-partition data** (`IcebergFilePartition[]`): file scan tasks; one per partition, stored in `Partition` objects
   
   **Data flow:**
   ```
   Driver:
     - Serialize IcebergScanCommon → commonBytes (once)
     - Serialize IcebergFilePartition[i] → partitionBytes[i] (once per partition)

   Task N:
     - Receives: commonBytes (from closure) + partitionBytes[N] (from Partition object)
     - Combines into IcebergScan{split_mode=true, common, partition}
     - Passes to Rust
   ```
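
   For illustration, the Task N step might look roughly like the sketch below, 
assuming protobuf-java generated classes for the messages above. The import path 
and builder/accessor names are inferred from the proto field names (`split_mode`, 
`common`, `partition`) and may not match this PR's generated code exactly:

   ```scala
   // Hypothetical task-side recombination. The generated-class import is an
   // assumption about where the operator.proto classes live.
   import org.apache.comet.serde.OperatorOuterClass.{
     IcebergFilePartition, IcebergScan, IcebergScanCommon
   }

   def buildSplitModeScan(
       commonBytes: Array[Byte],    // from the RDD closure, shared by all tasks
       partitionBytes: Array[Byte]  // from this task's Partition object
   ): IcebergScan =
     IcebergScan.newBuilder()
       .setSplitMode(true)
       .setCommon(IcebergScanCommon.parseFrom(commonBytes))
       .setPartition(IcebergFilePartition.parseFrom(partitionBytes))
       .build()
   ```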
   
   ### Changes
   
   | File | Changes |
   |------|---------|
   | `native/proto/src/proto/operator.proto` | Add `IcebergScanCommon`; add `split_mode`/`common`/`partition` fields to `IcebergScan` |
   | `native/core/src/execution/planner.rs` | Handle `split_mode`; add `parse_file_scan_tasks_from_common()` |
   | `spark/.../CometIcebergSplitRDD.scala` | **New file**: RDD with a custom `Partition` holding per-partition bytes |
   | `spark/.../CometIcebergNativeScan.scala` | Build the common bytes and per-partition bytes; pass them to `createExec` via a thread-local |
   | `spark/.../CometIcebergNativeScanExec.scala` | Add split data fields; override `doExecuteColumnar` for split mode |
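
   For the new RDD, here is a minimal sketch of the shape described above, with 
hypothetical names (`SplitScanRDD`, `SplitScanPartition`); the real 
`CometIcebergSplitRDD` builds the combined `IcebergScan` and drives the native 
plan rather than returning raw byte pairs:

   ```scala
   import org.apache.spark.{Partition, SparkContext, TaskContext}
   import org.apache.spark.rdd.RDD

   // Hypothetical Partition subtype: carries only this partition's serialized
   // IcebergFilePartition bytes.
   case class SplitScanPartition(index: Int, partitionBytes: Array[Byte]) extends Partition

   class SplitScanRDD(
       sc: SparkContext,
       commonBytes: Array[Byte],              // serialized IcebergScanCommon, shipped once per stage
       perPartitionBytes: Array[Array[Byte]]) // one serialized IcebergFilePartition per task
     extends RDD[(Array[Byte], Array[Byte])](sc, Nil) {

     override protected def getPartitions: Array[Partition] =
       Array.tabulate[Partition](perPartitionBytes.length) { i =>
         SplitScanPartition(i, perPartitionBytes(i))
       }

     override def compute(split: Partition, context: TaskContext): Iterator[(Array[Byte], Array[Byte])] = {
       val p = split.asInstanceOf[SplitScanPartition]
       // Task N sees commonBytes plus only partitionBytes[N].
       Iterator((commonBytes, p.partitionBytes))
     }
   }
   ```

   Because `commonBytes` is a field of the RDD, Spark serializes it once into the 
stage's task binary instead of once per partition, which is what removes the 
O(partitions) duplication.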
   
   ## Test plan
   
   - [x] All existing Iceberg tests pass (49/49)
   - [ ] Benchmark with a large partition count to quantify the improvement
   - [ ] Test with different Spark versions
   
   🤖 Generated with [Claude Code](https://claude.ai/code)

