andygrove opened a new pull request, #3833: URL: https://github.com/apache/datafusion-comet/pull/3833
## Which issue does this PR close?

Closes #2453.

## Rationale for this change

When a tokio task is spawned for async execution (the path taken when there are no JVM data sources), the execution stream and its `MemoryReservation`s are owned by the tokio task. If `releasePlan` drops the `ExecutionContext` without waiting for the task to complete, the reservations are released asynchronously on the tokio thread, potentially **after** Spark has already called `cleanUpAllAllocatedMemory()`, causing:

```
WARN ExecutionMemoryPool: Internal error: release called on 917504 bytes but task only has 0 bytes of memory from the off-heap execution pool
```

The sequence that triggers this:

1. `TaskCompletionListener` calls `CometExecIterator.close()` → `releasePlan()` via JNI
2. `releasePlan` drops `ExecutionContext`, which drops `batch_receiver`, signaling the tokio task to stop
3. `releasePlan` returns to the JVM before the tokio task finishes cleanup
4. Spark calls `cleanUpAllAllocatedMemory()`, which zeroes out the task's allocation
5. The tokio task finally drops the stream and its `MemoryReservation`s → `release_to_spark()` → Spark sees "0 bytes"

This also fixes Source 2 of #2470: `GlobalRef`s held by the stream are now dropped while the JVM thread is still the caller, avoiding the "Dropping a GlobalRef in a detached thread" warning.

## What changes are included in this PR?

- Add a `task_handle` field to `ExecutionContext` to store the `JoinHandle` from the spawned tokio task
- In `releasePlan`, drop the `batch_receiver` first (to signal the task to exit its loop), then `block_on` the handle to wait for the tokio task to complete before dropping the context

This guarantees that all memory releases and `GlobalRef` drops happen before `releasePlan` returns to the JVM.

## How are these changes tested?
This race condition requires a full Spark executor environment where the task completion sequence (`TaskCompletionListener` → `cleanUpAllAllocatedMemory`) races with async tokio task cleanup. It is not reproducible in unit tests. The fix is verified by code inspection: `block_on(handle)` ensures the tokio task completes (dropping the stream and all reservations) before `releasePlan` returns. Clippy passes cleanly.
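The shutdown ordering this fix relies on (drop the receiver, then wait on the task's handle, then let Spark clean up) can be illustrated with a small standalone model. This is a sketch under stated assumptions, not Comet's code: it uses a `std::thread` and a bounded `std::sync::mpsc` channel as stand-ins for the tokio task and its batch channel. `MemoryPool`, `Reservation`, `execute_plan`, `release_plan` and `simulate_task` are hypothetical toy types and functions, and `handle.join()` plays the role of `block_on(handle)`. With tokio, a `JoinHandle` is itself a future, so the equivalent wait would be `runtime.block_on(handle)`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{sync_channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical toy model of Spark's per-task off-heap execution pool.
#[derive(Default)]
struct MemoryPool {
    allocated: Mutex<u64>,
    release_errors: AtomicUsize,
}

impl MemoryPool {
    fn acquire(&self, bytes: u64) {
        *self.allocated.lock().unwrap() += bytes;
    }

    /// Counts a release of more bytes than the task still holds (the PR's warning).
    fn release(&self, bytes: u64) {
        let mut allocated = self.allocated.lock().unwrap();
        if bytes > *allocated {
            eprintln!("release called on {bytes} bytes but task only has {} bytes", *allocated);
            self.release_errors.fetch_add(1, Ordering::SeqCst);
            *allocated = 0;
        } else {
            *allocated -= bytes;
        }
    }

    /// Models `cleanUpAllAllocatedMemory()`: zeroes the allocation, returns what was left.
    fn clean_up_all(&self) -> u64 {
        std::mem::take(&mut *self.allocated.lock().unwrap())
    }
}

/// Hypothetical RAII reservation that returns its bytes to the pool on drop.
struct Reservation {
    pool: Arc<MemoryPool>,
    bytes: u64,
}

impl Reservation {
    fn new(pool: Arc<MemoryPool>, bytes: u64) -> Self {
        pool.acquire(bytes);
        Reservation { pool, bytes }
    }
}

impl Drop for Reservation {
    fn drop(&mut self) {
        self.pool.release(self.bytes);
    }
}

/// Hypothetical, trimmed-down context with the two fields the PR discusses.
struct ExecutionContext {
    batch_receiver: Option<Receiver<u64>>,
    task_handle: Option<JoinHandle<()>>,
}

fn execute_plan(pool: Arc<MemoryPool>) -> ExecutionContext {
    let (tx, rx) = sync_channel::<u64>(1);
    let handle = thread::spawn(move || {
        // The producer owns the reservation, like the spawned task owns the stream.
        let _reservation = Reservation::new(pool, 917_504);
        let mut batch = 0;
        // `send` returns Err once the receiver is dropped, ending the loop.
        while tx.send(batch).is_ok() {
            batch += 1;
        }
        // `_reservation` is dropped here, releasing its bytes to the pool.
    });
    ExecutionContext { batch_receiver: Some(rx), task_handle: Some(handle) }
}

fn release_plan(mut ctx: ExecutionContext) {
    // 1. Drop the receiver first; otherwise the producer may stay blocked in
    //    `send` on the full channel and the join below would never return.
    drop(ctx.batch_receiver.take());
    // 2. Wait for the producer to finish so its reservation is released
    //    before control returns to the caller.
    if let Some(handle) = ctx.task_handle.take() {
        handle.join().expect("execution task panicked");
    }
}

/// Runs one task lifecycle; returns (bytes left at cleanup, release errors).
fn simulate_task() -> (u64, usize) {
    let pool = Arc::new(MemoryPool::default());
    let ctx = execute_plan(Arc::clone(&pool));
    // Pull one batch, as the JVM side would while iterating.
    let first = ctx.batch_receiver.as_ref().unwrap().recv().unwrap();
    assert_eq!(first, 0);
    release_plan(ctx); // task completion path
    let leaked = pool.clean_up_all(); // Spark's cleanup runs afterwards
    (leaked, pool.release_errors.load(Ordering::SeqCst))
}

fn main() {
    let (leaked, errors) = simulate_task();
    println!("bytes left at cleanup: {leaked}, release errors: {errors}");
}
```

Because `release_plan` joins the producer before returning, the reservation is always released before `clean_up_all` runs, so both counters are zero on every run. Skipping step 2 would let `release_plan` return immediately and reintroduce the race from the numbered sequence above, but only nondeterministically, which matches why the PR notes the bug is hard to reproduce in unit tests.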
