This is an automated email from the ASF dual-hosted git repository.
peter-toth pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 384299acb47e [SPARK-56872][SQL][4.1] Fix NPE in DowncastLongUpdater.decodeSingleDictionaryId
384299acb47e is described below
commit 384299acb47e9829a87ce9e5c793fbfb315cb90d
Author: YangJie <[email protected]>
AuthorDate: Fri May 15 18:27:06 2026 +0200
[SPARK-56872][SQL][4.1] Fix NPE in DowncastLongUpdater.decodeSingleDictionaryId
### What changes were proposed in this pull request?
`DowncastLongUpdater.decodeSingleDictionaryId` calls `values.putLong(...)`,
but `DowncastLongUpdater` is only chosen when the target is a 32-bit Decimal
(precision <= 9), whose column vector stores values in `intData` and leaves
`longData` unallocated. As a result, `putLong` throws a `NullPointerException`
whenever this path runs.
Switch the call to `putInt` with the same `(int) longValue` narrowing cast
already used by `readValue` and `readValues`.
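To make the failure mode concrete, here is a minimal Java sketch. It assumes a toy column vector that, as described above, allocates only int storage when precision <= 9. `DowncastSketch`, `ToyDecimalVector`, `decodeBuggy` and `decodeFixed` are hypothetical stand-ins, not Spark classes, and the Parquet dictionary is modeled as a plain `long[]`.

```java
import java.util.Arrays;

// Hypothetical, simplified model of the bug; not Spark's column vector classes.
public class DowncastSketch {

  // Mirrors the description above: for a 32-bit Decimal only int storage is allocated.
  static final class ToyDecimalVector {
    int[] intData;   // allocated when precision <= 9
    long[] longData; // left null when precision <= 9

    ToyDecimalVector(int capacity, int precision) {
      if (precision <= 9) {
        intData = new int[capacity];
      } else {
        longData = new long[capacity];
      }
    }

    void putInt(int rowId, int value) { intData[rowId] = value; }

    // Throws NullPointerException when longData was never allocated.
    void putLong(int rowId, long value) { longData[rowId] = value; }
  }

  // Stand-in for Dictionary.decodeToLong: the dictionary is a plain long[] here.
  static long decodeToLong(long[] dictionary, int id) {
    return dictionary[id];
  }

  // Before the fix: writes into the unallocated long storage.
  static void decodeBuggy(ToyDecimalVector values, int offset, long[] dict, int[] ids) {
    values.putLong(offset, decodeToLong(dict, ids[offset]));
  }

  // After the fix: the same (int) narrowing cast as the non-dictionary read path.
  static void decodeFixed(ToyDecimalVector values, int offset, long[] dict, int[] ids) {
    values.putInt(offset, (int) decodeToLong(dict, ids[offset]));
  }

  public static void main(String[] args) {
    long[] dict = {-999_999_999L, -1L, 0L, 999_999_999L}; // unscaled DECIMAL(9, 2) values
    int[] ids = {3, 0, 2, 1};
    ToyDecimalVector v = new ToyDecimalVector(ids.length, 9);

    try {
      decodeBuggy(v, 0, dict, ids);
      System.out.println("buggy path: no exception");
    } catch (NullPointerException e) {
      System.out.println("buggy path: NullPointerException");
    }

    for (int i = 0; i < ids.length; i++) {
      decodeFixed(v, i, dict, ids);
    }
    System.out.println(Arrays.toString(v.intData)); // prints [999999999, -999999999, 0, -1]
  }
}
```

The `(int)` cast is lossless for this target type. The unscaled value of a DECIMAL(p<=9) is at most 999,999,999 in magnitude, which is below `Integer.MAX_VALUE` (2,147,483,647).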
### Why are the changes needed?
The bug has been latent since SPARK-35640 (Jun 2021) because the path is
only reachable when all three conditions hold:
1. Parquet stores the column as INT64 + DECIMAL(p<=9). Spark's own writer
emits INT32 for this case, so the file must come from another writer (Hive,
Impala, ...).
2. Spark reads it as a Decimal with precision <= 9.
3. The vectorized reader has to eagerly drain buffered dictionary IDs. This
typically happens when parquet-mr writes the column as a mix of
dictionary-encoded and PLAIN pages and a non-dictionary page follows a
dictionary page in the same batch. The normal lazy-dictionary path decodes at
row read time via `ParquetDictionary` and never touches this updater method.
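Condition 3 can be modeled for a single batch with the following Java sketch. The names `EagerDecodeSketch`, `Page` and `eagerlyDecodedCount` are hypothetical. The logic is a simplification of the behavior described above: IDs stay lazy while the whole batch is dictionary-encoded, and buffered IDs are drained eagerly once a PLAIN page joins the batch. It is not Spark's actual `VectorizedColumnReader` code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of when buffered dictionary IDs are decoded
// eagerly within one batch; not Spark's actual reader implementation.
public class EagerDecodeSketch {

  enum Encoding { DICTIONARY, PLAIN }

  static final class Page {
    final Encoding encoding;
    final int numValues;

    Page(Encoding encoding, int numValues) {
      this.encoding = encoding;
      this.numValues = numValues;
    }
  }

  // Returns how many values in one batch go through the eager, per-value
  // dictionary decode (the kind of call this commit fixes).
  static int eagerlyDecodedCount(List<Page> pagesInBatch) {
    int lazyIds = 0;           // IDs kept in the vector, decoded at row read time
    boolean plainSeen = false; // whether the batch already holds PLAIN values
    int eagerlyDecoded = 0;

    for (Page page : pagesInBatch) {
      if (page.encoding == Encoding.DICTIONARY) {
        if (plainSeen) {
          // Batch already holds plain values: decode these IDs right away.
          eagerlyDecoded += page.numValues;
        } else {
          lazyIds += page.numValues;
        }
      } else {
        // A batch cannot mix lazy IDs and plain values, so drain the buffer.
        eagerlyDecoded += lazyIds;
        lazyIds = 0;
        plainSeen = true;
      }
    }
    return eagerlyDecoded;
  }

  public static void main(String[] args) {
    Page dict = new Page(Encoding.DICTIONARY, 100);
    Page plain = new Page(Encoding.PLAIN, 50);

    System.out.println(eagerlyDecodedCount(Arrays.asList(dict, dict)));   // prints 0 (lazy path)
    System.out.println(eagerlyDecodedCount(Arrays.asList(dict, plain)));  // prints 100 (eager drain)
    System.out.println(eagerlyDecodedCount(Arrays.asList(plain, plain))); // prints 0 (no dictionary)
  }
}
```

Only the mixed case reaches the eager decode. This is why uniformly dictionary-encoded or uniformly PLAIN data never exercises `decodeSingleDictionaryId`.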
Without the fix, the new regression test fails with:
```
Cause: java.lang.NullPointerException:
    at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putLong(OnHeapColumnVector.java:393)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$DowncastLongUpdater.decodeSingleDictionaryId(ParquetVectorUpdaterFactory.java:713)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdater.decodeDictionaryIds(ParquetVectorUpdater.java:75)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:288)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:406)
...
```
### Does this PR introduce _any_ user-facing change?
Yes. Reads that previously NPE'd now return correct values.
### How was this patch tested?
New `ParquetIOSuite` test that writes an INT64 + DECIMAL(9, 2) column via
parquet-mr's low-level writer with mixed-cardinality data (a 4-value pool plus
unique-per-row values) to force the dictionary -> PLAIN fallback. Without the
fix it reproduces the NPE above; with the fix it passes. The full
`ParquetIOSuite` is green locally.
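The shape of the test data can be checked with a short Java sketch. `unscaledAt` is transcribed from the Scala test. The class and the `distinctCount`/`maxAbsUnscaled` helpers are hypothetical. The sketch only characterizes the values. Whether parquet-mr actually falls back to PLAIN also depends on the writer's page and dictionary size settings, which this sketch does not model.

```java
import java.math.BigDecimal;
import java.util.HashSet;
import java.util.Set;

// Characterizes the test data only; it does not write or read Parquet.
public class MixedCardinalitySketch {

  // Java transcription of the Scala test's `unscaledAt` helper.
  static long unscaledAt(int i) {
    switch (i % 5) {
      case 0: return -999_999_999L;
      case 1: return -1L;
      case 2: return 0L;
      case 3: return 999_999_999L;
      default: return i * 13L - 7L; // unique per row
    }
  }

  // Number of distinct unscaled values across the first `numRecords` rows.
  static int distinctCount(int numRecords) {
    Set<Long> distinct = new HashSet<>();
    for (int i = 0; i < numRecords; i++) {
      distinct.add(unscaledAt(i));
    }
    return distinct.size();
  }

  // Largest |unscaled| value; also checks that the fix's (int) cast round-trips.
  static long maxAbsUnscaled(int numRecords) {
    long maxAbs = 0;
    for (int i = 0; i < numRecords; i++) {
      long v = unscaledAt(i);
      if ((long) (int) v != v) {
        throw new IllegalStateException("narrowing would lose data at row " + i);
      }
      maxAbs = Math.max(maxAbs, Math.abs(v));
    }
    return maxAbs;
  }

  public static void main(String[] args) {
    System.out.println(distinctCount(5000));                  // prints 1004 (4 pooled + 1000 unique)
    System.out.println(maxAbsUnscaled(5000));                 // prints 999999999
    System.out.println(BigDecimal.valueOf(unscaledAt(0), 2)); // prints -9999999.99
  }
}
```

Four out of every five rows reuse the pooled values, while every fifth row adds a new one. Every unscaled value also survives the `(int)` narrowing used by the fix.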
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #55896 from LuciferYang/SPARK-56872-branch-4.1.
Authored-by: YangJie <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
---
.../parquet/ParquetVectorUpdaterFactory.java | 4 ++-
.../datasources/parquet/ParquetIOSuite.scala | 40 ++++++++++++++++++++++
2 files changed, 43 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 4f90f878da86..eb80f08b34bf 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -708,7 +708,9 @@ public class ParquetVectorUpdaterFactory {
WritableColumnVector values,
WritableColumnVector dictionaryIds,
Dictionary dictionary) {
- values.putLong(offset, dictionary.decodeToLong(dictionaryIds.getDictId(offset)));
+ // 32-bit Decimal target (precision <= 9) is stored in `intData`; `longData` is
+ // unallocated, so use `putInt` with the same narrowing cast as `readValue`/`readValues`.
+ values.putInt(offset, (int) dictionary.decodeToLong(dictionaryIds.getDictId(offset)));
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index c0aa3dd22d44..6ba790deddff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1877,6 +1877,46 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}
}
+
+ test("SPARK-56872: INT64 DECIMAL into 32-bit Decimal column with dictionary fallback") {
+ // `DowncastLongUpdater.decodeSingleDictionaryId` only runs when the vectorized reader has
+ // to eagerly drain buffered dictionary IDs, which happens when parquet-mr writes the
+ // column as a mix of dictionary-encoded and PLAIN pages. The mix-cardinality values below
+ // (4-value pool + unique-per-row) force that fallback; uniformly low- or high-cardinality
+ // data bypasses the path. INT64 DECIMAL(p<=9) is built via parquet-mr's low-level writer
+ // because Spark's own writer emits INT32 for that case.
+ val schema = MessageTypeParser.parseMessageType(
+ """message root {
+ | required int64 v (DECIMAL(9, 2));
+ |}""".stripMargin)
+ def unscaledAt(i: Int): Long = i % 5 match {
+ case 0 => -999_999_999L
+ case 1 => -1L
+ case 2 => 0L
+ case 3 => 999_999_999L
+ case _ => i.toLong * 13L - 7L
+ }
+ withTempDir { dir =>
+ val tablePath = new Path(s"${dir.getCanonicalPath}/dec.parquet")
+ val writer = createParquetWriter(schema, tablePath, dictionaryEnabled = true)
+ val numRecords = 5000
+ (0 until numRecords).foreach { i =>
+ val record = new SimpleGroup(schema)
+ record.add(0, unscaledAt(i))
+ writer.write(record)
+ }
+ writer.close()
+
+ withAllParquetReaders {
+ val readSchema = new StructType().add("v", DecimalType(9, 2), nullable = false)
+ val df = spark.read.schema(readSchema).parquet(tablePath.toString)
+ val expected = (0 until numRecords).map { i =>
+ Row(java.math.BigDecimal.valueOf(unscaledAt(i), 2))
+ }
+ checkAnswer(df, expected)
+ }
+ }
+ }
}
class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)