This is an automated email from the ASF dual-hosted git repository.
asf-gitbox-commits pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 43793c38540c [SPARK-55897][SQL][4.0] Handle UserDefinedType in
ColumnarRow, ColumnarBatchRow, and ColumnarArray get()
43793c38540c is described below
commit 43793c38540cb3e1531c8b14c6f45acb3fd2bb75
Author: jameswillis <[email protected]>
AuthorDate: Fri May 22 21:38:30 2026 -0700
[SPARK-55897][SQL][4.0] Handle UserDefinedType in ColumnarRow,
ColumnarBatchRow, and ColumnarArray get()
Backport of #54701 to branch-4.0.
### What changes were proposed in this pull request?
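`ColumnarRow.get()`, `ColumnarBatchRow.get()`, and `ColumnarArray.get()`
throw `SparkUnsupportedOperationException` when called with a `UserDefinedType`:
`ColumnarRow` and `ColumnarBatchRow` have no branch that handles UDTs, and
`ColumnarArray` calls `SpecializedGettersReader.read()` with UDT handling disabled.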
This PR adds UDT handling to all three methods:
- **ColumnarRow** and **ColumnarBatchRow**: Add an `instanceof
UserDefinedType` branch that recurses with `udt.sqlType()`, matching the
pattern already used in `SpecializedGettersReader.read()`.
- **ColumnarArray**: Change the `handleUserDefinedType` flag (the last argument)
from `false` to `true` in the existing call to `SpecializedGettersReader.read()`.
### Why are the changes needed?
The codegen path (`CodeGenerator.getValue()`) unwraps `udt.sqlType()`
before generating accessor calls, so UDT columns work when whole-stage codegen
is active. However, on the interpreted evaluation path — when codegen is disabled,
when it falls back, or when the number of fields exceeds `spark.sql.codegen.maxFields` —
`GetStructField.nullSafeEval` calls `ColumnarRow.get(ordinal, udtType)`
directly, which reaches the final `else` branch and throws.
### Does this PR introduce _any_ user-facing change?
Yes. UDT columns in columnar data sources (e.g., Parquet) now work
correctly on the interpreted evaluation path. Previously, accessing them on that
path threw `SparkUnsupportedOperationException`.
### How was this patch tested?
Added six new tests in `ColumnarBatchSuite`, covering each of the three methods
with each of two UDT backing types (primitive `IntegerType` and complex
`StructType`). Each test creates columnar vectors holding UDT data and verifies
that `get()` returns the correct value. Two helper UDT classes (`TestIntUDT`,
`TestStructWrapperUDT`) are defined for the tests.
Cherry-picked from 472735cefef on master. The cherry-pick had a trivial
conflict in `ColumnarBatchSuite.scala`. On master, the new tests sit next to the
`[SPARK-55552] Variant` test, which exists on branch-4.1+ but not on branch-4.0,
so the patch context did not match at that insertion point. Resolved by keeping
only the SPARK-55897 tests (the Variant test is unrelated).
### Was this patch authored or co-authored using generative AI tooling?
Yes. Opus 4.6
Closes #55990 from james-willis/backport-SPARK-55897-4.0.
Authored-by: jameswillis <[email protected]>
Signed-off-by: Huaxin Gao <[email protected]>
---
.../apache/spark/sql/vectorized/ColumnarArray.java | 2 +-
.../spark/sql/vectorized/ColumnarBatchRow.java | 2 +
.../apache/spark/sql/vectorized/ColumnarRow.java | 2 +
.../execution/vectorized/ColumnarBatchSuite.scala | 121 +++++++++++++++++++++
4 files changed, 126 insertions(+), 1 deletion(-)
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
index 12a2879794b1..0db9c0073b1c 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java
@@ -201,7 +201,7 @@ public final class ColumnarArray extends ArrayData {
@Override
public Object get(int ordinal, DataType dataType) {
- return SpecializedGettersReader.read(this, ordinal, dataType, false,
false);
+ return SpecializedGettersReader.read(this, ordinal, dataType, false, true);
}
@Override
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
index d05b3e2dc2d9..957d502380e9 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
@@ -193,6 +193,8 @@ public final class ColumnarBatchRow extends InternalRow {
return getMap(ordinal);
} else if (dataType instanceof VariantType) {
return getVariant(ordinal);
+ } else if (dataType instanceof UserDefinedType<?> udt) {
+ return get(ordinal, udt.sqlType());
} else {
throw new SparkUnsupportedOperationException(
"_LEGACY_ERROR_TEMP_3152", Map.of("dataType",
String.valueOf(dataType)));
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
index b14cd3429e47..f207facd0e42 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarRow.java
@@ -199,6 +199,8 @@ public final class ColumnarRow extends InternalRow {
return getMap(ordinal);
} else if (dataType instanceof VariantType) {
return getVariant(ordinal);
+ } else if (dataType instanceof UserDefinedType<?> udt) {
+ return get(ordinal, udt.sqlType());
} else {
throw new SparkUnsupportedOperationException("_LEGACY_ERROR_TEMP_3155");
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 97ad2c1f5bf9..565d99d2903d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -48,6 +48,38 @@ import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String, VariantVal}
import org.apache.spark.util.ArrayImplicits._
+/**
+ * A minimal UDT backed by IntegerType, used by SPARK-55897 tests.
+ */
+@SQLUserDefinedType(udt = classOf[TestIntUDT])
+private case class TestIntWrapper(value: Int)
+
+private class TestIntUDT extends UserDefinedType[TestIntWrapper] {
+ override def sqlType: DataType = IntegerType
+ override def serialize(obj: TestIntWrapper): Any = obj.value
+ override def userClass: Class[TestIntWrapper] = classOf[TestIntWrapper]
+ override def deserialize(datum: Any): TestIntWrapper = datum match {
+ case v: Int => TestIntWrapper(v)
+ }
+}
+
+/**
+ * A minimal UDT backed by StructType, used by SPARK-55897 tests.
+ */
+@SQLUserDefinedType(udt = classOf[TestStructWrapperUDT])
+private case class TestStructWrapper(x: Int, y: Long)
+
+private class TestStructWrapperUDT extends UserDefinedType[TestStructWrapper] {
+ override def sqlType: DataType = new StructType()
+ .add("x", IntegerType)
+ .add("y", LongType)
+ override def serialize(obj: TestStructWrapper): Any = InternalRow(obj.x,
obj.y)
+ override def userClass: Class[TestStructWrapper] = classOf[TestStructWrapper]
+ override def deserialize(datum: Any): TestStructWrapper = datum match {
+ case row: InternalRow => TestStructWrapper(row.getInt(0), row.getLong(1))
+ }
+}
+
@ExtendedSQLTest
class ColumnarBatchSuite extends SparkFunSuite {
@@ -2025,4 +2057,93 @@ class ColumnarBatchSuite extends SparkFunSuite {
}
}
}
+
+ testVector(
+ "SPARK-55897: ColumnarRow.get with primitive-backed UDT",
+ 10,
+ new StructType().add("name", StringType).add("udt_field", IntegerType)) {
column =>
+ column.getChild(0).putByteArray(0, "hello".getBytes)
+ column.getChild(1).putInt(0, 42)
+
+ val row = column.getStruct(0)
+ assert(row.get(1, new TestIntUDT()) === 42)
+ }
+
+ testVector(
+ "SPARK-55897: ColumnarRow.get with struct-backed UDT",
+ 10,
+ new StructType()
+ .add("id", IntegerType)
+ .add("nested", new StructType().add("x", IntegerType).add("y",
LongType))) { column =>
+ column.getChild(0).putInt(0, 1)
+ column.getChild(1).getChild(0).putInt(0, 10)
+ column.getChild(1).getChild(1).putLong(0, 20L)
+
+ val row = column.getStruct(0)
+ val nested = row.get(1, new
TestStructWrapperUDT()).asInstanceOf[InternalRow]
+ assert(nested.getInt(0) === 10)
+ assert(nested.getLong(1) === 20L)
+ }
+
+ testVector(
+ "SPARK-55897: ColumnarArray.get with primitive-backed UDT",
+ 10,
+ new ArrayType(IntegerType, false)) { column =>
+ val data = column.arrayData()
+ data.putInt(0, 10)
+ data.putInt(1, 20)
+ column.putArray(0, 0, 2)
+
+ val arr = column.getArray(0)
+ assert(arr.get(0, new TestIntUDT()) === 10)
+ assert(arr.get(1, new TestIntUDT()) === 20)
+ }
+
+ testVector(
+ "SPARK-55897: ColumnarArray.get with struct-backed UDT",
+ 10,
+ new ArrayType(new StructType().add("x", IntegerType).add("y", LongType),
false)) { column =>
+ val data = column.arrayData()
+ data.getChild(0).putInt(0, 100)
+ data.getChild(1).putLong(0, 200L)
+ column.putArray(0, 0, 1)
+
+ val arr = column.getArray(0)
+ val row = arr.get(0, new
TestStructWrapperUDT()).asInstanceOf[InternalRow]
+ assert(row.getInt(0) === 100)
+ assert(row.getLong(1) === 200L)
+ }
+
+ test("SPARK-55897: ColumnarBatchRow.get with primitive-backed UDT") {
+ Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP).foreach { memMode =>
+ val col = allocate(10, IntegerType, memMode)
+ try {
+ col.putInt(0, 99)
+ val batchRow = new ColumnarBatchRow(Array(col))
+ batchRow.rowId = 0
+ assert(batchRow.get(0, new TestIntUDT()) === 99)
+ } finally {
+ col.close()
+ }
+ }
+ }
+
+ test("SPARK-55897: ColumnarBatchRow.get with struct-backed UDT") {
+ Seq(MemoryMode.ON_HEAP, MemoryMode.OFF_HEAP).foreach { memMode =>
+ val col = allocate(10,
+ new StructType().add("x", IntegerType).add("y", LongType), memMode)
+ try {
+ col.getChild(0).putInt(0, 5)
+ col.getChild(1).putLong(0, 15L)
+ val batchRow = new ColumnarBatchRow(Array(col))
+ batchRow.rowId = 0
+
+ val row = batchRow.get(0, new
TestStructWrapperUDT()).asInstanceOf[InternalRow]
+ assert(row.getInt(0) === 5)
+ assert(row.getLong(1) === 15L)
+ } finally {
+ col.close()
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]