This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f9a9c9402c92 [SPARK-55683][SQL] Optimize 
`VectorizedPlainValuesReader.readUnsignedLongs`
f9a9c9402c92 is described below

commit f9a9c9402c929478ba23c1a000d6dbbabbed7b13
Author: yangjie01 <[email protected]>
AuthorDate: Wed Feb 25 09:14:19 2026 -0800

    [SPARK-55683][SQL] Optimize `VectorizedPlainValuesReader.readUnsignedLongs`
    
    ### What changes were proposed in this pull request?
    
    This PR optimizes `VectorizedPlainValuesReader.readUnsignedLongs` by 
replacing the per-element `BigInteger` heap allocation chain with direct byte 
manipulation.
    
    The original implementation allocates multiple objects per element:
    ```java
    // Old: String + BigInteger + internal int[] + byte[] allocations per 
element
    c.putByteArray(rowId + i,
        new BigInteger(Long.toUnsignedString(buffer.getLong())).toByteArray());
    ```
    
    The new implementation reads raw little-endian bytes directly from the 
`ByteBuffer` backing array (when available) and converts them to 
`BigInteger`-compatible big-endian encoding in a single pass:
    
    ```java
    // New: hasArray() fast path - operates directly on backing array, one 
byte[] per element
    if (buffer.hasArray()) {
        byte[] src = buffer.array();
        int offset = buffer.arrayOffset() + buffer.position();
        for (int i = 0; i < total; i++, rowId++, offset += 8) {
            putLittleEndianBytesAsBigInteger(c, rowId, src, offset);
        }
    } else {
        byte[] data = new byte[8];  // reused across all values in this batch
        for (int i = 0; i < total; i++, rowId++) {
            buffer.get(data);
            putLittleEndianBytesAsBigInteger(c, rowId, data, 0);
        }
    }
    ```
    
    The private helper `putLittleEndianBytesAsBigInteger` handles the 
conversion with output matching `BigInteger.toByteArray()` semantics:
    - **Zero value**: writes `[0x00]` (1 byte) rather than an empty array, 
since `new BigInteger(new byte[0])` throws `NumberFormatException`
    - **Sign byte**: prepends `0x00` when the most significant byte has bit 7 
set, to ensure the value is interpreted as positive by `BigInteger`
    - **Byte order**: reverses little-endian Parquet physical encoding to 
big-endian in a single loop
    
    ### Why are the changes needed?
    
    The original implementation constructs a `BigInteger` via 
`Long.toUnsignedString` + `new BigInteger(String)`, which involves per-element 
allocations of a `String`, a `BigInteger`, its internal `int[]` magnitude 
array, and the final `byte[]`. For a typical batch of 4096 values this means 
~16K object allocations, creating significant GC pressure in workloads reading 
large `UINT_64` columns.
    
    The new implementation reduces this to one `byte[]` allocation per element 
by operating directly on the raw bytes from the `ByteBuffer`, avoiding all 
intermediate object creation. Additionally, the direct buffer fallback path 
reuses a single `byte[8]` scratch buffer across the entire batch.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    - The existing test `SPARK-34817: Read UINT_64 as Decimal from parquet` in 
`ParquetIOSuite` was extended with boundary values covering the critical edge 
cases of the new byte manipulation logic
    - Renamed the original code to `OldVectorizedPlainValuesReader` and compared
