This is an automated email from the ASF dual-hosted git repository.
LuciferYang pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new 5950210f6992 [SPARK-56791][SQL] Add bulk read+widen path for INT32 to Long Parquet vector updater
5950210f6992 is described below
commit 5950210f6992380de3693bb22d8c4e2e46035a16
Author: YangJie <[email protected]>
AuthorDate: Mon May 11 11:05:32 2026 +0800
[SPARK-56791][SQL] Add bulk read+widen path for INT32 to Long Parquet vector updater
### What changes were proposed in this pull request?
INT32 -> Long widening on the Parquet vectorized read path goes through
`IntegerToLongUpdater.readValues`, which loops per-row and calls
`valuesReader.readInteger()` for each value. Each `readInteger()` invokes
`getBuffer(4).order(LITTLE_ENDIAN)`, allocating a fresh `ByteBuffer` slice.
That per-element allocation, not the int-to-long widen itself, dominates the
loop.
This PR adds a bulk method
`VectorizedValuesReader.readIntegersAsLongs(total, c, rowId)` that calls
`getBuffer(total * 4)` once and runs a tight in-method conversion loop.
`VectorizedPlainValuesReader` overrides it with the specialized implementation;
the interface default mirrors the legacy per-row pattern so non-PLAIN readers
continue to work unchanged. `IntegerToLongUpdater.readValues` delegates
straight to the new method.
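
As a rough, self-contained sketch of the pattern described above (not the Spark code itself): the hypothetical `IntReaderSketch` interface stands in for `VectorizedValuesReader`, a plain `long[]` stands in for `WritableColumnVector`, and the `getBuffer` helper only imitates slicing a little-endian view off the page input. All class names below are assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Small driver: encodes sample values and checks that both read paths agree.
class WidenDemo {
  // Encodes ints as PLAIN-style little-endian bytes.
  static byte[] encode(int[] values) {
    ByteBuffer buf = ByteBuffer.allocate(values.length * 4).order(ByteOrder.LITTLE_ENDIAN);
    for (int v : values) {
      buf.putInt(v);
    }
    return buf.array();
  }

  public static void main(String[] args) {
    int[] values = {Integer.MIN_VALUE, -1, 0, 42, Integer.MAX_VALUE};
    long[] viaDefault = new long[values.length];
    long[] viaBulk = new long[values.length];
    new PerElementReaderSketch(encode(values)).readIntegersAsLongs(values.length, viaDefault, 0);
    new BulkReaderSketch(encode(values)).readIntegersAsLongs(values.length, viaBulk, 0);
    System.out.println(Arrays.equals(viaDefault, viaBulk));
  }
}

// Hypothetical stand-in for the reader interface; a long[] replaces WritableColumnVector.
interface IntReaderSketch {
  int readInteger();

  // Default fallback: per-row read + widen + write, like the legacy updater loop.
  default void readIntegersAsLongs(int total, long[] out, int rowId) {
    for (int i = 0; i < total; i += 1) {
      out[rowId + i] = readInteger(); // int -> long assignment sign-extends
    }
  }
}

// Reader that only implements the per-element read, so it inherits the default.
class PerElementReaderSketch implements IntReaderSketch {
  private final ByteBuffer in;

  PerElementReaderSketch(byte[] plainBytes) {
    this.in = ByteBuffer.wrap(plainBytes);
  }

  // Imitates getBuffer(length): a fresh little-endian slice of the next `length` bytes.
  protected ByteBuffer getBuffer(int length) {
    ByteBuffer slice = in.slice().order(ByteOrder.LITTLE_ENDIAN);
    slice.limit(length);
    in.position(in.position() + length);
    return slice;
  }

  @Override
  public int readInteger() {
    return getBuffer(4).getInt(); // one slice per value
  }
}

// Reader that overrides the bulk method: one slice, then a tight conversion loop.
class BulkReaderSketch extends PerElementReaderSketch {
  BulkReaderSketch(byte[] plainBytes) {
    super(plainBytes);
  }

  @Override
  public void readIntegersAsLongs(int total, long[] out, int rowId) {
    ByteBuffer buffer = getBuffer(total * 4);
    for (int i = 0; i < total; i += 1) {
      out[rowId + i] = buffer.getInt();
    }
  }
}
```

Both readers produce the same widened values; the difference is only in how many slices are created per batch (one per value versus one per call).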
Sibling Updaters (`IntegerToDouble`, `FloatToDouble`, `DowncastLong`,
`DateToTimestampNTZ`) and other reader implementations (e.g.
`VectorizedDeltaBinaryPackedReader`) follow the same pattern and are tracked as
follow-up sub-tasks under the umbrella ticket.
### Why are the changes needed?
`ParquetVectorUpdaterBenchmark` shows the win grows substantially on newer
JDKs, where escape analysis and inlining better optimize the resulting tight
loop:
| JDK | Baseline | After | Speedup |
|----:|---------:|----------:|--------:|
| 17 | 454.5 M/s | 1280.8 M/s | 2.82x |
| 21 | 530.9 M/s | 6181.3 M/s | 11.64x |
| 25 | 454.8 M/s | 6460.6 M/s | 14.20x |
Peer Updaters in the same benchmark group stay within run-to-run noise,
confirming the change is local to `IntegerToLongUpdater`.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit tests in `ParquetVectorUpdaterSuite` cover boundary batch lengths
(0, 1, 7, 8, 9, 17, 1024, 4097), sign-extension on negative INT32 values, the
singular `readValue` path, and the factory's long-decimal dispatch (INT32 +
DECIMAL(9, 0) -> DecimalType(15, 0) via `canReadAsLongDecimal`).
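
The sign-extension property these tests pin down can be illustrated in isolation. The sketch below is an assumption-labeled example, not code from this patch: `SignExtensionSketch` and its method names are hypothetical, and the zero-extending variant is included only for contrast with the signed widening that INT32 -> Long requires.

```java
final class SignExtensionSketch {
  // Signed widening: Java's implicit int -> long conversion copies the sign bit upward.
  static long widenSigned(int v) {
    return v;
  }

  // Zero-extension, shown only for contrast: treats the 32 bits as an unsigned value.
  static long widenUnsigned(int v) {
    return Integer.toUnsignedLong(v);
  }

  public static void main(String[] args) {
    int[] samples = {Integer.MIN_VALUE, -1, 0, Integer.MAX_VALUE};
    for (int v : samples) {
      System.out.println(v + " signed=" + widenSigned(v) + " unsigned=" + widenUnsigned(v));
    }
  }
}
```

A negative INT32 must remain negative after widening, which is why the test inputs include `Int.MinValue` and `-1`.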
A new end-to-end test in `ParquetIOSuite` round-trips INT32 written to
Parquet and read back as `LongType`, exercising both REQUIRED columns (no
def-levels) and OPTIONAL columns with interleaved nulls so that `readValue` and
`readValues` are both invoked.
Benchmark results on JDK 17, 21, and 25 are committed on the branch.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code
Closes #55751 from LuciferYang/parquet-opt-p3-updater-batching.
Authored-by: YangJie <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
(cherry picked from commit a5a8aa4e8cf4b482b9d2468656cfa96f9e41a4f3)
Signed-off-by: yangjie01 <[email protected]>
---
...ParquetVectorUpdaterBenchmark-jdk21-results.txt | 48 +++---
...ParquetVectorUpdaterBenchmark-jdk25-results.txt | 42 ++---
.../ParquetVectorUpdaterBenchmark-results.txt | 46 +++---
.../parquet/ParquetVectorUpdaterFactory.java | 4 +-
.../parquet/VectorizedPlainValuesReader.java | 12 ++
.../parquet/VectorizedValuesReader.java | 19 +++
.../datasources/parquet/ParquetIOSuite.scala | 49 ++++++
.../parquet/ParquetVectorUpdaterSuite.scala | 171 +++++++++++++++++++++
8 files changed, 320 insertions(+), 71 deletions(-)
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
index 86c0201d35c1..3080df6d02dc 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Identity Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater 0 0
0 16946.4 0.1 1.0X
-ByteUpdater (INT32 -> Byte) 0 0
0 3743.2 0.3 0.2X
-ShortUpdater (INT32 -> Short) 1 1
0 1676.4 0.6 0.1X
-IntegerUpdater 0 0
0 10258.9 0.1 0.6X
-LongUpdater 0 0
0 5140.3 0.2 0.3X
-FloatUpdater 0 0
0 10259.8 0.1 0.6X
-DoubleUpdater 0 0
0 5130.4 0.2 0.3X
-BinaryUpdater 15 15
0 70.4 14.2 0.0X
+BooleanUpdater 0 0
0 16949.1 0.1 1.0X
+ByteUpdater (INT32 -> Byte) 0 0
0 3748.1 0.3 0.2X
+ShortUpdater (INT32 -> Short) 1 1
0 1674.2 0.6 0.1X
+IntegerUpdater 0 0
0 10123.9 0.1 0.6X
+LongUpdater 0 0
0 5075.2 0.2 0.3X
+FloatUpdater 0 0
0 10159.3 0.1 0.6X
+DoubleUpdater 0 0
0 5076.7 0.2 0.3X
+BinaryUpdater 15 15
0 70.8 14.1 0.0X
================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Type-converting Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater 2 2
0 530.9 1.9 1.0X
-IntegerToDoubleUpdater 2 2
0 531.3 1.9 1.0X
-FloatToDoubleUpdater 2 2
0 489.7 2.0 0.9X
-DateToTimestampNTZUpdater 29 29
0 36.2 27.6 0.1X
-DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2
0 455.7 2.2 0.9X
+IntegerToLongUpdater 0 0
0 6181.3 0.2 1.0X
+IntegerToDoubleUpdater 2 2
0 531.8 1.9 0.1X
+FloatToDoubleUpdater 2 2
0 490.0 2.0 0.1X
+DateToTimestampNTZUpdater 29 29
0 36.5 27.4 0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2
0 455.6 2.2 0.1X
================================================================================================
@@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Rebase Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy) 0 0
0 3644.6 0.3 1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0
0 2663.3 0.4 0.7X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 3
0 420.0 2.4 0.1X
+IntegerWithRebaseUpdater (DATE legacy) 0 0
0 3638.9 0.3 1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0
0 2657.0 0.4 0.7X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 3
0 420.3 2.4 0.1X
================================================================================================
@@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Unsigned Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long) 0 0
0 5974.2 0.2 1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 18
0 60.3 16.6 0.0X
+UnsignedIntegerUpdater (UINT32 -> Long) 0 0
0 5824.3 0.2 1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 18
1 60.3 16.6 0.0X
================================================================================================
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Decimal Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater 0 0
0 10257.8 0.1 1.0X
-LongToDecimalUpdater 0 0
0 5133.7 0.2 0.5X
-FixedLenByteArrayToDecimalUpdater 21 21
0 50.2 19.9 0.0X
+IntegerToDecimalUpdater 0 0
0 10229.8 0.1 1.0X
+LongToDecimalUpdater 0 0
0 5072.8 0.2 0.5X
+FixedLenByteArrayToDecimalUpdater 21 21
0 50.8 19.7 0.0X
================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
FixedLenByteArray Updaters: Best Time(ms) Avg
Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary) 20
21 1 51.5 19.4 1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7
7 0 160.1 6.2 3.1X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8
8 0 133.2 7.5 2.6X
+FixedLenByteArrayUpdater (len=16 -> Binary) 20
20 1 53.8 18.6 1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7
7 0 160.1 6.2 3.0X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8
8 0 132.6 7.5 2.5X
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
index d4aaaca05263..ee06fbe99609 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
@@ -6,13 +6,13 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Identity Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater 0 0
0 17177.7 0.1 1.0X
-ByteUpdater (INT32 -> Byte) 0 0
0 3680.4 0.3 0.2X
-ShortUpdater (INT32 -> Short) 1 1
0 1664.2 0.6 0.1X
-IntegerUpdater 0 0
0 10311.6 0.1 0.6X
-LongUpdater 0 0
0 5153.5 0.2 0.3X
-FloatUpdater 0 0
0 10313.6 0.1 0.6X
-DoubleUpdater 0 0
0 5157.8 0.2 0.3X
+BooleanUpdater 0 0
0 17154.9 0.1 1.0X
+ByteUpdater (INT32 -> Byte) 0 0
0 3684.2 0.3 0.2X
+ShortUpdater (INT32 -> Short) 1 1
0 1660.3 0.6 0.1X
+IntegerUpdater 0 0
0 10300.3 0.1 0.6X
+LongUpdater 0 0
0 5148.1 0.2 0.3X
+FloatUpdater 0 0
0 10316.6 0.1 0.6X
+DoubleUpdater 0 0
0 5153.9 0.2 0.3X
BinaryUpdater 16 16
0 67.6 14.8 0.0X
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Type-converting Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater 2 2
0 454.8 2.2 1.0X
-IntegerToDoubleUpdater 2 2
0 454.5 2.2 1.0X
-FloatToDoubleUpdater 2 2
0 483.4 2.1 1.1X
-DateToTimestampNTZUpdater 29 29
0 36.6 27.3 0.1X
-DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2
0 455.5 2.2 1.0X
+IntegerToLongUpdater 0 0
0 6460.6 0.2 1.0X
+IntegerToDoubleUpdater 2 2
0 454.6 2.2 0.1X
+FloatToDoubleUpdater 2 2
0 483.4 2.1 0.1X
+DateToTimestampNTZUpdater 29 29
0 36.3 27.6 0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2
0 455.5 2.2 0.1X
================================================================================================
@@ -39,8 +39,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Rebase Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy) 0 0
0 3668.8 0.3 1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0
0 2671.2 0.4 0.7X
+IntegerWithRebaseUpdater (DATE legacy) 0 0
0 3667.7 0.3 1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0
0 2676.2 0.4 0.7X
LongAsMicrosUpdater (TIMESTAMP_MILLIS) 3 3
0 371.3 2.7 0.1X
@@ -52,7 +52,7 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Unsigned Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long) 0 0
0 6344.0 0.2 1.0X
+UnsignedIntegerUpdater (UINT32 -> Long) 0 0
0 6354.7 0.2 1.0X
UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 18 18
0 59.3 16.9 0.0X
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Decimal Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater 0 0
0 10280.2 0.1 1.0X
-LongToDecimalUpdater 0 0
0 5153.3 0.2 0.5X
-FixedLenByteArrayToDecimalUpdater 21 21
0 50.6 19.8 0.0X
+IntegerToDecimalUpdater 0 0
0 10307.4 0.1 1.0X
+LongToDecimalUpdater 0 0
0 5149.6 0.2 0.5X
+FixedLenByteArrayToDecimalUpdater 21 21
1 50.9 19.6 0.0X
================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
FixedLenByteArray Updaters: Best Time(ms) Avg
Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary) 21
21 1 50.5 19.8 1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7
7 0 152.6 6.6 3.0X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8
8 0 127.7 7.8 2.5X
+FixedLenByteArrayUpdater (len=16 -> Binary) 22
23 1 47.7 21.0 1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7
7 0 152.7 6.6 3.2X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8
8 0 127.7 7.8 2.7X
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
index 2f76df57b9bf..aff5dd3c9c86 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Identity Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater 0 0
0 14625.7 0.1 1.0X
-ByteUpdater (INT32 -> Byte) 0 0
0 3672.0 0.3 0.3X
-ShortUpdater (INT32 -> Short) 1 1
0 2053.4 0.5 0.1X
-IntegerUpdater 0 0
0 10284.1 0.1 0.7X
-LongUpdater 0 0
0 5132.8 0.2 0.4X
-FloatUpdater 0 0
0 10257.9 0.1 0.7X
-DoubleUpdater 0 0
0 5097.0 0.2 0.3X
-BinaryUpdater 15 15
1 70.3 14.2 0.0X
+BooleanUpdater 0 0
0 14578.7 0.1 1.0X
+ByteUpdater (INT32 -> Byte) 0 0
0 3667.6 0.3 0.3X
+ShortUpdater (INT32 -> Short) 1 1
0 2053.7 0.5 0.1X
+IntegerUpdater 0 0
0 10217.7 0.1 0.7X
+LongUpdater 0 0
0 5122.4 0.2 0.4X
+FloatUpdater 0 0
0 10244.9 0.1 0.7X
+DoubleUpdater 0 0
0 5125.7 0.2 0.4X
+BinaryUpdater 15 15
0 68.9 14.5 0.0X
================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Type-converting Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater 2 2
0 454.5 2.2 1.0X
-IntegerToDoubleUpdater 2 2
0 478.3 2.1 1.1X
-FloatToDoubleUpdater 2 2
0 480.2 2.1 1.1X
-DateToTimestampNTZUpdater 36 36
0 29.5 33.9 0.1X
-DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2
0 455.3 2.2 1.0X
+IntegerToLongUpdater 1 1
0 1280.8 0.8 1.0X
+IntegerToDoubleUpdater 2 2
0 479.1 2.1 0.4X
+FloatToDoubleUpdater 2 2
0 480.8 2.1 0.4X
+DateToTimestampNTZUpdater 36 36
1 29.1 34.4 0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2
0 487.0 2.1 0.4X
================================================================================================
@@ -39,8 +39,8 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Rebase Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy) 0 0
0 2651.7 0.4 1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 1
0 2101.9 0.5 0.8X
+IntegerWithRebaseUpdater (DATE legacy) 0 0
0 2608.4 0.4 1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 1 1
0 2083.9 0.5 0.8X
LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 2
0 454.6 2.2 0.2X
@@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Unsigned Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long) 1 1
0 1093.3 0.9 1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 18 18
0 59.1 16.9 0.1X
+UnsignedIntegerUpdater (UINT32 -> Long) 1 1
0 1110.4 0.9 1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 18 18
1 59.1 16.9 0.1X
================================================================================================
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Decimal Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater 0 0
0 10263.1 0.1 1.0X
-LongToDecimalUpdater 0 0
0 5133.0 0.2 0.5X
-FixedLenByteArrayToDecimalUpdater 21 21
0 51.0 19.6 0.0X
+IntegerToDecimalUpdater 0 0
0 10130.8 0.1 1.0X
+LongToDecimalUpdater 0 0
0 5125.5 0.2 0.5X
+FixedLenByteArrayToDecimalUpdater 21 21
0 51.1 19.6 0.0X
================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
FixedLenByteArray Updaters: Best Time(ms) Avg
Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary) 19
19 0 54.8 18.3 1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7
7 0 160.2 6.2 2.9X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 9
9 0 123.3 8.1 2.3X
+FixedLenByteArrayUpdater (len=16 -> Binary) 19
19 1 55.1 18.1 1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7
7 0 160.1 6.2 2.9X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8
9 0 124.7 8.0 2.3X
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 0e2c997e553f..a2e240fabade 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -363,9 +363,7 @@ public class ParquetVectorUpdaterFactory {
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
- for (int i = 0; i < total; ++i) {
- values.putLong(offset + i, valuesReader.readInteger());
- }
+ valuesReader.readIntegersAsLongs(total, values, offset);
}
@Override
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index d792ba0d7259..48ea026641f9 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -156,6 +156,18 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
}
}
+ @Override
+ public final void readIntegersAsLongs(int total, WritableColumnVector c, int rowId) {
+ int requiredBytes = total * 4;
+ ByteBuffer buffer = getBuffer(requiredBytes);
+ // No `hasArray` bulk-copy path: source (int32) and target (int64) have different byte
+ // widths so a contiguous byte copy is impossible. Matches the pattern in peer
+ // type-converting bulk methods such as `readUnsignedIntegers`.
+ for (int i = 0; i < total; i += 1) {
+ c.putLong(rowId + i, buffer.getInt());
+ }
+ }
+
// A fork of `readIntegers` to rebase the date values. For performance reasons, this method
// iterates the values twice: check if we need to rebase first, then go to the optimized branch
// if rebase is not needed.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
index d29ce0dd12e4..c376acd40593 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
@@ -56,6 +56,25 @@ public interface VectorizedValuesReader {
String timeZone);
void readFloats(int total, WritableColumnVector c, int rowId);
void readDoubles(int total, WritableColumnVector c, int rowId);
+
+ /**
+ * Reads {@code total} INT32 values, sign-extends each to a long, and writes them into
+ * {@code c} starting at {@code c[rowId]}. Used by type-converting updaters that read
+ * parquet INT32 columns into Spark {@code LongType} (or wider decimal) targets.
+ *
+ * <p>The default implementation falls back to a per-row read+widen+write loop and is
+ * therefore equivalent in cost to the legacy per-row Updater path. Subclasses backed
+ * by contiguous bulk storage (e.g. PLAIN encoding via {@link VectorizedPlainValuesReader})
+ * should override to read source bytes once and run a tight in-method conversion loop,
+ * avoiding {@code total} virtual dispatches on {@link #readInteger()}. Readers without
+ * an override preserve correctness but gain no speedup.
+ */
+ default void readIntegersAsLongs(int total, WritableColumnVector c, int rowId) {
+ for (int i = 0; i < total; i += 1) {
+ c.putLong(rowId + i, readInteger());
+ }
+ }
+
void readBinary(int total, WritableColumnVector c, int rowId);
void readGeometry(int total, WritableColumnVector c, int rowId);
void readGeography(int total, WritableColumnVector c, int rowId);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 44e6ae4da6a5..becd45f44a72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1887,6 +1887,55 @@ class ParquetIOSuite extends ParquetTest with SharedSparkSession {
}
}
}
+
+ test("INT32 -> Long widening end-to-end via vectorized read path") {
+ // Round-trips an INT32 Parquet file read back with a Long schema, exercising
+ // IntegerToLongUpdater on the vectorized path. Covers a non-null column (REQUIRED,
+ // no def-levels) and a nullable column (OPTIONAL, def-levels split runs and force
+ // `readValue` calls alongside `readValues`). `withAllParquetReaders` also exercises
+ // the row-based (parquet-mr) reader, which provides additional correctness coverage.
+ withTempPath { file =>
+ val n = 5000
+ // Generates a deterministic INT32 sample. Mixes sign, zero, MIN/MAX boundaries to
+ // catch sign-extension regressions; reused for both the non-null and nullable cases.
+ def sampleAt(i: Int): Int = i % 5 match {
+ case 0 => Int.MinValue + i
+ case 1 => -1
+ case 2 => 0
+ case 3 => Int.MaxValue - i
+ case _ => i * 13 - 7
+ }
+
+ val nonNullData = (0 until n).map(i => Row(sampleAt(i)))
+ // Every 7th row is null: splits the run pattern enough to force the PACKED def-level
+ // path to interleave value runs with null runs at sub-batch lengths.
+ val nullableData = (0 until n).map { i =>
+ if (i % 7 == 0) Row(null) else Row(sampleAt(i))
+ }
+
+ val nonNullWriteSchema = new StructType().add("v", IntegerType, nullable = false)
+ val nonNullReadSchema = new StructType().add("v", LongType, nullable = false)
+ val nullableWriteSchema = new StructType().add("v", IntegerType, nullable = true)
+ val nullableReadSchema = new StructType().add("v", LongType, nullable = true)
+
+ val nonNullPath = new java.io.File(file, "nonnull").getCanonicalPath
+ val nullablePath = new java.io.File(file, "nullable").getCanonicalPath
+ spark.createDataFrame(spark.sparkContext.parallelize(nonNullData, 4), nonNullWriteSchema)
+ .write.parquet(nonNullPath)
+ spark.createDataFrame(spark.sparkContext.parallelize(nullableData, 4), nullableWriteSchema)
+ .write.parquet(nullablePath)
+
+ val expectedNonNull = nonNullData.map(r => Row(r.getInt(0).toLong))
+ val expectedNullable = nullableData.map { r =>
+ if (r.isNullAt(0)) Row(null) else Row(r.getInt(0).toLong)
+ }
+
+ withAllParquetReaders {
+ checkAnswer(spark.read.schema(nonNullReadSchema).parquet(nonNullPath), expectedNonNull)
+ checkAnswer(spark.read.schema(nullableReadSchema).parquet(nullablePath), expectedNullable)
+ }
+ }
+ }
}
class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
new file mode 100644
index 000000000000..d121f4e2bb73
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.nio.{ByteBuffer, ByteOrder}
+import java.time.ZoneOffset
+
+import org.apache.parquet.bytes.ByteBufferInputStream
+import org.apache.parquet.column.ColumnDescriptor
+import org.apache.parquet.schema.{LogicalTypeAnnotation, Types}
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.Type.Repetition
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+import org.apache.spark.sql.types._
+
+/**
+ * Correctness tests for the INT32 -> Long widening Updater (`IntegerToLongUpdater`)
+ * on the Parquet vectorized read path.
+ *
+ * Covers boundary batch lengths, sign-extension on negative INT32 values, the singular
+ * `readValue` path, and the factory's long-decimal dispatch
+ * (INT32 + DECIMAL(p<=9) -> DecimalType(p in (9, 18])).
+ */
+class ParquetVectorUpdaterSuite extends SparkFunSuite {
+
+ // INT32 column descriptor with no logical-type annotation; matches what the production
+ // factory uses for plain INT32 -> Long widening.
+ private val int32Descriptor: ColumnDescriptor = {
+ val pt = Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL).named("col")
+ new ColumnDescriptor(Array("col"), pt, 0, 1)
+ }
+
+ // INT32 column descriptor annotated as DECIMAL(precision, scale); routes the factory's
+ // INT32 dispatch through the `canReadAsLongDecimal` branch when target precision is in
+ // (9, 18].
+ private def int32DecimalDescriptor(precision: Int, scale: Int): ColumnDescriptor = {
+ val pt = Types.primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL)
+ .as(LogicalTypeAnnotation.decimalType(scale, precision))
+ .named("col")
+ new ColumnDescriptor(Array("col"), pt, 0, 1)
+ }
+
+ private def newFactory(desc: ColumnDescriptor): ParquetVectorUpdaterFactory =
+ ParquetTestAccess.newFactory(
+ desc.getPrimitiveType.getLogicalTypeAnnotation,
+ ZoneOffset.UTC, "CORRECTED", "UTC", "CORRECTED", "UTC")
+
+ private def plainIntBytes(values: Array[Int]): Array[Byte] = {
+ val buf = ByteBuffer.allocate(values.length * 4).order(ByteOrder.LITTLE_ENDIAN)
+ var i = 0
+ while (i < values.length) { buf.putInt(values(i)); i += 1 }
+ buf.array()
+ }
+
+ private def newPlainReader(bytes: Array[Byte], numValues: Int): VectorizedPlainValuesReader = {
+ val r = new VectorizedPlainValuesReader
+ r.initFromPage(numValues, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes)))
+ r
+ }
+
+ // Reads `values.length` INT32s through `IntegerToLongUpdater.readValues` and returns the
+ // resulting long column.
+ private def readViaUpdater(values: Array[Int]): Array[Long] = {
+ val fac = newFactory(int32Descriptor)
+ val updater = fac.getUpdater(int32Descriptor, DataTypes.LongType)
+ // OnHeapColumnVector requires capacity >= 1 even when nothing is written into it.
+ val out = new OnHeapColumnVector(values.length.max(1), DataTypes.LongType)
+ val reader = newPlainReader(plainIntBytes(values), values.length)
+ updater.readValues(values.length, 0, out, reader)
+ val result = new Array[Long](values.length)
+ var i = 0
+ while (i < values.length) { result(i) = out.getLong(i); i += 1 }
+ result
+ }
+
+ // Test data: a mix of positive, negative, zero, MIN/MAX values to catch sign-extension bugs.
+ private def signedSampleValues(n: Int): Array[Int] = {
+ val out = new Array[Int](n)
+ var i = 0
+ while (i < n) {
+ out(i) = i match {
+ case _ if i % 5 == 0 => Int.MinValue + i
+ case _ if i % 5 == 1 => -1
+ case _ if i % 5 == 2 => 0
+ case _ if i % 5 == 3 => Int.MaxValue - i
+ case _ => i * 13 - 7
+ }
+ i += 1
+ }
+ out
+ }
+
+ private def expectedWiden(values: Array[Int]): Array[Long] = values.map(_.toLong)
+
+ // ---- Boundary-length correctness: empty, sub-batch, batch-aligned, multi-batch ----
+
+ for (n <- Seq(0, 1, 7, 8, 9, 17, 1024, 4097)) {
+ test(s"IntegerToLongUpdater produces correct widened output (total=$n)") {
+ val input = signedSampleValues(n)
+ assert(readViaUpdater(input) === expectedWiden(input))
+ }
+ }
+
+ // ---- readValue (singular) path is separate from readValues ----
+
+ test("IntegerToLongUpdater: readValue widens a single INT32 -> Long") {
+ // The singular readValue is invoked from the RLE/PACKED def-level decoder for runs
+ // of length 1, which calls `readInteger()` directly rather than the bulk method.
+ // Pinned here so a future change that conflates the two paths is caught at unit level.
+ val input = Array(0, 1, -1, 42, Int.MinValue, Int.MaxValue)
+ val fac = newFactory(int32Descriptor)
+ val updater = fac.getUpdater(int32Descriptor, DataTypes.LongType)
+ val out = new OnHeapColumnVector(input.length, DataTypes.LongType)
+ val reader = newPlainReader(plainIntBytes(input), input.length)
+ var i = 0
+ while (i < input.length) {
+ updater.readValue(i, out, reader)
+ i += 1
+ }
+ val actual = (0 until input.length).map(out.getLong).toArray
+ assert(actual === input.map(_.toLong))
+ }
+
+ // ---- Sign-extension: negative INT32 must become negative INT64 ----
+
+ test("IntegerToLongUpdater: negative INT32 sign-extends to negative INT64") {
+ val input = Array(Int.MinValue, -1, -42, 0, Int.MaxValue)
+ assert(readViaUpdater(input) ===
+ Array[Long](Int.MinValue.toLong, -1L, -42L, 0L, Int.MaxValue.toLong))
+ }
+
+ // ---- Long-decimal dispatch: factory routes INT32+DECIMAL(p<=9) -> IntegerToLongUpdater
+ // when the Spark target is a DecimalType(precision in (9, 18]) ----
+
+ test("IntegerToLongUpdater handles INT32 -> DecimalType(p<=18) via canReadAsLongDecimal") {
+ // Parquet caps INT32 DECIMAL precision at 9 (max digits in int32), so the source is
+ // DECIMAL(9, 0); the Spark target DecimalType(15, 0) is a long-decimal (precision in
+ // (9, 18]). The factory routes this through `canReadAsLongDecimal` to
+ // IntegerToLongUpdater, which writes via `putLong` exactly like the LongType case.
+ // This test confirms the dispatch wiring stays intact for long-decimal targets.
+ val desc = int32DecimalDescriptor(precision = 9, scale = 0)
+ val targetType = DataTypes.createDecimalType(15, 0)
+ val input = Array(0, 1, 42, -7, Int.MinValue, Int.MaxValue, 1234567)
+
+ val fac = newFactory(desc)
+ val updater = fac.getUpdater(desc, targetType)
+ val out = new OnHeapColumnVector(input.length, targetType)
+ val reader = newPlainReader(plainIntBytes(input), input.length)
+ updater.readValues(input.length, 0, out, reader)
+
+ val actual = (0 until input.length).map(out.getLong).toArray
+ assert(actual === input.map(_.toLong))
+ }
+}