This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 10d759fef9 [VL] Reuse byte[] buffers in shuffle read and broadcast
serialization paths (#11777)
10d759fef9 is described below
commit 10d759fef9f80c7b9f6843d982803878e489a93b
Author: Ankita Victor <[email protected]>
AuthorDate: Fri Mar 20 22:27:47 2026 +0530
[VL] Reuse byte[] buffers in shuffle read and broadcast serialization paths
(#11777)
---
.../vectorized/OnHeapJniByteInputStream.java | 9 ++++---
.../sql/execution/unsafe/UnsafeByteArray.java | 30 +++++++++++++---------
2 files changed, 24 insertions(+), 15 deletions(-)
diff --git a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/OnHeapJniByteInputStream.java b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/OnHeapJniByteInputStream.java
index 3947dbdeb4..88b446a1e5 100644
--- a/gluten-arrow/src/main/java/org/apache/gluten/vectorized/OnHeapJniByteInputStream.java
+++ b/gluten-arrow/src/main/java/org/apache/gluten/vectorized/OnHeapJniByteInputStream.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
public class OnHeapJniByteInputStream implements JniByteInputStream {
private final InputStream in;
private long bytesRead = 0L;
+ private byte[] buf = new byte[0];
public OnHeapJniByteInputStream(InputStream in) {
this.in = in;
@@ -32,15 +33,17 @@ public class OnHeapJniByteInputStream implements JniByteInputStream {
@Override
public long read(long destAddress, long maxSize) {
int maxSize32 = Math.toIntExact(maxSize);
- byte[] tmp = new byte[maxSize32];
+ if (buf.length < maxSize32) {
+ buf = new byte[maxSize32];
+ }
try {
// The code conducts copy as long as 'in' wraps off-heap data,
// which is about to be moved to heap
- int read = in.read(tmp);
+ int read = in.read(buf, 0, maxSize32);
if (read == -1 || read == 0) {
return 0;
}
- memCopyFromHeap(tmp, destAddress, read); // The code conducts copy, from heap to off-heap
+ memCopyFromHeap(buf, destAddress, read); // The code conducts copy, from heap to off-heap
bytesRead += read;
return read;
} catch (IOException e) {
diff --git a/gluten-arrow/src/main/java/org/apache/spark/sql/execution/unsafe/UnsafeByteArray.java b/gluten-arrow/src/main/java/org/apache/spark/sql/execution/unsafe/UnsafeByteArray.java
index 089abdfd29..1d4e0de6f3 100644
--- a/gluten-arrow/src/main/java/org/apache/spark/sql/execution/unsafe/UnsafeByteArray.java
+++ b/gluten-arrow/src/main/java/org/apache/spark/sql/execution/unsafe/UnsafeByteArray.java
@@ -31,8 +31,11 @@ import java.io.ObjectOutput;
/** A serializable unsafe byte array. */
public class UnsafeByteArray implements Externalizable, KryoSerializable {
+ private static final int CHUNK_SIZE = 8 * 1024;
+
private ArrowBuf buffer;
private long size;
+ private transient byte[] chunkBuf;
UnsafeByteArray(ArrowBuf buffer, long size) {
this.buffer = buffer;
@@ -42,6 +45,13 @@ public class UnsafeByteArray implements Externalizable, KryoSerializable {
public UnsafeByteArray() {}
+ private byte[] chunkBuf() {
+ if (chunkBuf == null) {
+ chunkBuf = new byte[CHUNK_SIZE];
+ }
+ return chunkBuf;
+ }
+
public long address() {
return buffer.memoryAddress();
}
@@ -66,13 +76,12 @@ public class UnsafeByteArray implements Externalizable, KryoSerializable {
output.writeLong(size);
// stream bytes out of ArrowBuf
- final int chunkSize = 8 * 1024;
- byte[] tmp = new byte[chunkSize];
+ byte[] tmp = chunkBuf();
long remaining = size;
int index = 0;
while (remaining > 0) {
- int chunk = (int) Math.min(chunkSize, remaining);
+ int chunk = (int) Math.min(CHUNK_SIZE, remaining);
buffer.getBytes(index, tmp, 0, chunk);
output.write(tmp, 0, chunk);
index += chunk;
@@ -93,13 +102,12 @@ public class UnsafeByteArray implements Externalizable, KryoSerializable {
this.buffer = ArrowBufferAllocators.globalInstance().buffer((int) size);
// stream bytes into ArrowBuf
- final int chunkSize = 8 * 1024;
- byte[] tmp = new byte[chunkSize];
+ byte[] tmp = chunkBuf();
long remaining = size;
int index = 0;
while (remaining > 0) {
- int chunk = (int) Math.min(chunkSize, remaining);
+ int chunk = (int) Math.min(CHUNK_SIZE, remaining);
input.readBytes(tmp, 0, chunk);
buffer.setBytes(index, tmp, 0, chunk);
index += chunk;
@@ -114,13 +122,12 @@ public class UnsafeByteArray implements Externalizable, KryoSerializable {
// write length first
out.writeLong(size);
- final int chunkSize = 8 * 1024;
- byte[] tmp = new byte[chunkSize];
+ byte[] tmp = chunkBuf();
long remaining = size;
int index = 0;
while (remaining > 0) {
- int chunk = (int) Math.min(chunkSize, remaining);
+ int chunk = (int) Math.min(CHUNK_SIZE, remaining);
buffer.getBytes(index, tmp, 0, chunk);
out.write(tmp, 0, chunk);
index += chunk;
@@ -138,13 +145,12 @@ public class UnsafeByteArray implements Externalizable, KryoSerializable {
this.buffer = ArrowBufferAllocators.globalInstance().buffer((int) size);
- final int chunkSize = 8 * 1024;
- byte[] tmp = new byte[chunkSize];
+ byte[] tmp = chunkBuf();
long remaining = size;
int index = 0;
while (remaining > 0) {
- int chunk = (int) Math.min(chunkSize, remaining);
+ int chunk = (int) Math.min(CHUNK_SIZE, remaining);
// ObjectInput extends DataInput, so we can use readFully
in.readFully(tmp, 0, chunk);
buffer.setBytes(index, tmp, 0, chunk);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]