the latency of the old and new `readUnsignedLongs` methods using JMH:
    
    <details>
    <summary><b>Benchmark Code (click to expand)</b></summary>
    
    ```java
    package org.apache.spark.sql.execution.datasources.parquet;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    import org.openjdk.jmh.annotations.*;
    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;
    
    import org.apache.parquet.bytes.ByteBufferInputStream;
    
    import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
    import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
    import org.apache.spark.sql.types.DataTypes;
    
    @BenchmarkMode(Mode.AverageTime)
    @OutputTimeUnit(TimeUnit.MICROSECONDS)
    @State(Scope.Thread)
    @Fork(value = 1, jvmArgs = {"-Xms4G", "-Xmx4G"})
    @Warmup(iterations = 5, time = 1)
    @Measurement(iterations = 10, time = 1)
    public class VectorizedPlainValuesReaderJMHBenchmark {
    
        // ==================== Parameters ====================
    
        @Param({"10000000"})
        private int numValues;
    
        // ==================== Test Data ====================
    
        private byte[] longData;
        private static final int BATCH_SIZE = 4096;
    
        private OldVectorizedPlainValuesReader oldSingleBufferOnHeapReader;
        private OldVectorizedPlainValuesReader oldSingleBufferOffHeapReader;
        private VectorizedPlainValuesReader newSingleBufferOnHeapReader;
        private VectorizedPlainValuesReader newSingleBufferOffHeapReader;
    
        // ==================== State Classes ====================
    
        /**
         * Column vector state using DecimalType(20, 0), which is the correct 
type for UINT64.
         * Parquet UINT_64 logical type is mapped to DecimalType(20, 0) in 
Spark.
         * Using LongType would cause NullPointerException because 
readUnsignedLongs
         * calls arrayData() which requires childColumns, only initialized for 
DecimalType.
         */
        @State(Scope.Thread)
        public static class DecimalColumnVectorState {
            public WritableColumnVector decimalColumn;
    
            @Setup(Level.Iteration)
            public void setup() {
                // UINT64 -> DecimalType(20, 0): precision=20, scale=0
                decimalColumn = new OnHeapColumnVector(BATCH_SIZE, 
DataTypes.createDecimalType(20, 0));
            }
    
            @TearDown(Level.Iteration)
            public void tearDown() {
                decimalColumn.close();
            }
    
            @Setup(Level.Invocation)
            public void reset() {
                decimalColumn.reset();
            }
        }
    
        // ==================== Setup ====================
    
        @Setup(Level.Trial)
        public void setupTrial() {
            Random random = new Random(42);
            longData = generateLongData(numValues, random);
        }
    
        @Setup(Level.Invocation)
        public void setupInvocation() throws IOException {
            oldSingleBufferOnHeapReader = new OldVectorizedPlainValuesReader();
            oldSingleBufferOnHeapReader.initFromPage(numValues, 
createSingleBufferInputStream(longData));
            oldSingleBufferOffHeapReader = new OldVectorizedPlainValuesReader();
            oldSingleBufferOffHeapReader.initFromPage(numValues, 
createDirectSingleBufferInputStream(longData));
            newSingleBufferOnHeapReader = new VectorizedPlainValuesReader();
            newSingleBufferOnHeapReader.initFromPage(numValues, 
createSingleBufferInputStream(longData));
            newSingleBufferOffHeapReader = new VectorizedPlainValuesReader();
            newSingleBufferOffHeapReader.initFromPage(numValues, 
createDirectSingleBufferInputStream(longData));
        }
    
        // ==================== Data Generation ====================
    
        private byte[] generateLongData(int count, Random random) {
            ByteBuffer buffer = ByteBuffer.allocate(count * 
8).order(ByteOrder.LITTLE_ENDIAN);
            for (int i = 0; i < count; i++) {
                buffer.putLong(random.nextLong()); // full unsigned long range
            }
            return buffer.array();
        }
    
        // ==================== ByteBufferInputStream Creation 
====================
    
        private ByteBufferInputStream createSingleBufferInputStream(byte[] 
data) {
            ByteBuffer buffer = 
ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);
            return ByteBufferInputStream.wrap(buffer);
        }
    
        private ByteBuffer createDirectBuffer(byte[] data) {
            ByteBuffer buffer = 
ByteBuffer.allocateDirect(data.length).order(ByteOrder.LITTLE_ENDIAN);
            buffer.put(data);
            buffer.flip();
            return buffer;
        }
    
        private ByteBufferInputStream 
createDirectSingleBufferInputStream(byte[] data) {
            ByteBuffer buffer = createDirectBuffer(data);
            return ByteBufferInputStream.wrap(buffer);
        }
    
        // 
====================================================================================
        // readUnsignedLongs onHeap
        // 
====================================================================================
    
        @Benchmark
        public void readUnsignedLongs_onHeap_Old(DecimalColumnVectorState 
state) throws IOException {
            for (int i = 0; i < numValues; i += BATCH_SIZE) {
                oldSingleBufferOnHeapReader.readUnsignedLongs(
                        Math.min(BATCH_SIZE, numValues - i), 
state.decimalColumn, 0);
            }
        }
    
        @Benchmark
        public void readUnsignedLongs_onHeap_New(DecimalColumnVectorState 
state) throws IOException {
            for (int i = 0; i < numValues; i += BATCH_SIZE) {
                newSingleBufferOnHeapReader.readUnsignedLongs(
                        Math.min(BATCH_SIZE, numValues - i), 
state.decimalColumn, 0);
            }
        }
    
        // 
====================================================================================
        // readUnsignedLongs offHeap
        // 
====================================================================================
    
        @Benchmark
        public void readUnsignedLongs_offHeap_Old(DecimalColumnVectorState 
state) throws IOException {
            for (int i = 0; i < numValues; i += BATCH_SIZE) {
                oldSingleBufferOffHeapReader.readUnsignedLongs(
                        Math.min(BATCH_SIZE, numValues - i), 
state.decimalColumn, 0);
            }
        }
    
        @Benchmark
        public void readUnsignedLongs_offHeap_New(DecimalColumnVectorState 
state) throws IOException {
            for (int i = 0; i < numValues; i += BATCH_SIZE) {
                newSingleBufferOffHeapReader.readUnsignedLongs(
                        Math.min(BATCH_SIZE, numValues - i), 
state.decimalColumn, 0);
            }
        }
    
        // ==================== Main Method ====================
    
        public static void main(String[] args) throws RunnerException {
            String filter = args.length > 0 ?
                    args[0] : 
VectorizedPlainValuesReaderJMHBenchmark.class.getSimpleName();
            Options opt = new OptionsBuilder()
                    .include(filter)
                    .build();
    
            new Runner(opt).run();
        }
    }
    
    ```
    </details>
    
    Run `build/sbt "sql/Test/runMain
org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReaderJMHBenchmark"`
 to execute the benchmark.
    
    **Benchmark results:**
    
    - Java 17.0.18+8-LTS
    
    ```
    Benchmark                                                              
(numValues)  Mode  Cnt        Score       Error  Units
    VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_New     
10000000  avgt   10   249413.824 ± 12242.331  us/op
    VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_Old     
10000000  avgt   10  2301279.127 ± 14970.249  us/op
    VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_New      
10000000  avgt   10   282651.747 ±  5031.717  us/op
    VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_Old      
10000000  avgt   10  2382690.093 ± 10364.228  us/op
    ```
    
    - Java 21.0.10+7-LTS
    
    ```
    Benchmark                                                              
(numValues)  Mode  Cnt        Score       Error  Units
    VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_New     
10000000  avgt   10   256621.630 ± 24087.509  us/op
    VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_Old     
10000000  avgt   10  2120170.591 ±  4862.317  us/op
    VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_New      
10000000  avgt   10   284058.229 ± 19966.179  us/op
    VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_Old      
10000000  avgt   10  2190838.305 ±  7979.740  us/op
    ```
    
    Both the onHeap and offHeap paths show approximately an **8x** improvement.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Yes, Claude Sonnet 4.6 was used to assist in completing the code writing.
    
    Closes #54479 from LuciferYang/SPARK-55683.
    
    Authored-by: yangjie01 <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../parquet/VectorizedPlainValuesReader.java       | 81 ++++++++++++++++++++--
 .../datasources/parquet/ParquetIOSuite.scala       | 18 +++--
 2 files changed, 91 insertions(+), 8 deletions(-)

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index a040a8990bad..9ad07a1ff5ee 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.io.IOException;
-import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
@@ -199,10 +198,84 @@ public class VectorizedPlainValuesReader extends 
ValuesReader implements Vectori
   public final void readUnsignedLongs(int total, WritableColumnVector c, int 
rowId) {
     int requiredBytes = total * 8;
     ByteBuffer buffer = getBuffer(requiredBytes);
-    for (int i = 0; i < total; i += 1) {
-      c.putByteArray(
-        rowId + i, new 
BigInteger(Long.toUnsignedString(buffer.getLong())).toByteArray());
+    if (buffer.hasArray()) {
+      byte[] src = buffer.array();
+      int offset = buffer.arrayOffset() + buffer.position();
+      for (int i = 0; i < total; i++, rowId++, offset += 8) {
+        putLittleEndianBytesAsBigInteger(c, rowId, src, offset);
+      }
+    } else {
+      // direct buffer fallback: copy 8 bytes per value
+      byte[] data = new byte[8];
+      for (int i = 0; i < total; i++, rowId++) {
+        buffer.get(data);
+        putLittleEndianBytesAsBigInteger(c, rowId, data, 0);
+      }
+    }
+  }
+
+  /**
+   * Writes 8 little-endian bytes from {@code src[offset..offset+7]} into 
{@code c} at
+   * {@code rowId} as a big-endian byte array compatible with {@link 
java.math.BigInteger}
+   * two's-complement encoding.
+   *
+   * <p>The output matches the semantics of {@link 
java.math.BigInteger#toByteArray()}:
+   * <ul>
+   *   <li>Big-endian byte order</li>
+   *   <li>Minimal encoding: no unnecessary leading zero bytes</li>
+   *   <li>A {@code 0x00} sign byte is prepended if the most significant byte 
has bit 7
+   *       set, ensuring the value is interpreted as positive by {@code 
BigInteger}</li>
+   *   <li>Zero is encoded as {@code [0x00]} (1 byte), not an empty array, 
because
+   *       {@code new BigInteger(new byte[0])} throws {@link 
NumberFormatException}</li>
+   * </ul>
+   *
+   * <p>This is used by {@link #readUnsignedLongs} to store Parquet {@code 
UINT_64} values
+   * into a {@code DecimalType(20, 0)} column vector, where each value is 
stored as a
+   * byte array in {@code arrayData()} and later reconstructed via
+   * {@code new BigInteger(bytes)}.
+   *
+   * @param c      the target column vector; must be of {@code DecimalType(20, 
0)}
+   * @param rowId  the row index to write into
+   * @param src    the source byte array containing little-endian encoded data
+   * @param offset the starting position in {@code src}; reads bytes
+   *               {@code src[offset..offset+7]}
+   */
+  private static void putLittleEndianBytesAsBigInteger(
+     WritableColumnVector c, int rowId, byte[] src, int offset) {
+    // src is little-endian; the most significant byte is at src[offset + 7].
+    // Scan from the most significant end to find the first non-zero byte,
+    // which determines the minimal number of bytes needed for encoding.
+    int msbIndex = offset + 7;
+    while (msbIndex > offset && src[msbIndex] == 0) {
+      msbIndex--;
+    }
+
+    // Zero value: must write [0x00] rather than an empty array.
+    // BigInteger.ZERO.toByteArray() returns [0x00], and new BigInteger(new 
byte[0])
+    // throws NumberFormatException("Zero length BigInteger").
+    if (msbIndex == offset && src[offset] == 0) {
+      c.putByteArray(rowId, new byte[]{0});
+      return;
+    }
+
+    // Prepend a 0x00 sign byte if the most significant byte has bit 7 set.
+    // This matches BigInteger.toByteArray() behavior: positive values whose 
highest
+    // magnitude byte has the MSB set are prefixed with 0x00 to distinguish 
them
+    // from negative values in two's-complement encoding.
+    boolean needSignByte = (src[msbIndex] & 0x80) != 0;
+    int valueLen = msbIndex - offset + 1;
+    int totalLen = needSignByte ? valueLen + 1 : valueLen;
+
+    byte[] dest = new byte[totalLen];
+    int destOffset = 0;
+    if (needSignByte) {
+      dest[destOffset++] = 0x00;
+    }
+    // Reverse byte order: little-endian src → big-endian dest
+    for (int i = msbIndex; i >= offset; i--) {
+      dest[destOffset++] = src[i];
     }
+    c.putByteArray(rowId, dest, 0, totalLen);
   }
 
   // A fork of `readLongs` to rebase the timestamp values. For performance 
