Copilot commented on code in PR #1980:
URL: https://github.com/apache/auron/pull/1980#discussion_r2752682407
##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala:
##########
@@ -108,32 +117,40 @@ class ArrowFFIExporter(rowIter: Iterator[InternalRow], schema: StructType)
     nativeCurrentUser.doAs(new PrivilegedExceptionAction[Unit] {
       override def run(): Unit = {
-        while (tc == null || (!tc.isCompleted() && !tc.isInterrupted())) {
-          if (!rowIter.hasNext) {
-            outputQueue.put(Finished(None))
-            return
-          }
+        try {
+          while (tc == null || (!tc.isCompleted() && !tc.isInterrupted())) {
+            if (!rowIter.hasNext) {
+              outputQueue.put(Finished(None))
+              return
+            }
-          Using.resource(CHILD_ALLOCATOR("ArrowFFIExporter")) { allocator =>
-            Using.resource(VectorSchemaRoot.create(arrowSchema, allocator)) { root =>
-              val arrowWriter = ArrowWriter.create(root)
-              while (rowIter.hasNext
-                && allocator.getAllocatedMemory < maxBatchMemorySize
-                && arrowWriter.currentCount < maxBatchNumRows) {
-                arrowWriter.write(rowIter.next())
+            Using.resource(CHILD_ALLOCATOR("ArrowFFIExporter")) { allocator =>
+              Using.resource(VectorSchemaRoot.create(arrowSchema, allocator)) { root =>
+                val arrowWriter = ArrowWriter.create(root)
+                while (rowIter.hasNext
+                  && allocator.getAllocatedMemory < maxBatchMemorySize
+                  && arrowWriter.currentCount < maxBatchNumRows) {
+                  arrowWriter.write(rowIter.next())
+                }
+                arrowWriter.finish()
+
+                // export root
+                currentRoot = root
+                outputQueue.put(NextBatch)
+
+                // wait for processing next batch
+                processingQueue.take()
               }
-              arrowWriter.finish()
-
-              // export root
-              currentRoot = root
-              outputQueue.put(NextBatch)
-
-              // wait for processing next batch
-              processingQueue.take()
             }
           }
+          outputQueue.put(Finished(None))
+        } catch {
+          case _: InterruptedException =>
+            // Thread was interrupted during close(), this is expected - just exit gracefully
+            logDebug(s"ArrowFFIExporter-$exporterId: outputThread interrupted, exiting")
+            outputQueue.clear()
+            outputQueue.put(Finished(None))
Review Comment:
The InterruptedException catch block invokes outputQueue.clear() and the blocking
outputQueue.put() after the thread has been interrupted. Because the interrupted
status is still set, put() will throw InterruptedException again immediately.
Consider clearing the interrupted status before these operations, or using
non-blocking alternatives. For example, you could call Thread.interrupted() to
clear the flag before put(), or catch and handle any subsequent
InterruptedException from these cleanup operations.
```suggestion
          // Clear the interrupted status before performing potentially blocking operations
          Thread.interrupted()
          outputQueue.clear()
          try {
            outputQueue.put(Finished(None))
          } catch {
            case _: InterruptedException =>
              // Restore the interrupted status and exit; cleanup notification could not be sent
              Thread.currentThread().interrupt()
          }
```
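
The failure mode described above is easy to reproduce on the JVM: `BlockingQueue.put()` acquires its internal lock via `lockInterruptibly()`, which throws `InterruptedException` immediately when the calling thread's interrupted status is set, even if the queue has free capacity. A minimal standalone Java sketch (class and method names are illustrative, not from the PR):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class InterruptedPutDemo {

    /**
     * Simulates the exporter's catch block: the worker thread arrives with its
     * interrupted status still set, tries put(), then retries after clearing
     * the flag. Returns true only if the first put() threw.
     */
    static boolean putFailsWhileInterrupted(BlockingQueue<String> queue) {
        final boolean[] threw = {false};
        Thread worker = new Thread(() -> {
            // Simulate arriving in the catch block with the flag still set.
            Thread.currentThread().interrupt();
            try {
                // put() enters lockInterruptibly(), sees the flag, and throws
                // immediately even though the queue has capacity.
                queue.put("finished");
            } catch (InterruptedException e) {
                threw[0] = true;
                // Clearing the flag first makes the retry succeed,
                // as the review comment suggests.
                Thread.interrupted();
                try {
                    queue.put("finished");
                } catch (InterruptedException ignored) {
                }
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threw[0];
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(4);
        boolean threw = putFailsWhileInterrupted(queue);
        System.out.println("first put threw: " + threw
            + ", queue size: " + queue.size());
    }
}
```

Running `main` prints `first put threw: true, queue size: 1`: the first `put()` fails purely because of the pending interrupt, and the retry after `Thread.interrupted()` lands the element.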
##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala:
##########
@@ -156,6 +173,28 @@ class ArrowFFIExporter(rowIter: Iterator[InternalRow], schema: StructType)
   }
   override def close(): Unit = {
-    outputThread.interrupt()
+    // Ensure close() is idempotent - only execute once
+    if (!closed.compareAndSet(false, true)) {
+      logDebug(s"ArrowFFIExporter-$exporterId: close() already called, skipping")
+      return
+    }
+
+    if (outputThread.isAlive) {
+      logDebug(s"ArrowFFIExporter-$exporterId: interrupting outputThread")
+      outputThread.interrupt()
+      // Wait for the thread to terminate to ensure resources are properly released
+      try {
+        outputThread.join(5000) // Wait up to 5 seconds
+        if (outputThread.isAlive) {
+          logWarning(
+            s"ArrowFFIExporter-$exporterId: outputThread did not terminate within 5 seconds")
+        }
Review Comment:
If the outputThread does not terminate within 5 seconds, only a warning is
logged but the thread is left running and resources may not be properly
released. Consider implementing a more robust cleanup strategy, such as
attempting to drain the queues, forcibly stopping resource-consuming
operations, or documenting what resources might leak in this scenario and how
operators should handle this situation.
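
One way to tighten this, sketched below in plain Java (class and method names are hypothetical, not from the PR), is to have close() wait uninterruptibly up to a fixed deadline and surface the outcome as a boolean, so the caller can decide whether to drain queues or fail the task instead of only logging a warning:

```java
public class ThreadCloser {

    /**
     * Interrupts a worker thread and waits up to timeoutMillis for it to die,
     * absorbing interrupts of the waiting thread so the deadline stays bounded
     * (the classic "join uninterruptibly" pattern). Returns true if the worker
     * terminated; false means the worker leaked and the caller must react.
     */
    static boolean interruptAndAwait(Thread worker, long timeoutMillis) {
        worker.interrupt();
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        boolean wasInterrupted = false;
        try {
            while (worker.isAlive()) {
                long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
                if (remainingMillis <= 0) {
                    return false; // timed out: worker is leaked, caller decides
                }
                try {
                    worker.join(remainingMillis);
                } catch (InterruptedException e) {
                    wasInterrupted = true; // keep waiting; stay within the deadline
                }
            }
            return true;
        } finally {
            if (wasInterrupted) {
                // Restore the caller's interrupted status on the way out.
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(60_000L);
            } catch (InterruptedException e) {
                // interrupted: exit promptly, like a cooperative worker thread
            }
        });
        sleeper.start();
        System.out.println("terminated: " + interruptAndAwait(sleeper, 5000));
    }
}
```

With a cooperative worker like the sleeper above this returns true almost instantly; a `false` return is the signal on which a real close() would escalate (drain queues, fail the task, or document the leak).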
##########
native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs:
##########
@@ -192,27 +192,41 @@ fn read_ffi(
        struct AutoCloseableExporter(GlobalRef);
        impl Drop for AutoCloseableExporter {
            fn drop(&mut self) {
-                let _ = jni_call!(JavaAutoCloseable(self.0.as_obj()).close() -> ());
+                if let Err(e) = jni_call!(JavaAutoCloseable(self.0.as_obj()).close() -> ()) {
+                    log::error!("FFIReader: JNI close() failed: {:?}", e);
+                }
            }
        }
        let exporter = AutoCloseableExporter(exporter);
+        log::info!("FFIReader: starting to read from ArrowFFIExporter");
        loop {
            let batch = {
                // load batch from ffi
-                let mut ffi_arrow_array = FFI_ArrowArray::empty();
-                let ffi_arrow_array_ptr = &mut ffi_arrow_array as *mut FFI_ArrowArray as i64;
+                // IMPORTANT: FFI_ArrowArray is created inside spawn_blocking to ensure its
+                // lifetime is tied to the blocking task. This prevents data races if the
+                // async task is aborted while spawn_blocking is still running.
                let exporter_obj = exporter.0.clone();
-                let has_next = tokio::task::spawn_blocking(move || {
-                    jni_call!(
+                let ffi_result = match tokio::task::spawn_blocking(move || {
+                    let mut ffi_arrow_array = FFI_ArrowArray::empty();
+                    let ffi_arrow_array_ptr =
+                        &mut ffi_arrow_array as *mut FFI_ArrowArray as i64;
+                    let has_next = jni_call!(
                        AuronArrowFFIExporter(exporter_obj.as_obj())
                            .exportNextBatch(ffi_arrow_array_ptr) -> bool
-                    )
+                    )?;
+                    Ok::<_, DataFusionError>((has_next, ffi_arrow_array))
                })
                .await
-                .expect("tokio spawn_blocking error")?;
+                {
+                    Ok(Ok(result)) => result,
+                    Ok(Err(err)) => return Err(err),
+                    Err(err) => return df_execution_err!("spawn_blocking error: {err:?}"),
Review Comment:
The error handling for spawn_blocking JoinError deviates from the
established pattern in the codebase, where other files consistently use
.expect("tokio spawn_blocking error"). While this change appears intentional to
handle task abortion more gracefully, it's inconsistent with patterns seen in
files like agg_table.rs:222, ipc_reader_exec.rs:194, and parquet_exec.rs:311.
Consider either documenting why this location requires different handling or
aligning with the existing pattern for consistency.
##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala:
##########
@@ -156,6 +173,28 @@ class ArrowFFIExporter(rowIter: Iterator[InternalRow], schema: StructType)
   }
   override def close(): Unit = {
-    outputThread.interrupt()
+    // Ensure close() is idempotent - only execute once
+    if (!closed.compareAndSet(false, true)) {
+      logDebug(s"ArrowFFIExporter-$exporterId: close() already called, skipping")
+      return
+    }
+
+    if (outputThread.isAlive) {
+      logDebug(s"ArrowFFIExporter-$exporterId: interrupting outputThread")
+      outputThread.interrupt()
+      // Wait for the thread to terminate to ensure resources are properly released
+      try {
+        outputThread.join(5000) // Wait up to 5 seconds
+        if (outputThread.isAlive) {
+          logWarning(
+            s"ArrowFFIExporter-$exporterId: outputThread did not terminate within 5 seconds")
+        }
+      } catch {
+        case _: InterruptedException =>
+          // Ignore - we don't need to propagate this to caller
+          logDebug(s"ArrowFFIExporter-$exporterId: interrupted while waiting for outputThread")
+      }
+      logDebug(s"ArrowFFIExporter-$exporterId: close() completed")
+    }
Review Comment:
There's a race condition between checking outputThread.isAlive and the
actual thread termination. After the closed flag is set to true (line 177), the
outputThread might terminate before the isAlive check on line 182, causing the
logging and join logic to be skipped. While this is not critical (the thread
terminates anyway), it could result in inconsistent logging. Consider
restructuring the logic to always attempt join if the thread was started,
regardless of the isAlive status at the time of check.
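
The suggested restructuring is safe because Thread.join() on an already-terminated thread returns immediately, so there is no cost to calling it unconditionally. A small Java illustration (demo class and helper are hypothetical, not from the PR):

```java
public class JoinDeadThreadDemo {

    /**
     * Joins a possibly-dead thread with a timeout, absorbing interrupts,
     * and returns the elapsed wall time in milliseconds.
     */
    static long timedJoinMillis(Thread worker, long timeoutMillis) {
        long start = System.nanoTime();
        try {
            worker.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return (System.nanoTime() - start) / 1_000_000L;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {});
        worker.start();
        worker.join(); // worker is now certainly terminated

        // Joining a dead thread is a near-instant no-op, so a close() method
        // can drop the isAlive pre-check and always join: the racy window
        // where the thread dies between the check and the join disappears,
        // and the logging becomes consistent.
        long elapsed = timedJoinMillis(worker, 5000);
        System.out.println("join on terminated thread returned in ~" + elapsed + " ms");
    }
}
```

The five-second timeout is never consumed here; `join(5000)` sees the thread is no longer alive and returns at once.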
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]