andygrove opened a new issue, #3692: URL: https://github.com/apache/datafusion-comet/issues/3692
## Summary

CometBroadcastHashJoin has two significant performance bottlenecks.

### 1. Native side always uses `PartitionMode::Partitioned` for broadcast joins

In `planner.rs:1673`, all hash joins are hardcoded to `PartitionMode::Partitioned`. DataFusion supports `PartitionMode::CollectLeft`, which is designed specifically for broadcast joins: it skips repartitioning and builds the hash table directly from the collected (broadcast) side. The protobuf schema (`operator.proto:331-337`) also has no field to distinguish broadcast joins from shuffled joins.

### 2. Per-task deserialization of broadcast data with no caching

In `CometBroadcastExchangeExec.scala:312-315`, every task independently deserializes the entire broadcast payload via `Utils.decodeBatches()`. There is no executor-level caching of deserialized Arrow batches between tasks on the same executor, so with N partitions the same broadcast data is deserialized N times per executor.

### Additional context

The broadcast data also goes through a triple serialization cycle:

- Child output is serialized to a `ChunkedByteBuffer`
- The buffer is broadcast to executors
- Each task deserializes it back to Arrow batches
- The batches are then fed through JNI/Arrow FFI to native code

### Proposed changes

1. Add an `is_broadcast` field to the `HashJoin` protobuf message and use `PartitionMode::CollectLeft` on the native side for broadcast joins
2. Cache deserialized broadcast batches at the executor level to avoid re-decoding the same data for every task
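On the protobuf side, proposed change 1 might look like the sketch below. This is purely illustrative: the field number and the surrounding fields are assumptions, not the actual contents of `operator.proto`. The native planner would then map `is_broadcast = true` to `PartitionMode::CollectLeft` instead of the hardcoded `PartitionMode::Partitioned`.

```proto
// Hypothetical sketch; existing fields and field numbers are illustrative.
message HashJoin {
  // ... existing join fields (keys, join type, etc.) ...

  // NEW: true when the build side comes from a broadcast exchange,
  // letting the native planner choose PartitionMode::CollectLeft
  // and skip repartitioning the already-collected side.
  bool is_broadcast = 8;
}
```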
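Proposed change 2 can be sketched as a small executor-level cache keyed by broadcast id, so the decode work runs once per executor rather than once per task. This is a minimal illustration, not Comet's API: the object name, the `Long` key, and the use of `Seq[String]` as a stand-in for decoded Arrow batches are all assumptions (a real implementation would also need eviction and batch lifecycle management).

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical executor-level cache for decoded broadcast batches.
// Seq[String] stands in for the real decoded Arrow batch type.
object BroadcastBatchCache {
  // One entry per broadcast id; shared by all tasks on this executor.
  private val cache = new ConcurrentHashMap[Long, Seq[String]]()

  // computeIfAbsent guarantees the decode function runs at most once
  // per key, even when many tasks request the same broadcast concurrently.
  def getOrDecode(broadcastId: Long, decode: () => Seq[String]): Seq[String] =
    cache.computeIfAbsent(broadcastId, _ => decode())
}
```

With this in place, the per-task path in `CometBroadcastExchangeExec` would call the cache instead of invoking `Utils.decodeBatches()` directly, turning N decodes per executor into one.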