This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 796d4450b docs: Add FFI docs to contributor guide (#2668)
796d4450b is described below

commit 796d4450b5536306ae6bc6c790574e5047fbae4c
Author: Andy Grove <[email protected]>
AuthorDate: Fri Oct 31 11:26:01 2025 -0600

    docs: Add FFI docs to contributor guide (#2668)
---
 docs/source/contributor-guide/ffi.md             | 357 +++++++++++++++++++++++
 docs/source/contributor-guide/index.md           |   1 +
 docs/source/contributor-guide/plugin_overview.md |  18 --
 3 files changed, 358 insertions(+), 18 deletions(-)

diff --git a/docs/source/contributor-guide/ffi.md b/docs/source/contributor-guide/ffi.md
new file mode 100644
index 000000000..5af27ae87
--- /dev/null
+++ b/docs/source/contributor-guide/ffi.md
@@ -0,0 +1,357 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Arrow FFI Usage in Comet
+
+## Overview
+
+Comet uses the [Arrow C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html) for zero-copy data transfer in two directions:
+
+1. **JVM → Native**: Native code pulls batches from the JVM using `CometBatchIterator`
+2. **Native → JVM**: The JVM pulls batches from native code using `CometExecIterator`
+
+The following diagram shows an example of the end-to-end flow for a query stage.
+
+![Diagram of Comet Data Flow](/_static/images/comet-dataflow.svg)
+
+Both scenarios use the same FFI mechanism but have different ownership semantics and memory management implications.
+
+## Arrow FFI Basics
+
+The Arrow C Data Interface defines two C structures:
+- `ArrowArray`: Contains pointers to data buffers and metadata
+- `ArrowSchema`: Contains type information
+
+### Key Characteristics
+- **Zero-copy**: Data buffers can be shared across language boundaries without copying
+- **Ownership transfer**: Clear semantics for who owns and must free the data
+- **Release callbacks**: Custom cleanup functions for proper resource management
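+
+These characteristics can be sketched with a stdlib-only Rust mock (no Arrow dependency). `MockArray` is a hypothetical stand-in for `ArrowArray`: the producer allocates a buffer and installs a `release` callback, and the consumer reads the data zero-copy, then invokes `release` exactly once, as the spec requires:
+
+```rust
+// Stdlib-only sketch of the FFI ownership pattern; `MockArray` is a
+// hypothetical stand-in for the real `ArrowArray` struct.
+#[repr(C)]
+struct MockArray {
+    data: *mut i64, // stand-in for the Arrow data buffers
+    len: usize,
+    release: Option<unsafe extern "C" fn(*mut MockArray)>,
+}
+
+unsafe extern "C" fn release_mock(array: *mut MockArray) {
+    let a = &mut *array;
+    // Reclaim the buffer the producer allocated...
+    drop(Box::from_raw(std::ptr::slice_from_raw_parts_mut(a.data, a.len)));
+    // ...and mark the struct as released, as the spec requires.
+    a.release = None;
+}
+
+// Producer side: move a buffer out of Rust's ownership, install the callback.
+fn produce() -> MockArray {
+    let buf = vec![1i64, 2, 3].into_boxed_slice();
+    let len = buf.len();
+    MockArray { data: Box::into_raw(buf) as *mut i64, len, release: Some(release_mock) }
+}
+
+// Consumer side: zero-copy read, then hand ownership back via the callback.
+fn consume(array: &mut MockArray) -> i64 {
+    let sum = unsafe { std::slice::from_raw_parts(array.data, array.len) }
+        .iter()
+        .sum();
+    if let Some(release) = array.release {
+        unsafe { release(array) };
+    }
+    sum
+}
+```
+
+After `release` runs, the consumer must not touch `data` again; nulling the callback is how both sides can tell the struct has already been released.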
+
+## JVM → Native Data Flow (ScanExec)
+
+### Architecture
+
+When native code needs data from the JVM, it uses `ScanExec`, which calls into `CometBatchIterator`:
+
+```
+┌─────────────────┐
+│  Spark/Scala    │
+│ CometExecIter   │
+└────────┬────────┘
+         │ produces batches
+         ▼
+┌─────────────────┐
+│ CometBatchIter  │ ◄─── JNI call from native
+│  (JVM side)     │
+└────────┬────────┘
+         │ Arrow FFI
+         │ (transfers ArrowArray/ArrowSchema pointers)
+         ▼
+┌─────────────────┐
+│    ScanExec     │
+│  (Rust/native)  │
+└────────┬────────┘
+         │
+         ▼
+┌─────────────────┐
+│   DataFusion    │
+│   operators     │
+└─────────────────┘
+```
+
+### FFI Transfer Process
+
+The data transfer happens in `ScanExec::get_next()`:
+
+```rust
+// 1. Allocate FFI structures on native side (Rust heap)
+for _ in 0..num_cols {
+    let arrow_array = Rc::new(FFI_ArrowArray::empty());
+    let arrow_schema = Rc::new(FFI_ArrowSchema::empty());
+    let array_ptr = Rc::into_raw(arrow_array) as i64;
+    let schema_ptr = Rc::into_raw(arrow_schema) as i64;
+    // Store pointers...
+}
+
+// 2. Call JVM to populate FFI structures
+let num_rows: i32 = unsafe {
+    jni_call!(env, comet_batch_iterator(iter).next(array_obj, schema_obj) -> i32)?
+};
+
+// 3. Import data from FFI structures
+for i in 0..num_cols {
+    let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?;
+    let array = make_array(array_data);
+    // ... process array
+}
+```
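+
+The pointer handling in step 1 can be reduced to a stdlib-only sketch: `Rc::into_raw` leaks the allocation to a raw address (which crosses the JNI boundary as an `i64`/`jlong`), and `Rc::from_raw` later re-adopts it on the native side. The helper names here are illustrative, not Comet's actual API:
+
+```rust
+use std::rc::Rc;
+
+// Leak an Rc to a raw address so it can cross the JNI boundary as a jlong.
+fn leak_as_addr<T>(value: T) -> i64 {
+    Rc::into_raw(Rc::new(value)) as i64
+}
+
+// Re-adopt the Rc from the address; from here Rust manages the memory again.
+unsafe fn adopt_from_addr<T>(addr: i64) -> Rc<T> {
+    Rc::from_raw(addr as *const T)
+}
+```
+
+Every `into_raw` must be paired with exactly one `from_raw`, otherwise the allocation leaks (or is freed twice).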
+
+### Memory Layout
+
+When a batch is transferred from JVM to native:
+
+```
+JVM Heap:                           Native Memory:
+┌──────────────────┐               ┌──────────────────┐
+│ ColumnarBatch    │               │ FFI_ArrowArray   │
+│ ┌──────────────┐ │               │ ┌──────────────┐ │
+│ │ ArrowBuf     │─┼──────────────>│ │ buffers[0]   │ │
+│ │ (off-heap)   │ │               │ │ (pointer)    │ │
+│ └──────────────┘ │               │ └──────────────┘ │
+└──────────────────┘               └──────────────────┘
+        │                                   │
+        │                                   │
+Off-heap Memory:                            │
+┌──────────────────┐ <──────────────────────┘
+│ Actual Data      │
+│ (e.g., int32[])  │
+└──────────────────┘
+```
+
+**Key Point**: The actual data buffers can be off-heap, but the `ArrowArray` and `ArrowSchema` wrapper objects are **always allocated on the JVM heap**.
+
+### Wrapper Object Lifecycle
+
+When arrays are created in the JVM and passed to native code, the JVM allocates the array data off-heap and creates wrapper objects `ArrowArray` and `ArrowSchema` on the JVM heap. These wrapper objects can consume significant memory over time.
+
+```
+Per batch overhead on JVM heap:
+- ArrowArray object: ~100 bytes
+- ArrowSchema object: ~100 bytes
+- Per column: ~200 bytes
+- 100 columns × 1000 batches = ~20 MB of wrapper objects
+```
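+
+A quick sanity check on the arithmetic above, using the document's rough per-wrapper estimates (these are estimates, not measured values):
+
+```rust
+// Rough wrapper-object overhead on the JVM heap; the ~100-byte figures are
+// the estimates quoted above, not measured values.
+fn wrapper_overhead_bytes(columns: u64, batches: u64) -> u64 {
+    let per_column = 100 + 100; // one ArrowArray + one ArrowSchema wrapper
+    columns * batches * per_column
+}
+```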
+
+When native code pulls batches from the JVM, the JVM wrapper objects are kept alive until the native code drops all references to the arrays.
+
+When operators such as `SortExec` fetch many batches and buffer them in native code, the number of wrapper objects in Java on-heap memory keeps growing until the batches are released in native code at the end of the sort operation.
+
+### Ownership Transfer
+
+The Arrow C Data Interface supports ownership transfer by registering a release callback in the C struct that is passed over the JNI boundary; the consumer invokes this callback to free the array data. For example, the `ArrowArray` struct has:
+
+```c
+// Release callback
+void (*release)(struct ArrowArray*);
+```
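+
+For reference, the complete `ArrowArray` struct defined by the C Data Interface, shown here as the equivalent `#[repr(C)]` Rust declaration (arrow-rs's `FFI_ArrowArray` mirrors this layout):
+
+```rust
+use std::os::raw::c_void;
+
+// The ArrowArray struct from the Arrow C Data Interface specification,
+// as a #[repr(C)] Rust mirror of the C definition.
+#[repr(C)]
+pub struct ArrowArray {
+    pub length: i64,
+    pub null_count: i64,
+    pub offset: i64,
+    pub n_buffers: i64,
+    pub n_children: i64,
+    pub buffers: *mut *const c_void,
+    pub children: *mut *mut ArrowArray,
+    pub dictionary: *mut ArrowArray,
+    pub release: Option<unsafe extern "C" fn(*mut ArrowArray)>,
+    pub private_data: *mut c_void,
+}
+```
+
+`private_data` is where the producer stashes whatever bookkeeping its `release` callback needs; consumers must treat it as opaque.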
+
+Comet currently does not always follow best practice around ownership transfer: in some cases, Comet JVM code retains references to arrays after passing them to native code and may mutate the underlying buffers. The `arrow_ffi_safe` flag in the protocol buffer definition of `Scan` indicates whether ownership is being transferred according to the Arrow C Data Interface specification.
+
+```protobuf
+message Scan {
+  repeated spark.spark_expression.DataType fields = 1;
+  // The source of the scan (e.g. file scan, broadcast exchange, shuffle, etc). This
+  // is purely for informational purposes when viewing native query plans in
+  // debug mode.
+  string source = 2;
+  // Whether native code can assume ownership of batches that it receives
+  bool arrow_ffi_safe = 3;
+}
+```
+
+#### When ownership is NOT transferred to native:
+
+If the data originates from a `native_comet` scan (or from `native_iceberg_compat` in some cases), then ownership is not transferred to native code and the JVM may reuse the underlying buffers in the future.
+
+It is critical that the native code performs a deep copy of the arrays if they are to be buffered by operators such as `SortExec` or `ShuffleWriterExec`; otherwise, data corruption is likely to occur.
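+
+A stdlib-only illustration of the hazard (not Comet code): `jvm_buffer` stands in for an off-heap buffer that the JVM reuses for the next batch when `arrow_ffi_safe` is false:
+
+```rust
+// Returns (what a borrowed view sees, what a deep copy preserves) after the
+// "JVM" reuses its buffer for the next batch.
+fn demo_buffer_reuse() -> (Vec<i64>, Vec<i64>) {
+    let mut jvm_buffer = vec![1i64, 2, 3];  // batch 1, owned by the "JVM"
+    let borrowed_ptr = jvm_buffer.as_ptr(); // keeping the pointer = no deep copy
+    let deep_copy = jvm_buffer.clone();     // what ScanExec must do instead
+
+    // The JVM reuses the same buffer for batch 2.
+    jvm_buffer.copy_from_slice(&[9, 9, 9]);
+
+    // The borrowed view silently observes batch 2's bytes: corruption.
+    let borrowed = unsafe { std::slice::from_raw_parts(borrowed_ptr, 3) }.to_vec();
+    (borrowed, deep_copy)
+}
+```
+
+Any operator that buffered the un-copied view would now emit batch 2's values where batch 1's were expected, with no error raised.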
+
+#### When ownership IS transferred to native:
+
+When ownership is transferred, it is safe to buffer batches in native code. However, JVM wrapper objects will not be released until the native batches are dropped. This can lead to OOM errors or GC pressure if not enough Java heap memory is configured.
+
+## Native → JVM Data Flow (CometExecIterator)
+
+### Architecture
+
+When JVM needs results from native execution:
+
+```
+┌─────────────────┐
+│ DataFusion Plan │
+│   (native)      │
+└────────┬────────┘
+         │ produces RecordBatch
+         ▼
+┌─────────────────┐
+│ CometExecIter   │
+│  (Rust/native)  │
+└────────┬────────┘
+         │ Arrow FFI
+         │ (transfers ArrowArray/ArrowSchema pointers)
+         ▼
+┌─────────────────┐
+│ CometExecIter   │ ◄─── JNI call from Spark
+│  (Scala side)   │
+└────────┬────────┘
+         │
+         ▼
+┌─────────────────┐
+│  Spark Actions  │
+│  (collect, etc) │
+└─────────────────┘
+```
+
+### FFI Transfer Process
+
+The transfer happens in `CometExecIterator::getNextBatch()`:
+
+```scala
+// Scala side
+def getNextBatch(): ColumnarBatch = {
+  val batchHandle = Native.getNextBatch(nativeHandle)
+
+  // Import from FFI structures
+  val vectors = (0 until schema.length).map { i =>
+    val arrayPtr = new Array[Long](1)
+    val schemaPtr = new Array[Long](1)
+
+    // Get FFI pointers from native
+    Native.exportVector(batchHandle, i, arrayPtr, schemaPtr)
+
+    // Import into Arrow Java
+    Data.importVector(allocator, arrayPtr(0), schemaPtr(0))
+  }
+
+  new ColumnarBatch(vectors.toArray, numRows)
+}
+```
+
+```rust
+// Native side (simplified; error handling omitted)
+#[no_mangle]
+pub extern "system" fn Java_..._getNextBatch(
+    env: JNIEnv,
+    handle: jlong,
+) -> jlong {
+    let context = get_exec_context(handle);
+    // Block on the async stream for illustration; real code polls it
+    let batch = block_on(context.stream.next()).unwrap();
+
+    // Store the batch on the heap and return its address as a handle
+    Box::into_raw(Box::new(batch)) as i64
+}
+
+#[no_mangle]
+pub extern "system" fn Java_..._exportVector(
+    env: JNIEnv,
+    batch_handle: jlong,
+    col_idx: jint,
+    array_ptr: jlongArray,
+    schema_ptr: jlongArray,
+) {
+    let batch = get_batch(batch_handle);
+    let array = batch.column(col_idx as usize);
+
+    // Export to FFI structures
+    let (array_ffi, schema_ffi) = to_ffi(&array.to_data()).unwrap();
+
+    // Move the FFI structs to the heap and write their addresses back to the JVM
+    let array_addr = Box::into_raw(Box::new(array_ffi)) as i64;
+    let schema_addr = Box::into_raw(Box::new(schema_ffi)) as i64;
+    env.set_long_array_region(array_ptr, 0, &[array_addr]).unwrap();
+    env.set_long_array_region(schema_ptr, 0, &[schema_addr]).unwrap();
+}
+```
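+
+The batch-handle pattern used by `getNextBatch`/`exportVector` reduces to a stdlib-only sketch (`Batch` is a hypothetical stand-in for `RecordBatch`):
+
+```rust
+// A heap object crosses the JNI boundary as its address, cast to i64.
+struct Batch {
+    num_rows: usize,
+}
+
+fn create_handle(batch: Batch) -> i64 {
+    // Move the batch to the heap; its address is the opaque handle.
+    Box::into_raw(Box::new(batch)) as i64
+}
+
+fn num_rows(handle: i64) -> usize {
+    // Borrow the batch behind the handle without taking ownership back.
+    unsafe { &*(handle as *const Batch) }.num_rows
+}
+
+fn release_handle(handle: i64) {
+    // Reconstitute the Box so the batch is freed exactly once.
+    drop(unsafe { Box::from_raw(handle as *mut Batch) });
+}
+```
+
+The JVM side only ever sees the `i64`; misuse (double free, use after release) is undetectable at the boundary, which is why the release discipline below matters.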
+
+### Wrapper Object Lifecycle (Native → JVM)
+
+```
+Time    Native Memory              JVM Heap              Off-heap/Native
+────────────────────────────────────────────────────────────────────────
+t0      RecordBatch produced       -                     Data in native
+        in DataFusion
+
+t1      FFI_ArrowArray created     -                     Data in native
+        FFI_ArrowSchema created
+        (native heap)
+
+t2      Pointers exported to JVM   ArrowBuf created      Data in native
+                                   (wraps native ptr)
+
+t3      FFI structures kept alive  Spark processes       Data in native
+        via batch handle           ColumnarBatch         ✓ Valid
+
+t4      Batch handle released      ArrowBuf freed        Data freed
+        Release callback runs      (triggers native      (via release
+                                   release callback)     callback)
+```
+
+**Key Difference from JVM → Native**:
+- Native code controls lifecycle through batch handle
+- JVM creates `ArrowBuf` wrappers that point to native memory
+- Release callback ensures proper cleanup when JVM is done
+- No GC pressure issue because native allocator manages the data
+
+### Release Callbacks
+
+Critical for proper cleanup:
+
+```rust
+// Native release callback (simplified)
+unsafe extern "C" fn release_batch(array: *mut FFI_ArrowArray) {
+    if !array.is_null() {
+        // Free the data buffers; arrow-rs tracks them through `private_data`
+        // (`PrivateData` is a stand-in for the exporter's bookkeeping type)
+        drop(Box::from_raw((*array).private_data as *mut PrivateData));
+        // Mark the structure as released, per the C Data Interface spec
+        (*array).release = None;
+    }
+}
+```
+
+When JVM is done with the data:
+```java
+// ArrowBuf.close() triggers the release callback
+arrowBuf.close();  // → calls native release_batch()
+```
+
+## Memory Ownership Rules
+
+### JVM → Native
+
+| Scenario | `arrow_ffi_safe` | Ownership | Action Required |
+|----------|------------------|-----------|-----------------|
+| Temporary scan | `false` | JVM keeps ownership | **Must deep copy** to avoid corruption |
+| Ownership transfer | `true` | Native owns | Copy only to unpack dictionaries |
+
+### Native → JVM
+
+| Scenario | Ownership | Action Required |
+|----------|-----------|-----------------|
+| All cases | Native allocates, JVM references | JVM must call `close()` to trigger native release callback |
+
+## Further Reading
+
+- [Arrow C Data Interface Specification](https://arrow.apache.org/docs/format/CDataInterface.html)
+- [Arrow Java FFI Implementation](https://github.com/apache/arrow/tree/main/java/c)
+- [Arrow Rust FFI Implementation](https://docs.rs/arrow/latest/arrow/ffi/)
diff --git a/docs/source/contributor-guide/index.md b/docs/source/contributor-guide/index.md
index d4b4e185e..ba4692a97 100644
--- a/docs/source/contributor-guide/index.md
+++ b/docs/source/contributor-guide/index.md
@@ -25,6 +25,7 @@ under the License.
 
 Getting Started <contributing>
 Comet Plugin Overview <plugin_overview>
+Arrow FFI <ffi>
 Development Guide <development>
 Debugging Guide <debugging>
 Benchmarking Guide <benchmarking>
diff --git a/docs/source/contributor-guide/plugin_overview.md b/docs/source/contributor-guide/plugin_overview.md
index 4f6863b03..aba852470 100644
--- a/docs/source/contributor-guide/plugin_overview.md
+++ b/docs/source/contributor-guide/plugin_overview.md
@@ -130,21 +130,3 @@ writing the batches to the shuffle file.
 For shuffle reads a `ShuffledRDD` requests a `ShuffleReader` from the shuffle manager. Comet provides a `CometBlockStoreShuffleReader` which is implemented in JVM and fetches blocks from Spark and then creates an `ArrowReaderIterator` to process the blocks using Arrow's `StreamReader` for decoding IPC batches.
-
-## Arrow FFI
-
-Due to the hybrid execution model, it is necessary to pass batches of data between the JVM and native code.
-
-The foundation for Arrow FFI is the [Arrow C Data Interface], which provides a stable ABI-compatible interface for accessing Arrow data structures from multiple languages.
-
-[Arrow C Data Interface]: https://arrow.apache.org/docs/format/CDataInterface.html
-
-- `CometExecIterator` invokes native plans and uses Arrow FFI to read the output batches
-- Native `ScanExec` operators call `CometBatchIterator` via JNI to fetch input batches from the JVM
-
-## End to End Flow
-
-The following diagram shows an example of the end-to-end flow for a query stage.
-
-![Diagram of Comet Data Flow](/_static/images/comet-dataflow.svg)

