This is an automated email from the ASF dual-hosted git repository.

asf-gitbox-commits pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new da9e3fc68f6c [SPARK-55897][SQL][4.1] Handle UserDefinedType in 
ColumnarRow, ColumnarBatchRow, and ColumnarArray get()
da9e3fc68f6c is described below

commit da9e3fc68f6c8423064a98f591a0f002f5fc1879
Author: jameswillis <[email protected]>
AuthorDate: Fri May 22 21:32:31 2026 -0700

    [SPARK-55897][SQL][4.1] Handle UserDefinedType in ColumnarRow, 
ColumnarBatchRow, and ColumnarArray get()
    
    Backport of #54701 to branch-4.1.
    
    ### What changes were proposed in this pull request?
    
    `ColumnarRow.get()`, `ColumnarBatchRow.get()`, and `ColumnarArray.get()` 
throw `SparkUnsupportedOperationException` when called with a `UserDefinedType` 
because they have no branch to handle UDTs.
    
    This PR adds UDT handling to all three methods:
    - **ColumnarRow** and **ColumnarBatchRow**: Add an `instanceof 
UserDefinedType` branch that recurses with `udt.sqlType()`, matching the 
pattern already used in `SpecializedGettersReader.read()`.
    - **ColumnarArray**: Change the `handleUserDefinedType` flag from `false` 
to `true` in the existing call to `SpecializedGettersReader.read()`.
    
    ### Why are the changes needed?
    
    The codegen path (`CodeGenerator.getValue()`) unwraps `udt.sqlType()` 
before generating accessor calls, so UDT columns work when whole-stage codegen 
is active. However, on the interpreted eval path — when codegen is disabled, 
falls back, or the number of fields exceeds `spark.sql.codegen.maxFields` — 
`GetStructField.nullSafeEval` calls `ColumnarRow.get(ordinal, udtType)` 
directly, which hits the unhandled branch and throws.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. UDT columns in columnar data sources (e.g., Parquet) now work 
correctly on the interpreted evaluation path. Previously they would throw 
`SparkUnsupportedOperationException`.
    
    ### How was this patch tested?
    
    Added 6 new tests in `ColumnarBatchSuite` covering all 3 methods x 2 UDT 
backing types (primitive `IntegerType` and complex `StructType`). Each test 
creates columnar vectors with UDT data and verifies that `get()` returns the 
correct value. Two helper UDT classes (`TestIntUDT`, `TestStructWrapperUDT`) 
are defined for the tests.
    
    Cherry-picked cleanly from 472735cefef on master; no conflicts.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes. Opus 4.6
    
    Closes #55989 from james-willis/backport-SPARK-55897-4.1.
    
    Authored-by: jameswillis <[email protected]>
    Signed-off-by: Huaxin Gao <[email protected]>
---
 .../apache/spark/sql/vectorized/ColumnarArray.java |   2 +-
 .../spark/sql/vectorized/ColumnarBatchRow.java     |   2 +
 .../apache/spark/sql/vectorized/ColumnarRow.java   |   2 +
 .../execution/vectorized/ColumnarBatchSuite.scala  | 121 +++++++++++++++++++++
 4 files changed, 126 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
