This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e28be434c support StructInternalRow.getVariant (#14379)
7e28be434c is described below

commit 7e28be434c64afb9675c743335734bb8217c1df5
Author: Huaxin Gao <[email protected]>
AuthorDate: Sat Nov 8 20:41:33 2025 -0800

    support StructInternalRow.getVariant (#14379)
    
    * support StructInternalRow.getVariant
    
    * add a test for GetVariantPassesThroughVariantVal
    
    * address comments
    
    * remove path for VariantVal
---
 .../iceberg/spark/source/StructInternalRow.java    |  40 ++++-
 .../spark/source/TestStructInternalRowVariant.java | 182 +++++++++++++++++++++
 2 files changed, 218 insertions(+), 4 deletions(-)

diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
index 27fb014d8c..2d3c917e58 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.source;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDate;
@@ -36,6 +37,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.variants.Variant;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
 import org.apache.spark.sql.catalyst.util.ArrayData;
@@ -58,6 +60,7 @@ import org.apache.spark.sql.types.ShortType;
 import org.apache.spark.sql.types.StringType;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.sql.types.VariantType;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.apache.spark.unsafe.types.VariantVal;
@@ -229,14 +232,19 @@ class StructInternalRow extends InternalRow {
     return isNullAt(ordinal) ? null : getMapInternal(ordinal);
   }
 
+  private MapData getMapInternal(int ordinal) {
+    return mapToMapData(
+        type.fields().get(ordinal).type().asMapType(), struct.get(ordinal, Map.class));
+  }
+
   @Override
   public VariantVal getVariant(int ordinal) {
-    throw new UnsupportedOperationException("Unsupported method: getVariant");
+    return isNullAt(ordinal) ? null : getVariantInternal(ordinal);
   }
 
-  private MapData getMapInternal(int ordinal) {
-    return mapToMapData(
-        type.fields().get(ordinal).type().asMapType(), struct.get(ordinal, Map.class));
+  private VariantVal getVariantInternal(int ordinal) {
+    Object value = struct.get(ordinal, Object.class);
+    return toVariantVal(value);
   }
 
   @Override
@@ -276,6 +284,8 @@ class StructInternalRow extends InternalRow {
       return getInt(ordinal);
     } else if (dataType instanceof TimestampType) {
       return getLong(ordinal);
+    } else if (dataType instanceof VariantType) {
+      return getVariantInternal(ordinal);
     }
     return null;
   }
@@ -338,11 +348,33 @@ class StructInternalRow extends InternalRow {
             array ->
                 (BiConsumer<Integer, Map<?, ?>>)
                     (pos, map) -> array[pos] = mapToMapData(elementType.asMapType(), map));
+      case VARIANT:
+        return fillArray(
+            values,
+            array -> (BiConsumer<Integer, Object>) (pos, v) -> array[pos] = toVariantVal(v));
       default:
         throw new UnsupportedOperationException("Unsupported array element type: " + elementType);
     }
   }
 
