This is an automated email from the ASF dual-hosted git repository.

peter-toth pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 384299acb47e [SPARK-56872][SQL][4.1] Fix NPE in 
DowncastLongUpdater.decodeSingleDictionaryId
384299acb47e is described below

commit 384299acb47e9829a87ce9e5c793fbfb315cb90d
Author: YangJie <[email protected]>
AuthorDate: Fri May 15 18:27:06 2026 +0200

    [SPARK-56872][SQL][4.1] Fix NPE in 
DowncastLongUpdater.decodeSingleDictionaryId
    
    ### What changes were proposed in this pull request?
    
    `DowncastLongUpdater.decodeSingleDictionaryId` calls `values.putLong(...)`, 
but `DowncastLongUpdater` is only chosen when the target is a 32-bit Decimal 
(precision <= 9), whose column vector stores into `intData`, not `longData`. So 
`putLong` NPEs whenever this path runs.
    
    Switch the call to `putInt` with the same `(int) longValue` narrowing cast 
already used by `readValue` and `readValues`.
    
    ### Why are the changes needed?
    
    The bug has been latent since SPARK-35640 (Jun 2021) because the path is 
only reachable when all three conditions hold:
    
    1. Parquet stores the column as INT64 + DECIMAL(p<=9). Spark's own writer 
emits INT32 for this case, so the file must come from another writer (Hive, 
Impala, ...).
    2. Spark reads it as a Decimal with precision <= 9.
    3. The vectorized reader has to eagerly drain buffered dictionary IDs — 
typically when parquet-mr writes the column as a mix of dictionary-encoded and 
PLAIN pages and a non-dict page follows a dict page in the same batch. The 
normal lazy-dictionary path decodes at row read time via `ParquetDictionary` 
and never touches this updater method.
    
    Without the fix, the new regression test fails with:
    
    ```
    Cause: java.lang.NullPointerException:
      at 
org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putLong(OnHeapColumnVector.java:393)
      at 
org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$DowncastLongUpdater.decodeSingleDictionaryId(ParquetVectorUpdaterFactory.java:713)
      at 
org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdater.decodeDictionaryIds(ParquetVectorUpdater.java:75)
      at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:288)
      at 
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:406)
      ...
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Reads that previously NPE'd now return correct values.
    
    ### How was this patch tested?
    
    New `ParquetIOSuite` test that writes an INT64 + DECIMAL(9, 2) column via 
parquet-mr's low-level writer with mix-cardinality data (4-value pool + 
unique-per-row) to force the dictionary -> PLAIN fallback. Without the fix it 
reproduces the NPE above; with the fix it passes. Full `ParquetIOSuite` is 
green locally.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #55896 from LuciferYang/SPARK-56872-branch-4.1.
    
    Authored-by: YangJie <[email protected]>
    Signed-off-by: Peter Toth <[email protected]>
---
 .../parquet/ParquetVectorUpdaterFactory.java       |  4 ++-
 .../datasources/parquet/ParquetIOSuite.scala       | 40 ++++++++++++++++++++++
 2 files changed, 43 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 4f90f878da86..eb80f08b34bf 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -708,7 +708,9 @@ public class ParquetVectorUpdaterFactory {
         WritableColumnVector values,
         WritableColumnVector dictionaryIds,
         Dictionary dictionary) {
-      values.putLong(offset, 
dictionary.decodeToLong(dictionaryIds.getDictId(offset)));
+      // 32-bit Decimal target (precision <= 9) is stored in `intData`; 
`longData` is
+      // unallocated, so use `putInt` with the same narrowing cast as 
`readValue`/`readValues`.
+      values.putInt(offset, (int) 
dictionary.decodeToLong(dictionaryIds.getDictId(offset)));
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index c0aa3dd22d44..6ba790deddff 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1877,6 +1877,46 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
       }
     }
   }
+
+  test("SPARK-56872: INT64 DECIMAL into 32-bit Decimal column with dictionary 
fallback") {
+    // `DowncastLongUpdater.decodeSingleDictionaryId` only runs when the 
vectorized reader has
+    // to eagerly drain buffered dictionary IDs, which happens when parquet-mr 
writes the
+    // column as a mix of dictionary-encoded and PLAIN pages. The 
mix-cardinality values below
+    // (4-value pool + unique-per-row) force that fallback; uniformly low- or 
high-cardinality
+    // data bypasses the path. INT64 DECIMAL(p<=9) is built via parquet-mr's 
low-level writer
+    // because Spark's own writer emits INT32 for that case.
+    val schema = MessageTypeParser.parseMessageType(
+      """message root {
+        |  required int64 v (DECIMAL(9, 2));
+        |}""".stripMargin)
+    def unscaledAt(i: Int): Long = i % 5 match {
+      case 0 => -999_999_999L
+      case 1 => -1L
+      case 2 => 0L
+      case 3 => 999_999_999L
+      case _ => i.toLong * 13L - 7L
+    }
+    withTempDir { dir =>
+      val tablePath = new Path(s"${dir.getCanonicalPath}/dec.parquet")
+      val writer = createParquetWriter(schema, tablePath, dictionaryEnabled = 
true)
+      val numRecords = 5000
+      (0 until numRecords).foreach { i =>
+        val record = new SimpleGroup(schema)
+        record.add(0, unscaledAt(i))
+        writer.write(record)
+      }
+      writer.close()
+
+      withAllParquetReaders {
+        val readSchema = new StructType().add("v", DecimalType(9, 2), nullable 
= false)
+        val df = spark.read.schema(readSchema).parquet(tablePath.toString)
+        val expected = (0 until numRecords).map { i =>
+          Row(java.math.BigDecimal.valueOf(unscaledAt(i), 2))
+        }
+        checkAnswer(df, expected)
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: 
TaskAttemptContext)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to