This is an automated email from the ASF dual-hosted git repository.

LuciferYang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a5a8aa4e8cf4 [SPARK-56791][SQL] Add bulk read+widen path for INT32 to Long Parquet vector updater
a5a8aa4e8cf4 is described below

commit a5a8aa4e8cf4b482b9d2468656cfa96f9e41a4f3
Author: YangJie <[email protected]>
AuthorDate: Mon May 11 11:05:32 2026 +0800

    [SPARK-56791][SQL] Add bulk read+widen path for INT32 to Long Parquet vector updater
    
    ### What changes were proposed in this pull request?
    
    INT32 -> Long widening on the Parquet vectorized read path goes through `IntegerToLongUpdater.readValues`, which loops row by row and calls `valuesReader.readInteger()` for each value. Each `readInteger()` call invokes `getBuffer(4).order(LITTLE_ENDIAN)`, which allocates a fresh `ByteBuffer` slice. That per-element allocation, not the int-to-long widening itself, dominates the cost of the loop.
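
To make that cost concrete, here is a minimal, self-contained Java sketch of the per-row pattern. `PerRowIntReader` is a hypothetical stand-in, not Spark's `VectorizedPlainValuesReader`. It only assumes that `getBuffer(n)` hands out a new `ByteBuffer` slice of `n` bytes and advances the stream, so every `readInteger()` call creates one buffer object.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical, simplified PLAIN INT32 reader (not Spark's actual class).
final class PerRowIntReader {
  private final ByteBuffer in;

  PerRowIntReader(ByteBuffer page) {
    this.in = page.duplicate();
  }

  // Assumed getBuffer(length) shape: return a new slice of `length` bytes and advance.
  private ByteBuffer getBuffer(int length) {
    ByteBuffer slice = in.slice();
    slice.limit(length);
    in.position(in.position() + length);
    return slice;
  }

  // Each call allocates a 4-byte slice; slices default to big-endian, hence order().
  int readInteger() {
    return getBuffer(4).order(ByteOrder.LITTLE_ENDIAN).getInt();
  }

  // Legacy-style loop: one readInteger() (and one slice) per row, then widen to long.
  static void readAsLongsPerRow(PerRowIntReader reader, int total, long[] out, int rowId) {
    for (int i = 0; i < total; i++) {
      out[rowId + i] = reader.readInteger(); // int -> long assignment sign-extends
    }
  }
}
```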
    
    This PR adds a bulk method, `VectorizedValuesReader.readIntegersAsLongs(total, c, rowId)`, that calls `getBuffer(total * 4)` once and then runs a tight conversion loop inside the method. `VectorizedPlainValuesReader` overrides it with the specialized implementation. The interface default mirrors the legacy per-row pattern, so non-PLAIN readers continue to work unchanged. `IntegerToLongUpdater.readValues` now delegates directly to the new method.
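
The shape of the change can be sketched in plain Java: a bulk method with a per-row interface default, plus a specialized override. All names here (`LongSink`, `IntValuesReader`, `PlainIntReader`) are hypothetical simplifications of `WritableColumnVector` and `VectorizedValuesReader`, not Spark's real signatures. The override slices the page once for `total * 4` bytes. Readers that do not override the method keep the per-row behaviour and produce identical values.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical stand-in for WritableColumnVector (only putLong is modeled).
interface LongSink {
  void putLong(int rowId, long value);
}

// Hypothetical, trimmed-down analogue of VectorizedValuesReader.
interface IntValuesReader {
  int readInteger();

  // Default: per-row read + widen + write, so readers that don't override still work.
  default void readIntegersAsLongs(int total, LongSink c, int rowId) {
    for (int i = 0; i < total; i++) {
      c.putLong(rowId + i, readInteger());
    }
  }
}

// Hypothetical PLAIN-style reader that overrides the bulk method.
final class PlainIntReader implements IntValuesReader {
  private final ByteBuffer in;

  PlainIntReader(ByteBuffer page) {
    this.in = page.duplicate();
  }

  // One slice of `length` bytes, little-endian as in Parquet PLAIN encoding.
  private ByteBuffer getBuffer(int length) {
    ByteBuffer slice = in.slice();
    slice.limit(length);
    in.position(in.position() + length);
    return slice.order(ByteOrder.LITTLE_ENDIAN);
  }

  @Override
  public int readInteger() {
    return getBuffer(4).getInt();
  }

  // Bulk path: a single slice for the whole run, then a tight widen loop.
  @Override
  public void readIntegersAsLongs(int total, LongSink c, int rowId) {
    ByteBuffer buffer = getBuffer(total * 4);
    for (int i = 0; i < total; i++) {
      c.putLong(rowId + i, buffer.getInt());
    }
  }
}
```

Putting the fallback in a `default` method means that only readers that opt in need new code. Every other implementation of the interface compiles and behaves as before.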
    
    Sibling Updaters (`IntegerToDouble`, `FloatToDouble`, `DowncastLong`, `DateToTimestampNTZ`) and other reader implementations (e.g. `VectorizedDeltaBinaryPackedReader`) use the same per-row pattern. They are tracked as follow-up sub-tasks under the umbrella ticket.
    
    ### Why are the changes needed?
    
    `ParquetVectorUpdaterBenchmark` shows a speedup for `IntegerToLongUpdater` that grows substantially on newer JDKs, where escape analysis and inlining better optimize the resulting tight loop:
    
    | JDK | Baseline (M/s) | After (M/s) | Speedup |
    |----:|---------------:|------------:|--------:|
    |  17 |          454.5 |      1280.8 |   2.82x |
    |  21 |          530.9 |      6181.3 |  11.64x |
    |  25 |          454.8 |      6460.6 |  14.20x |
    
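    Peer Updaters in the same benchmark group stay within run-to-run noise, confirming the change is local to `IntegerToLongUpdater`. Their Relative values drop only because `IntegerToLongUpdater`, the group's 1.0X reference row, got faster.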
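
As a reading aid for the summary table and the committed result files, two relations hold. The Per Row(ns) column is consistent with 1000 / Rate(M/s): for example, 1000 / 6181.3 ≈ 0.16, shown as 0.2. The Speedup column equals After / Baseline up to rounding. The hypothetical helper below only encodes those two relations; it is not part of Spark's benchmark code.

```java
// Hypothetical helper for reading the benchmark tables; not part of Spark.
final class BenchmarkMath {
  private BenchmarkMath() {}

  // Per Row(ns), assuming it is derived as 1000 / Rate(M/s).
  static double perRowNanos(double rateMPerSec) {
    return 1000.0 / rateMPerSec;
  }

  // Speedup: new throughput divided by baseline throughput.
  static double speedup(double baselineRate, double afterRate) {
    return afterRate / baselineRate;
  }
}
```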
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit tests in `ParquetVectorUpdaterSuite` cover boundary batch lengths (0, 1, 7, 8, 9, 17, 1024, 4097), sign extension of negative INT32 values, the singular `readValue` path, and the factory's long-decimal dispatch (INT32 + DECIMAL(9, 0) -> DecimalType(15, 0) via `canReadAsLongDecimal`).
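
The following self-contained Java sketch illustrates the kind of equivalence check described for the unit tests. For each boundary length it does three things:

- Encodes random INT32 values, always including `Integer.MIN_VALUE`, as little-endian PLAIN bytes.
- Widens them in one pass at a non-zero `rowId`.
- Compares each result with Java's own `(long)` cast.

This is an illustration only. The real `ParquetVectorUpdaterSuite` is Scala and exercises Spark's actual updaters; `WidenEquivalenceCheck` and its methods are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Random;

// Hypothetical illustration of the boundary-length equivalence check (not Spark test code).
final class WidenEquivalenceCheck {
  private WidenEquivalenceCheck() {}

  // Batch lengths listed in the test description.
  static final int[] LENGTHS = {0, 1, 7, 8, 9, 17, 1024, 4097};

  // Encodes values the way Parquet PLAIN stores INT32: 4 bytes each, little-endian.
  static ByteBuffer encodeLE(int[] values) {
    ByteBuffer buf = ByteBuffer.allocate(values.length * 4).order(ByteOrder.LITTLE_ENDIAN);
    for (int v : values) {
      buf.putInt(v);
    }
    buf.flip();
    return buf;
  }

  // Bulk decode + widen from a single buffer view, writing at offset rowId.
  static long[] bulkWiden(ByteBuffer page, int total, int rowId) {
    long[] out = new long[rowId + total];
    ByteBuffer in = page.duplicate().order(ByteOrder.LITTLE_ENDIAN); // duplicate() resets order
    for (int i = 0; i < total; i++) {
      out[rowId + i] = in.getInt(); // int -> long assignment sign-extends
    }
    return out;
  }

  // Returns the number of rows whose widened value differs from (long) src[i].
  static int run(long seed) {
    Random rnd = new Random(seed);
    int mismatches = 0;
    int rowId = 3; // non-zero offset, so the rowId arithmetic is exercised
    for (int n : LENGTHS) {
      int[] src = new int[n];
      for (int i = 0; i < n; i++) {
        src[i] = rnd.nextInt(); // full int range, including negatives
      }
      if (n > 0) {
        src[n - 1] = Integer.MIN_VALUE; // always include the most negative INT32
      }
      long[] out = bulkWiden(encodeLE(src), n, rowId);
      for (int i = 0; i < n; i++) {
        if (out[rowId + i] != (long) src[i]) {
          mismatches++;
        }
      }
    }
    return mismatches;
  }
}
```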
    
    A new end-to-end test in `ParquetIOSuite` round-trips INT32 values written to Parquet and read back as `LongType`. It exercises both REQUIRED columns (no definition levels) and OPTIONAL columns with interleaved nulls, so that both `readValue` and `readValues` are invoked.
    
    Benchmark results on JDK 17, 21, and 25 are committed on the branch.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code
    
    Closes #55751 from LuciferYang/parquet-opt-p3-updater-batching.
    
    Authored-by: YangJie <[email protected]>
    Signed-off-by: yangjie01 <[email protected]>
---
 ...ParquetVectorUpdaterBenchmark-jdk21-results.txt |  48 +++---
 ...ParquetVectorUpdaterBenchmark-jdk25-results.txt |  42 ++---
 .../ParquetVectorUpdaterBenchmark-results.txt      |  46 +++---
 .../parquet/ParquetVectorUpdaterFactory.java       |   4 +-
 .../parquet/VectorizedPlainValuesReader.java       |  12 ++
 .../parquet/VectorizedValuesReader.java            |  19 +++
 .../datasources/parquet/ParquetIOSuite.scala       |  49 ++++++
 .../parquet/ParquetVectorUpdaterSuite.scala        | 171 +++++++++++++++++++++
 8 files changed, 320 insertions(+), 71 deletions(-)

diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
index 86c0201d35c1..3080df6d02dc 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Identity Updaters:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater                                        0              0           0      16946.4           0.1       1.0X
-ByteUpdater (INT32 -> Byte)                           0              0           0       3743.2           0.3       0.2X
-ShortUpdater (INT32 -> Short)                         1              1           0       1676.4           0.6       0.1X
-IntegerUpdater                                        0              0           0      10258.9           0.1       0.6X
-LongUpdater                                           0              0           0       5140.3           0.2       0.3X
-FloatUpdater                                          0              0           0      10259.8           0.1       0.6X
-DoubleUpdater                                         0              0           0       5130.4           0.2       0.3X
-BinaryUpdater                                        15             15           0         70.4          14.2       0.0X
+BooleanUpdater                                        0              0           0      16949.1           0.1       1.0X
+ByteUpdater (INT32 -> Byte)                           0              0           0       3748.1           0.3       0.2X
+ShortUpdater (INT32 -> Short)                         1              1           0       1674.2           0.6       0.1X
+IntegerUpdater                                        0              0           0      10123.9           0.1       0.6X
+LongUpdater                                           0              0           0       5075.2           0.2       0.3X
+FloatUpdater                                          0              0           0      10159.3           0.1       0.6X
+DoubleUpdater                                         0              0           0       5076.7           0.2       0.3X
+BinaryUpdater                                        15             15           0         70.8          14.1       0.0X
 
 
 ================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Type-converting Updaters:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater                                     2              2           0        530.9           1.9       1.0X
-IntegerToDoubleUpdater                                   2              2           0        531.3           1.9       1.0X
-FloatToDoubleUpdater                                     2              2           0        489.7           2.0       0.9X
-DateToTimestampNTZUpdater                               29             29           0         36.2          27.6       0.1X
-DowncastLongUpdater (INT64 -> Decimal(9,2))              2              2           0        455.7           2.2       0.9X
+IntegerToLongUpdater                                     0              0           0       6181.3           0.2       1.0X
+IntegerToDoubleUpdater                                   2              2           0        531.8           1.9       0.1X
+FloatToDoubleUpdater                                     2              2           0        490.0           2.0       0.1X
+DateToTimestampNTZUpdater                               29             29           0         36.5          27.4       0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2))              2              2           0        455.6           2.2       0.1X
 
 
 ================================================================================================
@@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Rebase Updaters:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       3644.6           0.3       1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              0              0           0       2663.3           0.4       0.7X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       2              3           0        420.0           2.4       0.1X
+IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       3638.9           0.3       1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              0              0           0       2657.0           0.4       0.7X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       2              3           0        420.3           2.4       0.1X
 
 
 ================================================================================================
@@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Unsigned Updaters:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long)                    0              0           0       5974.2           0.2       1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0))             17             18           0         60.3          16.6       0.0X
+UnsignedIntegerUpdater (UINT32 -> Long)                    0              0           0       5824.3           0.2       1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0))             17             18           1         60.3          16.6       0.0X
 
 
 ================================================================================================
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Decimal Updaters:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater                               0              0           0      10257.8           0.1       1.0X
-LongToDecimalUpdater                                  0              0           0       5133.7           0.2       0.5X
-FixedLenByteArrayToDecimalUpdater                    21             21           0         50.2          19.9       0.0X
+IntegerToDecimalUpdater                               0              0           0      10229.8           0.1       1.0X
+LongToDecimalUpdater                                  0              0           0       5072.8           0.2       0.5X
+FixedLenByteArrayToDecimalUpdater                    21             21           0         50.8          19.7       0.0X
 
 
 ================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 FixedLenByteArray Updaters:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary)                         20             21           1         51.5          19.4       1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        160.1           6.2       3.1X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              8              8           0        133.2           7.5       2.6X
+FixedLenByteArrayUpdater (len=16 -> Binary)                         20             20           1         53.8          18.6       1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        160.1           6.2       3.0X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              8              8           0        132.6           7.5       2.5X
 
 
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
index d4aaaca05263..ee06fbe99609 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
@@ -6,13 +6,13 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Identity Updaters:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater                                        0              0           0      17177.7           0.1       1.0X
-ByteUpdater (INT32 -> Byte)                           0              0           0       3680.4           0.3       0.2X
-ShortUpdater (INT32 -> Short)                         1              1           0       1664.2           0.6       0.1X
-IntegerUpdater                                        0              0           0      10311.6           0.1       0.6X
-LongUpdater                                           0              0           0       5153.5           0.2       0.3X
-FloatUpdater                                          0              0           0      10313.6           0.1       0.6X
-DoubleUpdater                                         0              0           0       5157.8           0.2       0.3X
+BooleanUpdater                                        0              0           0      17154.9           0.1       1.0X
+ByteUpdater (INT32 -> Byte)                           0              0           0       3684.2           0.3       0.2X
+ShortUpdater (INT32 -> Short)                         1              1           0       1660.3           0.6       0.1X
+IntegerUpdater                                        0              0           0      10300.3           0.1       0.6X
+LongUpdater                                           0              0           0       5148.1           0.2       0.3X
+FloatUpdater                                          0              0           0      10316.6           0.1       0.6X
+DoubleUpdater                                         0              0           0       5153.9           0.2       0.3X
 BinaryUpdater                                        16             16           0         67.6          14.8       0.0X
 
 
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Type-converting Updaters:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater                                     2              2           0        454.8           2.2       1.0X
-IntegerToDoubleUpdater                                   2              2           0        454.5           2.2       1.0X
-FloatToDoubleUpdater                                     2              2           0        483.4           2.1       1.1X
-DateToTimestampNTZUpdater                               29             29           0         36.6          27.3       0.1X
-DowncastLongUpdater (INT64 -> Decimal(9,2))              2              2           0        455.5           2.2       1.0X
+IntegerToLongUpdater                                     0              0           0       6460.6           0.2       1.0X
+IntegerToDoubleUpdater                                   2              2           0        454.6           2.2       0.1X
+FloatToDoubleUpdater                                     2              2           0        483.4           2.1       0.1X
+DateToTimestampNTZUpdater                               29             29           0         36.3          27.6       0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2))              2              2           0        455.5           2.2       0.1X
 
 
 ================================================================================================
@@ -39,8 +39,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Rebase Updaters:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       3668.8           0.3       1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              0              0           0       2671.2           0.4       0.7X
+IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       3667.7           0.3       1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              0              0           0       2676.2           0.4       0.7X
 LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       3              3           0        371.3           2.7       0.1X
 
 
@@ -52,7 +52,7 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Unsigned Updaters:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long)                    0              0           0       6344.0           0.2       1.0X
+UnsignedIntegerUpdater (UINT32 -> Long)                    0              0           0       6354.7           0.2       1.0X
 UnsignedLongUpdater (UINT64 -> Decimal(20,0))             18             18           0         59.3          16.9       0.0X
 
 
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Decimal Updaters:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater                               0              0           0      10280.2           0.1       1.0X
-LongToDecimalUpdater                                  0              0           0       5153.3           0.2       0.5X
-FixedLenByteArrayToDecimalUpdater                    21             21           0         50.6          19.8       0.0X
+IntegerToDecimalUpdater                               0              0           0      10307.4           0.1       1.0X
+LongToDecimalUpdater                                  0              0           0       5149.6           0.2       0.5X
+FixedLenByteArrayToDecimalUpdater                    21             21           1         50.9          19.6       0.0X
 
 
 ================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 FixedLenByteArray Updaters:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary)                         21             21           1         50.5          19.8       1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        152.6           6.6       3.0X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              8              8           0        127.7           7.8       2.5X
+FixedLenByteArrayUpdater (len=16 -> Binary)                         22             23           1         47.7          21.0       1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        152.7           6.6       3.2X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              8              8           0        127.7           7.8       2.7X
 
 
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
index 2f76df57b9bf..aff5dd3c9c86 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Identity Updaters:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater                                        0              0           0      14625.7           0.1       1.0X
-ByteUpdater (INT32 -> Byte)                           0              0           0       3672.0           0.3       0.3X
-ShortUpdater (INT32 -> Short)                         1              1           0       2053.4           0.5       0.1X
-IntegerUpdater                                        0              0           0      10284.1           0.1       0.7X
-LongUpdater                                           0              0           0       5132.8           0.2       0.4X
-FloatUpdater                                          0              0           0      10257.9           0.1       0.7X
-DoubleUpdater                                         0              0           0       5097.0           0.2       0.3X
-BinaryUpdater                                        15             15           1         70.3          14.2       0.0X
+BooleanUpdater                                        0              0           0      14578.7           0.1       1.0X
+ByteUpdater (INT32 -> Byte)                           0              0           0       3667.6           0.3       0.3X
+ShortUpdater (INT32 -> Short)                         1              1           0       2053.7           0.5       0.1X
+IntegerUpdater                                        0              0           0      10217.7           0.1       0.7X
+LongUpdater                                           0              0           0       5122.4           0.2       0.4X
+FloatUpdater                                          0              0           0      10244.9           0.1       0.7X
+DoubleUpdater                                         0              0           0       5125.7           0.2       0.4X
+BinaryUpdater                                        15             15           0         68.9          14.5       0.0X
 
 
 ================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Type-converting Updaters:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater                                     2              2           0        454.5           2.2       1.0X
-IntegerToDoubleUpdater                                   2              2           0        478.3           2.1       1.1X
-FloatToDoubleUpdater                                     2              2           0        480.2           2.1       1.1X
-DateToTimestampNTZUpdater                               36             36           0         29.5          33.9       0.1X
-DowncastLongUpdater (INT64 -> Decimal(9,2))              2              2           0        455.3           2.2       1.0X
+IntegerToLongUpdater                                     1              1           0       1280.8           0.8       1.0X
+IntegerToDoubleUpdater                                   2              2           0        479.1           2.1       0.4X
+FloatToDoubleUpdater                                     2              2           0        480.8           2.1       0.4X
+DateToTimestampNTZUpdater                               36             36           1         29.1          34.4       0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2))              2              2           0        487.0           2.1       0.4X
 
 
 ================================================================================================
@@ -39,8 +39,8 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Rebase Updaters:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       2651.7           0.4       1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              0              1           0       2101.9           0.5       0.8X
+IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       2608.4           0.4       1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              1              1           0       2083.9           0.5       0.8X
 LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       2              2           0        454.6           2.2       0.2X
 
 
@@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Unsigned Updaters:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long)                    1              1           0       1093.3           0.9       1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0))             18             18           0         59.1          16.9       0.1X
+UnsignedIntegerUpdater (UINT32 -> Long)                    1              1           0       1110.4           0.9       1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0))             18             18           1         59.1          16.9       0.1X
 
 
 ================================================================================================
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Decimal Updaters:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater                               0              0           0      10263.1           0.1       1.0X
-LongToDecimalUpdater                                  0              0           0       5133.0           0.2       0.5X
-FixedLenByteArrayToDecimalUpdater                    21             21           0         51.0          19.6       0.0X
+IntegerToDecimalUpdater                               0              0           0      10130.8           0.1       1.0X
+LongToDecimalUpdater                                  0              0           0       5125.5           0.2       0.5X
+FixedLenByteArrayToDecimalUpdater                    21             21           0         51.1          19.6       0.0X
 
 
 ================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 FixedLenByteArray Updaters:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary)                         19             19           0         54.8          18.3       1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        160.2           6.2       2.9X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              9              9           0        123.3           8.1       2.3X
+FixedLenByteArrayUpdater (len=16 -> Binary)                         19             19           1         55.1          18.1       1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        160.1           6.2       2.9X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              8              9           0        124.7           8.0       2.3X
 
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 0e2c997e553f..a2e240fabade 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -363,9 +363,7 @@ public class ParquetVectorUpdaterFactory {
         int offset,
         WritableColumnVector values,
         VectorizedValuesReader valuesReader) {
-      for (int i = 0; i < total; ++i) {
-        values.putLong(offset + i, valuesReader.readInteger());
-      }
+      valuesReader.readIntegersAsLongs(total, values, offset);
     }
 
     @Override
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index d792ba0d7259..48ea026641f9 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -156,6 +156,18 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
     }
   }
 
+  @Override
+  public final void readIntegersAsLongs(int total, WritableColumnVector c, int rowId) {
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+    // No `hasArray` bulk-copy path: source (int32) and target (int64) have different byte
+    // widths so a contiguous byte copy is impossible. Matches the pattern in peer
+    // type-converting bulk methods such as `readUnsignedIntegers`.
+    for (int i = 0; i < total; i += 1) {
+      c.putLong(rowId + i, buffer.getInt());
+    }
+  }
+
   // A fork of `readIntegers` to rebase the date values. For performance reasons, this method
   // iterates the values twice: check if we need to rebase first, then go to the optimized branch
   // if rebase is not needed.
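
The PLAIN override replaces one buffer slice per value with one slice per run. The sketch below models only that difference with `java.nio`; `sliceLE` is a hypothetical stand-in for the reader's `getBuffer(n)` (assumed here to return a fresh little-endian view of the next `n` bytes), and a `long[]` again replaces the column vector. Both loops produce the same longs; the per-row version creates `total` `ByteBuffer` views where the bulk version creates one.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class PlainWidenSketch {
  // Hypothetical stand-in for getBuffer(n): a new little-endian view over the
  // next n bytes of src; src is advanced past those bytes.
  static ByteBuffer sliceLE(ByteBuffer src, int n) {
    ByteBuffer view = src.slice();
    view.limit(n);
    src.position(src.position() + n);
    return view.order(ByteOrder.LITTLE_ENDIAN);
  }

  // Legacy shape: one slice (one new ByteBuffer object) per value.
  static void perRow(ByteBuffer src, int total, long[] out, int rowId) {
    for (int i = 0; i < total; i += 1) {
      out[rowId + i] = sliceLE(src, 4).getInt();
    }
  }

  // Bulk shape: one slice for the whole run, then a tight widening loop.
  static void bulk(ByteBuffer src, int total, long[] out, int rowId) {
    ByteBuffer buffer = sliceLE(src, total * 4);
    for (int i = 0; i < total; i += 1) {
      out[rowId + i] = buffer.getInt(); // int -> long widening sign-extends
    }
  }

  // Encodes ints as PLAIN INT32 page bytes (4-byte little-endian each).
  static ByteBuffer encodePlain(int[] values) {
    ByteBuffer buf = ByteBuffer.allocate(values.length * 4).order(ByteOrder.LITTLE_ENDIAN);
    for (int v : values) {
      buf.putInt(v);
    }
    buf.flip();
    return buf;
  }
}
```

Measuring the real cost difference would need a harness such as JMH; the sketch only shows the shape of the two loops.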
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
index d29ce0dd12e4..c376acd40593 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
@@ -56,6 +56,25 @@ public interface VectorizedValuesReader {
       String timeZone);
   void readFloats(int total, WritableColumnVector c, int rowId);
   void readDoubles(int total, WritableColumnVector c, int rowId);
+
+  /**
+   * Reads {@code total} INT32 values, sign-extends each to a long, and writes them into
+   * {@code c} starting at {@code c[rowId]}. Used by type-converting updaters that read
+   * parquet INT32 columns into Spark {@code LongType} (or wider decimal) targets.
+   *
+   * <p>The default implementation falls back to a per-row read+widen+write loop and is
+   * therefore equivalent in cost to the legacy per-row Updater path. Subclasses backed
+   * by contiguous bulk storage (e.g. PLAIN encoding via {@link VectorizedPlainValuesReader})
+   * should override to read source bytes once and run a tight in-method conversion loop,
+   * avoiding {@code total} virtual dispatches on {@link #readInteger()}. Readers without
+   * an override preserve correctness but gain no speedup.
+   */
+  default void readIntegersAsLongs(int total, WritableColumnVector c, int rowId) {
+    for (int i = 0; i < total; i += 1) {
+      c.putLong(rowId + i, readInteger());
+    }
+  }
+
   void readBinary(int total, WritableColumnVector c, int rowId);
   void readGeometry(int total, WritableColumnVector c, int rowId);
   void readGeography(int total, WritableColumnVector c, int rowId);
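
The interface change relies on a Java default method: readers that do not override `readIntegersAsLongs` inherit the per-row loop and stay correct, while a PLAIN-style reader can bypass `readInteger()` entirely. A minimal sketch, assuming hypothetical `WideningReader`, `DecodingReader`, and `PlainLikeReader` types (not Spark's classes) and a `long[]` target; the `readIntegerCalls` counters exist only to make the per-value dispatch of the default path visible.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical mirror of the new interface method; long[] replaces WritableColumnVector. */
interface WideningReader {
  int readInteger();

  // Default: per-row fallback, one interface call to readInteger() per value.
  default void readIntegersAsLongs(int total, long[] c, int rowId) {
    for (int i = 0; i < total; i += 1) {
      c[rowId + i] = readInteger();
    }
  }
}

/** Hypothetical non-PLAIN reader: no override, so it inherits the default loop. */
final class DecodingReader implements WideningReader {
  private final int[] decoded;
  private int pos = 0;
  int readIntegerCalls = 0;

  DecodingReader(int[] decoded) {
    this.decoded = decoded;
  }

  @Override
  public int readInteger() {
    readIntegerCalls += 1;
    return decoded[pos++];
  }
}

/** Hypothetical PLAIN-style reader: overrides with one loop straight over the page bytes. */
final class PlainLikeReader implements WideningReader {
  private final ByteBuffer page;
  int readIntegerCalls = 0;

  private PlainLikeReader(ByteBuffer page) {
    this.page = page;
  }

  // Encodes values as 4-byte little-endian INT32s, like a PLAIN data page.
  static PlainLikeReader of(int... values) {
    ByteBuffer buf = ByteBuffer.allocate(values.length * 4).order(ByteOrder.LITTLE_ENDIAN);
    for (int v : values) {
      buf.putInt(v);
    }
    buf.flip();
    return new PlainLikeReader(buf);
  }

  @Override
  public int readInteger() {
    readIntegerCalls += 1;
    return page.getInt();
  }

  @Override
  public void readIntegersAsLongs(int total, long[] c, int rowId) {
    for (int i = 0; i < total; i += 1) {
      c[rowId + i] = page.getInt(); // readInteger() is never called on this path
    }
  }
}
```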
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 44e6ae4da6a5..becd45f44a72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1887,6 +1887,55 @@ class ParquetIOSuite extends ParquetTest with SharedSparkSession {
       }
     }
   }
+
+  test("INT32 -> Long widening end-to-end via vectorized read path") {
+    // Round-trips an INT32 Parquet file read back with a Long schema, exercising
+    // IntegerToLongUpdater on the vectorized path. Covers a non-null column (REQUIRED,
+    // no def-levels) and a nullable column (OPTIONAL, def-levels split runs and force
+    // `readValue` calls alongside `readValues`). `withAllParquetReaders` also exercises
+    // the row-based (parquet-mr) reader, which provides additional correctness coverage.
+    withTempPath { file =>
+      val n = 5000
+      // Generates a deterministic INT32 sample. Mixes sign, zero, MIN/MAX boundaries to
+      // catch sign-extension regressions; reused for both the non-null and nullable cases.
+      def sampleAt(i: Int): Int = i % 5 match {
+        case 0 => Int.MinValue + i
+        case 1 => -1
+        case 2 => 0
+        case 3 => Int.MaxValue - i
+        case _ => i * 13 - 7
+      }
+
+      val nonNullData = (0 until n).map(i => Row(sampleAt(i)))
+      // Every 7th row is null: splits the run pattern enough to force the PACKED def-level
+      // path to interleave value runs with null runs at sub-batch lengths.
+      val nullableData = (0 until n).map { i =>
+        if (i % 7 == 0) Row(null) else Row(sampleAt(i))
+      }
+
+      val nonNullWriteSchema = new StructType().add("v", IntegerType, nullable = false)
+      val nonNullReadSchema = new StructType().add("v", LongType, nullable = false)
+      val nullableWriteSchema = new StructType().add("v", IntegerType, nullable = true)
+      val nullableReadSchema = new StructType().add("v", LongType, nullable = true)
+
+      val nonNullPath = new java.io.File(file, "nonnull").getCanonicalPath
+      val nullablePath = new java.io.File(file, "nullable").getCanonicalPath
+      spark.createDataFrame(spark.sparkContext.parallelize(nonNullData, 4), nonNullWriteSchema)
+        .write.parquet(nonNullPath)
+      spark.createDataFrame(spark.sparkContext.parallelize(nullableData, 4), nullableWriteSchema)
+        .write.parquet(nullablePath)
+
+      val expectedNonNull = nonNullData.map(r => Row(r.getInt(0).toLong))
+      val expectedNullable = nullableData.map { r =>
+        if (r.isNullAt(0)) Row(null) else Row(r.getInt(0).toLong)
+      }
+
+      withAllParquetReaders {
+        checkAnswer(spark.read.schema(nonNullReadSchema).parquet(nonNullPath), expectedNonNull)
+        checkAnswer(spark.read.schema(nullableReadSchema).parquet(nullablePath), expectedNullable)
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
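
The nullable half of this test depends on def-levels splitting a batch into runs of non-null values. The sketch below is a deliberately simplified model of that control flow, not Spark's def-level decoder: `isNull` is a hypothetical pre-decoded null mask, `dense` holds only the non-null INT32 values in order, and, mirroring the `readValue` comment in `ParquetVectorUpdaterSuite`, runs of length 1 take the singular path while longer runs take the bulk path. With every 7th row null out of 5000 rows, this model issues 714 bulk calls (runs of 6) and one singular call (the trailing row 4999).

```java
/**
 * Simplified model (hypothetical, not Spark's decoder) of how null positions
 * split a batch into runs that are widened via bulk or singular calls.
 */
final class NullRunSketch {
  long[] values;
  boolean[] nulls;
  int bulkCalls = 0;
  int singleCalls = 0;

  void read(boolean[] isNull, int[] dense) {
    int n = isNull.length;
    values = new long[n];
    nulls = new boolean[n];
    int src = 0; // next unread position in the dense non-null stream
    int row = 0;
    while (row < n) {
      if (isNull[row]) {
        nulls[row] = true; // null slot: nothing is read from the value stream
        row += 1;
        continue;
      }
      int start = row;
      while (row < n && !isNull[row]) {
        row += 1;
      }
      int len = row - start;
      if (len == 1) {
        singleCalls += 1; // analogue of Updater.readValue
        values[start] = dense[src];
      } else {
        bulkCalls += 1; // analogue of Updater.readValues -> readIntegersAsLongs
        for (int i = 0; i < len; i += 1) {
          values[start + i] = dense[src + i]; // int -> long sign-extends
        }
      }
      src += len;
    }
  }
}
```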
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
new file mode 100644
index 000000000000..d121f4e2bb73
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.nio.{ByteBuffer, ByteOrder}
+import java.time.ZoneOffset
+
+import org.apache.parquet.bytes.ByteBufferInputStream
+import org.apache.parquet.column.ColumnDescriptor
+import org.apache.parquet.schema.{LogicalTypeAnnotation, Types}
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.Type.Repetition
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+import org.apache.spark.sql.types._
+
+/**
+ * Correctness tests for the INT32 -> Long widening Updater (`IntegerToLongUpdater`)
+ * on the Parquet vectorized read path.
+ *
+ * Covers boundary batch lengths, sign-extension on negative INT32 values, the singular
+ * `readValue` path, and the factory's long-decimal dispatch
+ * (INT32 + DECIMAL(p<=9) -> DecimalType(p in (9, 18])).
+ */
+class ParquetVectorUpdaterSuite extends SparkFunSuite {
+
+  // INT32 column descriptor with no logical-type annotation; matches what the production
+  // factory uses for plain INT32 -> Long widening.
+  private val int32Descriptor: ColumnDescriptor = {
+    val pt = Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL).named("col")
+    new ColumnDescriptor(Array("col"), pt, 0, 1)
+  }
+
+  // INT32 column descriptor annotated as DECIMAL(precision, scale); routes the factory's
+  // INT32 dispatch through the `canReadAsLongDecimal` branch when target precision is in
+  // (9, 18].
+  private def int32DecimalDescriptor(precision: Int, scale: Int): ColumnDescriptor = {
+    val pt = Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL)
+      .as(LogicalTypeAnnotation.decimalType(scale, precision))
+      .named("col")
+    new ColumnDescriptor(Array("col"), pt, 0, 1)
+  }
+
+  private def newFactory(desc: ColumnDescriptor): ParquetVectorUpdaterFactory =
+    ParquetTestAccess.newFactory(
+      desc.getPrimitiveType.getLogicalTypeAnnotation,
+      ZoneOffset.UTC, "CORRECTED", "UTC", "CORRECTED", "UTC")
+
+  private def plainIntBytes(values: Array[Int]): Array[Byte] = {
+    val buf = ByteBuffer.allocate(values.length * 4).order(ByteOrder.LITTLE_ENDIAN)
+    var i = 0
+    while (i < values.length) { buf.putInt(values(i)); i += 1 }
+    buf.array()
+  }
+
+  private def newPlainReader(bytes: Array[Byte], numValues: Int): VectorizedPlainValuesReader = {
+    val r = new VectorizedPlainValuesReader
+    r.initFromPage(numValues, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes)))
+    r
+  }
+
+  // Reads `values.length` INT32s through `IntegerToLongUpdater.readValues` and returns the
+  // resulting long column.
+  private def readViaUpdater(values: Array[Int]): Array[Long] = {
+    val fac = newFactory(int32Descriptor)
+    val updater = fac.getUpdater(int32Descriptor, DataTypes.LongType)
+    // OnHeapColumnVector requires capacity >= 1 even when nothing is written into it.
+    val out = new OnHeapColumnVector(values.length.max(1), DataTypes.LongType)
+    val reader = newPlainReader(plainIntBytes(values), values.length)
+    updater.readValues(values.length, 0, out, reader)
+    val result = new Array[Long](values.length)
+    var i = 0
+    while (i < values.length) { result(i) = out.getLong(i); i += 1 }
+    result
+  }
+
+  // Test data: a mix of positive, negative, zero, MIN/MAX values to catch sign-extension bugs.
+  private def signedSampleValues(n: Int): Array[Int] = {
+    val out = new Array[Int](n)
+    var i = 0
+    while (i < n) {
+      out(i) = i match {
+        case _ if i % 5 == 0 => Int.MinValue + i
+        case _ if i % 5 == 1 => -1
+        case _ if i % 5 == 2 => 0
+        case _ if i % 5 == 3 => Int.MaxValue - i
+        case _ => i * 13 - 7
+      }
+      i += 1
+    }
+    out
+  }
+
+  private def expectedWiden(values: Array[Int]): Array[Long] = values.map(_.toLong)
+
+  // ---- Boundary-length correctness: empty, sub-batch, batch-aligned, multi-batch ----
+
+  for (n <- Seq(0, 1, 7, 8, 9, 17, 1024, 4097)) {
+    test(s"IntegerToLongUpdater produces correct widened output (total=$n)") {
+      val input = signedSampleValues(n)
+      assert(readViaUpdater(input) === expectedWiden(input))
+    }
+  }
+
+  // ---- readValue (singular) path is separate from readValues ----
+
+  test("IntegerToLongUpdater: readValue widens a single INT32 -> Long") {
+    // The singular readValue is invoked from the RLE/PACKED def-level decoder for runs
+    // of length 1, which calls `readInteger()` directly rather than the bulk method.
+    // Pinned here so a future change that conflates the two paths is caught at unit level.
+    val input = Array(0, 1, -1, 42, Int.MinValue, Int.MaxValue)
+    val fac = newFactory(int32Descriptor)
+    val updater = fac.getUpdater(int32Descriptor, DataTypes.LongType)
+    val out = new OnHeapColumnVector(input.length, DataTypes.LongType)
+    val reader = newPlainReader(plainIntBytes(input), input.length)
+    var i = 0
+    while (i < input.length) {
+      updater.readValue(i, out, reader)
+      i += 1
+    }
+    val actual = (0 until input.length).map(out.getLong).toArray
+    assert(actual === input.map(_.toLong))
+  }
+
+  // ---- Sign-extension: negative INT32 must become negative INT64 ----
+
+  test("IntegerToLongUpdater: negative INT32 sign-extends to negative INT64") {
+    val input = Array(Int.MinValue, -1, -42, 0, Int.MaxValue)
+    assert(readViaUpdater(input) ===
+      Array[Long](Int.MinValue.toLong, -1L, -42L, 0L, Int.MaxValue.toLong))
+  }
+
+  // ---- Long-decimal dispatch: factory routes INT32+DECIMAL(p<=9) -> IntegerToLongUpdater
+  //      when the Spark target is a DecimalType(precision in (9, 18]) ----
+
+  test("IntegerToLongUpdater handles INT32 -> DecimalType(p<=18) via canReadAsLongDecimal") {
+    // Parquet caps INT32 DECIMAL precision at 9 (max digits in int32), so the source is
+    // DECIMAL(9, 0); the Spark target DecimalType(15, 0) is a long-decimal (precision in
+    // (9, 18]). The factory routes this through `canReadAsLongDecimal` to
+    // IntegerToLongUpdater, which writes via `putLong` exactly like the LongType case.
+    // This test confirms the dispatch wiring stays intact for long-decimal targets.
+    val desc = int32DecimalDescriptor(precision = 9, scale = 0)
+    val targetType = DataTypes.createDecimalType(15, 0)
+    val input = Array(0, 1, 42, -7, Int.MinValue, Int.MaxValue, 1234567)
+
+    val fac = newFactory(desc)
+    val updater = fac.getUpdater(desc, targetType)
+    val out = new OnHeapColumnVector(input.length, targetType)
+    val reader = newPlainReader(plainIntBytes(input), input.length)
+    updater.readValues(input.length, 0, out, reader)
+
+    val actual = (0 until input.length).map(out.getLong).toArray
+    assert(actual === input.map(_.toLong))
+  }
+}
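
The sign-extension cases in both suites guard against a widening that treats the 32-bit pattern as unsigned. A small Java sketch of the difference: `sampleAt` mirrors the suites' five-way sample generator, `signExtend` is the implicit `int`-to-`long` conversion the updater relies on, and `zeroExtend` uses the JDK's `Integer.toUnsignedLong` purely as an example of the kind of regression these tests would catch. Every negative sample is a mismatch: `-1` becomes `-1L` when sign-extended but `4294967295L` when zero-extended.

```java
final class WidenCheckSketch {
  // Mirrors the suites' signedSampleValues / sampleAt pattern (for i >= 0).
  static int sampleAt(int i) {
    switch (i % 5) {
      case 0: return Integer.MIN_VALUE + i;
      case 1: return -1;
      case 2: return 0;
      case 3: return Integer.MAX_VALUE - i;
      default: return i * 13 - 7;
    }
  }

  // Correct INT32 -> INT64 widening: the implicit conversion sign-extends.
  static long signExtend(int v) {
    return v;
  }

  // A plausible regression: treating the 32-bit pattern as unsigned.
  static long zeroExtend(int v) {
    return Integer.toUnsignedLong(v);
  }

  // Counts how many of the first n samples the two widenings disagree on.
  static int mismatches(int n) {
    int count = 0;
    for (int i = 0; i < n; i += 1) {
      int v = sampleAt(i);
      if (signExtend(v) != zeroExtend(v)) {
        count += 1;
      }
    }
    return count;
  }
}
```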

