This is an automated email from the ASF dual-hosted git repository.
LuciferYang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4a34226912a4 [SPARK-56803][SQL] Add bulk read+narrow path for INT64 DECIMAL to 32-bit Decimal Parquet vector updater
4a34226912a4 is described below
commit 4a34226912a4e319f7ea8775b0e2c31ec90d4411
Author: YangJie <[email protected]>
AuthorDate: Sun May 17 01:59:51 2026 +0800
[SPARK-56803][SQL] Add bulk read+narrow path for INT64 DECIMAL to 32-bit Decimal Parquet vector updater
### What changes were proposed in this pull request?
Extend the bulk read+widen pattern introduced in SPARK-56791 to
`DowncastLongUpdater` (parquet INT64 + DECIMAL(p<=9) read into a Spark 32-bit
`DecimalType`).
A new `readLongsAsInts` default method on `VectorizedValuesReader` does the
per-row fallback. `VectorizedPlainValuesReader` overrides it to fetch source
bytes once via `getBuffer(total * 8)` and run a tight in-method conversion
loop. `DowncastLongUpdater.readValues` becomes a one-line delegation. The
narrowing is Java's primitive long-to-int cast (`(int) buffer.getLong()`),
which discards the high 32 bits; this is non-lossy in practice because
Parquet's DECIMAL(p<=9) encoding bounds t [...]
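The non-lossy claim rests on a simple digit bound. Assume the writer respects the declared precision. Then a DECIMAL(p, s) unscaled value has at most p digits, so its magnitude is at most 10^p - 1. The standalone Java check below uses illustrative class and method names and is not Spark code. It shows that this bound stays at or under `Integer.MAX_VALUE` (2,147,483,647) up to and including p = 9, and exceeds it at p = 10.

```java
import java.math.BigInteger;

// Illustrative sketch (not Spark code): a DECIMAL(p, s) unscaled value has at most p digits,
// so its magnitude is at most 10^p - 1. Check which precisions fit in a Java int.
public class DecimalInt32Bound {
    // 10^p - 1, computed with BigInteger so large precisions cannot overflow.
    static BigInteger maxUnscaled(int precision) {
        return BigInteger.TEN.pow(precision).subtract(BigInteger.ONE);
    }

    // The negative bound -(10^p - 1) is symmetric, and Integer.MIN_VALUE has one extra
    // unit of headroom, so comparing against Integer.MAX_VALUE is sufficient.
    static boolean fitsInInt(int precision) {
        return maxUnscaled(precision).compareTo(BigInteger.valueOf(Integer.MAX_VALUE)) <= 0;
    }

    public static void main(String[] args) {
        for (int p = 8; p <= 10; p++) {
            System.out.println("DECIMAL(" + p + ", _): max unscaled = " + maxUnscaled(p)
                + ", fits in int32 = " + fitsInInt(p));
        }
    }
}
```

For p = 8 and p = 9 the check prints `true`. For p = 10 (maximum 9,999,999,999) it prints `false`, which is why only 32-bit decimal targets (precision <= 9) take this narrowing path.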
### Why are the changes needed?
On the legacy path, `DowncastLongUpdater.readValues` calls `readLong()` once per
element. Each call allocates a fresh `ByteBuffer` slice inside `getBuffer(8)`,
and that allocation dominates the loop. Collapsing N allocations into one is the
same win SPARK-56791 delivered for the sibling INT32 -> Long updater. The gain
again grows on newer JDKs, where escape analysis better optimizes the tight loop:
| JDK | Baseline | After | Speedup |
|----:|---------:|----------:|--------:|
| 17 | 487.0 M/s | 1287.3 M/s | 2.64x |
| 21 | 455.6 M/s | 5828.5 M/s | 12.79x |
| 25 | 455.5 M/s | 6568.8 M/s | 14.42x |
Throughput of the other updaters in the same benchmark groups stays within
run-to-run noise, consistent with the change being local to `DowncastLongUpdater`.
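The allocation argument can be illustrated without Spark. The sketch below is hypothetical: `BulkNarrowSketch` stands in for a PLAIN-encoded INT64 page, an `int[]` stands in for `WritableColumnVector`, and its private `getBuffer` only mimics the idea of returning a fresh little-endian view per call. It is not Spark's implementation. It counts views rather than timing anything, and shows that the per-row and bulk shapes produce identical output while creating `total` views versus one.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Hypothetical stand-in for a PLAIN-encoded INT64 page; not Spark's VectorizedPlainValuesReader.
public class BulkNarrowSketch {
    private final ByteBuffer page;
    int viewsCreated = 0; // number of ByteBuffer views handed out by getBuffer

    BulkNarrowSketch(ByteBuffer page) {
        this.page = page;
    }

    // Mimics the idea of getBuffer(n): a fresh little-endian view over the next n bytes.
    private ByteBuffer getBuffer(int length) {
        ByteBuffer view = page.slice();
        view.limit(length);
        view.order(ByteOrder.LITTLE_ENDIAN);
        page.position(page.position() + length);
        viewsCreated++;
        return view;
    }

    // Legacy shape: one 8-byte view per value.
    void readPerRow(int total, int[] out, int rowId) {
        for (int i = 0; i < total; i++) {
            out[rowId + i] = (int) getBuffer(8).getLong();
        }
    }

    // Bulk shape: one view for the whole batch, then a tight narrowing loop.
    void readBulk(int total, int[] out, int rowId) {
        ByteBuffer buffer = getBuffer(total * 8);
        for (int i = 0; i < total; i++) {
            out[rowId + i] = (int) buffer.getLong();
        }
    }

    // Encodes longs as PLAIN INT64 stores them: 8 little-endian bytes each.
    static ByteBuffer encode(long[] values) {
        ByteBuffer buf = ByteBuffer.allocate(values.length * 8).order(ByteOrder.LITTLE_ENDIAN);
        for (long v : values) {
            buf.putLong(v);
        }
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        long[] values = {-999_999_999L, -1L, 0L, 999_999_999L, 42L};
        int[] perRowOut = new int[values.length];
        int[] bulkOut = new int[values.length];

        BulkNarrowSketch perRow = new BulkNarrowSketch(encode(values));
        perRow.readPerRow(values.length, perRowOut, 0);
        BulkNarrowSketch bulk = new BulkNarrowSketch(encode(values));
        bulk.readBulk(values.length, bulkOut, 0);

        System.out.println(Arrays.equals(perRowOut, bulkOut));                // true
        System.out.println(perRow.viewsCreated + " vs " + bulk.viewsCreated); // 5 vs 1
    }
}
```

Both shapes still narrow one value at a time. The only difference the sketch isolates is the number of buffer views, which is the per-element cost the commit message identifies.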
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit tests in `ParquetVectorUpdaterSuite` cover boundary batch lengths
(0, 1, 7, 8, 9, 17, 1024, 4097), the singular `readValue` path, and sign
preservation for in-range INT64 values, including the DECIMAL(9, _) bounds
-999_999_999 and 999_999_999.
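The property these tests pin down can also be checked on the cast alone. The plain-Java sketch below uses illustrative names and no Spark classes. It reuses the cycling sample pattern of the suite's `downcastSampleValues` helper and the boundary lengths listed above. It also shows that outside the DECIMAL(9, _) range the cast keeps only the low 32 bits, which can flip the sign.

```java
// Illustrative sketch (no Spark classes): check that (int) narrowing preserves value and sign
// for DECIMAL(9, _)-range samples, and show what happens outside that range.
public class NarrowingSignCheck {
    // Same cycling pattern as the suite's sample generator: min, -1, 0, max, then a filler.
    static long[] sampleValues(int n) {
        long[] out = new long[n];
        for (int i = 0; i < n; i++) {
            switch (i % 5) {
                case 0: out[i] = -999_999_999L; break; // min DECIMAL(9, _) unscaled value
                case 1: out[i] = -1L; break;
                case 2: out[i] = 0L; break;
                case 3: out[i] = 999_999_999L; break;  // max DECIMAL(9, _) unscaled value
                default: out[i] = i * 13L - 7L; break;
            }
        }
        return out;
    }

    // True if every value survives the long-to-int cast unchanged (sign included).
    static boolean narrowsLosslessly(long[] values) {
        for (long v : values) {
            if ((long) (int) v != v) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] batchLengths = {0, 1, 7, 8, 9, 17, 1024, 4097};
        for (int n : batchLengths) {
            System.out.println("total=" + n + " lossless=" + narrowsLosslessly(sampleValues(n)));
        }
        // Outside the DECIMAL(9, _) range only the low 32 bits survive:
        System.out.println((int) 2_147_483_648L); // -2147483648 (sign flips)
        System.out.println((int) 4_294_967_295L); // -1
    }
}
```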
A new end-to-end test in `ParquetIOSuite` uses `parquet-mr`'s low-level API
(`MessageTypeParser` + `SimpleGroup`) to write a file with an
`int64 v (DECIMAL(9, 2))` column. Spark's own writer produces INT32-backed
storage for precision <= 9, so this wire format only arises from other writers
(such as Hive or Impala). The test reads the file back as `DecimalType(9, 2)`,
with dictionary encoding both enabled and disabled and under every Parquet
reader (`withAllParquetReaders`), and confirms that the values round-trip
exactly.
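Reading the file back as `DecimalType(9, 2)` interprets each stored INT64 as an unscaled value with scale 2. The Java sketch below uses illustrative names and only `java.math.BigDecimal`. It shows the mapping that the test's expected rows (`BigDecimal.valueOf(unscaled, 2)`) rely on, including the extremes of the DECIMAL(9, 2) range.

```java
import java.math.BigDecimal;

// Illustrative sketch: how an INT64 unscaled value maps to a DECIMAL(9, 2) value,
// the same construction the end-to-end test uses for its expected rows.
public class UnscaledToDecimal {
    static BigDecimal toDecimal(long unscaled, int scale) {
        return BigDecimal.valueOf(unscaled, scale);
    }

    public static void main(String[] args) {
        // min, -1, 0, max, and the filler value the test generates for i = 4
        long[] unscaledValues = {-999_999_999L, -1L, 0L, 999_999_999L, 4L * 13L - 7L};
        for (long u : unscaledValues) {
            BigDecimal d = toDecimal(u, 2);
            // precision() counts the digits of the unscaled value
            System.out.println(u + " -> " + d.toPlainString() + " (precision " + d.precision() + ")");
        }
    }
}
```

The extremes map to -9999999.99 and 9999999.99, each with precision 9. That is the widest value a DECIMAL(9, 2) column can hold.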
Benchmark results on JDK 17, 21, and 25 are committed on the branch.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code
Closes #55853 from LuciferYang/SPARK-56803-downcast-long.
Authored-by: YangJie <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
...ParquetVectorUpdaterBenchmark-jdk21-results.txt | 46 +++++-----
...ParquetVectorUpdaterBenchmark-jdk25-results.txt | 42 +++++-----
.../ParquetVectorUpdaterBenchmark-results.txt | 46 +++++-----
.../parquet/ParquetVectorUpdaterFactory.java | 4 +-
.../parquet/VectorizedPlainValuesReader.java | 12 +++
.../parquet/VectorizedValuesReader.java | 21 +++++
.../datasources/parquet/ParquetIOSuite.scala | 51 +++++++++++
.../parquet/ParquetVectorUpdaterSuite.scala | 98 ++++++++++++++++++++++
8 files changed, 250 insertions(+), 70 deletions(-)
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
index 7d09c6fd187e..b120f8d412a3 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
@@ -7,13 +7,13 @@ AMD EPYC 7763 64-Core Processor
 Identity Updaters:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
 BooleanUpdater                                        0              0           0      17004.4           0.1       1.0X
-ByteUpdater (INT32 -> Byte)                           0              0           0       3758.6           0.3       0.2X
-ShortUpdater (INT32 -> Short)                         1              1           0       1678.4           0.6       0.1X
-IntegerUpdater                                        0              0           0      10276.1           0.1       0.6X
-LongUpdater                                           0              0           0       5112.7           0.2       0.3X
-FloatUpdater                                          0              0           0      10293.3           0.1       0.6X
-DoubleUpdater                                         0              0           0       3870.8           0.3       0.2X
-BinaryUpdater                                        15             15           1         71.3          14.0       0.0X
+ByteUpdater (INT32 -> Byte)                           0              0           0       3748.2           0.3       0.2X
+ShortUpdater (INT32 -> Short)                         1              1           0       1683.0           0.6       0.1X
+IntegerUpdater                                        0              0           0      10179.2           0.1       0.6X
+LongUpdater                                           0              0           0       5075.5           0.2       0.3X
+FloatUpdater                                          0              0           0      10201.9           0.1       0.6X
+DoubleUpdater                                         0              0           0       5140.6           0.2       0.3X
+BinaryUpdater                                        15             15           0         71.3          14.0       0.0X


 ================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Type-converting Updaters:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater                                     0              0           0       5145.1           0.2       1.0X
-IntegerToDoubleUpdater                                   0              0           0       5120.5           0.2       1.0X
-FloatToDoubleUpdater                                     0              0           0       2527.2           0.4       0.5X
-DateToTimestampNTZUpdater                               29             29           0         36.3          27.5       0.0X
-DowncastLongUpdater (INT64 -> Decimal(9,2))              2              2           0        455.6           2.2       0.1X
+IntegerToLongUpdater                                     0              0           0       6208.4           0.2       1.0X
+IntegerToDoubleUpdater                                   0              0           0       6155.9           0.2       1.0X
+FloatToDoubleUpdater                                     0              0           0       2526.5           0.4       0.4X
+DateToTimestampNTZUpdater                               27             28           1         38.6          25.9       0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2))              0              0           0       5828.5           0.2       0.9X


 ================================================================================================
@@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Rebase Updaters:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       3647.4           0.3       1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              0              0           0       2668.6           0.4       0.7X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       2              3           0        420.3           2.4       0.1X
+IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       3651.5           0.3       1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              0              0           0       2621.2           0.4       0.7X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       2              3           0        420.5           2.4       0.1X


 ================================================================================================
@@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Unsigned Updaters:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long)                    0              0           0       5089.6           0.2       1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0))             17             18           1         60.2          16.6       0.0X
+UnsignedIntegerUpdater (UINT32 -> Long)                    0              0           0       5879.9           0.2       1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0))             16             17           0         63.8          15.7       0.0X


 ================================================================================================
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Decimal Updaters:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater                               0              0           0       7746.5           0.1       1.0X
-LongToDecimalUpdater                                  0              0           0       3874.1           0.3       0.5X
-FixedLenByteArrayToDecimalUpdater                    21             21           0         50.7          19.7       0.0X
+IntegerToDecimalUpdater                               0              0           0      10144.6           0.1       1.0X
+LongToDecimalUpdater                                  0              0           0       5061.5           0.2       0.5X
+FixedLenByteArrayToDecimalUpdater                    20             21           1         51.3          19.5       0.0X


 ================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 FixedLenByteArray Updaters:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary)                         19             20           2         54.1          18.5       1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        160.1           6.2       3.0X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              8              8           0        133.2           7.5       2.5X
+FixedLenByteArrayUpdater (len=16 -> Binary)                         20             21           1         51.8          19.3       1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        160.2           6.2       3.1X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              8              8           0        133.2           7.5       2.6X


diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
index 627835735a08..6dfd2fdadc25 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Identity Updaters:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater                                        0              0           0      17138.1           0.1       1.0X
-ByteUpdater (INT32 -> Byte)                           0              0           0       3678.2           0.3       0.2X
+BooleanUpdater                                        0              0           0      17160.2           0.1       1.0X
+ByteUpdater (INT32 -> Byte)                           0              0           0       3686.1           0.3       0.2X
 ShortUpdater (INT32 -> Short)                         1              1           0       1662.8           0.6       0.1X
-IntegerUpdater                                        0              0           0      10231.8           0.1       0.6X
-LongUpdater                                           0              0           0       5103.2           0.2       0.3X
-FloatUpdater                                          0              0           0      10203.9           0.1       0.6X
-DoubleUpdater                                         0              0           0       5105.4           0.2       0.3X
-BinaryUpdater                                        17             17           0         62.8          15.9       0.0X
+IntegerUpdater                                        0              0           0      10282.0           0.1       0.6X
+LongUpdater                                           0              0           0       5151.9           0.2       0.3X
+FloatUpdater                                          0              0           0      10306.3           0.1       0.6X
+DoubleUpdater                                         0              0           0       5149.1           0.2       0.3X
+BinaryUpdater                                        15             16           0         67.8          14.8       0.0X


 ================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Type-converting Updaters:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater                                     0              0           0       6239.9           0.2       1.0X
-IntegerToDoubleUpdater                                   0              0           0       6382.6           0.2       1.0X
-FloatToDoubleUpdater                                     0              0           0       2570.7           0.4       0.4X
-DateToTimestampNTZUpdater                               26             26           0         40.5          24.7       0.0X
-DowncastLongUpdater (INT64 -> Decimal(9,2))              2              2           0        637.7           1.6       0.1X
+IntegerToLongUpdater                                     0              0           0       6390.8           0.2       1.0X
+IntegerToDoubleUpdater                                   0              0           0       6415.4           0.2       1.0X
+FloatToDoubleUpdater                                     0              0           0       3196.5           0.3       0.5X
+DateToTimestampNTZUpdater                               29             29           0         36.6          27.3       0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2))              0              0           0       6568.8           0.2       1.0X


 ================================================================================================
@@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Rebase Updaters:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       3660.5           0.3       1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              0              0           0       2663.6           0.4       0.7X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       2              2           0        521.3           1.9       0.1X
+IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       3665.9           0.3       1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              0              0           0       2652.9           0.4       0.7X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       3              3           0        371.3           2.7       0.1X


 ================================================================================================
@@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Unsigned Updaters:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long)                    0              0           0       6196.3           0.2       1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0))             17             18           1         60.3          16.6       0.0X
+UnsignedIntegerUpdater (UINT32 -> Long)                    0              0           0       5111.6           0.2       1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0))             17             18           0         60.4          16.6       0.0X


 ================================================================================================
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Decimal Updaters:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater                               0              0           0      10247.8           0.1       1.0X
-LongToDecimalUpdater                                  0              0           0       5125.7           0.2       0.5X
-FixedLenByteArrayToDecimalUpdater                    21             21           0         50.7          19.7       0.0X
+IntegerToDecimalUpdater                               0              0           0      10205.8           0.1       1.0X
+LongToDecimalUpdater                                  0              0           0       5111.4           0.2       0.5X
+FixedLenByteArrayToDecimalUpdater                    21             21           2         50.9          19.6       0.0X


 ================================================================================================
@@ -77,7 +77,7 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 FixedLenByteArray Updaters:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary)                         21             21           1         51.0          19.6       1.0X
+FixedLenByteArrayUpdater (len=16 -> Binary)                         21             22           1         50.4          19.8       1.0X
 FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        152.6           6.6       3.0X
 FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              8              8           0        127.7           7.8       2.5X

diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
index 5e2c31d2b566..5918db9f759b 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Identity Updaters:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater                                        0              0           0      14526.2           0.1       1.0X
-ByteUpdater (INT32 -> Byte)                           0              0           0       3679.3           0.3       0.3X
-ShortUpdater (INT32 -> Short)                         1              1           0       2054.1           0.5       0.1X
-IntegerUpdater                                        0              0           0      10178.0           0.1       0.7X
-LongUpdater                                           0              0           0       5054.4           0.2       0.3X
-FloatUpdater                                          0              0           0      10212.8           0.1       0.7X
-DoubleUpdater                                         0              0           0       5051.2           0.2       0.3X
-BinaryUpdater                                        15             15           0         68.4          14.6       0.0X
+BooleanUpdater                                        0              0           0      14617.4           0.1       1.0X
+ByteUpdater (INT32 -> Byte)                           0              0           0       3667.7           0.3       0.3X
+ShortUpdater (INT32 -> Short)                         1              1           0       2048.9           0.5       0.1X
+IntegerUpdater                                        0              0           0      10281.9           0.1       0.7X
+LongUpdater                                           0              0           0       5138.0           0.2       0.4X
+FloatUpdater                                          0              0           0       7742.9           0.1       0.5X
+DoubleUpdater                                         0              0           0       3863.4           0.3       0.3X
+BinaryUpdater                                        15             15           0         70.2          14.2       0.0X


 ================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Type-converting Updaters:                    Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater                                     1              1           0       1280.6           0.8       1.0X
-IntegerToDoubleUpdater                                   1              1           0       1537.9           0.7       1.2X
-FloatToDoubleUpdater                                     1              1           0       1418.8           0.7       1.1X
-DateToTimestampNTZUpdater                               36             36           0         29.5          33.9       0.0X
-DowncastLongUpdater (INT64 -> Decimal(9,2))              2              2           0        455.3           2.2       0.4X
+IntegerToLongUpdater                                     1              1           0       1279.7           0.8       1.0X
+IntegerToDoubleUpdater                                   1              1           0       1544.8           0.6       1.2X
+FloatToDoubleUpdater                                     1              1           0       1417.9           0.7       1.1X
+DateToTimestampNTZUpdater                               36             36           1         29.5          33.9       0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2))              1              1           0       1287.3           0.8       1.0X


 ================================================================================================
@@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Rebase Updaters:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       2407.3           0.4       1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              1              1           0       2030.8           0.5       0.8X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       2              2           0        454.4           2.2       0.2X
+IntegerWithRebaseUpdater (DATE legacy)                       0              0           0       2599.8           0.4       1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy)              1              1           0       2092.2           0.5       0.8X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS)                       2              2           0        454.7           2.2       0.2X


 ================================================================================================
@@ -52,7 +52,7 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Unsigned Updaters:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 -----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long)                    1              1           0       1093.1           0.9       1.0X
+UnsignedIntegerUpdater (UINT32 -> Long)                    1              1           0       1091.2           0.9       1.0X
 UnsignedLongUpdater (UINT64 -> Decimal(20,0))             18             18           0         59.1          16.9       0.1X


@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 Decimal Updaters:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater                               0              0           0      10195.9           0.1       1.0X
-LongToDecimalUpdater                                  0              0           0       5049.2           0.2       0.5X
-FixedLenByteArrayToDecimalUpdater                    21             21           0         51.0          19.6       0.0X
+IntegerToDecimalUpdater                               0              0           0      10241.7           0.1       1.0X
+LongToDecimalUpdater                                  0              0           0       5118.1           0.2       0.5X
+FixedLenByteArrayToDecimalUpdater                    21             21           0         51.1          19.6       0.0X


 ================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
 AMD EPYC 7763 64-Core Processor
 FixedLenByteArray Updaters:                              Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
 ---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary)                         19             19           0         54.9          18.2       1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        160.1           6.2       2.9X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              9              9           0        123.0           8.1       2.2X
+FixedLenByteArrayUpdater (len=16 -> Binary)                         19             19           0         55.1          18.2       1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2))                7              7           0        160.2           6.2       2.9X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4))              9              9           0        123.1           8.1       2.2X


diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index edca5528a190..76cedbbef3b4 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -684,9 +684,7 @@ public class ParquetVectorUpdaterFactory {
         int offset,
         WritableColumnVector values,
         VectorizedValuesReader valuesReader) {
-      for (int i = 0; i < total; ++i) {
-        values.putInt(offset + i, (int) valuesReader.readLong());
-      }
+      valuesReader.readLongsAsInts(total, values, offset);
     }

     @Override
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index d91ba5e2b87d..9249fab7915c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -192,6 +192,18 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
     }
   }

+  @Override
+  public final void readLongsAsInts(int total, WritableColumnVector c, int rowId) {
+    int requiredBytes = total * 8;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+    // No `hasArray` bulk-copy path: source (int64, 8 bytes) and target (int32, 4 bytes)
+    // have different widths so a contiguous byte copy is impossible. Matches the pattern
+    // in `readIntegersAsLongs`.
+    for (int i = 0; i < total; i += 1) {
+      c.putInt(rowId + i, (int) buffer.getLong());
+    }
+  }
+
   // A fork of `readIntegers` to rebase the date values. For performance reasons, this method
   // iterates the values twice: check if we need to rebase first, then go to the optimized branch
   // if rebase is not needed.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
index a90e9bf01c81..c62f7bcec8c3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
@@ -115,6 +115,27 @@ public interface VectorizedValuesReader {
     }
   }

+  /**
+   * Reads {@code total} INT64 values, narrows each to an int via Java's primitive
+   * long-to-int cast (the high 32 bits are discarded), and writes them into {@code c}
+   * starting at {@code c[rowId]}. Used by the type-converting updater that reads parquet
+   * INT64 DECIMAL columns whose Spark target is a 32-bit decimal (precision <= 9); such
+   * values are guaranteed by Parquet's decimal encoding to fit in int32, so the
+   * narrowing is non-lossy in practice.
+   *
+   * <p>The default implementation falls back to a per-row read+narrow+write loop and is
+   * therefore equivalent in cost to the legacy per-row Updater path. Subclasses backed
+   * by contiguous bulk storage (e.g. PLAIN encoding via {@link VectorizedPlainValuesReader})
+   * should override to read source bytes once and run a tight in-method conversion loop,
+   * avoiding {@code total} virtual dispatches on {@link #readLong()}. Readers without
+   * an override preserve correctness but gain no speedup.
+   */
+  default void readLongsAsInts(int total, WritableColumnVector c, int rowId) {
+    for (int i = 0; i < total; i += 1) {
+      c.putInt(rowId + i, (int) readLong());
+    }
+  }
+
   void readBinary(int total, WritableColumnVector c, int rowId);
   void readGeometry(int total, WritableColumnVector c, int rowId);
   void readGeography(int total, WritableColumnVector c, int rowId);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 0bbf6a4f5d5a..c25522161443 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -2022,6 +2022,57 @@ class ParquetIOSuite extends ParquetTest with SharedSparkSession {
     }
   }

+  test("INT64 DECIMAL -> 32-bit DecimalType narrowing end-to-end via vectorized read path") {
+    // Round-trips a custom-built parquet file with INT64 + DECIMAL(9, 2) annotation read
+    // back as DecimalType(9, 2). DECIMAL(9, _) values fit in int32, so the factory routes
+    // through DowncastLongUpdater (target is 32-bit decimal storage). Spark's own writer
+    // produces INT32-backed decimal for precision <= 9, so we use `parquet-mr`'s low-level
+    // API to force INT64 storage, mirroring the wire format that other writers (e.g. Hive,
+    // Impala) produce.
+    val schema = MessageTypeParser.parseMessageType(
+      """message root {
+        |  required int64 v (DECIMAL(9, 2));
+        |}""".stripMargin)
+
+    for (dictEnabled <- Seq(true, false)) {
+      withTempDir { dir =>
+        val tablePath = new Path(s"${dir.getCanonicalPath}/dec.parquet")
+        val numRecords = 5000
+
+        val writer = createParquetWriter(schema, tablePath, dictionaryEnabled = dictEnabled)
+        (0 until numRecords).foreach { i =>
+          val unscaled = i % 5 match {
+            case 0 => -999_999_999L
+            case 1 => -1L
+            case 2 => 0L
+            case 3 => 999_999_999L
+            case _ => i.toLong * 13L - 7L
+          }
+          val record = new SimpleGroup(schema)
+          record.add(0, unscaled)
+          writer.write(record)
+        }
+        writer.close
+
+        withAllParquetReaders {
+          val readSchema = new StructType().add("v", DecimalType(9, 2), nullable = false)
+          val df = spark.read.schema(readSchema).parquet(tablePath.toString)
+          val expected = (0 until numRecords).map { i =>
+            val unscaled = i % 5 match {
+              case 0 => -999_999_999L
+              case 1 => -1L
+              case 2 => 0L
+              case 3 => 999_999_999L
+              case _ => i.toLong * 13L - 7L
+            }
+            Row(java.math.BigDecimal.valueOf(unscaled, 2))
+          }
+          checkAnswer(df, expected)
+        }
+      }
+    }
+  }
+
   test("SPARK-56872: INT64 DECIMAL into 32-bit Decimal column with dictionary fallback") {
     // `DowncastLongUpdater.decodeSingleDictionaryId` only runs when the vectorized reader has
     // to eagerly drain buffered dictionary IDs, which happens when parquet-mr writes the
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
index 2ed6f84e68b9..65a4d90750b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.types._
  * - `IntegerToLongUpdater` (INT32 -> Long, plus long-decimal dispatch)
  * - `IntegerToDoubleUpdater` (INT32 -> Double)
  * - `FloatToDoubleUpdater` (FLOAT -> Double)
+ * - `DowncastLongUpdater` (INT64 DECIMAL -> 32-bit Decimal via narrowing cast)
  *
  * Covers boundary batch lengths, sign-extension on negative INT32 values, the singular
  * `readValue` path, and the factory's long-decimal dispatch
@@ -79,6 +80,13 @@ class ParquetVectorUpdaterSuite extends SparkFunSuite {
     buf.array()
   }

+  private def plainLongBytes(values: Array[Long]): Array[Byte] = {
+    val buf = ByteBuffer.allocate(values.length * 8).order(ByteOrder.LITTLE_ENDIAN)
+    var i = 0
+    while (i < values.length) { buf.putLong(values(i)); i += 1 }
+    buf.array()
+  }
+
   private def newPlainReader(bytes: Array[Byte], numValues: Int): VectorizedPlainValuesReader = {
     val r = new VectorizedPlainValuesReader
     r.initFromPage(numValues, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes)))
@@ -260,6 +268,54 @@ class ParquetVectorUpdaterSuite extends SparkFunSuite {
     out
   }

+  // ---- DowncastLongUpdater: INT64 DECIMAL(p<=9) -> 32-bit Decimal via narrowing cast ----
+
+  // INT64 column descriptor annotated as DECIMAL(precision, scale); when both source and
+  // target precision are <= 9, the factory routes to DowncastLongUpdater (target is stored
+  // as int32 because `DecimalType.is32BitDecimalType` requires precision <= 9). Parquet
+  // guarantees DECIMAL(p<=9) values fit in int32, so the narrowing `(int) buffer.getLong()`
+  // is non-lossy in practice.
+  private def int64DecimalDescriptor(precision: Int, scale: Int): ColumnDescriptor = {
+    val pt = Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL)
+      .as(LogicalTypeAnnotation.decimalType(scale, precision))
+      .named("col")
+    new ColumnDescriptor(Array("col"), pt, 0, 1)
+  }
+
+  // Reads `values.length` INT64s through `DowncastLongUpdater.readValues` and returns the
+  // resulting int column. Caller guarantees values fit in int32.
+  private def readViaDowncastLongUpdater(
+      desc: ColumnDescriptor,
+      targetType: DataType,
+      values: Array[Long]): Array[Int] = {
+    val fac = newFactory(desc)
+    val updater = fac.getUpdater(desc, targetType)
+    val out = new OnHeapColumnVector(values.length.max(1), targetType)
+    val reader = newPlainReader(plainLongBytes(values), values.length)
+    updater.readValues(values.length, 0, out, reader)
+    val result = new Array[Int](values.length)
+    var i = 0
+    while (i < values.length) { result(i) = out.getInt(i); i += 1 }
+    result
+  }
+
+  // Sample of INT64 values guaranteed to fit in int32 (within DECIMAL(9, _) range).
+  private def downcastSampleValues(n: Int): Array[Long] = {
+    val out = new Array[Long](n)
+    var i = 0
+    while (i < n) {
+      out(i) = i match {
+        case _ if i % 5 == 0 => -999_999_999L // min DECIMAL(9, _) value
+        case _ if i % 5 == 1 => -1L
+        case _ if i % 5 == 2 => 0L
+        case _ if i % 5 == 3 => 999_999_999L // max DECIMAL(9, _) value
+        case _ => i * 13L - 7L
+      }
+      i += 1
+    }
+    out
+  }
+
   // Java's float-to-double widening is exact for finite/infinite values and produces
   // some double NaN for a NaN input (the payload may be canonicalized). Use
   // `java.lang.Double.compare` to give NaN well-defined equality and to distinguish
@@ -310,4 +366,46 @@ class ParquetVectorUpdaterSuite extends SparkFunSuite {
     assert(java.lang.Double.doubleToRawLongBits(actual(1)) ===
       java.lang.Double.doubleToRawLongBits(0.0d))
   }
+
+  for (n <- Seq(0, 1, 7, 8, 9, 17, 1024, 4097)) {
+    test(s"DowncastLongUpdater produces correct narrowed output (total=$n)") {
+      val desc = int64DecimalDescriptor(precision = 9, scale = 2)
+      val targetType = DataTypes.createDecimalType(9, 2)
+      val input = downcastSampleValues(n)
+      val actual = readViaDowncastLongUpdater(desc, targetType, input)
+      assert(actual === input.map(_.toInt))
+    }
+  }
+
+  test("DowncastLongUpdater: readValue narrows a single INT64 -> int") {
+    // Same rationale as the IntegerToLongUpdater readValue test: the def-level-decoder's
+    // run-of-1 path calls `readLong()` directly rather than the bulk method.
+    val desc = int64DecimalDescriptor(precision = 9, scale = 2)
+    val targetType = DataTypes.createDecimalType(9, 2)
+    val input = Array(0L, 1L, -1L, 42L, -999_999_999L, 999_999_999L)
+    val fac = newFactory(desc)
+    val updater = fac.getUpdater(desc, targetType)
+    val out = new OnHeapColumnVector(input.length, targetType)
+    val reader = newPlainReader(plainLongBytes(input), input.length)
+    var i = 0
+    while (i < input.length) {
+      updater.readValue(i, out, reader)
+      i += 1
+    }
+    val actual = (0 until input.length).map(out.getInt).toArray
+    assert(actual === input.map(_.toInt))
+  }
+
+  test("DowncastLongUpdater: narrowing cast preserves sign for in-range values") {
+    // Spot-check that `(int) buffer.getLong()` preserves sign for every INT64 value
+    // bounded by DECIMAL(9, _)'s range. Out-of-range INT64 values (i.e., outside
+    // [-999_999_999, 999_999_999]) are not reachable in production because Parquet's
+    // DECIMAL encoding bounds writer-side; the cast would truncate the high 32 bits and
+    // is documented as such on `readLongsAsInts`.
+    val desc = int64DecimalDescriptor(precision = 9, scale = 0)
+    val targetType = DataTypes.createDecimalType(9, 0)
+    val input = Array(-999_999_999L, -1L, 0L, 1L, 999_999_999L)
+    val actual = readViaDowncastLongUpdater(desc, targetType, input)
+    assert(actual === Array[Int](-999_999_999, -1, 0, 1, 999_999_999))
+  }
 }