This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 4b884f2ea1cf [SPARK-55552][SQL] Add VariantType support to ColumnarBatchRow.copy() and MutableColumnarRow
4b884f2ea1cf is described below

commit 4b884f2ea1cfc05b21a4f7c94a5849f245b23c26
Author: tugce-applied <[email protected]>
AuthorDate: Tue Feb 17 22:14:48 2026 +0800

    [SPARK-55552][SQL] Add VariantType support to ColumnarBatchRow.copy() and MutableColumnarRow
    
    ### What changes were proposed in this pull request?
    
    `ColumnarBatchRow.copy()` and `MutableColumnarRow.copy()`/`get()` do not handle `VariantType`, causing a `RuntimeException: Not implemented. VariantType` when `VariantType` columns are used with streaming custom data sources that rely on columnar batch row copying.
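    
    For context, a minimal reproduction sketch (not part of this patch; it mirrors the new test below and only assumes the public `OnHeapColumnVector`/`ColumnarBatch` constructors already used in `ColumnarBatchSuite`) that hits the unimplemented branch before this fix:
    
    ```scala
    import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
    import org.apache.spark.sql.types.VariantType
    import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
    import org.apache.spark.unsafe.types.VariantVal
    
    // A VariantType vector keeps the variant's value and metadata bytes in two child vectors.
    val vector = new OnHeapColumnVector(1, VariantType)
    vector.putNotNull(0)
    vector.getChild(0).appendByteArray(Array[Byte](1, 2, 3), 0, 3) // value bytes
    vector.getChild(1).appendByteArray(Array[Byte](10, 11), 0, 2)  // metadata bytes
    
    val batch = new ColumnarBatch(Array[ColumnVector](vector), 1)
    val row = batch.getRow(0)  // the batch's row positioned at rowId 0
    val copied = row.copy()    // threw "Not implemented. VariantType" before this fix
    val variant = copied.get(0, VariantType).asInstanceOf[VariantVal]
    ```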
    
    PR #53137 (SPARK-54427) added `VariantType` support to `ColumnarRow` but missed `ColumnarBatchRow` and `MutableColumnarRow`. PR #54006 attempted this fix but was closed.
    
    This patch adds:
    - `PhysicalVariantType` branch in `ColumnarBatchRow.copy()`
    - `VariantType` branch in `MutableColumnarRow.copy()` and `get()`
    - Test in `ColumnarBatchSuite` validating the `VariantVal` round-trip through `copy()`
    
    ### Why are the changes needed?
    
    Without this fix, any streaming pipeline that returns `VariantType` columns from a custom columnar data source throws a runtime exception when Spark attempts to copy the batch row.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is a bug fix for an existing feature.
    
    ### How was this patch tested?
    
    Added a new test `[SPARK-55552] Variant` in `ColumnarBatchSuite` that creates a `VariantType` column vector, populates it with `VariantVal` data (including a null), wraps it in a `ColumnarBatchRow`, calls `copy()`, and verifies the values round-trip correctly.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes. GitHub Copilot was used to assist in drafting portions of this contribution.
    
    This contribution is my original work and I license it under the Apache 2.0 license.
    
    Closes #54337 from tugceozberk/fix-columnar-batch-row-variant-copy.
    
    Authored-by: tugce-applied <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit c4188b0e43182e4585ee09cbf3cd00d633ec72e7)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/vectorized/ColumnarBatchRow.java     |  2 ++
 .../execution/vectorized/MutableColumnarRow.java   |  4 +++
 .../execution/vectorized/ColumnarBatchSuite.scala  | 35 ++++++++++++++++++++++
 3 files changed, 41 insertions(+)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
index 4be45dc5d399..3d1e780f6e05 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarBatchRow.java
@@ -86,6 +86,8 @@ public final class ColumnarBatchRow extends InternalRow {
           row.update(i, getArray(i).copy());
         } else if (pdt instanceof PhysicalMapType) {
           row.update(i, getMap(i).copy());
+        } else if (pdt instanceof PhysicalVariantType) {
+          row.update(i, getVariant(i));
         } else {
           throw new RuntimeException("Not implemented. " + dt);
         }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
index 49c27f977562..a46b5143eef6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/MutableColumnarRow.java
@@ -96,6 +96,8 @@ public final class MutableColumnarRow extends InternalRow {
           row.update(i, getArray(i).copy());
         } else if (dt instanceof MapType) {
           row.update(i, getMap(i).copy());
+        } else if (dt instanceof VariantType) {
+          row.update(i, getVariant(i));
         } else {
           throw new RuntimeException("Not implemented. " + dt);
         }
@@ -217,6 +219,8 @@ public final class MutableColumnarRow extends InternalRow {
       return getStruct(ordinal, structType.fields().length);
     } else if (dataType instanceof MapType) {
       return getMap(ordinal);
+    } else if (dataType instanceof VariantType) {
+      return getVariant(ordinal);
     } else {
       throw new SparkUnsupportedOperationException(
         "_LEGACY_ERROR_TEMP_3192", Map.of("dt", dataType.toString()));
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 97ad2c1f5bf9..6d90bb985e26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -2025,4 +2025,39 @@ class ColumnarBatchSuite extends SparkFunSuite {
         }
     }
   }
+
+  testVector("[SPARK-55552] Variant", 3, VariantType) {
+    column =>
+      val valueChild = column.getChild(0)
+      val metadataChild = column.getChild(1)
+
+      column.putNotNull(0)
+      valueChild.appendByteArray(Array[Byte](1, 2, 3), 0, 3)
+      metadataChild.appendByteArray(Array[Byte](10, 11), 0, 2)
+
+      column.putNotNull(1)
+      valueChild.appendByteArray(Array[Byte](4, 5), 0, 2)
+      metadataChild.appendByteArray(Array[Byte](12, 13, 14), 0, 3)
+
+      column.putNull(2)
+      valueChild.appendNull()
+      metadataChild.appendNull()
+
+      val batchRow = new ColumnarBatchRow(Array(column))
+      (0 until 3).foreach { i =>
+        batchRow.rowId = i
+        val batchRowCopy = batchRow.copy()
+        if (i < 2) {
+          assert(!batchRow.isNullAt(0))
+          assert(!batchRowCopy.isNullAt(0))
+          val original = batchRow.getVariant(0)
+          val copied = batchRowCopy.get(0, VariantType).asInstanceOf[VariantVal]
+          assert(java.util.Arrays.equals(original.getValue, copied.getValue))
+          assert(java.util.Arrays.equals(original.getMetadata, copied.getMetadata))
+        } else {
+          assert(batchRow.isNullAt(0))
+          assert(batchRowCopy.isNullAt(0))
+        }
+      }
+  }
 }

