This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 815039fda040 [SPARK-52240] Corrected row index usage when exploding 
packed arrays in vectorized reader
815039fda040 is described below

commit 815039fda040b5436175066c2623c195ed449a41
Author: Daniel Spiewak <[email protected]>
AuthorDate: Sun May 25 09:47:25 2025 -0700

    [SPARK-52240] Corrected row index usage when exploding packed arrays in 
vectorized reader
    
    This PR fixes an issue in the vectorized parquet reader with respect to 
executing the `explode` function on nested arrays where the array cuts across 
two or more pages. It's probably possible to minimize this slightly more but I 
wasn't able to find a reproducer. It's also worth noting that this issue 
illustrates a current gap in the lower-level unit tests for the vectorized 
reader, which don't appear to test much related to output vector offsets.
    
    The bug in question was a simple typo: the output row offset was used to 
dereference nested array lengths rather than input row offset. This only 
matters for the explode function and then only when resuming the same operation 
on a second page. This case (and all related cases) are, at present, untested. 
I added a high-level test and example `.parquet` file which reproduces the 
issue and verifies the fix, but it would be ideal if more tests were added at a 
lower level. It is very likel [...]
    
    ### What changes were proposed in this pull request?
    
    It's a fairly straightforward typo issue in the code.
    
    ### Why are the changes needed?
    
    The vectorized parquet reader does not correctly handle this case
    
    ### Does this PR introduce _any_ user-facing change?
    
    Aside from fixing the vectorized reader? No.
    
    ### How was this patch tested?
    
    Unit test (well, more of an integration test) included in PR
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Nope
    
    Closes #46928 from djspiewak/bug/packed-list-vectorized.
    
    Authored-by: Daniel Spiewak <[email protected]>
    Signed-off-by: Chao Sun <[email protected]>
    (cherry picked from commit 12e32d65a4c255a50149269006e6e456498ee1dd)
    Signed-off-by: Chao Sun <[email protected]>
---
 .../parquet/VectorizedDeltaLengthByteArrayReader.java    |   2 +-
 .../resources/test-data/packed-list-vectorized.parquet   | Bin 0 -> 1574 bytes
 .../execution/datasources/parquet/ParquetIOSuite.scala   |  10 ++++++++++
 3 files changed, 11 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java
index ac5b8527f5e1..9be867d61900 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java
@@ -56,7 +56,7 @@ public class VectorizedDeltaLengthByteArrayReader extends 
VectorizedReaderBase i
     ByteBufferOutputWriter outputWriter = 
ByteBufferOutputWriter::writeArrayByteBuffer;
     int length;
     for (int i = 0; i < total; i++) {
-      length = lengthsVector.getInt(rowId + i);
+      length = lengthsVector.getInt(currentRow + i);
       try {
         buffer = in.slice(length);
       } catch (EOFException e) {
diff --git 
a/sql/core/src/test/resources/test-data/packed-list-vectorized.parquet 
b/sql/core/src/test/resources/test-data/packed-list-vectorized.parquet
new file mode 100644
index 000000000000..78f4a839e4ca
Binary files /dev/null and 
b/sql/core/src/test/resources/test-data/packed-list-vectorized.parquet differ
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 4f8a9e397166..8ed9ef1630eb 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1306,6 +1306,16 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
     }
   }
 
+  test("explode nested lists crossing a rowgroup boundary") {
+    withAllParquetReaders {
+      checkAnswer(
+        readResourceParquetFile("test-data/packed-list-vectorized.parquet")
+          .selectExpr("explode(DIStatus.command_status.actions_status)")
+          .selectExpr("col.result"),
+        List.fill(4992)(Row("SUCCESS")))
+    }
+  }
+
   test("read dictionary encoded decimals written as INT64") {
     withAllParquetReaders {
       checkAnswer(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to