This is an automated email from the ASF dual-hosted git repository.

lihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c4a4aeb [AURON #2022] [BUG] Native engine panic on closed channel causes JVM crash. (#2023)
1c4a4aeb is described below

commit 1c4a4aebd59bfd50ad2daab2b0c8e5f7f0c2e18c
Author: slfan1989 <[email protected]>
AuthorDate: Sat Feb 28 17:15:36 2026 +0800

    [AURON #2022] [BUG] Native engine panic on closed channel causes JVM crash. (#2023)
    
    ### Which issue does this PR close?
    
    Closes #2022
    
    ### Rationale for this change
    
    When running TPC-DS queries (q60-q69) with Spark 4.0 / JDK 21, the
    native engine panics when `WrappedSender::send` attempts to send data
    into a closed channel. This panic escalates into a JVM crash with exit
    code 134.
    
    The root cause is a race condition: the producer task keeps sending
    data after the receiver has already been closed/canceled due to task
    completion or cancellation. The previous implementation panicked on
    send failure instead of handling it gracefully.
    
    Link:
    https://github.com/apache/auron/actions/runs/22128240337/job/63964149831?pr=2018
    
    ### What changes are included in this PR?
    
    Modified `WrappedSender::send` in `execution_context.rs` to gracefully
    handle channel closure:
    - Check `send().await.is_err()` instead of panicking
    - Log a debug message with context (partition_id, task_id, session_id)
    for observability
    - Return early without updating metrics when the channel is closed
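    The early-return pattern can be sketched with a minimal, self-contained
    example. Note this is an illustration, not the Auron code itself: it uses
    `std::sync::mpsc` for brevity, whereas the actual fix operates on a tokio
    async channel inside `WrappedSender::send`.

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<i32>();

    // Simulate the consumer task being canceled or completing early:
    // dropping the receiver closes the channel.
    drop(rx);

    // Old style: tx.send(42).unwrap() would panic here; in the native
    // engine, that panic escalated into a JVM crash (exit code 134).
    // New style: detect the closed channel and return early instead.
    if tx.send(42).is_err() {
        println!("channel closed, skipping send");
        return;
    }
    println!("sent");
}
```

    The same `is_err()` check works for `tokio::sync::mpsc::Sender::send`,
    whose future resolves to `Err(SendError)` once the receiver is dropped.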
    
    ### Are there any user-facing changes?
    
    No user-facing changes.
    This is an internal fix that prevents JVM crashes when tasks are
    canceled or completed early.
    
    ### How was this patch tested?
    
    Covered by existing JUnit tests.
    
    Signed-off-by: slfan1989 <[email protected]>
---
 .../datafusion-ext-plans/src/common/execution_context.rs  | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/native-engine/datafusion-ext-plans/src/common/execution_context.rs b/native-engine/datafusion-ext-plans/src/common/execution_context.rs
index 23ee22f1..d274e194 100644
--- a/native-engine/datafusion-ext-plans/src/common/execution_context.rs
+++ b/native-engine/datafusion-ext-plans/src/common/execution_context.rs
@@ -47,6 +47,7 @@ use datafusion_ext_commons::{
 };
 use futures::{Stream, StreamExt};
 use futures_util::{FutureExt, stream::BoxStream};
+use log::debug;
 use once_cell::sync::OnceCell;
 use parking_lot::Mutex;
 use tokio::{
@@ -717,10 +718,16 @@ impl<T: RecordBatchWithPayload> WrappedSender<T> {
         }
         let exclude_time = self.exclude_time.get().cloned();
         let send_time = exclude_time.as_ref().map(|_| Instant::now());
-        self.sender
-            .send(Ok(batch))
-            .await
-            .unwrap_or_else(|err| panic!("output_with_sender: send error: {err}"));
+        if self.sender.send(Ok(batch)).await.is_err() {
+            let task_ctx = self.exec_ctx.task_ctx();
+            debug!(
+                "output_with_sender: channel closed, skipping batch send; partition_id={}, task_id={:?}, session_id={}",
+                self.exec_ctx.partition_id(),
+                task_ctx.task_id(),
+                task_ctx.session_id()
+            );
+            return;
+        }
 
         send_time.inspect(|send_time| {
             exclude_time
