This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f7d56e2827c [SPARK-45827][SQL] Fix variant parquet reader
f7d56e2827c is described below
commit f7d56e2827c4c04c065c0cf04f23084f3f8594ad
Author: Chenhao Li <[email protected]>
AuthorDate: Thu Nov 16 11:23:11 2023 +0800
[SPARK-45827][SQL] Fix variant parquet reader
## What changes were proposed in this pull request?
This is a follow-up of https://github.com/apache/spark/pull/43707. The
previous PR missed a piece in the variant parquet reader: we are treating the
variant type as `struct<value binary, metadata binary>`, so it also needs a
similar `assembleStruct` process in the Parquet reader to correctly set the
nullness of variant values from def/rep levels.
## How was this patch tested?
Extend the existing unit test. It would fail without the change.
Closes #43825 from chenhao-db/fix_variant_parquet_reader.
Authored-by: Chenhao Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/execution/datasources/parquet/ParquetColumnVector.java | 3 ++-
sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala | 7 +++++++
2 files changed, 9 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
index f00b5b3a88b..5198096fe01 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.VariantType;
/**
 * Contains necessary information representing a Parquet column, either of primitive or nested type.
@@ -175,7 +176,7 @@ final class ParquetColumnVector {
child.assemble();
}
assembleCollection();
- } else if (type instanceof StructType) {
+ } else if (type instanceof StructType || type instanceof VariantType) {
for (ParquetColumnVector child : children) {
child.assemble();
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
index dde986c555b..58e0d7eeef3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
@@ -73,5 +73,12 @@ class VariantSuite extends QueryTest with SharedSparkSession {
values.map(v => if (v == null) "null" else v.debugString()).sorted
}
assert(prepareAnswer(input) == prepareAnswer(result))
+
+ withTempDir { dir =>
+ val tempDir = new File(dir, "files").getCanonicalPath
+ df.write.parquet(tempDir)
+      val readResult = spark.read.parquet(tempDir).collect().map(_.get(0).asInstanceOf[VariantVal])
+ assert(prepareAnswer(input) == prepareAnswer(readResult))
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]