This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 78a620eea2c HIVE-29287: Iceberg: [V3] Variant Shredding support (#6152)
78a620eea2c is described below

commit 78a620eea2c69e1ddf5175f91be7c64999c063e6
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Fri Dec 12 19:06:16 2025 +0100

    HIVE-29287: Iceberg: [V3] Variant Shredding support (#6152)
---
 .../org/apache/iceberg/mr/InputFormatConfig.java   |    1 +
 .../IcebergVariantObjectInspector.java             |   13 +-
 .../mr/hive/writer/HiveFileWriterFactory.java      |   20 +
 .../writer/HiveIcebergCopyOnWriteRecordWriter.java |   15 +
 .../mr/hive/writer/HiveIcebergRecordWriter.java    |    9 +-
 .../java/org/apache/iceberg/parquet/Parquet.java   | 1524 ++++++++++++++++++++
 .../org/apache/iceberg/parquet/VariantUtil.java    |  103 ++
 .../iceberg/mr/hive/TestHiveIcebergSelects.java    |   70 +
 .../test/queries/positive/variant_type_shredding.q |   39 +
 .../results/positive/variant_type_shredding.q.out  |  101 ++
 iceberg/iceberg-shading/pom.xml                    |    8 +
 .../hadoop/hive/serde2/variant/VariantBuilder.java |    2 +-
 12 files changed, 1897 insertions(+), 8 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index 2ee6627789b..a7ba33624b3 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -54,6 +54,7 @@ private InputFormatConfig() {
   public static final String TABLE_CATALOG_PREFIX = "iceberg.mr.table.catalog.";
   public static final String LOCALITY = "iceberg.mr.locality";
   public static final String WRITE_FANOUT_ENABLED = "write.fanout.enabled";
+  public static final String VARIANT_SHREDDING_ENABLED = "variant.shredding.enabled";
 
   public static final String CTAS_TABLE_NAME = "iceberg.mr.ctas.table.name";
   public static final String SELECTED_COLUMNS = "iceberg.mr.selected.columns";
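
For context, the new key is read as an ordinary table/config property. A minimal sketch of how a caller might gate shredding on it, assuming Iceberg's PropertyUtil; the VariantShreddingFlag helper and the false default are illustrative, not part of this patch:

    import java.util.Map;
    import org.apache.iceberg.mr.InputFormatConfig;
    import org.apache.iceberg.util.PropertyUtil;

    // Hypothetical helper: looks up the new flag in the table properties,
    // defaulting to false here purely for illustration.
    final class VariantShreddingFlag {
      private VariantShreddingFlag() {
      }

      static boolean shreddingEnabled(Map<String, String> tableProperties) {
        return PropertyUtil.propertyAsBoolean(
            tableProperties, InputFormatConfig.VARIANT_SHREDDING_ENABLED, false);
      }
    }
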
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergVariantObjectInspector.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergVariantObjectInspector.java
index 192d4b25bd0..8afae4c4895 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergVariantObjectInspector.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergVariantObjectInspector.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.mr.hive.serde.objectinspector;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.List;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -61,11 +62,13 @@ public Object getStructFieldData(Object data, StructField fieldRef) {
 
     switch (field.getFieldID()) {
       case 0: // "metadata" field (binary)
-        ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
+        ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes())
+            .order(ByteOrder.LITTLE_ENDIAN);
         variant.metadata().writeTo(metadata, 0);
         return metadata.array();
       case 1: // "value" field (binary)
-        ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
+        ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes())
+            .order(ByteOrder.LITTLE_ENDIAN);
         variant.value().writeTo(value, 0);
         return value.array();
       default:
@@ -79,10 +82,12 @@ public List<Object> getStructFieldsDataAsList(Object data) {
       return null;
     }
     Variant variant = (Variant) data;
-    ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
+    ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes())
+        .order(ByteOrder.LITTLE_ENDIAN);
     variant.metadata().writeTo(metadata, 0);
 
-    ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
+    ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes())
+        .order(ByteOrder.LITTLE_ENDIAN);
     variant.value().writeTo(value, 0);
 
     // Return the data for our fields in the correct order: metadata, value
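
The change mirrors the variant binary encoding, which is little-endian; a condensed sketch of the serialization pattern used above, assuming Iceberg's org.apache.iceberg.variants.Variant API (the VariantBytes helper is illustrative only):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import org.apache.iceberg.variants.Variant;

    final class VariantBytes {
      private VariantBytes() {
      }

      // Serialize both variant parts through explicitly little-endian buffers,
      // matching the byte order the encoding expects, and return {metadata, value}.
      static byte[][] toBytes(Variant variant) {
        ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes())
            .order(ByteOrder.LITTLE_ENDIAN);
        variant.metadata().writeTo(metadata, 0);

        ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes())
            .order(ByteOrder.LITTLE_ENDIAN);
        variant.value().writeTo(value, 0);

        return new byte[][] {metadata.array(), value.array()};
      }
    }
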
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
index 7f106f25480..6489c78f9ed 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg.mr.hive.writer;
 
+import java.util.Map;
+import java.util.function.Supplier;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
@@ -31,9 +33,13 @@
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.VariantUtil;
 
 class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
 
+  private final Map<String, String> properties;
+  private Supplier<Record> sampleRecord = null;
+
   HiveFileWriterFactory(
       Table table,
       FileFormat dataFileFormat,
@@ -54,6 +60,7 @@ class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
         positionDeleteRowSchema);
+    properties = table.properties();
   }
 
   static Builder builderFor(Table table) {
@@ -78,6 +85,9 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
   @Override
   protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
     builder.createWriterFunc(GenericParquetWriter::create);
+    // Configure variant shredding function if conditions are met:
+    VariantUtil.variantShreddingFunc(dataSchema(), sampleRecord, properties)
+        .ifPresent(builder::variantShreddingFunc);
   }
 
   @Override
@@ -149,4 +159,14 @@ HiveFileWriterFactory build() {
           positionDeleteRowSchema);
     }
   }
+
+  /**
+   * Set a sample record to use for data-driven variant shredding schema generation.
+   * Should be called before the Parquet writer is created.
+   */
+  public void initialize(Supplier<Record> record) {
+    if (sampleRecord == null) {
+      sampleRecord = record;
+    }
+  }
 }
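
The factory defers the shredding decision until the first row is available; a simplified sketch of that wiring, following the call sites in this patch (the ShreddingWiring class itself is illustrative, and the exact conditions checked inside VariantUtil are not shown here):

    import java.util.Map;
    import java.util.function.Supplier;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.parquet.Parquet;
    import org.apache.iceberg.parquet.VariantUtil;

    // Illustrative only: shows the order of calls, not the real factory class.
    class ShreddingWiring {
      private Supplier<Record> sampleRecord;

      // Record writers call this with the first row, before the Parquet writer is built.
      void initialize(Supplier<Record> record) {
        if (sampleRecord == null) {
          sampleRecord = record;
        }
      }

      void configure(Parquet.DataWriteBuilder builder, Schema dataSchema, Map<String, String> props) {
        // VariantUtil returns a shredding function only when its conditions are met
        // (per the patch comment); otherwise the Optional is empty and nothing is set.
        VariantUtil.variantShreddingFunc(dataSchema, sampleRecord, props)
            .ifPresent(builder::variantShreddingFunc);
      }
    }
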
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
index f3c04c279e7..e8d5ed2a8e4 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
@@ -21,6 +21,7 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 import org.apache.hadoop.io.Writable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
@@ -28,6 +29,7 @@
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.hive.HiveSchemaUtil;
 import org.apache.iceberg.io.DataWriteResult;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.mr.hive.FilesForCommit;
@@ -35,14 +37,19 @@
 import org.apache.iceberg.mr.hive.writer.WriterBuilder.Context;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
 
 class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
 
   private final int currentSpecId;
+  private final Set<String> missingColumns;
+  private final List<Types.NestedField> missingOrStructFields;
 
   private final GenericRecord rowDataTemplate;
   private final List<DataFile> replacedDataFiles;
 
+  private final HiveFileWriterFactory fileWriterFactory;
+
   HiveIcebergCopyOnWriteRecordWriter(Table table, HiveFileWriterFactory writerFactory,
       OutputFileFactory deleteFileFactory, Context context) {
     super(table, newDataWriter(table, writerFactory, deleteFileFactory, context));
@@ -50,6 +57,12 @@ class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
     this.currentSpecId = table.spec().specId();
     this.rowDataTemplate = GenericRecord.create(table.schema());
     this.replacedDataFiles = Lists.newArrayList();
+
+    this.missingColumns = context.missingColumns();
+    this.missingOrStructFields = specs.get(currentSpecId).schema().asStruct().fields().stream()
+        .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType())
+        .toList();
+    this.fileWriterFactory = writerFactory;
   }
 
   @Override
@@ -69,6 +82,8 @@ public void write(Writable row) throws IOException {
             .build();
       replacedDataFiles.add(dataFile);
     } else {
+      HiveSchemaUtil.setDefaultValues(rowData, missingOrStructFields, missingColumns);
+      fileWriterFactory.initialize(() -> rowData);
       writer.write(rowData, specs.get(currentSpecId), partition(rowData, currentSpecId));
     }
   }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
index 9adfd15dc20..ca9d232e3d5 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
@@ -40,6 +40,8 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
   private final Set<String> missingColumns;
   private final List<Types.NestedField> missingOrStructFields;
 
+  private final HiveFileWriterFactory fileWriterFactory;
+
   HiveIcebergRecordWriter(Table table, HiveFileWriterFactory fileWriterFactory,
       OutputFileFactory dataFileFactory, Context context) {
     super(table, newDataWriter(table, fileWriterFactory, dataFileFactory, context));
@@ -47,18 +49,19 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
     this.currentSpecId = table.spec().specId();
     this.missingColumns = context.missingColumns();
     this.missingOrStructFields = specs.get(currentSpecId).schema().asStruct().fields().stream()
-        .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType()).toList();
+        .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType())
+        .toList();
+    this.fileWriterFactory = fileWriterFactory;
   }
 
   @Override
   public void write(Writable row) throws IOException {
     Record record = ((Container<Record>) row).get();
     HiveSchemaUtil.setDefaultValues(record, missingOrStructFields, missingColumns);
-
+    fileWriterFactory.initialize(() -> record);
     writer.write(record, specs.get(currentSpecId), partition(record, currentSpecId));
   }
 
-
   @Override
   public FilesForCommit files() {
     List<DataFile> dataFiles = ((DataWriteResult) writer.result()).dataFiles();
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java
new file mode 100644
index 00000000000..56e3f3480c7
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -0,0 +1,1524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.InternalData;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.SystemConfigs;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.encryption.NativeEncryptionInputFile;
+import org.apache.iceberg.encryption.NativeEncryptionOutputFile;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.hadoop.HadoopOutputFile;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.parquet.ParquetValueWriters.PositionDeleteStructWriter;
+import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroWriteSupport;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.conf.PlainParquetConfiguration;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type.ID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_DICT_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_ROW_LIMIT;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT;
+import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
+
+// TODO: remove class once upgraded to Iceberg v1.11.0 (https://github.com/apache/iceberg/pull/14153)
+
+public class Parquet {
+  private static final Logger LOG = LoggerFactory.getLogger(Parquet.class);
+
+  private Parquet() {
+  }
+
+  private static final Collection<String> READ_PROPERTIES_TO_REMOVE =
+      Sets.newHashSet(
+          "parquet.read.filter",
+          "parquet.private.read.filter.predicate",
+          "parquet.read.support.class",
+          "parquet.crypto.factory.class");
+
+  public static WriteBuilder write(OutputFile file) {
+    if (file instanceof EncryptedOutputFile) {
+      return write((EncryptedOutputFile) file);
+    }
+
+    return new WriteBuilder(file);
+  }
+
+  public static WriteBuilder write(EncryptedOutputFile file) {
+    if (file instanceof NativeEncryptionOutputFile) {
+      NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file;
+      return write(nativeFile.plainOutputFile())
+          .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
+          .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
+    } else {
+      return write(file.encryptingOutputFile());
+    }
+  }
+
+  public static class WriteBuilder implements InternalData.WriteBuilder {
+    private final OutputFile file;
+    private final Configuration conf;
+    private final Map<String, String> metadata = Maps.newLinkedHashMap();
+    private final Map<String, String> config = Maps.newLinkedHashMap();
+    private Schema schema = null;
+    private VariantShreddingFunction variantShreddingFunc = null;
+    private String name = "table";
+    private WriteSupport<?> writeSupport = null;
+    private BiFunction<Schema, MessageType, ParquetValueWriter<?>> createWriterFunc = null;
+    private MetricsConfig metricsConfig = MetricsConfig.getDefault();
+    private ParquetFileWriter.Mode writeMode = ParquetFileWriter.Mode.CREATE;
+    private WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
+    private Function<Map<String, String>, Context> createContextFunc = Context::dataContext;
+    private ByteBuffer fileEncryptionKey = null;
+    private ByteBuffer fileAADPrefix = null;
+
+    private WriteBuilder(OutputFile file) {
+      this.file = file;
+      if (file instanceof HadoopOutputFile) {
+        this.conf = new Configuration(((HadoopOutputFile) file).getConf());
+      } else {
+        this.conf = new Configuration();
+      }
+    }
+
+    public WriteBuilder forTable(Table table) {
+      schema(table.schema());
+      setAll(table.properties());
+      metricsConfig(MetricsConfig.forTable(table));
+      return this;
+    }
+
+    @Override
+    public WriteBuilder schema(Schema newSchema) {
+      this.schema = newSchema;
+      return this;
+    }
+
+    /**
+     * Set a {@link VariantShreddingFunction} that is called with each variant field's name and
+     * field ID to produce the shredding type as a {@code typed_value} field. This field is added to
+     * the result variant struct alongside the {@code metadata} and {@code value} fields.
+     *
+     * @param func {@link VariantShreddingFunction} that produces a shredded {@code typed_value}
+     * @return this for method chaining
+     */
+    public WriteBuilder variantShreddingFunc(VariantShreddingFunction func) {
+      this.variantShreddingFunc = func;
+      return this;
+    }
+
+    @Override
+    public WriteBuilder named(String newName) {
+      this.name = newName;
+      return this;
+    }
+
+    public WriteBuilder writeSupport(WriteSupport<?> newWriteSupport) {
+      this.writeSupport = newWriteSupport;
+      return this;
+    }
+
+    @Override
+    public WriteBuilder set(String property, String value) {
+      config.put(property, value);
+      return this;
+    }
+
+    public WriteBuilder setAll(Map<String, String> properties) {
+      config.putAll(properties);
+      return this;
+    }
+
+    @Override
+    public WriteBuilder meta(String property, String value) {
+      metadata.put(property, value);
+      return this;
+    }
+
+    public WriteBuilder createWriterFunc(
+        Function<MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
+      if (newCreateWriterFunc != null) {
+        this.createWriterFunc = (icebergSchema, type) -> newCreateWriterFunc.apply(type);
+      }
+      return this;
+    }
+
+    public WriteBuilder createWriterFunc(
+        BiFunction<Schema, MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
+      this.createWriterFunc = newCreateWriterFunc;
+      return this;
+    }
+
+    public WriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+      this.metricsConfig = newMetricsConfig;
+      return this;
+    }
+
+    @Override
+    public WriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public WriteBuilder overwrite(boolean enabled) {
+      this.writeMode = enabled ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE;
+      return this;
+    }
+
+    public WriteBuilder writerVersion(WriterVersion version) {
+      this.writerVersion = version;
+      return this;
+    }
+
+    public WriteBuilder withFileEncryptionKey(ByteBuffer encryptionKey) {
+      this.fileEncryptionKey = encryptionKey;
+      return this;
+    }
+
+    public WriteBuilder withAADPrefix(ByteBuffer aadPrefix) {
+      this.fileAADPrefix = aadPrefix;
+      return this;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> WriteSupport<T> getWriteSupport(MessageType type) {
+      if (writeSupport != null) {
+        return (WriteSupport<T>) writeSupport;
+      } else {
+        return new AvroWriteSupport<>(
+            type,
+            ParquetAvro.parquetAvroSchema(AvroSchemaUtil.convert(schema, name)),
+            ParquetAvro.DEFAULT_MODEL);
+      }
+    }
+
+    /*
+     * Sets the writer version. Default value is PARQUET_1_0 (v1).
+     */
+    @VisibleForTesting
+    WriteBuilder withWriterVersion(WriterVersion version) {
+      this.writerVersion = version;
+      return this;
+    }
+
+    // supposed to always be a private method used strictly by data and delete write builders
+    private WriteBuilder createContextFunc(
+        Function<Map<String, String>, Context> newCreateContextFunc) {
+      this.createContextFunc = newCreateContextFunc;
+      return this;
+    }
+
+    private void setBloomFilterConfig(
+        Context context,
+        Map<String, String> colNameToParquetPathMap,
+        BiConsumer<String, Boolean> withBloomFilterEnabled,
+        BiConsumer<String, Double> withBloomFilterFPP) {
+
+      context
+          .columnBloomFilterEnabled()
+          .forEach(
+              (colPath, isEnabled) -> {
+                String parquetColumnPath = colNameToParquetPathMap.get(colPath);
+                if (parquetColumnPath == null) {
+                  LOG.warn("Skipping bloom filter config for missing field: {}", colPath);
+                  return;
+                }
+
+                withBloomFilterEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled));
+                String fpp = context.columnBloomFilterFpp().get(colPath);
+                if (fpp != null) {
+                  withBloomFilterFPP.accept(parquetColumnPath, Double.parseDouble(fpp));
+                }
+              });
+    }
+
+    private void setColumnStatsConfig(
+        Context context,
+        Map<String, String> colNameToParquetPathMap,
+        BiConsumer<String, Boolean> withColumnStatsEnabled) {
+
+      context
+          .columnStatsEnabled()
+          .forEach(
+              (colPath, isEnabled) -> {
+                String parquetColumnPath = colNameToParquetPathMap.get(colPath);
+                if (parquetColumnPath == null) {
+                  LOG.warn("Skipping column statistics config for missing field: {}", colPath);
+                  return;
+                }
+                withColumnStatsEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled));
+              });
+    }
+
+    @Override
+    public <D> FileAppender<D> build() throws IOException {
+      Preconditions.checkNotNull(schema, "Schema is required");
+      Preconditions.checkNotNull(name, "Table name is required and cannot be null");
+
+      // add the Iceberg schema to keyValueMetadata
+      meta("iceberg.schema", SchemaParser.toJson(schema));
+
+      // Map Iceberg properties to pass down to the Parquet writer
+      Context context = createContextFunc.apply(config);
+
+      int rowGroupSize = context.rowGroupSize();
+      int pageSize = context.pageSize();
+      int pageRowLimit = context.pageRowLimit();
+      int dictionaryPageSize = context.dictionaryPageSize();
+      String compressionLevel = context.compressionLevel();
+      CompressionCodecName codec = context.codec();
+      int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
+      int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
+      int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
+      boolean dictionaryEnabled = context.dictionaryEnabled();
+
+      if (compressionLevel != null) {
+        switch (codec) {
+          case GZIP:
+            config.put("zlib.compress.level", compressionLevel);
+            break;
+          case BROTLI:
+            config.put("compression.brotli.quality", compressionLevel);
+            break;
+          case ZSTD:
+            // keep "io.compression.codec.zstd.level" for backwards compatibility
+            config.put("io.compression.codec.zstd.level", compressionLevel);
+            config.put("parquet.compression.codec.zstd.level", compressionLevel);
+            break;
+          default:
+            // compression level is not supported; ignore it
+        }
+      }
+
+      set("parquet.avro.write-old-list-structure", "false");
+      MessageType type = ParquetSchemaUtil.convert(schema, name, variantShreddingFunc);
+
+      FileEncryptionProperties fileEncryptionProperties = null;
+      if (fileEncryptionKey != null) {
+        byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey);
+        byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix);
+
+        fileEncryptionProperties =
+            FileEncryptionProperties.builder(encryptionKeyArray)
+                .withAADPrefix(aadPrefixArray)
+                .withoutAADPrefixStorage()
+                .build();
+      } else {
+        Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key");
+      }
+
+      Map<String, String> colNameToParquetPathMap =
+          type.getColumns().stream()
+              .filter(
+                  col -> {
+                    ID id = col.getPrimitiveType().getId();
+                    return id != null && schema.findColumnName(id.intValue()) != null;
+                  })
+              .collect(
+                  Collectors.toMap(
+                      col -> schema.findColumnName(col.getPrimitiveType().getId().intValue()),
+                      col -> String.join(".", col.getPath())));
+
+      if (createWriterFunc != null) {
+        Preconditions.checkArgument(
+            writeSupport == null, "Cannot write with both write support and Parquet value writer");
+
+        for (Map.Entry<String, String> entry : config.entrySet()) {
+          conf.set(entry.getKey(), entry.getValue());
+        }
+
+        ParquetProperties.Builder propsBuilder =
+            ParquetProperties.builder()
+                .withWriterVersion(writerVersion)
+                .withPageSize(pageSize)
+                .withPageRowCountLimit(pageRowLimit)
+                .withDictionaryEncoding(dictionaryEnabled)
+                .withDictionaryPageSize(dictionaryPageSize)
+                .withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount)
+                .withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
+                .withMaxBloomFilterBytes(bloomFilterMaxBytes);
+
+        setBloomFilterConfig(
+            context,
+            colNameToParquetPathMap,
+            propsBuilder::withBloomFilterEnabled,
+            propsBuilder::withBloomFilterFPP);
+
+        setColumnStatsConfig(context, colNameToParquetPathMap, propsBuilder::withStatisticsEnabled);
+
+        ParquetProperties parquetProperties = propsBuilder.build();
+
+        return new org.apache.iceberg.parquet.ParquetWriter<>(
+            conf,
+            file,
+            schema,
+            type,
+            rowGroupSize,
+            metadata,
+            createWriterFunc,
+            codec,
+            parquetProperties,
+            metricsConfig,
+            writeMode,
+            fileEncryptionProperties);
+      } else {
+        ParquetWriteBuilder<D> parquetWriteBuilder =
+            new ParquetWriteBuilder<D>(ParquetIO.file(file))
+                .withWriterVersion(writerVersion)
+                .setType(type)
+                .setConfig(config)
+                .setKeyValueMetadata(metadata)
+                .setWriteSupport(getWriteSupport(type))
+                .withCompressionCodec(codec)
+                .withWriteMode(writeMode)
+                .withRowGroupSize(rowGroupSize)
+                .withPageSize(pageSize)
+                .withPageRowCountLimit(pageRowLimit)
+                .withDictionaryEncoding(dictionaryEnabled)
+                .withDictionaryPageSize(dictionaryPageSize)
+                .withEncryption(fileEncryptionProperties);
+
+        setBloomFilterConfig(
+            context,
+            colNameToParquetPathMap,
+            parquetWriteBuilder::withBloomFilterEnabled,
+            parquetWriteBuilder::withBloomFilterFPP);
+
+        setColumnStatsConfig(
+            context, colNameToParquetPathMap, parquetWriteBuilder::withStatisticsEnabled);
+
+        return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
+      }
+    }
+
+    private static class Context {
+      private final int rowGroupSize;
+      private final int pageSize;
+      private final int pageRowLimit;
+      private final int dictionaryPageSize;
+      private final CompressionCodecName codec;
+      private final String compressionLevel;
+      private final int rowGroupCheckMinRecordCount;
+      private final int rowGroupCheckMaxRecordCount;
+      private final int bloomFilterMaxBytes;
+      private final Map<String, String> columnBloomFilterFpp;
+      private final Map<String, String> columnBloomFilterEnabled;
+      private final Map<String, String> columnStatsEnabled;
+      private final boolean dictionaryEnabled;
+
+      private Context(
+          int rowGroupSize,
+          int pageSize,
+          int pageRowLimit,
+          int dictionaryPageSize,
+          CompressionCodecName codec,
+          String compressionLevel,
+          int rowGroupCheckMinRecordCount,
+          int rowGroupCheckMaxRecordCount,
+          int bloomFilterMaxBytes,
+          Map<String, String> columnBloomFilterFpp,
+          Map<String, String> columnBloomFilterEnabled,
+          Map<String, String> columnStatsEnabled,
+          boolean dictionaryEnabled) {
+        this.rowGroupSize = rowGroupSize;
+        this.pageSize = pageSize;
+        this.pageRowLimit = pageRowLimit;
+        this.dictionaryPageSize = dictionaryPageSize;
+        this.codec = codec;
+        this.compressionLevel = compressionLevel;
+        this.rowGroupCheckMinRecordCount = rowGroupCheckMinRecordCount;
+        this.rowGroupCheckMaxRecordCount = rowGroupCheckMaxRecordCount;
+        this.bloomFilterMaxBytes = bloomFilterMaxBytes;
+        this.columnBloomFilterFpp = columnBloomFilterFpp;
+        this.columnBloomFilterEnabled = columnBloomFilterEnabled;
+        this.columnStatsEnabled = columnStatsEnabled;
+        this.dictionaryEnabled = dictionaryEnabled;
+      }
+
+      static Context dataContext(Map<String, String> config) {
+        int rowGroupSize =
+            PropertyUtil.propertyAsInt(
+                config, PARQUET_ROW_GROUP_SIZE_BYTES, PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT);
+        Preconditions.checkArgument(rowGroupSize > 0, "Row group size must be > 0");
+
+        int pageSize =
+            PropertyUtil.propertyAsInt(
+                config, PARQUET_PAGE_SIZE_BYTES, PARQUET_PAGE_SIZE_BYTES_DEFAULT);
+        Preconditions.checkArgument(pageSize > 0, "Page size must be > 0");
+
+        int pageRowLimit =
+            PropertyUtil.propertyAsInt(
+                config, PARQUET_PAGE_ROW_LIMIT, PARQUET_PAGE_ROW_LIMIT_DEFAULT);
+        Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit must be > 0");
+
+        int dictionaryPageSize =
+            PropertyUtil.propertyAsInt(
+                config, PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT);
+        Preconditions.checkArgument(dictionaryPageSize > 0, "Dictionary page size must be > 0");
+
+        String codecAsString =
+            config.getOrDefault(PARQUET_COMPRESSION, PARQUET_COMPRESSION_DEFAULT);
+        CompressionCodecName codec = toCodec(codecAsString);
+
+        String compressionLevel =
+            config.getOrDefault(PARQUET_COMPRESSION_LEVEL, PARQUET_COMPRESSION_LEVEL_DEFAULT);
+
+        int rowGroupCheckMinRecordCount =
+            PropertyUtil.propertyAsInt(
+                config,
+                PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT,
+                PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT);
+        Preconditions.checkArgument(
+            rowGroupCheckMinRecordCount > 0, "Row group check minimal record count must be > 0");
+
+        int rowGroupCheckMaxRecordCount =
+            PropertyUtil.propertyAsInt(
+                config,
+                PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT,
+                PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT);
+        Preconditions.checkArgument(
+            rowGroupCheckMaxRecordCount > 0, "Row group check maximum record count must be > 0");
+        Preconditions.checkArgument(
+            rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
+            "Row group check maximum record count must be >= minimal record count");
+
+        int bloomFilterMaxBytes =
+            PropertyUtil.propertyAsInt(
+                config, PARQUET_BLOOM_FILTER_MAX_BYTES, PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT);
+        Preconditions.checkArgument(bloomFilterMaxBytes > 0, "bloom Filter Max Bytes must be > 0");
+
+        Map<String, String> columnBloomFilterFpp =
+            PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX);
+
+        Map<String, String> columnBloomFilterEnabled =
+            PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX);
+
+        Map<String, String> columnStatsEnabled =
+            PropertyUtil.propertiesWithPrefix(config, PARQUET_COLUMN_STATS_ENABLED_PREFIX);
+
+        boolean dictionaryEnabled =
+            PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true);
+
+        return new Context(
+            rowGroupSize,
+            pageSize,
+            pageRowLimit,
+            dictionaryPageSize,
+            codec,
+            compressionLevel,
+            rowGroupCheckMinRecordCount,
+            rowGroupCheckMaxRecordCount,
+            bloomFilterMaxBytes,
+            columnBloomFilterFpp,
+            columnBloomFilterEnabled,
+            columnStatsEnabled,
+            dictionaryEnabled);
+      }
+
+      static Context deleteContext(Map<String, String> config) {
+        // default delete config using data config
+        Context dataContext = dataContext(config);
+
+        int rowGroupSize =
+            PropertyUtil.propertyAsInt(
+                config, DELETE_PARQUET_ROW_GROUP_SIZE_BYTES, dataContext.rowGroupSize());
+        Preconditions.checkArgument(rowGroupSize > 0, "Row group size must be > 0");
+
+        int pageSize =
+            PropertyUtil.propertyAsInt(
+                config, DELETE_PARQUET_PAGE_SIZE_BYTES, dataContext.pageSize());
+        Preconditions.checkArgument(pageSize > 0, "Page size must be > 0");
+
+        int pageRowLimit =
+            PropertyUtil.propertyAsInt(
+                config, DELETE_PARQUET_PAGE_ROW_LIMIT, dataContext.pageRowLimit());
+        Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit must be > 0");
+
+        int dictionaryPageSize =
+            PropertyUtil.propertyAsInt(
+                config, DELETE_PARQUET_DICT_SIZE_BYTES, dataContext.dictionaryPageSize());
+        Preconditions.checkArgument(dictionaryPageSize > 0, "Dictionary page size must be > 0");
+
+        String codecAsString = config.get(DELETE_PARQUET_COMPRESSION);
+        CompressionCodecName codec =
+            codecAsString != null ? toCodec(codecAsString) : dataContext.codec();
+
+        String compressionLevel =
+            config.getOrDefault(DELETE_PARQUET_COMPRESSION_LEVEL, dataContext.compressionLevel());
+
+        int rowGroupCheckMinRecordCount =
+            PropertyUtil.propertyAsInt(
+                config,
+                DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT,
+                dataContext.rowGroupCheckMinRecordCount());
+        Preconditions.checkArgument(
+            rowGroupCheckMinRecordCount > 0, "Row group check minimal record count must be > 0");
+
+        int rowGroupCheckMaxRecordCount =
+            PropertyUtil.propertyAsInt(
+                config,
+                DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT,
+                dataContext.rowGroupCheckMaxRecordCount());
+        Preconditions.checkArgument(
+            rowGroupCheckMaxRecordCount > 0, "Row group check maximum record count must be > 0");
+        Preconditions.checkArgument(
+            rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
+            "Row group check maximum record count must be >= minimal record count");
+
+        boolean dictionaryEnabled =
+            PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true);
+
+        return new Context(
+            rowGroupSize,
+            pageSize,
+            pageRowLimit,
+            dictionaryPageSize,
+            codec,
+            compressionLevel,
+            rowGroupCheckMinRecordCount,
+            rowGroupCheckMaxRecordCount,
+            PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT,
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            ImmutableMap.of(),
+            dictionaryEnabled);
+      }
+
+      private static CompressionCodecName toCodec(String codecAsString) {
+        try {
+          return CompressionCodecName.valueOf(codecAsString.toUpperCase(Locale.ENGLISH));
+        } catch (IllegalArgumentException e) {
+          throw new IllegalArgumentException("Unsupported compression codec: " + codecAsString);
+        }
+      }
+
+      int rowGroupSize() {
+        return rowGroupSize;
+      }
+
+      int pageSize() {
+        return pageSize;
+      }
+
+      int pageRowLimit() {
+        return pageRowLimit;
+      }
+
+      int dictionaryPageSize() {
+        return dictionaryPageSize;
+      }
+
+      CompressionCodecName codec() {
+        return codec;
+      }
+
+      String compressionLevel() {
+        return compressionLevel;
+      }
+
+      int rowGroupCheckMinRecordCount() {
+        return rowGroupCheckMinRecordCount;
+      }
+
+      int rowGroupCheckMaxRecordCount() {
+        return rowGroupCheckMaxRecordCount;
+      }
+
+      int bloomFilterMaxBytes() {
+        return bloomFilterMaxBytes;
+      }
+
+      Map<String, String> columnBloomFilterFpp() {
+        return columnBloomFilterFpp;
+      }
+
+      Map<String, String> columnBloomFilterEnabled() {
+        return columnBloomFilterEnabled;
+      }
+
+      Map<String, String> columnStatsEnabled() {
+        return columnStatsEnabled;
+      }
+
+      boolean dictionaryEnabled() {
+        return dictionaryEnabled;
+      }
+    }
+  }
+
+  public static DataWriteBuilder writeData(OutputFile file) {
+    return new DataWriteBuilder(file);
+  }
+
+  public static DataWriteBuilder writeData(EncryptedOutputFile file) {
+    if (file instanceof NativeEncryptionOutputFile) {
+      NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file;
+      return writeData(nativeFile.plainOutputFile())
+          .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
+          .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
+    } else {
+      return writeData(file.encryptingOutputFile());
+    }
+  }
+
+  public static class DataWriteBuilder {
+    private final WriteBuilder appenderBuilder;
+    private final String location;
+    private PartitionSpec spec = null;
+    private StructLike partition = null;
+    private EncryptionKeyMetadata keyMetadata = null;
+    private SortOrder sortOrder = null;
+
+    private DataWriteBuilder(OutputFile file) {
+      this.appenderBuilder = write(file);
+      this.location = file.location();
+    }
+
+    public DataWriteBuilder forTable(Table table) {
+      schema(table.schema());
+      withSpec(table.spec());
+      setAll(table.properties());
+      metricsConfig(MetricsConfig.forTable(table));
+      return this;
+    }
+
+    public DataWriteBuilder schema(Schema newSchema) {
+      appenderBuilder.schema(newSchema);
+      return this;
+    }
+
+    public DataWriteBuilder set(String property, String value) {
+      appenderBuilder.set(property, value);
+      return this;
+    }
+
+    public DataWriteBuilder setAll(Map<String, String> properties) {
+      appenderBuilder.setAll(properties);
+      return this;
+    }
+
+    public DataWriteBuilder meta(String property, String value) {
+      appenderBuilder.meta(property, value);
+      return this;
+    }
+
+    public DataWriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public DataWriteBuilder overwrite(boolean enabled) {
+      appenderBuilder.overwrite(enabled);
+      return this;
+    }
+
+    public DataWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+      appenderBuilder.metricsConfig(newMetricsConfig);
+      return this;
+    }
+
+    public DataWriteBuilder createWriterFunc(
+        Function<MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
+      appenderBuilder.createWriterFunc(newCreateWriterFunc);
+      return this;
+    }
+
+    public DataWriteBuilder createWriterFunc(
+        BiFunction<Schema, MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
+      appenderBuilder.createWriterFunc(newCreateWriterFunc);
+      return this;
+    }
+
+    public DataWriteBuilder variantShreddingFunc(VariantShreddingFunction func) {
+      appenderBuilder.variantShreddingFunc(func);
+      return this;
+    }
+
+    public DataWriteBuilder withSpec(PartitionSpec newSpec) {
+      this.spec = newSpec;
+      return this;
+    }
+
+    public DataWriteBuilder withPartition(StructLike newPartition) {
+      this.partition = newPartition;
+      return this;
+    }
+
+    public DataWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+      this.keyMetadata = metadata;
+      return this;
+    }
+
+    public DataWriteBuilder withFileEncryptionKey(ByteBuffer fileEncryptionKey) {
+      appenderBuilder.withFileEncryptionKey(fileEncryptionKey);
+      return this;
+    }
+
+    public DataWriteBuilder withAADPrefix(ByteBuffer aadPrefix) {
+      appenderBuilder.withAADPrefix(aadPrefix);
+      return this;
+    }
+
+    public DataWriteBuilder withSortOrder(SortOrder newSortOrder) {
+      this.sortOrder = newSortOrder;
+      return this;
+    }
+
+    public <T> DataWriter<T> build() throws IOException {
+      Preconditions.checkArgument(spec != null, "Cannot create data writer without spec");
+      Preconditions.checkArgument(
+          spec.isUnpartitioned() || partition != null,
+          "Partition must not be null when creating data writer for partitioned spec");
+
+      FileAppender<T> fileAppender = appenderBuilder.build();
+      return new DataWriter<>(
+          fileAppender, FileFormat.PARQUET, location, spec, partition, keyMetadata, sortOrder);
+    }
+  }
+
+  public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+    return new DeleteWriteBuilder(file);
+  }
+
+  public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) {
+    if (file instanceof NativeEncryptionOutputFile) {
+      NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file;
+      return writeDeletes(nativeFile.plainOutputFile())
+          .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
+          .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
+    } else {
+      return writeDeletes(file.encryptingOutputFile());
+    }
+  }
+
+  public static class DeleteWriteBuilder {
+    private final WriteBuilder appenderBuilder;
+    private final String location;
+    private BiFunction<Schema, MessageType, ParquetValueWriter<?>> createWriterFunc = null;
+    private Schema rowSchema = null;
+    private PartitionSpec spec = null;
+    private StructLike partition = null;
+    private EncryptionKeyMetadata keyMetadata = null;
+    private int[] equalityFieldIds = null;
+    private SortOrder sortOrder;
+    private Function<CharSequence, ?> pathTransformFunc = Function.identity();
+
+    private DeleteWriteBuilder(OutputFile file) {
+      this.appenderBuilder = write(file);
+      this.location = file.location();
+    }
+
+    public DeleteWriteBuilder forTable(Table table) {
+      rowSchema(table.schema());
+      withSpec(table.spec());
+      setAll(table.properties());
+      metricsConfig(MetricsConfig.forTable(table));
+      return this;
+    }
+
+    public DeleteWriteBuilder set(String property, String value) {
+      appenderBuilder.set(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder setAll(Map<String, String> properties) {
+      appenderBuilder.setAll(properties);
+      return this;
+    }
+
+    public DeleteWriteBuilder meta(String property, String value) {
+      appenderBuilder.meta(property, value);
+      return this;
+    }
+
+    public DeleteWriteBuilder overwrite() {
+      return overwrite(true);
+    }
+
+    public DeleteWriteBuilder overwrite(boolean enabled) {
+      appenderBuilder.overwrite(enabled);
+      return this;
+    }
+
+    public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+      appenderBuilder.metricsConfig(newMetricsConfig);
+      return this;
+    }
+
+    public DeleteWriteBuilder createWriterFunc(
+        Function<MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
+      this.createWriterFunc = (ignored, fileSchema) -> newCreateWriterFunc.apply(fileSchema);
+      return this;
+    }
+
+    public DeleteWriteBuilder createWriterFunc(
+        BiFunction<Schema, MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
+      this.createWriterFunc = newCreateWriterFunc;
+      return this;
+    }
+
+    public DeleteWriteBuilder rowSchema(Schema newSchema) {
+      this.rowSchema = newSchema;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
+      this.spec = newSpec;
+      return this;
+    }
+
+    public DeleteWriteBuilder withPartition(StructLike key) {
+      this.partition = key;
+      return this;
+    }
+
+    public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+      this.keyMetadata = metadata;
+      return this;
+    }
+
+    public DeleteWriteBuilder withFileEncryptionKey(ByteBuffer fileEncryptionKey) {
+      appenderBuilder.withFileEncryptionKey(fileEncryptionKey);
+      return this;
+    }
+
+    public DeleteWriteBuilder withAADPrefix(ByteBuffer aadPrefix) {
+      appenderBuilder.withAADPrefix(aadPrefix);
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
+      this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds);
+      return this;
+    }
+
+    public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
+      this.equalityFieldIds = fieldIds;
+      return this;
+    }
+
+    public DeleteWriteBuilder transformPaths(Function<CharSequence, ?> newPathTransformFunc) {
+      this.pathTransformFunc = newPathTransformFunc;
+      return this;
+    }
+
+    public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) {
+      this.sortOrder = newSortOrder;
+      return this;
+    }
+
+    public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
+      Preconditions.checkState(
+          rowSchema != null, "Cannot create equality delete file without a schema");
+      Preconditions.checkState(
+          equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
+      Preconditions.checkState(
+          createWriterFunc != null,
+          "Cannot create equality delete file unless createWriterFunc is set");
+      Preconditions.checkArgument(
+          spec != null, "Spec must not be null when creating equality delete writer");
+      Preconditions.checkArgument(
+          spec.isUnpartitioned() || partition != null,
+          "Partition must not be null for partitioned writes");
+
+      meta("delete-type", "equality");
+      meta(
+          "delete-field-ids",
+          IntStream.of(equalityFieldIds)
+              .mapToObj(Objects::toString)
+              .collect(Collectors.joining(", ")));
+
+      // the appender uses the row schema without extra columns
+      appenderBuilder.schema(rowSchema);
+      appenderBuilder.createWriterFunc(createWriterFunc);
+      appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext);
+
+      return new EqualityDeleteWriter<>(
+          appenderBuilder.build(),
+          FileFormat.PARQUET,
+          location,
+          spec,
+          partition,
+          keyMetadata,
+          sortOrder,
+          equalityFieldIds);
+    }
+
+    public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
+      Preconditions.checkState(
+          equalityFieldIds == null, "Cannot create position delete file using delete field ids");
+      Preconditions.checkArgument(
+          spec != null, "Spec must not be null when creating position delete writer");
+      Preconditions.checkArgument(
+          spec.isUnpartitioned() || partition != null,
+          "Partition must not be null for partitioned writes");
+      Preconditions.checkArgument(
+          rowSchema == null || createWriterFunc != null,
+          "Create function should be provided if we write row data");
+
+      meta("delete-type", "position");
+
+      if (rowSchema != null && createWriterFunc != null) {
+        // the appender uses the row schema wrapped with position fields
+        appenderBuilder.schema(DeleteSchemaUtil.posDeleteSchema(rowSchema));
+
+        appenderBuilder.createWriterFunc(
+            (schema, parquetSchema) -> {
+              ParquetValueWriter<?> writer = createWriterFunc.apply(schema, parquetSchema);
+              if (writer instanceof StructWriter) {
+                return new PositionDeleteStructWriter<T>(
+                    (StructWriter<?>) writer, pathTransformFunc);
+              } else {
+                throw new UnsupportedOperationException(
+                    "Cannot wrap writer for position deletes: " + 
writer.getClass());
+              }
+            });
+
+      } else {
+        appenderBuilder.schema(DeleteSchemaUtil.pathPosSchema());
+
+        // We ignore the 'createWriterFunc' and 'rowSchema' even if provided, since we do not
+        // write row data itself
+        appenderBuilder.createWriterFunc(
+            (schema, parquetSchema) ->
+                new PositionDeleteStructWriter<T>(
+                    (StructWriter<?>) GenericParquetWriter.create(schema, parquetSchema),
+                    Function.identity()));
+      }
+
+      appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext);
+
+      return new PositionDeleteWriter<>(
+          appenderBuilder.build(), FileFormat.PARQUET, location, spec, partition, keyMetadata);
+    }
+  }
+
+  private static class ParquetWriteBuilder<T>
+      extends ParquetWriter.Builder<T, ParquetWriteBuilder<T>> {
+    private Map<String, String> keyValueMetadata = Maps.newHashMap();
+    private Map<String, String> config = Maps.newHashMap();
+    private MessageType type;
+    private WriteSupport<T> writeSupport;
+
+    private ParquetWriteBuilder(org.apache.parquet.io.OutputFile path) {
+      super(path);
+    }
+
+    @Override
+    protected ParquetWriteBuilder<T> self() {
+      return this;
+    }
+
+    public ParquetWriteBuilder<T> setKeyValueMetadata(Map<String, String> keyValueMetadata) {
+      this.keyValueMetadata = keyValueMetadata;
+      return self();
+    }
+
+    public ParquetWriteBuilder<T> setConfig(Map<String, String> config) {
+      this.config = config;
+      return self();
+    }
+
+    public ParquetWriteBuilder<T> setType(MessageType type) {
+      this.type = type;
+      return self();
+    }
+
+    public ParquetWriteBuilder<T> setWriteSupport(WriteSupport<T> writeSupport) {
+      this.writeSupport = writeSupport;
+      return self();
+    }
+
+    @Override
+    protected WriteSupport<T> getWriteSupport(Configuration configuration) {
+      for (Map.Entry<String, String> entry : config.entrySet()) {
+        configuration.set(entry.getKey(), entry.getValue());
+      }
+      return new ParquetWriteSupport<>(type, keyValueMetadata, writeSupport);
+    }
+  }
+
+  public static ReadBuilder read(InputFile file) {
+    if (file instanceof NativeEncryptionInputFile) {
+      NativeEncryptionInputFile nativeFile = (NativeEncryptionInputFile) file;
+      return new ReadBuilder(nativeFile.encryptedInputFile())
+          .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
+          .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
+    } else {
+      return new ReadBuilder(file);
+    }
+  }
+
+  public static class ReadBuilder implements InternalData.ReadBuilder {
+    private final InputFile file;
+    private final Map<String, String> properties = Maps.newHashMap();
+    private Long start = null;
+    private Long length = null;
+    private Schema schema = null;
+    private Expression filter = null;
+    private ReadSupport<?> readSupport = null;
+    private Function<MessageType, VectorizedReader<?>> batchedReaderFunc = null;
+    private Function<MessageType, ParquetValueReader<?>> readerFunc = null;
+    private BiFunction<Schema, MessageType, ParquetValueReader<?>> readerFuncWithSchema = null;
+    private boolean filterRecords = true;
+    private boolean caseSensitive = true;
+    private boolean callInit = false;
+    private boolean reuseContainers = false;
+    private int maxRecordsPerBatch = 10000;
+    private NameMapping nameMapping = null;
+    private ByteBuffer fileEncryptionKey = null;
+    private ByteBuffer fileAADPrefix = null;
+
+    private ReadBuilder(InputFile file) {
+      this.file = file;
+    }
+
+    /**
+     * Restricts the read to the given range: [start, start + length).
+     *
+     * @param newStart the start position for this read
+     * @param newLength the length of the range this read should scan
+     * @return this builder for method chaining
+     */
+    @Override
+    public ReadBuilder split(long newStart, long newLength) {
+      this.start = newStart;
+      this.length = newLength;
+      return this;
+    }
+
+    @Override
+    public ReadBuilder project(Schema newSchema) {
+      this.schema = newSchema;
+      return this;
+    }
+
+    public ReadBuilder caseInsensitive() {
+      return caseSensitive(false);
+    }
+
+    public ReadBuilder caseSensitive(boolean newCaseSensitive) {
+      this.caseSensitive = newCaseSensitive;
+      return this;
+    }
+
+    public ReadBuilder filterRecords(boolean newFilterRecords) {
+      this.filterRecords = newFilterRecords;
+      return this;
+    }
+
+    public ReadBuilder filter(Expression newFilter) {
+      this.filter = newFilter;
+      return this;
+    }
+
+    /**
+     * @deprecated will be removed in 2.0.0; use {@link #createReaderFunc(Function)} instead
+     */
+    @Deprecated
+    public ReadBuilder readSupport(ReadSupport<?> newFilterSupport) {
+      this.readSupport = newFilterSupport;
+      return this;
+    }
+
+    public ReadBuilder createReaderFunc(
+        Function<MessageType, ParquetValueReader<?>> newReaderFunction) {
+      Preconditions.checkArgument(
+          this.batchedReaderFunc == null,
+          "Cannot set reader function: batched reader function already set");
+      Preconditions.checkArgument(
+          this.readerFuncWithSchema == null,
+          "Cannot set reader function: 2-argument reader function already set");
+      this.readerFunc = newReaderFunction;
+      return this;
+    }
+
+    public ReadBuilder createReaderFunc(
+        BiFunction<Schema, MessageType, ParquetValueReader<?>> newReaderFunction) {
+      Preconditions.checkArgument(
+          this.readerFunc == null,
+          "Cannot set 2-argument reader function: reader function already set");
+      Preconditions.checkArgument(
+          this.batchedReaderFunc == null,
+          "Cannot set 2-argument reader function: batched reader function already set");
+      this.readerFuncWithSchema = newReaderFunction;
+      return this;
+    }
+
+    public ReadBuilder createBatchedReaderFunc(Function<MessageType, VectorizedReader<?>> func) {
+      Preconditions.checkArgument(
+          this.readerFunc == null,
+          "Cannot set batched reader function: reader function already set");
+      Preconditions.checkArgument(
+          this.readerFuncWithSchema == null,
+          "Cannot set batched reader function: 2-argument reader function already set");
+      this.batchedReaderFunc = func;
+      return this;
+    }
+
+    public ReadBuilder set(String key, String value) {
+      properties.put(key, value);
+      return this;
+    }
+
+    /**
+     * @deprecated will be removed in 2.0.0; use {@link #createReaderFunc(Function)} instead
+     */
+    @Deprecated
+    public ReadBuilder callInit() {
+      this.callInit = true;
+      return this;
+    }
+
+    @Override
+    public ReadBuilder reuseContainers() {
+      this.reuseContainers = true;
+      return this;
+    }
+
+    public ReadBuilder recordsPerBatch(int numRowsPerBatch) {
+      this.maxRecordsPerBatch = numRowsPerBatch;
+      return this;
+    }
+
+    public ReadBuilder withNameMapping(NameMapping newNameMapping) {
+      this.nameMapping = newNameMapping;
+      return this;
+    }
+
+    @Override
+    public ReadBuilder setRootType(Class<? extends StructLike> rootClass) {
+      throw new UnsupportedOperationException("Custom types are not yet supported");
+    }
+
+    @Override
+    public ReadBuilder setCustomType(int fieldId, Class<? extends StructLike> structClass) {
+      throw new UnsupportedOperationException("Custom types are not yet supported");
+    }
+
+    public ReadBuilder withFileEncryptionKey(ByteBuffer encryptionKey) {
+      this.fileEncryptionKey = encryptionKey;
+      return this;
+    }
+
+    public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) {
+      this.fileAADPrefix = aadPrefix;
+      return this;
+    }
+
+    @Override
+    @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
+    public <D> CloseableIterable<D> build() {
+      FileDecryptionProperties fileDecryptionProperties = null;
+      if (fileEncryptionKey != null) {
+        byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey);
+        byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix);
+        fileDecryptionProperties =
+            FileDecryptionProperties.builder()
+                .withFooterKey(encryptionKeyArray)
+                .withAADPrefix(aadPrefixArray)
+                .build();
+      } else {
+        Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key");
+      }
+
+      if (readerFunc != null || readerFuncWithSchema != null || batchedReaderFunc != null) {
+        ParquetReadOptions.Builder optionsBuilder;
+        if (file instanceof HadoopInputFile) {
+          // remove read properties already set that may conflict with this read
+          Configuration conf = new Configuration(((HadoopInputFile) file).getConf());
+          for (String property : READ_PROPERTIES_TO_REMOVE) {
+            conf.unset(property);
+          }
+          optionsBuilder = HadoopReadOptions.builder(conf);
+        } else {
+          optionsBuilder = ParquetReadOptions.builder(new PlainParquetConfiguration());
+        }
+
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+          optionsBuilder.set(entry.getKey(), entry.getValue());
+        }
+
+        if (start != null) {
+          optionsBuilder.withRange(start, start + length);
+        }
+
+        if (fileDecryptionProperties != null) {
+          optionsBuilder.withDecryption(fileDecryptionProperties);
+        }
+
+        ParquetReadOptions options = optionsBuilder.build();
+
+        NameMapping mapping;
+        if (nameMapping != null) {
+          mapping = nameMapping;
+        } else if (SystemConfigs.NETFLIX_UNSAFE_PARQUET_ID_FALLBACK_ENABLED.value()) {
+          mapping = null;
+        } else {
+          mapping = NameMapping.empty();
+        }
+
+        if (batchedReaderFunc != null) {
+          return new VectorizedParquetReader<>(
+              file,
+              schema,
+              options,
+              batchedReaderFunc,
+              mapping,
+              filter,
+              reuseContainers,
+              caseSensitive,
+              maxRecordsPerBatch);
+        } else {
+          Function<MessageType, ParquetValueReader<?>> readBuilder =
+              readerFuncWithSchema != null ?
+                  fileType -> readerFuncWithSchema.apply(schema, fileType) :
+                  readerFunc;
+          return new org.apache.iceberg.parquet.ParquetReader<>(
+              file, schema, options, readBuilder, mapping, filter, reuseContainers, caseSensitive);
+        }
+      }
+
+      ParquetReadBuilder<D> builder = new ParquetReadBuilder<>(ParquetIO.file(file));
+
+      builder.project(schema);
+
+      if (readSupport != null) {
+        builder.readSupport((ReadSupport<D>) readSupport);
+      } else {
+        builder.readSupport(new AvroReadSupport<>(ParquetAvro.DEFAULT_MODEL));
+      }
+
+      // default options for readers
+      builder
+          .set("parquet.strict.typing", "false") // allow type promotion
+          .set("parquet.avro.compatible", "false") // use the new RecordReader with Utf8 support
+          .set(
+              "parquet.avro.add-list-element-records",
+              "false"); // assume that lists use a 3-level schema
+
+      for (Map.Entry<String, String> entry : properties.entrySet()) {
+        builder.set(entry.getKey(), entry.getValue());
+      }
+
+      if (filter != null) {
+        // TODO: should not need to get the schema to push down before opening the file.
+        // Parquet should allow setting a filter inside its read support
+        ParquetReadOptions decryptOptions =
+            ParquetReadOptions.builder(new PlainParquetConfiguration())
+                .withDecryption(fileDecryptionProperties)
+                .build();
+        MessageType type;
+        try (ParquetFileReader schemaReader =
+                 ParquetFileReader.open(ParquetIO.file(file), decryptOptions)) {
+          type = schemaReader.getFileMetaData().getSchema();
+        } catch (IOException e) {
+          throw new RuntimeIOException(e);
+        }
+        Schema fileSchema = ParquetSchemaUtil.convert(type);
+        builder
+            .useStatsFilter()
+            .useDictionaryFilter()
+            .useRecordFilter(filterRecords)
+            .useBloomFilter()
+            .withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));
+      } else {
+        // turn off filtering
+        builder
+            .useStatsFilter(false)
+            .useDictionaryFilter(false)
+            .useBloomFilter(false)
+            .useRecordFilter(false);
+      }
+
+      if (callInit) {
+        builder.callInit();
+      }
+
+      if (start != null) {
+        builder.withFileRange(start, start + length);
+      }
+
+      if (nameMapping != null) {
+        builder.withNameMapping(nameMapping);
+      }
+
+      if (fileDecryptionProperties != null) {
+        builder.withDecryption(fileDecryptionProperties);
+      }
+
+      return new ParquetIterable<>(builder);
+    }
+  }
+
+  private static class ParquetReadBuilder<T> extends ParquetReader.Builder<T> {
+    private Schema schema = null;
+    private ReadSupport<T> readSupport = null;
+    private boolean callInit = false;
+    private NameMapping nameMapping = null;
+
+    private ParquetReadBuilder(org.apache.parquet.io.InputFile file) {
+      super(file);
+    }
+
+    public ParquetReadBuilder<T> project(Schema newSchema) {
+      this.schema = newSchema;
+      return this;
+    }
+
+    public ParquetReadBuilder<T> withNameMapping(NameMapping newNameMapping) {
+      this.nameMapping = newNameMapping;
+      return this;
+    }
+
+    public ParquetReadBuilder<T> readSupport(ReadSupport<T> newReadSupport) {
+      this.readSupport = newReadSupport;
+      return this;
+    }
+
+    public ParquetReadBuilder<T> callInit() {
+      this.callInit = true;
+      return this;
+    }
+
+    @Override
+    protected ReadSupport<T> getReadSupport() {
+      return new ParquetReadSupport<>(schema, readSupport, callInit, nameMapping);
+    }
+  }
+
+  /**
+   * Combines several files into one
+   *
+   * @param inputFiles an {@link Iterable} of parquet files. The order of iteration determines the
+   *     order in which content of files are read and written to the {@code outputFile}
+   * @param outputFile the output parquet file containing all the data from {@code inputFiles}
+   * @param rowGroupSize the row group size to use when writing the {@code outputFile}
+   * @param schema the schema of the data
+   * @param metadata extraMetadata to write at the footer of the {@code outputFile}
+   */
+  public static void concat(
+      Iterable<File> inputFiles,
+      File outputFile,
+      int rowGroupSize,
+      Schema schema,
+      Map<String, String> metadata)
+      throws IOException {
+    OutputFile file = Files.localOutput(outputFile);
+    try (ParquetFileWriter writer =
+        new ParquetFileWriter(
+            ParquetIO.file(file),
+            ParquetSchemaUtil.convert(schema, "table"),
+            ParquetFileWriter.Mode.CREATE,
+            rowGroupSize,
+            0)) {
+      writer.start();
+      for (File inputFile : inputFiles) {
+        writer.appendFile(ParquetIO.file(Files.localInput(inputFile)));
+      }
+      writer.end(metadata);
+    }
+  }
+}
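
A read-path aside (illustrative, not part of this diff): consumers typically go through the ReadBuilder above with the generic record readers. A minimal sketch, assuming org.apache.iceberg.data.parquet.GenericParquetReaders and a resolved projection schema named projectedSchema:

    InputFile inputFile = table.io().newInputFile(dataFile.path().toString());
    try (CloseableIterable<Record> records =
        Parquet.read(inputFile)
            .project(projectedSchema)
            .createReaderFunc(fileSchema ->
                GenericParquetReaders.buildReader(projectedSchema, fileSchema))
            .filter(Expressions.alwaysTrue())  // optional row-group/record filtering
            .build()) {
      records.forEach(record -> System.out.println(record));
    }

Encrypted files are handled by the read(InputFile) factory above, which pulls the encryption key and AAD prefix from a NativeEncryptionInputFile.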
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java
new file mode 100644
index 00000000000..7d0f0feec8c
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.variants.Variant;
+
+public class VariantUtil {
+
+  private VariantUtil() {
+  }
+
+  /**
+   * Create a VariantShreddingFunction if variant shredding is enabled and the schema has variant columns.
+   *
+   * @param schema        The Iceberg schema
+   * @param sampleRecord  A sample record to infer variant schemas from actual data (can be null)
+   * @param properties    Table properties to check if variant shredding is enabled
+   * @return An Optional containing the VariantShreddingFunction if applicable
+   */
+  public static Optional<VariantShreddingFunction> variantShreddingFunc(
+      Schema schema,
+      Supplier<Record> sampleRecord,
+      Map<String, String> properties) {
+
+    // Preconditions: must have variant columns + property enabled
+    if (!hasVariantColumns(schema) || !isVariantShreddingEnabled(properties)) {
+      return Optional.empty();
+    }
+
+    VariantShreddingFunction fn =
+        constructVariantShreddingFunc(sampleRecord.get(), schema);
+
+    return Optional.of(fn);
+  }
+
+  private static VariantShreddingFunction constructVariantShreddingFunc(
+      Record sampleRecord, Schema schema) {
+
+    return (id, name) -> {
+      // Validate the field exists and is a variant type
+      Types.NestedField field = schema.findField(id);
+
+      if (field == null || !(field.type() instanceof Types.VariantType)) {
+        return null; // Not a variant field, no shredding
+      }
+
+      // If we have a sample record, try to generate schema from actual data
+      if (sampleRecord != null) {
+        try {
+          Object variantValue = sampleRecord.getField(name);
+          if (variantValue instanceof Variant variant) {
+            // Use ParquetVariantUtil to generate schema from actual variant value
+            return ParquetVariantUtil.toParquetSchema(variant.value());
+          }
+        } catch (Exception e) {
+          // Fall through to default schema
+        }
+      }
+      return null;
+    };
+  }
+
+  /**
+   * Check if the schema contains any variant columns.
+   */
+  private static boolean hasVariantColumns(Schema schema) {
+    return schema.columns().stream()
+        .anyMatch(field -> field.type() instanceof Types.VariantType);
+  }
+
+  /**
+   * Check if variant shredding is enabled via table properties.
+   */
+  private static boolean isVariantShreddingEnabled(Map<String, String> properties) {
+    String shreddingEnabled = properties.get(InputFormatConfig.VARIANT_SHREDDING_ENABLED);
+    return Boolean.parseBoolean(shreddingEnabled);
+  }
+
+}
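
A wiring aside (illustrative, not part of this diff): VariantUtil is meant to be called from the Hive Parquet write path; the actual hookup happens in the writer code elsewhere in this patch. A rough sketch, where the variantShreddingFunc(...) setter name on the appender builder is an assumption and sampleRecord stands for the first record seen by the writer:

    // Returns empty unless the schema has a VARIANT column and the table sets
    // 'variant.shredding.enabled'='true'.
    Optional<VariantShreddingFunction> shredding =
        VariantUtil.variantShreddingFunc(table.schema(), () -> sampleRecord, table.properties());

    // Attach the shredding function to the Parquet appender builder only when present.
    shredding.ifPresent(appenderBuilder::variantShreddingFunc);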
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
index 8a1cf740067..990d6e782fd 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
@@ -22,6 +22,9 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -32,11 +35,16 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
 import org.junit.Assert;
 import org.junit.Test;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assume.assumeTrue;
 
 /**
@@ -187,6 +195,68 @@ public void testSpecialCharacters() {
     Assert.assertArrayEquals(new Object[]{"star", 2L}, result.get(1));
   }
 
+  @Test
+  public void testVariantSelectProjection() throws IOException {
+    assumeTrue(fileFormat == FileFormat.PARQUET);
+    assumeTrue(!isVectorized);
+
+    TableIdentifier table = TableIdentifier.of("default", "variant_projection");
+    shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table));
+
+    shell.executeStatement(
+        String.format(
+            "CREATE TABLE %s (id INT, payload VARIANT) STORED BY ICEBERG STORED AS %s %s %s",
+            table,
+            fileFormat,
+            testTables.locationForCreateTableSQL(table),
+            testTables.propertiesForCreateTableSQL(
+                ImmutableMap.of("format-version", "3", "variant.shredding.enabled", "true"))));
+
+    shell.executeStatement(
+        String.format(
+            "INSERT INTO %s VALUES " +
+                "(1, parse_json('{\"name\":\"Alice\",\"age\":30}'))," +
+                "(2, parse_json('{\"name\":\"Bob\"}'))",
+            table));
+
+    List<Object[]> rows =
+        shell.executeStatement(
+            String.format(
+                "SELECT id, " +
+                    "variant_get(payload, '$.name') AS name, " +
+                    "try_variant_get(payload, '$.age', 'int') AS age " +
+                    "FROM %s ORDER BY id",
+                table));
+
+    Assert.assertEquals(2, rows.size());
+    Assert.assertEquals(1, ((Number) rows.get(0)[0]).intValue());
+    Assert.assertEquals("Alice", rows.get(0)[1]);
+    Assert.assertEquals(30, ((Number) rows.get(0)[2]).intValue());
+
+    Assert.assertEquals(2, ((Number) rows.get(1)[0]).intValue());
+    Assert.assertEquals("Bob", rows.get(1)[1]);
+    Assert.assertNull(rows.get(1)[2]);
+
+    Table icebergTable = testTables.loadTable(table);
+    Types.NestedField variantField = icebergTable.schema().findField("payload");
+    Assert.assertNotNull("Variant column should exist", variantField);
+    DataFile dataFile =
+        StreamSupport.stream(
+                icebergTable.currentSnapshot().addedDataFiles(icebergTable.io()).spliterator(), false)
+            .findFirst()
+            .orElseThrow(() -> new IllegalStateException("No data files written for test table"));
+
+    Path parquetPath = new Path(dataFile.path().toString());
+    try (ParquetFileReader reader =
+        ParquetFileReader.open(HadoopInputFile.fromPath(parquetPath, shell.getHiveConf()))) {
+      MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema();
+      GroupType variantType = parquetSchema.getType(variantField.name()).asGroupType();
+      assertThat(variantType.containsField("typed_value")).isTrue();
+    }
+
+    shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table));
+  }
+
   @Test
   public void testScanTableCaseInsensitive() throws IOException {
     testTables.createTable(shell, "customers",
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/variant_type_shredding.q b/iceberg/iceberg-handler/src/test/queries/positive/variant_type_shredding.q
new file mode 100644
index 00000000000..25d84dd0c0a
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/variant_type_shredding.q
@@ -0,0 +1,39 @@
+-- Mask random uuid
+--! qt:replace:/(\s+'uuid'=')\S+('\s*)/$1#Masked#$2/
+-- Mask random snapshot id
+--! qt:replace:/('current-snapshot-id'=')\d+/$1#SnapshotId#/
+-- Mask current-snapshot-timestamp-ms
+--! qt:replace:/('current-snapshot-timestamp-ms'=')\d+/$1#Masked#/
+
+-- SORT_QUERY_RESULTS
+set hive.explain.user=false;
+set hive.fetch.task.conversion=none;
+
+drop table if exists tbl_shredded_variant;
+
+-- Create test table
+CREATE EXTERNAL TABLE tbl_shredded_variant (
+    id INT,
+    data VARIANT
+) STORED BY ICEBERG
+tblproperties(
+    'format-version'='3',
+    'variant.shredding.enabled'='true'
+);
+
+-- Insert JSON structures
+INSERT INTO tbl_shredded_variant VALUES
+(1, parse_json('{"name": "John", "age": 30, "active": true}')),
+(2, parse_json('{"name": "Bill", "active": false}')),
+(3, parse_json('{"name": "Henry", "age": 20}'));
+
+-- Disable vectorized execution until Variant type is supported
+set hive.vectorized.execution.enabled=false;
+
+-- Retrieve and verify
+SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant
+WHERE variant_get(data, '$.age') > 25;
+
+EXPLAIN
+SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant
+WHERE variant_get(data, '$.age') > 25;
diff --git a/iceberg/iceberg-handler/src/test/results/positive/variant_type_shredding.q.out b/iceberg/iceberg-handler/src/test/results/positive/variant_type_shredding.q.out
new file mode 100644
index 00000000000..b51bc749525
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/variant_type_shredding.q.out
@@ -0,0 +1,101 @@
+PREHOOK: query: drop table if exists tbl_shredded_variant
+PREHOOK: type: DROPTABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: drop table if exists tbl_shredded_variant
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: database:default
+PREHOOK: query: CREATE EXTERNAL TABLE tbl_shredded_variant (
+    id INT,
+    data VARIANT
+) STORED BY ICEBERG
+tblproperties(
+    'format-version'='3',
+    'variant.shredding.enabled'='true'
+)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_shredded_variant
+POSTHOOK: query: CREATE EXTERNAL TABLE tbl_shredded_variant (
+    id INT,
+    data VARIANT
+) STORED BY ICEBERG
+tblproperties(
+    'format-version'='3',
+    'variant.shredding.enabled'='true'
+)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_shredded_variant
+PREHOOK: query: INSERT INTO tbl_shredded_variant VALUES
+(1, parse_json('{"name": "John", "age": 30, "active": true}')),
+(2, parse_json('{"name": "Bill", "active": false}')),
+(3, parse_json('{"name": "Henry", "age": 20}'))
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_shredded_variant
+POSTHOOK: query: INSERT INTO tbl_shredded_variant VALUES
+(1, parse_json('{"name": "John", "age": 30, "active": true}')),
+(2, parse_json('{"name": "Bill", "active": false}')),
+(3, parse_json('{"name": "Henry", "age": 20}'))
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_shredded_variant
+PREHOOK: query: SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant
+WHERE variant_get(data, '$.age') > 25
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_shredded_variant
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant
+WHERE variant_get(data, '$.age') > 25
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_shredded_variant
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1      John
+PREHOOK: query: EXPLAIN
+SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant
+WHERE variant_get(data, '$.age') > 25
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_shredded_variant
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: EXPLAIN
+SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant
+WHERE variant_get(data, '$.age') > 25
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_shredded_variant
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: tbl_shredded_variant
+                  filterExpr: (UDFToDouble(variant_get(data, '$.age')) > 25.0D) (type: boolean)
+                  Statistics: Num rows: 3 Data size: 1020 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: (UDFToDouble(variant_get(data, '$.age')) > 25.0D) (type: boolean)
+                    Statistics: Num rows: 1 Data size: 340 Basic stats: COMPLETE Column stats: NONE
+                    Select Operator
+                      expressions: id (type: int), try_variant_get(data, '$.name') (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 1 Data size: 340 Basic stats: COMPLETE Column stats: NONE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 1 Data size: 340 Basic stats: COMPLETE Column stats: NONE
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
diff --git a/iceberg/iceberg-shading/pom.xml b/iceberg/iceberg-shading/pom.xml
index 17d50dd91d6..ac4bf4a9b10 100644
--- a/iceberg/iceberg-shading/pom.xml
+++ b/iceberg/iceberg-shading/pom.xml
@@ -117,6 +117,7 @@
                     </excludes>
                   </artifactSet>
                   <filters>
+                    <!-- Global exclusions -->
                     <filter>
                       <artifact>*:*</artifact>
                       <excludes>
@@ -127,6 +128,13 @@
                         <exclude>static/</exclude>
                       </excludes>
                     </filter>
+                    <!-- Exclude org.apache.iceberg.parquet.Parquet.class from iceberg-parquet -->
+                    <filter>
+                      <artifact>org.apache.iceberg:iceberg-parquet</artifact>
+                      <excludes>
+                        <exclude>org/apache/iceberg/parquet/Parquet.class</exclude>
+                      </excludes>
+                    </filter>
                   </filters>
                 </configuration>
               </execution>
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/variant/VariantBuilder.java b/serde/src/java/org/apache/hadoop/hive/serde2/variant/VariantBuilder.java
index 88e9240140d..3c92879ba0c 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/variant/VariantBuilder.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/variant/VariantBuilder.java
@@ -301,7 +301,7 @@ public int getWritePos() {
   // - when `allowDuplicateKeys` is true, the field with the greatest offset value (the last
   // appended one) is kept.
   // - otherwise, throw an exception.
-  public void finishWritingObject(int start, ArrayList<FieldEntry> fields) {
+  public void finishWritingObject(int start, List<FieldEntry> fields) {
     int size = fields.size();
     Collections.sort(fields);
     int maxId = size == 0 ? 0 : fields.getFirst().id;
