andygrove opened a new pull request, #3930:
URL: https://github.com/apache/datafusion-comet/pull/3930

   ## Which issue does this PR close?
   
   Closes #3925.
   
   ## Rationale for this change
   
   When Comet has a native ShuffleWriter and a native child plan, batches are 
created in native code, exported to the JVM via Arrow FFI, and then imported 
back into native code for the shuffle writer. The JVM never reads the data, so 
the FFI round-trip is unnecessary overhead: 4 JNI/FFI boundary crossings per 
batch, with data copying.
   
   ## What changes are included in this PR?
   
   Introduces a "batch stash" optimization that passes batches as opaque `u64` 
handles through the JVM instead of doing full Arrow FFI export/import. This 
reduces the per-batch overhead from 4 FFI boundary crossings to 2 lightweight 
JNI calls passing a single `long`.
   
   **New components:**
   - **`BatchStash`** (`native/core/src/execution/batch_stash.rs`) -- global 
`Mutex<HashMap<u64, RecordBatch>>` registry. The child plan stashes its output 
batch, and the shuffle writer's ScanExec retrieves it by handle.
   - **`CometHandleBatchIterator`** (Java + Rust JNI bridge) -- passes handles 
between the two native execution contexts through the JVM.
   - **`executePlanBatchHandle`** JNI function -- like `executePlan` but 
stashes the output batch instead of FFI-exporting it. Shared execution logic is 
extracted into `execute_plan_impl` with an `OutputMode` enum to avoid 
duplication.
   - **`CometShuffleWriterInputIterator`** -- preserves the `CometExecIterator` 
reference through Spark's shuffle dependency RDD so the shuffle writer can 
detect native input.
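
To illustrate the stash/take semantics described above, here is a minimal sketch of a handle registry. It is not the code in `batch_stash.rs`: the payload is generic so the example needs no Arrow dependency (the PR stores `RecordBatch`), the registry is an ordinary value rather than a global, and the `AtomicU64` counter, the starting handle value, and the method names `stash`, `take`, and `len` are assumptions made for illustration.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

/// Hypothetical stand-in for the PR's `BatchStash`.
/// Generic over the payload `T`; the PR description says the real registry
/// holds Arrow `RecordBatch` values in a global `Mutex<HashMap<u64, _>>`.
pub struct BatchStash<T> {
    /// Monotonic counter used to mint unique handles (assumed design).
    next_handle: AtomicU64,
    /// Batches waiting to be picked up, keyed by handle.
    batches: Mutex<HashMap<u64, T>>,
}

impl<T> BatchStash<T> {
    pub fn new() -> Self {
        Self {
            next_handle: AtomicU64::new(1),
            batches: Mutex::new(HashMap::new()),
        }
    }

    /// Producer side: store a batch and return the opaque handle that
    /// would be passed through the JVM as a single `long`.
    pub fn stash(&self, batch: T) -> u64 {
        let handle = self.next_handle.fetch_add(1, Ordering::Relaxed);
        self.batches.lock().unwrap().insert(handle, batch);
        handle
    }

    /// Consumer side: remove and return the batch for `handle`.
    /// A second `take` of the same handle returns `None`.
    pub fn take(&self, handle: u64) -> Option<T> {
        self.batches.lock().unwrap().remove(&handle)
    }

    /// Number of batches currently stashed (useful for cleanup checks).
    pub fn len(&self) -> usize {
        self.batches.lock().unwrap().len()
    }
}
```

Because `take` removes the entry, each handle is consumed at most once and the map does not grow as long as every stashed batch is eventually taken.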
   
   **Modified components:**
   - **`CometExecIterator`** -- gains stash mode (`enableStashMode()`, 
`nextHandle()`) for producing handles instead of `ColumnarBatch`.
   - **`ScanExec`** -- gains a `handle_mode` flag. When true, it retrieves 
batches from the `BatchStash` instead of via Arrow FFI import.
   - **`Scan` protobuf message** -- new `bool batch_stash_handle` field signals 
handle mode to the native planner.
   - **`CometNativeShuffleWriter`** -- detects native child plans automatically 
and enables the stash path.
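
The interaction between the `OutputMode` enum on the producing side and the `handle_mode` flag on the scan side could look roughly like the sketch below. Only the names `OutputMode` and `handle_mode` come from this PR description; the variant names, the `Batch` and `PlanOutput` types, and the `finish_batch` and `scan_next` functions are hypothetical, and a plain `HashMap` stands in for the stash.

```rust
use std::collections::HashMap;

/// Placeholder for an Arrow `RecordBatch` (assumption: a row count suffices here).
#[derive(Debug, Clone, PartialEq)]
pub struct Batch {
    pub num_rows: usize,
}

/// How the shared execution path hands a finished batch back to its caller.
/// Variant names are illustrative, not taken from the PR.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum OutputMode {
    ArrowFfi,
    StashHandle,
}

/// What crosses the JNI boundary in each mode (hypothetical shape).
#[derive(Debug, PartialEq)]
pub enum PlanOutput {
    /// Stand-in for a full Arrow FFI export; here it simply carries the batch.
    Exported(Batch),
    /// Opaque handle; the batch itself stays in native memory.
    Handle(u64),
}

/// Producer side: one code path, with the output step chosen by `mode`.
pub fn finish_batch(
    mode: OutputMode,
    batch: Batch,
    stash: &mut HashMap<u64, Batch>,
    next_handle: &mut u64,
) -> PlanOutput {
    match mode {
        OutputMode::ArrowFfi => PlanOutput::Exported(batch),
        OutputMode::StashHandle => {
            let handle = *next_handle;
            *next_handle += 1;
            stash.insert(handle, batch);
            PlanOutput::Handle(handle)
        }
    }
}

/// Scan side: in handle mode the batch is taken from the stash;
/// otherwise it arrives through the (simulated) FFI import.
pub fn scan_next(
    handle_mode: bool,
    input: PlanOutput,
    stash: &mut HashMap<u64, Batch>,
) -> Option<Batch> {
    match (handle_mode, input) {
        (true, PlanOutput::Handle(h)) => stash.remove(&h),
        (false, PlanOutput::Exported(b)) => Some(b),
        // Mode and input disagree: nothing sensible to return in this sketch.
        _ => None,
    }
}
```

Keeping the mode as an enum on the producer and a flag on the consumer means both paths share everything except the final hand-off step.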
   
   **Detection is automatic** -- no configuration needed. When a 
`CometExecIterator` feeds into a native shuffle writer, the optimization 
activates. Non-native child plans fall back to the existing FFI path.
   
   ## How are these changes tested?
   
   - Existing `CometNativeShuffleSuite` (22 tests) passes -- validates 
correctness of the stash path since all native shuffle queries now use it.
   - Existing `CometShuffleSuite` passes -- validates the fallback FFI path for 
columnar shuffle.
   - New Rust unit tests for `BatchStash` (stash/take semantics, handle 
uniqueness, cleanup).
   - Clippy clean, formatted.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

