This is an automated email from the ASF dual-hosted git repository.
richox pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 7da41e13 [AURON #1957] Fix panic/coredumps/memory leaks and Java
thread InterruptedException errors. (#1980)
7da41e13 is described below
commit 7da41e132f0144900b013e03f95f63c3609a84cd
Author: Bryton Lee <[email protected]>
AuthorDate: Mon Mar 9 15:07:06 2026 +0800
[AURON #1957] Fix panic/coredumps/memory leaks and Java thread
InterruptedException errors. (#1980)
# Which issue does this PR close?
Closes #1957
# Rationale for this change
Optimized rt.rs to prevent a SendError panic so that a native plan
execution can be finalized gracefully.
The coredumps and memory leaks were all introduced by a memory leak issue
inside ffi_reader_exec.rs: when a stream is dropped, the Java-side FFI import
has a race condition and may write to memory that was already dropped on the
Rust side.
Optimized ArrowFFIExporter.scala close() logic to gracefully close
outputThread without sending errors.
# What changes are included in this PR?
Changed rt.rs, ffi_reader_exec.rs and ArrowFFIExporter.scala.
# Are there any user-facing changes?
No
# How was this patch tested?
It was tested on the latest master branch and some earlier versions.
---
native-engine/auron/src/lib.rs | 11 ++-
native-engine/auron/src/rt.rs | 39 ++++++++--
.../datafusion-ext-plans/src/ffi_reader_exec.rs | 32 +++++---
.../execution/auron/arrowio/ArrowFFIExporter.scala | 85 ++++++++++++++++------
4 files changed, 125 insertions(+), 42 deletions(-)
diff --git a/native-engine/auron/src/lib.rs b/native-engine/auron/src/lib.rs
index 2dfb336e..aaf71ee8 100644
--- a/native-engine/auron/src/lib.rs
+++ b/native-engine/auron/src/lib.rs
@@ -55,8 +55,15 @@ fn handle_unwinded(err: Box<dyn Any + Send>) {
}
fn handle_unwinded_scope<T: Default, E: Debug>(scope: impl FnOnce() ->
Result<T, E>) -> T {
- match std::panic::catch_unwind(AssertUnwindSafe(|| scope().expect("scope
failed"))) {
- Ok(v) => v,
+ match std::panic::catch_unwind(AssertUnwindSafe(|| scope())) {
+ Ok(Ok(v)) => v,
+ Ok(Err(err)) => {
+ // Defensive handling: this path should not be reached in normal
operation
+ // after the SendError fixes (is_finalizing flag, FFI_ArrowArray
lifetime).
+ // If triggered, it indicates a new issue that needs investigation.
+ log::error!("error in unwinded scope: {err:?}");
+ T::default()
+ }
Err(err) => {
handle_unwinded(err);
T::default()
diff --git a/native-engine/auron/src/rt.rs b/native-engine/auron/src/rt.rs
index 37f40daf..005aac1d 100644
--- a/native-engine/auron/src/rt.rs
+++ b/native-engine/auron/src/rt.rs
@@ -16,7 +16,11 @@
use std::{
error::Error,
panic::AssertUnwindSafe,
- sync::{Arc, mpsc::Receiver},
+ sync::{
+ Arc,
+ atomic::{AtomicBool, Ordering},
+ mpsc::Receiver,
+ },
};
use arrow::{
@@ -63,6 +67,8 @@ pub struct NativeExecutionRuntime {
batch_receiver: Receiver<Result<Option<RecordBatch>>>,
tokio_runtime: Runtime,
join_handle: JoinHandle<()>,
+ // Flag to indicate runtime is being finalized - used to gracefully handle
SendError
+ is_finalizing: Arc<AtomicBool>,
}
impl NativeExecutionRuntime {
@@ -134,6 +140,8 @@ impl NativeExecutionRuntime {
// spawn batch producer
let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(1);
let err_sender = batch_sender.clone();
+ let is_finalizing = Arc::new(AtomicBool::new(false));
+ let is_finalizing_clone = is_finalizing.clone();
let execution_plan_cloned = execution_plan.clone();
let exec_ctx_cloned = exec_ctx.clone();
let native_wrapper_cloned = native_wrapper.clone();
@@ -173,14 +181,24 @@ impl NativeExecutionRuntime {
.transpose()
.or_else(|err| df_execution_err!("{err}"))?
{
- batch_sender
- .send(Ok(Some(batch)))
- .or_else(|err| df_execution_err!("send batch error:
{err}"))?;
+ if let Err(err) = batch_sender.send(Ok(Some(batch))) {
+ if is_finalizing_clone.load(Ordering::Acquire) {
+ log::debug!("send skipped: runtime is finalizing");
+ return Ok(());
+ } else {
+ return df_execution_err!("unexpected send error:
{err}");
+ }
+ }
+ }
+ log::info!("stream exhausted, sending Ok(None) to signal
completion");
+ if let Err(err) = batch_sender.send(Ok(None)) {
+ if is_finalizing_clone.load(Ordering::Acquire) {
+ log::debug!("send skipped: runtime is finalizing");
+ return Ok(());
+ } else {
+ return df_execution_err!("unexpected send error: {err}");
+ }
}
- batch_sender
- .send(Ok(None))
- .or_else(|err| df_execution_err!("send batch error: {err}"))?;
- log::info!("task finished");
Ok::<_, DataFusionError>(())
};
@@ -224,6 +242,7 @@ impl NativeExecutionRuntime {
tokio_runtime,
batch_receiver,
join_handle,
+ is_finalizing,
};
Ok(native_execution_runtime)
}
@@ -266,6 +285,10 @@ impl NativeExecutionRuntime {
log::info!("(partition={partition}) native execution finalizing");
self.update_metrics().unwrap_or_default();
drop(self.plan);
+
+ // Set finalizing flag before dropping receiver to allow graceful
SendError
+ // handling
+ self.is_finalizing.store(true, Ordering::Release);
drop(self.batch_receiver);
cancel_all_tasks(&self.exec_ctx.task_ctx()); // cancel all pending
streams
diff --git a/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
b/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
index 1d579234..81097c9e 100644
--- a/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
+++ b/native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs
@@ -26,7 +26,7 @@ use arrow::{
};
use auron_jni_bridge::{jni_call, jni_call_static, jni_new_global_ref,
jni_new_string};
use datafusion::{
- error::Result,
+ error::{DataFusionError, Result},
execution::context::TaskContext,
physical_expr::EquivalenceProperties,
physical_plan::{
@@ -37,7 +37,7 @@ use datafusion::{
metrics::{ExecutionPlanMetricsSet, MetricsSet},
},
};
-use datafusion_ext_commons::arrow::array_size::BatchSize;
+use datafusion_ext_commons::{arrow::array_size::BatchSize, df_execution_err};
use jni::objects::GlobalRef;
use once_cell::sync::OnceCell;
@@ -192,27 +192,41 @@ fn read_ffi(
struct AutoCloseableExporter(GlobalRef);
impl Drop for AutoCloseableExporter {
fn drop(&mut self) {
- let _ =
jni_call!(JavaAutoCloseable(self.0.as_obj()).close() -> ());
+ if let Err(e) =
jni_call!(JavaAutoCloseable(self.0.as_obj()).close() -> ()) {
+ log::error!("FFIReader: JNI close() failed: {e:?}");
+ }
}
}
let exporter = AutoCloseableExporter(exporter);
+ log::info!("FFIReader: starting to read from ArrowFFIExporter");
loop {
let batch = {
// load batch from ffi
- let mut ffi_arrow_array = FFI_ArrowArray::empty();
- let ffi_arrow_array_ptr = &mut ffi_arrow_array as *mut
FFI_ArrowArray as i64;
+ // IMPORTANT: FFI_ArrowArray is created inside
spawn_blocking to ensure its
+ // lifetime is tied to the blocking task. This prevents
data races if the
+ // async task is aborted while spawn_blocking is still
running.
let exporter_obj = exporter.0.clone();
- let has_next = tokio::task::spawn_blocking(move || {
- jni_call!(
+ let ffi_result = match tokio::task::spawn_blocking(move ||
{
+ let mut ffi_arrow_array = FFI_ArrowArray::empty();
+ let ffi_arrow_array_ptr =
+ &mut ffi_arrow_array as *mut FFI_ArrowArray as i64;
+ let has_next = jni_call!(
AuronArrowFFIExporter(exporter_obj.as_obj())
.exportNextBatch(ffi_arrow_array_ptr) -> bool
- )
+ )?;
+ Ok::<_, DataFusionError>((has_next, ffi_arrow_array))
})
.await
- .expect("tokio spawn_blocking error")?;
+ {
+ Ok(Ok(result)) => result,
+ Ok(Err(err)) => return Err(err),
+ Err(err) => return df_execution_err!("spawn_blocking
error: {err:?}"),
+ };
+ let (has_next, ffi_arrow_array) = ffi_result;
if !has_next {
+ log::info!("FFIReader: no more batches, exiting read
loop");
break;
}
let import_data_type =
DataType::Struct(schema.fields().clone());
diff --git
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
index 591ae266..a23acebf 100644
---
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
+++
b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala
@@ -27,6 +27,7 @@ import org.apache.arrow.c.Data
import org.apache.arrow.vector.VectorSchemaRoot
import
org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider
import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.auron.NativeHelper
import org.apache.spark.sql.auron.util.Using
import org.apache.spark.sql.catalyst.InternalRow
@@ -42,7 +43,8 @@ import org.apache.auron.jni.AuronAdaptor
import org.apache.auron.spark.configuration.SparkAuronConfiguration
class ArrowFFIExporter(rowIter: Iterator[InternalRow], schema: StructType)
- extends AuronArrowFFIExporter {
+ extends AuronArrowFFIExporter
+ with Logging {
private val sparkAuronConfig: AuronConfiguration =
AuronAdaptor.getInstance.getAuronConfiguration
private val maxBatchNumRows =
sparkAuronConfig.getInteger(AuronConfiguration.BATCH_SIZE)
@@ -58,6 +60,13 @@ class ArrowFFIExporter(rowIter: Iterator[InternalRow],
schema: StructType)
private case class Finished(t: Option[Throwable]) extends QueueState
private val tc = TaskContext.get()
+ // Build a meaningful identifier from TaskContext info
+ private val exporterId = if (tc != null) {
+
s"stage-${tc.stageId()}-part-${tc.partitionId()}-tid-${tc.taskAttemptId()}-${System.identityHashCode(this)}"
+ } else {
+ s"no-context-${System.identityHashCode(this)}"
+ }
+ private val closed = new java.util.concurrent.atomic.AtomicBoolean(false)
private val outputQueue: BlockingQueue[QueueState] = new
ArrayBlockingQueue[QueueState](16)
private val processingQueue: BlockingQueue[Unit] = new
ArrayBlockingQueue[Unit](16)
private var currentRoot: VectorSchemaRoot = _
@@ -108,32 +117,40 @@ class ArrowFFIExporter(rowIter: Iterator[InternalRow],
schema: StructType)
nativeCurrentUser.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = {
- while (tc == null || (!tc.isCompleted() && !tc.isInterrupted())) {
- if (!rowIter.hasNext) {
- outputQueue.put(Finished(None))
- return
- }
+ try {
+ while (tc == null || (!tc.isCompleted() && !tc.isInterrupted()))
{
+ if (!rowIter.hasNext) {
+ outputQueue.put(Finished(None))
+ return
+ }
- Using.resource(CHILD_ALLOCATOR("ArrowFFIExporter")) { allocator
=>
- Using.resource(VectorSchemaRoot.create(arrowSchema,
allocator)) { root =>
- val arrowWriter = ArrowWriter.create(root)
- while (rowIter.hasNext
- && allocator.getAllocatedMemory < maxBatchMemorySize
- && arrowWriter.currentCount < maxBatchNumRows) {
- arrowWriter.write(rowIter.next())
+ Using.resource(CHILD_ALLOCATOR("ArrowFFIExporter")) {
allocator =>
+ Using.resource(VectorSchemaRoot.create(arrowSchema,
allocator)) { root =>
+ val arrowWriter = ArrowWriter.create(root)
+ while (rowIter.hasNext
+ && allocator.getAllocatedMemory < maxBatchMemorySize
+ && arrowWriter.currentCount < maxBatchNumRows) {
+ arrowWriter.write(rowIter.next())
+ }
+ arrowWriter.finish()
+
+ // export root
+ currentRoot = root
+ outputQueue.put(NextBatch)
+
+ // wait for processing next batch
+ processingQueue.take()
}
- arrowWriter.finish()
-
- // export root
- currentRoot = root
- outputQueue.put(NextBatch)
-
- // wait for processing next batch
- processingQueue.take()
}
}
+ outputQueue.put(Finished(None))
+ } catch {
+ case _: InterruptedException =>
+ // Thread was interrupted during close(), this is expected -
just exit gracefully
+ logDebug(s"ArrowFFIExporter-$exporterId: outputThread
interrupted, exiting")
+ outputQueue.clear()
+ outputQueue.put(Finished(None))
}
- outputQueue.put(Finished(None))
}
})
}
@@ -156,6 +173,28 @@ class ArrowFFIExporter(rowIter: Iterator[InternalRow],
schema: StructType)
}
override def close(): Unit = {
- outputThread.interrupt()
+ // Ensure close() is idempotent - only execute once
+ if (!closed.compareAndSet(false, true)) {
+ logDebug(s"ArrowFFIExporter-$exporterId: close() already called,
skipping")
+ return
+ }
+
+ if (outputThread.isAlive) {
+ logDebug(s"ArrowFFIExporter-$exporterId: interrupting outputThread")
+ outputThread.interrupt()
+ // Wait for the thread to terminate to ensure resources are properly
released
+ try {
+ outputThread.join(5000) // Wait up to 5 seconds
+ if (outputThread.isAlive) {
+ logWarning(
+ s"ArrowFFIExporter-$exporterId: outputThread did not terminate
within 5 seconds")
+ }
+ } catch {
+ case _: InterruptedException =>
+ // Ignore - we don't need to propagate this to caller
+ logDebug(s"ArrowFFIExporter-$exporterId: interrupted while waiting
for outputThread")
+ }
+ logDebug(s"ArrowFFIExporter-$exporterId: close() completed")
+ }
}
}