This is an automated email from the ASF dual-hosted git repository.
dongjoon-hyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 004e626a054c Revert "[SPARK-56803][SQL] Add bulk read+narrow path for INT64 DECIMAL to 32-bit Decimal Parquet vector updater"
004e626a054c is described below
commit 004e626a054ca8c9e9fd86743a6ecc2b473e1604
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu May 14 12:50:16 2026 -0700
Revert "[SPARK-56803][SQL] Add bulk read+narrow path for INT64 DECIMAL to 32-bit Decimal Parquet vector updater"
This reverts commit dcd31611d764dbca161610d18b65a2205a75eef8.
---
...ParquetVectorUpdaterBenchmark-jdk21-results.txt | 46 +++++-----
...ParquetVectorUpdaterBenchmark-jdk25-results.txt | 42 +++++-----
.../ParquetVectorUpdaterBenchmark-results.txt | 46 +++++-----
.../parquet/ParquetVectorUpdaterFactory.java | 4 +-
.../parquet/VectorizedPlainValuesReader.java | 12 ---
.../parquet/VectorizedValuesReader.java | 21 -----
.../datasources/parquet/ParquetIOSuite.scala | 51 -----------
.../parquet/ParquetVectorUpdaterSuite.scala | 98 ----------------------
8 files changed, 70 insertions(+), 250 deletions(-)
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
index b120f8d412a3..7d09c6fd187e 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk21-results.txt
@@ -7,13 +7,13 @@ AMD EPYC 7763 64-Core Processor
Identity Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
BooleanUpdater 0 0
0 17004.4 0.1 1.0X
-ByteUpdater (INT32 -> Byte) 0 0
0 3748.2 0.3 0.2X
-ShortUpdater (INT32 -> Short) 1 1
0 1683.0 0.6 0.1X
-IntegerUpdater 0 0
0 10179.2 0.1 0.6X
-LongUpdater 0 0
0 5075.5 0.2 0.3X
-FloatUpdater 0 0
0 10201.9 0.1 0.6X
-DoubleUpdater 0 0
0 5140.6 0.2 0.3X
-BinaryUpdater 15 15
0 71.3 14.0 0.0X
+ByteUpdater (INT32 -> Byte) 0 0
0 3758.6 0.3 0.2X
+ShortUpdater (INT32 -> Short) 1 1
0 1678.4 0.6 0.1X
+IntegerUpdater 0 0
0 10276.1 0.1 0.6X
+LongUpdater 0 0
0 5112.7 0.2 0.3X
+FloatUpdater 0 0
0 10293.3 0.1 0.6X
+DoubleUpdater 0 0
0 3870.8 0.3 0.2X
+BinaryUpdater 15 15
1 71.3 14.0 0.0X
================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Type-converting Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater 0 0
0 6208.4 0.2 1.0X
-IntegerToDoubleUpdater 0 0
0 6155.9 0.2 1.0X
-FloatToDoubleUpdater 0 0
0 2526.5 0.4 0.4X
-DateToTimestampNTZUpdater 27 28
1 38.6 25.9 0.0X
-DowncastLongUpdater (INT64 -> Decimal(9,2)) 0 0
0 5828.5 0.2 0.9X
+IntegerToLongUpdater 0 0
0 5145.1 0.2 1.0X
+IntegerToDoubleUpdater 0 0
0 5120.5 0.2 1.0X
+FloatToDoubleUpdater 0 0
0 2527.2 0.4 0.5X
+DateToTimestampNTZUpdater 29 29
0 36.3 27.5 0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2
0 455.6 2.2 0.1X
================================================================================================
@@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Rebase Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy) 0 0
0 3651.5 0.3 1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0
0 2621.2 0.4 0.7X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 3
0 420.5 2.4 0.1X
+IntegerWithRebaseUpdater (DATE legacy) 0 0
0 3647.4 0.3 1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0
0 2668.6 0.4 0.7X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 3
0 420.3 2.4 0.1X
================================================================================================
@@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Unsigned Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long) 0 0
0 5879.9 0.2 1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 16 17
0 63.8 15.7 0.0X
+UnsignedIntegerUpdater (UINT32 -> Long) 0 0
0 5089.6 0.2 1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 18
1 60.2 16.6 0.0X
================================================================================================
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Decimal Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater 0 0
0 10144.6 0.1 1.0X
-LongToDecimalUpdater 0 0
0 5061.5 0.2 0.5X
-FixedLenByteArrayToDecimalUpdater 20 21
1 51.3 19.5 0.0X
+IntegerToDecimalUpdater 0 0
0 7746.5 0.1 1.0X
+LongToDecimalUpdater 0 0
0 3874.1 0.3 0.5X
+FixedLenByteArrayToDecimalUpdater 21 21
0 50.7 19.7 0.0X
================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 21.0.11+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
FixedLenByteArray Updaters: Best Time(ms) Avg
Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary) 20
21 1 51.8 19.3 1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7
7 0 160.2 6.2 3.1X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8
8 0 133.2 7.5 2.6X
+FixedLenByteArrayUpdater (len=16 -> Binary) 19
20 2 54.1 18.5 1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7
7 0 160.1 6.2 3.0X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8
8 0 133.2 7.5 2.5X
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
index 6dfd2fdadc25..627835735a08 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-jdk25-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Identity Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater 0 0
0 17160.2 0.1 1.0X
-ByteUpdater (INT32 -> Byte) 0 0
0 3686.1 0.3 0.2X
+BooleanUpdater 0 0
0 17138.1 0.1 1.0X
+ByteUpdater (INT32 -> Byte) 0 0
0 3678.2 0.3 0.2X
ShortUpdater (INT32 -> Short) 1 1
0 1662.8 0.6 0.1X
-IntegerUpdater 0 0
0 10282.0 0.1 0.6X
-LongUpdater 0 0
0 5151.9 0.2 0.3X
-FloatUpdater 0 0
0 10306.3 0.1 0.6X
-DoubleUpdater 0 0
0 5149.1 0.2 0.3X
-BinaryUpdater 15 16
0 67.8 14.8 0.0X
+IntegerUpdater 0 0
0 10231.8 0.1 0.6X
+LongUpdater 0 0
0 5103.2 0.2 0.3X
+FloatUpdater 0 0
0 10203.9 0.1 0.6X
+DoubleUpdater 0 0
0 5105.4 0.2 0.3X
+BinaryUpdater 17 17
0 62.8 15.9 0.0X
================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Type-converting Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater 0 0
0 6390.8 0.2 1.0X
-IntegerToDoubleUpdater 0 0
0 6415.4 0.2 1.0X
-FloatToDoubleUpdater 0 0
0 3196.5 0.3 0.5X
-DateToTimestampNTZUpdater 29 29
0 36.6 27.3 0.0X
-DowncastLongUpdater (INT64 -> Decimal(9,2)) 0 0
0 6568.8 0.2 1.0X
+IntegerToLongUpdater 0 0
0 6239.9 0.2 1.0X
+IntegerToDoubleUpdater 0 0
0 6382.6 0.2 1.0X
+FloatToDoubleUpdater 0 0
0 2570.7 0.4 0.4X
+DateToTimestampNTZUpdater 26 26
0 40.5 24.7 0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2
0 637.7 1.6 0.1X
================================================================================================
@@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Rebase Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy) 0 0
0 3665.9 0.3 1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0
0 2652.9 0.4 0.7X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS) 3 3
0 371.3 2.7 0.1X
+IntegerWithRebaseUpdater (DATE legacy) 0 0
0 3660.5 0.3 1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0
0 2663.6 0.4 0.7X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 2
0 521.3 1.9 0.1X
================================================================================================
@@ -52,8 +52,8 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Unsigned Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long) 0 0
0 5111.6 0.2 1.0X
-UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 18
0 60.4 16.6 0.0X
+UnsignedIntegerUpdater (UINT32 -> Long) 0 0
0 6196.3 0.2 1.0X
+UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 17 18
1 60.3 16.6 0.0X
================================================================================================
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Decimal Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater 0 0
0 10205.8 0.1 1.0X
-LongToDecimalUpdater 0 0
0 5111.4 0.2 0.5X
-FixedLenByteArrayToDecimalUpdater 21 21
2 50.9 19.6 0.0X
+IntegerToDecimalUpdater 0 0
0 10247.8 0.1 1.0X
+LongToDecimalUpdater 0 0
0 5125.7 0.2 0.5X
+FixedLenByteArrayToDecimalUpdater 21 21
0 50.7 19.7 0.0X
================================================================================================
@@ -77,7 +77,7 @@ OpenJDK 64-Bit Server VM 25.0.3+9-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
FixedLenByteArray Updaters: Best Time(ms) Avg
Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary) 21
22 1 50.4 19.8 1.0X
+FixedLenByteArrayUpdater (len=16 -> Binary) 21
21 1 51.0 19.6 1.0X
FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7
7 0 152.6 6.6 3.0X
FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 8
8 0 127.7 7.8 2.5X
diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
index 5918db9f759b..5e2c31d2b566 100644
--- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
+++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt
@@ -6,14 +6,14 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Identity Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-BooleanUpdater 0 0
0 14617.4 0.1 1.0X
-ByteUpdater (INT32 -> Byte) 0 0
0 3667.7 0.3 0.3X
-ShortUpdater (INT32 -> Short) 1 1
0 2048.9 0.5 0.1X
-IntegerUpdater 0 0
0 10281.9 0.1 0.7X
-LongUpdater 0 0
0 5138.0 0.2 0.4X
-FloatUpdater 0 0
0 7742.9 0.1 0.5X
-DoubleUpdater 0 0
0 3863.4 0.3 0.3X
-BinaryUpdater 15 15
0 70.2 14.2 0.0X
+BooleanUpdater 0 0
0 14526.2 0.1 1.0X
+ByteUpdater (INT32 -> Byte) 0 0
0 3679.3 0.3 0.3X
+ShortUpdater (INT32 -> Short) 1 1
0 2054.1 0.5 0.1X
+IntegerUpdater 0 0
0 10178.0 0.1 0.7X
+LongUpdater 0 0
0 5054.4 0.2 0.3X
+FloatUpdater 0 0
0 10212.8 0.1 0.7X
+DoubleUpdater 0 0
0 5051.2 0.2 0.3X
+BinaryUpdater 15 15
0 68.4 14.6 0.0X
================================================================================================
@@ -24,11 +24,11 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Type-converting Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------
-IntegerToLongUpdater 1 1
0 1279.7 0.8 1.0X
-IntegerToDoubleUpdater 1 1
0 1544.8 0.6 1.2X
-FloatToDoubleUpdater 1 1
0 1417.9 0.7 1.1X
-DateToTimestampNTZUpdater 36 36
1 29.5 33.9 0.0X
-DowncastLongUpdater (INT64 -> Decimal(9,2)) 1 1
0 1287.3 0.8 1.0X
+IntegerToLongUpdater 1 1
0 1280.6 0.8 1.0X
+IntegerToDoubleUpdater 1 1
0 1537.9 0.7 1.2X
+FloatToDoubleUpdater 1 1
0 1418.8 0.7 1.1X
+DateToTimestampNTZUpdater 36 36
0 29.5 33.9 0.0X
+DowncastLongUpdater (INT64 -> Decimal(9,2)) 2 2
0 455.3 2.2 0.4X
================================================================================================
@@ -39,9 +39,9 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Rebase Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------
-IntegerWithRebaseUpdater (DATE legacy) 0 0
0 2599.8 0.4 1.0X
-LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 1 1
0 2092.2 0.5 0.8X
-LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 2
0 454.7 2.2 0.2X
+IntegerWithRebaseUpdater (DATE legacy) 0 0
0 2407.3 0.4 1.0X
+LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 1 1
0 2030.8 0.5 0.8X
+LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 2
0 454.4 2.2 0.2X
================================================================================================
@@ -52,7 +52,7 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Unsigned Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------
-UnsignedIntegerUpdater (UINT32 -> Long) 1 1
0 1091.2 0.9 1.0X
+UnsignedIntegerUpdater (UINT32 -> Long) 1 1
0 1093.1 0.9 1.0X
UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 18 18
0 59.1 16.9 0.1X
@@ -64,9 +64,9 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
Decimal Updaters: Best Time(ms) Avg Time(ms)
Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
-IntegerToDecimalUpdater 0 0
0 10241.7 0.1 1.0X
-LongToDecimalUpdater 0 0
0 5118.1 0.2 0.5X
-FixedLenByteArrayToDecimalUpdater 21 21
0 51.1 19.6 0.0X
+IntegerToDecimalUpdater 0 0
0 10195.9 0.1 1.0X
+LongToDecimalUpdater 0 0
0 5049.2 0.2 0.5X
+FixedLenByteArrayToDecimalUpdater 21 21
0 51.0 19.6 0.0X
================================================================================================
@@ -77,8 +77,8 @@ OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux
6.17.0-1010-azure
AMD EPYC 7763 64-Core Processor
FixedLenByteArray Updaters: Best Time(ms) Avg
Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------------
-FixedLenByteArrayUpdater (len=16 -> Binary) 19
19 0 55.1 18.2 1.0X
-FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7
7 0 160.2 6.2 2.9X
-FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 9
9 0 123.1 8.1 2.2X
+FixedLenByteArrayUpdater (len=16 -> Binary) 19
19 0 54.9 18.2 1.0X
+FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7
7 0 160.1 6.2 2.9X
+FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 9
9 0 123.0 8.1 2.2X
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index d9a47f16cfc4..eba4b426a0d8 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -684,7 +684,9 @@ public class ParquetVectorUpdaterFactory {
int offset,
WritableColumnVector values,
VectorizedValuesReader valuesReader) {
- valuesReader.readLongsAsInts(total, values, offset);
+ for (int i = 0; i < total; ++i) {
+ values.putInt(offset + i, (int) valuesReader.readLong());
+ }
}
@Override
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 9249fab7915c..d91ba5e2b87d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -192,18 +192,6 @@ public class VectorizedPlainValuesReader extends
ValuesReader implements Vectori
}
}
- @Override
- public final void readLongsAsInts(int total, WritableColumnVector c, int
rowId) {
- int requiredBytes = total * 8;
- ByteBuffer buffer = getBuffer(requiredBytes);
- // No `hasArray` bulk-copy path: source (int64, 8 bytes) and target
(int32, 4 bytes)
- // have different widths so a contiguous byte copy is impossible. Matches
the pattern
- // in `readIntegersAsLongs`.
- for (int i = 0; i < total; i += 1) {
- c.putInt(rowId + i, (int) buffer.getLong());
- }
- }
-
// A fork of `readIntegers` to rebase the date values. For performance
reasons, this method
// iterates the values twice: check if we need to rebase first, then go to
the optimized branch
// if rebase is not needed.
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
index c62f7bcec8c3..a90e9bf01c81 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
@@ -115,27 +115,6 @@ public interface VectorizedValuesReader {
}
}
- /**
- * Reads {@code total} INT64 values, narrows each to an int via Java's
primitive
- * long-to-int cast (the high 32 bits are discarded), and writes them into
{@code c}
- * starting at {@code c[rowId]}. Used by the type-converting updater that
reads parquet
- * INT64 DECIMAL columns whose Spark target is a 32-bit decimal (precision
<= 9); such
- * values are guaranteed by Parquet's decimal encoding to fit in int32, so
the
- * narrowing is non-lossy in practice.
- *
- * <p>The default implementation falls back to a per-row read+narrow+write
loop and is
- * therefore equivalent in cost to the legacy per-row Updater path.
Subclasses backed
- * by contiguous bulk storage (e.g. PLAIN encoding via {@link
VectorizedPlainValuesReader})
- * should override to read source bytes once and run a tight in-method
conversion loop,
- * avoiding {@code total} virtual dispatches on {@link #readLong()}. Readers
without
- * an override preserve correctness but gain no speedup.
- */
- default void readLongsAsInts(int total, WritableColumnVector c, int rowId) {
- for (int i = 0; i < total; i += 1) {
- c.putInt(rowId + i, (int) readLong());
- }
- }
-
void readBinary(int total, WritableColumnVector c, int rowId);
void readGeometry(int total, WritableColumnVector c, int rowId);
void readGeography(int total, WritableColumnVector c, int rowId);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 7b3cdc7df2a7..692306fb52f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -2021,57 +2021,6 @@ class ParquetIOSuite extends ParquetTest with
SharedSparkSession {
}
}
}
-
- test("INT64 DECIMAL -> 32-bit DecimalType narrowing end-to-end via
vectorized read path") {
- // Round-trips a custom-built parquet file with INT64 + DECIMAL(9, 2)
annotation read
- // back as DecimalType(9, 2). DECIMAL(9, _) values fit in int32, so the
factory routes
- // through DowncastLongUpdater (target is 32-bit decimal storage). Spark's
own writer
- // produces INT32-backed decimal for precision <= 9, so we use
`parquet-mr`'s low-level
- // API to force INT64 storage, mirroring the wire format that other
writers (e.g. Hive,
- // Impala) produce.
- val schema = MessageTypeParser.parseMessageType(
- """message root {
- | required int64 v (DECIMAL(9, 2));
- |}""".stripMargin)
-
- for (dictEnabled <- Seq(true, false)) {
- withTempDir { dir =>
- val tablePath = new Path(s"${dir.getCanonicalPath}/dec.parquet")
- val numRecords = 5000
-
- val writer = createParquetWriter(schema, tablePath, dictionaryEnabled
= dictEnabled)
- (0 until numRecords).foreach { i =>
- val unscaled = i % 5 match {
- case 0 => -999_999_999L
- case 1 => -1L
- case 2 => 0L
- case 3 => 999_999_999L
- case _ => i.toLong * 13L - 7L
- }
- val record = new SimpleGroup(schema)
- record.add(0, unscaled)
- writer.write(record)
- }
- writer.close
-
- withAllParquetReaders {
- val readSchema = new StructType().add("v", DecimalType(9, 2),
nullable = false)
- val df = spark.read.schema(readSchema).parquet(tablePath.toString)
- val expected = (0 until numRecords).map { i =>
- val unscaled = i % 5 match {
- case 0 => -999_999_999L
- case 1 => -1L
- case 2 => 0L
- case 3 => 999_999_999L
- case _ => i.toLong * 13L - 7L
- }
- Row(java.math.BigDecimal.valueOf(unscaled, 2))
- }
- checkAnswer(df, expected)
- }
- }
- }
- }
}
class JobCommitFailureParquetOutputCommitter(outputPath: Path, context:
TaskAttemptContext)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
index 65a4d90750b8..2ed6f84e68b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterSuite.scala
@@ -36,7 +36,6 @@ import org.apache.spark.sql.types._
* - `IntegerToLongUpdater` (INT32 -> Long, plus long-decimal dispatch)
* - `IntegerToDoubleUpdater` (INT32 -> Double)
* - `FloatToDoubleUpdater` (FLOAT -> Double)
- * - `DowncastLongUpdater` (INT64 DECIMAL -> 32-bit Decimal via narrowing
cast)
*
* Covers boundary batch lengths, sign-extension on negative INT32 values, the
singular
* `readValue` path, and the factory's long-decimal dispatch
@@ -80,13 +79,6 @@ class ParquetVectorUpdaterSuite extends SparkFunSuite {
buf.array()
}
- private def plainLongBytes(values: Array[Long]): Array[Byte] = {
- val buf = ByteBuffer.allocate(values.length *
8).order(ByteOrder.LITTLE_ENDIAN)
- var i = 0
- while (i < values.length) { buf.putLong(values(i)); i += 1 }
- buf.array()
- }
-
private def newPlainReader(bytes: Array[Byte], numValues: Int):
VectorizedPlainValuesReader = {
val r = new VectorizedPlainValuesReader
r.initFromPage(numValues,
ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes)))
@@ -268,54 +260,6 @@ class ParquetVectorUpdaterSuite extends SparkFunSuite {
out
}
- // ---- DowncastLongUpdater: INT64 DECIMAL(p<=9) -> 32-bit Decimal via
narrowing cast ----
-
- // INT64 column descriptor annotated as DECIMAL(precision, scale); when both
source and
- // target precision are <= 9, the factory routes to DowncastLongUpdater
(target is stored
- // as int32 because `DecimalType.is32BitDecimalType` requires precision <=
9). Parquet
- // guarantees DECIMAL(p<=9) values fit in int32, so the narrowing `(int)
buffer.getLong()`
- // is non-lossy in practice.
- private def int64DecimalDescriptor(precision: Int, scale: Int):
ColumnDescriptor = {
- val pt = Types.primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL)
- .as(LogicalTypeAnnotation.decimalType(scale, precision))
- .named("col")
- new ColumnDescriptor(Array("col"), pt, 0, 1)
- }
-
- // Reads `values.length` INT64s through `DowncastLongUpdater.readValues` and
returns the
- // resulting int column. Caller guarantees values fit in int32.
- private def readViaDowncastLongUpdater(
- desc: ColumnDescriptor,
- targetType: DataType,
- values: Array[Long]): Array[Int] = {
- val fac = newFactory(desc)
- val updater = fac.getUpdater(desc, targetType)
- val out = new OnHeapColumnVector(values.length.max(1), targetType)
- val reader = newPlainReader(plainLongBytes(values), values.length)
- updater.readValues(values.length, 0, out, reader)
- val result = new Array[Int](values.length)
- var i = 0
- while (i < values.length) { result(i) = out.getInt(i); i += 1 }
- result
- }
-
- // Sample of INT64 values guaranteed to fit in int32 (within DECIMAL(9, _)
range).
- private def downcastSampleValues(n: Int): Array[Long] = {
- val out = new Array[Long](n)
- var i = 0
- while (i < n) {
- out(i) = i match {
- case _ if i % 5 == 0 => -999_999_999L // min DECIMAL(9, _) value
- case _ if i % 5 == 1 => -1L
- case _ if i % 5 == 2 => 0L
- case _ if i % 5 == 3 => 999_999_999L // max DECIMAL(9, _) value
- case _ => i * 13L - 7L
- }
- i += 1
- }
- out
- }
-
// Java's float-to-double widening is exact for finite/infinite values and
produces
// some double NaN for a NaN input (the payload may be canonicalized). Use
// `java.lang.Double.compare` to give NaN well-defined equality and to
distinguish
@@ -366,46 +310,4 @@ class ParquetVectorUpdaterSuite extends SparkFunSuite {
assert(java.lang.Double.doubleToRawLongBits(actual(1)) ===
java.lang.Double.doubleToRawLongBits(0.0d))
}
-
- for (n <- Seq(0, 1, 7, 8, 9, 17, 1024, 4097)) {
- test(s"DowncastLongUpdater produces correct narrowed output (total=$n)") {
- val desc = int64DecimalDescriptor(precision = 9, scale = 2)
- val targetType = DataTypes.createDecimalType(9, 2)
- val input = downcastSampleValues(n)
- val actual = readViaDowncastLongUpdater(desc, targetType, input)
- assert(actual === input.map(_.toInt))
- }
- }
-
- test("DowncastLongUpdater: readValue narrows a single INT64 -> int") {
- // Same rationale as the IntegerToLongUpdater readValue test: the
def-level-decoder's
- // run-of-1 path calls `readLong()` directly rather than the bulk method.
- val desc = int64DecimalDescriptor(precision = 9, scale = 2)
- val targetType = DataTypes.createDecimalType(9, 2)
- val input = Array(0L, 1L, -1L, 42L, -999_999_999L, 999_999_999L)
- val fac = newFactory(desc)
- val updater = fac.getUpdater(desc, targetType)
- val out = new OnHeapColumnVector(input.length, targetType)
- val reader = newPlainReader(plainLongBytes(input), input.length)
- var i = 0
- while (i < input.length) {
- updater.readValue(i, out, reader)
- i += 1
- }
- val actual = (0 until input.length).map(out.getInt).toArray
- assert(actual === input.map(_.toInt))
- }
-
- test("DowncastLongUpdater: narrowing cast preserves sign for in-range
values") {
- // Spot-check that `(int) buffer.getLong()` preserves sign for every INT64
value
- // bounded by DECIMAL(9, _)'s range. Out-of-range INT64 values (i.e.,
outside
- // [-999_999_999, 999_999_999]) are not reachable in production because
Parquet's
- // DECIMAL encoding bounds writer-side; the cast would truncate the high
32 bits and
- // is documented as such on `readLongsAsInts`.
- val desc = int64DecimalDescriptor(precision = 9, scale = 0)
- val targetType = DataTypes.createDecimalType(9, 0)
- val input = Array(-999_999_999L, -1L, 0L, 1L, 999_999_999L)
- val actual = readViaDowncastLongUpdater(desc, targetType, input)
- assert(actual === Array[Int](-999_999_999, -1, 0, 1, 999_999_999))
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]