This is an automated email from the ASF dual-hosted git repository.
lihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 1c4a4aeb [AURON #2022] [BUG] Native engine panic on closed channel causes JVM crash. (#2023)
1c4a4aeb is described below
commit 1c4a4aebd59bfd50ad2daab2b0c8e5f7f0c2e18c
Author: slfan1989 <[email protected]>
AuthorDate: Sat Feb 28 17:15:36 2026 +0800
[AURON #2022] [BUG] Native engine panic on closed channel causes JVM crash. (#2023)
### Which issue does this PR close?
Closes #2022
### Rationale for this change
When running TPC-DS queries (q60-q69) with Spark 4.0 / JDK 21, the
native engine panics when `WrappedSender::send` attempts to send data
into a closed channel. This panic escalates into a JVM crash with exit
code 134.
The root cause is a race condition: the producer task keeps sending data
after the receiver has already been closed because the task completed or
was canceled. The current implementation panics on a send failure instead
of handling it gracefully.
Link:
https://github.com/apache/auron/actions/runs/22128240337/job/63964149831?pr=2018
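A minimal, dependency-free sketch of the race described above, assuming a simplified producer/consumer pair. It uses `std::sync::mpsc::sync_channel` instead of the tokio channel behind `WrappedSender`, and the names `send_or_panic` and `run_pipeline` are hypothetical. Like tokio's `mpsc::Sender::send`, the std `send` returns `Err(SendError)` once the receiver has been dropped, so unwrapping that error panics the producer.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;

/// Pre-fix pattern (simplified, synchronous): any send error becomes a panic.
fn send_or_panic(tx: &SyncSender<u32>, batch: u32) {
    tx.send(batch)
        .unwrap_or_else(|err| panic!("output_with_sender: send error: {err}"));
}

/// Hypothetical harness: a producer sends 100 batches while a consumer stops
/// after `consumer_takes` batches and drops its receiver, as a completed or
/// canceled task would. Returns the batches received and whether the
/// producer thread panicked.
fn run_pipeline(consumer_takes: usize) -> (Vec<u32>, bool) {
    let (tx, rx) = sync_channel::<u32>(1);
    let consumer = thread::spawn(move || {
        let received: Vec<u32> = rx.iter().take(consumer_takes).collect();
        drop(rx); // receiver closed while the producer may still be sending
        received
    });
    let producer = thread::spawn(move || {
        for batch in 0..100 {
            send_or_panic(&tx, batch);
        }
    });
    let received = consumer.join().expect("consumer thread panicked");
    let producer_panicked = producer.join().is_err();
    (received, producer_panicked)
}
```

For example, `run_pipeline(1)` returns `(vec![0], true)`: the consumer stops after one batch, and a later send hits the closed channel and panics. With `run_pipeline(100)` the consumer drains every batch and the producer finishes without panicking.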
### What changes are included in this PR?
Modified `WrappedSender::send` in `execution_context.rs` to gracefully
handle channel closure:
- Check `send().await.is_err()` instead of panicking
- Log a debug message with context (partition_id, task_id, session_id) for observability
- Return early without updating metrics when the channel is closed
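The handling described in the list above can be sketched the same way. `SketchSender` is a hypothetical, synchronous stand-in for `WrappedSender`, not the real type. Because the block uses no external crates, `eprintln!` takes the place of `log::debug!`, and the `sent_batches` counter is an assumed substitute for the send-time metric. As in the patch, a failed send is logged with its context, and the method returns before any metric is updated.

```rust
use std::sync::mpsc::SyncSender;

/// Hypothetical stand-in for `WrappedSender`: a sender plus task context
/// and a metric that must only count batches that were actually delivered.
struct SketchSender {
    tx: SyncSender<u32>,
    partition_id: usize,
    task_id: Option<String>,
    sent_batches: u64, // substitute for the send-time metric in the real code
}

impl SketchSender {
    fn send(&mut self, batch: u32) {
        if self.tx.send(batch).is_err() {
            // Receiver is gone (task completed or canceled): log and drop the batch.
            eprintln!(
                "output_with_sender: channel closed, skipping batch send; partition_id={}, task_id={:?}",
                self.partition_id, self.task_id
            );
            return; // skip metric updates for a batch that was never delivered
        }
        self.sent_batches += 1;
    }
}
```

If the receiver is dropped and `send` is called again, the sketch logs one line and leaves `sent_batches` unchanged instead of panicking.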
### Are there any user-facing changes?
No user-facing changes.
This is an internal fix that prevents JVM crashes when tasks are
canceled or completed early.
### How was this patch tested?
Existing JUnit tests.
Signed-off-by: slfan1989 <[email protected]>
---
.../datafusion-ext-plans/src/common/execution_context.rs | 15 +++++++++++----
1 file changed, 11 insertions(+), 4 deletions(-)
diff --git a/native-engine/datafusion-ext-plans/src/common/execution_context.rs b/native-engine/datafusion-ext-plans/src/common/execution_context.rs
index 23ee22f1..d274e194 100644
--- a/native-engine/datafusion-ext-plans/src/common/execution_context.rs
+++ b/native-engine/datafusion-ext-plans/src/common/execution_context.rs
@@ -47,6 +47,7 @@ use datafusion_ext_commons::{
};
use futures::{Stream, StreamExt};
use futures_util::{FutureExt, stream::BoxStream};
+use log::debug;
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use tokio::{
@@ -717,10 +718,16 @@ impl<T: RecordBatchWithPayload> WrappedSender<T> {
}
let exclude_time = self.exclude_time.get().cloned();
let send_time = exclude_time.as_ref().map(|_| Instant::now());
- self.sender
- .send(Ok(batch))
- .await
- .unwrap_or_else(|err| panic!("output_with_sender: send error: {err}"));
+ if self.sender.send(Ok(batch)).await.is_err() {
+ let task_ctx = self.exec_ctx.task_ctx();
+ debug!(
+ "output_with_sender: channel closed, skipping batch send; partition_id={}, task_id={:?}, session_id={}",
+ self.exec_ctx.partition_id(),
+ task_ctx.task_id(),
+ task_ctx.session_id()
+ );
+ return;
+ }
send_time.inspect(|send_time| {
exclude_time