This is an automated email from the ASF dual-hosted git repository.

LuciferYang pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new d78aeec2ecfa [SPARK-56804][SQL] Add bulk read+convert path for DATE to TimestampNTZ Parquet vector updater
d78aeec2ecfa is described below

commit d78aeec2ecfa884c73002a9c198529f6fcf04bec
Author: YangJie <[email protected]>
AuthorDate: Tue May 19 18:07:14 2026 +0800

    [SPARK-56804][SQL] Add bulk read+convert path for DATE to TimestampNTZ Parquet vector updater
    
    ### What changes were proposed in this pull request?
    
    Extend the bulk-read pattern (SPARK-56791, SPARK-56801, SPARK-56802,
    SPARK-56803) to `DateToTimestampNTZUpdater`. Add a
    `readIntegersAsTimestampMicros` default method on `VectorizedValuesReader`,
    override it in `VectorizedPlainValuesReader` to fetch the source bytes once
    via `getBuffer(total * 4)` and run a tight conversion loop, and reduce
    `DateToTimestampNTZUpdater.readValues` to a one-line delegation. Scope: UTC,
    `CORRECTED` rebase mode; the `LEGACY` / `EXCEPTION` rebase variants (handled
    by `DateToT [...]
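
    The following is a minimal sketch of that pattern, assuming simplified
    stand-ins: `ValuesReader`, `PlainReader` and `LongVector` are hypothetical
    types (not Spark's `VectorizedValuesReader`, `VectorizedPlainValuesReader`
    or `WritableColumnVector`), and the per-row body of the default method is
    an assumption. The interface default keeps a per-row fallback; the
    PLAIN-encoding reader overrides it to take one slice for the whole batch
    and convert in a tight loop.

    ```java
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.Arrays;

    public class BulkReadSketch {
      // Microseconds in one day: 24 * 60 * 60 * 1_000_000.
      static final long MICROS_PER_DAY = 86_400_000_000L;

      // Hypothetical stand-in for WritableColumnVector (only putLong is modeled).
      static final class LongVector {
        final long[] data;

        LongVector(int capacity) {
          data = new long[capacity];
        }

        void putLong(int rowId, long value) {
          data[rowId] = value;
        }
      }

      // Hypothetical, trimmed-down analogue of VectorizedValuesReader.
      interface ValuesReader {
        int readInteger();

        // Default: generic per-row fallback for readers without a bulk path (assumed body).
        default void readIntegersAsTimestampMicros(int total, LongVector c, int rowId) {
          for (int i = 0; i < total; i++) {
            c.putLong(rowId + i, Math.multiplyExact((long) readInteger(), MICROS_PER_DAY));
          }
        }
      }

      // Hypothetical PLAIN-encoding reader over little-endian INT32 values.
      static final class PlainReader implements ValuesReader {
        private final ByteBuffer in;

        PlainReader(ByteBuffer in) {
          this.in = in;
        }

        // Returns a little-endian view of the next `length` bytes and advances past them.
        private ByteBuffer getBuffer(int length) {
          ByteBuffer slice = in.slice().order(ByteOrder.LITTLE_ENDIAN);
          slice.limit(length);
          in.position(in.position() + length);
          return slice;
        }

        // Per-value path: one 4-byte slice per call.
        @Override
        public int readInteger() {
          return getBuffer(4).getInt();
        }

        // Bulk path: a single slice for the whole batch, then a tight conversion loop.
        @Override
        public void readIntegersAsTimestampMicros(int total, LongVector c, int rowId) {
          ByteBuffer buffer = getBuffer(total * 4);
          for (int i = 0; i < total; i++) {
            c.putLong(rowId + i, Math.multiplyExact((long) buffer.getInt(), MICROS_PER_DAY));
          }
        }
      }

      public static void main(String[] args) {
        int[] days = {0, 1, -1, 19_000};
        ByteBuffer page = ByteBuffer.allocate(days.length * 4).order(ByteOrder.LITTLE_ENDIAN);
        for (int d : days) {
          page.putInt(d);
        }
        page.flip();

        LongVector out = new LongVector(days.length);
        new PlainReader(page).readIntegersAsTimestampMicros(days.length, out, 0);
        System.out.println(Arrays.toString(out.data));
        // prints [0, 86400000000, -86400000000, 1641600000000000]
      }
    }
    ```

    Both paths produce `days * 86_400_000_000L` and throw `ArithmeticException`
    on overflow; only the way the source bytes are fetched differs.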
    
    ### Why are the changes needed?
    
    This change was first proposed in #55855 and closed because benchmarks
    showed no measurable gain: the per-row cost was dominated by the
    `LocalDate` / `ZonedDateTime` / `Instant` allocation chain in
    `DateTimeUtils.daysToMicros`, which swamped the savings from collapsing N
    per-row `getBuffer(4)` slice allocations into a single `getBuffer` call.
    
    SPARK-56874 fixed that by fast-pathing `daysToMicros` at `ZoneOffset.UTC`
    to `Math.multiplyExact(days.toLong, MICROS_PER_DAY)`. With the conversion
    now cheap, the bulk-read savings become visible:
    
    | JDK | Master baseline | With this PR | Speedup |
    |---|---|---|---|
    | 17 | 2.8 ns/row (357.5 M/s) | 1.7 ns/row (605.2 M/s) | ~1.7x |
    | 21 | 2.7 ns/row (366.1 M/s) | 1.1 ns/row (934.8 M/s) | ~2.5x |
    | 25 | 2.6 ns/row (378.3 M/s) | 1.1 ns/row (884.9 M/s) | ~2.4x |
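
    The UTC fast path from SPARK-56874 can be sketched as below. This is an
    illustration, not Spark's `DateTimeUtils` code: `daysToMicrosViaJavaTime`
    is a hypothetical helper approximating the `LocalDate` / `ZonedDateTime` /
    `Instant` chain, and the `zone == ZoneOffset.UTC` identity check is an
    assumption about how the fast path is selected.

    ```java
    import java.time.Instant;
    import java.time.LocalDate;
    import java.time.ZoneId;
    import java.time.ZoneOffset;

    public class DaysToMicrosSketch {
      static final long MICROS_PER_DAY = 86_400_000_000L;

      // General path (sketch): allocates LocalDate -> ZonedDateTime -> Instant per call.
      static long daysToMicrosViaJavaTime(int days, ZoneId zone) {
        Instant instant = LocalDate.ofEpochDay(days).atStartOfDay(zone).toInstant();
        return Math.addExact(
            Math.multiplyExact(instant.getEpochSecond(), 1_000_000L),
            instant.getNano() / 1_000L);
      }

      // UTC fast path (sketch): one overflow-checked multiplication, no allocation.
      static long daysToMicros(int days, ZoneId zone) {
        if (zone == ZoneOffset.UTC) {
          return Math.multiplyExact((long) days, MICROS_PER_DAY);
        }
        return daysToMicrosViaJavaTime(days, zone);
      }

      public static void main(String[] args) {
        for (int d : new int[] {-719_162, -1, 0, 1, 19_000}) {
          long fast = daysToMicros(d, ZoneOffset.UTC);
          long slow = daysToMicrosViaJavaTime(d, ZoneOffset.UTC);
          System.out.println(d + " -> " + fast + " (same as java.time path: " + (fast == slow) + ")");
        }
        try {
          daysToMicros(Integer.MAX_VALUE, ZoneOffset.UTC);
        } catch (ArithmeticException e) {
          System.out.println("overflow detected: " + e.getMessage());
        }
      }
    }
    ```

    At UTC midnight the two paths agree for every representable date, so the
    fast path changes cost rather than results; `Math.multiplyExact` keeps the
    overflow behavior for day counts beyond roughly 106.75 million days.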
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit tests in `ParquetVectorUpdaterSuite` cover
    `readIntegersAsTimestampMicros` (various batch sizes, single-value
    `readValue`, UTC conversion semantics). A new integration test in
    `ParquetIOSuite` reads INT32 DATE as `TimestampNTZType` through the
    vectorized reader with both dictionary-encoded and plain pages. The
    benchmark numbers above are from GitHub Actions (GHA).
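
    To illustrate the batch-size coverage, the sketch below reads one
    plain-encoded page of INT32 day counts in batches of different sizes,
    writing each batch at its `rowId` offset, and compares the result with a
    direct `days * MICROS_PER_DAY` computation. The helper names are
    hypothetical and do not mirror the actual tests in
    `ParquetVectorUpdaterSuite`.

    ```java
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.Arrays;
    import java.util.Random;

    public class BatchSizeCheckSketch {
      static final long MICROS_PER_DAY = 86_400_000_000L;

      // Hypothetical bulk conversion: reads `total` little-endian INT32 day counts from
      // `in` (advancing it) and writes micros into out[rowId .. rowId + total).
      static void readIntegersAsTimestampMicros(ByteBuffer in, int total, long[] out, int rowId) {
        ByteBuffer batch = in.slice().order(ByteOrder.LITTLE_ENDIAN);
        batch.limit(total * 4);
        in.position(in.position() + total * 4);
        for (int i = 0; i < total; i++) {
          out[rowId + i] = Math.multiplyExact((long) batch.getInt(), MICROS_PER_DAY);
        }
      }

      // Reads all `n` values of a page in batches of `batchSize`, like a vectorized reader.
      static long[] readAll(ByteBuffer page, int n, int batchSize) {
        ByteBuffer in = page.duplicate(); // independent position; `page` itself is untouched
        long[] out = new long[n];
        for (int rowId = 0; rowId < n; rowId += batchSize) {
          readIntegersAsTimestampMicros(in, Math.min(batchSize, n - rowId), out, rowId);
        }
        return out;
      }

      public static void main(String[] args) {
        int n = 1000;
        // Random day counts between 0001-01-01 (-719162) and 9999-12-31 (2932896).
        int[] days = new Random(42).ints(n, -719_162, 2_932_897).toArray();
        ByteBuffer page = ByteBuffer.allocate(n * 4).order(ByteOrder.LITTLE_ENDIAN);
        for (int d : days) {
          page.putInt(d);
        }
        page.flip();

        long[] expected = new long[n];
        for (int i = 0; i < n; i++) {
          expected[i] = days[i] * MICROS_PER_DAY;
        }

        for (int batchSize : new int[] {1, 7, 64, 1000}) {
          boolean same = Arrays.equals(expected, readAll(page, n, batchSize));
          System.out.println("batchSize=" + batchSize + " matches=" + same);
        }
      }
    }
    ```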
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #55971 from LuciferYang/SPARK-56804-date-to-tsntz.
    
    Authored-by: YangJie <[email protected]>
    Signed-off-by: yangjie01 <[email protected]>
    (cherry picked from commit 19073d8ac499df9ddccdc086840eb545c4c0a257)
    Signed-off-by: yangjie01 <[email protected]>
---
 ...ParquetVectorUpdaterBenchmark-jdk21-results.txt | 46 ++++++------
 ...ParquetVectorUpdaterBenchmark-jdk25-results.txt | 44 +++++------
 .../ParquetVectorUpdaterBenchmark-results.txt      | 60 +++++++--------
 .../parquet/ParquetVectorUpdaterFactory.java       |  4 +-
 .../parquet/VectorizedPlainValuesReader.java       | 15 ++++
 .../parquet/VectorizedValuesReader.java            | 28 +++++++
 .../datasources/parquet/ParquetIOSuite.scala       | 63 ++++++++++++++++
 .../parquet/ParquetVectorUpdaterSuite.scala        | 85 ++++++++++++++++++++++
 8 files changed, 267 insertions(+), 78 deletions(-)

diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
index 9e86eacb0e8c..07373d576633 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1013-azure
 AMD EPYC 7763 64-Core Processor
 Identity Updaters:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater                                        0              0           0      17004.4           0.1       1.0X
-ByteUpdater (INT32 -> Byte)                           0              0           0       3746.5           0.3       0.2X
-ShortUpdater (INT32 -> Short)                         1              1           0       1681.2           0.6       0.1X
-IntegerUpdater                                        0              0           0      10290.1           0.1       0.6X
-LongUpdater                                           0              0           0       3875.9           0.3       0.2X
-FloatUpdater                                          0              0           0      10148.5           0.1       0.6X
-DoubleUpdater                                         0              0           0       5141.3           0.2       0.3X
-BinaryUpdater                                        15             15           0         70.7          14.1       0.0X
+BooleanUpdater                                        0              0           0      16990.6           0.1       1.0X
+ByteUpdater (INT32 -> Byte)                           0              0           0       3765.0           0.3       0.2X
+ShortUpdater (INT32 -> Short)                         1              1           0       1682.9           0.6       0.1X
+IntegerUpdater                                        0              0           0       7756.2           0.1       0.5X
+LongUpdater                                           0              0           0       3870.4           0.3       0.2X
+FloatUpdater                                          0              0           0       7758.5           0.1       0.5X
+DoubleUpdater                                         0              0           0       3875.9           0.3       0.2X
+BinaryUpdater                                        15             15           0         70.4          14.2       0.0X
 
 
 ================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1013-azure
 AMD EPYC 7763 64-Core Processor
 Type-converting Updaters:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater                                     0              0           0       6237.3           0.2       1.0X
-IntegerToDoubleUpdater                                   0              0           0       6117.3           0.2       1.0X
-FloatToDoubleUpdater                                     0              0           0       2526.7           0.4       0.4X
-DateToTimestampNTZUpdater                                3              3           0        366.1           2.7       0.1X
-DowncastLongUpdater (INT64 -> Decimal(9,2))              0              0           0       5126.2           0.2       0.8X
+IntegerToLongUpdater                                     0              0           0       5133.8           0.2       1.0X
+IntegerToDoubleUpdater                                   0              0           0       6090.4           0.2       1.2X
+FloatToDoubleUpdater                                     0              0           0       2527.1           0.4       0.5X
+DateToTimestampNTZUpdater                                1              1           0        934.8           1.1       0.2X
+DowncastLongUpdater (INT64 -> Decimal(9,2))              0              0           0       5108.5           0.2       1.0X
 
 
 ================================================================================================
@@ -39,8 +39,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1013-azure
 AMD EPYC 7763 64-Core Processor
 Rebase Updaters:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       3640.1           0.3       1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              0              0           0       2281.8           0.4       0.6X
+IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       3263.0           0.3       1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              0              0           0       2282.0           0.4       0.7X
 LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       2              3           0        420.5           2.4       0.1X
 
 
@@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1013-azure
 AMD EPYC 7763 64-Core Processor
 Unsigned Updaters:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long)                    0              0           0       5141.0           0.2       1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0))             16             17           0         63.8          15.7       0.0X
+UnsignedIntegerUpdater (UINT32 -> Long)                    0              0           0       5112.2           0.2       1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0))             16             17           0         63.9          15.7       0.0X
 
 
 ================================================================================================
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1013-azure
 AMD EPYC 7763 64-Core Processor
 Decimal Updaters:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater                               0              0           0      10288.1           0.1       1.0X
-LongToDecimalUpdater                                  0              0           0       3872.9           0.3       0.4X
-FixedLenByteArrayToDecimalUpdater                    21             21           1         50.2          19.9       0.0X
+IntegerToDecimalUpdater                               0              0           0       7750.4           0.1       1.0X
+LongToDecimalUpdater                                  0              0           0       3866.5           0.3       0.5X
+FixedLenByteArrayToDecimalUpdater                    21             21           0         50.1          19.9       0.0X
 
 
 ================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1013-azure
 AMD EPYC 7763 64-Core Processor
 FixedLenByteArray Updaters:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary)                         20             21           2         51.8          19.3       1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        160.2           6.2       3.1X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              8              8           0        133.3           7.5       2.6X
+FixedLenByteArrayUpdater (len=16 -> Binary)                         20             20           0         51.7          19.3       1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           1        160.1           6.2       3.1X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              8              8           0        133.2           7.5       2.6X
 
 
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
index aed60eaf5136..e03dae8c072a 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1013-azure
 AMD EPYC 7763 64-Core Processor
 Identity Updaters:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater                                        0              0           0      17126.9           0.1       1.0X
-ByteUpdater (INT32 -> Byte)                           0              0           0       3721.3           0.3       0.2X
-ShortUpdater (INT32 -> Short)                         1              1           0       1662.6           0.6       0.1X
-IntegerUpdater                                        0              0           0      10216.0           0.1       0.6X
-LongUpdater                                           0              0           0       5150.9           0.2       0.3X
-FloatUpdater                                          0              0           0      10313.5           0.1       0.6X
-DoubleUpdater                                         0              0           0       5147.6           0.2       0.3X
-BinaryUpdater                                        16             16           0         66.4          15.1       0.0X
+BooleanUpdater                                        0              0           0      17171.8           0.1       1.0X
+ByteUpdater (INT32 -> Byte)                           0              0           0       3679.8           0.3       0.2X
+ShortUpdater (INT32 -> Short)                         1              1           0       1662.3           0.6       0.1X
+IntegerUpdater                                        0              0           0      10261.9           0.1       0.6X
+LongUpdater                                           0              0           0       5130.7           0.2       0.3X
+FloatUpdater                                          0              0           0      10255.9           0.1       0.6X
+DoubleUpdater                                         0              0           0       5127.2           0.2       0.3X
+BinaryUpdater                                        15             16           0         67.7          14.8       0.0X
 
 
 ================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1013-azure
 AMD EPYC 7763 64-Core Processor
 Type-converting Updaters:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater                                     0              0           0       5428.8           0.2       1.0X
-IntegerToDoubleUpdater                                   0              0           0       5132.3           0.2       0.9X
-FloatToDoubleUpdater                                     0              0           0       3199.4           0.3       0.6X
-DateToTimestampNTZUpdater                                3              3           1        378.3           2.6       0.1X
-DowncastLongUpdater (INT64 -> Decimal(9,2))              0              0           0       6548.3           0.2       1.2X
+IntegerToLongUpdater                                     0              0           0       6438.7           0.2       1.0X
+IntegerToDoubleUpdater                                   0              0           0       6441.2           0.2       1.0X
+FloatToDoubleUpdater                                     0              0           0       3199.5           0.3       0.5X
+DateToTimestampNTZUpdater                                1              1           0        884.9           1.1       0.1X
+DowncastLongUpdater (INT64 -> Decimal(9,2))              0              0           0       6713.8           0.1       1.0X
 
 
 ================================================================================================
@@ -39,8 +39,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1013-azure
 AMD EPYC 7763 64-Core Processor
 Rebase Updaters:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       3097.0           0.3       1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              0              0           0       2286.8           0.4       0.7X
+IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       3664.5           0.3       1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              0              0           0       2668.7           0.4       0.7X
 LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       3              3           0        371.3           2.7       0.1X
 
 
@@ -52,7 +52,7 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1013-azure
 AMD EPYC 7763 64-Core Processor
 Unsigned Updaters:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long)                    0              0           0       5119.0           0.2       1.0X
+UnsignedIntegerUpdater (UINT32 -> Long)                    0              0           0       6183.9           0.2       1.0X
 UnsignedLongUpdater (UINT64 -> Decimal(20,0))             17             17           0         60.4          16.6       0.0X
 
 
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1013-azure
 AMD EPYC 7763 64-Core Processor
 Decimal Updaters:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater                               0              0           0       7753.9           0.1       1.0X
-LongToDecimalUpdater                                  0              0           0       3880.7           0.3       0.5X
-FixedLenByteArrayToDecimalUpdater                    21             21           0         50.9          19.6       0.0X
+IntegerToDecimalUpdater                               0              0           0      10268.1           0.1       1.0X
+LongToDecimalUpdater                                  0              0           0       5122.2           0.2       0.5X
+FixedLenByteArrayToDecimalUpdater                    21             21           0         50.9          19.7       0.0X
 
 
 ================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1013-azure
 AMD EPYC 7763 64-Core Processor
 FixedLenByteArray Updaters:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary)                         21             21           1         50.5          19.8       1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        152.7           6.5       3.0X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              8              8           0        127.8           7.8       2.5X
+FixedLenByteArrayUpdater (len=16 -> Binary)                         21             21           1         50.3          19.9       1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        152.7           6.6       3.0X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              8              8           0        127.7           7.8       2.5X
 
 
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
index a0ad4843991b..828e68578877 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
@@ -3,17 +3,17 @@ Identity Updaters
 ================================================================================================
 
 OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure
-AMD EPYC 9V74 80-Core Processor
+AMD EPYC 7763 64-Core Processor
 Identity Updaters:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater                                        0              0           0      14742.5           0.1       1.0X
-ByteUpdater (INT32 -> Byte)                           0              0           0       3584.0           0.3       0.2X
-ShortUpdater (INT32 -> Short)                         1              1           0       1824.8           0.5       0.1X
-IntegerUpdater                                        0              0           0       8346.1           0.1       0.6X
-LongUpdater                                           0              0           0       4103.9           0.2       0.3X
-FloatUpdater                                          0              0           0       8215.2           0.1       0.6X
-DoubleUpdater                                         0              0           0       4141.1           0.2       0.3X
-BinaryUpdater                                        18             18           0         58.9          17.0       0.0X
+BooleanUpdater                                        0              0           0      14640.0           0.1       1.0X
+ByteUpdater (INT32 -> Byte)                           0              0           0       3686.8           0.3       0.3X
+ShortUpdater (INT32 -> Short)                         1              1           0       2054.1           0.5       0.1X
+IntegerUpdater                                        0              0           0       7759.1           0.1       0.5X
+LongUpdater                                           0              0           0       3876.1           0.3       0.3X
+FloatUpdater                                          0              0           0       7762.5           0.1       0.5X
+DoubleUpdater                                         0              0           0       5123.2           0.2       0.3X
+BinaryUpdater                                        15             15           0         70.1          14.3       0.0X
 
 
 ================================================================================================
@@ -21,14 +21,14 @@ Type-converting Updaters
 ================================================================================================
 
 OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure
-AMD EPYC 9V74 80-Core Processor
+AMD EPYC 7763 64-Core Processor
 Type-converting Updaters:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater                                     1              1           0       1129.8           0.9       1.0X
-IntegerToDoubleUpdater                                   1              1           0       1365.8           0.7       1.2X
-FloatToDoubleUpdater                                     1              1           0       1284.3           0.8       1.1X
-DateToTimestampNTZUpdater                                3              3           0        357.5           2.8       0.3X
-DowncastLongUpdater (INT64 -> Decimal(9,2))              1              1           0       1136.5           0.9       1.0X
+IntegerToLongUpdater                                     1              1           0       1281.0           0.8       1.0X
+IntegerToDoubleUpdater                                   1              1           0       1550.0           0.6       1.2X
+FloatToDoubleUpdater                                     1              1           0       1419.0           0.7       1.1X
+DateToTimestampNTZUpdater                                2              2           0        605.2           1.7       0.5X
+DowncastLongUpdater (INT64 -> Decimal(9,2))              1              1           0       1285.1           0.8       1.0X
 
 
 ================================================================================================
@@ -36,12 +36,12 @@ Rebase Updaters
 ================================================================================================
 
 OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure
-AMD EPYC 9V74 80-Core Processor
+AMD EPYC 7763 64-Core Processor
 Rebase Updaters:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       2180.9           0.5       1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              1              1           0       1744.3           0.6       0.8X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       2              3           0        421.0           2.4       0.2X
+IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       2662.8           0.4       1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              1              1           0       2084.1           0.5       0.8X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       2              2           0        454.8           2.2       0.2X
 
 
 ================================================================================================
@@ -49,11 +49,11 @@ Unsigned Updaters
 ================================================================================================
 
 OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure
-AMD EPYC 9V74 80-Core Processor
+AMD EPYC 7763 64-Core Processor
 Unsigned Updaters:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long)                    1              1           0        965.9           1.0       1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0))             18             18           0         58.6          17.1       0.1X
+UnsignedIntegerUpdater (UINT32 -> Long)                    1              1           0       1094.1           0.9       1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0))             17             17           0         61.0          16.4       0.1X
 
 
 ================================================================================================
@@ -61,12 +61,12 @@ Decimal Updaters
 ================================================================================================
 
 OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure
-AMD EPYC 9V74 80-Core Processor
+AMD EPYC 7763 64-Core Processor
 Decimal Updaters:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater                               0              0           0       8299.8           0.1       1.0X
-LongToDecimalUpdater                                  0              0           0       4106.5           0.2       0.5X
-FixedLenByteArrayToDecimalUpdater                    24             24           1         43.8          22.8       0.0X
+IntegerToDecimalUpdater                               0              0           0      10261.0           0.1       1.0X
+LongToDecimalUpdater                                  0              0           0       5118.9           0.2       0.5X
+FixedLenByteArrayToDecimalUpdater                    21             21           0         51.0          19.6       0.0X
 
 
 ================================================================================================
@@ -74,11 +74,11 @@ FixedLenByteArray Updaters
 ================================================================================================
 
 OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure
-AMD EPYC 9V74 80-Core Processor
+AMD EPYC 7763 64-Core Processor
 FixedLenByteArray Updaters:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary)                         22             23           0         47.0          21.3       1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                6              6           1        166.4           6.0       3.5X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              8              9           1        125.1           8.0       2.7X
+FixedLenByteArrayUpdater (len=16 -> Binary)                         19             19           0         55.3          18.1       1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        160.2           6.2       2.9X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              9              9           0        123.3           8.1       2.2X
 
 
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 76cedbbef3b4..c088f5f2844b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -429,9 +429,7 @@ public class ParquetVectorUpdaterFactory {
         int offset,
         WritableColumnVector values,
         VectorizedValuesReader valuesReader) {
-      for (int i = 0; i < total; ++i) {
-        readValue(offset + i, values, valuesReader);
-      }
+      valuesReader.readIntegersAsTimestampMicros(total, values, offset);
     }
 
     @Override
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 9249fab7915c..23207e7db357 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.time.ZoneOffset;
 
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.values.ValuesReader;
@@ -26,6 +27,7 @@ import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.ParquetDecodingException;
 
 import org.apache.spark.SparkUnsupportedOperationException;
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.catalyst.util.RebaseDateTime;
 import org.apache.spark.sql.execution.datasources.DataSourceUtils;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
@@ -204,6 +206,19 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
     }
   }
 
+  @Override
+  public final void readIntegersAsTimestampMicros(
+      int total, WritableColumnVector c, int rowId) {
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+    // Per-element conversion calls into `DateTimeUtils.daysToMicros`, which is `days *
+    // MICROS_PER_DAY` at UTC, plus an overflow check via `Math.multiplyExact`. There is no
+    // `hasArray` bulk copy: 4-byte INT32 sources and 8-byte long targets differ in width.
+    for (int i = 0; i < total; i += 1) {
+      c.putLong(rowId + i, DateTimeUtils.daysToMicros(buffer.getInt(), ZoneOffset.UTC));
+    }
+  }
+
   // A fork of `readIntegers` to rebase the date values. For performance reasons, this method
   // iterates the values twice: check if we need to rebase first, then go to the optimized branch
   // if rebase is not needed.
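The bulk override above depends on the UTC fast path from SPARK-56874, under which converting a day count to micros is one overflow-checked multiplication. The standalone Java sketch below contrasts that arithmetic with a `java.time` chain of the kind the commit message calls allocation-heavy, and converts a whole PLAIN-style little-endian INT32 buffer in a single loop. The class and method names are hypothetical, not Spark APIs, and it assumes `MICROS_PER_DAY = 86_400 * 1_000_000` with UTC-midnight semantics; it illustrates the technique and is not Spark's `DateTimeUtils` code.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Hypothetical standalone sketch; none of these names are Spark APIs.
final class DaysToMicrosSketch {
  // 86_400 seconds per day * 1_000_000 micros per second.
  static final long MICROS_PER_DAY = 86_400L * 1_000_000L;

  // Allocation-heavy reference: LocalDate -> ZonedDateTime -> Instant -> micros.
  static long viaJavaTime(int days) {
    Instant start = LocalDate.ofEpochDay(days).atStartOfDay(ZoneOffset.UTC).toInstant();
    return Math.addExact(
        Math.multiplyExact(start.getEpochSecond(), 1_000_000L), start.getNano() / 1_000L);
  }

  // UTC fast path: one overflow-checked multiplication, no allocation.
  static long fastUtc(int days) {
    return Math.multiplyExact((long) days, MICROS_PER_DAY);
  }

  // Bulk variant: the caller hands over one little-endian buffer holding `total`
  // INT32 values, and the loop converts them without any per-row buffer slicing.
  static void convertAll(ByteBuffer littleEndian, int total, long[] out, int rowId) {
    for (int i = 0; i < total; i++) {
      out[rowId + i] = fastUtc(littleEndian.getInt());
    }
  }

  public static void main(String[] args) {
    int[] days = {0, 1, -1, 19358, -25567, -719162, 1_000_000};
    // PLAIN-encoded INT32: 4 little-endian bytes per value.
    ByteBuffer buf = ByteBuffer.allocate(days.length * 4).order(ByteOrder.LITTLE_ENDIAN);
    for (int d : days) {
      buf.putInt(d);
    }
    buf.flip();

    long[] out = new long[days.length];
    convertAll(buf, days.length, out, 0);
    for (int i = 0; i < days.length; i++) {
      if (out[i] != viaJavaTime(days[i])) {
        throw new AssertionError("mismatch for day " + days[i]);
      }
    }
    System.out.println(out[1]); // 86400000000
  }
}
```

Running `main` checks that both paths agree on every sample day count, then prints the micros for day 1. The multiplication path touches no objects, which is why the commit message reports that bulk-read savings only became measurable once the `java.time` chain was bypassed.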
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
index c62f7bcec8c3..bf6a1c6a0388 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.execution.datasources.parquet;
 
 import java.nio.ByteBuffer;
+import java.time.ZoneOffset;
 
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 
 import org.apache.parquet.io.api.Binary;
@@ -136,6 +138,32 @@ public interface VectorizedValuesReader {
     }
   }
 
+  /**
+   * Reads {@code total} INT32 date-day values (days since 1970-01-01, Proleptic Gregorian),
+   * converts each to TimestampNTZ micros at UTC via
+   * {@link DateTimeUtils#daysToMicros(int, java.time.ZoneId)}, and writes them into
+   * {@code c} starting at row {@code rowId}. Used by the type-converting updater that
+   * reads Parquet INT32 DATE columns into Spark {@code TimestampNTZType} targets in
+   * {@code CORRECTED} datetime-rebase mode. The {@code LEGACY}/{@code EXCEPTION} rebase
+   * variants are out of scope for this method.
+   *
+   * <p>The default implementation is a per-row loop that calls
+   * {@code DateTimeUtils.daysToMicros} per element; it is algorithmically equivalent to
+   * the legacy per-row Updater path, but the per-element conversion call dominates the
+   * loop, so the speedup from overriding this method is more modest than for the pure
+   * primitive-cast siblings ({@link #readIntegersAsLongs}, {@link #readIntegersAsDoubles}).
+   * Implementations backed by contiguous bulk storage (e.g. PLAIN encoding via
+   * {@link VectorizedPlainValuesReader}) should override it to read the source bytes once
+   * and run a tight in-method conversion loop, avoiding {@code total} virtual dispatches
+   * on {@link #readInteger()}. Readers without an override remain correct but gain no
+   * speedup.
+   */
+  default void readIntegersAsTimestampMicros(int total, WritableColumnVector c, int rowId) {
+    for (int i = 0; i < total; i += 1) {
+      c.putLong(rowId + i, DateTimeUtils.daysToMicros(readInteger(), ZoneOffset.UTC));
+    }
+  }
+
   void readBinary(int total, WritableColumnVector c, int rowId);
   void readGeometry(int total, WritableColumnVector c, int rowId);
   void readGeography(int total, WritableColumnVector c, int rowId);
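The `VectorizedValuesReader` change uses a default-method-plus-override pattern: every reader inherits a correct per-row fallback, and buffer-backed readers replace it with a bulk loop. A minimal sketch of that pattern follows, assuming a simplified, hypothetical `DayReader` interface that writes into a `long[]` instead of a `WritableColumnVector`; `ArrayDayReader`, `PlainDayReader`, and `DayReaderDemo` are illustrative names, not Spark classes.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Demo entry point declared first so the single-file source launcher finds main.
final class DayReaderDemo {
  public static void main(String[] args) {
    int[] days = {0, 1, -1, 19358};
    long[] viaDefault = new long[days.length];
    long[] viaOverride = new long[days.length];
    new ArrayDayReader(days).readIntegersAsTimestampMicros(days.length, viaDefault, 0);
    new PlainDayReader(encodePlain(days)).readIntegersAsTimestampMicros(days.length, viaOverride, 0);
    System.out.println(Arrays.equals(viaDefault, viaOverride)); // true
  }

  // PLAIN-encodes INT32 values: 4 little-endian bytes each.
  static ByteBuffer encodePlain(int[] values) {
    ByteBuffer buf = ByteBuffer.allocate(values.length * 4).order(ByteOrder.LITTLE_ENDIAN);
    for (int v : values) {
      buf.putInt(v);
    }
    buf.flip();
    return buf;
  }
}

// Hypothetical, simplified stand-in for a values-reader interface.
interface DayReader {
  long MICROS_PER_DAY = 86_400_000_000L;

  int readInteger();

  // Fallback: correct for any reader, but pays one interface call per row.
  default void readIntegersAsTimestampMicros(int total, long[] out, int rowId) {
    for (int i = 0; i < total; i++) {
      out[rowId + i] = Math.multiplyExact((long) readInteger(), MICROS_PER_DAY);
    }
  }
}

// Reader without an override: inherits the per-row default.
final class ArrayDayReader implements DayReader {
  private final int[] values;
  private int pos = 0;

  ArrayDayReader(int[] values) {
    this.values = values;
  }

  @Override
  public int readInteger() {
    return values[pos++];
  }
}

// Buffer-backed reader that overrides the bulk method.
final class PlainDayReader implements DayReader {
  private final ByteBuffer buf;

  PlainDayReader(ByteBuffer buf) {
    this.buf = buf.order(ByteOrder.LITTLE_ENDIAN);
  }

  @Override
  public int readInteger() {
    return buf.getInt();
  }

  @Override
  public void readIntegersAsTimestampMicros(int total, long[] out, int rowId) {
    // Claim all `total * 4` bytes up front, then convert in a tight local loop.
    ByteBuffer slice = buf.slice().order(ByteOrder.LITTLE_ENDIAN);
    slice.limit(total * 4);
    buf.position(buf.position() + total * 4);
    for (int i = 0; i < total; i++) {
      out[rowId + i] = Math.multiplyExact((long) slice.getInt(), MICROS_PER_DAY);
    }
  }
}
```

Both readers produce identical output; only `PlainDayReader` skips the per-row `readInteger()` interface call, which matches the rationale given in the `readIntegersAsTimestampMicros` Javadoc.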
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index c25522161443..0fc32b15d833 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -2073,6 +2073,69 @@ class ParquetIOSuite extends ParquetTest with SharedSparkSession {
     }
   }
 
+  test("DATE -> TimestampNTZ widening end-to-end via vectorized read path") {
+    // Round-trips a DATE Parquet file read back as TimestampNTZType, exercising
+    // DateToTimestampNTZUpdater on the vectorized path in CORRECTED rebase mode.
+    // Sample mixes epoch, recent dates, pre-epoch, far-past and far-future days; each
+    // day-count converts to UTC midnight via `DateTimeUtils.daysToMicros`. Both
+    // REQUIRED and OPTIONAL columns are covered so that `readValue` and `readValues`
+    // are both invoked.
+    withSQLConf(
+        SQLConf.PARQUET_REBASE_MODE_IN_READ.key -> "CORRECTED",
+        SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
+      withTempPath { file =>
+        val n = 5000
+        def sampleAt(i: Int): java.sql.Date = i % 6 match {
+          case 0 => java.sql.Date.valueOf("1970-01-01")
+          case 1 => java.sql.Date.valueOf("1900-01-01")
+          case 2 => java.sql.Date.valueOf("2023-01-01")
+          case 3 => java.sql.Date.valueOf("0001-01-01")
+          case 4 => java.sql.Date.valueOf("4707-11-28")
+          case _ =>
+            // Spread samples over every month (1..12) and 80 years. Only i % 6 == 5 reaches
+            // this branch, so derive the month from i / 6; i % 12 would only yield 6 and 12.
+            val year = 2020 + (i % 80)
+            val month = 1 + ((i / 6) % 12)
+            java.sql.Date.valueOf(f"$year%04d-$month%02d-15")
+        }
+
+        val nonNullData = (0 until n).map(i => Row(sampleAt(i)))
+        // Every 7th row is null; mixes value runs and null runs at sub-batch lengths.
+        val nullableData = (0 until n).map { i =>
+          if (i % 7 == 0) Row(null) else Row(sampleAt(i))
+        }
+
+        val nonNullWriteSchema = new StructType().add("v", DateType, nullable = false)
+        val nonNullReadSchema = new StructType().add("v", TimestampNTZType, nullable = false)
+        val nullableWriteSchema = new StructType().add("v", DateType, nullable = true)
+        val nullableReadSchema = new StructType().add("v", TimestampNTZType, nullable = true)
+
+        val nonNullPath = new java.io.File(file, "nonnull").getCanonicalPath
+        val nullablePath = new java.io.File(file, "nullable").getCanonicalPath
+        spark.createDataFrame(spark.sparkContext.parallelize(nonNullData, 4), nonNullWriteSchema)
+          .write.parquet(nonNullPath)
+        spark.createDataFrame(
+          spark.sparkContext.parallelize(nullableData, 4), nullableWriteSchema)
+          .write.parquet(nullablePath)
+
+        def dateToNtz(d: java.sql.Date): java.time.LocalDateTime = {
+          val days = d.toLocalDate.toEpochDay.toInt
+          val micros = DateTimeUtils.daysToMicros(days, java.time.ZoneOffset.UTC)
+          DateTimeUtils.microsToLocalDateTime(micros)
+        }
+        val expectedNonNull = nonNullData.map(r => Row(dateToNtz(r.getDate(0))))
+        val expectedNullable = nullableData.map { r =>
+          if (r.isNullAt(0)) Row(null) else Row(dateToNtz(r.getDate(0)))
+        }
+
+        withAllParquetReaders {
+          checkAnswer(spark.read.schema(nonNullReadSchema).parquet(nonNullPath), expectedNonNull)
+          checkAnswer(spark.read.schema(nullableReadSchema).parquet(nullablePath), expectedNullable)
+        }
+      }
+    }
+  }
+
   test("SPARK-56872: INT64 DECIMAL into 32-bit Decimal column with dictionary fallback") {
     // `DowncastLongUpdater.decodeSingleDictionaryId` only runs when the vectorized reader has
     // to eagerly drain buffered dictionary IDs, which happens when parquet-mr writes the
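The end-to-end test builds its sample from DATE literals, while the unit test in `ParquetVectorUpdaterSuite` uses raw day counts annotated with approximate dates. A short `java.time` sketch (the class name `SampleDayCounts` is hypothetical) maps between the two, assuming the proleptic Gregorian calendar that `LocalDate` uses, which matches the CORRECTED rebase semantics these tests target. It shows that 19366 and 1000000 fall a few days after the labeled dates, which is why those labels carry a `~`, while the literals 2023-01-01 and 4707-11-28 correspond to 19358 and 999999.

```java
import java.time.LocalDate;

// Prints the calendar date for each day count used by the unit-test sample.
final class SampleDayCounts {
  static LocalDate toDate(int epochDay) {
    return LocalDate.ofEpochDay(epochDay);
  }

  public static void main(String[] args) {
    int[] samples = {0, -25567, 19366, -719162, 1_000_000};
    for (int d : samples) {
      System.out.println(d + " -> " + toDate(d));
    }
    // 0 -> 1970-01-01
    // -25567 -> 1900-01-01
    // 19366 -> 2023-01-09
    // -719162 -> 0001-01-01
    // 1000000 -> 4707-11-29

    // Reverse direction: exact day counts behind the end-to-end test's literals.
    System.out.println(LocalDate.parse("2023-01-01").toEpochDay()); // 19358
    System.out.println(LocalDate.parse("4707-11-28").toEpochDay()); // 999999
  }
}
```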
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
index 65a4d90750b8..13e9376fb6fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
@@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.Type.Repetition
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.types._
 
@@ -37,6 +38,7 @@ import org.apache.spark.sql.types._
  *   - `IntegerToDoubleUpdater` (INT32 -> Double)
  *   - `FloatToDoubleUpdater` (FLOAT -> Double)
  *   - `DowncastLongUpdater` (INT64 DECIMAL -> 32-bit Decimal via narrowing cast)
+ *   - `DateToTimestampNTZUpdater` (INT32 DATE -> TimestampNTZ micros at UTC)
  *
  * Covers boundary batch lengths, sign-extension on negative INT32 values, the singular
  * `readValue` path, and the factory's long-decimal dispatch
@@ -408,4 +410,87 @@ class ParquetVectorUpdaterSuite extends SparkFunSuite {
     val actual = readViaDowncastLongUpdater(desc, targetType, input)
     assert(actual === Array[Int](-999_999_999, -1, 0, 1, 999_999_999))
   }
+
+  // ---- DateToTimestampNTZUpdater: INT32 DATE -> TimestampNTZ micros at UTC ----
+
+  // INT32 column descriptor annotated as DATE; routes the factory through the
+  // `sparkType == TimestampNTZType && isDateTypeMatched(descriptor)` branch.
+  private val int32DateDescriptor: ColumnDescriptor = {
+    val pt = Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL)
+      .as(LogicalTypeAnnotation.dateType())
+      .named("col")
+    new ColumnDescriptor(Array("col"), pt, 0, 1)
+  }
+
+  // Reads `values.length` INT32 day-counts through `DateToTimestampNTZUpdater.readValues`
+  // and returns the resulting micros column.
+  private def readViaDateToTimestampNTZUpdater(values: Array[Int]): Array[Long] = {
+    val fac = newFactory(int32DateDescriptor)
+    val updater = fac.getUpdater(int32DateDescriptor, DataTypes.TimestampNTZType)
+    val out = new OnHeapColumnVector(values.length.max(1), DataTypes.TimestampNTZType)
+    val reader = newPlainReader(plainIntBytes(values), values.length)
+    updater.readValues(values.length, 0, out, reader)
+    val result = new Array[Long](values.length)
+    var i = 0
+    while (i < values.length) { result(i) = out.getLong(i); i += 1 }
+    result
+  }
+
+  // Realistic day-count sample: epoch, recent dates, pre-epoch, far-past, far-future. All
+  // values are far from the Long overflow bound of `days.toLong * MICROS_PER_DAY`.
+  private def dateSampleValues(n: Int): Array[Int] = {
+    val out = new Array[Int](n)
+    var i = 0
+    while (i < n) {
+      out(i) = i % 6 match {
+        case 0 => 0          // 1970-01-01 (epoch)
+        case 1 => -25567     // ~1900-01-01
+        case 2 => 19366      // ~2023-01-01
+        case 3 => -719162    // ~0001-01-01
+        case 4 => 1000000    // ~4707-11-28 (far future, still safe)
+        case _ => i * 37 - 100
+      }
+      i += 1
+    }
+    out
+  }
+
+  private def expectedMicros(values: Array[Int]): Array[Long] =
+    values.map(d => DateTimeUtils.daysToMicros(d, ZoneOffset.UTC))
+
+  for (n <- Seq(0, 1, 7, 8, 9, 17, 1024, 4097)) {
+    test(s"DateToTimestampNTZUpdater produces correct UTC micros (total=$n)") {
+      val input = dateSampleValues(n)
+      assert(readViaDateToTimestampNTZUpdater(input) === expectedMicros(input))
+    }
+  }
+
+  test("DateToTimestampNTZUpdater: readValue converts a single date-day to UTC micros") {
+    // Same rationale as the IntegerToLongUpdater readValue test: the def-level-decoder's
+    // run-of-1 path calls `readInteger()` directly rather than the bulk method.
+    val input = Array(0, 1, -1, 19366, -25567)
+    val fac = newFactory(int32DateDescriptor)
+    val updater = fac.getUpdater(int32DateDescriptor, DataTypes.TimestampNTZType)
+    val out = new OnHeapColumnVector(input.length, DataTypes.TimestampNTZType)
+    val reader = newPlainReader(plainIntBytes(input), input.length)
+    var i = 0
+    while (i < input.length) {
+      updater.readValue(i, out, reader)
+      i += 1
+    }
+    val actual = (0 until input.length).map(out.getLong).toArray
+    assert(actual === input.map(d => DateTimeUtils.daysToMicros(d, ZoneOffset.UTC)))
+  }
+
+  test("DateToTimestampNTZUpdater: epoch and signed day-counts produce expected micros") {
+    // Pins reference micros for a handful of well-known dates so the conversion semantics
+    // are visible at unit-test level without relying on DateTimeUtils for the expected
+    // side.
+    val input = Array(0, 1, -1)
+    val expected = Array(
+      0L,                  // 1970-01-01 00:00:00 UTC
+      86_400_000_000L,     // 1970-01-02 00:00:00 UTC
+      -86_400_000_000L)    // 1969-12-31 00:00:00 UTC
+    assert(readViaDateToTimestampNTZUpdater(input) === expected)
+  }
 }
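The pinned constants in the last test, and the "overflow" remarks in the comments, reduce to simple arithmetic on `MICROS_PER_DAY = 86_400 * 1_000_000 = 86_400_000_000`. The sketch below (the class `MicrosOverflowBound` is hypothetical, not Spark code) assumes the `Math.multiplyExact`-based UTC path described in the commit message and computes the largest day count whose micros value still fits in a signed 64-bit long. Since an INT32 day count can exceed that bound, the overflow check in the conversion is reachable and not dead code.

```java
// Hypothetical helper: where days * MICROS_PER_DAY stops fitting in a long.
final class MicrosOverflowBound {
  static final long MICROS_PER_DAY = 86_400L * 1_000_000L; // 86_400_000_000

  // Largest day count whose micros value still fits in a signed 64-bit long.
  static long maxSafeDays() {
    return Long.MAX_VALUE / MICROS_PER_DAY;
  }

  // True if a multiplyExact-based conversion of `days` would throw.
  static boolean overflows(long days) {
    try {
      Math.multiplyExact(days, MICROS_PER_DAY);
      return false;
    } catch (ArithmeticException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(MICROS_PER_DAY);                 // 86400000000
    System.out.println(maxSafeDays());                  // 106751991
    System.out.println(overflows(maxSafeDays()));       // false
    System.out.println(overflows(maxSafeDays() + 1));   // true
    System.out.println(overflows(Integer.MAX_VALUE));   // true
    System.out.println(overflows(1_000_000));           // false
  }
}
```

About 106.75 million days (roughly 292,000 years) is the limit, so the test sample, whose largest magnitude is 1,000,000 days, stays well inside it.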

