andygrove opened a new pull request, #3930: URL: https://github.com/apache/datafusion-comet/pull/3930
## Which issue does this PR close?

Closes #3925.

## Rationale for this change

When Comet has a native ShuffleWriter and a native child plan, batches are created in native code, exported to the JVM via Arrow FFI, and then imported back into native code for the shuffle writer. The JVM never reads the data, so this FFI round-trip is unnecessary overhead -- 4 JNI/FFI boundary crossings per batch, with data copying.

## What changes are included in this PR?

Introduces a "batch stash" optimization that passes batches through the JVM as opaque `u64` handles instead of doing a full Arrow FFI export/import. This reduces the per-batch overhead from 4 FFI boundary crossings to 2 lightweight JNI calls passing a single `long`.

**New components:**

- **`BatchStash`** (`native/core/src/execution/batch_stash.rs`) -- global `Mutex<HashMap<u64, RecordBatch>>` registry. The child plan stashes its output batch, and the shuffle writer's `ScanExec` retrieves it by handle.
- **`CometHandleBatchIterator`** (Java + Rust JNI bridge) -- passes handles between the two native execution contexts through the JVM.
- **`executePlanBatchHandle`** JNI function -- like `executePlan`, but stashes the output batch instead of FFI-exporting it. Shared execution logic is extracted into `execute_plan_impl` with an `OutputMode` enum to avoid duplication.
- **`CometShuffleWriterInputIterator`** -- preserves the `CometExecIterator` reference through Spark's shuffle dependency RDD so the shuffle writer can detect native input.

**Modified components:**

- **`CometExecIterator`** -- gains a stash mode (`enableStashMode()`, `nextHandle()`) that produces handles instead of `ColumnarBatch`es.
- **`ScanExec`** -- gains a `handle_mode` flag. When true, it retrieves batches from the `BatchStash` instead of importing them via Arrow FFI.
- **`Scan` protobuf message** -- new `bool batch_stash_handle` field signals handle mode to the native planner.
- **`CometNativeShuffleWriter`** -- automatically detects native child plans and enables the stash path.
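The stash-and-take flow described above can be modeled in a few lines of Rust. This is a minimal, std-only sketch under stated assumptions, not the PR's code: `RecordBatch` is a hypothetical stand-in for Arrow's type, and `stash_batch`, `take_batch`, `emit`, the `Output` enum, the `OutputMode` variant names, and the atomic handle counter are all illustrative. Only the `Mutex<HashMap<u64, RecordBatch>>` registry shape and the idea of an `OutputMode` switch come from the description.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Mutex, OnceLock};

/// Hypothetical stand-in for `arrow::record_batch::RecordBatch`; only the
/// row count is modeled so the sketch needs nothing beyond `std`.
#[derive(Debug, PartialEq)]
pub struct RecordBatch {
    pub num_rows: usize,
}

/// Process-wide registry mapping handle -> batch (the shape named in the PR).
fn registry() -> &'static Mutex<HashMap<u64, RecordBatch>> {
    static REGISTRY: OnceLock<Mutex<HashMap<u64, RecordBatch>>> = OnceLock::new();
    REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
}

/// Assumed monotonic counter so each handle is unique for the life of the process.
static NEXT_HANDLE: AtomicU64 = AtomicU64::new(1);

/// Producer side: store the batch and return an opaque handle for the JVM.
pub fn stash_batch(batch: RecordBatch) -> u64 {
    let handle = NEXT_HANDLE.fetch_add(1, Ordering::Relaxed);
    registry().lock().unwrap().insert(handle, batch);
    handle
}

/// Consumer side: remove and return the batch; a handle can be taken only once.
pub fn take_batch(handle: u64) -> Option<RecordBatch> {
    registry().lock().unwrap().remove(&handle)
}

/// Hypothetical model of the `OutputMode` switch in `execute_plan_impl`.
pub enum OutputMode {
    ArrowFfi,
    BatchStash,
}

/// What crosses back to the JVM: the batch itself (FFI path) or just a handle.
pub enum Output {
    Batch(RecordBatch),
    Handle(u64),
}

/// Hypothetical helper: emit one output batch according to the mode.
pub fn emit(batch: RecordBatch, mode: &OutputMode) -> Output {
    match mode {
        // A real implementation would export through Arrow FFI here; the sketch just returns it.
        OutputMode::ArrowFfi => Output::Batch(batch),
        // Stash mode: the batch stays in native memory and only a u64 leaves.
        OutputMode::BatchStash => Output::Handle(stash_batch(batch)),
    }
}

fn main() {
    // Child plan runs in stash mode and hands the JVM a handle.
    let handle = match emit(RecordBatch { num_rows: 8192 }, &OutputMode::BatchStash) {
        Output::Handle(h) => h,
        Output::Batch(_) => unreachable!("stash mode always returns a handle"),
    };
    // The shuffle writer's scan (in handle mode) retrieves the batch by handle.
    let batch = take_batch(handle).expect("handle should be present");
    println!("handle {handle} -> {} rows", batch.num_rows);
    // A second take returns None: the batch was moved out of the stash.
    assert!(take_batch(handle).is_none());
}
```

Because `take_batch` removes the entry, a batch whose handle is never taken (for example, if a task fails mid-stream) would stay in the map. The PR's tests mention cleanup, but the description doesn't say how it is done, so the sketch leaves it out. On the JVM side the handle travels as a Java `long` (`jlong`, a signed 64-bit integer in JNI), so a real bridge would convert between `u64` and `jlong`.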

**Detection is automatic** -- no configuration needed. When a `CometExecIterator` feeds into a native shuffle writer, the optimization activates. Non-native child plans fall back to the existing FFI path.

## How are these changes tested?

- Existing `CometNativeShuffleSuite` (22 tests) passes -- validates correctness of the stash path, since all native shuffle queries now use it.
- Existing `CometShuffleSuite` passes -- validates the fallback FFI path for columnar shuffle.
- New Rust unit tests for `BatchStash` (stash/take semantics, handle uniqueness, cleanup).
- Clippy clean, formatted.
