This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 4b884f2ea1cf [SPARK-55552][SQL] Add VariantType support to
ColumnarBatchRow.copy() and MutableColumnarRow
4b884f2ea1cf is described below
commit 4b884f2ea1cfc05b21a4f7c94a5849f245b23c26
Author: tugce-applied <[email protected]>
AuthorDate: Tue Feb 17 22:14:48 2026 +0800
[SPARK-55552][SQL] Add VariantType support to ColumnarBatchRow.copy() and
MutableColumnarRow
### What changes were proposed in this pull request?
`ColumnarBatchRow.copy()` and `MutableColumnarRow.copy()`/`get()` do not
handle `VariantType`, causing a `RuntimeException: Not implemented.
VariantType` when using `VariantType` columns with streaming custom data
sources that rely on columnar batch row copying.
PR #53137 (SPARK-54427) added `VariantType` support to `ColumnarRow` but
missed `ColumnarBatchRow` and `MutableColumnarRow`. PR #54006 attempted this
fix but was closed.
This patch adds:
- `PhysicalVariantType` branch in `ColumnarBatchRow.copy()`
- `VariantType` branch in `MutableColumnarRow.copy()` and `get()`
- Test in `ColumnarBatchSuite` validating `VariantVal` round-trip through
`copy()`
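To make the failure mode concrete, here is a minimal sketch of the scenario the new branches cover. It is adapted from the new suite test rather than taken from the patch itself, and it assumes an on-heap vector whose `VariantType` children are `(value, metadata)`, as the test does; the variable names are illustrative only:
```scala
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.VariantType
import org.apache.spark.sql.vectorized.ColumnarBatchRow
import org.apache.spark.unsafe.types.VariantVal

// A single VariantType column backed by value/metadata child vectors
// (illustrative setup, mirroring the new ColumnarBatchSuite test).
val column = new OnHeapColumnVector(1, VariantType)
column.putNotNull(0)
column.getChild(0).appendByteArray(Array[Byte](1, 2, 3), 0, 3)  // value bytes
column.getChild(1).appendByteArray(Array[Byte](10, 11), 0, 2)   // metadata bytes

val row = new ColumnarBatchRow(Array(column))
row.rowId = 0

// Before this patch, copy() fell through to the catch-all branch and threw
// RuntimeException("Not implemented. VariantType"). With the new branch it
// stores the VariantVal returned by getVariant(), so the copy succeeds.
val copied = row.copy()
val variant = copied.get(0, VariantType).asInstanceOf[VariantVal]
```
The new branches store the result of `getVariant(i)` directly, mirroring the earlier `ColumnarRow` fix from SPARK-54427.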
### Why are the changes needed?
Without this fix, any streaming pipeline that returns `VariantType` columns
from a custom columnar data source throws a runtime exception when Spark
attempts to copy the batch row.
### Does this PR introduce _any_ user-facing change?
No. This is a bug fix for an existing feature.
### How was this patch tested?
Added a new test `[SPARK-55552] Variant` in `ColumnarBatchSuite` that
creates a `VariantType` column vector, populates it with `VariantVal` data
(including a null), wraps it in a `ColumnarBatchRow`, calls `copy()`, and
verifies the values round-trip correctly.
### Was this patch authored or co-authored using generative AI tooling?
Yes. GitHub Copilot was used to assist in drafting portions of this
contribution.
This contribution is my original work and I license it under the Apache 2.0
license.
Closes #54337 from tugceozberk/fix-columnar-batch-row-variant-copy.
Authored-by: tugce-applied <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit c4188b0e43182e4585ee09cbf3cd00d633ec72e7)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/vectorized/ColumnarBatchRow.java | 2 ++
.../execution/vectorized/MutableColumnarRow.java | 4 +++
.../execution/vectorized/ColumnarBatchSuite.scala | 35 ++++++++++++++++++++++
3 files changed, 41 insertions(+)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
index 4be45dc5d399..3d1e780f6e05 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
@@ -86,6 +86,8 @@ public final class ColumnarBatchRow extends InternalRow {
row.update(i, getArray(i).copy());
} else if (pdt instanceof PhysicalMapType) {
row.update(i, getMap(i).copy());
+ } else if (pdt instanceof PhysicalVariantType) {
+ row.update(i, getVariant(i));
} else {
throw new RuntimeException("Not implemented. " + dt);
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
index 49c27f977562..a46b5143eef6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
@@ -96,6 +96,8 @@ public final class MutableColumnarRow extends InternalRow {
row.update(i, getArray(i).copy());
} else if (dt instanceof MapType) {
row.update(i, getMap(i).copy());
+ } else if (dt instanceof VariantType) {
+ row.update(i, getVariant(i));
} else {
throw new RuntimeException("Not implemented. " + dt);
}
@@ -217,6 +219,8 @@ public final class MutableColumnarRow extends InternalRow {
return getStruct(ordinal, structType.fields().length);
} else if (dataType instanceof MapType) {
return getMap(ordinal);
+ } else if (dataType instanceof VariantType) {
+ return getVariant(ordinal);
} else {
throw new SparkUnsupportedOperationException(
"_LEGACY_ERROR_TEMP_3192", Map.of("dt", dataType.toString()));
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 97ad2c1f5bf9..6d90bb985e26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -2025,4 +2025,39 @@ class ColumnarBatchSuite extends SparkFunSuite {
}
}
}
+
+ testVector("[SPARK-55552] Variant", 3, VariantType) {
+ column =>
+ val valueChild = column.getChild(0)
+ val metadataChild = column.getChild(1)
+
+ column.putNotNull(0)
+ valueChild.appendByteArray(Array[Byte](1, 2, 3), 0, 3)
+ metadataChild.appendByteArray(Array[Byte](10, 11), 0, 2)
+
+ column.putNotNull(1)
+ valueChild.appendByteArray(Array[Byte](4, 5), 0, 2)
+ metadataChild.appendByteArray(Array[Byte](12, 13, 14), 0, 3)
+
+ column.putNull(2)
+ valueChild.appendNull()
+ metadataChild.appendNull()
+
+ val batchRow = new ColumnarBatchRow(Array(column))
+ (0 until 3).foreach { i =>
+ batchRow.rowId = i
+ val batchRowCopy = batchRow.copy()
+ if (i < 2) {
+ assert(!batchRow.isNullAt(0))
+ assert(!batchRowCopy.isNullAt(0))
+ val original = batchRow.getVariant(0)
+ val copied = batchRowCopy.get(0, VariantType).asInstanceOf[VariantVal]
+ assert(java.util.Arrays.equals(original.getValue, copied.getValue))
+ assert(java.util.Arrays.equals(original.getMetadata, copied.getMetadata))
+ } else {
+ assert(batchRow.isNullAt(0))
+ assert(batchRowCopy.isNullAt(0))
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]