This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c6d7fd277ba [SPARK-52651][SQL] Handle User Defined Type in Nested ColumnVector
0c6d7fd277ba is described below

commit 0c6d7fd277ba39899795835ca864402405724ad4
Author: Kent Yao <y...@apache.org>
AuthorDate: Thu Jul 3 09:48:25 2025 +0800

    [SPARK-52651][SQL] Handle User Defined Type in Nested ColumnVector
    
    ### What changes were proposed in this pull request?
    
    When reading a map column that contains a nested UDT, I encountered:
    
    ```
    Caused by: java.lang.IllegalArgumentException: Spark type: ... doesn't match the type: ... in column vector
            at org.apache.spark.sql.execution.datasources.parquet.ParquetColumnVector.<init>(ParquetColumnVector.java:80)
            at org.apache.spark.sql.execution.datasources.parquet.ParquetColumnVector.<init>(ParquetColumnVector.java:139)
    ```
    
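    This PR recursively traverses the data type and replaces every nested UDT with its underlying SQL type (`sqlType`), instead of only unwrapping a top-level UDT.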
    
    ### Why are the changes needed?
    
    To add the missing support for UDTs nested inside complex types (map, array, and struct).
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    New Tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #51349 from yaooqinn/SPARK-52651.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../apache/spark/sql/vectorized/ColumnVector.java  | 23 +++++++++++++++-----
 .../parquet/ParquetSchemaConverter.scala           |  3 +--
 .../execution/vectorized/ColumnVectorSuite.scala   | 25 +++++++++++++++++++++-
 3 files changed, 43 insertions(+), 8 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
index 54b62c00283f..f1d1f5b3ea80 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.vectorized;
 
+import scala.PartialFunction;
+
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
@@ -336,10 +338,21 @@ public abstract class ColumnVector implements 
AutoCloseable {
    * Sets up the data type of this column vector.
    */
   protected ColumnVector(DataType type) {
-    if (type instanceof UserDefinedType) {
-      this.type = ((UserDefinedType) type).sqlType();
-    } else {
-      this.type = type;
-    }
+    this.type = type.transformRecursively(
+      new PartialFunction<DataType, DataType>() {
+        @Override
+        public boolean isDefinedAt(DataType x) {
+          return x instanceof UserDefinedType<?>;
+        }
+
+        @Override
+        public DataType apply(DataType t) {
+          if (t instanceof UserDefinedType<?> udt) {
+            return udt.sqlType();
+          } else {
+            return t;
+          }
+        }
+      });
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index e05d5fe2fd88..16bd776bea0c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -196,8 +196,7 @@ class ParquetToSparkSchemaConverter(
       field: ColumnIO,
       sparkReadType: Option[DataType] = None): ParquetColumn = {
     val targetType = sparkReadType.map {
-      case udt: UserDefinedType[_] => udt.sqlType
-      case otherType => otherType
+      _.transformRecursively { case t: UserDefinedType[_] => t.sqlType }
     }
     field match {
       case primitiveColumn: PrimitiveColumnIO => 
convertPrimitiveField(primitiveColumn, targetType)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 0edbfd10d8cd..a0fe44b96e7d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.vectorized
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.YearUDT
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.execution.columnar.{ColumnAccessor, 
ColumnDictionary}
@@ -926,5 +927,27 @@ class ColumnVectorSuite extends SparkFunSuite with 
SQLHelper {
       }
     }
   }
-}
 
+  val yearUDT = new YearUDT
+  testVectors("user defined type", 10, yearUDT) { testVector =>
+    assert(testVector.dataType() === IntegerType)
+    (0 until 10).foreach { i =>
+      testVector.appendInt(i)
+    }
+  }
+
+  testVectors("user defined type in map type",
+    10, MapType(IntegerType, yearUDT)) { testVector =>
+    assert(testVector.dataType() === MapType(IntegerType, IntegerType))
+  }
+
+  testVectors("user defined type in array type",
+    10, ArrayType(yearUDT, containsNull = true)) { testVector =>
+    assert(testVector.dataType() === ArrayType(IntegerType, containsNull = 
true))
+  }
+
+  testVectors("user defined type in struct type",
+    10, StructType(Seq(StructField("year", yearUDT)))) { testVector =>
+    assert(testVector.dataType() === StructType(Seq(StructField("year", 
IntegerType))))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to