This is an automated email from the ASF dual-hosted git repository. mbutrovich pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push: new ae35db75f feat: Reset data buf of NativeBatchDecoderIterator on close (#2235) ae35db75f is described below commit ae35db75f1ed2b83ad177c7e62fa36502d611cc4 Author: Zhen Wang <643348...@qq.com> AuthorDate: Thu Aug 28 01:14:10 2025 +0800 feat: Reset data buf of NativeBatchDecoderIterator on close (#2235) * Reset data buf of NativeBatchDecoderIterator on close * address comment * address comment --- .../execution/shuffle/NativeBatchDecoderIterator.scala | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala index d461564f0..7ff43d8c3 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala @@ -48,7 +48,7 @@ case class NativeBatchDecoderIterator( private var currentBatch: ColumnarBatch = null private var batch = fetchNext() - import NativeBatchDecoderIterator.threadLocalDataBuf + import NativeBatchDecoderIterator._ if (taskContext != null) { taskContext.addTaskCompletionListener[Unit](_ => { @@ -182,6 +182,7 @@ case class NativeBatchDecoderIterator( currentBatch = null } in.close() + resetDataBuf() isClosed = true } } @@ -189,7 +190,16 @@ case class NativeBatchDecoderIterator( } object NativeBatchDecoderIterator { + + private val INITIAL_BUFFER_SIZE = 128 * 1024 + private val threadLocalDataBuf: ThreadLocal[ByteBuffer] = ThreadLocal.withInitial(() => { - ByteBuffer.allocateDirect(128 * 1024) + ByteBuffer.allocateDirect(INITIAL_BUFFER_SIZE) }) + + private def resetDataBuf(): Unit = { + if (threadLocalDataBuf.get().capacity() > INITIAL_BUFFER_SIZE) { + threadLocalDataBuf.set(ByteBuffer.allocateDirect(INITIAL_BUFFER_SIZE)) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org