This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 28d874ce9 chore: Improve documentation for `CometBatchIterator` and fix a potential issue (#2168)
28d874ce9 is described below

commit 28d874ce92b51c5c5d6b0dc81f85818ed8e48467
Author: Andy Grove <agr...@apache.org>
AuthorDate: Tue Aug 26 18:36:15 2025 -0600

    chore: Improve documentation for `CometBatchIterator` and fix a potential issue (#2168)
---
 .../java/org/apache/comet/CometBatchIterator.java | 31 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git a/spark/src/main/java/org/apache/comet/CometBatchIterator.java b/spark/src/main/java/org/apache/comet/CometBatchIterator.java
index e05bea1df..9b48a47c5 100644
--- a/spark/src/main/java/org/apache/comet/CometBatchIterator.java
+++ b/spark/src/main/java/org/apache/comet/CometBatchIterator.java
@@ -26,13 +26,17 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.apache.comet.vector.NativeUtil;
 
 /**
- * An iterator that can be used to get batches of Arrow arrays from a Spark iterator of
- * ColumnarBatch. It will consume input iterator and return Arrow arrays by addresses. This is
- * called by native code to retrieve Arrow arrays from Spark through JNI.
+ * Iterator for fetching batches from JVM to native code. Usually called via JNI from native
+ * ScanExec.
+ *
+ * <p>Batches are owned by the JVM. Native code can safely access the batch after calling `next` but
+ * the native code must not retain references to the batch because the next call to `hasNext` will
+ * signal to the JVM that the batch can be closed.
  */
 public class CometBatchIterator {
-  final Iterator<ColumnarBatch> input;
-  final NativeUtil nativeUtil;
+  private final Iterator<ColumnarBatch> input;
+  private final NativeUtil nativeUtil;
+  private ColumnarBatch previousBatch = null;
   private ColumnarBatch currentBatch = null;
 
   CometBatchIterator(Iterator<ColumnarBatch> input, NativeUtil nativeUtil) {
@@ -41,11 +45,16 @@ public class CometBatchIterator {
   }
 
   /**
-   * Fetch the next input batch.
+   * Fetch the next input batch and allow the previous batch to be closed (this may not happen
+   * immediately).
    *
    * @return Number of rows in next batch or -1 if no batches left.
    */
   public int hasNext() {
+
+    // release reference to previous batch
+    previousBatch = null;
+
     if (currentBatch == null) {
       if (input.hasNext()) {
         currentBatch = input.next();
@@ -59,7 +68,7 @@ public class CometBatchIterator {
   }
 
   /**
-   * Get the next batches of Arrow arrays.
+   * Get the next batch of Arrow arrays.
    *
    * @param arrayAddrs The addresses of the ArrowArray structures.
    * @param schemaAddrs The addresses of the ArrowSchema structures.
@@ -69,8 +78,16 @@ public class CometBatchIterator {
     if (currentBatch == null) {
       return -1;
     }
+
+    // export the batch using the Arrow C Data Interface
     int numRows = nativeUtil.exportBatch(arrayAddrs, schemaAddrs, currentBatch);
+
+    // keep a reference to the exported batch so that it doesn't get garbage collected
+    // while the native code is still processing it
+    previousBatch = currentBatch;
+
     currentBatch = null;
+
     return numRows;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org