This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4da26e4bf9ea [SPARK-55683][SQL][FOLLOWUP] Optimize `VectorizedPlainValuesReader.readUnsignedLongs` to reuse scratch buffer and avoid per-element allocations
4da26e4bf9ea is described below

commit 4da26e4bf9eab119ff2489c5fdf85efe60f6f469
Author: yangjie01 <[email protected]>
AuthorDate: Thu Feb 26 14:11:12 2026 -0800

    [SPARK-55683][SQL][FOLLOWUP] Optimize `VectorizedPlainValuesReader.readUnsignedLongs` to reuse scratch buffer and avoid per-element allocations
    
    ### What changes were proposed in this pull request?
    Following the suggestion from Copilot in https://github.com/apache/spark/pull/54479#pullrequestreview-3855142296, this PR further optimizes `VectorizedPlainValuesReader.readUnsignedLongs` by introducing a reusable scratch buffer, which eliminates the per-element `byte[]` allocations introduced in the previous refactoring.
    
    The previous implementation allocates a new `byte[]` per element for the 
encoded output:
    
    ```java
    // Previous: new byte[totalLen] per element, plus new byte[]{0} for zero values
    byte[] dest = new byte[totalLen];
    ...
    c.putByteArray(rowId, dest, 0, totalLen);
    ```
    
    The new implementation allocates a single `byte[9]` scratch buffer once per 
batch and reuses it across all elements. Since 
`WritableColumnVector.putByteArray` copies the bytes into its internal storage 
immediately, the scratch buffer can be safely overwritten on the next iteration:
    
    ```java
    // New: one byte[9] allocated per batch, reused for every element
    byte[] scratch = new byte[9];
    for (...) {
        putLittleEndianBytesAsBigInteger(c, rowId, src, offset, scratch);
    }
    ```
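    
    The safety argument above rests on copy-on-write semantics. The following is a minimal sketch of that idea only: `CopyingStoreSketch` is a hypothetical stand-in, not `WritableColumnVector`, and its fields and growth policy are assumptions made for illustration.
    
    ```java
    import java.util.Arrays;
    
    // Hypothetical stand-in for a column vector that copies bytes on write.
    // It is NOT Spark's WritableColumnVector; names and layout are assumptions.
    class CopyingStoreSketch {
      private byte[] data = new byte[16];
      private int used = 0;
      private final int[] offsets;
      private final int[] lengths;
    
      CopyingStoreSketch(int capacity) {
        offsets = new int[capacity];
        lengths = new int[capacity];
      }
    
      // Copies value[offset..offset+length) into internal storage. The caller's
      // array is not retained, so it may be overwritten after this returns.
      void putByteArray(int rowId, byte[] value, int offset, int length) {
        if (used + length > data.length) {
          data = Arrays.copyOf(data, Math.max(data.length * 2, used + length));
        }
        System.arraycopy(value, offset, data, used, length);
        offsets[rowId] = used;
        lengths[rowId] = length;
        used += length;
      }
    
      // Returns a copy of the bytes stored for the given row.
      byte[] get(int rowId) {
        return Arrays.copyOfRange(data, offsets[rowId], offsets[rowId] + lengths[rowId]);
      }
    
      public static void main(String[] args) {
        CopyingStoreSketch store = new CopyingStoreSketch(2);
        byte[] scratch = new byte[9];
        scratch[0] = 0x01;
        store.putByteArray(0, scratch, 0, 1);
        // Overwrite scratch for the next row; row 0 must be unaffected.
        scratch[0] = 0x00;
        scratch[1] = (byte) 0xFF;
        store.putByteArray(1, scratch, 0, 2);
        System.out.println(Arrays.toString(store.get(0)));
        System.out.println(Arrays.toString(store.get(1)));
      }
    }
    ```
    
    If the store kept a reference to the caller's array instead of copying, every row would alias the same scratch bytes, so reuse is only valid for sinks that copy.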
    
    The scratch buffer is sized at 9 bytes to accommodate the worst case: 1 
`0x00` sign byte + 8 value bytes. The zero value special case is also handled 
via scratch, avoiding the previous `new byte[]{0}` allocation.
    
    ### Why are the changes needed?
    The previous implementation still allocates one `byte[]` per element for the encoded output. For a typical batch of 4096 values this means 4096 heap allocations per `readUnsignedLongs` call, creating GC pressure in workloads that read large `UINT_64` columns. With the scratch buffer approach, a call makes at most 2 allocations regardless of batch size: the `byte[9]` scratch buffer and, only on the direct buffer fallback path, the `byte[8]` read buffer.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    - Pass GitHub Actions
    - Reused the JMH benchmark provided in 
https://github.com/apache/spark/pull/54479, and the test results are as follows:
    
    Java 17
    
    ```
    [info] Benchmark                                                              (numValues)  Mode  Cnt       Score      Error  Units
    [info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_New     10000000  avgt   10  233820.658 ± 1888.523  us/op
    [info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_Old     10000000  avgt   10  255563.248 ± 3500.165  us/op
    [info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_New      10000000  avgt   10  228672.684 ± 2985.496  us/op
    [info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_Old      10000000  avgt   10  275756.804 ± 2065.405  us/op
    ```
    
    Java 21
    
    ```
    [info] Benchmark                                                              (numValues)  Mode  Cnt       Score       Error  Units
    [info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_New     10000000  avgt   10  241977.924 ± 15125.343  us/op
    [info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_Old     10000000  avgt   10  250343.470 ±  1342.509  us/op
    [info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_New      10000000  avgt   10  212929.948 ±  1387.671  us/op
    [info] VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_Old      10000000  avgt   10  274561.949 ±  1226.348  us/op
    ```
    
    Judging from the test results, the onHeap path demonstrates approximately a 
17-22% improvement, while the offHeap path shows roughly a 3-9% improvement 
across Java 17 and Java 21.
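    
    The percentages can be reproduced from the scores in the tables above. This is a small sketch with a hypothetical class name; it computes the relative reduction in average time as (old - new) / old and ignores the reported error bounds.
    
    ```java
    // Hypothetical helper for checking the quoted improvement ranges.
    class BenchmarkImprovementSketch {
    
      // Relative reduction in average time per op, in percent.
      static double improvementPercent(double oldScore, double newScore) {
        return (oldScore - newScore) / oldScore * 100.0;
      }
    
      public static void main(String[] args) {
        // Scores (us/op) copied from the JMH tables in this message.
        System.out.printf("Java 17 onHeap:  %.1f%%%n", improvementPercent(275756.804, 228672.684));
        System.out.printf("Java 17 offHeap: %.1f%%%n", improvementPercent(255563.248, 233820.658));
        System.out.printf("Java 21 onHeap:  %.1f%%%n", improvementPercent(274561.949, 212929.948));
        System.out.printf("Java 21 offHeap: %.1f%%%n", improvementPercent(250343.470, 241977.924));
      }
    }
    ```
    
    Note that the Java 21 `offHeap_New` error bound (± 15125.343 us/op) is larger than the measured difference between old and new on that path, so the low end of the offHeap range is the least certain figure.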
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Yes, Claude Sonnet 4.6 was used to assist in completing the code writing.
    
    Closes #54510 from LuciferYang/SPARK-55683-FOLLOWUP.
    
    Lead-authored-by: yangjie01 <[email protected]>
    Co-authored-by: YangJie <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../parquet/VectorizedPlainValuesReader.java       | 24 ++++++++++++++--------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 9ad07a1ff5ee..852d02c2cdf1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -198,18 +198,20 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
   public final void readUnsignedLongs(int total, WritableColumnVector c, int rowId) {
     int requiredBytes = total * 8;
     ByteBuffer buffer = getBuffer(requiredBytes);
+    // scratch buffer: max 9 bytes (0x00 sign byte + 8 value bytes), reused per batch
+    byte[] scratch = new byte[9];
     if (buffer.hasArray()) {
       byte[] src = buffer.array();
       int offset = buffer.arrayOffset() + buffer.position();
       for (int i = 0; i < total; i++, rowId++, offset += 8) {
-        putLittleEndianBytesAsBigInteger(c, rowId, src, offset);
+        putLittleEndianBytesAsBigInteger(c, rowId, src, offset, scratch);
       }
     } else {
       // direct buffer fallback: copy 8 bytes per value
       byte[] data = new byte[8];
       for (int i = 0; i < total; i++, rowId++) {
         buffer.get(data);
-        putLittleEndianBytesAsBigInteger(c, rowId, data, 0);
+        putLittleEndianBytesAsBigInteger(c, rowId, data, 0, scratch);
       }
     }
   }
@@ -239,9 +241,11 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
    * @param src    the source byte array containing little-endian encoded data
    * @param offset the starting position in {@code src}; reads bytes
    *               {@code src[offset..offset+7]}
+   * @param scratch a caller-provided reusable buffer of at least 9 bytes; its contents
+   *                after this call are undefined
    */
   private static void putLittleEndianBytesAsBigInteger(
-     WritableColumnVector c, int rowId, byte[] src, int offset) {
+     WritableColumnVector c, int rowId, byte[] src, int offset, byte[] scratch) {
     // src is little-endian; the most significant byte is at src[offset + 7].
     // Scan from the most significant end to find the first non-zero byte,
     // which determines the minimal number of bytes needed for encoding.
@@ -254,7 +258,9 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
     // BigInteger.ZERO.toByteArray() returns [0x00], and new BigInteger(new byte[0])
     // throws NumberFormatException("Zero length BigInteger").
     if (msbIndex == offset && src[offset] == 0) {
-      c.putByteArray(rowId, new byte[]{0});
+      scratch[0] = 0x00;
+      // putByteArray copies the bytes into arrayData(), so scratch can be safely reused
+      c.putByteArray(rowId, scratch, 0, 1);
       return;
     }
 
@@ -266,16 +272,16 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
     int valueLen = msbIndex - offset + 1;
     int totalLen = needSignByte ? valueLen + 1 : valueLen;
 
-    byte[] dest = new byte[totalLen];
-    int destOffset = 0;
+    int scratchOffset = 0;
     if (needSignByte) {
-      dest[destOffset++] = 0x00;
+      scratch[scratchOffset++] = 0x00;
     }
     // Reverse byte order: little-endian src → big-endian dest
     for (int i = msbIndex; i >= offset; i--) {
-      dest[destOffset++] = src[i];
+      scratch[scratchOffset++] = src[i];
     }
-    c.putByteArray(rowId, dest, 0, totalLen);
+    // putByteArray copies the bytes into arrayData(), so scratch can be safely reused
+    c.putByteArray(rowId, scratch, 0, totalLen);
   }
 
   // A fork of `readLongs` to rebase the timestamp values. For performance reasons, this method