reasons, this method
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 3072657a0954..b52a464c5bd2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.math.{BigDecimal => JBigDecimal}
 import java.time.{LocalDateTime, LocalTime}
 import java.util.Locale
 
@@ -1261,11 +1262,17 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
         val writer = createParquetWriter(schema, path, dictionaryEnabled)
 
         val factory = new SimpleGroupFactory(schema)
+        // Original range retained to avoid regression
         (-500 until 500).foreach { i =>
           val group = factory.newGroup()
             .append("a", i % 100L)
           writer.write(group)
         }
+        // Boundary values: zero, one, signed extremes interpreted as unsigned
+        Seq(0L, 1L, Long.MaxValue, Long.MinValue, -2L, -1L).foreach { v =>
+          val group = factory.newGroup().append("a", v)
+          writer.write(group)
+        }
         writer.close()
       }
 
@@ -1273,10 +1280,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
         val path = new Path(dir.toURI.toString, "part-r-0.parquet")
         makeRawParquetFile(path)
         readParquetFile(path.toString) { df =>
-          checkAnswer(df, (-500 until 500).map { i =>
-            val bi = UnsignedLong.fromLongBits(i % 100L).bigIntegerValue()
-            Row(new java.math.BigDecimal(bi))
-          })
+          val originalExpected = (-500 until 500).map { i =>
+            Row(new JBigDecimal(UnsignedLong.fromLongBits(i % 
100L).bigIntegerValue()))
+          }
+          val boundaryExpected = Seq(0L, 1L, Long.MaxValue, Long.MinValue, 
-2L, -1L).map { v =>
+            Row(new 
JBigDecimal(UnsignedLong.fromLongBits(v).bigIntegerValue()))
+          }
+          checkAnswer(df, originalExpected ++ boundaryExpected)
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to