This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f3a71d4c fix: Fix potential resource leak in native shuffle block reader (#2247)
6f3a71d4c is described below

commit 6f3a71d4c0447e31075b44b2a15202efd605cf00
Author: Andy Grove <agr...@apache.org>
AuthorDate: Wed Aug 27 13:03:27 2025 -0600

    fix: Fix potential resource leak in native shuffle block reader (#2247)
---
 .../sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala    | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala
index 7ff43d8c3..126db2c63 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala
@@ -36,7 +36,7 @@ import org.apache.comet.vector.NativeUtil
  * and use Arrow FFI to return the Arrow record batch.
  */
 case class NativeBatchDecoderIterator(
-    var in: InputStream,
+    in: InputStream,
     taskContext: TaskContext,
     decodeTime: SQLMetric)
     extends Iterator[ColumnarBatch] {
@@ -45,6 +45,7 @@ case class NativeBatchDecoderIterator(
   private val longBuf = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
   private val native = new Native()
   private val nativeUtil = new NativeUtil()
+  private val tracingEnabled = CometConf.COMET_TRACING_ENABLED.get()
   private var currentBatch: ColumnarBatch = null
   private var batch = fetchNext()
 
@@ -167,7 +168,7 @@ case class NativeBatchDecoderIterator(
           bytesToRead.toInt,
           arrayAddrs,
           schemaAddrs,
-          CometConf.COMET_TRACING_ENABLED.get())
+          tracingEnabled)
       })
     decodeTime.add(System.nanoTime() - startTime)
 
@@ -182,6 +183,7 @@ case class NativeBatchDecoderIterator(
           currentBatch = null
         }
         in.close()
+        nativeUtil.close()
         resetDataBuf()
         isClosed = true
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to