This is an automated email from the ASF dual-hosted git repository.

LuciferYang pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new c1cc4715b3c5 [SPARK-56803][SQL] Add bulk read+narrow path for INT64 DECIMAL to 32-bit Decimal Parquet vector updater
c1cc4715b3c5 is described below

commit c1cc4715b3c53a6ff3d10c3150c84d88aba11a11
Author: YangJie <[email protected]>
AuthorDate: Thu May 14 23:25:01 2026 +0800

    [SPARK-56803][SQL] Add bulk read+narrow path for INT64 DECIMAL to 32-bit Decimal Parquet vector updater
    
    ### What changes were proposed in this pull request?
    
    Extend the bulk read pattern introduced in SPARK-56791 (read+widen) to the narrowing case in `DowncastLongUpdater`, which reads Parquet INT64 DECIMAL(p<=9) values into a Spark 32-bit `DecimalType`.
    
    A new default method, `readLongsAsInts`, on `VectorizedValuesReader` provides the per-row fallback. `VectorizedPlainValuesReader` overrides it to fetch the source bytes once via `getBuffer(total * 8)` and then run a tight conversion loop inside the method. `DowncastLongUpdater.readValues` becomes a one-line delegation. The narrowing is Java's primitive long-to-int cast (`(int) buffer.getLong()`), which discards the high 32 bits; this is non-lossy in practice because Parquet's DECIMAL(p<=9) encoding bounds t [...]
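    
    A minimal sketch of the default-plus-override shape described above, assuming a simplified, hypothetical `LongValuesSource` interface that writes into a plain `int[]` instead of Spark's `WritableColumnVector`. `PlainLongSource` and its `getBuffer` helper are illustrative stand-ins for `VectorizedPlainValuesReader`, not its actual code. Parquet PLAIN encodes INT64 as 8-byte little-endian values.
    
    ```java
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    
    // Hypothetical, simplified stand-in for VectorizedValuesReader: values are
    // written into a plain int[] instead of a WritableColumnVector.
    interface LongValuesSource {
      long readLong();
    
      // Per-row fallback: one readLong() call per value, narrowed with the
      // primitive long-to-int cast (the high 32 bits are discarded).
      default void readLongsAsInts(int total, int[] out, int rowId) {
        for (int i = 0; i < total; i++) {
          out[rowId + i] = (int) readLong();
        }
      }
    }
    
    // Hypothetical PLAIN-encoded source over little-endian INT64 bytes.
    final class PlainLongSource implements LongValuesSource {
      private final ByteBuffer in;
    
      PlainLongSource(byte[] bytes) {
        this.in = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
      }
    
      // Illustrative getBuffer: returns a view of the next `length` bytes and
      // advances the read position past them.
      private ByteBuffer getBuffer(int length) {
        ByteBuffer slice = in.slice().order(ByteOrder.LITTLE_ENDIAN);
        slice.limit(length);
        in.position(in.position() + length);
        return slice;
      }
    
      @Override
      public long readLong() {
        return getBuffer(8).getLong();
      }
    
      // Bulk override: fetch all total * 8 bytes once, then narrow in a tight loop.
      @Override
      public void readLongsAsInts(int total, int[] out, int rowId) {
        ByteBuffer buffer = getBuffer(total * 8);
        for (int i = 0; i < total; i++) {
          out[rowId + i] = (int) buffer.getLong();
        }
      }
    }
    ```
    
    The cast is exact for DECIMAL(9, _) because the largest unscaled magnitude, 999,999,999, is below `Integer.MAX_VALUE` (2,147,483,647). A value outside the int range would silently wrap: `(int) 4_294_967_297L` is `1`.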
    
    ### Why are the changes needed?
    
    On the legacy path, `DowncastLongUpdater.readValues` allocates a fresh `ByteBuffer` slice inside `getBuffer(8)` for every element, and that allocation dominates the loop. Collapsing N allocations into one is the same win SPARK-56791 delivered for the INT32 -> Long sibling, and the gain again grows on newer JDKs, where escape analysis better optimizes the tight loop:
    
    | JDK | Baseline | After     | Speedup |
    |----:|---------:|----------:|--------:|
    |  17 | 487.0 M/s | 1287.3 M/s | 2.64x |
    |  21 | 455.6 M/s | 5828.5 M/s | 12.79x |
    |  25 | 455.5 M/s | 6568.8 M/s | 14.42x |
    
    Peer Updaters in the same benchmark group hold within run-to-run noise, confirming the change is local to `DowncastLongUpdater`.
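    
    To make the allocation argument concrete, here is a rough sketch that counts the `ByteBuffer` views each loop shape creates. `CountingLongReader` is hypothetical and writes into an `int[]`. It only demonstrates the N-versus-1 allocation count; it says nothing about actual JIT behavior or timing.
    
    ```java
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    
    // Hypothetical reader that counts how many ByteBuffer views getBuffer() creates.
    final class CountingLongReader {
      private final ByteBuffer in;
      int slicesCreated = 0;
    
      CountingLongReader(ByteBuffer source) {
        // duplicate() shares the bytes but keeps an independent position.
        this.in = source.duplicate().order(ByteOrder.LITTLE_ENDIAN);
      }
    
      private ByteBuffer getBuffer(int length) {
        slicesCreated++;
        ByteBuffer slice = in.slice().order(ByteOrder.LITTLE_ENDIAN);
        slice.limit(length);
        in.position(in.position() + length);
        return slice;
      }
    
      // Legacy shape: one 8-byte view per element, so `total` allocations.
      void readPerElement(int total, int[] out, int rowId) {
        for (int i = 0; i < total; i++) {
          out[rowId + i] = (int) getBuffer(8).getLong();
        }
      }
    
      // Bulk shape: a single view covering total * 8 bytes.
      void readBulk(int total, int[] out, int rowId) {
        ByteBuffer buffer = getBuffer(total * 8);
        for (int i = 0; i < total; i++) {
          out[rowId + i] = (int) buffer.getLong();
        }
      }
    }
    ```
    
    For a 1024-value batch, `readPerElement` leaves `slicesCreated` at 1024 while `readBulk` leaves it at 1, and both produce the same ints.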
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit tests in `ParquetVectorUpdaterSuite` cover boundary batch lengths (0, 1, 7, 8, 9, 17, 1024, 4097), the singular `readValue` path, and sign preservation for in-range INT64 values at the bounds of the DECIMAL(9, _) wire format.
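    
    The shape of those boundary-length checks can be sketched as a stand-alone harness. This is not the suite's code: `NarrowBoundaryCheck` and its helpers are hypothetical. It compares a bulk narrowing loop against a per-element reference at a chosen `rowId` and includes +/-999,999,999 (the DECIMAL(9, _) unscaled extremes) to exercise sign preservation.
    
    ```java
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.Arrays;
    
    // Hypothetical harness in the spirit of the boundary-length unit tests.
    final class NarrowBoundaryCheck {
      static final int[] LENGTHS = {0, 1, 7, 8, 9, 17, 1024, 4097};
      static final long DECIMAL9_MAX_UNSCALED = 999_999_999L;
    
      // Little-endian INT64 bytes cycling through in-range values, including
      // the DECIMAL(9, _) extremes and their negations.
      static ByteBuffer encode(int total) {
        long[] pattern = {0L, 1L, -1L, DECIMAL9_MAX_UNSCALED, -DECIMAL9_MAX_UNSCALED};
        ByteBuffer bb = ByteBuffer.allocate(total * 8).order(ByteOrder.LITTLE_ENDIAN);
        for (int i = 0; i < total; i++) {
          bb.putLong(pattern[i % pattern.length]);
        }
        bb.flip();
        return bb;
      }
    
      // Reference: a fresh 8-byte view per element, like the legacy path.
      static int[] perRow(ByteBuffer src, int total, int rowId) {
        ByteBuffer in = src.duplicate().order(ByteOrder.LITTLE_ENDIAN);
        int[] out = new int[rowId + total];
        for (int i = 0; i < total; i++) {
          ByteBuffer view = in.slice().order(ByteOrder.LITTLE_ENDIAN);
          out[rowId + i] = (int) view.getLong();
          in.position(in.position() + 8);
        }
        return out;
      }
    
      // Bulk: one view over all total * 8 bytes and a tight narrowing loop.
      static int[] bulk(ByteBuffer src, int total, int rowId) {
        ByteBuffer buffer = src.duplicate().order(ByteOrder.LITTLE_ENDIAN);
        int[] out = new int[rowId + total];
        for (int i = 0; i < total; i++) {
          out[rowId + i] = (int) buffer.getLong();
        }
        return out;
      }
    
      // True if both paths agree and every value (including sign) survives.
      static boolean allLengthsAgree(int rowId) {
        for (int total : LENGTHS) {
          ByteBuffer src = encode(total);
          int[] a = perRow(src, total, rowId);
          int[] b = bulk(src, total, rowId);
          if (!Arrays.equals(a, b)) return false;
          for (int i = 0; i < total; i++) {
            if (b[rowId + i] != src.getLong(i * 8)) return false;
          }
        }
        return true;
      }
    }
    ```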
    
    A new end-to-end test in `ParquetIOSuite` uses `parquet-mr`'s low-level API (`MessageTypeParser` + `SimpleGroup`) to write a file with `int64 v (DECIMAL(9, 2))`. Spark's own writer produces INT32-backed storage for precision <= 9, so this wire format only arises from other writers (Hive, Impala). The test reads the file back as `DecimalType(9, 2)` and confirms the values round-trip exactly.
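    
    For context on that wire format: an INT64-backed DECIMAL(9, 2) column stores the unscaled value (the decimal times 10^2) as a 64-bit integer, and the 32-bit decimal path narrows it to an int. The hypothetical helper `Decimal92RoundTrip` below sketches the encode and narrow-then-rescale steps with `java.math.BigDecimal`. It illustrates the round trip and is not the `ParquetIOSuite` test.
    
    ```java
    import java.math.BigDecimal;
    import java.math.RoundingMode;
    
    // Hypothetical helper: models an INT64-backed DECIMAL(9, 2) value as its
    // unscaled long (value * 10^2) and the 32-bit read path as an (int) cast.
    final class Decimal92RoundTrip {
      static final int PRECISION = 9;
      static final int SCALE = 2;
    
      // Writer side: rescale to exactly 2 fractional digits (throws if that would
      // round) and reject values with more than 9 digits in total.
      static long encode(BigDecimal value) {
        BigDecimal scaled = value.setScale(SCALE, RoundingMode.UNNECESSARY);
        if (scaled.precision() > PRECISION) {
          throw new ArithmeticException("does not fit DECIMAL(9, 2): " + value);
        }
        return scaled.unscaledValue().longValueExact();
      }
    
      // Reader side: narrow the unscaled long to int, then reapply the scale.
      static BigDecimal decode(long unscaled) {
        int narrowed = (int) unscaled;
        return BigDecimal.valueOf(narrowed, SCALE);
      }
    }
    ```
    
    For example, `encode(new BigDecimal("9999999.99"))` yields `999999999L`, which survives the `(int)` cast unchanged, so `decode` returns `9999999.99`.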
    
    Benchmark results on JDK 17, 21, and 25 are committed on the branch.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Claude Code
    
    Closes #55853 from LuciferYang/SPARK-56803-downcast-long.
    
    Authored-by: YangJie <[email protected]>
    Signed-off-by: yangjie01 <[email protected]>
    (cherry picked from commit dcd31611d764dbca161610d18b65a2205a75eef8)
    Signed-off-by: yangjie01 <[email protected]>
---
 ...ParquetVectorUpdaterBenchmark-jdk21-results.txt | 46 +++++-----
 ...ParquetVectorUpdaterBenchmark-jdk25-results.txt | 42 +++++-----
 .../ParquetVectorUpdaterBenchmark-results.txt      | 46 +++++-----
 .../parquet/ParquetVectorUpdaterFactory.java       |  4 +-
 .../parquet/VectorizedPlainValuesReader.java       | 12 +++
 .../parquet/VectorizedValuesReader.java            | 21 +++++
 .../datasources/parquet/ParquetIOSuite.scala       | 51 +++++++++++
 .../parquet/ParquetVectorUpdaterSuite.scala        | 98 ++++++++++++++++++++++
 8 files changed, 250 insertions(+), 70 deletions(-)

diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
index 7d09c6fd187e..b120f8d412a3 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
@@ -7,13 +7,13 @@ AMD EPYC 7763 64-Core Processor
 Identity Updaters:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
 BooleanUpdater                                        0              0           0      17004.4           0.1       1.0X
-ByteUpdater (INT32 -> Byte)                           0              0           0       3758.6           0.3       0.2X
-ShortUpdater (INT32 -> Short)                         1              1           0       1678.4           0.6       0.1X
-IntegerUpdater                                        0              0           0      10276.1           0.1       0.6X
-LongUpdater                                           0              0           0       5112.7           0.2       0.3X
-FloatUpdater                                          0              0           0      10293.3           0.1       0.6X
-DoubleUpdater                                         0              0           0       3870.8           0.3       0.2X
-BinaryUpdater                                        15             15           1         71.3          14.0       0.0X
+ByteUpdater (INT32 -> Byte)                           0              0           0       3748.2           0.3       0.2X
+ShortUpdater (INT32 -> Short)                         1              1           0       1683.0           0.6       0.1X
+IntegerUpdater                                        0              0           0      10179.2           0.1       0.6X
+LongUpdater                                           0              0           0       5075.5           0.2       0.3X
+FloatUpdater                                          0              0           0      10201.9           0.1       0.6X
+DoubleUpdater                                         0              0           0       5140.6           0.2       0.3X
+BinaryUpdater                                        15             15           0         71.3          14.0       0.0X
 
 
 ================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Type-converting Updaters:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater                                     0              0           0       5145.1           0.2       1.0X
-IntegerToDoubleUpdater                                   0              0           0       5120.5           0.2       1.0X
-FloatToDoubleUpdater                                     0              0           0       2527.2           0.4       0.5X
-DateToTimestampNTZUpdater                               29             29           0         36.3          27.5       0.0X
-DowncastLongUpdater (INT64 -> Decimal(9,2))              2              2           0        455.6           2.2       0.1X
+IntegerToLongUpdater                                     0              0           0       6208.4           0.2       1.0X
+IntegerToDoubleUpdater                                   0              0           0       6155.9           0.2       1.0X
+FloatToDoubleUpdater                                     0              0           0       2526.5           0.4       0.4X
+DateToTimestampNTZUpdater                               27             28           1         38.6          25.9       0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2))              0              0           0       5828.5           0.2       0.9X
 
 
 ================================================================================================
@@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Rebase Updaters:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       3647.4           0.3       1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              0              0           0       2668.6           0.4       0.7X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       2              3           0        420.3           2.4       0.1X
+IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       3651.5           0.3       1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              0              0           0       2621.2           0.4       0.7X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       2              3           0        420.5           2.4       0.1X
 
 
 ================================================================================================
@@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Unsigned Updaters:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long)                    0              0           0       5089.6           0.2       1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0))             17             18           1         60.2          16.6       0.0X
+UnsignedIntegerUpdater (UINT32 -> Long)                    0              0           0       5879.9           0.2       1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0))             16             17           0         63.8          15.7       0.0X
 
 
 ================================================================================================
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Decimal Updaters:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater                               0              0           0       7746.5           0.1       1.0X
-LongToDecimalUpdater                                  0              0           0       3874.1           0.3       0.5X
-FixedLenByteArrayToDecimalUpdater                    21             21           0         50.7          19.7       0.0X
+IntegerToDecimalUpdater                               0              0           0      10144.6           0.1       1.0X
+LongToDecimalUpdater                                  0              0           0       5061.5           0.2       0.5X
+FixedLenByteArrayToDecimalUpdater                    20             21           1         51.3          19.5       0.0X
 
 
 ================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 FixedLenByteArray Updaters:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary)                         19             20           2         54.1          18.5       1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        160.1           6.2       3.0X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              8              8           0        133.2           7.5       2.5X
+FixedLenByteArrayUpdater (len=16 -> Binary)                         20             21           1         51.8          19.3       1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        160.2           6.2       3.1X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              8              8           0        133.2           7.5       2.6X
 
 
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
index 627835735a08..6dfd2fdadc25 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Identity Updaters:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater                                        0              0           0      17138.1           0.1       1.0X
-ByteUpdater (INT32 -> Byte)                           0              0           0       3678.2           0.3       0.2X
+BooleanUpdater                                        0              0           0      17160.2           0.1       1.0X
+ByteUpdater (INT32 -> Byte)                           0              0           0       3686.1           0.3       0.2X
 ShortUpdater (INT32 -> Short)                         1              1           0       1662.8           0.6       0.1X
-IntegerUpdater                                        0              0           0      10231.8           0.1       0.6X
-LongUpdater                                           0              0           0       5103.2           0.2       0.3X
-FloatUpdater                                          0              0           0      10203.9           0.1       0.6X
-DoubleUpdater                                         0              0           0       5105.4           0.2       0.3X
-BinaryUpdater                                        17             17           0         62.8          15.9       0.0X
+IntegerUpdater                                        0              0           0      10282.0           0.1       0.6X
+LongUpdater                                           0              0           0       5151.9           0.2       0.3X
+FloatUpdater                                          0              0           0      10306.3           0.1       0.6X
+DoubleUpdater                                         0              0           0       5149.1           0.2       0.3X
+BinaryUpdater                                        15             16           0         67.8          14.8       0.0X
 
 
 ================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Type-converting Updaters:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater                                     0              0           0       6239.9           0.2       1.0X
-IntegerToDoubleUpdater                                   0              0           0       6382.6           0.2       1.0X
-FloatToDoubleUpdater                                     0              0           0       2570.7           0.4       0.4X
-DateToTimestampNTZUpdater                               26             26           0         40.5          24.7       0.0X
-DowncastLongUpdater (INT64 -> Decimal(9,2))              2              2           0        637.7           1.6       0.1X
+IntegerToLongUpdater                                     0              0           0       6390.8           0.2       1.0X
+IntegerToDoubleUpdater                                   0              0           0       6415.4           0.2       1.0X
+FloatToDoubleUpdater                                     0              0           0       3196.5           0.3       0.5X
+DateToTimestampNTZUpdater                               29             29           0         36.6          27.3       0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2))              0              0           0       6568.8           0.2       1.0X
 
 
 ================================================================================================
@@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Rebase Updaters:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       3660.5           0.3       1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              0              0           0       2663.6           0.4       0.7X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       2              2           0        521.3           1.9       0.1X
+IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       3665.9           0.3       1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              0              0           0       2652.9           0.4       0.7X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       3              3           0        371.3           2.7       0.1X
 
 
 ================================================================================================
@@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Unsigned Updaters:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long)                    0              0           0       6196.3           0.2       1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0))             17             18           1         60.3          16.6       0.0X
+UnsignedIntegerUpdater (UINT32 -> Long)                    0              0           0       5111.6           0.2       1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0))             17             18           0         60.4          16.6       0.0X
 
 
 ================================================================================================
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Decimal Updaters:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater                               0              0           0      10247.8           0.1       1.0X
-LongToDecimalUpdater                                  0              0           0       5125.7           0.2       0.5X
-FixedLenByteArrayToDecimalUpdater                    21             21           0         50.7          19.7       0.0X
+IntegerToDecimalUpdater                               0              0           0      10205.8           0.1       1.0X
+LongToDecimalUpdater                                  0              0           0       5111.4           0.2       0.5X
+FixedLenByteArrayToDecimalUpdater                    21             21           2         50.9          19.6       0.0X
 
 
 ================================================================================================
@@ -77,7 +77,7 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 FixedLenByteArray Updaters:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary)                         21             21           1         51.0          19.6       1.0X
+FixedLenByteArrayUpdater (len=16 -> Binary)                         21             22           1         50.4          19.8       1.0X
 FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        152.6           6.6       3.0X
 FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              8              8           0        127.7           7.8       2.5X
 
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
index 5e2c31d2b566..5918db9f759b 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Identity Updaters:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater                                        0              0           0      14526.2           0.1       1.0X
-ByteUpdater (INT32 -> Byte)                           0              0           0       3679.3           0.3       0.3X
-ShortUpdater (INT32 -> Short)                         1              1           0       2054.1           0.5       0.1X
-IntegerUpdater                                        0              0           0      10178.0           0.1       0.7X
-LongUpdater                                           0              0           0       5054.4           0.2       0.3X
-FloatUpdater                                          0              0           0      10212.8           0.1       0.7X
-DoubleUpdater                                         0              0           0       5051.2           0.2       0.3X
-BinaryUpdater                                        15             15           0         68.4          14.6       0.0X
+BooleanUpdater                                        0              0           0      14617.4           0.1       1.0X
+ByteUpdater (INT32 -> Byte)                           0              0           0       3667.7           0.3       0.3X
+ShortUpdater (INT32 -> Short)                         1              1           0       2048.9           0.5       0.1X
+IntegerUpdater                                        0              0           0      10281.9           0.1       0.7X
+LongUpdater                                           0              0           0       5138.0           0.2       0.4X
+FloatUpdater                                          0              0           0       7742.9           0.1       0.5X
+DoubleUpdater                                         0              0           0       3863.4           0.3       0.3X
+BinaryUpdater                                        15             15           0         70.2          14.2       0.0X
 
 
 ================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Type-converting Updaters:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater                                     1              1           0       1280.6           0.8       1.0X
-IntegerToDoubleUpdater                                   1              1           0       1537.9           0.7       1.2X
-FloatToDoubleUpdater                                     1              1           0       1418.8           0.7       1.1X
-DateToTimestampNTZUpdater                               36             36           0         29.5          33.9       0.0X
-DowncastLongUpdater (INT64 -> Decimal(9,2))              2              2           0        455.3           2.2       0.4X
+IntegerToLongUpdater                                     1              1           0       1279.7           0.8       1.0X
+IntegerToDoubleUpdater                                   1              1           0       1544.8           0.6       1.2X
+FloatToDoubleUpdater                                     1              1           0       1417.9           0.7       1.1X
+DateToTimestampNTZUpdater                               36             36           1         29.5          33.9       0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2))              1              1           0       1287.3           0.8       1.0X
 
 
 ================================================================================================
@@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Rebase Updaters:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       2407.3           0.4       1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              1              1           0       2030.8           0.5       0.8X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       2              2           0        454.4           2.2       0.2X
+IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       2599.8           0.4       1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              1              1           0       2092.2           0.5       0.8X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       2              2           0        454.7           2.2       0.2X
 
 
 ================================================================================================
@@ -52,7 +52,7 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Unsigned Updaters:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long)                    1              1           0       1093.1           0.9       1.0X
+UnsignedIntegerUpdater (UINT32 -> Long)                    1              1           0       1091.2           0.9       1.0X
 UnsignedLongUpdater (UINT64 -> Decimal(20,0))             18             18           0         59.1          16.9       0.1X
 
 
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Decimal Updaters:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater                               0              0           0      10195.9           0.1       1.0X
-LongToDecimalUpdater                                  0              0           0       5049.2           0.2       0.5X
-FixedLenByteArrayToDecimalUpdater                    21             21           0         51.0          19.6       0.0X
+IntegerToDecimalUpdater                               0              0           0      10241.7           0.1       1.0X
+LongToDecimalUpdater                                  0              0           0       5118.1           0.2       0.5X
+FixedLenByteArrayToDecimalUpdater                    21             21           0         51.1          19.6       0.0X
 
 
 ================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 FixedLenByteArray Updaters:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary)                         19             19           0         54.9          18.2       1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        160.1           6.2       2.9X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              9              9           0        123.0           8.1       2.2X
+FixedLenByteArrayUpdater (len=16 -> Binary)                         19             19           0         55.1          18.2       1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        160.2           6.2       2.9X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              9              9           0        123.1           8.1       2.2X
 
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index eba4b426a0d8..d9a47f16cfc4 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -684,9 +684,7 @@ public class ParquetVectorUpdaterFactory {
         int offset,
         WritableColumnVector values,
         VectorizedValuesReader valuesReader) {
-      for (int i = 0; i < total; ++i) {
-        values.putInt(offset + i, (int) valuesReader.readLong());
-      }
+      valuesReader.readLongsAsInts(total, values, offset);
     }
 
     @Override
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index d91ba5e2b87d..9249fab7915c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -192,6 +192,18 @@ public class VectorizedPlainValuesReader extends 
ValuesReader implements Vectori
     }
   }
 
+  @Override
+  public final void readLongsAsInts(int total, WritableColumnVector c, int rowId) {
+    int requiredBytes = total * 8;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+    // No `hasArray` bulk-copy path: source (int64, 8 bytes) and target (int32, 4 bytes)
+    // have different widths so a contiguous byte copy is impossible. Matches the pattern
+    // in `readIntegersAsLongs`.
+    for (int i = 0; i < total; i += 1) {
+      c.putInt(rowId + i, (int) buffer.getLong());
+    }
+  }
+
   // A fork of `readIntegers` to rebase the date values. For performance reasons, this method
   // iterates the values twice: check if we need to rebase first, then go to the optimized branch
   // if rebase is not needed.
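To make the allocation argument from the commit message concrete, here is a minimal, self-contained sketch of the two access patterns. It assumes a simplified, hypothetical reader (`PlainLongPage`, not Spark's `VectorizedPlainValuesReader`) whose `getBuffer(n)` returns a little-endian slice of the next `n` bytes and advances its cursor, and it writes into an `int[]` instead of a `WritableColumnVector`. The per-row path creates one slice per value; the bulk path creates one slice per batch and narrows in a tight loop.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical stand-in for a PLAIN-encoded INT64 page; not Spark's VectorizedPlainValuesReader.
final class PlainLongPage {
  private final ByteBuffer in;

  PlainLongPage(byte[] bytes) {
    this.in = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
  }

  // Returns a little-endian view of the next n bytes and advances past them.
  // Each call allocates a new ByteBuffer object via slice().
  ByteBuffer getBuffer(int n) {
    ByteBuffer view = in.slice().order(ByteOrder.LITTLE_ENDIAN);
    view.limit(n);
    in.position(in.position() + n);
    return view;
  }

  long readLong() {
    return getBuffer(8).getLong(); // one slice per value
  }

  // Legacy shape: `total` slices and `total` calls to readLong().
  void readLongsAsIntsPerRow(int total, int[] out, int offset) {
    for (int i = 0; i < total; i++) {
      out[offset + i] = (int) readLong();
    }
  }

  // Bulk shape: a single slice for the whole batch, then a tight narrowing loop.
  void readLongsAsIntsBulk(int total, int[] out, int offset) {
    ByteBuffer buffer = getBuffer(total * 8);
    for (int i = 0; i < total; i++) {
      out[offset + i] = (int) buffer.getLong();
    }
  }

  // Little-endian INT64 encoding, as used by PLAIN pages.
  static byte[] encode(long... values) {
    ByteBuffer buf = ByteBuffer.allocate(values.length * 8).order(ByteOrder.LITTLE_ENDIAN);
    for (long v : values) buf.putLong(v);
    return buf.array();
  }
}
```

Both methods produce identical output; they differ only in how many `ByteBuffer` objects each batch creates (N versus one), which the commit message identifies as the dominant cost of the legacy path.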
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
index a90e9bf01c81..c62f7bcec8c3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
@@ -115,6 +115,27 @@ public interface VectorizedValuesReader {
     }
   }
 
+  /**
+   * Reads {@code total} INT64 values, narrows each to an int via Java's primitive
+   * long-to-int cast (the high 32 bits are discarded), and writes them into {@code c}
+   * starting at {@code c[rowId]}. Used by the type-converting updater that reads parquet
+   * INT64 DECIMAL columns whose Spark target is a 32-bit decimal (precision <= 9); such
+   * values are guaranteed by Parquet's decimal encoding to fit in int32, so the
+   * narrowing is non-lossy in practice.
+   *
+   * <p>The default implementation falls back to a per-row read+narrow+write loop and is
+   * therefore equivalent in cost to the legacy per-row Updater path. Subclasses backed
+   * by contiguous bulk storage (e.g. PLAIN encoding via {@link VectorizedPlainValuesReader})
+   * should override to read source bytes once and run a tight in-method conversion loop,
+   * avoiding {@code total} virtual dispatches on {@link #readLong()}. Readers without
+   * an override preserve correctness but gain no speedup.
+   */
+  default void readLongsAsInts(int total, WritableColumnVector c, int rowId) {
+    for (int i = 0; i < total; i += 1) {
+      c.putInt(rowId + i, (int) readLong());
+    }
+  }
+
   void readBinary(int total, WritableColumnVector c, int rowId);
   void readGeometry(int total, WritableColumnVector c, int rowId);
   void readGeography(int total, WritableColumnVector c, int rowId);
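The interface-level design (a correct but slow `default` method, overridden only where a faster path exists) can be sketched in isolation. The names below (`LongValues`, `ArrayLongValues`, `BulkArrayLongValues`, `DowncastCaller`) are hypothetical, and an `int[]` stands in for `WritableColumnVector`. The point is that the caller always invokes the bulk method, and virtual dispatch decides whether it gets the per-row fallback or the override.

```java
// Hypothetical, simplified analogue of the new default method on VectorizedValuesReader.
interface LongValues {
  long readLong();

  // Fallback: one virtual readLong() call per row; always correct, never faster.
  default void readLongsAsInts(int total, int[] out, int rowId) {
    for (int i = 0; i < total; i += 1) {
      out[rowId + i] = (int) readLong();
    }
  }
}

// A reader with no override: inherits the per-row fallback.
class ArrayLongValues implements LongValues {
  protected final long[] data;
  protected int pos = 0;

  ArrayLongValues(long[] data) {
    this.data = data;
  }

  @Override
  public long readLong() {
    return data[pos++];
  }
}

// A reader backed by contiguous storage: overrides with a tight loop over its own array.
final class BulkArrayLongValues extends ArrayLongValues {
  BulkArrayLongValues(long[] data) {
    super(data);
  }

  @Override
  public void readLongsAsInts(int total, int[] out, int rowId) {
    for (int i = 0; i < total; i += 1) {
      out[rowId + i] = (int) data[pos + i];
    }
    pos += total; // keep the cursor consistent for subsequent reads
  }
}

// Caller shaped like the updater's readValues after this change: a one-line delegation.
final class DowncastCaller {
  static void readValues(int total, int offset, int[] values, LongValues reader) {
    reader.readLongsAsInts(total, values, offset);
  }
}
```

Because the fallback lives on the interface, adding the bulk method does not force every existing reader implementation to change; only readers that can do better need to override it.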
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 692306fb52f2..7b3cdc7df2a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -2021,6 +2021,57 @@ class ParquetIOSuite extends ParquetTest with SharedSparkSession {
       }
     }
   }
+
+  test("INT64 DECIMAL -> 32-bit DecimalType narrowing end-to-end via vectorized read path") {
+    // Round-trips a custom-built parquet file with INT64 + DECIMAL(9, 2) annotation read
+    // back as DecimalType(9, 2). DECIMAL(9, _) values fit in int32, so the factory routes
+    // through DowncastLongUpdater (target is 32-bit decimal storage). Spark's own writer
+    // produces INT32-backed decimal for precision <= 9, so we use `parquet-mr`'s low-level
+    // API to force INT64 storage, mirroring the wire format that other writers (e.g. Hive,
+    // Impala) produce.
+    val schema = MessageTypeParser.parseMessageType(
+      """message root {
+        |  required int64 v (DECIMAL(9, 2));
+        |}""".stripMargin)
+
+    for (dictEnabled <- Seq(true, false)) {
+      withTempDir { dir =>
+        val tablePath = new Path(s"${dir.getCanonicalPath}/dec.parquet")
+        val numRecords = 5000
+
+        val writer = createParquetWriter(schema, tablePath, dictionaryEnabled = dictEnabled)
+        (0 until numRecords).foreach { i =>
+          val unscaled = i % 5 match {
+            case 0 => -999_999_999L
+            case 1 => -1L
+            case 2 => 0L
+            case 3 => 999_999_999L
+            case _ => i.toLong * 13L - 7L
+          }
+          val record = new SimpleGroup(schema)
+          record.add(0, unscaled)
+          writer.write(record)
+        }
+        writer.close
+
+        withAllParquetReaders {
+          val readSchema = new StructType().add("v", DecimalType(9, 2), nullable = false)
+          val df = spark.read.schema(readSchema).parquet(tablePath.toString)
+          val expected = (0 until numRecords).map { i =>
+            val unscaled = i % 5 match {
+              case 0 => -999_999_999L
+              case 1 => -1L
+              case 2 => 0L
+              case 3 => 999_999_999L
+              case _ => i.toLong * 13L - 7L
+            }
+            Row(java.math.BigDecimal.valueOf(unscaled, 2))
+          }
+          checkAnswer(df, expected)
+        }
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
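The "non-lossy in practice" argument used throughout this patch is plain integer arithmetic: a DECIMAL(9, s) unscaled value has at most nine digits, so its magnitude is at most 999,999,999, which is below `Integer.MAX_VALUE` (2,147,483,647). The short sketch below, using only `java.lang` and `java.math`, checks that bound, shows the `(int)` cast preserving the test's boundary values, and shows what the same cast does to a value no conforming DECIMAL(9, _) writer should produce. The class name `NarrowingCheck` is hypothetical.

```java
import java.math.BigDecimal;

// Hypothetical helper illustrating the range argument; not part of Spark.
final class NarrowingCheck {
  // Largest unscaled value representable with 9 decimal digits.
  static final long MAX_DECIMAL9_UNSCALED = 999_999_999L;

  // Java's primitive long-to-int cast keeps only the low 32 bits.
  static int narrow(long v) {
    return (int) v;
  }

  // Interprets an unscaled int with the given scale, as the end-to-end test's
  // expected rows do with java.math.BigDecimal.valueOf(unscaled, 2).
  static BigDecimal asDecimal(int unscaled, int scale) {
    return BigDecimal.valueOf(unscaled, scale);
  }

  public static void main(String[] args) {
    System.out.println(MAX_DECIMAL9_UNSCALED < Integer.MAX_VALUE); // true
    System.out.println(narrow(-999_999_999L));                     // -999999999
    System.out.println(asDecimal(narrow(999_999_999L), 2));        // 9999999.99
    // Outside the DECIMAL(9, _) range: the high 32 bits are silently dropped.
    System.out.println(narrow(4_294_967_297L));                    // 1
  }
}
```

The last line is why the cast is only safe under the writer-side bound: a value of 2^32 + 1 would silently read back as 1 rather than failing.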
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
index 2ed6f84e68b9..65a4d90750b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.types._
  *   - `IntegerToLongUpdater` (INT32 -> Long, plus long-decimal dispatch)
  *   - `IntegerToDoubleUpdater` (INT32 -> Double)
  *   - `FloatToDoubleUpdater` (FLOAT -> Double)
+ *   - `DowncastLongUpdater` (INT64 DECIMAL -> 32-bit Decimal via narrowing cast)
  *
  * Covers boundary batch lengths, sign-extension on negative INT32 values, the singular
  * `readValue` path, and the factory's long-decimal dispatch
@@ -79,6 +80,13 @@ class ParquetVectorUpdaterSuite extends SparkFunSuite {
     buf.array()
   }
 
+  private def plainLongBytes(values: Array[Long]): Array[Byte] = {
+    val buf = ByteBuffer.allocate(values.length * 8).order(ByteOrder.LITTLE_ENDIAN)
+    var i = 0
+    while (i < values.length) { buf.putLong(values(i)); i += 1 }
+    buf.array()
+  }
+
   private def newPlainReader(bytes: Array[Byte], numValues: Int): VectorizedPlainValuesReader = {
     val r = new VectorizedPlainValuesReader
     r.initFromPage(numValues, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes)))
@@ -260,6 +268,54 @@ class ParquetVectorUpdaterSuite extends SparkFunSuite {
     out
   }
 
+  // ---- DowncastLongUpdater: INT64 DECIMAL(p<=9) -> 32-bit Decimal via narrowing cast ----
+
+  // INT64 column descriptor annotated as DECIMAL(precision, scale); when both source and
+  // target precision are <= 9, the factory routes to DowncastLongUpdater (target is stored
+  // as int32 because `DecimalType.is32BitDecimalType` requires precision <= 9). Parquet
+  // guarantees DECIMAL(p<=9) values fit in int32, so the narrowing `(int) buffer.getLong()`
+  // is non-lossy in practice.
+  private def int64DecimalDescriptor(precision: Int, scale: Int): ColumnDescriptor = {
+    val pt = Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL)
+      .as(LogicalTypeAnnotation.decimalType(scale, precision))
+      .named("col")
+    new ColumnDescriptor(Array("col"), pt, 0, 1)
+  }
+
+  // Reads `values.length` INT64s through `DowncastLongUpdater.readValues` and returns the
+  // resulting int column. Caller guarantees values fit in int32.
+  private def readViaDowncastLongUpdater(
+      desc: ColumnDescriptor,
+      targetType: DataType,
+      values: Array[Long]): Array[Int] = {
+    val fac = newFactory(desc)
+    val updater = fac.getUpdater(desc, targetType)
+    val out = new OnHeapColumnVector(values.length.max(1), targetType)
+    val reader = newPlainReader(plainLongBytes(values), values.length)
+    updater.readValues(values.length, 0, out, reader)
+    val result = new Array[Int](values.length)
+    var i = 0
+    while (i < values.length) { result(i) = out.getInt(i); i += 1 }
+    result
+  }
+
+  // Sample of INT64 values guaranteed to fit in int32 (within DECIMAL(9, _) range).
+  private def downcastSampleValues(n: Int): Array[Long] = {
+    val out = new Array[Long](n)
+    var i = 0
+    while (i < n) {
+      out(i) = i match {
+        case _ if i % 5 == 0 => -999_999_999L  // min DECIMAL(9, _) value
+        case _ if i % 5 == 1 => -1L
+        case _ if i % 5 == 2 => 0L
+        case _ if i % 5 == 3 => 999_999_999L   // max DECIMAL(9, _) value
+        case _ => i * 13L - 7L
+      }
+      i += 1
+    }
+    out
+  }
+
   // Java's float-to-double widening is exact for finite/infinite values and produces
   // some double NaN for a NaN input (the payload may be canonicalized). Use
   // `java.lang.Double.compare` to give NaN well-defined equality and to distinguish
@@ -310,4 +366,46 @@ class ParquetVectorUpdaterSuite extends SparkFunSuite {
     assert(java.lang.Double.doubleToRawLongBits(actual(1)) ===
       java.lang.Double.doubleToRawLongBits(0.0d))
   }
+
+  for (n <- Seq(0, 1, 7, 8, 9, 17, 1024, 4097)) {
+    test(s"DowncastLongUpdater produces correct narrowed output (total=$n)") {
+      val desc = int64DecimalDescriptor(precision = 9, scale = 2)
+      val targetType = DataTypes.createDecimalType(9, 2)
+      val input = downcastSampleValues(n)
+      val actual = readViaDowncastLongUpdater(desc, targetType, input)
+      assert(actual === input.map(_.toInt))
+    }
+  }
+
+  test("DowncastLongUpdater: readValue narrows a single INT64 -> int") {
+    // Same rationale as the IntegerToLongUpdater readValue test: the def-level-decoder's
+    // run-of-1 path calls `readLong()` directly rather than the bulk method.
+    val desc = int64DecimalDescriptor(precision = 9, scale = 2)
+    val targetType = DataTypes.createDecimalType(9, 2)
+    val input = Array(0L, 1L, -1L, 42L, -999_999_999L, 999_999_999L)
+    val fac = newFactory(desc)
+    val updater = fac.getUpdater(desc, targetType)
+    val out = new OnHeapColumnVector(input.length, targetType)
+    val reader = newPlainReader(plainLongBytes(input), input.length)
+    var i = 0
+    while (i < input.length) {
+      updater.readValue(i, out, reader)
+      i += 1
+    }
+    val actual = (0 until input.length).map(out.getInt).toArray
+    assert(actual === input.map(_.toInt))
+  }
+
+  test("DowncastLongUpdater: narrowing cast preserves sign for in-range values") {
+    // Spot-check that `(int) buffer.getLong()` preserves sign for INT64 values at and
+    // inside the bounds of DECIMAL(9, _)'s range. Out-of-range INT64 values (i.e., outside
+    // [-999_999_999, 999_999_999]) are not reachable in production because Parquet's
+    // DECIMAL encoding bounds them on the writer side; the cast would truncate the high
+    // 32 bits and is documented as such on `readLongsAsInts`.
+    val desc = int64DecimalDescriptor(precision = 9, scale = 0)
+    val targetType = DataTypes.createDecimalType(9, 0)
+    val input = Array(-999_999_999L, -1L, 0L, 1L, 999_999_999L)
+    val actual = readViaDowncastLongUpdater(desc, targetType, input)
+    assert(actual === Array[Int](-999_999_999, -1, 0, 1, 999_999_999))
+  }
 }
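The patch uses an unchecked `(int)` cast. If a reader ever needed to reject malformed files whose INT64 DECIMAL(p<=9) values exceed int32, a checked variant is one option; the sketch below is a hypothetical alternative, not what this commit does. It mirrors the suite's `downcastSampleValues` generator and batch lengths, encodes values little-endian as `plainLongBytes` does, and compares the unchecked cast against `Math.toIntExact`, which throws `ArithmeticException` on overflow. The class name `CheckedNarrowing` is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical comparison of unchecked vs. checked narrowing; not part of Spark.
final class CheckedNarrowing {
  // Mirrors the suite's downcastSampleValues: cycles through the DECIMAL(9, _) bounds,
  // -1, 0, and a small linear term.
  static long[] sampleValues(int n) {
    long[] out = new long[n];
    for (int i = 0; i < n; i++) {
      switch (i % 5) {
        case 0: out[i] = -999_999_999L; break;
        case 1: out[i] = -1L; break;
        case 2: out[i] = 0L; break;
        case 3: out[i] = 999_999_999L; break;
        default: out[i] = i * 13L - 7L;
      }
    }
    return out;
  }

  // Little-endian INT64 encoding, as PLAIN pages (and the suite's plainLongBytes) use.
  static byte[] plainLongBytes(long[] values) {
    ByteBuffer buf = ByteBuffer.allocate(values.length * 8).order(ByteOrder.LITTLE_ENDIAN);
    for (long v : values) buf.putLong(v);
    return buf.array();
  }

  // Unchecked narrowing, the same cast readLongsAsInts applies.
  static int[] narrowUnchecked(byte[] bytes, int total) {
    ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
    int[] out = new int[total];
    for (int i = 0; i < total; i++) out[i] = (int) buffer.getLong();
    return out;
  }

  // Hypothetical checked narrowing: throws ArithmeticException if a value exceeds int32.
  static int[] narrowChecked(byte[] bytes, int total) {
    ByteBuffer buffer = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
    int[] out = new int[total];
    for (int i = 0; i < total; i++) out[i] = Math.toIntExact(buffer.getLong());
    return out;
  }
}
```

For in-range values the two variants agree element for element; they diverge only on a value such as `1L << 32`, which the checked variant rejects and the unchecked cast silently reads as 0.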

