tmater commented on code in PR #16714:
URL: https://github.com/apache/iceberg/pull/16714#discussion_r3386544368


##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVariantExtractionReaders.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.iceberg.parquet.ParquetValueReader;
+import org.apache.iceberg.parquet.ParquetVariantExtractionReaders;
+import 
org.apache.iceberg.parquet.ParquetVariantExtractionReaders.VariantExtractionField;
+import 
org.apache.iceberg.parquet.ParquetVariantExtractionReaders.VariantExtractionRow;
+import org.apache.iceberg.parquet.ParquetVariantReaders.DelegatingValueReader;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.variants.PhysicalType;
+import org.apache.iceberg.variants.VariantValue;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampNTZType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/** Parquet readers that materialize Spark variant extraction struct rows from shredded variants. */
+public class SparkVariantExtractionReaders {
+  private SparkVariantExtractionReaders() {}
+
+  /**
+   * Builds a reader that emits one Spark {@link InternalRow} per variant value, populated with the
+   * fields described by {@code extractionStruct}.
+   *
+   * <p>Every field name in {@code extractionStruct} is parsed as a decimal integer and used as the
+   * field's ordinal. The per-slot target type array is sized to the largest ordinal plus one, so
+   * ordinals that no field names leave a {@code null} entry in that array.
+   *
+   * @param fileSchema forwarded unchanged to {@code ParquetVariantExtractionReaders.buildRowReader}
+   * @param variantGroup forwarded unchanged to {@code buildRowReader}
+   * @param variantColumnPath forwarded unchanged to {@code buildRowReader}
+   * @param extractionStruct struct whose field names are ordinals and whose field types are the
+   *     Spark types each extracted value is converted to
+   * @return a reader wrapping the Parquet extraction row reader
+   * @throws NumberFormatException if a field name cannot be parsed as an integer
+   */
+  public static ParquetValueReader<InternalRow> buildStructReader(
+      MessageType fileSchema,
+      GroupType variantGroup,
+      List<String> variantColumnPath,
+      StructType extractionStruct) {
+    StructField[] structFields = extractionStruct.fields();
+
+    // Parse each ordinal exactly once; the widest one determines how many slots are allocated.
+    int[] ordinals = new int[structFields.length];
+    int slotCount = 0;
+    for (int pos = 0; pos < structFields.length; pos += 1) {
+      ordinals[pos] = Integer.parseInt(structFields[pos].name());
+      slotCount = Math.max(slotCount, ordinals[pos] + 1);
+    }
+
+    DataType[] targetTypes = new DataType[slotCount];
+    List<VariantExtractionField> extractionFields = Lists.newArrayList();
+    for (int pos = 0; pos < structFields.length; pos += 1) {
+      StructField structField = structFields[pos];
+      int ordinal = ordinals[pos];
+      targetTypes[ordinal] = structField.dataType();
+
+      VariantExtractionField extractionField =
+          new VariantExtractionField(
+              ordinal,
+              SparkVariantExtractionUtil.isPlaceholderExtraction(structField),
+              SparkVariantExtractionUtil.parseObjectPath(
+                  SparkVariantExtractionUtil.extractionPath(structField)));
+      extractionFields.add(extractionField);
+    }
+
+    ParquetValueReader<VariantExtractionRow> rowReader =
+        ParquetVariantExtractionReaders.buildRowReader(
+            fileSchema, variantGroup, variantColumnPath, extractionFields);
+
+    return new SparkVariantExtractionStructReader(rowReader, targetTypes);
+  }
+
+  /** Counts leaf Parquet columns referenced by a reader tree. */
+  public static int leafColumnCount(ParquetValueReader<?> reader) {
+    return ParquetVariantExtractionReaders.leafColumnCount(reader);
+  }
+
+  /**
+   * Package-private hook that lets unit tests in {@code org.apache.iceberg.spark.data} exercise
+   * the private {@code toSparkValue} conversion.
+   *
+   * @param value variant value to convert
+   * @param targetType Spark type the value should be converted to
+   * @return whatever {@code toSparkValue} returns for the same arguments
+   */
+  static Object toSparkValueForTests(VariantValue value, DataType targetType) {
+    Object converted = toSparkValue(value, targetType);
+    return converted;
+  }
+
+  /**
+   * Adapts a reader of {@link VariantExtractionRow} into a reader of Spark {@link InternalRow},
+   * converting each extracted value to the Spark type registered for its slot.
+   */
+  private static class SparkVariantExtractionStructReader
+      extends DelegatingValueReader<VariantExtractionRow, InternalRow> {
+    private final DataType[] targetTypes;
+
+    private SparkVariantExtractionStructReader(
+        ParquetValueReader<VariantExtractionRow> reader, DataType[] targetTypes) {
+      super(reader);
+      this.targetTypes = targetTypes;
+    }
+
+    /**
+     * Reads the next extraction row from the delegate and copies it into a Spark row.
+     *
+     * @param reuse row to write into when it is a {@link GenericInternalRow}; any other value
+     *     causes a new row sized to the extraction row to be allocated
+     * @return the populated row
+     */
+    @Override
+    public InternalRow read(InternalRow reuse) {
+      VariantExtractionRow extracted = readFromDelegate(null);
+      int slotCount = extracted.numFields();
+
+      GenericInternalRow output;
+      if (reuse instanceof GenericInternalRow) {
+        output = (GenericInternalRow) reuse;
+      } else {
+        output = new GenericInternalRow(slotCount);
+      }
+
+      if (extracted.metadata() == null) {
+        // Null metadata marks a SQL NULL variant. Following Spark's variant_get semantics, every
+        // extraction slot becomes NULL, full-variant placeholder slots included. This differs
+        // from a non-null variant, where only missing or JSON-null paths yield per-field NULLs.
+        return nullOutAllSlots(output, slotCount);
+      }
+
+      for (int slot = 0; slot < slotCount; slot += 1) {
+        populateSlot(output, extracted, slot);
+      }
+
+      return output;
+    }
+
+    /** Marks the first {@code slotCount} slots of {@code output} as NULL and returns it. */
+    private static GenericInternalRow nullOutAllSlots(GenericInternalRow output, int slotCount) {
+      for (int slot = 0; slot < slotCount; slot += 1) {
+        output.setNullAt(slot);
+      }
+      return output;
+    }
+
+    /** Writes one slot: a {@code true} marker for placeholders, else the converted value. */
+    private void populateSlot(GenericInternalRow output, VariantExtractionRow extracted, int slot) {
+      if (extracted.placeholder(slot)) {
+        output.setBoolean(slot, true);
+        return;
+      }
+
+      Object converted = toSparkValue(extracted.value(slot), targetTypes[slot]);
+      if (converted != null) {
+        output.update(slot, converted);
+      } else {
+        output.setNullAt(slot);
+      }
+    }
+  }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  private static Object toSparkValue(VariantValue value, DataType targetType) {

Review Comment:
   I may be missing something, but this looks like it overlaps with Spark's 
`variant_get` casting logic. `toSparkValue` handles the common cases, but it 
seems separate from Spark's existing behavior around `failOnError`, 
`timeZoneId`, and some cast edge cases.
   
   Would it make sense to reuse Spark's cast path here if we can bridge from 
Iceberg's `VariantValue` to Spark's `Variant` / `VariantVal`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to