This is an automated email from the ASF dual-hosted git repository.

mbutrovich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new ae35db75f feat: Reset data buf of NativeBatchDecoderIterator on close (#2235)
ae35db75f is described below

commit ae35db75f1ed2b83ad177c7e62fa36502d611cc4
Author: Zhen Wang <643348...@qq.com>
AuthorDate: Thu Aug 28 01:14:10 2025 +0800

    feat: Reset data buf of NativeBatchDecoderIterator on close (#2235)
    
    * Reset data buf of NativeBatchDecoderIterator on close
    
    * address comment
    
    * address comment
---
 .../execution/shuffle/NativeBatchDecoderIterator.scala     | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala
index d461564f0..7ff43d8c3 100644
--- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala
@@ -48,7 +48,7 @@ case class NativeBatchDecoderIterator(
   private var currentBatch: ColumnarBatch = null
   private var batch = fetchNext()
 
-  import NativeBatchDecoderIterator.threadLocalDataBuf
+  import NativeBatchDecoderIterator._
 
   if (taskContext != null) {
     taskContext.addTaskCompletionListener[Unit](_ => {
@@ -182,6 +182,7 @@ case class NativeBatchDecoderIterator(
           currentBatch = null
         }
         in.close()
+        resetDataBuf()
         isClosed = true
       }
     }
@@ -189,7 +190,16 @@ case class NativeBatchDecoderIterator(
 }
 
 object NativeBatchDecoderIterator {
+
+  private val INITIAL_BUFFER_SIZE = 128 * 1024
+
   private val threadLocalDataBuf: ThreadLocal[ByteBuffer] = ThreadLocal.withInitial(() => {
-    ByteBuffer.allocateDirect(128 * 1024)
+    ByteBuffer.allocateDirect(INITIAL_BUFFER_SIZE)
   })
+
+  private def resetDataBuf(): Unit = {
+    if (threadLocalDataBuf.get().capacity() > INITIAL_BUFFER_SIZE) {
+      threadLocalDataBuf.set(ByteBuffer.allocateDirect(INITIAL_BUFFER_SIZE))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to