This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 28d874ce9 chore: Improve documentation for `CometBatchIterator` and fix a potential issue (#2168)
28d874ce9 is described below

commit 28d874ce92b51c5c5d6b0dc81f85818ed8e48467
Author: Andy Grove <agr...@apache.org>
AuthorDate: Tue Aug 26 18:36:15 2025 -0600

    chore: Improve documentation for `CometBatchIterator` and fix a potential issue (#2168)
---
 .../java/org/apache/comet/CometBatchIterator.java  | 31 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git a/spark/src/main/java/org/apache/comet/CometBatchIterator.java b/spark/src/main/java/org/apache/comet/CometBatchIterator.java
index e05bea1df..9b48a47c5 100644
--- a/spark/src/main/java/org/apache/comet/CometBatchIterator.java
+++ b/spark/src/main/java/org/apache/comet/CometBatchIterator.java
@@ -26,13 +26,17 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.comet.vector.NativeUtil;
 
 /**
- * An iterator that can be used to get batches of Arrow arrays from a Spark iterator of
- * ColumnarBatch. It will consume input iterator and return Arrow arrays by addresses. This is
- * called by native code to retrieve Arrow arrays from Spark through JNI.
+ * Iterator for fetching batches from JVM to native code. Usually called via JNI from native
+ * ScanExec.
+ *
+ * <p>Batches are owned by the JVM. Native code can safely access the batch after calling `next` but
+ * the native code must not retain references to the batch because the next call to `hasNext` will
+ * signal to the JVM that the batch can be closed.
  */
 public class CometBatchIterator {
-  final Iterator<ColumnarBatch> input;
-  final NativeUtil nativeUtil;
+  private final Iterator<ColumnarBatch> input;
+  private final NativeUtil nativeUtil;
+  private ColumnarBatch previousBatch = null;
   private ColumnarBatch currentBatch = null;
 
   CometBatchIterator(Iterator<ColumnarBatch> input, NativeUtil nativeUtil) {
@@ -41,11 +45,16 @@ public class CometBatchIterator {
   }
 
   /**
-   * Fetch the next input batch.
+   * Fetch the next input batch and allow the previous batch to be closed (this may not happen
+   * immediately).
    *
    * @return Number of rows in next batch or -1 if no batches left.
    */
   public int hasNext() {
+
+    // release reference to previous batch
+    previousBatch = null;
+
     if (currentBatch == null) {
       if (input.hasNext()) {
         currentBatch = input.next();
@@ -59,7 +68,7 @@ public class CometBatchIterator {
   }
 
   /**
-   * Get the next batches of Arrow arrays.
+   * Get the next batch of Arrow arrays.
    *
    * @param arrayAddrs The addresses of the ArrowArray structures.
    * @param schemaAddrs The addresses of the ArrowSchema structures.
@@ -69,8 +78,16 @@ public class CometBatchIterator {
     if (currentBatch == null) {
       return -1;
     }
+
+    // export the batch using the Arrow C Data Interface
     int numRows = nativeUtil.exportBatch(arrayAddrs, schemaAddrs, currentBatch);
+
+    // keep a reference to the exported batch so that it doesn't get garbage collected
+    // while the native code is still processing it
+    previousBatch = currentBatch;
+
     currentBatch = null;
+
     return numRows;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to