qlong commented on code in PR #16714: URL: https://github.com/apache/iceberg/pull/16714#discussion_r3405740313
########## spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/data/SparkVariantExtractionReaders.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.data; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.iceberg.parquet.ParquetValueReader; +import org.apache.iceberg.parquet.ParquetVariantExtractionReaders; +import org.apache.iceberg.parquet.ParquetVariantExtractionReaders.VariantExtractionField; +import org.apache.iceberg.parquet.ParquetVariantExtractionReaders.VariantExtractionRow; +import org.apache.iceberg.parquet.ParquetVariantReaders.DelegatingValueReader; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.variants.PhysicalType; +import org.apache.iceberg.variants.VariantValue; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Decimal; +import 
org.apache.spark.sql.types.DecimalType; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.types.TimestampNTZType; +import org.apache.spark.sql.types.TimestampType; +import org.apache.spark.unsafe.types.UTF8String; + +/** Parquet readers that materialize Spark variant extraction struct rows from shredded variants. */ +public class SparkVariantExtractionReaders { + private SparkVariantExtractionReaders() {} + + public static ParquetValueReader<InternalRow> buildStructReader( + MessageType fileSchema, + GroupType variantGroup, + List<String> variantColumnPath, + StructType extractionStruct) { + List<VariantExtractionField> fields = Lists.newArrayList(); + int numFields = 0; + for (StructField field : extractionStruct.fields()) { + numFields = Math.max(numFields, Integer.parseInt(field.name()) + 1); + } + + DataType[] targetTypes = new DataType[numFields]; + for (StructField field : extractionStruct.fields()) { + int ordinal = Integer.parseInt(field.name()); + targetTypes[ordinal] = field.dataType(); + fields.add( + new VariantExtractionField( + ordinal, + SparkVariantExtractionUtil.isPlaceholderExtraction(field), + SparkVariantExtractionUtil.parseObjectPath( + SparkVariantExtractionUtil.extractionPath(field)))); + } + + ParquetValueReader<VariantExtractionRow> parquetReader = + ParquetVariantExtractionReaders.buildRowReader( + fileSchema, variantGroup, variantColumnPath, fields); + + return new SparkVariantExtractionStructReader(parquetReader, targetTypes); + } + + /** Counts leaf Parquet columns referenced by a reader tree. */ + public static int leafColumnCount(ParquetValueReader<?> reader) { + return ParquetVariantExtractionReaders.leafColumnCount(reader); + } + + /** Visible for unit tests in {@code org.apache.iceberg.spark.data}. 
*/ + static Object toSparkValueForTests(VariantValue value, DataType targetType) { + return toSparkValue(value, targetType); + } + + private static class SparkVariantExtractionStructReader + extends DelegatingValueReader<VariantExtractionRow, InternalRow> { + private final DataType[] targetTypes; + + private SparkVariantExtractionStructReader( + ParquetValueReader<VariantExtractionRow> reader, DataType[] targetTypes) { + super(reader); + this.targetTypes = targetTypes; + } + + @Override + public InternalRow read(InternalRow reuse) { + VariantExtractionRow row = readFromDelegate(null); + int numFields = row.numFields(); + GenericInternalRow result = + reuse instanceof GenericInternalRow + ? (GenericInternalRow) reuse + : new GenericInternalRow(numFields); + + if (row.metadata() == null) { + // SQL NULL variant: match Spark variant_get semantics by nulling every extraction slot, + // including full-variant placeholder slots. Distinct from a non-null variant where only + // missing or JSON-null paths produce per-field SQL NULLs. + for (int i = 0; i < numFields; i += 1) { + result.setNullAt(i); + } + return result; + } + + for (int i = 0; i < numFields; i += 1) { + if (row.placeholder(i)) { + result.setBoolean(i, true); + } else { + Object sparkValue = toSparkValue(row.value(i), targetTypes[i]); + if (sparkValue == null) { + result.setNullAt(i); + } else { + result.update(i, sparkValue); + } + } + } + + return result; + } + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + private static Object toSparkValue(VariantValue value, DataType targetType) { Review Comment: Thanks for the review. I assume you were referring to VariantGet.cast in Spark. toSparkValue in the connector is required by the DSV2 extraction pushdown contract: when Spark pushes extraction down, it delegates both extraction and casting to the connector, and the engine no longer calls VariantGet.cast on the values returned by the connector.
The bridge from Iceberg's VariantValue to Spark's Variant already exists; it is triggered when extraction pushdown is rejected and the connector returns the whole variant. That path is expensive for shredded typed values, because they are converted back into a VariantValue and then immediately extracted again by Spark. The cast logic in Spark is more general than needed here: it handles cross-type coercions that do not apply to typed_value, with the exception of overflow on type narrowing. I added a fix for that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