+  private static VariantVal toVariantVal(Object value) {
+    if (value instanceof Variant) {
+      Variant variant = (Variant) value;
+      byte[] metadataBytes = new byte[variant.metadata().sizeInBytes()];
+      ByteBuffer metadataBuffer = ByteBuffer.wrap(metadataBytes).order(ByteOrder.LITTLE_ENDIAN);
+      variant.metadata().writeTo(metadataBuffer, 0);
+
+      byte[] valueBytes = new byte[variant.value().sizeInBytes()];
+      ByteBuffer valueBuffer = ByteBuffer.wrap(valueBytes).order(ByteOrder.LITTLE_ENDIAN);
+      variant.value().writeTo(valueBuffer, 0);
+
+      return new VariantVal(valueBytes, metadataBytes);
+    }
+
+    throw new UnsupportedOperationException(
+        "Unsupported value for VARIANT in StructInternalRow: " + value.getClass());
+  }
+
   @SuppressWarnings("unchecked")
   private <T> GenericArrayData fillArray(
       Collection<?> values, Function<Object[], BiConsumer<Integer, T>> makeSetter) {
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructInternalRowVariant.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructInternalRowVariant.java
new file mode 100644
index 0000000000..78e7e23b86
--- /dev/null
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructInternalRowVariant.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.variants.Variant;
+import org.apache.iceberg.variants.VariantMetadata;
+import org.apache.iceberg.variants.VariantValue;
+import org.apache.iceberg.variants.Variants;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.VariantType$;
+import org.apache.spark.unsafe.types.VariantVal;
+import org.junit.jupiter.api.Test;
+
+public class TestStructInternalRowVariant {
+
+  @Test
+  public void testGetVariantReturnsVariantVal() {
+    Types.StructType structType = variantStructType();
+    GenericRecord rec = newRecord(structType);
+    Variant variant = sampleVariant();
+    rec.set(0, variant);
+
+    InternalRow row = new StructInternalRow(structType).setStruct(rec);
+
+    VariantVal actual = row.getVariant(0);
+    assertThat(actual).isNotNull();
+
+    VariantMetadata metadata =
+        VariantMetadata.from(ByteBuffer.wrap(actual.getMetadata()).order(ByteOrder.LITTLE_ENDIAN));
+    assertThat(metadata.dictionarySize()).isEqualTo(1);
+    assertThat(metadata.get(0)).isEqualTo("k");
+
+    VariantValue actualValue =
+        VariantValue.from(
+            metadata, ByteBuffer.wrap(actual.getValue()).order(ByteOrder.LITTLE_ENDIAN));
+
+    assertThat(actualValue.asObject().get("k").asPrimitive().get()).isEqualTo("v1");
+  }
+
+  @Test
+  public void testGetVariantNull() {
+    Types.StructType structType = variantStructType();
+    GenericRecord rec = newRecord(structType);
+    rec.set(0, null);
+
+    InternalRow row = new StructInternalRow(structType).setStruct(rec);
+    assertThat(row.getVariant(0)).isNull();
+  }
+
+  @Test
+  public void testArrayOfVariant() {
+    Types.ListType listType = Types.ListType.ofOptional(2, Types.VariantType.get());
+    Types.StructType structType =
+        Types.StructType.of(Types.NestedField.optional(1, "arr", listType));
+
+    GenericRecord rec = GenericRecord.create(structType);
+
+    Variant v1 = sampleVariant();
+    Variant v2 = sampleVariant();
+
+    List<Object> elements = Arrays.asList(v1, v2, null);
+    rec.set(0, elements);
+
+    InternalRow row = new StructInternalRow(structType).setStruct(rec);
+    ArrayData arr = row.getArray(0);
+
+    Object firstVar = arr.get(0, VariantType$.MODULE$);
+    Object secondVar = arr.get(1, VariantType$.MODULE$);
+
+    assertThat(firstVar).isInstanceOf(VariantVal.class);
+    assertThat(secondVar).isInstanceOf(VariantVal.class);
+    assertThat(arr.isNullAt(2)).isTrue();
+
+    assertVariantValEqualsKV((VariantVal) firstVar, "k", "v1");
+    assertVariantValEqualsKV((VariantVal) secondVar, "k", "v1");
+  }
+
+  @Test
+  public void testMapWithVariant() {
+    Types.MapType mapType =
+        Types.MapType.ofOptional(2, 3, Types.StringType.get(), Types.VariantType.get());
+    Types.StructType structType = Types.StructType.of(Types.NestedField.optional(1, "m", mapType));
+
+    GenericRecord rec = GenericRecord.create(structType);
+    Map<String, Object> map = Maps.newHashMap();
+    map.put("a", sampleVariant());
+    map.put("b", sampleVariant());
+    rec.set(0, map);
+
+    InternalRow row = new StructInternalRow(structType).setStruct(rec);
+    MapData mapData = row.getMap(0);
+
+    ArrayData values = mapData.valueArray();
+    for (int i = 0; i < values.numElements(); i++) {
+      Object variant = values.get(i, VariantType$.MODULE$);
+      assertThat(variant).isInstanceOf(VariantVal.class);
+      assertVariantValEqualsKV((VariantVal) variant, "k", "v1");
+    }
+  }
+
+  @Test
+  public void testNestedStructVariant() {
+    Types.StructType variant =
+        Types.StructType.of(Types.NestedField.optional(2, "v", Types.VariantType.get()));
+    Types.StructType structVariant =
+        Types.StructType.of(Types.NestedField.optional(1, "n", variant));
+
+    // Case 1: nested struct holds Iceberg Variant
+    GenericRecord variantStructRec = GenericRecord.create(variant);
+    variantStructRec.set(0, sampleVariant());
+    GenericRecord structRec = GenericRecord.create(structVariant);
+    structRec.set(0, variantStructRec);
+
+    InternalRow structRow = new StructInternalRow(structVariant).setStruct(structRec);
+    InternalRow nested = structRow.getStruct(0, 1);
+    VariantVal variantVal1 = nested.getVariant(0);
+    assertVariantValEqualsKV(variantVal1, "k", "v1");
+  }
+
+  @Test
+  public void testGetWithVariantType() {
+    Types.StructType structType = variantStructType();
+    GenericRecord rec = newRecord(structType);
+    rec.set(0, sampleVariant());
+
+    InternalRow row = new StructInternalRow(structType).setStruct(rec);
+    Object obj = row.get(0, VariantType$.MODULE$);
+    assertThat(obj).isInstanceOf(VariantVal.class);
+    assertVariantValEqualsKV((VariantVal) obj, "k", "v1");
+  }
+
+  private static Types.StructType variantStructType() {
+    return Types.StructType.of(Types.NestedField.optional(1, "a", Types.VariantType.get()));
+  }
+
+  private static GenericRecord newRecord(Types.StructType structType) {
+    return GenericRecord.create(structType);
+  }
+
+  private static Variant sampleVariant() {
+    VariantMetadata md = Variants.metadata("k");
+    org.apache.iceberg.variants.ShreddedObject obj = Variants.object(md);
+    obj.put("k", Variants.of("v1"));
+    return Variant.of(md, obj);
+  }
+
+  private static void assertVariantValEqualsKV(VariantVal vv, String key, String expected) {
+    VariantMetadata metadata =
+        VariantMetadata.from(ByteBuffer.wrap(vv.getMetadata()).order(ByteOrder.LITTLE_ENDIAN));
+    VariantValue value =
+        VariantValue.from(metadata, ByteBuffer.wrap(vv.getValue()).order(ByteOrder.LITTLE_ENDIAN));
+    assertThat(value.asObject().get(key).asPrimitive().get()).isEqualTo(expected);
+  }
+}

Reply via email to