andygrove opened a new pull request, #3731: URL: https://github.com/apache/datafusion-comet/pull/3731
## Which issue does this PR close?

Related performance optimization for the native shuffle read path.

## Rationale for this change

When a native shuffle exchange feeds into a downstream native operator, shuffle data currently crosses the JVM/native FFI boundary twice:

1. **Native → JVM**: the `decodeShuffleBlock` JNI call decompresses Arrow IPC, creates a `RecordBatch`, and exports it via the Arrow C Data Interface (per-column `FFI_ArrowArray` + `FFI_ArrowSchema` allocation, export, and import).
2. **JVM → Native**: `CometBatchIterator` re-exports the `ColumnarBatch` via the Arrow C Data Interface back to native, where `ScanExec` imports and copies/unpacks the arrays.

Each crossing involves per-column schema serialization, struct allocation, and array copying. For queries with many shuffle stages or wide schemas, this overhead is significant.

## What changes are included in this PR?

This PR introduces a direct read path in which native code consumes compressed shuffle blocks directly, bypassing Arrow FFI entirely. The JVM reads raw bytes from Spark's shuffle infrastructure and hands them to native code via a `DirectByteBuffer` (zero-copy pointer access). Native code decompresses and decodes in place, feeding each `RecordBatch` directly into the execution plan.

**New components:**

- `ShuffleScan` protobuf message — signals the planner to create `ShuffleScanExec` instead of `ScanExec`
- `CometShuffleBlockIterator` (Java) — reads raw compressed shuffle blocks from an `InputStream` and exposes them to native code via JNI
- `ShuffleScanExec` (Rust) — pulls compressed bytes from the JVM, decompresses them via `read_ipc_compressed()`, and feeds each `RecordBatch` directly into the plan
- JNI bridge for `CometShuffleBlockIterator` with cached method IDs
- `CometExchangeSink` conditionally emits `ShuffleScan` for native shuffle when direct read is enabled

**Config:** `spark.comet.shuffle.directRead.enabled` (default: `true`). The code falls back to the existing FFI path when the flag is disabled or when the shuffle output feeds a non-native operator.
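To make the `DirectByteBuffer` handoff concrete, here is a minimal, hypothetical sketch of the `CometShuffleBlockIterator` idea in plain Java: read one length-prefixed compressed block from an `InputStream` into a direct (off-heap) `ByteBuffer`, whose memory address native code could then obtain via JNI's `GetDirectBufferAddress` without copying. The class name, framing format (4-byte big-endian length prefix), and method names are illustrative only, not the PR's actual wire format or API.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Hypothetical sketch: stream one framed shuffle block into off-heap memory.
public class DirectBlockReader {
    // Reads a single block framed as [4-byte big-endian length][payload].
    // Returns null at end of stream.
    static ByteBuffer readBlock(InputStream in) throws IOException {
        byte[] header = new byte[4];
        int n = in.read(header);
        if (n < 0) return null;
        if (n < 4) throw new IOException("truncated block header");
        int len = ((header[0] & 0xFF) << 24) | ((header[1] & 0xFF) << 16)
                | ((header[2] & 0xFF) << 8) | (header[3] & 0xFF);
        // allocateDirect gives off-heap memory addressable from native code
        // (via JNI GetDirectBufferAddress) with no further copy on the JVM side.
        ByteBuffer buf = ByteBuffer.allocateDirect(len);
        byte[] chunk = new byte[8192];
        int remaining = len;
        while (remaining > 0) {
            int r = in.read(chunk, 0, Math.min(chunk.length, remaining));
            if (r < 0) throw new IOException("truncated block payload");
            buf.put(chunk, 0, r);
            remaining -= r;
        }
        buf.flip();
        return buf;
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = {1, 2, 3, 4, 5};
        byte[] framed = new byte[4 + payload.length];
        framed[3] = (byte) payload.length; // big-endian length = 5
        System.arraycopy(payload, 0, framed, 4, payload.length);
        ByteBuffer block = readBlock(new ByteArrayInputStream(framed));
        System.out.println(block.isDirect() + " " + block.remaining()); // prints "true 5"
    }
}
```

The key property this illustrates is that decompression and Arrow decoding can happen entirely on the native side: the JVM's only job is to move opaque compressed bytes into memory the native side can address.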
**Scope:** Native shuffle only (`CometNativeShuffle`). JVM columnar shuffle is excluded because its per-batch dictionary encoding can change the schema between batches.

### Future work

A follow-up Phase 2 optimization could eliminate JNI entirely for local shuffle reads: native code would read shuffle data files directly from disk using file paths and partition offsets from the index file, bypassing the JVM shuffle fetch infrastructure completely. This would apply only to local shuffle; remote blocks would still use this PR's JNI-based path.

## How are these changes tested?

- All existing shuffle tests pass with the new path enabled by default
- A new comparison test runs queries with `directRead=true` and `directRead=false`, asserting identical results via `checkSparkAnswer`
- A Rust unit test exercises the compressed IPC round-trip using `ShuffleBlockWriter` + `read_ipc_compressed`
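The round-trip property that the Rust unit test asserts (bytes written by `ShuffleBlockWriter` decode back to the original batch via `read_ipc_compressed`) can be sketched generically. Comet compresses Arrow IPC with its own codecs; this illustrative Java sketch substitutes `java.util.zip` DEFLATE and raw bytes purely to show the shape of the compress/decompress-identical assertion, not Comet's actual format or API.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Illustrative round-trip: compress a payload, decompress it, and check
// the result is byte-identical to the input.
public class RoundTrip {
    static byte[] compress(byte[] input) {
        Deflater d = new Deflater();
        d.setInput(input);
        d.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        while (!d.finished()) out.write(buf, 0, d.deflate(buf));
        d.end();
        return out.toByteArray();
    }

    static byte[] decompress(byte[] input) throws DataFormatException {
        Inflater inf = new Inflater();
        inf.setInput(input);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        while (!inf.finished()) out.write(buf, 0, inf.inflate(buf));
        inf.end();
        return out.toByteArray();
    }

    public static void main(String[] args) throws DataFormatException {
        byte[] original = "stand-in for Arrow IPC batch bytes".getBytes(StandardCharsets.UTF_8);
        byte[] back = decompress(compress(original));
        System.out.println(java.util.Arrays.equals(original, back)); // prints "true"
    }
}
```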
