Copilot commented on code in PR #1980:
URL: https://github.com/apache/auron/pull/1980#discussion_r2752682407


##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala:
##########
@@ -108,32 +117,40 @@ class ArrowFFIExporter(rowIter: Iterator[InternalRow], 
schema: StructType)
 
         nativeCurrentUser.doAs(new PrivilegedExceptionAction[Unit] {
           override def run(): Unit = {
-            while (tc == null || (!tc.isCompleted() && !tc.isInterrupted())) {
-              if (!rowIter.hasNext) {
-                outputQueue.put(Finished(None))
-                return
-              }
+            try {
+              while (tc == null || (!tc.isCompleted() && !tc.isInterrupted())) 
{
+                if (!rowIter.hasNext) {
+                  outputQueue.put(Finished(None))
+                  return
+                }
 
-              Using.resource(CHILD_ALLOCATOR("ArrowFFIExporter")) { allocator 
=>
-                Using.resource(VectorSchemaRoot.create(arrowSchema, 
allocator)) { root =>
-                  val arrowWriter = ArrowWriter.create(root)
-                  while (rowIter.hasNext
-                    && allocator.getAllocatedMemory < maxBatchMemorySize
-                    && arrowWriter.currentCount < maxBatchNumRows) {
-                    arrowWriter.write(rowIter.next())
+                Using.resource(CHILD_ALLOCATOR("ArrowFFIExporter")) { 
allocator =>
+                  Using.resource(VectorSchemaRoot.create(arrowSchema, 
allocator)) { root =>
+                    val arrowWriter = ArrowWriter.create(root)
+                    while (rowIter.hasNext
+                      && allocator.getAllocatedMemory < maxBatchMemorySize
+                      && arrowWriter.currentCount < maxBatchNumRows) {
+                      arrowWriter.write(rowIter.next())
+                    }
+                    arrowWriter.finish()
+
+                    // export root
+                    currentRoot = root
+                    outputQueue.put(NextBatch)
+
+                    // wait for processing next batch
+                    processingQueue.take()
                   }
-                  arrowWriter.finish()
-
-                  // export root
-                  currentRoot = root
-                  outputQueue.put(NextBatch)
-
-                  // wait for processing next batch
-                  processingQueue.take()
                 }
               }
+              outputQueue.put(Finished(None))
+            } catch {
+              case _: InterruptedException =>
+                // Thread was interrupted during close(), this is expected - 
just exit gracefully
+                logDebug(s"ArrowFFIExporter-$exporterId: outputThread 
interrupted, exiting")
+                outputQueue.clear()
+                outputQueue.put(Finished(None))

Review Comment:
   The InterruptedException catch block performs its cleanup 
(outputQueue.clear() followed by the potentially blocking outputQueue.put()) 
on a thread that is being interrupted. Catching InterruptedException normally 
clears the interrupted status, but if the flag is set again before put() runs 
(for example, by a second interrupt), put() can throw InterruptedException and 
the Finished notification is never delivered. Consider clearing the 
interrupted status with Thread.interrupted() before calling put(), using a 
non-blocking alternative, or catching and handling any subsequent 
InterruptedException from these cleanup operations.
   ```suggestion
                   // Clear the interrupted status before performing 
potentially blocking operations
                   Thread.interrupted()
                   outputQueue.clear()
                   try {
                     outputQueue.put(Finished(None))
                   } catch {
                     case _: InterruptedException =>
                       // Restore the interrupted status and exit; cleanup 
notification could not be sent
                       Thread.currentThread().interrupt()
                   }
   ```



##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala:
##########
@@ -156,6 +173,28 @@ class ArrowFFIExporter(rowIter: Iterator[InternalRow], 
schema: StructType)
   }
 
   override def close(): Unit = {
-    outputThread.interrupt()
+    // Ensure close() is idempotent - only execute once
+    if (!closed.compareAndSet(false, true)) {
+      logDebug(s"ArrowFFIExporter-$exporterId: close() already called, 
skipping")
+      return
+    }
+
+    if (outputThread.isAlive) {
+      logDebug(s"ArrowFFIExporter-$exporterId: interrupting outputThread")
+      outputThread.interrupt()
+      // Wait for the thread to terminate to ensure resources are properly 
released
+      try {
+        outputThread.join(5000) // Wait up to 5 seconds
+        if (outputThread.isAlive) {
+          logWarning(
+            s"ArrowFFIExporter-$exporterId: outputThread did not terminate 
within 5 seconds")
+        }

Review Comment:
   If the outputThread does not terminate within 5 seconds, only a warning is 
logged but the thread is left running and resources may not be properly 
released. Consider implementing a more robust cleanup strategy, such as 
attempting to drain the queues, forcibly stopping resource-consuming 
operations, or documenting what resources might leak in this scenario and how 
operators should handle this situation.



##########
native-engine/datafusion-ext-plans/src/ffi_reader_exec.rs:
##########
@@ -192,27 +192,41 @@ fn read_ffi(
             struct AutoCloseableExporter(GlobalRef);
             impl Drop for AutoCloseableExporter {
                 fn drop(&mut self) {
-                    let _ = 
jni_call!(JavaAutoCloseable(self.0.as_obj()).close() -> ());
+                    if let Err(e) = 
jni_call!(JavaAutoCloseable(self.0.as_obj()).close() -> ()) {
+                        log::error!("FFIReader: JNI close() failed: {:?}", e);
+                    }
                 }
             }
             let exporter = AutoCloseableExporter(exporter);
+            log::info!("FFIReader: starting to read from ArrowFFIExporter");
 
             loop {
                 let batch = {
                     // load batch from ffi
-                    let mut ffi_arrow_array = FFI_ArrowArray::empty();
-                    let ffi_arrow_array_ptr = &mut ffi_arrow_array as *mut 
FFI_ArrowArray as i64;
+                    // IMPORTANT: FFI_ArrowArray is created inside 
spawn_blocking to ensure its
+                    // lifetime is tied to the blocking task. This prevents 
data races if the
+                    // async task is aborted while spawn_blocking is still 
running.
                     let exporter_obj = exporter.0.clone();
-                    let has_next = tokio::task::spawn_blocking(move || {
-                        jni_call!(
+                    let ffi_result = match tokio::task::spawn_blocking(move || 
{
+                        let mut ffi_arrow_array = FFI_ArrowArray::empty();
+                        let ffi_arrow_array_ptr =
+                            &mut ffi_arrow_array as *mut FFI_ArrowArray as i64;
+                        let has_next = jni_call!(
                             AuronArrowFFIExporter(exporter_obj.as_obj())
                                 .exportNextBatch(ffi_arrow_array_ptr) -> bool
-                        )
+                        )?;
+                        Ok::<_, DataFusionError>((has_next, ffi_arrow_array))
                     })
                     .await
-                    .expect("tokio spawn_blocking error")?;
+                    {
+                        Ok(Ok(result)) => result,
+                        Ok(Err(err)) => return Err(err),
+                        Err(err) => return df_execution_err!("spawn_blocking 
error: {err:?}"),

Review Comment:
   The error handling for spawn_blocking JoinError deviates from the 
established pattern in the codebase, where other files consistently use 
.expect("tokio spawn_blocking error"). While this change appears intentional to 
handle task abortion more gracefully, it's inconsistent with patterns seen in 
files like agg_table.rs:222, ipc_reader_exec.rs:194, and parquet_exec.rs:311. 
Consider either documenting why this location requires different handling or 
aligning with the existing pattern for consistency.



##########
spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/arrowio/ArrowFFIExporter.scala:
##########
@@ -156,6 +173,28 @@ class ArrowFFIExporter(rowIter: Iterator[InternalRow], 
schema: StructType)
   }
 
   override def close(): Unit = {
-    outputThread.interrupt()
+    // Ensure close() is idempotent - only execute once
+    if (!closed.compareAndSet(false, true)) {
+      logDebug(s"ArrowFFIExporter-$exporterId: close() already called, 
skipping")
+      return
+    }
+
+    if (outputThread.isAlive) {
+      logDebug(s"ArrowFFIExporter-$exporterId: interrupting outputThread")
+      outputThread.interrupt()
+      // Wait for the thread to terminate to ensure resources are properly 
released
+      try {
+        outputThread.join(5000) // Wait up to 5 seconds
+        if (outputThread.isAlive) {
+          logWarning(
+            s"ArrowFFIExporter-$exporterId: outputThread did not terminate 
within 5 seconds")
+        }
+      } catch {
+        case _: InterruptedException =>
+          // Ignore - we don't need to propagate this to caller
+          logDebug(s"ArrowFFIExporter-$exporterId: interrupted while waiting 
for outputThread")
+      }
+      logDebug(s"ArrowFFIExporter-$exporterId: close() completed")
+    }

Review Comment:
   There's a race condition between checking outputThread.isAlive and the 
actual thread termination. After the closed flag is set to true (line 177), the 
outputThread might terminate before the isAlive check on line 182, causing the 
logging and join logic to be skipped. While this is not critical (the thread 
terminates anyway), it could result in inconsistent logging. Consider 
restructuring the logic to always attempt join if the thread was started, 
regardless of the isAlive status at the time of check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to