This is an automated email from the ASF dual-hosted git repository.
LuciferYang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 19073d8ac499 [SPARK-56804][SQL] Add bulk read+convert path for DATE to
TimestampNTZ Parquet vector updater
19073d8ac499 is described below
commit 19073d8ac499df9ddccdc086840eb545c4c0a257
Author: YangJie <[email protected]>
AuthorDate: Tue May 19 18:07:14 2026 +0800
[SPARK-56804][SQL] Add bulk read+convert path for DATE to TimestampNTZ
Parquet vector updater
### What changes were proposed in this pull request?
Extend the bulk-read pattern (SPARK-56791 / 56801 / 56802 / 56803) to
`DateToTimestampNTZUpdater`. Add a `readIntegersAsTimestampMicros` default
method on `VectorizedValuesReader`, override it in
`VectorizedPlainValuesReader` to fetch source bytes once via `getBuffer(total *
4)` and run a tight conversion loop, and reduce
`DateToTimestampNTZUpdater.readValues` to a one-line delegation. Scope: UTC,
`CORRECTED` rebase mode; the `LEGACY` / `EXCEPTION` rebase variants (handled by
`DateToT [...]
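The difference between the per-row and bulk shapes can be sketched outside Spark. This is a minimal, stand-alone illustration and not the code in this commit: `BulkDateReadSketch`, `readPerRow`, `readBulk`, and `plainInts` are hypothetical names, a plain `ByteBuffer` stands in for the reader's input stream, and a `long[]` stands in for `WritableColumnVector`.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical stand-alone sketch; not Spark's VectorizedPlainValuesReader.
final class BulkDateReadSketch {
  static final long MICROS_PER_DAY = 86_400_000_000L;

  // Per-row shape: one 4-byte slice per value, mirroring N getBuffer(4) calls.
  static void readPerRow(ByteBuffer page, int total, long[] out, int rowId) {
    for (int i = 0; i < total; i++) {
      ByteBuffer one = page.slice().order(ByteOrder.LITTLE_ENDIAN);
      one.limit(4);
      page.position(page.position() + 4);
      out[rowId + i] = Math.multiplyExact((long) one.getInt(), MICROS_PER_DAY);
    }
  }

  // Bulk shape: one slice of total * 4 bytes, then a tight conversion loop.
  static void readBulk(ByteBuffer page, int total, long[] out, int rowId) {
    int requiredBytes = Math.multiplyExact(total, 4);
    ByteBuffer buffer = page.slice().order(ByteOrder.LITTLE_ENDIAN);
    buffer.limit(requiredBytes);
    page.position(page.position() + requiredBytes);
    for (int i = 0; i < total; i++) {
      out[rowId + i] = Math.multiplyExact((long) buffer.getInt(), MICROS_PER_DAY);
    }
  }

  // Encodes day counts as PLAIN-style little-endian INT32 bytes.
  static ByteBuffer plainInts(int... days) {
    ByteBuffer b = ByteBuffer.allocate(days.length * 4).order(ByteOrder.LITTLE_ENDIAN);
    for (int d : days) {
      b.putInt(d);
    }
    b.flip();
    return b;
  }
}
```

Both methods produce the same output; the bulk shape only removes the per-value slice allocation, which is the saving this change targets.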
### Why are the changes needed?
This was first proposed in #55855 and closed because benchmarks showed no
measurable gain -- the per-row work was dominated by
`DateTimeUtils.daysToMicros`'s `LocalDate` / `ZonedDateTime` / `Instant`
allocation chain, swamping the savings from collapsing N `getBuffer(4)` slice
allocations.
SPARK-56874 fixed that by fast-pathing `daysToMicros` at `ZoneOffset.UTC`
to `Math.multiplyExact(days.toLong, MICROS_PER_DAY)`. With the conversion now
cheap, the bulk-read savings become visible:
| JDK | Master baseline | With this PR | Speedup |
|---|---|---|---|
| 17 | 2.8 ns/row (357.5 M/s) | 1.7 ns/row (605.2 M/s) | ~1.7x |
| 21 | 2.7 ns/row (366.1 M/s) | 1.1 ns/row (934.8 M/s) | ~2.5x |
| 25 | 2.6 ns/row (378.3 M/s) | 1.1 ns/row (884.9 M/s) | ~2.4x |
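For illustration, the UTC fast path amounts to a single overflow-checked multiply. The sketch below is an assumption-laden stand-in, not `DateTimeUtils` itself: `DaysToMicrosSketch`, `fastUtc`, and `viaJavaTime` are hypothetical names, and `viaJavaTime` is only a rough analogue of the allocation-heavy `java.time` route, included to show that the two agree at UTC.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Hypothetical sketch of the UTC fast path; not Spark's DateTimeUtils.
final class DaysToMicrosSketch {
  static final long MICROS_PER_DAY = 86_400_000_000L;

  // Fast path: widen to long, then multiply with an overflow check.
  static long fastUtc(int days) {
    return Math.multiplyExact((long) days, MICROS_PER_DAY);
  }

  // Reference path through java.time; allocates several objects per call.
  static long viaJavaTime(int days) {
    Instant instant = LocalDate.ofEpochDay(days).atStartOfDay(ZoneOffset.UTC).toInstant();
    long micros = Math.multiplyExact(instant.getEpochSecond(), 1_000_000L);
    return Math.addExact(micros, instant.getNano() / 1_000L);
  }
}
```

Day counts near the `int` limits overflow a `long` once multiplied, so `fastUtc(Integer.MAX_VALUE)` throws `ArithmeticException` rather than wrapping silently.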
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit tests in `ParquetVectorUpdaterSuite` for
`readIntegersAsTimestampMicros` (various batch sizes, single-value `readValue`,
UTC conversion semantics). New integration test in `ParquetIOSuite` for INT32
DATE -> `TimestampNTZType` through the vectorized reader with both
dictionary-encoded and plain pages. Benchmark numbers above are from GHA.
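As a rough picture of how an expected `TimestampNTZType` value can be derived from a date, the sketch below converts a `LocalDate` to micros and back to a `LocalDateTime` at UTC. `ExpectedNtzSketch` and `dateToNtz` are hypothetical stand-ins written against plain `java.time`; the actual tests go through `DateTimeUtils`.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical sketch of the expected-value side; not the test code itself.
final class ExpectedNtzSketch {
  static final long MICROS_PER_SECOND = 1_000_000L;
  static final long MICROS_PER_DAY = 86_400_000_000L;

  // DATE -> micros at UTC midnight -> LocalDateTime without a time zone.
  static LocalDateTime dateToNtz(LocalDate date) {
    long micros = Math.multiplyExact(date.toEpochDay(), MICROS_PER_DAY);
    // floorDiv/floorMod keep pre-epoch (negative) micros on the right second.
    long seconds = Math.floorDiv(micros, MICROS_PER_SECOND);
    int nanos = (int) (Math.floorMod(micros, MICROS_PER_SECOND) * 1_000L);
    return LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC);
  }
}
```

Every date maps to midnight of the same calendar day, including pre-epoch dates such as 1969-12-31.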
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #55971 from LuciferYang/SPARK-56804-date-to-tsntz.
Authored-by: YangJie <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
...ParquetVectorUpdaterBenchmark-jdk21-results.txt | 46 ++++++------
...ParquetVectorUpdaterBenchmark-jdk25-results.txt | 44 +++++------
.../ParquetVectorUpdaterBenchmark-results.txt | 60 +++++++--------
.../parquet/ParquetVectorUpdaterFactory.java | 4 +-
.../parquet/VectorizedPlainValuesReader.java | 15 ++++
.../parquet/VectorizedValuesReader.java | 28 +++++++
.../datasources/parquet/ParquetIOSuite.scala | 63 ++++++++++++++++
.../parquet/ParquetVectorUpdaterSuite.scala | 85 ++++++++++++++++++++++
8 files changed, 267 insertions(+), 78 deletions(-)
diff --git
a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
index 9e86eacb0e8c..07373d576633 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux
6.17.0-1013-azure
AMD EPYC 7763 64-Core Processor
Identity Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater 0 0
0 17004.4 0.1 1.0X
-ByteUpdater (INT32 -> Byte) 0 0
0 3746.5 0.3 0.2X
-ShortUpdater (INT32 -> Short) 1 1
0 1681.2 0.6 0.1X
-IntegerUpdater 0 0
0 10290.1 0.1 0.6X
-LongUpdater 0 0
0 3875.9 0.3 0.2X
-FloatUpdater 0 0
0 10148.5 0.1 0.6X
-DoubleUpdater 0 0
0 5141.3 0.2 0.3X
-BinaryUpdater 15 15
0 70.7 14.1 0.0X
+BooleanUpdater 0 0
0 16990.6 0.1 1.0X
+ByteUpdater (INT32 -> Byte) 0 0
0 3765.0 0.3 0.2X
+ShortUpdater (INT32 -> Short) 1 1
0 1682.9 0.6 0.1X
+IntegerUpdater 0 0
0 7756.2 0.1 0.5X
+LongUpdater 0 0
0 3870.4 0.3 0.2X
+FloatUpdater 0 0
0 7758.5 0.1 0.5X
+DoubleUpdater 0 0
0 3875.9 0.3 0.2X
+BinaryUpdater 15 15
0 70.4 14.2 0.0X
================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux
6.17.0-1013-azure
AMD EPYC 7763 64-Core Processor
Type-converting Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater 0 0
0 6237.3 0.2 1.0X
-IntegerToDoubleUpdater 0 0
0 6117.3 0.2 1.0X
-FloatToDoubleUpdater 0 0
0 2526.7 0.4 0.4X
-DateToTimestampNTZUpdater 3 3
0 366.1 2.7 0.1X
-DowncastLongUpdater (INT64 -> Decimal(9,2)) 0 0
0 5126.2 0.2 0.8X
+IntegerToLongUpdater 0 0
0 5133.8 0.2 1.0X
+IntegerToDoubleUpdater 0 0
0 6090.4 0.2 1.2X
+FloatToDoubleUpdater 0 0
0 2527.1 0.4 0.5X
+DateToTimestampNTZUpdater 1 1
0 934.8 1.1 0.2X
+DowncastLongUpdater (INT64 -> Decimal(9,2)) 0 0
0 5108.5 0.2 1.0X
================================================================================================
@@ -39,8 +39,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux
6.17.0-1013-azure
AMD EPYC 7763 64-Core Processor
Rebase Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy) 0 0
0 3640.1 0.3 1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0
0 2281.8 0.4 0.6X
+IntegerWithRebaseUpdater (DATE legacy) 0 0
0 3263.0 0.3 1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0
0 2282.0 0.4 0.7X
LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 3
0 420.5 2.4 0.1X
@@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux
6.17.0-1013-azure
AMD EPYC 7763 64-Core Processor
Unsigned Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long) 0 0
0 5141.0 0.2 1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 16 17
0 63.8 15.7 0.0X
+UnsignedIntegerUpdater (UINT32 -> Long) 0 0
0 5112.2 0.2 1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 16 17
0 63.9 15.7 0.0X
================================================================================================
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux
6.17.0-1013-azure
AMD EPYC 7763 64-Core Processor
Decimal Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater 0 0
0 10288.1 0.1 1.0X
-LongToDecimalUpdater 0 0
0 3872.9 0.3 0.4X
-FixedLenByteArrayToDecimalUpdater 21 21
1 50.2 19.9 0.0X
+IntegerToDecimalUpdater 0 0
0 7750.4 0.1 1.0X
+LongToDecimalUpdater 0 0
0 3866.5 0.3 0.5X
+FixedLenByteArrayToDecimalUpdater 21 21
0 50.1 19.9 0.0X
================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux
6.17.0-1013-azure
AMD EPYC 7763 64-Core Processor
FixedLenByteArray Updaters: Best Time(ms) Avg
Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary) 20
21 2 51.8 19.3 1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7
7 0 160.2 6.2 3.1X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8
8 0 133.3 7.5 2.6X
+FixedLenByteArrayUpdater (len=16 -> Binary) 20
20 0 51.7 19.3 1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7
7 1 160.1 6.2 3.1X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8
8 0 133.2 7.5 2.6X
diff --git
a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
index aed60eaf5136..e03dae8c072a 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux
6.17.0-1013-azure
AMD EPYC 7763 64-Core Processor
Identity Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater 0 0
0 17126.9 0.1 1.0X
-ByteUpdater (INT32 -> Byte) 0 0
0 3721.3 0.3 0.2X
-ShortUpdater (INT32 -> Short) 1 1
0 1662.6 0.6 0.1X
-IntegerUpdater 0 0
0 10216.0 0.1 0.6X
-LongUpdater 0 0
0 5150.9 0.2 0.3X
-FloatUpdater 0 0
0 10313.5 0.1 0.6X
-DoubleUpdater 0 0
0 5147.6 0.2 0.3X
-BinaryUpdater 16 16
0 66.4 15.1 0.0X
+BooleanUpdater 0 0
0 17171.8 0.1 1.0X
+ByteUpdater (INT32 -> Byte) 0 0
0 3679.8 0.3 0.2X
+ShortUpdater (INT32 -> Short) 1 1
0 1662.3 0.6 0.1X
+IntegerUpdater 0 0
0 10261.9 0.1 0.6X
+LongUpdater 0 0
0 5130.7 0.2 0.3X
+FloatUpdater 0 0
0 10255.9 0.1 0.6X
+DoubleUpdater 0 0
0 5127.2 0.2 0.3X
+BinaryUpdater 15 16
0 67.7 14.8 0.0X
================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux
6.17.0-1013-azure
AMD EPYC 7763 64-Core Processor
Type-converting Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater 0 0
0 5428.8 0.2 1.0X
-IntegerToDoubleUpdater 0 0
0 5132.3 0.2 0.9X
-FloatToDoubleUpdater 0 0
0 3199.4 0.3 0.6X
-DateToTimestampNTZUpdater 3 3
1 378.3 2.6 0.1X
-DowncastLongUpdater (INT64 -> Decimal(9,2)) 0 0
0 6548.3 0.2 1.2X
+IntegerToLongUpdater 0 0
0 6438.7 0.2 1.0X
+IntegerToDoubleUpdater 0 0
0 6441.2 0.2 1.0X
+FloatToDoubleUpdater 0 0
0 3199.5 0.3 0.5X
+DateToTimestampNTZUpdater 1 1
0 884.9 1.1 0.1X
+DowncastLongUpdater (INT64 -> Decimal(9,2)) 0 0
0 6713.8 0.1 1.0X
================================================================================================
@@ -39,8 +39,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux
6.17.0-1013-azure
AMD EPYC 7763 64-Core Processor
Rebase Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy) 0 0
0 3097.0 0.3 1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0
0 2286.8 0.4 0.7X
+IntegerWithRebaseUpdater (DATE legacy) 0 0
0 3664.5 0.3 1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0
0 2668.7 0.4 0.7X
LongAsMicrosUpdater (TIMESTAMP_MILLIS) 3 3
0 371.3 2.7 0.1X
@@ -52,7 +52,7 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux
6.17.0-1013-azure
AMD EPYC 7763 64-Core Processor
Unsigned Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long) 0 0
0 5119.0 0.2 1.0X
+UnsignedIntegerUpdater (UINT32 -> Long) 0 0
0 6183.9 0.2 1.0X
UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 17
0 60.4 16.6 0.0X
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux
6.17.0-1013-azure
AMD EPYC 7763 64-Core Processor
Decimal Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater 0 0
0 7753.9 0.1 1.0X
-LongToDecimalUpdater 0 0
0 3880.7 0.3 0.5X
-FixedLenByteArrayToDecimalUpdater 21 21
0 50.9 19.6 0.0X
+IntegerToDecimalUpdater 0 0
0 10268.1 0.1 1.0X
+LongToDecimalUpdater 0 0
0 5122.2 0.2 0.5X
+FixedLenByteArrayToDecimalUpdater 21 21
0 50.9 19.7 0.0X
================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux
6.17.0-1013-azure
AMD EPYC 7763 64-Core Processor
FixedLenByteArray Updaters: Best Time(ms) Avg
Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary) 21
21 1 50.5 19.8 1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7
7 0 152.7 6.5 3.0X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8
8 0 127.8 7.8 2.5X
+FixedLenByteArrayUpdater (len=16 -> Binary) 21
21 1 50.3 19.9 1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7
7 0 152.7 6.6 3.0X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8
8 0 127.7 7.8 2.5X
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
index a0ad4843991b..828e68578877 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
@@ -3,17 +3,17 @@ Identity Updaters
================================================================================================
OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure
-AMD EPYC 9V74 80-Core Processor
+AMD EPYC 7763 64-Core Processor
Identity Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater 0 0
0 14742.5 0.1 1.0X
-ByteUpdater (INT32 -> Byte) 0 0
0 3584.0 0.3 0.2X
-ShortUpdater (INT32 -> Short) 1 1
0 1824.8 0.5 0.1X
-IntegerUpdater 0 0
0 8346.1 0.1 0.6X
-LongUpdater 0 0
0 4103.9 0.2 0.3X
-FloatUpdater 0 0
0 8215.2 0.1 0.6X
-DoubleUpdater 0 0
0 4141.1 0.2 0.3X
-BinaryUpdater 18 18
0 58.9 17.0 0.0X
+BooleanUpdater 0 0
0 14640.0 0.1 1.0X
+ByteUpdater (INT32 -> Byte) 0 0
0 3686.8 0.3 0.3X
+ShortUpdater (INT32 -> Short) 1 1
0 2054.1 0.5 0.1X
+IntegerUpdater 0 0
0 7759.1 0.1 0.5X
+LongUpdater 0 0
0 3876.1 0.3 0.3X
+FloatUpdater 0 0
0 7762.5 0.1 0.5X
+DoubleUpdater 0 0
0 5123.2 0.2 0.3X
+BinaryUpdater 15 15
0 70.1 14.3 0.0X
================================================================================================
@@ -21,14 +21,14 @@ Type-converting Updaters
================================================================================================
OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure
-AMD EPYC 9V74 80-Core Processor
+AMD EPYC 7763 64-Core Processor
Type-converting Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater 1 1
0 1129.8 0.9 1.0X
-IntegerToDoubleUpdater 1 1
0 1365.8 0.7 1.2X
-FloatToDoubleUpdater 1 1
0 1284.3 0.8 1.1X
-DateToTimestampNTZUpdater 3 3
0 357.5 2.8 0.3X
-DowncastLongUpdater (INT64 -> Decimal(9,2)) 1 1
0 1136.5 0.9 1.0X
+IntegerToLongUpdater 1 1
0 1281.0 0.8 1.0X
+IntegerToDoubleUpdater 1 1
0 1550.0 0.6 1.2X
+FloatToDoubleUpdater 1 1
0 1419.0 0.7 1.1X
+DateToTimestampNTZUpdater 2 2
0 605.2 1.7 0.5X
+DowncastLongUpdater (INT64 -> Decimal(9,2)) 1 1
0 1285.1 0.8 1.0X
================================================================================================
@@ -36,12 +36,12 @@ Rebase Updaters
================================================================================================
OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure
-AMD EPYC 9V74 80-Core Processor
+AMD EPYC 7763 64-Core Processor
Rebase Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy) 0 0
0 2180.9 0.5 1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 1 1
0 1744.3 0.6 0.8X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 3
0 421.0 2.4 0.2X
+IntegerWithRebaseUpdater (DATE legacy) 0 0
0 2662.8 0.4 1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 1 1
0 2084.1 0.5 0.8X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 2
0 454.8 2.2 0.2X
================================================================================================
@@ -49,11 +49,11 @@ Unsigned Updaters
================================================================================================
OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure
-AMD EPYC 9V74 80-Core Processor
+AMD EPYC 7763 64-Core Processor
Unsigned Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long) 1 1
0 965.9 1.0 1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 18 18
0 58.6 17.1 0.1X
+UnsignedIntegerUpdater (UINT32 -> Long) 1 1
0 1094.1 0.9 1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 17
0 61.0 16.4 0.1X
================================================================================================
@@ -61,12 +61,12 @@ Decimal Updaters
================================================================================================
OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure
-AMD EPYC 9V74 80-Core Processor
+AMD EPYC 7763 64-Core Processor
Decimal Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater 0 0
0 8299.8 0.1 1.0X
-LongToDecimalUpdater 0 0
0 4106.5 0.2 0.5X
-FixedLenByteArrayToDecimalUpdater 24 24
1 43.8 22.8 0.0X
+IntegerToDecimalUpdater 0 0
0 10261.0 0.1 1.0X
+LongToDecimalUpdater 0 0
0 5118.9 0.2 0.5X
+FixedLenByteArrayToDecimalUpdater 21 21
0 51.0 19.6 0.0X
================================================================================================
@@ -74,11 +74,11 @@ FixedLenByteArray Updaters
================================================================================================
OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1013-azure
-AMD EPYC 9V74 80-Core Processor
+AMD EPYC 7763 64-Core Processor
FixedLenByteArray Updaters: Best Time(ms) Avg
Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary) 22
23 0 47.0 21.3 1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 6
6 1 166.4 6.0 3.5X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8
9 1 125.1 8.0 2.7X
+FixedLenByteArrayUpdater (len=16 -> Binary) 19
19 0 55.3 18.1 1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7
7 0 160.2 6.2 2.9X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 9
9 0 123.3 8.1 2.2X
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 76cedbbef3b4..c088f5f2844b 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -429,9 +429,7 @@ public class ParquetVectorUpdaterFactory {
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
- for (int i = 0; i < total; ++i) {
- readValue(offset + i, values, valuesReader);
- }
+ valuesReader.readIntegersAsTimestampMicros(total, values, offset);
}
@Override
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 9249fab7915c..23207e7db357 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.time.ZoneOffset;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.values.ValuesReader;
@@ -26,6 +27,7 @@ import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.spark.SparkUnsupportedOperationException;
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.catalyst.util.RebaseDateTime;
import org.apache.spark.sql.execution.datasources.DataSourceUtils;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
@@ -204,6 +206,19 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
}
}
+ @Override
+ public final void readIntegersAsTimestampMicros(
+ int total, WritableColumnVector c, int rowId) {
+ int requiredBytes = total * 4;
+ ByteBuffer buffer = getBuffer(requiredBytes);
+ // Per-element conversion calls into `DateTimeUtils.daysToMicros`, which is `days *
+ // MICROS_PER_DAY` for UTC plus an overflow check via `Math.multiplyExact`. No
+ // `hasArray` bulk-copy path because source and target have different widths.
+ for (int i = 0; i < total; i += 1) {
+ c.putLong(rowId + i, DateTimeUtils.daysToMicros(buffer.getInt(), ZoneOffset.UTC));
+ }
+ }
+
// A fork of `readIntegers` to rebase the date values. For performance reasons, this method
// iterates the values twice: check if we need to rebase first, then go to the optimized branch
// if rebase is not needed.
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
index c62f7bcec8c3..bf6a1c6a0388 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
@@ -18,7 +18,9 @@
package org.apache.spark.sql.execution.datasources.parquet;
import java.nio.ByteBuffer;
+import java.time.ZoneOffset;
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.parquet.io.api.Binary;
@@ -136,6 +138,32 @@ public interface VectorizedValuesReader {
}
}
+ /**
+ * Reads {@code total} INT32 date-day values (days since 1970-01-01, Proleptic Gregorian),
+ * converts each to TimestampNTZ micros at UTC via
+ * {@link DateTimeUtils#daysToMicros(int, java.time.ZoneId)}, and writes them into
+ * {@code c} starting at {@code c[rowId]}. Used by the type-converting updater that
+ * reads parquet INT32 DATE columns into Spark {@code TimestampNTZType} targets in
+ * {@code CORRECTED} datetime-rebase mode. The {@code LEGACY}/{@code EXCEPTION} rebase
+ * variants are out of scope for this method.
+ *
+ * <p>The default implementation is a per-row loop that calls
+ * {@code DateTimeUtils.daysToMicros} per element; it is algorithmically equivalent to
+ * the legacy per-row Updater path but the per-element conversion call dominates the
+ * loop, so the speedup from overriding this method is more modest than for the pure
+ * primitive-cast siblings ({@link #readIntegersAsLongs}, {@link #readIntegersAsDoubles}).
+ * Subclasses backed by contiguous bulk storage (e.g. PLAIN encoding via
+ * {@link VectorizedPlainValuesReader}) should override to read source bytes once and run
+ * a tight in-method conversion loop, avoiding {@code total} virtual dispatches on
+ * {@link #readInteger()}. Readers without an override preserve correctness but gain no
+ * speedup.
+ */
+ default void readIntegersAsTimestampMicros(int total, WritableColumnVector c, int rowId) {
+ for (int i = 0; i < total; i += 1) {
+ c.putLong(rowId + i, DateTimeUtils.daysToMicros(readInteger(), ZoneOffset.UTC));
+ }
+ }
+
void readBinary(int total, WritableColumnVector c, int rowId);
void readGeometry(int total, WritableColumnVector c, int rowId);
void readGeography(int total, WritableColumnVector c, int rowId);
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index c25522161443..0fc32b15d833 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -2073,6 +2073,69 @@ class ParquetIOSuite extends ParquetTest with SharedSparkSession {
}
}
+ test("DATE -> TimestampNTZ widening end-to-end via vectorized read path") {
+ // Round-trips a DATE Parquet file read back as TimestampNTZType, exercising
+ // DateToTimestampNTZUpdater on the vectorized path in CORRECTED rebase mode.
+ // Sample mixes epoch, recent dates, pre-epoch, far-past and far-future days; each
+ // day-count converts to UTC midnight via `DateTimeUtils.daysToMicros`. Both
+ // REQUIRED and OPTIONAL columns are covered so that `readValue` and `readValues`
+ // are both invoked.
+ withSQLConf(
+ SQLConf.PARQUET_REBASE_MODE_IN_READ.key -> "CORRECTED",
+ SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
+ withTempPath { file =>
+ val n = 5000
+ def sampleAt(i: Int): java.sql.Date = i % 6 match {
+ case 0 => java.sql.Date.valueOf("1970-01-01")
+ case 1 => java.sql.Date.valueOf("1900-01-01")
+ case 2 => java.sql.Date.valueOf("2023-01-01")
+ case 3 => java.sql.Date.valueOf("0001-01-01")
+ case 4 => java.sql.Date.valueOf("4707-11-28")
+ case _ =>
+ // Cover every month (1..12) and a few years to exercise month-boundary
+ // behavior, not just months 1..9 the previous string-formatting allowed.
+ val year = 2020 + (i % 80)
+ val month = 1 + (i % 12)
+ java.sql.Date.valueOf(f"$year%04d-$month%02d-15")
+ }
+
+ val nonNullData = (0 until n).map(i => Row(sampleAt(i)))
+ // Every 7th row is null; mixes value runs and null runs at sub-batch lengths.
+ val nullableData = (0 until n).map { i =>
+ if (i % 7 == 0) Row(null) else Row(sampleAt(i))
+ }
+
+ val nonNullWriteSchema = new StructType().add("v", DateType, nullable = false)
+ val nonNullReadSchema = new StructType().add("v", TimestampNTZType, nullable = false)
+ val nullableWriteSchema = new StructType().add("v", DateType, nullable = true)
+ val nullableReadSchema = new StructType().add("v", TimestampNTZType, nullable = true)
+
+ val nonNullPath = new java.io.File(file, "nonnull").getCanonicalPath
+ val nullablePath = new java.io.File(file, "nullable").getCanonicalPath
+ spark.createDataFrame(spark.sparkContext.parallelize(nonNullData, 4), nonNullWriteSchema)
+ .write.parquet(nonNullPath)
+ spark.createDataFrame(
+ spark.sparkContext.parallelize(nullableData, 4), nullableWriteSchema)
+ .write.parquet(nullablePath)
+
+ def dateToNtz(d: java.sql.Date): java.time.LocalDateTime = {
+ val days = d.toLocalDate.toEpochDay.toInt
+ val micros = DateTimeUtils.daysToMicros(days, java.time.ZoneOffset.UTC)
+ DateTimeUtils.microsToLocalDateTime(micros)
+ }
+ val expectedNonNull = nonNullData.map(r => Row(dateToNtz(r.getDate(0))))
+ val expectedNullable = nullableData.map { r =>
+ if (r.isNullAt(0)) Row(null) else Row(dateToNtz(r.getDate(0)))
+ }
+
+ withAllParquetReaders {
+ checkAnswer(spark.read.schema(nonNullReadSchema).parquet(nonNullPath), expectedNonNull)
+ checkAnswer(spark.read.schema(nullableReadSchema).parquet(nullablePath), expectedNullable)
+ }
+ }
+ }
+ }
+
test("SPARK-56872: INT64 DECIMAL into 32-bit Decimal column with dictionary fallback") {
// `DowncastLongUpdater.decodeSingleDictionaryId` only runs when the vectorized reader has
// to eagerly drain buffered dictionary IDs, which happens when parquet-mr writes the
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
index 65a4d90750b8..13e9376fb6fa 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
@@ -27,6 +27,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
import org.apache.parquet.schema.Type.Repetition
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types._
@@ -37,6 +38,7 @@ import org.apache.spark.sql.types._
* - `IntegerToDoubleUpdater` (INT32 -> Double)
* - `FloatToDoubleUpdater` (FLOAT -> Double)
* - `DowncastLongUpdater` (INT64 DECIMAL -> 32-bit Decimal via narrowing cast)
+ * - `DateToTimestampNTZUpdater` (INT32 DATE -> TimestampNTZ micros at UTC)
*
* Covers boundary batch lengths, sign-extension on negative INT32 values, the singular
* `readValue` path, and the factory's long-decimal dispatch
@@ -408,4 +410,87 @@ class ParquetVectorUpdaterSuite extends SparkFunSuite {
val actual = readViaDowncastLongUpdater(desc, targetType, input)
assert(actual === Array[Int](-999_999_999, -1, 0, 1, 999_999_999))
}
+
+ // ---- DateToTimestampNTZUpdater: INT32 DATE -> TimestampNTZ micros at UTC ----
+
+ // INT32 column descriptor annotated as DATE; routes the factory through the
+ // `sparkType == TimestampNTZType && isDateTypeMatched(descriptor)` branch.
+ private val int32DateDescriptor: ColumnDescriptor = {
+ val pt = Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL)
+ .as(LogicalTypeAnnotation.dateType())
+ .named("col")
+ new ColumnDescriptor(Array("col"), pt, 0, 1)
+ }
+
+ // Reads `values.length` INT32 day-counts through `DateToTimestampNTZUpdater.readValues`
+ // and returns the resulting micros column.
+ private def readViaDateToTimestampNTZUpdater(values: Array[Int]): Array[Long] = {
+ val fac = newFactory(int32DateDescriptor)
+ val updater = fac.getUpdater(int32DateDescriptor, DataTypes.TimestampNTZType)
+ val out = new OnHeapColumnVector(values.length.max(1), DataTypes.TimestampNTZType)
+ val reader = newPlainReader(plainIntBytes(values), values.length)
+ updater.readValues(values.length, 0, out, reader)
+ val result = new Array[Long](values.length)
+ var i = 0
+ while (i < values.length) { result(i) = out.getLong(i); i += 1 }
+ result
+ }
+
+ // Realistic day-count sample: epoch, recent dates, pre-epoch, far-past, far-future. All
+ // values are well within `Math.multiplyExact(days * 86400, 1_000_000)`'s safe range.
+ private def dateSampleValues(n: Int): Array[Int] = {
+ val out = new Array[Int](n)
+ var i = 0
+ while (i < n) {
+ out(i) = i % 6 match {
+ case 0 => 0 // 1970-01-01 (epoch)
+ case 1 => -25567 // ~1900-01-01
+ case 2 => 19366 // ~2023-01-01
+ case 3 => -719162 // ~0001-01-01
+ case 4 => 1000000 // ~4707-11-28 (far future, still safe)
+ case _ => i * 37 - 100
+ }
+ i += 1
+ }
+ out
+ }
+
+ private def expectedMicros(values: Array[Int]): Array[Long] =
+ values.map(d => DateTimeUtils.daysToMicros(d, ZoneOffset.UTC))
+
+ for (n <- Seq(0, 1, 7, 8, 9, 17, 1024, 4097)) {
+ test(s"DateToTimestampNTZUpdater produces correct UTC micros (total=$n)") {
+ val input = dateSampleValues(n)
+ assert(readViaDateToTimestampNTZUpdater(input) === expectedMicros(input))
+ }
+ }
+
+ test("DateToTimestampNTZUpdater: readValue converts a single date-day to UTC micros") {
+ // Same rationale as the IntegerToLongUpdater readValue test: the def-level-decoder's
+ // run-of-1 path calls `readInteger()` directly rather than the bulk method.
+ val input = Array(0, 1, -1, 19366, -25567)
+ val fac = newFactory(int32DateDescriptor)
+ val updater = fac.getUpdater(int32DateDescriptor, DataTypes.TimestampNTZType)
+ val out = new OnHeapColumnVector(input.length, DataTypes.TimestampNTZType)
+ val reader = newPlainReader(plainIntBytes(input), input.length)
+ var i = 0
+ while (i < input.length) {
+ updater.readValue(i, out, reader)
+ i += 1
+ }
+ val actual = (0 until input.length).map(out.getLong).toArray
+ assert(actual === input.map(d => DateTimeUtils.daysToMicros(d, ZoneOffset.UTC)))
+ }
+
+ test("DateToTimestampNTZUpdater: epoch and signed day-counts produce expected micros") {
+ // Pins reference micros for a handful of well-known dates so the conversion semantics
+ // are visible at unit-test level without relying on DateTimeUtils for the expected
+ // side.
+ val input = Array(0, 1, -1)
+ val expected = Array(
+ 0L, // 1970-01-01 00:00:00 UTC
+ 86_400_000_000L, // 1970-01-02 00:00:00 UTC
+ -86_400_000_000L) // 1969-12-31 00:00:00 UTC
+ assert(readViaDateToTimestampNTZUpdater(input) === expected)
+ }
}