This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new ee7138bf7a1 HIVE-29287: Addendum: Iceberg: [V3] Variant Shredding support (#6234)
ee7138bf7a1 is described below

commit ee7138bf7a1a1ee1de07fa2243fe947a627268cb
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Wed Dec 17 19:37:11 2025 +0200

    HIVE-29287: Addendum: Iceberg: [V3] Variant Shredding support (#6234)
---
 .../org/apache/iceberg/hive/HiveSchemaUtil.java    |   38 +
 .../mr/hive/writer/HiveFileWriterFactory.java      |   37 +-
 .../writer/HiveIcebergCopyOnWriteRecordWriter.java |   24 +-
 .../mr/hive/writer/HiveIcebergRecordWriter.java    |   24 +-
 .../mr/hive/writer/HiveIcebergWriterBase.java      |    2 +-
 .../hive/writer/SchemaInferringDefaultsWriter.java |  161 +++
 .../java/org/apache/iceberg/parquet/Parquet.java   | 1524 --------------------
 .../org/apache/iceberg/parquet/VariantUtil.java    |  164 ++-
 .../iceberg/mr/hive/TestHiveIcebergSelects.java    |   70 -
 .../iceberg/mr/hive/TestHiveIcebergVariant.java    |  193 +++
 iceberg/iceberg-shading/pom.xml                    |    8 -
 11 files changed, 556 insertions(+), 1689 deletions(-)

diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
index f651406df74..8277546b525 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
@@ -431,6 +431,44 @@ public static void setDefaultValues(Record record, List<Types.NestedField> field
     }
   }
 
+  /**
+   * Sets a value into a {@link Record} using a struct-only field path (top-level column or nested
+   * through structs). Intermediate struct records are created as needed.
+   *
+   * <p>If the path traverses a non-struct type (e.g. list/map), the operation is ignored.
+   */
+  public static void setStructField(Record root, String[] path, Object value) {
+    if (root == null || path == null || path.length == 0) {
+      return;
+    }
+    Record current = root;
+    Types.StructType currentStruct = root.struct();
+
+    for (int i = 0; i < path.length - 1; i++) {
+      String fieldName = path[i];
+      Types.NestedField field = currentStruct.field(fieldName);
+      if (field == null || !field.type().isStructType()) {
+        return;
+      }
+      Types.StructType nestedStruct = field.type().asStructType();
+      current = getOrCreateStructRecord(current, fieldName, nestedStruct);
+      currentStruct = nestedStruct;
+    }
+
+    current.setField(path[path.length - 1], value);
+  }
+
+  private static Record getOrCreateStructRecord(
+      Record parent, String fieldName, Types.StructType structType) {
+    Object value = parent.getField(fieldName);
+    if (value instanceof Record) {
+      return (Record) value;
+    }
+    Record record = GenericRecord.create(structType);
+    parent.setField(fieldName, record);
+    return record;
+  }
+
   // Special method for nested structs that always applies defaults to null fields
   private static void setDefaultValuesForNestedStruct(Record record, List<Types.NestedField> fields) {
     for (Types.NestedField field : fields) {
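
For illustration, a minimal usage sketch of the new HiveSchemaUtil.setStructField helper added above; the schema, column names, and demo class below are hypothetical and not part of this commit:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.hive.HiveSchemaUtil;
    import org.apache.iceberg.types.Types;

    public class SetStructFieldDemo {
      public static void main(String[] args) {
        // Hypothetical schema: a long "id" plus a struct column "address" with a nested "city".
        Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "address", Types.StructType.of(
                Types.NestedField.optional(3, "city", Types.StringType.get()))));

        Record row = GenericRecord.create(schema);
        // Creates the intermediate "address" struct record if absent, then sets "city".
        // Per the javadoc above, paths that traverse a list or map are ignored.
        HiveSchemaUtil.setStructField(row, new String[] {"address", "city"}, "Helsinki");
        System.out.println(row);
      }
    }
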
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
index 6489c78f9ed..234cf928432 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
@@ -20,7 +20,6 @@
 package org.apache.iceberg.mr.hive.writer;
 
 import java.util.Map;
-import java.util.function.Supplier;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
@@ -33,12 +32,13 @@
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.VariantShreddingFunction;
 import org.apache.iceberg.parquet.VariantUtil;
 
 class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
 
   private final Map<String, String> properties;
-  private Supplier<Record> sampleRecord = null;
+  private Record sampleRecord = null;
 
   HiveFileWriterFactory(
       Table table,
@@ -85,9 +85,34 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
   @Override
   protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
     builder.createWriterFunc(GenericParquetWriter::create);
-    // Configure variant shredding function if conditions are met:
-    VariantUtil.variantShreddingFunc(dataSchema(), sampleRecord, properties)
-        .ifPresent(builder::variantShreddingFunc);
+    // Configure variant shredding if enabled and a sample record is available
+    if (VariantUtil.shouldUseVariantShredding(properties, dataSchema())) {
+      setVariantShreddingFunc(builder, VariantUtil.variantShreddingFunc(sampleRecord, dataSchema()));
+    }
+  }
+
+  /**
+   * Sets a {@link VariantShreddingFunction} on the underlying Parquet write builder.
+   *
+   * <p>{@link Parquet.DataWriteBuilder} does not expose {@code variantShreddingFunc} directly; it is set on an
+   * internal write builder held in the private {@code appenderBuilder} field. This method uses reflection to
+   * access that internal builder and invoke {@code variantShreddingFunc(VariantShreddingFunction)}.
+   *
+   * TODO: Replace with {@code DataWriteBuilder.variantShreddingFunc(VariantShreddingFunction)}
+   * once it becomes publicly available.
+   */
+  private static void setVariantShreddingFunc(Parquet.DataWriteBuilder dataWriteBuilder,
+      VariantShreddingFunction fn) {
+    try {
+      java.lang.reflect.Field field = dataWriteBuilder.getClass().getDeclaredField("appenderBuilder");
+      field.setAccessible(true);
+      Object writeBuilder = field.get(dataWriteBuilder);
+      writeBuilder.getClass()
+          .getMethod("variantShreddingFunc", VariantShreddingFunction.class)
+          .invoke(writeBuilder, fn);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
@@ -164,7 +189,7 @@ HiveFileWriterFactory build() {
    * Set a sample record to use for data-driven variant shredding schema generation.
    * Should be called before the Parquet writer is created.
    */
-  public void initialize(Supplier<Record> record) {
+  public void initialize(Record record) {
     if (sampleRecord == null) {
       sampleRecord = record;
     }
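
As the javadoc above notes, the variantShreddingFunc call is reached through reflection only because Parquet.DataWriteBuilder does not yet expose it publicly. A stand-alone sketch of the same reflection pattern, using hypothetical Holder/Inner classes that are not part of this commit:

    import java.lang.reflect.Field;

    public class ReflectionDemo {
      public static class Inner {
        public void configure(String value) {
          System.out.println("configured with " + value);
        }
      }

      public static class Holder {
        private final Inner inner = new Inner();   // analogous to the private appenderBuilder field
      }

      public static void main(String[] args) throws ReflectiveOperationException {
        Holder holder = new Holder();
        Field field = Holder.class.getDeclaredField("inner");
        field.setAccessible(true);                 // bypass private access, as the writer factory does
        Object inner = field.get(holder);
        inner.getClass().getMethod("configure", String.class).invoke(inner, "shredding");
      }
    }
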
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
index e8d5ed2a8e4..f48bf21345e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
@@ -21,7 +21,6 @@
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
 import org.apache.hadoop.io.Writable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
@@ -29,7 +28,6 @@
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.deletes.PositionDelete;
-import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.io.DataWriteResult;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.mr.hive.FilesForCommit;
@@ -37,32 +35,18 @@
 import org.apache.iceberg.mr.hive.writer.WriterBuilder.Context;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.types.Types;
 
-class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
-
-  private final int currentSpecId;
-  private final Set<String> missingColumns;
-  private final List<Types.NestedField> missingOrStructFields;
+class HiveIcebergCopyOnWriteRecordWriter extends SchemaInferringDefaultsWriter {
 
   private final GenericRecord rowDataTemplate;
   private final List<DataFile> replacedDataFiles;
 
-  private final HiveFileWriterFactory fileWriterFactory;
-
   HiveIcebergCopyOnWriteRecordWriter(Table table, HiveFileWriterFactory writerFactory,
       OutputFileFactory deleteFileFactory, Context context) {
-    super(table, newDataWriter(table, writerFactory, deleteFileFactory, context));
+    super(table, writerFactory, deleteFileFactory, context);
 
-    this.currentSpecId = table.spec().specId();
     this.rowDataTemplate = GenericRecord.create(table.schema());
     this.replacedDataFiles = Lists.newArrayList();
-
-    this.missingColumns = context.missingColumns();
-    this.missingOrStructFields = specs.get(currentSpecId).schema().asStruct().fields().stream()
-        .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType())
-        .toList();
-    this.fileWriterFactory = writerFactory;
   }
 
   @Override
@@ -82,9 +66,7 @@ public void write(Writable row) throws IOException {
             .build();
       replacedDataFiles.add(dataFile);
     } else {
-      HiveSchemaUtil.setDefaultValues(rowData, missingOrStructFields, missingColumns);
-      fileWriterFactory.initialize(() -> rowData);
-      writer.write(rowData, specs.get(currentSpecId), partition(rowData, currentSpecId));
+      writeOrBuffer(rowData);
     }
   }
 
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
index ca9d232e3d5..cb9a21e9780 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
@@ -21,45 +21,27 @@
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
 import org.apache.hadoop.io.Writable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.io.DataWriteResult;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.mr.hive.FilesForCommit;
 import org.apache.iceberg.mr.hive.writer.WriterBuilder.Context;
 import org.apache.iceberg.mr.mapred.Container;
-import org.apache.iceberg.types.Types;
 
-class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
-
-  private final int currentSpecId;
-  private final Set<String> missingColumns;
-  private final List<Types.NestedField> missingOrStructFields;
-
-  private final HiveFileWriterFactory fileWriterFactory;
+class HiveIcebergRecordWriter extends SchemaInferringDefaultsWriter {
 
   HiveIcebergRecordWriter(Table table, HiveFileWriterFactory fileWriterFactory,
       OutputFileFactory dataFileFactory, Context context) {
-    super(table, newDataWriter(table, fileWriterFactory, dataFileFactory, context));
-
-    this.currentSpecId = table.spec().specId();
-    this.missingColumns = context.missingColumns();
-    this.missingOrStructFields = specs.get(currentSpecId).schema().asStruct().fields().stream()
-        .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType())
-        .toList();
-    this.fileWriterFactory = fileWriterFactory;
+    super(table, fileWriterFactory, dataFileFactory, context);
   }
 
   @Override
   public void write(Writable row) throws IOException {
     Record record = ((Container<Record>) row).get();
-    HiveSchemaUtil.setDefaultValues(record, missingOrStructFields, missingColumns);
-    fileWriterFactory.initialize(() -> record);
-    writer.write(record, specs.get(currentSpecId), partition(record, currentSpecId));
+    writeOrBuffer(record);
   }
 
   @Override
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java
index f797c37c130..ad026a83040 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java
@@ -80,7 +80,7 @@ public void close(boolean abort) throws IOException {
           .retry(3)
           .suppressFailureWhenFinished()
           .onFailure((file, exception) -> LOG.debug("Failed on to remove file {} on abort", file, exception))
-          .run(file -> io.deleteFile(file.path().toString()));
+          .run(file -> io.deleteFile(file.location()));
       LOG.warn("HiveIcebergWriter is closed with abort");
     }
 
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/SchemaInferringDefaultsWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/SchemaInferringDefaultsWriter.java
new file mode 100644
index 00000000000..487085b6631
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/SchemaInferringDefaultsWriter.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.writer;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.hive.writer.WriterBuilder.Context;
+import org.apache.iceberg.parquet.VariantUtil;
+import org.apache.iceberg.parquet.VariantUtil.VariantField;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types.NestedField;
+
+abstract class SchemaInferringDefaultsWriter extends HiveIcebergWriterBase {
+
+  private static final int VARIANT_SAMPLE_BUFFER_SIZE = 100;
+
+  private final HiveFileWriterFactory fileWriterFactory;
+
+  private final int currentSpecId;
+  private final Set<String> missingColumns;
+  private final List<NestedField> missingOrStructFields;
+
+  private final List<VariantField> variantFields;
+  private final BitSet sampledVariantFields;
+
+  private final List<Record> buffer;
+  private final Record accumulatedSample;
+  private boolean sampleInitialized = false;
+
+  SchemaInferringDefaultsWriter(
+      Table table,
+      HiveFileWriterFactory fileWriterFactory,
+      OutputFileFactory dataFileFactory,
+      Context context) {
+
+    super(table, newDataWriter(table, fileWriterFactory, dataFileFactory, context));
+    Schema schema = table.schema();
+    this.fileWriterFactory = fileWriterFactory;
+
+    this.currentSpecId = table.spec().specId();
+    this.missingColumns = context.missingColumns();
+    this.missingOrStructFields = schema.columns().stream()
+        .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType())
+        .toList();
+
+    this.variantFields = VariantUtil.variantFieldsForShredding(table.properties(), schema);
+    this.sampledVariantFields = new BitSet(variantFields.size());
+
+    boolean shouldBuffer = !variantFields.isEmpty();
+    this.buffer = shouldBuffer ? Lists.newArrayListWithCapacity(VARIANT_SAMPLE_BUFFER_SIZE) : null;
+    this.accumulatedSample = shouldBuffer ? GenericRecord.create(schema) : null;
+  }
+
+  protected void writeOrBuffer(Record record) {
+    HiveSchemaUtil.setDefaultValues(record, missingOrStructFields, missingColumns);
+
+    if (buffer != null && !sampleInitialized) {
+      accumulateSample(record);
+
+      if (allVariantFieldsSampled() || buffer.size() >= VARIANT_SAMPLE_BUFFER_SIZE) {
+        // Use accumulated sample for schema inference
+        fileWriterFactory.initialize(accumulatedSample);
+        sampleInitialized = true;
+
+        flushBufferedRecords();
+      } else {
+        buffer.add(record.copy());
+        return;
+      }
+    }
+    writeRecord(record);
+  }
+
+  private void writeRecord(Record record) {
+    writer.write(record, specs.get(currentSpecId), partition(record, currentSpecId));
+  }
+
+  private void flushBufferedRecords() {
+    for (Record bufferedRecord : buffer) {
+      writeRecord(bufferedRecord);
+    }
+    buffer.clear();
+  }
+
+  private boolean allVariantFieldsSampled() {
+    return sampledVariantFields.nextClearBit(0) >= variantFields.size();
+  }
+
+  private void accumulateSample(Record record) {
+    if (accumulatedSample == null || allVariantFieldsSampled()) {
+      return;
+    }
+    for (int fieldIndex = sampledVariantFields.nextClearBit(0);
+         fieldIndex < variantFields.size();
+         fieldIndex = sampledVariantFields.nextClearBit(fieldIndex + 1)) {
+      trySampleVariantField(fieldIndex, record);
+    }
+  }
+
+  private void trySampleVariantField(int fieldIndex, Record record) {
+    VariantField variantField = variantFields.get(fieldIndex);
+    Object val = safeGet(variantField, record);
+    if (!VariantUtil.isShreddable(val)) {
+      return;
+    }
+    HiveSchemaUtil.setStructField(accumulatedSample, variantField.path(), val);
+    sampledVariantFields.set(fieldIndex);
+  }
+
+  private static Object safeGet(VariantField variantField, Record record) {
+    try {
+      return variantField.accessor().get(record);
+    } catch (RuntimeException e) {
+      // Treat unexpected access failures as "no sample" and keep scanning.
+      return null;
+    }
+  }
+
+  @Override
+  public void close(boolean abort) throws IOException {
+    if (buffer != null) {
+      if (abort) {
+        // Don't write anything on abort. Just drop any buffered records.
+        buffer.clear();
+      } else if (!buffer.isEmpty()) {
+        if (!sampleInitialized) {
+          // Use whatever we have accumulated so far
+          fileWriterFactory.initialize(accumulatedSample);
+        }
+        flushBufferedRecords();
+      }
+    }
+    super.close(abort);
+  }
+
+}
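
The writer above trades a bounded amount of buffering (at most VARIANT_SAMPLE_BUFFER_SIZE copied records) for a better shredding schema: records are held back until every variant column has produced a shreddable sample, or the buffer fills up, and only then is the file writer factory initialized. A stand-alone sketch of the BitSet bookkeeping behind allVariantFieldsSampled(), with hypothetical field counts:

    import java.util.BitSet;

    public class VariantSamplingCheckDemo {
      public static void main(String[] args) {
        int variantFieldCount = 3;                  // pretend the table has three variant columns
        BitSet sampled = new BitSet(variantFieldCount);
        sampled.set(0);                             // column 0 produced a shreddable value
        sampled.set(2);                             // column 2 produced a shreddable value
        // Same check as allVariantFieldsSampled(): the first clear bit must lie past the end.
        boolean allSampled = sampled.nextClearBit(0) >= variantFieldCount;
        System.out.println(allSampled);             // false: column 1 still needs a sample
      }
    }
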
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java
deleted file mode 100644
index 56e3f3480c7..00000000000
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ /dev/null
@@ -1,1524 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.parquet;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Files;
-import org.apache.iceberg.InternalData;
-import org.apache.iceberg.MetricsConfig;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.SchemaParser;
-import org.apache.iceberg.SortOrder;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.SystemConfigs;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.AvroSchemaUtil;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
-import org.apache.iceberg.deletes.EqualityDeleteWriter;
-import org.apache.iceberg.deletes.PositionDeleteWriter;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
-import org.apache.iceberg.encryption.EncryptionKeyMetadata;
-import org.apache.iceberg.encryption.NativeEncryptionInputFile;
-import org.apache.iceberg.encryption.NativeEncryptionOutputFile;
-import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.hadoop.HadoopInputFile;
-import org.apache.iceberg.hadoop.HadoopOutputFile;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.DataWriter;
-import org.apache.iceberg.io.DeleteSchemaUtil;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.mapping.NameMapping;
-import org.apache.iceberg.parquet.ParquetValueWriters.PositionDeleteStructWriter;
-import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.util.ArrayUtil;
-import org.apache.iceberg.util.ByteBuffers;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.parquet.HadoopReadOptions;
-import org.apache.parquet.ParquetReadOptions;
-import org.apache.parquet.avro.AvroReadSupport;
-import org.apache.parquet.avro.AvroWriteSupport;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.ParquetProperties.WriterVersion;
-import org.apache.parquet.conf.PlainParquetConfiguration;
-import org.apache.parquet.crypto.FileDecryptionProperties;
-import org.apache.parquet.crypto.FileEncryptionProperties;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.ParquetOutputFormat;
-import org.apache.parquet.hadoop.ParquetReader;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.api.ReadSupport;
-import org.apache.parquet.hadoop.api.WriteSupport;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type.ID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
-import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
-import static org.apache.iceberg.TableProperties.DELETE_PARQUET_DICT_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_ROW_LIMIT;
-import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
-import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
-import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
-import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX;
-import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES;
-import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT;
-import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX;
-import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
-import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
-import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
-import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT;
-import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT;
-import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT;
-import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT;
-import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT;
-import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
-import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT;
-import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
-import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT;
-import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
-
-// TODO: remove class once upgraded to Iceberg v1.11.0 (https://github.com/apache/iceberg/pull/14153)
-
-public class Parquet {
-  private static final Logger LOG = LoggerFactory.getLogger(Parquet.class);
-
-  private Parquet() {
-  }
-
-  private static final Collection<String> READ_PROPERTIES_TO_REMOVE =
-      Sets.newHashSet(
-          "parquet.read.filter",
-          "parquet.private.read.filter.predicate",
-          "parquet.read.support.class",
-          "parquet.crypto.factory.class");
-
-  public static WriteBuilder write(OutputFile file) {
-    if (file instanceof EncryptedOutputFile) {
-      return write((EncryptedOutputFile) file);
-    }
-
-    return new WriteBuilder(file);
-  }
-
-  public static WriteBuilder write(EncryptedOutputFile file) {
-    if (file instanceof NativeEncryptionOutputFile) {
-      NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file;
-      return write(nativeFile.plainOutputFile())
-          .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
-          .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
-    } else {
-      return write(file.encryptingOutputFile());
-    }
-  }
-
-  public static class WriteBuilder implements InternalData.WriteBuilder {
-    private final OutputFile file;
-    private final Configuration conf;
-    private final Map<String, String> metadata = Maps.newLinkedHashMap();
-    private final Map<String, String> config = Maps.newLinkedHashMap();
-    private Schema schema = null;
-    private VariantShreddingFunction variantShreddingFunc = null;
-    private String name = "table";
-    private WriteSupport<?> writeSupport = null;
-    private BiFunction<Schema, MessageType, ParquetValueWriter<?>> createWriterFunc = null;
-    private MetricsConfig metricsConfig = MetricsConfig.getDefault();
-    private ParquetFileWriter.Mode writeMode = ParquetFileWriter.Mode.CREATE;
-    private WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
-    private Function<Map<String, String>, Context> createContextFunc = Context::dataContext;
-    private ByteBuffer fileEncryptionKey = null;
-    private ByteBuffer fileAADPrefix = null;
-
-    private WriteBuilder(OutputFile file) {
-      this.file = file;
-      if (file instanceof HadoopOutputFile) {
-        this.conf = new Configuration(((HadoopOutputFile) file).getConf());
-      } else {
-        this.conf = new Configuration();
-      }
-    }
-
-    public WriteBuilder forTable(Table table) {
-      schema(table.schema());
-      setAll(table.properties());
-      metricsConfig(MetricsConfig.forTable(table));
-      return this;
-    }
-
-    @Override
-    public WriteBuilder schema(Schema newSchema) {
-      this.schema = newSchema;
-      return this;
-    }
-
-    /**
-     * Set a {@link VariantShreddingFunction} that is called with each variant field's name and
-     * field ID to produce the shredding type as a {@code typed_value} field. This field is added to
-     * the result variant struct alongside the {@code metadata} and {@code value} fields.
-     *
-     * @param func {@link VariantShreddingFunction} that produces a shredded {@code typed_value}
-     * @return this for method chaining
-     */
-    public WriteBuilder variantShreddingFunc(VariantShreddingFunction func) {
-      this.variantShreddingFunc = func;
-      return this;
-    }
-
-    @Override
-    public WriteBuilder named(String newName) {
-      this.name = newName;
-      return this;
-    }
-
-    public WriteBuilder writeSupport(WriteSupport<?> newWriteSupport) {
-      this.writeSupport = newWriteSupport;
-      return this;
-    }
-
-    @Override
-    public WriteBuilder set(String property, String value) {
-      config.put(property, value);
-      return this;
-    }
-
-    public WriteBuilder setAll(Map<String, String> properties) {
-      config.putAll(properties);
-      return this;
-    }
-
-    @Override
-    public WriteBuilder meta(String property, String value) {
-      metadata.put(property, value);
-      return this;
-    }
-
-    public WriteBuilder createWriterFunc(
-        Function<MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
-      if (newCreateWriterFunc != null) {
-        this.createWriterFunc = (icebergSchema, type) -> newCreateWriterFunc.apply(type);
-      }
-      return this;
-    }
-
-    public WriteBuilder createWriterFunc(
-        BiFunction<Schema, MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
-      this.createWriterFunc = newCreateWriterFunc;
-      return this;
-    }
-
-    public WriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
-      this.metricsConfig = newMetricsConfig;
-      return this;
-    }
-
-    @Override
-    public WriteBuilder overwrite() {
-      return overwrite(true);
-    }
-
-    public WriteBuilder overwrite(boolean enabled) {
-      this.writeMode = enabled ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE;
-      return this;
-    }
-
-    public WriteBuilder writerVersion(WriterVersion version) {
-      this.writerVersion = version;
-      return this;
-    }
-
-    public WriteBuilder withFileEncryptionKey(ByteBuffer encryptionKey) {
-      this.fileEncryptionKey = encryptionKey;
-      return this;
-    }
-
-    public WriteBuilder withAADPrefix(ByteBuffer aadPrefix) {
-      this.fileAADPrefix = aadPrefix;
-      return this;
-    }
-
-    @SuppressWarnings("unchecked")
-    private <T> WriteSupport<T> getWriteSupport(MessageType type) {
-      if (writeSupport != null) {
-        return (WriteSupport<T>) writeSupport;
-      } else {
-        return new AvroWriteSupport<>(
-            type,
-            ParquetAvro.parquetAvroSchema(AvroSchemaUtil.convert(schema, name)),
-            ParquetAvro.DEFAULT_MODEL);
-      }
-    }
-
-    /*
-     * Sets the writer version. Default value is PARQUET_1_0 (v1).
-     */
-    @VisibleForTesting
-    WriteBuilder withWriterVersion(WriterVersion version) {
-      this.writerVersion = version;
-      return this;
-    }
-
-    // supposed to always be a private method used strictly by data and delete write builders
-    private WriteBuilder createContextFunc(
-        Function<Map<String, String>, Context> newCreateContextFunc) {
-      this.createContextFunc = newCreateContextFunc;
-      return this;
-    }
-
-    private void setBloomFilterConfig(
-        Context context,
-        Map<String, String> colNameToParquetPathMap,
-        BiConsumer<String, Boolean> withBloomFilterEnabled,
-        BiConsumer<String, Double> withBloomFilterFPP) {
-
-      context
-          .columnBloomFilterEnabled()
-          .forEach(
-              (colPath, isEnabled) -> {
-                String parquetColumnPath = colNameToParquetPathMap.get(colPath);
-                if (parquetColumnPath == null) {
-                  LOG.warn("Skipping bloom filter config for missing field: {}", colPath);
-                  return;
-                }
-
-                withBloomFilterEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled));
-                String fpp = context.columnBloomFilterFpp().get(colPath);
-                if (fpp != null) {
-                  withBloomFilterFPP.accept(parquetColumnPath, Double.parseDouble(fpp));
-                }
-              });
-    }
-
-    private void setColumnStatsConfig(
-        Context context,
-        Map<String, String> colNameToParquetPathMap,
-        BiConsumer<String, Boolean> withColumnStatsEnabled) {
-
-      context
-          .columnStatsEnabled()
-          .forEach(
-              (colPath, isEnabled) -> {
-                String parquetColumnPath = colNameToParquetPathMap.get(colPath);
-                if (parquetColumnPath == null) {
-                  LOG.warn("Skipping column statistics config for missing field: {}", colPath);
-                  return;
-                }
-                withColumnStatsEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled));
-              });
-    }
-
-    @Override
-    public <D> FileAppender<D> build() throws IOException {
-      Preconditions.checkNotNull(schema, "Schema is required");
-      Preconditions.checkNotNull(name, "Table name is required and cannot be null");
-
-      // add the Iceberg schema to keyValueMetadata
-      meta("iceberg.schema", SchemaParser.toJson(schema));
-
-      // Map Iceberg properties to pass down to the Parquet writer
-      Context context = createContextFunc.apply(config);
-
-      int rowGroupSize = context.rowGroupSize();
-      int pageSize = context.pageSize();
-      int pageRowLimit = context.pageRowLimit();
-      int dictionaryPageSize = context.dictionaryPageSize();
-      String compressionLevel = context.compressionLevel();
-      CompressionCodecName codec = context.codec();
-      int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
-      int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
-      int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
-      boolean dictionaryEnabled = context.dictionaryEnabled();
-
-      if (compressionLevel != null) {
-        switch (codec) {
-          case GZIP:
-            config.put("zlib.compress.level", compressionLevel);
-            break;
-          case BROTLI:
-            config.put("compression.brotli.quality", compressionLevel);
-            break;
-          case ZSTD:
-            // keep "io.compression.codec.zstd.level" for backwards 
compatibility
-            config.put("io.compression.codec.zstd.level", compressionLevel);
-            config.put("parquet.compression.codec.zstd.level", 
compressionLevel);
-            break;
-          default:
-            // compression level is not supported; ignore it
-        }
-      }
-
-      set("parquet.avro.write-old-list-structure", "false");
-      MessageType type = ParquetSchemaUtil.convert(schema, name, variantShreddingFunc);
-
-      FileEncryptionProperties fileEncryptionProperties = null;
-      if (fileEncryptionKey != null) {
-        byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey);
-        byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix);
-
-        fileEncryptionProperties =
-            FileEncryptionProperties.builder(encryptionKeyArray)
-                .withAADPrefix(aadPrefixArray)
-                .withoutAADPrefixStorage()
-                .build();
-      } else {
-        Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key");
-      }
-
-      Map<String, String> colNameToParquetPathMap =
-          type.getColumns().stream()
-              .filter(
-                  col -> {
-                    ID id = col.getPrimitiveType().getId();
-                    return id != null && schema.findColumnName(id.intValue()) != null;
-                  })
-              .collect(
-                  Collectors.toMap(
-                      col -> schema.findColumnName(col.getPrimitiveType().getId().intValue()),
-                      col -> String.join(".", col.getPath())));
-
-      if (createWriterFunc != null) {
-        Preconditions.checkArgument(
-            writeSupport == null, "Cannot write with both write support and Parquet value writer");
-
-        for (Map.Entry<String, String> entry : config.entrySet()) {
-          conf.set(entry.getKey(), entry.getValue());
-        }
-
-        ParquetProperties.Builder propsBuilder =
-            ParquetProperties.builder()
-                .withWriterVersion(writerVersion)
-                .withPageSize(pageSize)
-                .withPageRowCountLimit(pageRowLimit)
-                .withDictionaryEncoding(dictionaryEnabled)
-                .withDictionaryPageSize(dictionaryPageSize)
-                .withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount)
-                .withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
-                .withMaxBloomFilterBytes(bloomFilterMaxBytes);
-
-        setBloomFilterConfig(
-            context,
-            colNameToParquetPathMap,
-            propsBuilder::withBloomFilterEnabled,
-            propsBuilder::withBloomFilterFPP);
-
-        setColumnStatsConfig(context, colNameToParquetPathMap, propsBuilder::withStatisticsEnabled);
-
-        ParquetProperties parquetProperties = propsBuilder.build();
-
-        return new org.apache.iceberg.parquet.ParquetWriter<>(
-            conf,
-            file,
-            schema,
-            type,
-            rowGroupSize,
-            metadata,
-            createWriterFunc,
-            codec,
-            parquetProperties,
-            metricsConfig,
-            writeMode,
-            fileEncryptionProperties);
-      } else {
-        ParquetWriteBuilder<D> parquetWriteBuilder =
-            new ParquetWriteBuilder<D>(ParquetIO.file(file))
-                .withWriterVersion(writerVersion)
-                .setType(type)
-                .setConfig(config)
-                .setKeyValueMetadata(metadata)
-                .setWriteSupport(getWriteSupport(type))
-                .withCompressionCodec(codec)
-                .withWriteMode(writeMode)
-                .withRowGroupSize(rowGroupSize)
-                .withPageSize(pageSize)
-                .withPageRowCountLimit(pageRowLimit)
-                .withDictionaryEncoding(dictionaryEnabled)
-                .withDictionaryPageSize(dictionaryPageSize)
-                .withEncryption(fileEncryptionProperties);
-
-        setBloomFilterConfig(
-            context,
-            colNameToParquetPathMap,
-            parquetWriteBuilder::withBloomFilterEnabled,
-            parquetWriteBuilder::withBloomFilterFPP);
-
-        setColumnStatsConfig(
-            context, colNameToParquetPathMap, parquetWriteBuilder::withStatisticsEnabled);
-
-        return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
-      }
-    }
-
-    private static class Context {
-      private final int rowGroupSize;
-      private final int pageSize;
-      private final int pageRowLimit;
-      private final int dictionaryPageSize;
-      private final CompressionCodecName codec;
-      private final String compressionLevel;
-      private final int rowGroupCheckMinRecordCount;
-      private final int rowGroupCheckMaxRecordCount;
-      private final int bloomFilterMaxBytes;
-      private final Map<String, String> columnBloomFilterFpp;
-      private final Map<String, String> columnBloomFilterEnabled;
-      private final Map<String, String> columnStatsEnabled;
-      private final boolean dictionaryEnabled;
-
-      private Context(
-          int rowGroupSize,
-          int pageSize,
-          int pageRowLimit,
-          int dictionaryPageSize,
-          CompressionCodecName codec,
-          String compressionLevel,
-          int rowGroupCheckMinRecordCount,
-          int rowGroupCheckMaxRecordCount,
-          int bloomFilterMaxBytes,
-          Map<String, String> columnBloomFilterFpp,
-          Map<String, String> columnBloomFilterEnabled,
-          Map<String, String> columnStatsEnabled,
-          boolean dictionaryEnabled) {
-        this.rowGroupSize = rowGroupSize;
-        this.pageSize = pageSize;
-        this.pageRowLimit = pageRowLimit;
-        this.dictionaryPageSize = dictionaryPageSize;
-        this.codec = codec;
-        this.compressionLevel = compressionLevel;
-        this.rowGroupCheckMinRecordCount = rowGroupCheckMinRecordCount;
-        this.rowGroupCheckMaxRecordCount = rowGroupCheckMaxRecordCount;
-        this.bloomFilterMaxBytes = bloomFilterMaxBytes;
-        this.columnBloomFilterFpp = columnBloomFilterFpp;
-        this.columnBloomFilterEnabled = columnBloomFilterEnabled;
-        this.columnStatsEnabled = columnStatsEnabled;
-        this.dictionaryEnabled = dictionaryEnabled;
-      }
-
-      static Context dataContext(Map<String, String> config) {
-        int rowGroupSize =
-            PropertyUtil.propertyAsInt(
-                config, PARQUET_ROW_GROUP_SIZE_BYTES, PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT);
-        Preconditions.checkArgument(rowGroupSize > 0, "Row group size must be > 0");
-
-        int pageSize =
-            PropertyUtil.propertyAsInt(
-                config, PARQUET_PAGE_SIZE_BYTES, PARQUET_PAGE_SIZE_BYTES_DEFAULT);
-        Preconditions.checkArgument(pageSize > 0, "Page size must be > 0");
-
-        int pageRowLimit =
-            PropertyUtil.propertyAsInt(
-                config, PARQUET_PAGE_ROW_LIMIT, PARQUET_PAGE_ROW_LIMIT_DEFAULT);
-        Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit must be > 0");
-
-        int dictionaryPageSize =
-            PropertyUtil.propertyAsInt(
-                config, PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT);
-        Preconditions.checkArgument(dictionaryPageSize > 0, "Dictionary page size must be > 0");
-
-        String codecAsString =
-            config.getOrDefault(PARQUET_COMPRESSION, PARQUET_COMPRESSION_DEFAULT);
-        CompressionCodecName codec = toCodec(codecAsString);
-
-        String compressionLevel =
-            config.getOrDefault(PARQUET_COMPRESSION_LEVEL, PARQUET_COMPRESSION_LEVEL_DEFAULT);
-
-        int rowGroupCheckMinRecordCount =
-            PropertyUtil.propertyAsInt(
-                config,
-                PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT,
-                PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT);
-        Preconditions.checkArgument(
-            rowGroupCheckMinRecordCount > 0, "Row group check minimal record count must be > 0");
-
-        int rowGroupCheckMaxRecordCount =
-            PropertyUtil.propertyAsInt(
-                config,
-                PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT,
-                PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT);
-        Preconditions.checkArgument(
-            rowGroupCheckMaxRecordCount > 0, "Row group check maximum record count must be > 0");
-        Preconditions.checkArgument(
-            rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
-            "Row group check maximum record count must be >= minimal record 
count");
-
-        int bloomFilterMaxBytes =
-            PropertyUtil.propertyAsInt(
-                config, PARQUET_BLOOM_FILTER_MAX_BYTES, PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT);
-        Preconditions.checkArgument(bloomFilterMaxBytes > 0, "bloom Filter Max Bytes must be > 0");
-
-        Map<String, String> columnBloomFilterFpp =
-            PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX);
-
-        Map<String, String> columnBloomFilterEnabled =
-            PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX);
-
-        Map<String, String> columnStatsEnabled =
-            PropertyUtil.propertiesWithPrefix(config, PARQUET_COLUMN_STATS_ENABLED_PREFIX);
-
-        boolean dictionaryEnabled =
-            PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true);
-
-        return new Context(
-            rowGroupSize,
-            pageSize,
-            pageRowLimit,
-            dictionaryPageSize,
-            codec,
-            compressionLevel,
-            rowGroupCheckMinRecordCount,
-            rowGroupCheckMaxRecordCount,
-            bloomFilterMaxBytes,
-            columnBloomFilterFpp,
-            columnBloomFilterEnabled,
-            columnStatsEnabled,
-            dictionaryEnabled);
-      }
-
-      static Context deleteContext(Map<String, String> config) {
-        // default delete config using data config
-        Context dataContext = dataContext(config);
-
-        int rowGroupSize =
-            PropertyUtil.propertyAsInt(
-                config, DELETE_PARQUET_ROW_GROUP_SIZE_BYTES, dataContext.rowGroupSize());
-        Preconditions.checkArgument(rowGroupSize > 0, "Row group size must be > 0");
-
-        int pageSize =
-            PropertyUtil.propertyAsInt(
-                config, DELETE_PARQUET_PAGE_SIZE_BYTES, dataContext.pageSize());
-        Preconditions.checkArgument(pageSize > 0, "Page size must be > 0");
-
-        int pageRowLimit =
-            PropertyUtil.propertyAsInt(
-                config, DELETE_PARQUET_PAGE_ROW_LIMIT, dataContext.pageRowLimit());
-        Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit must be > 0");
-
-        int dictionaryPageSize =
-            PropertyUtil.propertyAsInt(
-                config, DELETE_PARQUET_DICT_SIZE_BYTES, dataContext.dictionaryPageSize());
-        Preconditions.checkArgument(dictionaryPageSize > 0, "Dictionary page size must be > 0");
-
-        String codecAsString = config.get(DELETE_PARQUET_COMPRESSION);
-        CompressionCodecName codec =
-            codecAsString != null ? toCodec(codecAsString) : dataContext.codec();
-
-        String compressionLevel =
-            config.getOrDefault(DELETE_PARQUET_COMPRESSION_LEVEL, dataContext.compressionLevel());
-
-        int rowGroupCheckMinRecordCount =
-            PropertyUtil.propertyAsInt(
-                config,
-                DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT,
-                dataContext.rowGroupCheckMinRecordCount());
-        Preconditions.checkArgument(
-            rowGroupCheckMinRecordCount > 0, "Row group check minimal record count must be > 0");
-
-        int rowGroupCheckMaxRecordCount =
-            PropertyUtil.propertyAsInt(
-                config,
-                DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT,
-                dataContext.rowGroupCheckMaxRecordCount());
-        Preconditions.checkArgument(
-            rowGroupCheckMaxRecordCount > 0, "Row group check maximum record count must be > 0");
-        Preconditions.checkArgument(
-            rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
-            "Row group check maximum record count must be >= minimal record 
count");
-
-        boolean dictionaryEnabled =
-            PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true);
-
-        return new Context(
-            rowGroupSize,
-            pageSize,
-            pageRowLimit,
-            dictionaryPageSize,
-            codec,
-            compressionLevel,
-            rowGroupCheckMinRecordCount,
-            rowGroupCheckMaxRecordCount,
-            PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT,
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            ImmutableMap.of(),
-            dictionaryEnabled);
-      }
-
-      private static CompressionCodecName toCodec(String codecAsString) {
-        try {
-          return CompressionCodecName.valueOf(codecAsString.toUpperCase(Locale.ENGLISH));
-        } catch (IllegalArgumentException e) {
-          throw new IllegalArgumentException("Unsupported compression codec: " + codecAsString);
-        }
-      }
-
-      int rowGroupSize() {
-        return rowGroupSize;
-      }
-
-      int pageSize() {
-        return pageSize;
-      }
-
-      int pageRowLimit() {
-        return pageRowLimit;
-      }
-
-      int dictionaryPageSize() {
-        return dictionaryPageSize;
-      }
-
-      CompressionCodecName codec() {
-        return codec;
-      }
-
-      String compressionLevel() {
-        return compressionLevel;
-      }
-
-      int rowGroupCheckMinRecordCount() {
-        return rowGroupCheckMinRecordCount;
-      }
-
-      int rowGroupCheckMaxRecordCount() {
-        return rowGroupCheckMaxRecordCount;
-      }
-
-      int bloomFilterMaxBytes() {
-        return bloomFilterMaxBytes;
-      }
-
-      Map<String, String> columnBloomFilterFpp() {
-        return columnBloomFilterFpp;
-      }
-
-      Map<String, String> columnBloomFilterEnabled() {
-        return columnBloomFilterEnabled;
-      }
-
-      Map<String, String> columnStatsEnabled() {
-        return columnStatsEnabled;
-      }
-
-      boolean dictionaryEnabled() {
-        return dictionaryEnabled;
-      }
-    }
-  }
-
-  public static DataWriteBuilder writeData(OutputFile file) {
-    return new DataWriteBuilder(file);
-  }
-
-  public static DataWriteBuilder writeData(EncryptedOutputFile file) {
-    if (file instanceof NativeEncryptionOutputFile) {
-      NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file;
-      return writeData(nativeFile.plainOutputFile())
-          .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
-          .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
-    } else {
-      return writeData(file.encryptingOutputFile());
-    }
-  }
-
-  public static class DataWriteBuilder {
-    private final WriteBuilder appenderBuilder;
-    private final String location;
-    private PartitionSpec spec = null;
-    private StructLike partition = null;
-    private EncryptionKeyMetadata keyMetadata = null;
-    private SortOrder sortOrder = null;
-
-    private DataWriteBuilder(OutputFile file) {
-      this.appenderBuilder = write(file);
-      this.location = file.location();
-    }
-
-    public DataWriteBuilder forTable(Table table) {
-      schema(table.schema());
-      withSpec(table.spec());
-      setAll(table.properties());
-      metricsConfig(MetricsConfig.forTable(table));
-      return this;
-    }
-
-    public DataWriteBuilder schema(Schema newSchema) {
-      appenderBuilder.schema(newSchema);
-      return this;
-    }
-
-    public DataWriteBuilder set(String property, String value) {
-      appenderBuilder.set(property, value);
-      return this;
-    }
-
-    public DataWriteBuilder setAll(Map<String, String> properties) {
-      appenderBuilder.setAll(properties);
-      return this;
-    }
-
-    public DataWriteBuilder meta(String property, String value) {
-      appenderBuilder.meta(property, value);
-      return this;
-    }
-
-    public DataWriteBuilder overwrite() {
-      return overwrite(true);
-    }
-
-    public DataWriteBuilder overwrite(boolean enabled) {
-      appenderBuilder.overwrite(enabled);
-      return this;
-    }
-
-    public DataWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
-      appenderBuilder.metricsConfig(newMetricsConfig);
-      return this;
-    }
-
-    public DataWriteBuilder createWriterFunc(
-        Function<MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
-      appenderBuilder.createWriterFunc(newCreateWriterFunc);
-      return this;
-    }
-
-    public DataWriteBuilder createWriterFunc(
-        BiFunction<Schema, MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
-      appenderBuilder.createWriterFunc(newCreateWriterFunc);
-      return this;
-    }
-
-    public DataWriteBuilder variantShreddingFunc(VariantShreddingFunction func) {
-      appenderBuilder.variantShreddingFunc(func);
-      return this;
-    }
-
-    public DataWriteBuilder withSpec(PartitionSpec newSpec) {
-      this.spec = newSpec;
-      return this;
-    }
-
-    public DataWriteBuilder withPartition(StructLike newPartition) {
-      this.partition = newPartition;
-      return this;
-    }
-
-    public DataWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
-      this.keyMetadata = metadata;
-      return this;
-    }
-
-    public DataWriteBuilder withFileEncryptionKey(ByteBuffer fileEncryptionKey) {
-      appenderBuilder.withFileEncryptionKey(fileEncryptionKey);
-      return this;
-    }
-
-    public DataWriteBuilder withAADPrefix(ByteBuffer aadPrefix) {
-      appenderBuilder.withAADPrefix(aadPrefix);
-      return this;
-    }
-
-    public DataWriteBuilder withSortOrder(SortOrder newSortOrder) {
-      this.sortOrder = newSortOrder;
-      return this;
-    }
-
-    public <T> DataWriter<T> build() throws IOException {
-      Preconditions.checkArgument(spec != null, "Cannot create data writer without spec");
-      Preconditions.checkArgument(
-          spec.isUnpartitioned() || partition != null,
-          "Partition must not be null when creating data writer for 
partitioned spec");
-
-      FileAppender<T> fileAppender = appenderBuilder.build();
-      return new DataWriter<>(
-          fileAppender, FileFormat.PARQUET, location, spec, partition, keyMetadata, sortOrder);
-    }
-  }
-
-  public static DeleteWriteBuilder writeDeletes(OutputFile file) {
-    return new DeleteWriteBuilder(file);
-  }
-
-  public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) {
-    if (file instanceof NativeEncryptionOutputFile) {
-      NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file;
-      return writeDeletes(nativeFile.plainOutputFile())
-          .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
-          .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
-    } else {
-      return writeDeletes(file.encryptingOutputFile());
-    }
-  }
-
-  public static class DeleteWriteBuilder {
-    private final WriteBuilder appenderBuilder;
-    private final String location;
-    private BiFunction<Schema, MessageType, ParquetValueWriter<?>> createWriterFunc = null;
-    private Schema rowSchema = null;
-    private PartitionSpec spec = null;
-    private StructLike partition = null;
-    private EncryptionKeyMetadata keyMetadata = null;
-    private int[] equalityFieldIds = null;
-    private SortOrder sortOrder;
-    private Function<CharSequence, ?> pathTransformFunc = Function.identity();
-
-    private DeleteWriteBuilder(OutputFile file) {
-      this.appenderBuilder = write(file);
-      this.location = file.location();
-    }
-
-    public DeleteWriteBuilder forTable(Table table) {
-      rowSchema(table.schema());
-      withSpec(table.spec());
-      setAll(table.properties());
-      metricsConfig(MetricsConfig.forTable(table));
-      return this;
-    }
-
-    public DeleteWriteBuilder set(String property, String value) {
-      appenderBuilder.set(property, value);
-      return this;
-    }
-
-    public DeleteWriteBuilder setAll(Map<String, String> properties) {
-      appenderBuilder.setAll(properties);
-      return this;
-    }
-
-    public DeleteWriteBuilder meta(String property, String value) {
-      appenderBuilder.meta(property, value);
-      return this;
-    }
-
-    public DeleteWriteBuilder overwrite() {
-      return overwrite(true);
-    }
-
-    public DeleteWriteBuilder overwrite(boolean enabled) {
-      appenderBuilder.overwrite(enabled);
-      return this;
-    }
-
-    public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
-      appenderBuilder.metricsConfig(newMetricsConfig);
-      return this;
-    }
-
-    public DeleteWriteBuilder createWriterFunc(
-        Function<MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
-      this.createWriterFunc = (ignored, fileSchema) -> 
newCreateWriterFunc.apply(fileSchema);
-      return this;
-    }
-
-    public DeleteWriteBuilder createWriterFunc(
-        BiFunction<Schema, MessageType, ParquetValueWriter<?>> 
newCreateWriterFunc) {
-      this.createWriterFunc = newCreateWriterFunc;
-      return this;
-    }
-
-    public DeleteWriteBuilder rowSchema(Schema newSchema) {
-      this.rowSchema = newSchema;
-      return this;
-    }
-
-    public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
-      this.spec = newSpec;
-      return this;
-    }
-
-    public DeleteWriteBuilder withPartition(StructLike key) {
-      this.partition = key;
-      return this;
-    }
-
-    public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
-      this.keyMetadata = metadata;
-      return this;
-    }
-
-    public DeleteWriteBuilder withFileEncryptionKey(ByteBuffer 
fileEncryptionKey) {
-      appenderBuilder.withFileEncryptionKey(fileEncryptionKey);
-      return this;
-    }
-
-    public DeleteWriteBuilder withAADPrefix(ByteBuffer aadPrefix) {
-      appenderBuilder.withAADPrefix(aadPrefix);
-      return this;
-    }
-
-    public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
-      this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds);
-      return this;
-    }
-
-    public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
-      this.equalityFieldIds = fieldIds;
-      return this;
-    }
-
-    public DeleteWriteBuilder transformPaths(Function<CharSequence, ?> 
newPathTransformFunc) {
-      this.pathTransformFunc = newPathTransformFunc;
-      return this;
-    }
-
-    public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) {
-      this.sortOrder = newSortOrder;
-      return this;
-    }
-
-    public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws 
IOException {
-      Preconditions.checkState(
-          rowSchema != null, "Cannot create equality delete file without a 
schema");
-      Preconditions.checkState(
-          equalityFieldIds != null, "Cannot create equality delete file 
without delete field ids");
-      Preconditions.checkState(
-          createWriterFunc != null,
-          "Cannot create equality delete file unless createWriterFunc is set");
-      Preconditions.checkArgument(
-          spec != null, "Spec must not be null when creating equality delete 
writer");
-      Preconditions.checkArgument(
-          spec.isUnpartitioned() || partition != null,
-          "Partition must not be null for partitioned writes");
-
-      meta("delete-type", "equality");
-      meta(
-          "delete-field-ids",
-          IntStream.of(equalityFieldIds)
-              .mapToObj(Objects::toString)
-              .collect(Collectors.joining(", ")));
-
-      // the appender uses the row schema without extra columns
-      appenderBuilder.schema(rowSchema);
-      appenderBuilder.createWriterFunc(createWriterFunc);
-      appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext);
-
-      return new EqualityDeleteWriter<>(
-          appenderBuilder.build(),
-          FileFormat.PARQUET,
-          location,
-          spec,
-          partition,
-          keyMetadata,
-          sortOrder,
-          equalityFieldIds);
-    }
-
-    public <T> PositionDeleteWriter<T> buildPositionWriter() throws 
IOException {
-      Preconditions.checkState(
-          equalityFieldIds == null, "Cannot create position delete file using 
delete field ids");
-      Preconditions.checkArgument(
-          spec != null, "Spec must not be null when creating position delete 
writer");
-      Preconditions.checkArgument(
-          spec.isUnpartitioned() || partition != null,
-          "Partition must not be null for partitioned writes");
-      Preconditions.checkArgument(
-          rowSchema == null || createWriterFunc != null,
-          "Create function should be provided if we write row data");
-
-      meta("delete-type", "position");
-
-      if (rowSchema != null && createWriterFunc != null) {
-        // the appender uses the row schema wrapped with position fields
-        appenderBuilder.schema(DeleteSchemaUtil.posDeleteSchema(rowSchema));
-
-        appenderBuilder.createWriterFunc(
-            (schema, parquetSchema) -> {
-              ParquetValueWriter<?> writer = createWriterFunc.apply(schema, 
parquetSchema);
-              if (writer instanceof StructWriter) {
-                return new PositionDeleteStructWriter<T>(
-                    (StructWriter<?>) writer, pathTransformFunc);
-              } else {
-                throw new UnsupportedOperationException(
-                    "Cannot wrap writer for position deletes: " + 
writer.getClass());
-              }
-            });
-
-      } else {
-        appenderBuilder.schema(DeleteSchemaUtil.pathPosSchema());
-
-        // We ignore the 'createWriterFunc' and 'rowSchema' even if is 
provided, since we do not
-        // write row data itself
-        appenderBuilder.createWriterFunc(
-            (schema, parquetSchema) ->
-                new PositionDeleteStructWriter<T>(
-                    (StructWriter<?>) GenericParquetWriter.create(schema, 
parquetSchema),
-                    Function.identity()));
-      }
-
-      appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext);
-
-      return new PositionDeleteWriter<>(
-          appenderBuilder.build(), FileFormat.PARQUET, location, spec, 
partition, keyMetadata);
-    }
-  }
-
-  private static class ParquetWriteBuilder<T>
-      extends ParquetWriter.Builder<T, ParquetWriteBuilder<T>> {
-    private Map<String, String> keyValueMetadata = Maps.newHashMap();
-    private Map<String, String> config = Maps.newHashMap();
-    private MessageType type;
-    private WriteSupport<T> writeSupport;
-
-    private ParquetWriteBuilder(org.apache.parquet.io.OutputFile path) {
-      super(path);
-    }
-
-    @Override
-    protected ParquetWriteBuilder<T> self() {
-      return this;
-    }
-
-    public ParquetWriteBuilder<T> setKeyValueMetadata(Map<String, String> 
keyValueMetadata) {
-      this.keyValueMetadata = keyValueMetadata;
-      return self();
-    }
-
-    public ParquetWriteBuilder<T> setConfig(Map<String, String> config) {
-      this.config = config;
-      return self();
-    }
-
-    public ParquetWriteBuilder<T> setType(MessageType type) {
-      this.type = type;
-      return self();
-    }
-
-    public ParquetWriteBuilder<T> setWriteSupport(WriteSupport<T> 
writeSupport) {
-      this.writeSupport = writeSupport;
-      return self();
-    }
-
-    @Override
-    protected WriteSupport<T> getWriteSupport(Configuration configuration) {
-      for (Map.Entry<String, String> entry : config.entrySet()) {
-        configuration.set(entry.getKey(), entry.getValue());
-      }
-      return new ParquetWriteSupport<>(type, keyValueMetadata, writeSupport);
-    }
-  }
-
-  public static ReadBuilder read(InputFile file) {
-    if (file instanceof NativeEncryptionInputFile) {
-      NativeEncryptionInputFile nativeFile = (NativeEncryptionInputFile) file;
-      return new ReadBuilder(nativeFile.encryptedInputFile())
-          .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
-          .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
-    } else {
-      return new ReadBuilder(file);
-    }
-  }
-
-  public static class ReadBuilder implements InternalData.ReadBuilder {
-    private final InputFile file;
-    private final Map<String, String> properties = Maps.newHashMap();
-    private Long start = null;
-    private Long length = null;
-    private Schema schema = null;
-    private Expression filter = null;
-    private ReadSupport<?> readSupport = null;
-    private Function<MessageType, VectorizedReader<?>> batchedReaderFunc = 
null;
-    private Function<MessageType, ParquetValueReader<?>> readerFunc = null;
-    private BiFunction<Schema, MessageType, ParquetValueReader<?>> 
readerFuncWithSchema = null;
-    private boolean filterRecords = true;
-    private boolean caseSensitive = true;
-    private boolean callInit = false;
-    private boolean reuseContainers = false;
-    private int maxRecordsPerBatch = 10000;
-    private NameMapping nameMapping = null;
-    private ByteBuffer fileEncryptionKey = null;
-    private ByteBuffer fileAADPrefix = null;
-
-    private ReadBuilder(InputFile file) {
-      this.file = file;
-    }
-
-    /**
-     * Restricts the read to the given range: [start, start + length).
-     *
-     * @param newStart the start position for this read
-     * @param newLength the length of the range this read should scan
-     * @return this builder for method chaining
-     */
-    @Override
-    public ReadBuilder split(long newStart, long newLength) {
-      this.start = newStart;
-      this.length = newLength;
-      return this;
-    }
-
-    @Override
-    public ReadBuilder project(Schema newSchema) {
-      this.schema = newSchema;
-      return this;
-    }
-
-    public ReadBuilder caseInsensitive() {
-      return caseSensitive(false);
-    }
-
-    public ReadBuilder caseSensitive(boolean newCaseSensitive) {
-      this.caseSensitive = newCaseSensitive;
-      return this;
-    }
-
-    public ReadBuilder filterRecords(boolean newFilterRecords) {
-      this.filterRecords = newFilterRecords;
-      return this;
-    }
-
-    public ReadBuilder filter(Expression newFilter) {
-      this.filter = newFilter;
-      return this;
-    }
-
-    /**
-     * @deprecated will be removed in 2.0.0; use {@link 
#createReaderFunc(Function)} instead
-     */
-    @Deprecated
-    public ReadBuilder readSupport(ReadSupport<?> newFilterSupport) {
-      this.readSupport = newFilterSupport;
-      return this;
-    }
-
-    public ReadBuilder createReaderFunc(
-        Function<MessageType, ParquetValueReader<?>> newReaderFunction) {
-      Preconditions.checkArgument(
-          this.batchedReaderFunc == null,
-          "Cannot set reader function: batched reader function already set");
-      Preconditions.checkArgument(
-          this.readerFuncWithSchema == null,
-          "Cannot set reader function: 2-argument reader function already 
set");
-      this.readerFunc = newReaderFunction;
-      return this;
-    }
-
-    public ReadBuilder createReaderFunc(
-        BiFunction<Schema, MessageType, ParquetValueReader<?>> 
newReaderFunction) {
-      Preconditions.checkArgument(
-          this.readerFunc == null,
-          "Cannot set 2-argument reader function: reader function already 
set");
-      Preconditions.checkArgument(
-          this.batchedReaderFunc == null,
-          "Cannot set 2-argument reader function: batched reader function 
already set");
-      this.readerFuncWithSchema = newReaderFunction;
-      return this;
-    }
-
-    public ReadBuilder createBatchedReaderFunc(Function<MessageType, 
VectorizedReader<?>> func) {
-      Preconditions.checkArgument(
-          this.readerFunc == null,
-          "Cannot set batched reader function: reader function already set");
-      Preconditions.checkArgument(
-          this.readerFuncWithSchema == null,
-          "Cannot set batched reader function: 2-argument reader function 
already set");
-      this.batchedReaderFunc = func;
-      return this;
-    }
-
-    public ReadBuilder set(String key, String value) {
-      properties.put(key, value);
-      return this;
-    }
-
-    /**
-     * @deprecated will be removed in 2.0.0; use {@link 
#createReaderFunc(Function)} instead
-     */
-    @Deprecated
-    public ReadBuilder callInit() {
-      this.callInit = true;
-      return this;
-    }
-
-    @Override
-    public ReadBuilder reuseContainers() {
-      this.reuseContainers = true;
-      return this;
-    }
-
-    public ReadBuilder recordsPerBatch(int numRowsPerBatch) {
-      this.maxRecordsPerBatch = numRowsPerBatch;
-      return this;
-    }
-
-    public ReadBuilder withNameMapping(NameMapping newNameMapping) {
-      this.nameMapping = newNameMapping;
-      return this;
-    }
-
-    @Override
-    public ReadBuilder setRootType(Class<? extends StructLike> rootClass) {
-      throw new UnsupportedOperationException("Custom types are not yet 
supported");
-    }
-
-    @Override
-    public ReadBuilder setCustomType(int fieldId, Class<? extends StructLike> 
structClass) {
-      throw new UnsupportedOperationException("Custom types are not yet 
supported");
-    }
-
-    public ReadBuilder withFileEncryptionKey(ByteBuffer encryptionKey) {
-      this.fileEncryptionKey = encryptionKey;
-      return this;
-    }
-
-    public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) {
-      this.fileAADPrefix = aadPrefix;
-      return this;
-    }
-
-    @Override
-    @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
-    public <D> CloseableIterable<D> build() {
-      FileDecryptionProperties fileDecryptionProperties = null;
-      if (fileEncryptionKey != null) {
-        byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey);
-        byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix);
-        fileDecryptionProperties =
-            FileDecryptionProperties.builder()
-                .withFooterKey(encryptionKeyArray)
-                .withAADPrefix(aadPrefixArray)
-                .build();
-      } else {
-        Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with 
null encryption key");
-      }
-
-      if (readerFunc != null || readerFuncWithSchema != null || 
batchedReaderFunc != null) {
-        ParquetReadOptions.Builder optionsBuilder;
-        if (file instanceof HadoopInputFile) {
-          // remove read properties already set that may conflict with this 
read
-          Configuration conf = new Configuration(((HadoopInputFile) 
file).getConf());
-          for (String property : READ_PROPERTIES_TO_REMOVE) {
-            conf.unset(property);
-          }
-          optionsBuilder = HadoopReadOptions.builder(conf);
-        } else {
-          optionsBuilder = ParquetReadOptions.builder(new 
PlainParquetConfiguration());
-        }
-
-        for (Map.Entry<String, String> entry : properties.entrySet()) {
-          optionsBuilder.set(entry.getKey(), entry.getValue());
-        }
-
-        if (start != null) {
-          optionsBuilder.withRange(start, start + length);
-        }
-
-        if (fileDecryptionProperties != null) {
-          optionsBuilder.withDecryption(fileDecryptionProperties);
-        }
-
-        ParquetReadOptions options = optionsBuilder.build();
-
-        NameMapping mapping;
-        if (nameMapping != null) {
-          mapping = nameMapping;
-        } else if 
(SystemConfigs.NETFLIX_UNSAFE_PARQUET_ID_FALLBACK_ENABLED.value()) {
-          mapping = null;
-        } else {
-          mapping = NameMapping.empty();
-        }
-
-        if (batchedReaderFunc != null) {
-          return new VectorizedParquetReader<>(
-              file,
-              schema,
-              options,
-              batchedReaderFunc,
-              mapping,
-              filter,
-              reuseContainers,
-              caseSensitive,
-              maxRecordsPerBatch);
-        } else {
-          Function<MessageType, ParquetValueReader<?>> readBuilder =
-              readerFuncWithSchema != null ?
-                  fileType -> readerFuncWithSchema.apply(schema, fileType) :
-                  readerFunc;
-          return new org.apache.iceberg.parquet.ParquetReader<>(
-              file, schema, options, readBuilder, mapping, filter, 
reuseContainers, caseSensitive);
-        }
-      }
-
-      ParquetReadBuilder<D> builder = new 
ParquetReadBuilder<>(ParquetIO.file(file));
-
-      builder.project(schema);
-
-      if (readSupport != null) {
-        builder.readSupport((ReadSupport<D>) readSupport);
-      } else {
-        builder.readSupport(new AvroReadSupport<>(ParquetAvro.DEFAULT_MODEL));
-      }
-
-      // default options for readers
-      builder
-          .set("parquet.strict.typing", "false") // allow type promotion
-          .set("parquet.avro.compatible", "false") // use the new RecordReader 
with Utf8 support
-          .set(
-              "parquet.avro.add-list-element-records",
-              "false"); // assume that lists use a 3-level schema
-
-      for (Map.Entry<String, String> entry : properties.entrySet()) {
-        builder.set(entry.getKey(), entry.getValue());
-      }
-
-      if (filter != null) {
-        // TODO: should not need to get the schema to push down before opening 
the file.
-        // Parquet should allow setting a filter inside its read support
-        ParquetReadOptions decryptOptions =
-            ParquetReadOptions.builder(new PlainParquetConfiguration())
-                .withDecryption(fileDecryptionProperties)
-                .build();
-        MessageType type;
-        try (ParquetFileReader schemaReader =
-                 ParquetFileReader.open(ParquetIO.file(file), decryptOptions)) 
{
-          type = schemaReader.getFileMetaData().getSchema();
-        } catch (IOException e) {
-          throw new RuntimeIOException(e);
-        }
-        Schema fileSchema = ParquetSchemaUtil.convert(type);
-        builder
-            .useStatsFilter()
-            .useDictionaryFilter()
-            .useRecordFilter(filterRecords)
-            .useBloomFilter()
-            .withFilter(ParquetFilters.convert(fileSchema, filter, 
caseSensitive));
-      } else {
-        // turn off filtering
-        builder
-            .useStatsFilter(false)
-            .useDictionaryFilter(false)
-            .useBloomFilter(false)
-            .useRecordFilter(false);
-      }
-
-      if (callInit) {
-        builder.callInit();
-      }
-
-      if (start != null) {
-        builder.withFileRange(start, start + length);
-      }
-
-      if (nameMapping != null) {
-        builder.withNameMapping(nameMapping);
-      }
-
-      if (fileDecryptionProperties != null) {
-        builder.withDecryption(fileDecryptionProperties);
-      }
-
-      return new ParquetIterable<>(builder);
-    }
-  }
-
-  private static class ParquetReadBuilder<T> extends ParquetReader.Builder<T> {
-    private Schema schema = null;
-    private ReadSupport<T> readSupport = null;
-    private boolean callInit = false;
-    private NameMapping nameMapping = null;
-
-    private ParquetReadBuilder(org.apache.parquet.io.InputFile file) {
-      super(file);
-    }
-
-    public ParquetReadBuilder<T> project(Schema newSchema) {
-      this.schema = newSchema;
-      return this;
-    }
-
-    public ParquetReadBuilder<T> withNameMapping(NameMapping newNameMapping) {
-      this.nameMapping = newNameMapping;
-      return this;
-    }
-
-    public ParquetReadBuilder<T> readSupport(ReadSupport<T> newReadSupport) {
-      this.readSupport = newReadSupport;
-      return this;
-    }
-
-    public ParquetReadBuilder<T> callInit() {
-      this.callInit = true;
-      return this;
-    }
-
-    @Override
-    protected ReadSupport<T> getReadSupport() {
-      return new ParquetReadSupport<>(schema, readSupport, callInit, 
nameMapping);
-    }
-  }
-
-  /**
-   * Combines several files into one
-   *
-   * @param inputFiles an {@link Iterable} of parquet files. The order of 
iteration determines the
-   *     order in which content of files are read and written to the {@code 
outputFile}
-   * @param outputFile the output parquet file containing all the data from 
{@code inputFiles}
-   * @param rowGroupSize the row group size to use when writing the {@code 
outputFile}
-   * @param schema the schema of the data
-   * @param metadata extraMetadata to write at the footer of the {@code 
outputFile}
-   */
-  public static void concat(
-      Iterable<File> inputFiles,
-      File outputFile,
-      int rowGroupSize,
-      Schema schema,
-      Map<String, String> metadata)
-      throws IOException {
-    OutputFile file = Files.localOutput(outputFile);
-    try (ParquetFileWriter writer =
-        new ParquetFileWriter(
-            ParquetIO.file(file),
-            ParquetSchemaUtil.convert(schema, "table"),
-            ParquetFileWriter.Mode.CREATE,
-            rowGroupSize,
-            0)) {
-      writer.start();
-      for (File inputFile : inputFiles) {
-        writer.appendFile(ParquetIO.file(Files.localInput(inputFile)));
-      }
-      writer.end(metadata);
-    }
-  }
-}
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java
index 7d0f0feec8c..736a39b895c 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java
@@ -19,64 +19,106 @@
 
 package org.apache.iceberg.parquet;
 
+import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-import java.util.function.Supplier;
+import org.apache.iceberg.Accessor;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.variants.PhysicalType;
 import org.apache.iceberg.variants.Variant;
 
+/**
+ * Utilities for variant shredding support in Parquet writers.
+ *
+ * <p>This includes:
+ * <ul>
+ *   <li>Detecting whether shredding should be enabled for a schema</li>
+ *   <li>Discovering eligible VARIANT fields (top-level or nested in structs; 
not in lists/maps)</li>
+ *   <li>Building a {@link VariantShreddingFunction} for data-driven 
typed_value schema inference</li>
+ * </ul>
+ */
 public class VariantUtil {
 
   private VariantUtil() {
   }
 
   /**
-   * Create a VariantShreddingFunction if variant shredding is enabled and the 
schema has variant columns.
+   * A VARIANT field in an Iceberg {@link Schema} that can be used for 
shredding.
    *
-   * @param schema        The Iceberg schema
-   * @param sampleRecord  A sample record to infer variant schemas from actual 
data (can be null)
-   * @param properties    Table properties to check if variant shredding is 
enabled
-   * @return An Optional containing the VariantShreddingFunction if applicable
+   * <p>Shredding is supported for top-level VARIANT columns or VARIANT fields 
nested in structs.
+   * VARIANT fields stored inside lists or maps are excluded because schema 
accessors do not retrieve
+   * list/map contents.
+   */
+  public record VariantField(int fieldId, Accessor<StructLike> accessor, 
String[] path) {
+  }
+
+  /**
+   * Check if variant shredding is enabled via table properties.
    */
-  public static Optional<VariantShreddingFunction> variantShreddingFunc(
-      Schema schema,
-      Supplier<Record> sampleRecord,
-      Map<String, String> properties) {
-
-    // Preconditions: must have variant columns + property enabled
-    if (!hasVariantColumns(schema) || !isVariantShreddingEnabled(properties)) {
-      return Optional.empty();
+  public static boolean isVariantShreddingEnabled(Map<String, String> 
properties) {
+    String shreddingEnabled = 
properties.get(InputFormatConfig.VARIANT_SHREDDING_ENABLED);
+    return Boolean.parseBoolean(shreddingEnabled);
+  }
+
+  public static boolean isShreddable(Object value) {
+    if (value instanceof Variant variant) {
+      return variant.value().type() != PhysicalType.NULL;
     }
+    return false;
+  }
 
-    VariantShreddingFunction fn =
-        constructVariantShreddingFunc(sampleRecord.get(), schema);
+  public static List<VariantField> variantFieldsForShredding(
+      Map<String, String> properties, Schema schema) {
+    if (!isVariantShreddingEnabled(properties)) {
+      return List.of();
+    }
+    return variantFieldsForShredding(schema);
+  }
+
+  /**
+   * Returns all VARIANT fields that are eligible for shredding: top-level 
VARIANT columns and VARIANT
+   * fields nested in structs (excluding lists/maps).
+   */
+  private static List<VariantField> variantFieldsForShredding(Schema schema) {
+    List<VariantField> results = Lists.newArrayList();
+    new VariantFieldVisitor(schema).collect(results);
+    return results;
+  }
+
+  public static boolean shouldUseVariantShredding(Map<String, String> 
properties, Schema schema) {
+    return isVariantShreddingEnabled(properties) && hasVariantFields(schema);
+  }
 
-    return Optional.of(fn);
+  private static boolean hasVariantFields(Schema schema) {
+    return new VariantFieldVisitor(schema).hasVariantField();
   }
 
-  private static VariantShreddingFunction constructVariantShreddingFunc(
+  public static VariantShreddingFunction variantShreddingFunc(
       Record sampleRecord, Schema schema) {
 
-    return (id, name) -> {
+    return (id, ignoredName) -> {
       // Validate the field exists and is a variant type
       Types.NestedField field = schema.findField(id);
 
-      if (field == null || !(field.type() instanceof Types.VariantType)) {
+      if (field == null || !field.type().isVariantType()) {
         return null; // Not a variant field, no shredding
       }
 
       // If we have a sample record, try to generate schema from actual data
       if (sampleRecord != null) {
         try {
-          Object variantValue = sampleRecord.getField(name);
+          // NOTE: Parquet conversion passes the field's local name, not the 
full path.
+          // Use an accessor to support variant fields nested in structs.
+          Accessor<StructLike> accessor = schema.accessorForField(id);
+          Object variantValue = accessor != null ? accessor.get(sampleRecord) 
: null;
           if (variantValue instanceof Variant variant) {
-            // Use ParquetVariantUtil to generate schema from actual variant 
value
             return ParquetVariantUtil.toParquetSchema(variant.value());
           }
-        } catch (Exception e) {
+        } catch (RuntimeException e) {
           // Fall through to default schema
         }
       }
@@ -84,20 +126,66 @@ private static VariantShreddingFunction 
constructVariantShreddingFunc(
     };
   }
 
-  /**
-   * Check if the schema contains any variant columns.
-   */
-  private static boolean hasVariantColumns(Schema schema) {
-    return schema.columns().stream()
-        .anyMatch(field -> field.type() instanceof Types.VariantType);
-  }
+  private static final class VariantFieldVisitor {
+    private final Schema schema;
 
-  /**
-   * Check if variant shredding is enabled via table properties.
-   */
-  private static boolean isVariantShreddingEnabled(Map<String, String> 
properties) {
-    String shreddingEnabled = 
properties.get(InputFormatConfig.VARIANT_SHREDDING_ENABLED);
-    return Boolean.parseBoolean(shreddingEnabled);
-  }
+    private VariantFieldVisitor(Schema schema) {
+      this.schema = schema;
+    }
+
+    private boolean hasVariantField() {
+      return hasVariantField(schema.asStruct());
+    }
+
+    private boolean hasVariantField(Types.StructType struct) {
+      for (Types.NestedField field : struct.fields()) {
+        if (field.type().isVariantType()) {
+          // Accessors don't retrieve values contained in lists/maps so this 
enforces the "struct-only"
+          // nesting rule for shredding.
+          if (schema.accessorForField(field.fieldId()) != null) {
+            return true;
+          }
+        } else if (field.type().isStructType() && 
hasVariantField(field.type().asStructType())) {
+          return true;
+        }
+        // Do not recurse into List or Map (shredding is not supported there)
+      }
+      return false;
+    }
 
+    private void collect(List<VariantField> results) {
+      collect(schema.asStruct(), Lists.newArrayList(), results);
+    }
+
+    private void collect(
+        Types.StructType struct,
+        List<String> parents,
+        List<VariantField> results) {
+
+      for (Types.NestedField field : struct.fields()) {
+        if (field.type().isVariantType()) {
+          // Accessors don't retrieve values contained in lists/maps so this 
enforces the "struct-only"
+          // nesting rule for shredding.
+          Accessor<StructLike> accessor = 
schema.accessorForField(field.fieldId());
+          if (accessor == null) {
+            continue;
+          }
+
+          String[] path = new String[parents.size() + 1];
+          for (int i = 0; i < parents.size(); i++) {
+            path[i] = parents.get(i);
+          }
+          path[parents.size()] = field.name();
+
+          results.add(new VariantField(field.fieldId(), accessor, path));
+
+        } else if (field.type().isStructType()) {
+          parents.add(field.name());
+          collect(field.type().asStructType(), parents, results);
+          parents.removeLast();
+        }
+        // Do not recurse into List or Map (shredding is not supported there)
+      }
+    }
+  }
 }
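
For reference, a minimal sketch (not part of this commit) of how the VariantUtil
helpers above could be wired together by a writer. It relies only on the
signatures visible in this diff; the class name and the 'tableProperties',
'writeSchema' and 'sampleRecord' inputs are placeholders, and
VariantShreddingFunction is assumed to live in org.apache.iceberg.parquet
alongside VariantUtil.

    import java.util.List;
    import java.util.Map;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.parquet.VariantShreddingFunction;
    import org.apache.iceberg.parquet.VariantUtil;

    public class VariantShreddingWiringSketch {

      // Returns a shredding function when the table property is enabled and the
      // schema has at least one eligible VARIANT field; otherwise null (no shredding).
      public static VariantShreddingFunction shreddingFuncOrNull(
          Map<String, String> tableProperties, Schema writeSchema, Record sampleRecord) {
        if (!VariantUtil.shouldUseVariantShredding(tableProperties, writeSchema)) {
          return null;
        }
        // Invoked later, per variant field id, during Parquet type conversion;
        // infers a typed_value schema from the sample record when possible.
        return VariantUtil.variantShreddingFunc(sampleRecord, writeSchema);
      }

      // Checks whether a record carries a non-null variant value in an eligible
      // (struct-only) position, using the accessors collected by VariantUtil.
      public static boolean hasShreddableVariant(
          Map<String, String> tableProperties, Schema writeSchema, Record record) {
        List<VariantUtil.VariantField> fields =
            VariantUtil.variantFieldsForShredding(tableProperties, writeSchema);
        return fields.stream()
            .anyMatch(field -> VariantUtil.isShreddable(field.accessor().get(record)));
      }
    }
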
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
index 990d6e782fd..8a1cf740067 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
@@ -22,9 +22,6 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -35,16 +32,11 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.MessageType;
 import org.junit.Assert;
 import org.junit.Test;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assume.assumeTrue;
 
 /**
@@ -195,68 +187,6 @@ public void testSpecialCharacters() {
     Assert.assertArrayEquals(new Object[]{"star", 2L}, result.get(1));
   }
 
-  @Test
-  public void testVariantSelectProjection() throws IOException {
-    assumeTrue(fileFormat == FileFormat.PARQUET);
-    assumeTrue(!isVectorized);
-
-    TableIdentifier table = TableIdentifier.of("default", 
"variant_projection");
-    shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table));
-
-    shell.executeStatement(
-        String.format(
-            "CREATE TABLE %s (id INT, payload VARIANT) STORED BY ICEBERG 
STORED AS %s %s %s",
-            table,
-            fileFormat,
-            testTables.locationForCreateTableSQL(table),
-            testTables.propertiesForCreateTableSQL(
-                ImmutableMap.of("format-version", "3", 
"variant.shredding.enabled", "true"))));
-
-    shell.executeStatement(
-        String.format(
-            "INSERT INTO %s VALUES " +
-                "(1, parse_json('{\"name\":\"Alice\",\"age\":30}'))," +
-                "(2, parse_json('{\"name\":\"Bob\"}'))",
-            table));
-
-    List<Object[]> rows =
-        shell.executeStatement(
-            String.format(
-                "SELECT id, " +
-                    "variant_get(payload, '$.name') AS name, " +
-                    "try_variant_get(payload, '$.age', 'int') AS age " +
-                    "FROM %s ORDER BY id",
-                table));
-
-    Assert.assertEquals(2, rows.size());
-    Assert.assertEquals(1, ((Number) rows.get(0)[0]).intValue());
-    Assert.assertEquals("Alice", rows.get(0)[1]);
-    Assert.assertEquals(30, ((Number) rows.get(0)[2]).intValue());
-
-    Assert.assertEquals(2, ((Number) rows.get(1)[0]).intValue());
-    Assert.assertEquals("Bob", rows.get(1)[1]);
-    Assert.assertNull(rows.get(1)[2]);
-
-    Table icebergTable = testTables.loadTable(table);
-    Types.NestedField variantField = 
icebergTable.schema().findField("payload");
-    Assert.assertNotNull("Variant column should exist", variantField);
-    DataFile dataFile =
-        StreamSupport.stream(
-                
icebergTable.currentSnapshot().addedDataFiles(icebergTable.io()).spliterator(), 
false)
-            .findFirst()
-            .orElseThrow(() -> new IllegalStateException("No data files 
written for test table"));
-
-    Path parquetPath = new Path(dataFile.path().toString());
-    try (ParquetFileReader reader =
-        ParquetFileReader.open(HadoopInputFile.fromPath(parquetPath, 
shell.getHiveConf()))) {
-      MessageType parquetSchema = 
reader.getFooter().getFileMetaData().getSchema();
-      GroupType variantType = 
parquetSchema.getType(variantField.name()).asGroupType();
-      assertThat(variantType.containsField("typed_value")).isTrue();
-    }
-
-    shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table));
-  }
-
   @Test
   public void testScanTableCaseInsensitive() throws IOException {
     testTables.createTable(shell, "customers",
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergVariant.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergVariant.java
new file mode 100644
index 00000000000..7ec594a609c
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergVariant.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assume.assumeTrue;
+
+public class TestHiveIcebergVariant extends 
HiveIcebergStorageHandlerWithEngineBase {
+  private static final String TYPED_VALUE_FIELD = "typed_value";
+
+  @Test
+  public void testVariantSelectProjection() throws IOException {
+    assumeParquetNonVectorized();
+
+    TableIdentifier table = TableIdentifier.of("default", 
"variant_projection");
+    shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table));
+
+    shell.executeStatement(
+        String.format(
+            "CREATE TABLE %s (id INT, payload VARIANT) STORED BY ICEBERG 
STORED AS %s %s %s",
+            table,
+            fileFormat,
+            testTables.locationForCreateTableSQL(table),
+            testTables.propertiesForCreateTableSQL(
+                ImmutableMap.of("format-version", "3", 
"variant.shredding.enabled", "true"))));
+
+    shell.executeStatement(
+        String.format(
+            "INSERT INTO %s VALUES " +
+                "(1, parse_json('null'))," +
+                "(2, parse_json('{\"name\":\"Alice\",\"age\":30}'))," +
+                "(3, parse_json('{\"name\":\"Bob\"}'))",
+            table));
+
+    List<Object[]> rows =
+        shell.executeStatement(
+            String.format(
+                "SELECT id, " +
+                    "variant_get(payload, '$.name') AS name, " +
+                    "try_variant_get(payload, '$.age', 'int') AS age " +
+                    "FROM %s ORDER BY id",
+                table));
+
+    Assert.assertEquals(3, rows.size());
+
+    Assert.assertEquals(1, ((Number) rows.get(0)[0]).intValue());
+    Assert.assertNull(rows.get(0)[1]);
+    Assert.assertNull(rows.get(0)[2]);
+
+    Assert.assertEquals(2, ((Number) rows.get(1)[0]).intValue());
+    Assert.assertEquals("Alice", rows.get(1)[1]);
+    Assert.assertEquals(30, ((Number) rows.get(1)[2]).intValue());
+
+    Assert.assertEquals(3, ((Number) rows.get(2)[0]).intValue());
+    Assert.assertEquals("Bob", rows.get(2)[1]);
+    Assert.assertNull(rows.get(2)[2]);
+
+    Table icebergTable = testTables.loadTable(table);
+    Types.NestedField variantField = requiredField(icebergTable, "payload", 
"Variant column should exist");
+    MessageType parquetSchema = readParquetSchema(firstDataFile(icebergTable));
+    assertThat(hasTypedValue(parquetSchema, variantField.name())).isTrue();
+  }
+
+  @Test
+  public void testVariantShreddingInStruct() throws IOException {
+    assumeParquetNonVectorized();
+
+    TableIdentifier table = TableIdentifier.of("default", 
"variant_struct_shredding");
+    shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table));
+
+    shell.executeStatement(
+        String.format(
+            "CREATE TABLE %s (id INT, payload STRUCT<info: VARIANT>) STORED BY 
ICEBERG STORED AS %s %s %s",
+            table,
+            fileFormat,
+            testTables.locationForCreateTableSQL(table),
+            testTables.propertiesForCreateTableSQL(
+                ImmutableMap.of("format-version", "3", 
"variant.shredding.enabled", "true"))));
+
+    shell.executeStatement(
+        String.format(
+            "INSERT INTO %s VALUES " +
+                "(1, named_struct('info', parse_json('null')))," +
+                "(2, named_struct('info', 
parse_json('{\"city\":\"Seattle\",\"state\":\"WA\"}')))",
+            table));
+
+    Table icebergTable = testTables.loadTable(table);
+    Types.NestedField payloadField = requiredField(icebergTable, "payload", 
"Struct column should exist");
+    MessageType parquetSchema = readParquetSchema(firstDataFile(icebergTable));
+    assertThat(hasTypedValue(parquetSchema, payloadField.name(), 
"info")).isTrue();
+  }
+
+  @Test
+  public void testVariantShreddingNotAppliedInArrayOrMap() throws IOException {
+    assumeParquetNonVectorized();
+
+    TableIdentifier table = TableIdentifier.of("default", 
"variant_container_no_shredding");
+    shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table));
+
+    shell.executeStatement(
+        String.format(
+            "CREATE TABLE %s (id INT, arr ARRAY<VARIANT>, mp MAP<STRING, 
VARIANT>) " +
+                "STORED BY ICEBERG STORED AS %s %s %s",
+            table,
+            fileFormat,
+            testTables.locationForCreateTableSQL(table),
+            testTables.propertiesForCreateTableSQL(
+                ImmutableMap.of("format-version", "3", 
"variant.shredding.enabled", "true"))));
+
+    shell.executeStatement(
+        String.format(
+            "INSERT INTO %s VALUES " +
+                "(1, array(parse_json('{\"a\":1}')), map('k', 
parse_json('{\"b\":2}')))",
+            table));
+
+    Table icebergTable = testTables.loadTable(table);
+    MessageType parquetSchema = readParquetSchema(firstDataFile(icebergTable));
+    // The element/value types should remain as the base VARIANT struct (no 
typed_value).
+    assertThat(hasTypedValue(parquetSchema, "arr", "list", 
"element")).isFalse();
+    assertThat(hasTypedValue(parquetSchema, "mp", "key_value", 
"value")).isFalse();
+  }
+
+  private void assumeParquetNonVectorized() {
+    assumeTrue(fileFormat == FileFormat.PARQUET);
+    assumeTrue(!isVectorized);
+  }
+
+  private static Types.NestedField requiredField(Table table, String 
fieldName, String message) {
+    Types.NestedField field = table.schema().findField(fieldName);
+    Assert.assertNotNull(message, field);
+    return field;
+  }
+
+  private static DataFile firstDataFile(Table table) {
+    return 
StreamSupport.stream(table.currentSnapshot().addedDataFiles(table.io()).spliterator(),
 false)
+        .findFirst()
+        .orElseThrow(() -> new IllegalStateException("No data files written 
for test table"));
+  }
+
+  private MessageType readParquetSchema(DataFile dataFile) throws IOException {
+    Path parquetPath = new Path(dataFile.location());
+    try (ParquetFileReader reader =
+             ParquetFileReader.open(HadoopInputFile.fromPath(parquetPath, 
shell.getHiveConf()))) {
+      return reader.getFooter().getFileMetaData().getSchema();
+    }
+  }
+
+  private static GroupType groupAt(MessageType parquetSchema, String... path) {
+    org.apache.parquet.schema.Type type = parquetSchema.getType(path[0]);
+    for (int i = 1; i < path.length; i++) {
+      type = type.asGroupType().getType(path[i]);
+    }
+    return type.asGroupType();
+  }
+
+  private static boolean hasTypedValue(MessageType parquetSchema, String... 
pathToVariantGroup) {
+    return groupAt(parquetSchema, 
pathToVariantGroup).containsField(TYPED_VALUE_FIELD);
+  }
+}
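
For reference, a minimal standalone sketch (not part of this commit) of the footer
check the new test performs: open a written Parquet data file and verify that the
VARIANT column's group contains the shredded typed_value branch. It uses only the
parquet-mr calls already exercised by the test; the class name, file path, column
name and Configuration are placeholders.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.apache.parquet.schema.GroupType;
    import org.apache.parquet.schema.MessageType;

    public class TypedValueCheckSketch {

      // Returns true if the named top-level VARIANT column was written with a
      // shredded typed_value group in addition to metadata/value.
      public static boolean isShredded(Path dataFile, Configuration conf, String variantColumn)
          throws IOException {
        try (ParquetFileReader reader =
            ParquetFileReader.open(HadoopInputFile.fromPath(dataFile, conf))) {
          MessageType schema = reader.getFooter().getFileMetaData().getSchema();
          GroupType variantGroup = schema.getType(variantColumn).asGroupType();
          return variantGroup.containsField("typed_value");
        }
      }
    }
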
diff --git a/iceberg/iceberg-shading/pom.xml b/iceberg/iceberg-shading/pom.xml
index ac4bf4a9b10..17d50dd91d6 100644
--- a/iceberg/iceberg-shading/pom.xml
+++ b/iceberg/iceberg-shading/pom.xml
@@ -117,7 +117,6 @@
                     </excludes>
                   </artifactSet>
                   <filters>
-                    <!-- Global exclusions -->
                     <filter>
                       <artifact>*:*</artifact>
                       <excludes>
@@ -128,13 +127,6 @@
                         <exclude>static/</exclude>
                       </excludes>
                     </filter>
-                    <!-- Exclude org.apache.iceberg.parquet.Parquet.class from 
iceberg-parquet -->
-                    <filter>
-                      <artifact>org.apache.iceberg:iceberg-parquet</artifact>
-                      <excludes>
-                        
<exclude>org/apache/iceberg/parquet/Parquet.class</exclude>
-                      </excludes>
-                    </filter>
                   </filters>
                 </configuration>
               </execution>