index fad1817aca19..861a6a4c50e4 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
@@ -213,7 +213,7 @@ public final class ColumnarArray extends ArrayData {
 
   @Override
   public Object get(int ordinal, DataType dataType) {
-    return SpecializedGettersReader.read(this, ordinal, dataType, false, 
false);
+    return SpecializedGettersReader.read(this, ordinal, dataType, false, true);
   }
 
   @Override
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
index 3d1e780f6e05..42b335dfd2bc 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
@@ -215,6 +215,8 @@ public final class ColumnarBatchRow extends InternalRow {
       return getMap(ordinal);
     } else if (dataType instanceof VariantType) {
       return getVariant(ordinal);
+    } else if (dataType instanceof UserDefinedType<?> udt) {
+      return get(ordinal, udt.sqlType());
     } else {
       throw new SparkUnsupportedOperationException(
         "_LEGACY_ERROR_TEMP_3152", Map.of("dataType", 
String.valueOf(dataType)));
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
index 656c5f8a8f30..d66baa8fd8fe 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
@@ -217,6 +217,8 @@ public final class ColumnarRow extends InternalRow {
       return getMap(ordinal);
     } else if (dataType instanceof VariantType) {
       return getVariant(ordinal);
+    } else if (dataType instanceof UserDefinedType<?> udt) {
+      return get(ordinal, udt.sqlType());
     } else {
       throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3155");
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 6d90bb985e26..93b3ea67d6bc 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -48,6 +48,38 @@ import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String, VariantVal}
 import org.apache.spark.util.ArrayImplicits._
 
+/**
+ * A minimal UDT backed by IntegerType, used by SPARK-55897 tests.
+ */
+@SQLUserDefinedType(udt = classOf[TestIntUDT])
+private case class TestIntWrapper(value: Int)
+
+private class TestIntUDT extends UserDefinedType[TestIntWrapper] {
+  override def sqlType: DataType = IntegerType
+  override def serialize(obj: TestIntWrapper): Any = obj.value
+  override def userClass: Class[TestIntWrapper] = classOf[TestIntWrapper]
+  override def deserialize(datum: Any): TestIntWrapper = datum match {
+    case v: Int => TestIntWrapper(v)
+  }
+}
+
+/**
+ * A minimal UDT backed by StructType, used by SPARK-55897 tests.
+ */
+@SQLUserDefinedType(udt = classOf[TestStructWrapperUDT])
+private case class TestStructWrapper(x: Int, y: Long)
+
+private class TestStructWrapperUDT extends UserDefinedType[TestStructWrapper] {
+  override def sqlType: DataType = new StructType()
+    .add("x", IntegerType)
+    .add("y", LongType)
+  override def serialize(obj: TestStructWrapper): Any = InternalRow(obj.x, 
obj.y)
+  override def userClass: Class[TestStructWrapper] = classOf[TestStructWrapper]
+  override def deserialize(datum: Any): TestStructWrapper = datum match {
+    case row: InternalRow => TestStructWrapper(row.getInt(0), row.getLong(1))
+  }
+}
+
 @ExtendedSQLTest
 class ColumnarBatchSuite extends SparkFunSuite {
 
@@ -2060,4 +2092,93 @@ class ColumnarBatchSuite extends SparkFunSuite {
         }
       }
   }
+
+  testVector(
+    "SPARK-55897: ColumnarRow.get with primitive-backed UDT",
+    10,
+    new StructType().add("name", StringType).add("udt_field", IntegerType)) { 
column =>
+      column.getChild(0).putByteArray(0, "hello".getBytes)
+      column.getChild(1).putInt(0, 42)
+
+      val row = column.getStruct(0)
+      assert(row.get(1, new TestIntUDT()) === 42)
+  }
+
+  testVector(
+    "SPARK-55897: ColumnarRow.get with struct-backed UDT",
+    10,
+    new StructType()
+      .add("id", IntegerType)
+      .add("nested", new StructType().add("x", IntegerType).add("y", 
LongType))) { column =>
+      column.getChild(0).putInt(0, 1)
+      column.getChild(1).getChild(0).putInt(0, 10)
+      column.getChild(1).getChild(1).putLong(0, 20L)
+
+      val row = column.getStruct(0)
+      val nested = row.get(1, new 
TestStructWrapperUDT()).asInstanceOf[InternalRow]
+      assert(nested.getInt(0) === 10)
+      assert(nested.getLong(1) === 20L)
+  }
+
+  testVector(
+    "SPARK-55897: ColumnarArray.get with primitive-backed UDT",
+    10,
+    new ArrayType(IntegerType, false)) { column =>
+      val data = column.arrayData()
+      data.putInt(0, 10)
+      data.putInt(1, 20)
+      column.putArray(0, 0, 2)
+
+      val arr = column.getArray(0)
+      assert(arr.get(0, new TestIntUDT()) === 10)
+      assert(arr.get(1, new TestIntUDT()) === 20)
+  }
+
+  testVector(
+    "SPARK-55897: ColumnarArray.get with struct-backed UDT",
+    10,
+    new ArrayType(new StructType().add("x", IntegerType).add("y", LongType), 
false)) { column =>
+      val data = column.arrayData()
+      data.getChild(0).putInt(0, 100)
+      data.getChild(1).putLong(0, 200L)
+      column.putArray(0, 0, 1)
+
+      val arr = column.getArray(0)
+      val row = arr.get(0, new 
TestStructWrapperUDT()).asInstanceOf[InternalRow]
+      assert(row.getInt(0) === 100)
+      assert(row.getLong(1) === 200L)
+  }
+
+  test("SPARK-55897: ColumnarBatchRow.get with primitive-backed UDT") {
+    Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP).foreach { memMode =>
+      val col = allocate(10, IntegerType, memMode)
+      try {
+        col.putInt(0, 99)
+        val batchRow = new ColumnarBatchRow(Array(col))
+        batchRow.rowId = 0
+        assert(batchRow.get(0, new TestIntUDT()) === 99)
+      } finally {
+        col.close()
+      }
+    }
+  }
+
+  test("SPARK-55897: ColumnarBatchRow.get with struct-backed UDT") {
+    Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP).foreach { memMode =>
+      val col = allocate(10,
+        new StructType().add("x", IntegerType).add("y", LongType), memMode)
+      try {
+        col.getChild(0).putInt(0, 5)
+        col.getChild(1).putLong(0, 15L)
+        val batchRow = new ColumnarBatchRow(Array(col))
+        batchRow.rowId = 0
+
+        val row = batchRow.get(0, new 
TestStructWrapperUDT()).asInstanceOf[InternalRow]
+        assert(row.getInt(0) === 5)
+        assert(row.getLong(1) === 15L)
+      } finally {
+        col.close()
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to