This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 10d759fef9 [VL] Reuse byte[] buffers in shuffle read and broadcast serialization paths (#11777)
10d759fef9 is described below

commit 10d759fef9f80c7b9f6843d982803878e489a93b
Author: Ankita Victor <[email protected]>
AuthorDate: Fri Mar 20 22:27:47 2026 +0530

    [VL] Reuse byte[] buffers in shuffle read and broadcast serialization paths (#11777)
---
 .../vectorized/OnHeapJniByteInputStream.java       |  9 ++++---
 .../sql/execution/unsafe/UnsafeByteArray.java      | 30 +++++++++++++---------
 2 files changed, 24 insertions(+), 15 deletions(-)

diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/OnHeapJniByteInputStream.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/OnHeapJniByteInputStream.java
index 3947dbdeb4..88b446a1e5 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/OnHeapJniByteInputStream.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/OnHeapJniByteInputStream.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 public class OnHeapJniByteInputStream implements JniByteInputStream {
   private final InputStream in;
   private long bytesRead = 0L;
+  private byte[] buf = new byte[0];
 
   public OnHeapJniByteInputStream(InputStream in) {
     this.in = in;
@@ -32,15 +33,17 @@ public class OnHeapJniByteInputStream implements JniByteInputStream {
   @Override
   public long read(long destAddress, long maxSize) {
     int maxSize32 = Math.toIntExact(maxSize);
-    byte[] tmp = new byte[maxSize32];
+    if (buf.length < maxSize32) {
+      buf = new byte[maxSize32];
+    }
     try {
       // The code conducts copy as long as 'in' wraps off-heap data,
       // which is about to be moved to heap
-      int read = in.read(tmp);
+      int read = in.read(buf, 0, maxSize32);
       if (read == -1 || read == 0) {
         return 0;
       }
-      memCopyFromHeap(tmp, destAddress, read); // The code conducts copy, from heap to off-heap
+      memCopyFromHeap(buf, destAddress, read); // The code conducts copy, from heap to off-heap
       bytesRead += read;
       return read;
     } catch (IOException e) {
diff --git a/gluten-arrow/src/main/java/org/apache/spark/sql/execution/unsafe/UnsafeByteArray.java b/gluten-arrow/src/main/java/org/apache/spark/sql/execution/unsafe/UnsafeByteArray.java
index 089abdfd29..1d4e0de6f3 100644
--- a/gluten-arrow/src/main/java/org/apache/spark/sql/execution/unsafe/UnsafeByteArray.java
+++ b/gluten-arrow/src/main/java/org/apache/spark/sql/execution/unsafe/UnsafeByteArray.java
@@ -31,8 +31,11 @@ import java.io.ObjectOutput;
 
 /** A serializable unsafe byte array. */
 public class UnsafeByteArray implements Externalizable, KryoSerializable {
+  private static final int CHUNK_SIZE = 8 * 1024;
+
   private ArrowBuf buffer;
   private long size;
+  private transient byte[] chunkBuf;
 
   UnsafeByteArray(ArrowBuf buffer, long size) {
     this.buffer = buffer;
@@ -42,6 +45,13 @@ public class UnsafeByteArray implements Externalizable, KryoSerializable {
 
   public UnsafeByteArray() {}
 
+  private byte[] chunkBuf() {
+    if (chunkBuf == null) {
+      chunkBuf = new byte[CHUNK_SIZE];
+    }
+    return chunkBuf;
+  }
+
   public long address() {
     return buffer.memoryAddress();
   }
@@ -66,13 +76,12 @@ public class UnsafeByteArray implements Externalizable, KryoSerializable {
     output.writeLong(size);
 
     // stream bytes out of ArrowBuf
-    final int chunkSize = 8 * 1024;
-    byte[] tmp = new byte[chunkSize];
+    byte[] tmp = chunkBuf();
 
     long remaining = size;
     int index = 0;
     while (remaining > 0) {
-      int chunk = (int) Math.min(chunkSize, remaining);
+      int chunk = (int) Math.min(CHUNK_SIZE, remaining);
       buffer.getBytes(index, tmp, 0, chunk);
       output.write(tmp, 0, chunk);
       index += chunk;
@@ -93,13 +102,12 @@ public class UnsafeByteArray implements Externalizable, KryoSerializable {
     this.buffer = ArrowBufferAllocators.globalInstance().buffer((int) size);
 
     // stream bytes into ArrowBuf
-    final int chunkSize = 8 * 1024;
-    byte[] tmp = new byte[chunkSize];
+    byte[] tmp = chunkBuf();
 
     long remaining = size;
     int index = 0;
     while (remaining > 0) {
-      int chunk = (int) Math.min(chunkSize, remaining);
+      int chunk = (int) Math.min(CHUNK_SIZE, remaining);
       input.readBytes(tmp, 0, chunk);
       buffer.setBytes(index, tmp, 0, chunk);
       index += chunk;
@@ -114,13 +122,12 @@ public class UnsafeByteArray implements Externalizable, KryoSerializable {
     // write length first
     out.writeLong(size);
 
-    final int chunkSize = 8 * 1024;
-    byte[] tmp = new byte[chunkSize];
+    byte[] tmp = chunkBuf();
 
     long remaining = size;
     int index = 0;
     while (remaining > 0) {
-      int chunk = (int) Math.min(chunkSize, remaining);
+      int chunk = (int) Math.min(CHUNK_SIZE, remaining);
       buffer.getBytes(index, tmp, 0, chunk);
       out.write(tmp, 0, chunk);
       index += chunk;
@@ -138,13 +145,12 @@ public class UnsafeByteArray implements Externalizable, KryoSerializable {
 
     this.buffer = ArrowBufferAllocators.globalInstance().buffer((int) size);
 
-    final int chunkSize = 8 * 1024;
-    byte[] tmp = new byte[chunkSize];
+    byte[] tmp = chunkBuf();
 
     long remaining = size;
     int index = 0;
     while (remaining > 0) {
-      int chunk = (int) Math.min(chunkSize, remaining);
+      int chunk = (int) Math.min(CHUNK_SIZE, remaining);
       // ObjectInput extends DataInput, so we can use readFully
       in.readFully(tmp, 0, chunk);
       buffer.setBytes(index, tmp, 0, chunk);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to