andygrove opened a new issue, #3308:
URL: https://github.com/apache/datafusion-comet/issues/3308

   ## Summary
   
   The native columnar-to-row conversion (`CometNativeColumnarToRowExec`) 
returns `UnsafeRow` objects that point directly into a Rust-owned buffer via a 
raw pointer. This buffer is cleared and reused on each `convert()` call, which 
can cause a SIGSEGV if any row reference outlives the current batch.
   
   ## Root Cause
   
   In `NativeColumnarToRowConverter.scala`, the `NativeRowIterator` does:
   
   ```scala
   unsafeRow.pointTo(null, rowAddress, rowSize)
   ```
   
   This points the `UnsafeRow` at off-heap native memory owned by 
`ColumnarToRowContext.buffer` on the Rust side. The Rust `convert()` method 
calls `self.buffer.clear()` at the start of each invocation, invalidating all 
previous row pointers.
   
   ## Affected Code Paths
   
   ### Broadcast path (high risk)
   
   In `CometNativeColumnarToRowExec.scala` (line ~110-130), multiple batches 
are lazily converted via `flatMap` and passed to `mode.transform(rows, ...)`. 
When the second batch's `convert()` runs, any `UnsafeRow` from the first batch 
that hasn't been copied yet points to overwritten or freed memory. Whether this 
is safe depends on `mode.transform()` eagerly copying each row before the 
iterator advances — a fragile assumption.
   
   ### Streaming path (lower risk)
   
   The `doExecute()` path uses `flatMap` which naturally consumes one batch's 
iterator before requesting the next, so rows don't span batches. However, 
downstream Spark operators that retain `InternalRow` references (e.g., sort 
with spill, certain joins, caching) could still hold stale pointers.
   
   ## Symptoms
   
   - SIGSEGV in CI (intermittent, timing-dependent)
   - Likely became more visible after enabling native columnar-to-row by 
default (#3299)
   
   ## Suggested Fix
   
   Return `unsafeRow.copy()` from `NativeRowIterator.next()` to copy each row 
to Java heap memory. This eliminates dangling pointer risk while preserving the 
performance benefit of native conversion (the copy is just a `memcpy` of 
already-serialized row bytes).
   
   A more targeted fix would be to copy only in the broadcast path, or to 
change the Rust side to allocate a new buffer per batch instead of reusing one.
   
   ## Related
   
   - #3299 (feat: Enable native columnar-to-row by default)
   - `native/core/src/execution/columnar_to_row.rs`
   - `spark/src/main/scala/org/apache/comet/NativeColumnarToRowConverter.scala`
   - 
`spark/src/main/scala/org/apache/spark/sql/comet/CometNativeColumnarToRowExec.scala`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

