This is an automated email from the ASF dual-hosted git repository.
LuciferYang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1360e5c7fa38 [SPARK-56801][SQL] Add bulk read+widen path for INT32 to Double Parquet vector updater
1360e5c7fa38 is described below
commit 1360e5c7fa38df1dac55476aba983aa6012b62c4
Author: YangJie <[email protected]>
AuthorDate: Tue May 12 19:58:31 2026 +0800
[SPARK-56801][SQL] Add bulk read+widen path for INT32 to Double Parquet vector updater
### What changes were proposed in this pull request?
Extend the bulk read+widen pattern introduced in SPARK-56791 to
`IntegerToDoubleUpdater` (Parquet INT32 read into Spark `DoubleType`).
A new `readIntegersAsDoubles` default method on `VectorizedValuesReader`
provides the per-row fallback. `VectorizedPlainValuesReader` overrides it to fetch
the source bytes once via `getBuffer(total * 4)` and run a tight in-method
conversion loop. `IntegerToDoubleUpdater.readValues` becomes a one-line
delegation. The widening is Java's primitive int-to-double conversion, which is
lossless for all INT32 values.
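
The shape of the change can be sketched outside Spark as follows. This is a minimal illustration, not the committed code: `IntReaderSketch` and `PlainIntReaderSketch` are hypothetical stand-ins for `VectorizedValuesReader` and `VectorizedPlainValuesReader`, a plain `double[]` replaces `WritableColumnVector`, and the `getBuffer` body is an assumption about how a slice-returning helper might behave.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical stand-in for VectorizedValuesReader; double[] replaces WritableColumnVector.
interface IntReaderSketch {
  int readInteger();

  // Default per-row fallback: one readInteger() call per element.
  default void readIntegersAsDoubles(int total, double[] out, int rowId) {
    for (int i = 0; i < total; i += 1) {
      out[rowId + i] = readInteger(); // implicit int -> double widening
    }
  }
}

// Hypothetical stand-in for a PLAIN-encoded reader over little-endian INT32 bytes.
final class PlainIntReaderSketch implements IntReaderSketch {
  private final ByteBuffer in;

  PlainIntReaderSketch(byte[] bytes) {
    this.in = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
  }

  // Assumed behaviour: return a view of the next `length` bytes and advance the
  // underlying position. Every call allocates a new view object.
  private ByteBuffer getBuffer(int length) {
    ByteBuffer view = in.slice().order(ByteOrder.LITTLE_ENDIAN);
    view.limit(length);
    in.position(in.position() + length);
    return view;
  }

  @Override
  public int readInteger() {
    return getBuffer(4).getInt(); // one view per element
  }

  // Bulk override: one view for the whole batch, then a tight conversion loop.
  @Override
  public void readIntegersAsDoubles(int total, double[] out, int rowId) {
    ByteBuffer buffer = getBuffer(total * 4);
    for (int i = 0; i < total; i += 1) {
      out[rowId + i] = buffer.getInt();
    }
  }

  // Encodes ints as consecutive little-endian 4-byte values.
  static byte[] plainIntBytes(int... values) {
    ByteBuffer buf = ByteBuffer.allocate(values.length * 4).order(ByteOrder.LITTLE_ENDIAN);
    for (int v : values) {
      buf.putInt(v);
    }
    return buf.array();
  }
}
```

Both paths produce the same doubles; the override only changes how many buffer views are created per batch.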
### Why are the changes needed?
On the legacy path, `IntegerToDoubleUpdater.readValues` calls `readInteger()` once
per element, and each call allocates a fresh `ByteBuffer` slice inside `getBuffer(4)`;
that allocation dominates the loop. Collapsing N allocations into one is the same win
SPARK-56791 delivered for the INT32 -> Long sibling, and the gain is again larger
on newer JDKs, where escape analysis better optimizes the tight loop:
| JDK | Baseline | After | Speedup |
|----:|---------:|----------:|--------:|
| 17 | 479.1 M/s | 1475.2 M/s | 3.08x |
| 21 | 531.8 M/s | 6215.0 M/s | 11.68x |
| 25 | 454.6 M/s | 6423.7 M/s | 14.13x |
Peer Updaters in the same benchmark group hold within run-to-run noise,
confirming the change is local to `IntegerToDoubleUpdater`.
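
As a quick check on the table, the Speedup column is the ratio of the After rate to the Baseline rate. The sketch below recomputes it from the rounded rates shown above; the class name is illustrative only, and because the inputs are rounded to one decimal place the recomputed ratios may differ from the table in the last digit.

```java
// Rates (M rows/s) are copied from the table above; SpeedupCheck is an illustrative name.
final class SpeedupCheck {
  // Speedup is throughput after the change divided by throughput before it.
  static double speedup(double baselineRate, double afterRate) {
    return afterRate / baselineRate;
  }

  public static void main(String[] args) {
    int[] jdk = {17, 21, 25};
    double[] baseline = {479.1, 531.8, 454.6};
    double[] after = {1475.2, 6215.0, 6423.7};
    for (int i = 0; i < jdk.length; i++) {
      System.out.printf("JDK %d: %.3fx%n", jdk[i], speedup(baseline[i], after[i]));
    }
  }
}
```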
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit tests in `ParquetVectorUpdaterSuite` cover boundary batch lengths
(0, 1, 7, 8, 9, 17, 1024, 4097) and the singular `readValue` path. A new
end-to-end test in `ParquetIOSuite` round-trips INT32 written to parquet and
read back as `DoubleType`, exercising both REQUIRED columns (no def-levels) and
OPTIONAL columns with interleaved nulls so that `readValue` and `readValues`
are both invoked.
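
The exact-equality assertions in these tests rely on int-to-double widening being lossless. A minimal standalone sketch of that property, assuming a Java port of the `widenSampleAt` helper from the `ParquetIOSuite` change in this commit (the class and the `roundTripsExactly` method are hypothetical, not part of the patch):

```java
final class WidenRoundTripSketch {
  // Java port of widenSampleAt: mixes sign, zero, and MIN/MAX boundaries.
  static int widenSampleAt(int i) {
    switch (i % 5) {
      case 0: return Integer.MIN_VALUE + i;
      case 1: return -1;
      case 2: return 0;
      case 3: return Integer.MAX_VALUE - i;
      default: return i * 13 - 7;
    }
  }

  // True when every sample in [0, n) survives int -> double -> integer unchanged.
  static boolean roundTripsExactly(int n) {
    for (int i = 0; i < n; i++) {
      int v = widenSampleAt(i);
      double widened = v; // primitive widening conversion
      // The long comparison guards against the saturating behaviour of the (int) cast.
      if ((int) widened != v || (long) widened != (long) v) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Same boundary batch lengths as the unit tests described above.
    int[] lengths = {0, 1, 7, 8, 9, 17, 1024, 4097};
    for (int n : lengths) {
      System.out.println("total=" + n + " exact=" + roundTripsExactly(n));
    }
  }
}
```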
Benchmark results on JDK 17, 21, and 25 are committed on the branch.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code
Closes #55795 from LuciferYang/SPARK-56801-int32-to-double.
Authored-by: YangJie <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
---
...ParquetVectorUpdaterBenchmark-jdk21-results.txt | 46 ++++++++---------
...ParquetVectorUpdaterBenchmark-jdk25-results.txt | 48 ++++++++---------
.../ParquetVectorUpdaterBenchmark-results.txt | 46 ++++++++---------
.../parquet/ParquetVectorUpdaterFactory.java | 4 +-
.../parquet/VectorizedPlainValuesReader.java | 12 +++++
.../parquet/VectorizedValuesReader.java | 19 +++++++
.../datasources/parquet/ParquetIOSuite.scala | 60 +++++++++++++++++-----
.../parquet/ParquetVectorUpdaterSuite.scala | 47 ++++++++++++++++-
8 files changed, 195 insertions(+), 87 deletions(-)
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
index 3080df6d02dc..a4c48eff31a6 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Identity Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater 0 0 0 16949.1 0.1 1.0X
-ByteUpdater (INT32 -> Byte) 0 0 0 3748.1 0.3 0.2X
-ShortUpdater (INT32 -> Short) 1 1 0 1674.2 0.6 0.1X
-IntegerUpdater 0 0 0 10123.9 0.1 0.6X
-LongUpdater 0 0 0 5075.2 0.2 0.3X
-FloatUpdater 0 0 0 10159.3 0.1 0.6X
-DoubleUpdater 0 0 0 5076.7 0.2 0.3X
-BinaryUpdater 15 15 0 70.8 14.1 0.0X
+BooleanUpdater 0 0 0 16954.6 0.1 1.0X
+ByteUpdater (INT32 -> Byte) 0 0 0 3764.8 0.3 0.2X
+ShortUpdater (INT32 -> Short) 1 1 0 1677.4 0.6 0.1X
+IntegerUpdater 0 0 0 10269.9 0.1 0.6X
+LongUpdater 0 0 0 5120.9 0.2 0.3X
+FloatUpdater 0 0 0 10252.8 0.1 0.6X
+DoubleUpdater 0 0 0 5133.4 0.2 0.3X
+BinaryUpdater 15 15 3 71.0 14.1 0.0X
================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Type-converting Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater 0 0 0 6181.3 0.2 1.0X
-IntegerToDoubleUpdater 2 2 0 531.8 1.9 0.1X
-FloatToDoubleUpdater 2 2 0 490.0 2.0 0.1X
-DateToTimestampNTZUpdater 29 29 0 36.5 27.4 0.0X
-DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2 0 455.6 2.2 0.1X
+IntegerToLongUpdater 0 0 0 6220.9 0.2 1.0X
+IntegerToDoubleUpdater 0 0 0 6215.0 0.2 1.0X
+FloatToDoubleUpdater 2 2 0 489.5 2.0 0.1X
+DateToTimestampNTZUpdater 29 29 1 36.1 27.7 0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2 0 455.2 2.2 0.1X
================================================================================================
@@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Rebase Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy) 0 0 0 3638.9 0.3 1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0 0 2657.0 0.4 0.7X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 3 0 420.3 2.4 0.1X
+IntegerWithRebaseUpdater (DATE legacy) 0 0 0 3645.1 0.3 1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0 0 2663.5 0.4 0.7X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 3 0 419.9 2.4 0.1X
================================================================================================
@@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Unsigned Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long) 0 0 0 5824.3 0.2 1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 18 1 60.3 16.6 0.0X
+UnsignedIntegerUpdater (UINT32 -> Long) 0 0 0 5846.6 0.2 1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 18 0 60.3 16.6 0.0X
================================================================================================
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Decimal Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater 0 0 0 10229.8 0.1 1.0X
-LongToDecimalUpdater 0 0 0 5072.8 0.2 0.5X
-FixedLenByteArrayToDecimalUpdater 21 21 0 50.8 19.7 0.0X
+IntegerToDecimalUpdater 0 0 0 10296.2 0.1 1.0X
+LongToDecimalUpdater 0 0 0 5145.8 0.2 0.5X
+FixedLenByteArrayToDecimalUpdater 21 21 1 50.2 19.9 0.0X
================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
FixedLenByteArray Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary) 20 20 1 53.8 18.6 1.0X
+FixedLenByteArrayUpdater (len=16 -> Binary) 20 21 1 52.7 19.0 1.0X
FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7 7 0 160.1 6.2 3.0X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8 8 0 132.6 7.5 2.5X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8 8 0 132.7 7.5 2.5X
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
index ee06fbe99609..a22613b7dd4f 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Identity Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater 0 0 0 17154.9 0.1 1.0X
-ByteUpdater (INT32 -> Byte) 0 0 0 3684.2 0.3 0.2X
-ShortUpdater (INT32 -> Short) 1 1 0 1660.3 0.6 0.1X
-IntegerUpdater 0 0 0 10300.3 0.1 0.6X
-LongUpdater 0 0 0 5148.1 0.2 0.3X
-FloatUpdater 0 0 0 10316.6 0.1 0.6X
-DoubleUpdater 0 0 0 5153.9 0.2 0.3X
-BinaryUpdater 16 16 0 67.6 14.8 0.0X
+BooleanUpdater 0 0 0 17171.8 0.1 1.0X
+ByteUpdater (INT32 -> Byte) 0 0 0 3675.3 0.3 0.2X
+ShortUpdater (INT32 -> Short) 1 1 0 1661.2 0.6 0.1X
+IntegerUpdater 0 0 0 10301.4 0.1 0.6X
+LongUpdater 0 0 0 5147.9 0.2 0.3X
+FloatUpdater 0 0 0 10294.3 0.1 0.6X
+DoubleUpdater 0 0 0 5113.4 0.2 0.3X
+BinaryUpdater 16 16 1 66.9 14.9 0.0X
================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Type-converting Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater 0 0 0 6460.6 0.2 1.0X
-IntegerToDoubleUpdater 2 2 0 454.6 2.2 0.1X
-FloatToDoubleUpdater 2 2 0 483.4 2.1 0.1X
-DateToTimestampNTZUpdater 29 29 0 36.3 27.6 0.0X
-DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2 0 455.5 2.2 0.1X
+IntegerToLongUpdater 0 0 0 6351.2 0.2 1.0X
+IntegerToDoubleUpdater 0 0 0 6423.7 0.2 1.0X
+FloatToDoubleUpdater 2 2 0 483.3 2.1 0.1X
+DateToTimestampNTZUpdater 29 29 1 36.6 27.3 0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2 0 455.4 2.2 0.1X
================================================================================================
@@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Rebase Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy) 0 0 0 3667.7 0.3 1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0 0 2676.2 0.4 0.7X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS) 3 3 0 371.3 2.7 0.1X
+IntegerWithRebaseUpdater (DATE legacy) 0 0 0 3653.2 0.3 1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0 0 2659.6 0.4 0.7X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS) 3 3 0 371.1 2.7 0.1X
================================================================================================
@@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Unsigned Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long) 0 0 0 6354.7 0.2 1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 18 18 0 59.3 16.9 0.0X
+UnsignedIntegerUpdater (UINT32 -> Long) 0 0 0 6324.8 0.2 1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 17 0 60.4 16.5 0.0X
================================================================================================
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Decimal Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater 0 0 0 10307.4 0.1 1.0X
-LongToDecimalUpdater 0 0 0 5149.6 0.2 0.5X
-FixedLenByteArrayToDecimalUpdater 21 21 1 50.9 19.6 0.0X
+IntegerToDecimalUpdater 0 0 0 10258.9 0.1 1.0X
+LongToDecimalUpdater 0 0 0 5144.1 0.2 0.5X
+FixedLenByteArrayToDecimalUpdater 21 21 0 50.7 19.7 0.0X
================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
FixedLenByteArray Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary) 22 23 1 47.7 21.0 1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7 7 0 152.7 6.6 3.2X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8 8 0 127.7 7.8 2.7X
+FixedLenByteArrayUpdater (len=16 -> Binary) 21 22 1 50.2 19.9 1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7 7 0 152.5 6.6 3.0X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8 8 0 127.5 7.8 2.5X
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
index aff5dd3c9c86..fe4f716d2e15 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Identity Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater 0 0 0 14578.7 0.1 1.0X
-ByteUpdater (INT32 -> Byte) 0 0 0 3667.6 0.3 0.3X
-ShortUpdater (INT32 -> Short) 1 1 0 2053.7 0.5 0.1X
-IntegerUpdater 0 0 0 10217.7 0.1 0.7X
-LongUpdater 0 0 0 5122.4 0.2 0.4X
-FloatUpdater 0 0 0 10244.9 0.1 0.7X
-DoubleUpdater 0 0 0 5125.7 0.2 0.4X
-BinaryUpdater 15 15 0 68.9 14.5 0.0X
+BooleanUpdater 0 0 0 20526.1 0.0 1.0X
+ByteUpdater (INT32 -> Byte) 0 0 0 3669.4 0.3 0.2X
+ShortUpdater (INT32 -> Short) 1 1 0 2050.4 0.5 0.1X
+IntegerUpdater 0 0 0 10107.4 0.1 0.5X
+LongUpdater 0 0 0 5037.7 0.2 0.2X
+FloatUpdater 0 0 0 10239.0 0.1 0.5X
+DoubleUpdater 0 0 0 5095.8 0.2 0.2X
+BinaryUpdater 15 15 0 70.1 14.3 0.0X
================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Type-converting Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater 1 1 0 1280.8 0.8 1.0X
-IntegerToDoubleUpdater 2 2 0 479.1 2.1 0.4X
-FloatToDoubleUpdater 2 2 0 480.8 2.1 0.4X
-DateToTimestampNTZUpdater 36 36 1 29.1 34.4 0.0X
-DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2 0 487.0 2.1 0.4X
+IntegerToLongUpdater 1 1 0 1277.6 0.8 1.0X
+IntegerToDoubleUpdater 1 1 0 1475.2 0.7 1.2X
+FloatToDoubleUpdater 2 2 0 480.2 2.1 0.4X
+DateToTimestampNTZUpdater 36 36 0 29.0 34.5 0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2 0 455.0 2.2 0.4X
================================================================================================
@@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Rebase Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy) 0 0 0 2608.4 0.4 1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 1 1 0 2083.9 0.5 0.8X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 2 0 454.6 2.2 0.2X
+IntegerWithRebaseUpdater (DATE legacy) 0 0 0 2400.9 0.4 1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 1 1 0 2079.6 0.5 0.9X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 2 0 454.0 2.2 0.2X
================================================================================================
@@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Unsigned Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long) 1 1 0 1110.4 0.9 1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 18 18 1 59.1 16.9 0.1X
+UnsignedIntegerUpdater (UINT32 -> Long) 1 1 0 1091.4 0.9 1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 18 18 0 59.0 16.9 0.1X
================================================================================================
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Decimal Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater 0 0 0 10130.8 0.1 1.0X
-LongToDecimalUpdater 0 0 0 5125.5 0.2 0.5X
-FixedLenByteArrayToDecimalUpdater 21 21 0 51.1 19.6 0.0X
+IntegerToDecimalUpdater 0 0 0 10204.0 0.1 1.0X
+LongToDecimalUpdater 0 0 0 4883.9 0.2 0.5X
+FixedLenByteArrayToDecimalUpdater 21 21 1 51.0 19.6 0.0X
================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
FixedLenByteArray Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary) 19 19 1 55.1 18.1 1.0X
+FixedLenByteArrayUpdater (len=16 -> Binary) 19 19 0 55.2 18.1 1.0X
FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7 7 0 160.1 6.2 2.9X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8 9 0 124.7 8.0 2.3X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 9 9 0 123.1 8.1 2.2X
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index a2e240fabade..46a0d3dd212f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -396,9 +396,7 @@ public class ParquetVectorUpdaterFactory {
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
- for (int i = 0; i < total; ++i) {
- values.putDouble(offset + i, valuesReader.readInteger());
- }
+ valuesReader.readIntegersAsDoubles(total, values, offset);
}
@Override
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 48ea026641f9..0329b1ff8ebf 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -168,6 +168,18 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
}
}
+ @Override
+ public final void readIntegersAsDoubles(int total, WritableColumnVector c, int rowId) {
+ int requiredBytes = total * 4;
+ ByteBuffer buffer = getBuffer(requiredBytes);
+ // No `hasArray` bulk-copy path: source (int32) and target (double, 8 bytes) have
+ // different widths so a contiguous byte copy is impossible. Matches the pattern in
+ // `readIntegersAsLongs` and `readUnsignedIntegers`.
+ for (int i = 0; i < total; i += 1) {
+ c.putDouble(rowId + i, buffer.getInt());
+ }
+ }
+
// A fork of `readIntegers` to rebase the date values. For performance reasons, this method
// iterates the values twice: check if we need to rebase first, then go to the optimized branch
// if rebase is not needed.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
index c376acd40593..8cc9a7dae2cd 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
@@ -75,6 +75,25 @@ public interface VectorizedValuesReader {
}
}
+ /**
+ * Reads {@code total} INT32 values, widens each to a double, and writes them into
+ * {@code c} starting at {@code c[rowId]}. The widening is lossless because every
+ * INT32 fits exactly in a double's 53-bit mantissa. Used by the type-converting
+ * updater that reads parquet INT32 columns into Spark {@code DoubleType} targets.
+ *
+ * <p>The default implementation falls back to a per-row read+widen+write loop and is
+ * therefore equivalent in cost to the legacy per-row Updater path. Subclasses backed
+ * by contiguous bulk storage (e.g. PLAIN encoding via {@link VectorizedPlainValuesReader})
+ * should override to read source bytes once and run a tight in-method conversion loop,
+ * avoiding {@code total} virtual dispatches on {@link #readInteger()}. Readers without
+ * an override preserve correctness but gain no speedup.
+ */
+ default void readIntegersAsDoubles(int total, WritableColumnVector c, int rowId) {
+ for (int i = 0; i < total; i += 1) {
+ c.putDouble(rowId + i, readInteger());
+ }
+ }
+
void readBinary(int total, WritableColumnVector c, int rowId);
void readGeometry(int total, WritableColumnVector c, int rowId);
void readGeography(int total, WritableColumnVector c, int rowId);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index becd45f44a72..bab3d4e36f5e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1888,6 +1888,16 @@ class ParquetIOSuite extends ParquetTest with SharedSparkSession {
}
}
+ // Deterministic INT32 sample shared by the INT32 widening tests below. Mixes sign,
+ // zero, and MIN/MAX boundaries to catch sign-extension and precision regressions.
+ private def widenSampleAt(i: Int): Int = i % 5 match {
+ case 0 => Int.MinValue + i
+ case 1 => -1
+ case 2 => 0
+ case 3 => Int.MaxValue - i
+ case _ => i * 13 - 7
+ }
+
test("INT32 -> Long widening end-to-end via vectorized read path") {
// Round-trips an INT32 Parquet file read back with a Long schema, exercising
// IntegerToLongUpdater on the vectorized path. Covers a non-null column (REQUIRED,
@@ -1896,21 +1906,11 @@ class ParquetIOSuite extends ParquetTest with SharedSparkSession {
// the row-based (parquet-mr) reader, which provides additional correctness coverage.
withTempPath { file =>
val n = 5000
- // Generates a deterministic INT32 sample. Mixes sign, zero, MIN/MAX boundaries to
- // catch sign-extension regressions; reused for both the non-null and nullable cases.
- def sampleAt(i: Int): Int = i % 5 match {
- case 0 => Int.MinValue + i
- case 1 => -1
- case 2 => 0
- case 3 => Int.MaxValue - i
- case _ => i * 13 - 7
- }
-
- val nonNullData = (0 until n).map(i => Row(sampleAt(i)))
+ val nonNullData = (0 until n).map(i => Row(widenSampleAt(i)))
// Every 7th row is null: splits the run pattern enough to force the PACKED def-level
// path to interleave value runs with null runs at sub-batch lengths.
val nullableData = (0 until n).map { i =>
- if (i % 7 == 0) Row(null) else Row(sampleAt(i))
+ if (i % 7 == 0) Row(null) else Row(widenSampleAt(i))
}
val nonNullWriteSchema = new StructType().add("v", IntegerType, nullable = false)
@@ -1936,6 +1936,42 @@ class ParquetIOSuite extends ParquetTest with SharedSparkSession {
}
}
}
+
+ test("INT32 -> Double widening end-to-end via vectorized read path") {
+ // Round-trips an INT32 Parquet file read back with a Double schema, exercising
+ // IntegerToDoubleUpdater. Same REQUIRED/OPTIONAL coverage as the INT32 -> Long
+ // sibling test above; every INT32 fits losslessly in a double, so exact equality
+ // is the right assertion.
+ withTempPath { file =>
+ val n = 5000
+ val nonNullData = (0 until n).map(i => Row(widenSampleAt(i)))
+ val nullableData = (0 until n).map { i =>
+ if (i % 7 == 0) Row(null) else Row(widenSampleAt(i))
+ }
+
+ val nonNullWriteSchema = new StructType().add("v", IntegerType, nullable = false)
+ val nonNullReadSchema = new StructType().add("v", DoubleType, nullable = false)
+ val nullableWriteSchema = new StructType().add("v", IntegerType, nullable = true)
+ val nullableReadSchema = new StructType().add("v", DoubleType, nullable = true)
+
+ val nonNullPath = new java.io.File(file, "nonnull").getCanonicalPath
+ val nullablePath = new java.io.File(file, "nullable").getCanonicalPath
+ spark.createDataFrame(spark.sparkContext.parallelize(nonNullData, 4), nonNullWriteSchema)
+ .write.parquet(nonNullPath)
+ spark.createDataFrame(spark.sparkContext.parallelize(nullableData, 4), nullableWriteSchema)
+ .write.parquet(nullablePath)
+
+ val expectedNonNull = nonNullData.map(r => Row(r.getInt(0).toDouble))
+ val expectedNullable = nullableData.map { r =>
+ if (r.isNullAt(0)) Row(null) else Row(r.getInt(0).toDouble)
+ }
+
+ withAllParquetReaders {
+ checkAnswer(spark.read.schema(nonNullReadSchema).parquet(nonNullPath), expectedNonNull)
+ checkAnswer(spark.read.schema(nullableReadSchema).parquet(nullablePath), expectedNullable)
+ }
+ }
+ }
}
class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
index d121f4e2bb73..64a9b2032b65 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
@@ -31,8 +31,10 @@ import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types._
/**
- * Correctness tests for the INT32 -> Long widening Updater (`IntegerToLongUpdater`)
- * on the Parquet vectorized read path.
+ * Correctness tests for type-converting Updaters on the Parquet vectorized read path
+ * whose `readValues` is a bulk delegation to `VectorizedValuesReader`:
+ * - `IntegerToLongUpdater` (INT32 -> Long, plus long-decimal dispatch)
+ * - `IntegerToDoubleUpdater` (INT32 -> Double)
*
* Covers boundary batch lengths, sign-extension on negative INT32 values, the singular
* `readValue` path, and the factory's long-decimal dispatch
@@ -90,6 +92,20 @@ class ParquetVectorUpdaterSuite extends SparkFunSuite {
result
}
+ // Reads `values.length` INT32s through `IntegerToDoubleUpdater.readValues` and returns
+ // the resulting double column.
+ private def readViaDoubleUpdater(values: Array[Int]): Array[Double] = {
+ val fac = newFactory(int32Descriptor)
+ val updater = fac.getUpdater(int32Descriptor, DataTypes.DoubleType)
+ val out = new OnHeapColumnVector(values.length.max(1), DataTypes.DoubleType)
+ val reader = newPlainReader(plainIntBytes(values), values.length)
+ updater.readValues(values.length, 0, out, reader)
+ val result = new Array[Double](values.length)
+ var i = 0
+ while (i < values.length) { result(i) = out.getDouble(i); i += 1 }
+ result
+ }
+
// Test data: a mix of positive, negative, zero, MIN/MAX values to catch sign-extension bugs.
private def signedSampleValues(n: Int): Array[Int] = {
val out = new Array[Int](n)
@@ -168,4 +184,31 @@ class ParquetVectorUpdaterSuite extends SparkFunSuite {
val actual = (0 until input.length).map(out.getLong).toArray
assert(actual === input.map(_.toLong))
}
+
+ // ---- IntegerToDoubleUpdater: same bulk-path shape, target column is DoubleType ----
+
+ for (n <- Seq(0, 1, 7, 8, 9, 17, 1024, 4097)) {
+ test(s"IntegerToDoubleUpdater produces correct widened output (total=$n)") {
+ val input = signedSampleValues(n)
+ assert(readViaDoubleUpdater(input) === input.map(_.toDouble))
+ }
+ }
+
+ test("IntegerToDoubleUpdater: readValue widens a single INT32 -> Double") {
+ // The singular readValue is the def-level-decoder's run-of-1 path; see the
+ // IntegerToLongUpdater readValue test for the same rationale.
+ val input = Array(0, 1, -1, 42, Int.MinValue, Int.MaxValue)
+ val fac = newFactory(int32Descriptor)
+ val updater = fac.getUpdater(int32Descriptor, DataTypes.DoubleType)
+ val out = new OnHeapColumnVector(input.length, DataTypes.DoubleType)
+ val reader = newPlainReader(plainIntBytes(input), input.length)
+ var i = 0
+ while (i < input.length) {
+ updater.readValue(i, out, reader)
+ i += 1
+ }
+ val actual = (0 until input.length).map(out.getDouble).toArray
+ assert(actual === input.map(_.toDouble))
+ }
+
}