This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 815039fda040 [SPARK-52240] Corrected row index usage when exploding packed arrays in vectorized reader
815039fda040 is described below
commit 815039fda040b5436175066c2623c195ed449a41
Author: Daniel Spiewak <[email protected]>
AuthorDate: Sun May 25 09:47:25 2025 -0700
[SPARK-52240] Corrected row index usage when exploding packed arrays in vectorized reader
This PR fixes an issue in the vectorized parquet reader when executing the
`explode` function on nested arrays that cut across two or more pages. It's
probably possible to minimize the reproducer slightly further, but I wasn't
able to find a smaller one. It's also worth noting that this issue illustrates
a current gap in the lower-level unit tests for the vectorized reader, which
don't appear to test much related to output vector offsets.

The bug in question was a simple typo: the output row offset was used to
dereference nested array lengths rather than the input row offset. This only
matters for the explode function, and then only when resuming the same
operation on a second page. These cases are, at present, untested.

I added a high-level test and an example `.parquet` file which reproduces the
issue and verifies the fix, but it would be ideal if more tests were added at a
lower level. It is very likel [...]
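To make the typo concrete, here is a minimal, self-contained sketch (hypothetical class and method names, not Spark's actual reader) of the indexing mistake: the reader writes `total` decoded values into an output vector starting at the output offset (`rowId` in the real code), while the per-value lengths are indexed by the position within the current page (`currentRow`). On the first page the two coincide, so the bug is invisible; when a read resumes on a second page they diverge, and indexing the lengths with the output offset reads shifted, wrong lengths.

```java
import java.util.Arrays;

// Illustrative sketch only: `readLengths` stands in for the loop in
// VectorizedDeltaLengthByteArrayReader that dereferences per-value lengths.
public class LengthIndexSketch {

  // Reads `total` lengths. `rowId` is the output-vector offset; `currentRow`
  // is the input position within the current page. The fixed code indexes
  // with `currentRow + i`; the buggy code indexed with `rowId + i`.
  static int[] readLengths(int[] lengthsVector, int rowId, int currentRow,
                           int total, boolean useInputIndex) {
    int[] out = new int[total];
    for (int i = 0; i < total; i++) {
      int idx = (useInputIndex ? currentRow : rowId) + i;
      out[i] = lengthsVector[idx];
    }
    return out;
  }

  public static void main(String[] args) {
    // Lengths decoded from the current page, starting at input row 0.
    int[] lengths = {3, 1, 4, 1, 5, 9};
    // Suppose 2 values were already emitted from a previous page, so the next
    // output slot is rowId = 2 while this page's lengths start at currentRow = 0.
    System.out.println(Arrays.toString(readLengths(lengths, 2, 0, 3, true)));
    // prints [3, 1, 4]  (correct: this page's first three lengths)
    System.out.println(Arrays.toString(readLengths(lengths, 2, 0, 3, false)));
    // prints [4, 1, 5]  (buggy: shifted by the output offset)
  }
}
```

Within a single page read from offset zero, `rowId == currentRow` and both forms agree, which is why only the resume-on-second-page case (exactly what `explode` over page-spanning arrays triggers) exposes the bug.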
### What changes were proposed in this pull request?
It's a fairly straightforward one-character fix: `VectorizedDeltaLengthByteArrayReader` now indexes the lengths vector with the input row offset (`currentRow`) rather than the output row offset (`rowId`).
### Why are the changes needed?
The vectorized parquet reader does not correctly handle this case (exploding nested arrays that span page boundaries).
### Does this PR introduce _any_ user-facing change?
Aside from fixing the vectorized reader? No.
### How was this patch tested?
A unit test (well, more of an integration test) is included in the PR.
### Was this patch authored or co-authored using generative AI tooling?
Nope
Closes #46928 from djspiewak/bug/packed-list-vectorized.
Authored-by: Daniel Spiewak <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
(cherry picked from commit 12e32d65a4c255a50149269006e6e456498ee1dd)
Signed-off-by: Chao Sun <[email protected]>
---
.../parquet/VectorizedDeltaLengthByteArrayReader.java | 2 +-
.../resources/test-data/packed-list-vectorized.parquet | Bin 0 -> 1574 bytes
.../execution/datasources/parquet/ParquetIOSuite.scala | 10 ++++++++++
3 files changed, 11 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java
index ac5b8527f5e1..9be867d61900 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java
@@ -56,7 +56,7 @@ public class VectorizedDeltaLengthByteArrayReader extends VectorizedReaderBase i
     ByteBufferOutputWriter outputWriter = ByteBufferOutputWriter::writeArrayByteBuffer;
     int length;
     for (int i = 0; i < total; i++) {
-      length = lengthsVector.getInt(rowId + i);
+      length = lengthsVector.getInt(currentRow + i);
       try {
         buffer = in.slice(length);
       } catch (EOFException e) {
diff --git a/sql/core/src/test/resources/test-data/packed-list-vectorized.parquet b/sql/core/src/test/resources/test-data/packed-list-vectorized.parquet
new file mode 100644
index 000000000000..78f4a839e4ca
Binary files /dev/null and b/sql/core/src/test/resources/test-data/packed-list-vectorized.parquet differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 4f8a9e397166..8ed9ef1630eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -1306,6 +1306,16 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
+  test("explode nested lists crossing a rowgroup boundary") {
+    withAllParquetReaders {
+      checkAnswer(
+        readResourceParquetFile("test-data/packed-list-vectorized.parquet")
+          .selectExpr("explode(DIStatus.command_status.actions_status)")
+          .selectExpr("col.result"),
+        List.fill(4992)(Row("SUCCESS")))
+    }
+  }
+
   test("read dictionary encoded decimals written as INT64") {
     withAllParquetReaders {
       checkAnswer(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]