This is an automated email from the ASF dual-hosted git repository. mbutrovich pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new 6f3a71d4c fix: Fix potential resource leak in native shuffle block reader (#2247) 6f3a71d4c is described below commit 6f3a71d4c0447e31075b44b2a15202efd605cf00 Author: Andy Grove <agr...@apache.org> AuthorDate: Wed Aug 27 13:03:27 2025 -0600 fix: Fix potential resource leak in native shuffle block reader (#2247) --- .../sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala index 7ff43d8c3..126db2c63 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala @@ -36,7 +36,7 @@ import org.apache.comet.vector.NativeUtil * and use Arrow FFI to return the Arrow record batch. */ case class NativeBatchDecoderIterator( - var in: InputStream, + in: InputStream, taskContext: TaskContext, decodeTime: SQLMetric) extends Iterator[ColumnarBatch] { @@ -45,6 +45,7 @@ case class NativeBatchDecoderIterator( private val longBuf = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN) private val native = new Native() private val nativeUtil = new NativeUtil() + private val tracingEnabled = CometConf.COMET_TRACING_ENABLED.get() private var currentBatch: ColumnarBatch = null private var batch = fetchNext() @@ -167,7 +168,7 @@ case class NativeBatchDecoderIterator( bytesToRead.toInt, arrayAddrs, schemaAddrs, - CometConf.COMET_TRACING_ENABLED.get()) + tracingEnabled) }) decodeTime.add(System.nanoTime() - startTime) @@ -182,6 +183,7 @@ case class NativeBatchDecoderIterator( currentBatch = null } in.close() + nativeUtil.close() resetDataBuf() isClosed = true } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org