andygrove opened a new pull request, #3833:
URL: https://github.com/apache/datafusion-comet/pull/3833

   ## Which issue does this PR close?
   
   Closes #2453.
   
   ## Rationale for this change
   
   When a tokio task is spawned for async execution (the path taken when there 
are no JVM data sources), the execution stream and its `MemoryReservation`s are 
owned by the tokio task. If `releasePlan` drops the `ExecutionContext` without 
waiting for the task to complete, the reservations are released asynchronously 
on the tokio thread — potentially **after** Spark has already called 
`cleanUpAllAllocatedMemory()`, causing:
   
   ```
   WARN ExecutionMemoryPool: Internal error: release called on 917504 bytes but task only has 0 bytes of memory from the off-heap execution pool
   ```
   
   The sequence that triggers this:
   1. `TaskCompletionListener` calls `CometExecIterator.close()` → 
`releasePlan()` via JNI
   2. `releasePlan` drops `ExecutionContext`, which drops `batch_receiver` — 
signaling the tokio task to stop
   3. `releasePlan` returns to the JVM before the tokio task finishes cleanup
   4. Spark calls `cleanUpAllAllocatedMemory()` — zeroes out the task's 
allocation
   5. The tokio task finally drops the stream and its `MemoryReservation`s → 
`release_to_spark()` → Spark finds the task holds 0 bytes and logs the warning above
   
   This also fixes Source 2 of #2470: `GlobalRef`s held by the stream are now 
dropped while the JVM thread is still inside the `releasePlan` call, avoiding the "Dropping a 
GlobalRef in a detached thread" warning.
   
   ## What changes are included in this PR?
   
   - Add a `task_handle` field to `ExecutionContext` to store the `JoinHandle` 
from the spawned tokio task
   - In `releasePlan`, drop the `batch_receiver` first (to signal the task to 
exit its loop), then `block_on` the handle to wait for the tokio task to 
complete before dropping the context
   
   This guarantees all memory releases and `GlobalRef` drops happen before 
`releasePlan` returns to the JVM.
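The ordering described above (drop the receiver to signal the worker, then wait on its handle before returning) can be illustrated with a std-only analogue. This is a minimal sketch, not the PR's code: `std::thread` stands in for the tokio task, `JoinHandle::join` stands in for `block_on(handle)`, an `mpsc` receiver stands in for `batch_receiver`, and the `Reservation`, `create_plan`, and `release_plan` names are hypothetical stand-ins for `MemoryReservation` and the JNI entry points.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for a MemoryReservation: returns its bytes to a
/// shared pool counter when dropped (the analogue of release_to_spark()).
struct Reservation {
    bytes: usize,
    pool: Arc<AtomicUsize>,
}

impl Drop for Reservation {
    fn drop(&mut self) {
        self.pool.fetch_sub(self.bytes, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for ExecutionContext, holding the receiver and
/// the worker's handle (the analogue of the new `task_handle` field).
struct ExecutionContext {
    batch_receiver: Option<mpsc::Receiver<u64>>,
    task_handle: Option<JoinHandle<()>>,
}

fn create_plan(pool: Arc<AtomicUsize>) -> ExecutionContext {
    let (tx, rx) = mpsc::sync_channel::<u64>(1);
    pool.fetch_add(917_504, Ordering::SeqCst);
    let reservation = Reservation { bytes: 917_504, pool };
    let handle = thread::spawn(move || {
        // The worker owns the reservation, as the tokio task owns the stream.
        let _reservation = reservation;
        let mut batch = 0;
        // send() fails once the receiver is dropped: that is the stop signal.
        while tx.send(batch).is_ok() {
            batch += 1;
        }
        // _reservation is dropped here, releasing its bytes to the pool.
    });
    ExecutionContext { batch_receiver: Some(rx), task_handle: Some(handle) }
}

fn release_plan(mut ctx: ExecutionContext) {
    // 1. Drop the receiver first so the worker exits its loop.
    drop(ctx.batch_receiver.take());
    // 2. Wait for the worker to finish, so every reservation has been
    //    released before this function returns to its caller.
    if let Some(handle) = ctx.task_handle.take() {
        handle.join().expect("worker thread panicked");
    }
}

fn main() {
    let pool = Arc::new(AtomicUsize::new(0));
    let ctx = create_plan(Arc::clone(&pool));
    let first = ctx.batch_receiver.as_ref().unwrap().recv().unwrap();
    release_plan(ctx);
    // Because join() has returned, the pool is already back to zero; the
    // caller (Spark's cleanUpAllAllocatedMemory in the real system) can
    // no longer race with a late release.
    assert_eq!(pool.load(Ordering::SeqCst), 0);
    println!("first batch {first}, pool bytes after release: {}", pool.load(Ordering::SeqCst));
}
```

Dropping the receiver before joining matters: joining first would deadlock, because the worker would stay blocked in `send()` waiting for a consumer that never comes.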
   
   ## How are these changes tested?
   
   This race condition requires a full Spark executor environment where the 
task completion sequence (`TaskCompletionListener` → 
`cleanUpAllAllocatedMemory`) races with async tokio task cleanup. It is not 
reproducible in unit tests. The fix is verified by code inspection: 
`block_on(handle)` ensures the tokio task completes (dropping the stream and 
all reservations) before `releasePlan` returns. Clippy passes cleanly.

