This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 0c6d7fd277ba [SPARK-52651][SQL] Handle User Defined Type in Nested ColumnVector
0c6d7fd277ba is described below

commit 0c6d7fd277ba39899795835ca864402405724ad4
Author: Kent Yao <y...@apache.org>
AuthorDate: Thu Jul 3 09:48:25 2025 +0800

    [SPARK-52651][SQL] Handle User Defined Type in Nested ColumnVector

    ### What changes were proposed in this pull request?

    When reading a map column with a nested UDT, I encountered:

    ```
    Caused by: java.lang.IllegalArgumentException: Spark type: ... doesn't match the type: ... in column vector
        at org.apache.spark.sql.execution.datasources.parquet.ParquetColumnVector.<init>(ParquetColumnVector.java:80)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetColumnVector.<init>(ParquetColumnVector.java:139)
    ```

    This PR applies a recursive transformation that replaces a UDT nested at any level with its underlying SQL type.

    ### Why are the changes needed?

    To add the missing handling of UDTs nested inside complex types.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    New tests

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #51349 from yaooqinn/SPARK-52651.

    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../apache/spark/sql/vectorized/ColumnVector.java  | 23 ++++++++++++++++++-----
 .../parquet/ParquetSchemaConverter.scala           |  3 +--
 .../execution/vectorized/ColumnVectorSuite.scala   | 25 ++++++++++++++++++++++++-
 3 files changed, 43 insertions(+), 8 deletions(-)
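For illustration, here is a minimal sketch of the behavior the patch enables. It assumes a UDT backed by IntegerType; the `YearUDT` class below is a simplified stand-in for the `YearUDT` the new tests import, not the test suite's actual definition:

```
import java.time.Year

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types._

// Simplified stand-in for the YearUDT used by the new tests:
// a UDT whose underlying SQL type is IntegerType.
class YearUDT extends UserDefinedType[Year] {
  override def sqlType: DataType = IntegerType
  override def serialize(obj: Year): Any = obj.getValue
  override def deserialize(datum: Any): Year = Year.of(datum.asInstanceOf[Int])
  override def userClass: Class[Year] = classOf[Year]
}

object NestedUdtSketch {
  def main(args: Array[String]): Unit = {
    val udt = new YearUDT
    // Previously only a top-level UDT was unwrapped, so a UDT nested in a
    // map/array/struct reached ParquetColumnVector unchanged and failed its
    // type check. With this patch the ColumnVector constructor unwraps the
    // nested UDT recursively at construction time.
    val vector = new OnHeapColumnVector(10, MapType(IntegerType, udt))
    assert(vector.dataType() == MapType(IntegerType, IntegerType))
    vector.close()
  }
}
```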
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index 54b62c00283f..f1d1f5b3ea80 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.vectorized;
 
+import scala.PartialFunction;
+
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
@@ -336,10 +338,21 @@ public abstract class ColumnVector implements AutoCloseable {
    * Sets up the data type of this column vector.
    */
   protected ColumnVector(DataType type) {
-    if (type instanceof UserDefinedType) {
-      this.type = ((UserDefinedType) type).sqlType();
-    } else {
-      this.type = type;
-    }
+    this.type = type.transformRecursively(
+      new PartialFunction<DataType, DataType>() {
+        @Override
+        public boolean isDefinedAt(DataType x) {
+          return x instanceof UserDefinedType<?>;
+        }
+
+        @Override
+        public DataType apply(DataType t) {
+          if (t instanceof UserDefinedType<?> udt) {
+            return udt.sqlType();
+          } else {
+            return t;
+          }
+        }
+      });
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index e05d5fe2fd88..16bd776bea0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -196,8 +196,7 @@ class ParquetToSparkSchemaConverter(
       field: ColumnIO,
       sparkReadType: Option[DataType] = None): ParquetColumn = {
     val targetType = sparkReadType.map {
-      case udt: UserDefinedType[_] => udt.sqlType
-      case otherType => otherType
+      _.transformRecursively { case t: UserDefinedType[_] => t.sqlType }
     }
     field match {
       case primitiveColumn: PrimitiveColumnIO => convertPrimitiveField(primitiveColumn, targetType)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 0edbfd10d8cd..a0fe44b96e7d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.vectorized
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.YearUDT
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.execution.columnar.{ColumnAccessor, ColumnDictionary}
@@ -926,5 +927,27 @@ class ColumnVectorSuite extends SparkFunSuite with SQLHelper {
       }
     }
   }
 
-}
+  val yearUDT = new YearUDT
+  testVectors("user defined type", 10, yearUDT) { testVector =>
+    assert(testVector.dataType() === IntegerType)
+    (0 until 10).foreach { i =>
+      testVector.appendInt(i)
+    }
+  }
+
+  testVectors("user defined type in map type",
+    10, MapType(IntegerType, yearUDT)) { testVector =>
+    assert(testVector.dataType() === MapType(IntegerType, IntegerType))
+  }
+
+  testVectors("user defined type in array type",
+    10, ArrayType(yearUDT, containsNull = true)) { testVector =>
+    assert(testVector.dataType() === ArrayType(IntegerType, containsNull = true))
+  }
+
+  testVectors("user defined type in struct type",
+    10, StructType(Seq(StructField("year", yearUDT)))) { testVector =>
+    assert(testVector.dataType() === StructType(Seq(StructField("year", IntegerType))))
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org