This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 78a620eea2c HIVE-29287: Iceberg: [V3] Variant Shredding support (#6152)
78a620eea2c is described below
commit 78a620eea2c69e1ddf5175f91be7c64999c063e6
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Fri Dec 12 19:06:16 2025 +0100
HIVE-29287: Iceberg: [V3] Variant Shredding support (#6152)
---
.../org/apache/iceberg/mr/InputFormatConfig.java | 1 +
.../IcebergVariantObjectInspector.java | 13 +-
.../mr/hive/writer/HiveFileWriterFactory.java | 20 +
.../writer/HiveIcebergCopyOnWriteRecordWriter.java | 15 +
.../mr/hive/writer/HiveIcebergRecordWriter.java | 9 +-
.../java/org/apache/iceberg/parquet/Parquet.java | 1524 ++++++++++++++++++++
.../org/apache/iceberg/parquet/VariantUtil.java | 103 ++
.../iceberg/mr/hive/TestHiveIcebergSelects.java | 70 +
.../test/queries/positive/variant_type_shredding.q | 39 +
.../results/positive/variant_type_shredding.q.out | 101 ++
iceberg/iceberg-shading/pom.xml | 8 +
.../hadoop/hive/serde2/variant/VariantBuilder.java | 2 +-
12 files changed, 1897 insertions(+), 8 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index 2ee6627789b..a7ba33624b3 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -54,6 +54,7 @@ private InputFormatConfig() {
public static final String TABLE_CATALOG_PREFIX = "iceberg.mr.table.catalog.";
public static final String LOCALITY = "iceberg.mr.locality";
public static final String WRITE_FANOUT_ENABLED = "write.fanout.enabled";
+ public static final String VARIANT_SHREDDING_ENABLED = "variant.shredding.enabled";
public static final String CTAS_TABLE_NAME = "iceberg.mr.ctas.table.name";
public static final String SELECTED_COLUMNS = "iceberg.mr.selected.columns";
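
The new flag is a plain table property; a minimal sketch of how a caller might consult it (illustrative only, not part of this commit; the "false" default is an assumption):

    // Hypothetical helper: returns true when the table opts in to variant shredding.
    // InputFormatConfig.VARIANT_SHREDDING_ENABLED resolves to "variant.shredding.enabled".
    static boolean isVariantShreddingEnabled(Map<String, String> tableProperties) {
      return Boolean.parseBoolean(
          tableProperties.getOrDefault(InputFormatConfig.VARIANT_SHREDDING_ENABLED, "false"));
    }
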
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergVariantObjectInspector.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergVariantObjectInspector.java
index 192d4b25bd0..8afae4c4895 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergVariantObjectInspector.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/serde/objectinspector/IcebergVariantObjectInspector.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.mr.hive.serde.objectinspector;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.List;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
@@ -61,11 +62,13 @@ public Object getStructFieldData(Object data, StructField fieldRef) {
switch (field.getFieldID()) {
case 0: // "metadata" field (binary)
- ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
+ ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes())
+ .order(ByteOrder.LITTLE_ENDIAN);
variant.metadata().writeTo(metadata, 0);
return metadata.array();
case 1: // "value" field (binary)
- ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
+ ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes())
+ .order(ByteOrder.LITTLE_ENDIAN);
variant.value().writeTo(value, 0);
return value.array();
default:
@@ -79,10 +82,12 @@ public List<Object> getStructFieldsDataAsList(Object data) {
return null;
}
Variant variant = (Variant) data;
- ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes());
+ ByteBuffer metadata = ByteBuffer.allocate(variant.metadata().sizeInBytes())
+ .order(ByteOrder.LITTLE_ENDIAN);
variant.metadata().writeTo(metadata, 0);
- ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes());
+ ByteBuffer value = ByteBuffer.allocate(variant.value().sizeInBytes())
+ .order(ByteOrder.LITTLE_ENDIAN);
variant.value().writeTo(value, 0);
// Return the data for our fields in the correct order: metadata, value
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
index 7f106f25480..6489c78f9ed 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
@@ -19,6 +19,8 @@
package org.apache.iceberg.mr.hive.writer;
+import java.util.Map;
+import java.util.function.Supplier;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
@@ -31,9 +33,13 @@
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.VariantUtil;
class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
+ private final Map<String, String> properties;
+ private Supplier<Record> sampleRecord = null;
+
HiveFileWriterFactory(
Table table,
FileFormat dataFileFormat,
@@ -54,6 +60,7 @@ class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
equalityDeleteRowSchema,
equalityDeleteSortOrder,
positionDeleteRowSchema);
+ properties = table.properties();
}
static Builder builderFor(Table table) {
@@ -78,6 +85,9 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
@Override
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
builder.createWriterFunc(GenericParquetWriter::create);
+ // Configure variant shredding function if conditions are met:
+ VariantUtil.variantShreddingFunc(dataSchema(), sampleRecord, properties)
+ .ifPresent(builder::variantShreddingFunc);
}
@Override
@@ -149,4 +159,14 @@ HiveFileWriterFactory build() {
positionDeleteRowSchema);
}
}
+
+ /**
+ * Set a sample record to use for data-driven variant shredding schema generation.
+ * Should be called before the Parquet writer is created.
+ */
+ public void initialize(Supplier<Record> record) {
+ if (sampleRecord == null) {
+ sampleRecord = record;
+ }
+ }
}
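
Taken together with configureDataWrite() above, the factory only forwards a shredding function when VariantUtil decides one applies. A rough sketch of that wiring, using only methods visible in this diff (VariantUtil's signature is inferred from its call site here and may differ in detail):

    // Sketch only: how the optional shredding function reaches the Parquet data writer.
    void configureDataWriteSketch(Parquet.DataWriteBuilder builder, Schema dataSchema,
        Supplier<Record> sampleRecord, Map<String, String> tableProperties) {
      builder.createWriterFunc(GenericParquetWriter::create);
      // An empty Optional (e.g. shredding disabled or no variant columns) leaves the builder untouched.
      VariantUtil.variantShreddingFunc(dataSchema, sampleRecord, tableProperties)
          .ifPresent(builder::variantShreddingFunc);
    }
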
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
index f3c04c279e7..e8d5ed2a8e4 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.List;
+import java.util.Set;
import org.apache.hadoop.io.Writable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
@@ -28,6 +29,7 @@
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.io.DataWriteResult;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mr.hive.FilesForCommit;
@@ -35,14 +37,19 @@
import org.apache.iceberg.mr.hive.writer.WriterBuilder.Context;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
private final int currentSpecId;
+ private final Set<String> missingColumns;
+ private final List<Types.NestedField> missingOrStructFields;
private final GenericRecord rowDataTemplate;
private final List<DataFile> replacedDataFiles;
+ private final HiveFileWriterFactory fileWriterFactory;
+
HiveIcebergCopyOnWriteRecordWriter(Table table, HiveFileWriterFactory writerFactory,
OutputFileFactory deleteFileFactory, Context context) {
super(table, newDataWriter(table, writerFactory, deleteFileFactory, context));
@@ -50,6 +57,12 @@ class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
this.currentSpecId = table.spec().specId();
this.rowDataTemplate = GenericRecord.create(table.schema());
this.replacedDataFiles = Lists.newArrayList();
+
+ this.missingColumns = context.missingColumns();
+ this.missingOrStructFields = specs.get(currentSpecId).schema().asStruct().fields().stream()
+ .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType())
+ .toList();
+ this.fileWriterFactory = writerFactory;
}
@Override
@@ -69,6 +82,8 @@ public void write(Writable row) throws IOException {
.build();
replacedDataFiles.add(dataFile);
} else {
+ HiveSchemaUtil.setDefaultValues(rowData, missingOrStructFields, missingColumns);
+ fileWriterFactory.initialize(() -> rowData);
writer.write(rowData, specs.get(currentSpecId), partition(rowData, currentSpecId));
}
}
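
Both record writers follow the same order of operations: default values are filled in, the factory is handed a supplier of the current row, and only then is the row written, so a sample record exists by the time the Parquet writer is built. A condensed sketch of that sequence (an assumed simplification of the write() methods in this diff, relying on the writer's own fields):

    // Sketch only: the per-row handshake both writers rely on.
    void writeRow(HiveFileWriterFactory factory, Record row) throws IOException {
      HiveSchemaUtil.setDefaultValues(row, missingOrStructFields, missingColumns);
      factory.initialize(() -> row);   // keeps only the first supplier; later calls are no-ops
      writer.write(row, specs.get(currentSpecId), partition(row, currentSpecId));
    }
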
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
index 9adfd15dc20..ca9d232e3d5 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
@@ -40,6 +40,8 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
private final Set<String> missingColumns;
private final List<Types.NestedField> missingOrStructFields;
+ private final HiveFileWriterFactory fileWriterFactory;
+
HiveIcebergRecordWriter(Table table, HiveFileWriterFactory fileWriterFactory,
OutputFileFactory dataFileFactory, Context context) {
super(table, newDataWriter(table, fileWriterFactory, dataFileFactory, context));
@@ -47,18 +49,19 @@ class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
this.currentSpecId = table.spec().specId();
this.missingColumns = context.missingColumns();
this.missingOrStructFields = specs.get(currentSpecId).schema().asStruct().fields().stream()
- .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType()).toList();
+ .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType())
+ .toList();
+ this.fileWriterFactory = fileWriterFactory;
}
@Override
public void write(Writable row) throws IOException {
Record record = ((Container<Record>) row).get();
HiveSchemaUtil.setDefaultValues(record, missingOrStructFields, missingColumns);
-
+ fileWriterFactory.initialize(() -> record);
writer.write(record, specs.get(currentSpecId), partition(record, currentSpecId));
}
-
@Override
public FilesForCommit files() {
List<DataFile> dataFiles = ((DataWriteResult) writer.result()).dataFiles();
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java
new file mode 100644
index 00000000000..56e3f3480c7
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -0,0 +1,1524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.InternalData;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.SystemConfigs;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.deletes.PositionDeleteWriter;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionKeyMetadata;
+import org.apache.iceberg.encryption.NativeEncryptionInputFile;
+import org.apache.iceberg.encryption.NativeEncryptionOutputFile;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.hadoop.HadoopOutputFile;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.parquet.ParquetValueWriters.PositionDeleteStructWriter;
+import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroWriteSupport;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.conf.PlainParquetConfiguration;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type.ID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_DICT_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_ROW_LIMIT;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
+import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT;
+import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
+
+// TODO: remove class once upgraded to Iceberg v1.11.0 (https://github.com/apache/iceberg/pull/14153)
+
+public class Parquet {
+ private static final Logger LOG = LoggerFactory.getLogger(Parquet.class);
+
+ private Parquet() {
+ }
+
+ private static final Collection<String> READ_PROPERTIES_TO_REMOVE =
+ Sets.newHashSet(
+ "parquet.read.filter",
+ "parquet.private.read.filter.predicate",
+ "parquet.read.support.class",
+ "parquet.crypto.factory.class");
+
+ public static WriteBuilder write(OutputFile file) {
+ if (file instanceof EncryptedOutputFile) {
+ return write((EncryptedOutputFile) file);
+ }
+
+ return new WriteBuilder(file);
+ }
+
+ public static WriteBuilder write(EncryptedOutputFile file) {
+ if (file instanceof NativeEncryptionOutputFile) {
+ NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file;
+ return write(nativeFile.plainOutputFile())
+ .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
+ .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
+ } else {
+ return write(file.encryptingOutputFile());
+ }
+ }
+
+ public static class WriteBuilder implements InternalData.WriteBuilder {
+ private final OutputFile file;
+ private final Configuration conf;
+ private final Map<String, String> metadata = Maps.newLinkedHashMap();
+ private final Map<String, String> config = Maps.newLinkedHashMap();
+ private Schema schema = null;
+ private VariantShreddingFunction variantShreddingFunc = null;
+ private String name = "table";
+ private WriteSupport<?> writeSupport = null;
+ private BiFunction<Schema, MessageType, ParquetValueWriter<?>> createWriterFunc = null;
+ private MetricsConfig metricsConfig = MetricsConfig.getDefault();
+ private ParquetFileWriter.Mode writeMode = ParquetFileWriter.Mode.CREATE;
+ private WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
+ private Function<Map<String, String>, Context> createContextFunc = Context::dataContext;
+ private ByteBuffer fileEncryptionKey = null;
+ private ByteBuffer fileAADPrefix = null;
+
+ private WriteBuilder(OutputFile file) {
+ this.file = file;
+ if (file instanceof HadoopOutputFile) {
+ this.conf = new Configuration(((HadoopOutputFile) file).getConf());
+ } else {
+ this.conf = new Configuration();
+ }
+ }
+
+ public WriteBuilder forTable(Table table) {
+ schema(table.schema());
+ setAll(table.properties());
+ metricsConfig(MetricsConfig.forTable(table));
+ return this;
+ }
+
+ @Override
+ public WriteBuilder schema(Schema newSchema) {
+ this.schema = newSchema;
+ return this;
+ }
+
+ /**
+ * Set a {@link VariantShreddingFunction} that is called with each variant field's name and
+ * field ID to produce the shredding type as a {@code typed_value} field. This field is added to
+ * the result variant struct alongside the {@code metadata} and {@code value} fields.
+ *
+ * @param func {@link VariantShreddingFunction} that produces a shredded {@code typed_value}
+ * @return this for method chaining
+ */
+ public WriteBuilder variantShreddingFunc(VariantShreddingFunction func) {
+ this.variantShreddingFunc = func;
+ return this;
+ }
+
+ @Override
+ public WriteBuilder named(String newName) {
+ this.name = newName;
+ return this;
+ }
+
+ public WriteBuilder writeSupport(WriteSupport<?> newWriteSupport) {
+ this.writeSupport = newWriteSupport;
+ return this;
+ }
+
+ @Override
+ public WriteBuilder set(String property, String value) {
+ config.put(property, value);
+ return this;
+ }
+
+ public WriteBuilder setAll(Map<String, String> properties) {
+ config.putAll(properties);
+ return this;
+ }
+
+ @Override
+ public WriteBuilder meta(String property, String value) {
+ metadata.put(property, value);
+ return this;
+ }
+
+ public WriteBuilder createWriterFunc(
+ Function<MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
+ if (newCreateWriterFunc != null) {
+ this.createWriterFunc = (icebergSchema, type) -> newCreateWriterFunc.apply(type);
+ }
+ return this;
+ }
+
+ public WriteBuilder createWriterFunc(
+ BiFunction<Schema, MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
+ this.createWriterFunc = newCreateWriterFunc;
+ return this;
+ }
+
+ public WriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+ this.metricsConfig = newMetricsConfig;
+ return this;
+ }
+
+ @Override
+ public WriteBuilder overwrite() {
+ return overwrite(true);
+ }
+
+ public WriteBuilder overwrite(boolean enabled) {
+ this.writeMode = enabled ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE;
+ return this;
+ }
+
+ public WriteBuilder writerVersion(WriterVersion version) {
+ this.writerVersion = version;
+ return this;
+ }
+
+ public WriteBuilder withFileEncryptionKey(ByteBuffer encryptionKey) {
+ this.fileEncryptionKey = encryptionKey;
+ return this;
+ }
+
+ public WriteBuilder withAADPrefix(ByteBuffer aadPrefix) {
+ this.fileAADPrefix = aadPrefix;
+ return this;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> WriteSupport<T> getWriteSupport(MessageType type) {
+ if (writeSupport != null) {
+ return (WriteSupport<T>) writeSupport;
+ } else {
+ return new AvroWriteSupport<>(
+ type,
+ ParquetAvro.parquetAvroSchema(AvroSchemaUtil.convert(schema, name)),
+ ParquetAvro.DEFAULT_MODEL);
+ }
+ }
+
+ /*
+ * Sets the writer version. Default value is PARQUET_1_0 (v1).
+ */
+ @VisibleForTesting
+ WriteBuilder withWriterVersion(WriterVersion version) {
+ this.writerVersion = version;
+ return this;
+ }
+
+ // supposed to always be a private method used strictly by data and delete write builders
+ private WriteBuilder createContextFunc(
+ Function<Map<String, String>, Context> newCreateContextFunc) {
+ this.createContextFunc = newCreateContextFunc;
+ return this;
+ }
+
+ private void setBloomFilterConfig(
+ Context context,
+ Map<String, String> colNameToParquetPathMap,
+ BiConsumer<String, Boolean> withBloomFilterEnabled,
+ BiConsumer<String, Double> withBloomFilterFPP) {
+
+ context
+ .columnBloomFilterEnabled()
+ .forEach(
+ (colPath, isEnabled) -> {
+ String parquetColumnPath = colNameToParquetPathMap.get(colPath);
+ if (parquetColumnPath == null) {
+ LOG.warn("Skipping bloom filter config for missing field: {}", colPath);
+ return;
+ }
+
+ withBloomFilterEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled));
+ String fpp = context.columnBloomFilterFpp().get(colPath);
+ if (fpp != null) {
+ withBloomFilterFPP.accept(parquetColumnPath, Double.parseDouble(fpp));
+ }
+ });
+ }
+
+ private void setColumnStatsConfig(
+ Context context,
+ Map<String, String> colNameToParquetPathMap,
+ BiConsumer<String, Boolean> withColumnStatsEnabled) {
+
+ context
+ .columnStatsEnabled()
+ .forEach(
+ (colPath, isEnabled) -> {
+ String parquetColumnPath = colNameToParquetPathMap.get(colPath);
+ if (parquetColumnPath == null) {
+ LOG.warn("Skipping column statistics config for missing field: {}", colPath);
+ return;
+ }
+ withColumnStatsEnabled.accept(parquetColumnPath, Boolean.valueOf(isEnabled));
+ });
+ }
+
+ @Override
+ public <D> FileAppender<D> build() throws IOException {
+ Preconditions.checkNotNull(schema, "Schema is required");
+ Preconditions.checkNotNull(name, "Table name is required and cannot be null");
+
+ // add the Iceberg schema to keyValueMetadata
+ meta("iceberg.schema", SchemaParser.toJson(schema));
+
+ // Map Iceberg properties to pass down to the Parquet writer
+ Context context = createContextFunc.apply(config);
+
+ int rowGroupSize = context.rowGroupSize();
+ int pageSize = context.pageSize();
+ int pageRowLimit = context.pageRowLimit();
+ int dictionaryPageSize = context.dictionaryPageSize();
+ String compressionLevel = context.compressionLevel();
+ CompressionCodecName codec = context.codec();
+ int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
+ int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
+ int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
+ boolean dictionaryEnabled = context.dictionaryEnabled();
+
+ if (compressionLevel != null) {
+ switch (codec) {
+ case GZIP:
+ config.put("zlib.compress.level", compressionLevel);
+ break;
+ case BROTLI:
+ config.put("compression.brotli.quality", compressionLevel);
+ break;
+ case ZSTD:
+ // keep "io.compression.codec.zstd.level" for backwards
compatibility
+ config.put("io.compression.codec.zstd.level", compressionLevel);
+ config.put("parquet.compression.codec.zstd.level",
compressionLevel);
+ break;
+ default:
+ // compression level is not supported; ignore it
+ }
+ }
+
+ set("parquet.avro.write-old-list-structure", "false");
+ MessageType type = ParquetSchemaUtil.convert(schema, name, variantShreddingFunc);
+
+ FileEncryptionProperties fileEncryptionProperties = null;
+ if (fileEncryptionKey != null) {
+ byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey);
+ byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix);
+
+ fileEncryptionProperties =
+ FileEncryptionProperties.builder(encryptionKeyArray)
+ .withAADPrefix(aadPrefixArray)
+ .withoutAADPrefixStorage()
+ .build();
+ } else {
+ Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key");
+ }
+
+ Map<String, String> colNameToParquetPathMap = type.getColumns().stream()
+ .filter(
+ col -> {
+ ID id = col.getPrimitiveType().getId();
+ return id != null && schema.findColumnName(id.intValue()) != null;
+ })
+ .collect(
+ Collectors.toMap(
+ col -> schema.findColumnName(col.getPrimitiveType().getId().intValue()),
+ col -> String.join(".", col.getPath())));
+
+ if (createWriterFunc != null) {
+ Preconditions.checkArgument(
+ writeSupport == null, "Cannot write with both write support and Parquet value writer");
+
+ for (Map.Entry<String, String> entry : config.entrySet()) {
+ conf.set(entry.getKey(), entry.getValue());
+ }
+
+ ParquetProperties.Builder propsBuilder =
+ ParquetProperties.builder()
+ .withWriterVersion(writerVersion)
+ .withPageSize(pageSize)
+ .withPageRowCountLimit(pageRowLimit)
+ .withDictionaryEncoding(dictionaryEnabled)
+ .withDictionaryPageSize(dictionaryPageSize)
+ .withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount)
+ .withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
+ .withMaxBloomFilterBytes(bloomFilterMaxBytes);
+
+ setBloomFilterConfig(
+ context,
+ colNameToParquetPathMap,
+ propsBuilder::withBloomFilterEnabled,
+ propsBuilder::withBloomFilterFPP);
+
+ setColumnStatsConfig(context, colNameToParquetPathMap, propsBuilder::withStatisticsEnabled);
+
+ ParquetProperties parquetProperties = propsBuilder.build();
+
+ return new org.apache.iceberg.parquet.ParquetWriter<>(
+ conf,
+ file,
+ schema,
+ type,
+ rowGroupSize,
+ metadata,
+ createWriterFunc,
+ codec,
+ parquetProperties,
+ metricsConfig,
+ writeMode,
+ fileEncryptionProperties);
+ } else {
+ ParquetWriteBuilder<D> parquetWriteBuilder =
+ new ParquetWriteBuilder<D>(ParquetIO.file(file))
+ .withWriterVersion(writerVersion)
+ .setType(type)
+ .setConfig(config)
+ .setKeyValueMetadata(metadata)
+ .setWriteSupport(getWriteSupport(type))
+ .withCompressionCodec(codec)
+ .withWriteMode(writeMode)
+ .withRowGroupSize(rowGroupSize)
+ .withPageSize(pageSize)
+ .withPageRowCountLimit(pageRowLimit)
+ .withDictionaryEncoding(dictionaryEnabled)
+ .withDictionaryPageSize(dictionaryPageSize)
+ .withEncryption(fileEncryptionProperties);
+
+ setBloomFilterConfig(
+ context,
+ colNameToParquetPathMap,
+ parquetWriteBuilder::withBloomFilterEnabled,
+ parquetWriteBuilder::withBloomFilterFPP);
+
+ setColumnStatsConfig(
+ context, colNameToParquetPathMap, parquetWriteBuilder::withStatisticsEnabled);
+
+ return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
+ }
+ }
+
+ private static class Context {
+ private final int rowGroupSize;
+ private final int pageSize;
+ private final int pageRowLimit;
+ private final int dictionaryPageSize;
+ private final CompressionCodecName codec;
+ private final String compressionLevel;
+ private final int rowGroupCheckMinRecordCount;
+ private final int rowGroupCheckMaxRecordCount;
+ private final int bloomFilterMaxBytes;
+ private final Map<String, String> columnBloomFilterFpp;
+ private final Map<String, String> columnBloomFilterEnabled;
+ private final Map<String, String> columnStatsEnabled;
+ private final boolean dictionaryEnabled;
+
+ private Context(
+ int rowGroupSize,
+ int pageSize,
+ int pageRowLimit,
+ int dictionaryPageSize,
+ CompressionCodecName codec,
+ String compressionLevel,
+ int rowGroupCheckMinRecordCount,
+ int rowGroupCheckMaxRecordCount,
+ int bloomFilterMaxBytes,
+ Map<String, String> columnBloomFilterFpp,
+ Map<String, String> columnBloomFilterEnabled,
+ Map<String, String> columnStatsEnabled,
+ boolean dictionaryEnabled) {
+ this.rowGroupSize = rowGroupSize;
+ this.pageSize = pageSize;
+ this.pageRowLimit = pageRowLimit;
+ this.dictionaryPageSize = dictionaryPageSize;
+ this.codec = codec;
+ this.compressionLevel = compressionLevel;
+ this.rowGroupCheckMinRecordCount = rowGroupCheckMinRecordCount;
+ this.rowGroupCheckMaxRecordCount = rowGroupCheckMaxRecordCount;
+ this.bloomFilterMaxBytes = bloomFilterMaxBytes;
+ this.columnBloomFilterFpp = columnBloomFilterFpp;
+ this.columnBloomFilterEnabled = columnBloomFilterEnabled;
+ this.columnStatsEnabled = columnStatsEnabled;
+ this.dictionaryEnabled = dictionaryEnabled;
+ }
+
+ static Context dataContext(Map<String, String> config) {
+ int rowGroupSize =
+ PropertyUtil.propertyAsInt(
+ config, PARQUET_ROW_GROUP_SIZE_BYTES, PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT);
+ Preconditions.checkArgument(rowGroupSize > 0, "Row group size must be > 0");
+
+ int pageSize =
+ PropertyUtil.propertyAsInt(
+ config, PARQUET_PAGE_SIZE_BYTES, PARQUET_PAGE_SIZE_BYTES_DEFAULT);
+ Preconditions.checkArgument(pageSize > 0, "Page size must be > 0");
+
+ int pageRowLimit =
+ PropertyUtil.propertyAsInt(
+ config, PARQUET_PAGE_ROW_LIMIT, PARQUET_PAGE_ROW_LIMIT_DEFAULT);
+ Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit must be > 0");
+
+ int dictionaryPageSize =
+ PropertyUtil.propertyAsInt(
+ config, PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT);
+ Preconditions.checkArgument(dictionaryPageSize > 0, "Dictionary page size must be > 0");
+
+ String codecAsString =
+ config.getOrDefault(PARQUET_COMPRESSION, PARQUET_COMPRESSION_DEFAULT);
+ CompressionCodecName codec = toCodec(codecAsString);
+
+ String compressionLevel =
+ config.getOrDefault(PARQUET_COMPRESSION_LEVEL, PARQUET_COMPRESSION_LEVEL_DEFAULT);
+
+ int rowGroupCheckMinRecordCount =
+ PropertyUtil.propertyAsInt(
+ config,
+ PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT,
+ PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT);
+ Preconditions.checkArgument(
+ rowGroupCheckMinRecordCount > 0, "Row group check minimal record count must be > 0");
+
+ int rowGroupCheckMaxRecordCount =
+ PropertyUtil.propertyAsInt(
+ config,
+ PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT,
+ PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT);
+ Preconditions.checkArgument(
+ rowGroupCheckMaxRecordCount > 0, "Row group check maximum record count must be > 0");
+ Preconditions.checkArgument(
+ rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
+ "Row group check maximum record count must be >= minimal record count");
+
+ int bloomFilterMaxBytes =
+ PropertyUtil.propertyAsInt(
+ config, PARQUET_BLOOM_FILTER_MAX_BYTES, PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT);
+ Preconditions.checkArgument(bloomFilterMaxBytes > 0, "bloom Filter Max Bytes must be > 0");
+
+ Map<String, String> columnBloomFilterFpp =
+ PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX);
+
+ Map<String, String> columnBloomFilterEnabled =
+ PropertyUtil.propertiesWithPrefix(config, PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX);
+
+ Map<String, String> columnStatsEnabled =
+ PropertyUtil.propertiesWithPrefix(config, PARQUET_COLUMN_STATS_ENABLED_PREFIX);
+
+ boolean dictionaryEnabled =
+ PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true);
+
+ return new Context(
+ rowGroupSize,
+ pageSize,
+ pageRowLimit,
+ dictionaryPageSize,
+ codec,
+ compressionLevel,
+ rowGroupCheckMinRecordCount,
+ rowGroupCheckMaxRecordCount,
+ bloomFilterMaxBytes,
+ columnBloomFilterFpp,
+ columnBloomFilterEnabled,
+ columnStatsEnabled,
+ dictionaryEnabled);
+ }
+
+ static Context deleteContext(Map<String, String> config) {
+ // default delete config using data config
+ Context dataContext = dataContext(config);
+
+ int rowGroupSize =
+ PropertyUtil.propertyAsInt(
+ config, DELETE_PARQUET_ROW_GROUP_SIZE_BYTES, dataContext.rowGroupSize());
+ Preconditions.checkArgument(rowGroupSize > 0, "Row group size must be > 0");
+
+ int pageSize =
+ PropertyUtil.propertyAsInt(
+ config, DELETE_PARQUET_PAGE_SIZE_BYTES, dataContext.pageSize());
+ Preconditions.checkArgument(pageSize > 0, "Page size must be > 0");
+
+ int pageRowLimit =
+ PropertyUtil.propertyAsInt(
+ config, DELETE_PARQUET_PAGE_ROW_LIMIT, dataContext.pageRowLimit());
+ Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit must be > 0");
+
+ int dictionaryPageSize =
+ PropertyUtil.propertyAsInt(
+ config, DELETE_PARQUET_DICT_SIZE_BYTES, dataContext.dictionaryPageSize());
+ Preconditions.checkArgument(dictionaryPageSize > 0, "Dictionary page size must be > 0");
+
+ String codecAsString = config.get(DELETE_PARQUET_COMPRESSION);
+ CompressionCodecName codec =
+ codecAsString != null ? toCodec(codecAsString) : dataContext.codec();
+
+ String compressionLevel =
+ config.getOrDefault(DELETE_PARQUET_COMPRESSION_LEVEL, dataContext.compressionLevel());
+
+ int rowGroupCheckMinRecordCount =
+ PropertyUtil.propertyAsInt(
+ config,
+ DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT,
+ dataContext.rowGroupCheckMinRecordCount());
+ Preconditions.checkArgument(
+ rowGroupCheckMinRecordCount > 0, "Row group check minimal record count must be > 0");
+
+ int rowGroupCheckMaxRecordCount =
+ PropertyUtil.propertyAsInt(
+ config,
+ DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT,
+ dataContext.rowGroupCheckMaxRecordCount());
+ Preconditions.checkArgument(
+ rowGroupCheckMaxRecordCount > 0, "Row group check maximum record count must be > 0");
+ Preconditions.checkArgument(
+ rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
+ "Row group check maximum record count must be >= minimal record count");
+
+ boolean dictionaryEnabled =
+ PropertyUtil.propertyAsBoolean(config, ParquetOutputFormat.ENABLE_DICTIONARY, true);
+
+ return new Context(
+ rowGroupSize,
+ pageSize,
+ pageRowLimit,
+ dictionaryPageSize,
+ codec,
+ compressionLevel,
+ rowGroupCheckMinRecordCount,
+ rowGroupCheckMaxRecordCount,
+ PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT,
+ ImmutableMap.of(),
+ ImmutableMap.of(),
+ ImmutableMap.of(),
+ dictionaryEnabled);
+ }
+
+ private static CompressionCodecName toCodec(String codecAsString) {
+ try {
+ return CompressionCodecName.valueOf(codecAsString.toUpperCase(Locale.ENGLISH));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Unsupported compression codec: " + codecAsString);
+ }
+ }
+
+ int rowGroupSize() {
+ return rowGroupSize;
+ }
+
+ int pageSize() {
+ return pageSize;
+ }
+
+ int pageRowLimit() {
+ return pageRowLimit;
+ }
+
+ int dictionaryPageSize() {
+ return dictionaryPageSize;
+ }
+
+ CompressionCodecName codec() {
+ return codec;
+ }
+
+ String compressionLevel() {
+ return compressionLevel;
+ }
+
+ int rowGroupCheckMinRecordCount() {
+ return rowGroupCheckMinRecordCount;
+ }
+
+ int rowGroupCheckMaxRecordCount() {
+ return rowGroupCheckMaxRecordCount;
+ }
+
+ int bloomFilterMaxBytes() {
+ return bloomFilterMaxBytes;
+ }
+
+ Map<String, String> columnBloomFilterFpp() {
+ return columnBloomFilterFpp;
+ }
+
+ Map<String, String> columnBloomFilterEnabled() {
+ return columnBloomFilterEnabled;
+ }
+
+ Map<String, String> columnStatsEnabled() {
+ return columnStatsEnabled;
+ }
+
+ boolean dictionaryEnabled() {
+ return dictionaryEnabled;
+ }
+ }
+ }
+
+ public static DataWriteBuilder writeData(OutputFile file) {
+ return new DataWriteBuilder(file);
+ }
+
+ public static DataWriteBuilder writeData(EncryptedOutputFile file) {
+ if (file instanceof NativeEncryptionOutputFile) {
+ NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file;
+ return writeData(nativeFile.plainOutputFile())
+ .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
+ .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
+ } else {
+ return writeData(file.encryptingOutputFile());
+ }
+ }
+
+ public static class DataWriteBuilder {
+ private final WriteBuilder appenderBuilder;
+ private final String location;
+ private PartitionSpec spec = null;
+ private StructLike partition = null;
+ private EncryptionKeyMetadata keyMetadata = null;
+ private SortOrder sortOrder = null;
+
+ private DataWriteBuilder(OutputFile file) {
+ this.appenderBuilder = write(file);
+ this.location = file.location();
+ }
+
+ public DataWriteBuilder forTable(Table table) {
+ schema(table.schema());
+ withSpec(table.spec());
+ setAll(table.properties());
+ metricsConfig(MetricsConfig.forTable(table));
+ return this;
+ }
+
+ public DataWriteBuilder schema(Schema newSchema) {
+ appenderBuilder.schema(newSchema);
+ return this;
+ }
+
+ public DataWriteBuilder set(String property, String value) {
+ appenderBuilder.set(property, value);
+ return this;
+ }
+
+ public DataWriteBuilder setAll(Map<String, String> properties) {
+ appenderBuilder.setAll(properties);
+ return this;
+ }
+
+ public DataWriteBuilder meta(String property, String value) {
+ appenderBuilder.meta(property, value);
+ return this;
+ }
+
+ public DataWriteBuilder overwrite() {
+ return overwrite(true);
+ }
+
+ public DataWriteBuilder overwrite(boolean enabled) {
+ appenderBuilder.overwrite(enabled);
+ return this;
+ }
+
+ public DataWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+ appenderBuilder.metricsConfig(newMetricsConfig);
+ return this;
+ }
+
+ public DataWriteBuilder createWriterFunc(
+ Function<MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
+ appenderBuilder.createWriterFunc(newCreateWriterFunc);
+ return this;
+ }
+
+ public DataWriteBuilder createWriterFunc(
+ BiFunction<Schema, MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
+ appenderBuilder.createWriterFunc(newCreateWriterFunc);
+ return this;
+ }
+
+ public DataWriteBuilder variantShreddingFunc(VariantShreddingFunction func) {
+ appenderBuilder.variantShreddingFunc(func);
+ return this;
+ }
+
+ public DataWriteBuilder withSpec(PartitionSpec newSpec) {
+ this.spec = newSpec;
+ return this;
+ }
+
+ public DataWriteBuilder withPartition(StructLike newPartition) {
+ this.partition = newPartition;
+ return this;
+ }
+
+ public DataWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+ this.keyMetadata = metadata;
+ return this;
+ }
+
+ public DataWriteBuilder withFileEncryptionKey(ByteBuffer fileEncryptionKey) {
+ appenderBuilder.withFileEncryptionKey(fileEncryptionKey);
+ return this;
+ }
+
+ public DataWriteBuilder withAADPrefix(ByteBuffer aadPrefix) {
+ appenderBuilder.withAADPrefix(aadPrefix);
+ return this;
+ }
+
+ public DataWriteBuilder withSortOrder(SortOrder newSortOrder) {
+ this.sortOrder = newSortOrder;
+ return this;
+ }
+
+ public <T> DataWriter<T> build() throws IOException {
+ Preconditions.checkArgument(spec != null, "Cannot create data writer without spec");
+ Preconditions.checkArgument(
+ spec.isUnpartitioned() || partition != null,
+ "Partition must not be null when creating data writer for partitioned spec");
+
+ FileAppender<T> fileAppender = appenderBuilder.build();
+ return new DataWriter<>(
+ fileAppender, FileFormat.PARQUET, location, spec, partition, keyMetadata, sortOrder);
+ }
+ }
+
+ public static DeleteWriteBuilder writeDeletes(OutputFile file) {
+ return new DeleteWriteBuilder(file);
+ }
+
+ public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) {
+ if (file instanceof NativeEncryptionOutputFile) {
+ NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile) file;
+ return writeDeletes(nativeFile.plainOutputFile())
+ .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
+ .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
+ } else {
+ return writeDeletes(file.encryptingOutputFile());
+ }
+ }
+
+ public static class DeleteWriteBuilder {
+ private final WriteBuilder appenderBuilder;
+ private final String location;
+ private BiFunction<Schema, MessageType, ParquetValueWriter<?>> createWriterFunc = null;
+ private Schema rowSchema = null;
+ private PartitionSpec spec = null;
+ private StructLike partition = null;
+ private EncryptionKeyMetadata keyMetadata = null;
+ private int[] equalityFieldIds = null;
+ private SortOrder sortOrder;
+ private Function<CharSequence, ?> pathTransformFunc = Function.identity();
+
+ private DeleteWriteBuilder(OutputFile file) {
+ this.appenderBuilder = write(file);
+ this.location = file.location();
+ }
+
+ public DeleteWriteBuilder forTable(Table table) {
+ rowSchema(table.schema());
+ withSpec(table.spec());
+ setAll(table.properties());
+ metricsConfig(MetricsConfig.forTable(table));
+ return this;
+ }
+
+ public DeleteWriteBuilder set(String property, String value) {
+ appenderBuilder.set(property, value);
+ return this;
+ }
+
+ public DeleteWriteBuilder setAll(Map<String, String> properties) {
+ appenderBuilder.setAll(properties);
+ return this;
+ }
+
+ public DeleteWriteBuilder meta(String property, String value) {
+ appenderBuilder.meta(property, value);
+ return this;
+ }
+
+ public DeleteWriteBuilder overwrite() {
+ return overwrite(true);
+ }
+
+ public DeleteWriteBuilder overwrite(boolean enabled) {
+ appenderBuilder.overwrite(enabled);
+ return this;
+ }
+
+ public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
+ appenderBuilder.metricsConfig(newMetricsConfig);
+ return this;
+ }
+
+ public DeleteWriteBuilder createWriterFunc(
+ Function<MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
+ this.createWriterFunc = (ignored, fileSchema) -> newCreateWriterFunc.apply(fileSchema);
+ return this;
+ }
+
+ public DeleteWriteBuilder createWriterFunc(
+ BiFunction<Schema, MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
+ this.createWriterFunc = newCreateWriterFunc;
+ return this;
+ }
+
+ public DeleteWriteBuilder rowSchema(Schema newSchema) {
+ this.rowSchema = newSchema;
+ return this;
+ }
+
+ public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
+ this.spec = newSpec;
+ return this;
+ }
+
+ public DeleteWriteBuilder withPartition(StructLike key) {
+ this.partition = key;
+ return this;
+ }
+
+ public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
+ this.keyMetadata = metadata;
+ return this;
+ }
+
+ public DeleteWriteBuilder withFileEncryptionKey(ByteBuffer fileEncryptionKey) {
+ appenderBuilder.withFileEncryptionKey(fileEncryptionKey);
+ return this;
+ }
+
+ public DeleteWriteBuilder withAADPrefix(ByteBuffer aadPrefix) {
+ appenderBuilder.withAADPrefix(aadPrefix);
+ return this;
+ }
+
+ public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
+ this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds);
+ return this;
+ }
+
+ public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
+ this.equalityFieldIds = fieldIds;
+ return this;
+ }
+
+ public DeleteWriteBuilder transformPaths(Function<CharSequence, ?> newPathTransformFunc) {
+ this.pathTransformFunc = newPathTransformFunc;
+ return this;
+ }
+
+ public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) {
+ this.sortOrder = newSortOrder;
+ return this;
+ }
+
+ public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
+ Preconditions.checkState(
+ rowSchema != null, "Cannot create equality delete file without a schema");
+ Preconditions.checkState(
+ equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
+ Preconditions.checkState(
+ createWriterFunc != null,
+ "Cannot create equality delete file unless createWriterFunc is set");
+ Preconditions.checkArgument(
+ spec != null, "Spec must not be null when creating equality delete writer");
+ Preconditions.checkArgument(
+ spec.isUnpartitioned() || partition != null,
+ "Partition must not be null for partitioned writes");
+
+ meta("delete-type", "equality");
+ meta(
+ "delete-field-ids",
+ IntStream.of(equalityFieldIds)
+ .mapToObj(Objects::toString)
+ .collect(Collectors.joining(", ")));
+
+ // the appender uses the row schema without extra columns
+ appenderBuilder.schema(rowSchema);
+ appenderBuilder.createWriterFunc(createWriterFunc);
+ appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext);
+
+ return new EqualityDeleteWriter<>(
+ appenderBuilder.build(),
+ FileFormat.PARQUET,
+ location,
+ spec,
+ partition,
+ keyMetadata,
+ sortOrder,
+ equalityFieldIds);
+ }
+
+ public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
+ Preconditions.checkState(
+ equalityFieldIds == null, "Cannot create position delete file using delete field ids");
+ Preconditions.checkArgument(
+ spec != null, "Spec must not be null when creating position delete writer");
+ Preconditions.checkArgument(
+ spec.isUnpartitioned() || partition != null,
+ "Partition must not be null for partitioned writes");
+ Preconditions.checkArgument(
+ rowSchema == null || createWriterFunc != null,
+ "Create function should be provided if we write row data");
+
+ meta("delete-type", "position");
+
+ if (rowSchema != null && createWriterFunc != null) {
+ // the appender uses the row schema wrapped with position fields
+ appenderBuilder.schema(DeleteSchemaUtil.posDeleteSchema(rowSchema));
+
+ appenderBuilder.createWriterFunc(
+ (schema, parquetSchema) -> {
+ ParquetValueWriter<?> writer = createWriterFunc.apply(schema, parquetSchema);
+ if (writer instanceof StructWriter) {
+ return new PositionDeleteStructWriter<T>(
+ (StructWriter<?>) writer, pathTransformFunc);
+ } else {
+ throw new UnsupportedOperationException(
+ "Cannot wrap writer for position deletes: " + writer.getClass());
+ }
+ });
+
+ } else {
+ appenderBuilder.schema(DeleteSchemaUtil.pathPosSchema());
+
+ // We ignore the 'createWriterFunc' and 'rowSchema' even if provided, since we do not
+ // write row data itself
+ appenderBuilder.createWriterFunc(
+ (schema, parquetSchema) ->
+ new PositionDeleteStructWriter<T>(
+ (StructWriter<?>) GenericParquetWriter.create(schema, parquetSchema),
+ Function.identity()));
+ }
+
+ appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext);
+
+ return new PositionDeleteWriter<>(
+ appenderBuilder.build(), FileFormat.PARQUET, location, spec, partition, keyMetadata);
+ }
+ }
+
+ private static class ParquetWriteBuilder<T>
+ extends ParquetWriter.Builder<T, ParquetWriteBuilder<T>> {
+ private Map<String, String> keyValueMetadata = Maps.newHashMap();
+ private Map<String, String> config = Maps.newHashMap();
+ private MessageType type;
+ private WriteSupport<T> writeSupport;
+
+ private ParquetWriteBuilder(org.apache.parquet.io.OutputFile path) {
+ super(path);
+ }
+
+ @Override
+ protected ParquetWriteBuilder<T> self() {
+ return this;
+ }
+
+ public ParquetWriteBuilder<T> setKeyValueMetadata(Map<String, String> keyValueMetadata) {
+ this.keyValueMetadata = keyValueMetadata;
+ return self();
+ }
+
+ public ParquetWriteBuilder<T> setConfig(Map<String, String> config) {
+ this.config = config;
+ return self();
+ }
+
+ public ParquetWriteBuilder<T> setType(MessageType type) {
+ this.type = type;
+ return self();
+ }
+
+ public ParquetWriteBuilder<T> setWriteSupport(WriteSupport<T> writeSupport) {
+ this.writeSupport = writeSupport;
+ return self();
+ }
+
+ @Override
+ protected WriteSupport<T> getWriteSupport(Configuration configuration) {
+ for (Map.Entry<String, String> entry : config.entrySet()) {
+ configuration.set(entry.getKey(), entry.getValue());
+ }
+ return new ParquetWriteSupport<>(type, keyValueMetadata, writeSupport);
+ }
+ }
+
+ public static ReadBuilder read(InputFile file) {
+ if (file instanceof NativeEncryptionInputFile) {
+ NativeEncryptionInputFile nativeFile = (NativeEncryptionInputFile) file;
+ return new ReadBuilder(nativeFile.encryptedInputFile())
+ .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
+ .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
+ } else {
+ return new ReadBuilder(file);
+ }
+ }
+
+ public static class ReadBuilder implements InternalData.ReadBuilder {
+ private final InputFile file;
+ private final Map<String, String> properties = Maps.newHashMap();
+ private Long start = null;
+ private Long length = null;
+ private Schema schema = null;
+ private Expression filter = null;
+ private ReadSupport<?> readSupport = null;
+ private Function<MessageType, VectorizedReader<?>> batchedReaderFunc = null;
+ private Function<MessageType, ParquetValueReader<?>> readerFunc = null;
+ private BiFunction<Schema, MessageType, ParquetValueReader<?>> readerFuncWithSchema = null;
+ private boolean filterRecords = true;
+ private boolean caseSensitive = true;
+ private boolean callInit = false;
+ private boolean reuseContainers = false;
+ private int maxRecordsPerBatch = 10000;
+ private NameMapping nameMapping = null;
+ private ByteBuffer fileEncryptionKey = null;
+ private ByteBuffer fileAADPrefix = null;
+
+ private ReadBuilder(InputFile file) {
+ this.file = file;
+ }
+
+ /**
+ * Restricts the read to the given range: [start, start + length).
+ *
+ * @param newStart the start position for this read
+ * @param newLength the length of the range this read should scan
+ * @return this builder for method chaining
+ */
+ @Override
+ public ReadBuilder split(long newStart, long newLength) {
+ this.start = newStart;
+ this.length = newLength;
+ return this;
+ }
+
+ @Override
+ public ReadBuilder project(Schema newSchema) {
+ this.schema = newSchema;
+ return this;
+ }
+
+ public ReadBuilder caseInsensitive() {
+ return caseSensitive(false);
+ }
+
+ public ReadBuilder caseSensitive(boolean newCaseSensitive) {
+ this.caseSensitive = newCaseSensitive;
+ return this;
+ }
+
+ public ReadBuilder filterRecords(boolean newFilterRecords) {
+ this.filterRecords = newFilterRecords;
+ return this;
+ }
+
+ public ReadBuilder filter(Expression newFilter) {
+ this.filter = newFilter;
+ return this;
+ }
+
+ /**
+ * @deprecated will be removed in 2.0.0; use {@link #createReaderFunc(Function)} instead
+ */
+ @Deprecated
+ public ReadBuilder readSupport(ReadSupport<?> newFilterSupport) {
+ this.readSupport = newFilterSupport;
+ return this;
+ }
+
+ public ReadBuilder createReaderFunc(
+ Function<MessageType, ParquetValueReader<?>> newReaderFunction) {
+ Preconditions.checkArgument(
+ this.batchedReaderFunc == null,
+ "Cannot set reader function: batched reader function already set");
+ Preconditions.checkArgument(
+ this.readerFuncWithSchema == null,
+ "Cannot set reader function: 2-argument reader function already
set");
+ this.readerFunc = newReaderFunction;
+ return this;
+ }
+
+ public ReadBuilder createReaderFunc(
+ BiFunction<Schema, MessageType, ParquetValueReader<?>> newReaderFunction) {
+ Preconditions.checkArgument(
+ this.readerFunc == null,
+ "Cannot set 2-argument reader function: reader function already set");
+ Preconditions.checkArgument(
+ this.batchedReaderFunc == null,
+ "Cannot set 2-argument reader function: batched reader function already set");
+ this.readerFuncWithSchema = newReaderFunction;
+ return this;
+ }
+
+ public ReadBuilder createBatchedReaderFunc(Function<MessageType, VectorizedReader<?>> func) {
+ Preconditions.checkArgument(
+ this.readerFunc == null,
+ "Cannot set batched reader function: reader function already set");
+ Preconditions.checkArgument(
+ this.readerFuncWithSchema == null,
+ "Cannot set batched reader function: 2-argument reader function already set");
+ this.batchedReaderFunc = func;
+ return this;
+ }
+
+ public ReadBuilder set(String key, String value) {
+ properties.put(key, value);
+ return this;
+ }
+
+ /**
+ * @deprecated will be removed in 2.0.0; use {@link #createReaderFunc(Function)} instead
+ */
+ @Deprecated
+ public ReadBuilder callInit() {
+ this.callInit = true;
+ return this;
+ }
+
+ @Override
+ public ReadBuilder reuseContainers() {
+ this.reuseContainers = true;
+ return this;
+ }
+
+ public ReadBuilder recordsPerBatch(int numRowsPerBatch) {
+ this.maxRecordsPerBatch = numRowsPerBatch;
+ return this;
+ }
+
+ public ReadBuilder withNameMapping(NameMapping newNameMapping) {
+ this.nameMapping = newNameMapping;
+ return this;
+ }
+
+ @Override
+ public ReadBuilder setRootType(Class<? extends StructLike> rootClass) {
+ throw new UnsupportedOperationException("Custom types are not yet
supported");
+ }
+
+ @Override
+ public ReadBuilder setCustomType(int fieldId, Class<? extends StructLike>
structClass) {
+ throw new UnsupportedOperationException("Custom types are not yet
supported");
+ }
+
+ public ReadBuilder withFileEncryptionKey(ByteBuffer encryptionKey) {
+ this.fileEncryptionKey = encryptionKey;
+ return this;
+ }
+
+ public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) {
+ this.fileAADPrefix = aadPrefix;
+ return this;
+ }
+
+ @Override
+ @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
+ public <D> CloseableIterable<D> build() {
+ FileDecryptionProperties fileDecryptionProperties = null;
+ if (fileEncryptionKey != null) {
+ byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey);
+ byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix);
+ fileDecryptionProperties =
+ FileDecryptionProperties.builder()
+ .withFooterKey(encryptionKeyArray)
+ .withAADPrefix(aadPrefixArray)
+ .build();
+ } else {
+ Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key");
+ }
+
+ if (readerFunc != null || readerFuncWithSchema != null || batchedReaderFunc != null) {
+ ParquetReadOptions.Builder optionsBuilder;
+ if (file instanceof HadoopInputFile) {
+ // remove read properties already set that may conflict with this read
+ Configuration conf = new Configuration(((HadoopInputFile) file).getConf());
+ for (String property : READ_PROPERTIES_TO_REMOVE) {
+ conf.unset(property);
+ }
+ optionsBuilder = HadoopReadOptions.builder(conf);
+ } else {
+ optionsBuilder = ParquetReadOptions.builder(new PlainParquetConfiguration());
+ }
+
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ optionsBuilder.set(entry.getKey(), entry.getValue());
+ }
+
+ if (start != null) {
+ optionsBuilder.withRange(start, start + length);
+ }
+
+ if (fileDecryptionProperties != null) {
+ optionsBuilder.withDecryption(fileDecryptionProperties);
+ }
+
+ ParquetReadOptions options = optionsBuilder.build();
+
+ NameMapping mapping;
+ if (nameMapping != null) {
+ mapping = nameMapping;
+ } else if (SystemConfigs.NETFLIX_UNSAFE_PARQUET_ID_FALLBACK_ENABLED.value()) {
+ mapping = null;
+ } else {
+ mapping = NameMapping.empty();
+ }
+
+ if (batchedReaderFunc != null) {
+ return new VectorizedParquetReader<>(
+ file,
+ schema,
+ options,
+ batchedReaderFunc,
+ mapping,
+ filter,
+ reuseContainers,
+ caseSensitive,
+ maxRecordsPerBatch);
+ } else {
+ Function<MessageType, ParquetValueReader<?>> readBuilder =
+ readerFuncWithSchema != null ?
+ fileType -> readerFuncWithSchema.apply(schema, fileType) :
+ readerFunc;
+ return new org.apache.iceberg.parquet.ParquetReader<>(
+ file, schema, options, readBuilder, mapping, filter, reuseContainers, caseSensitive);
+ }
+ }
+
+ ParquetReadBuilder<D> builder = new ParquetReadBuilder<>(ParquetIO.file(file));
+
+ builder.project(schema);
+
+ if (readSupport != null) {
+ builder.readSupport((ReadSupport<D>) readSupport);
+ } else {
+ builder.readSupport(new AvroReadSupport<>(ParquetAvro.DEFAULT_MODEL));
+ }
+
+ // default options for readers
+ builder
+ .set("parquet.strict.typing", "false") // allow type promotion
+ .set("parquet.avro.compatible", "false") // use the new RecordReader
with Utf8 support
+ .set(
+ "parquet.avro.add-list-element-records",
+ "false"); // assume that lists use a 3-level schema
+
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ builder.set(entry.getKey(), entry.getValue());
+ }
+
+ if (filter != null) {
+ // TODO: should not need to get the schema to push down before opening the file.
+ // Parquet should allow setting a filter inside its read support
+ ParquetReadOptions decryptOptions =
+ ParquetReadOptions.builder(new PlainParquetConfiguration())
+ .withDecryption(fileDecryptionProperties)
+ .build();
+ MessageType type;
+ try (ParquetFileReader schemaReader =
+ ParquetFileReader.open(ParquetIO.file(file), decryptOptions)) {
+ type = schemaReader.getFileMetaData().getSchema();
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ Schema fileSchema = ParquetSchemaUtil.convert(type);
+ builder
+ .useStatsFilter()
+ .useDictionaryFilter()
+ .useRecordFilter(filterRecords)
+ .useBloomFilter()
+ .withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));
+ } else {
+ // turn off filtering
+ builder
+ .useStatsFilter(false)
+ .useDictionaryFilter(false)
+ .useBloomFilter(false)
+ .useRecordFilter(false);
+ }
+
+ if (callInit) {
+ builder.callInit();
+ }
+
+ if (start != null) {
+ builder.withFileRange(start, start + length);
+ }
+
+ if (nameMapping != null) {
+ builder.withNameMapping(nameMapping);
+ }
+
+ if (fileDecryptionProperties != null) {
+ builder.withDecryption(fileDecryptionProperties);
+ }
+
+ return new ParquetIterable<>(builder);
+ }
+ }
+
+ private static class ParquetReadBuilder<T> extends ParquetReader.Builder<T> {
+ private Schema schema = null;
+ private ReadSupport<T> readSupport = null;
+ private boolean callInit = false;
+ private NameMapping nameMapping = null;
+
+ private ParquetReadBuilder(org.apache.parquet.io.InputFile file) {
+ super(file);
+ }
+
+ public ParquetReadBuilder<T> project(Schema newSchema) {
+ this.schema = newSchema;
+ return this;
+ }
+
+ public ParquetReadBuilder<T> withNameMapping(NameMapping newNameMapping) {
+ this.nameMapping = newNameMapping;
+ return this;
+ }
+
+ public ParquetReadBuilder<T> readSupport(ReadSupport<T> newReadSupport) {
+ this.readSupport = newReadSupport;
+ return this;
+ }
+
+ public ParquetReadBuilder<T> callInit() {
+ this.callInit = true;
+ return this;
+ }
+
+ @Override
+ protected ReadSupport<T> getReadSupport() {
+ return new ParquetReadSupport<>(schema, readSupport, callInit, nameMapping);
+ }
+ }
+
+ /**
+ * Combines several files into one
+ *
+ * @param inputFiles an {@link Iterable} of parquet files. The order of iteration determines the
+ * order in which content of files are read and written to the {@code outputFile}
+ * @param outputFile the output parquet file containing all the data from {@code inputFiles}
+ * @param rowGroupSize the row group size to use when writing the {@code outputFile}
+ * @param schema the schema of the data
+ * @param metadata extraMetadata to write at the footer of the {@code outputFile}
+ */
+ public static void concat(
+ Iterable<File> inputFiles,
+ File outputFile,
+ int rowGroupSize,
+ Schema schema,
+ Map<String, String> metadata)
+ throws IOException {
+ OutputFile file = Files.localOutput(outputFile);
+ try (ParquetFileWriter writer =
+ new ParquetFileWriter(
+ ParquetIO.file(file),
+ ParquetSchemaUtil.convert(schema, "table"),
+ ParquetFileWriter.Mode.CREATE,
+ rowGroupSize,
+ 0)) {
+ writer.start();
+ for (File inputFile : inputFiles) {
+ writer.appendFile(ParquetIO.file(Files.localInput(inputFile)));
+ }
+ writer.end(metadata);
+ }
+ }
+}
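
For reference, a minimal caller-side sketch of the new 2-argument reader function introduced above. It assumes the usual Parquet.read(InputFile) entry point and the stock GenericParquetReaders from iceberg-data, neither of which is defined by this patch:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.data.parquet.GenericParquetReaders;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.io.InputFile;
    import org.apache.iceberg.parquet.Parquet;

    class TwoArgReaderSketch {
      // `inputFile` and `projectedSchema` are assumed to come from the surrounding scan code.
      static CloseableIterable<Record> open(InputFile inputFile, Schema projectedSchema) {
        return Parquet.read(inputFile)
            .project(projectedSchema)
            // the new overload also hands the projected Iceberg schema to the reader function
            .createReaderFunc(
                (expectedSchema, fileType) ->
                    GenericParquetReaders.buildReader(expectedSchema, fileType))
            .build();
      }
    }
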
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java
new file mode 100644
index 00000000000..7d0f0feec8c
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.variants.Variant;
+
+public class VariantUtil {
+
+ private VariantUtil() {
+ }
+
+ /**
+ * Create a VariantShreddingFunction if variant shredding is enabled and the schema has variant columns.
+ *
+ * @param schema The Iceberg schema
+ * @param sampleRecord Supplier of a sample record used to infer variant schemas from actual data (the supplied record may be null)
+ * @param properties Table properties to check if variant shredding is enabled
+ * @return An Optional containing the VariantShreddingFunction if applicable
+ */
+ public static Optional<VariantShreddingFunction> variantShreddingFunc(
+ Schema schema,
+ Supplier<Record> sampleRecord,
+ Map<String, String> properties) {
+
+ // Preconditions: must have variant columns + property enabled
+ if (!hasVariantColumns(schema) || !isVariantShreddingEnabled(properties)) {
+ return Optional.empty();
+ }
+
+ VariantShreddingFunction fn =
+ constructVariantShreddingFunc(sampleRecord.get(), schema);
+
+ return Optional.of(fn);
+ }
+
+ private static VariantShreddingFunction constructVariantShreddingFunc(
+ Record sampleRecord, Schema schema) {
+
+ return (id, name) -> {
+ // Validate the field exists and is a variant type
+ Types.NestedField field = schema.findField(id);
+
+ if (field == null || !(field.type() instanceof Types.VariantType)) {
+ return null; // Not a variant field, no shredding
+ }
+
+ // If we have a sample record, try to generate schema from actual data
+ if (sampleRecord != null) {
+ try {
+ Object variantValue = sampleRecord.getField(name);
+ if (variantValue instanceof Variant variant) {
+ // Use ParquetVariantUtil to generate schema from actual variant value
+ return ParquetVariantUtil.toParquetSchema(variant.value());
+ }
+ } catch (Exception e) {
+ // Ignore and fall through: no shredding schema is produced for this field
+ }
+ }
+ return null;
+ };
+ }
+
+ /**
+ * Check if the schema contains any variant columns.
+ */
+ private static boolean hasVariantColumns(Schema schema) {
+ return schema.columns().stream()
+ .anyMatch(field -> field.type() instanceof Types.VariantType);
+ }
+
+ /**
+ * Check if variant shredding is enabled via table properties.
+ */
+ private static boolean isVariantShreddingEnabled(Map<String, String> properties) {
+ String shreddingEnabled = properties.get(InputFormatConfig.VARIANT_SHREDDING_ENABLED);
+ return Boolean.parseBoolean(shreddingEnabled);
+ }
+
+}
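
A hedged usage sketch of the helper above: `table` and `sampleRecord` are assumed to be supplied by the writer setup (the actual wiring lives in HiveFileWriterFactory and is not reproduced here), and a function is only returned when the schema has VARIANT columns and 'variant.shredding.enabled'='true':

    import java.util.Optional;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.parquet.VariantShreddingFunction;
    import org.apache.iceberg.parquet.VariantUtil;

    class VariantShreddingSketch {
      // `table` and `sampleRecord` are assumed inputs; sampleRecord may be null.
      static Optional<VariantShreddingFunction> resolve(Table table, Record sampleRecord) {
        return VariantUtil.variantShreddingFunc(table.schema(), () -> sampleRecord, table.properties());
      }
    }

Whether the written Parquet files actually contain a shredded typed_value group is verified by the new q-test and the TestHiveIcebergSelects case below.
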
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
index 8a1cf740067..990d6e782fd 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
@@ -22,6 +22,9 @@
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -32,11 +35,16 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assume.assumeTrue;
/**
@@ -187,6 +195,68 @@ public void testSpecialCharacters() {
Assert.assertArrayEquals(new Object[]{"star", 2L}, result.get(1));
}
+ @Test
+ public void testVariantSelectProjection() throws IOException {
+ assumeTrue(fileFormat == FileFormat.PARQUET);
+ assumeTrue(!isVectorized);
+
+ TableIdentifier table = TableIdentifier.of("default",
"variant_projection");
+ shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table));
+
+ shell.executeStatement(
+ String.format(
+ "CREATE TABLE %s (id INT, payload VARIANT) STORED BY ICEBERG
STORED AS %s %s %s",
+ table,
+ fileFormat,
+ testTables.locationForCreateTableSQL(table),
+ testTables.propertiesForCreateTableSQL(
+ ImmutableMap.of("format-version", "3",
"variant.shredding.enabled", "true"))));
+
+ shell.executeStatement(
+ String.format(
+ "INSERT INTO %s VALUES " +
+ "(1, parse_json('{\"name\":\"Alice\",\"age\":30}'))," +
+ "(2, parse_json('{\"name\":\"Bob\"}'))",
+ table));
+
+ List<Object[]> rows =
+ shell.executeStatement(
+ String.format(
+ "SELECT id, " +
+ "variant_get(payload, '$.name') AS name, " +
+ "try_variant_get(payload, '$.age', 'int') AS age " +
+ "FROM %s ORDER BY id",
+ table));
+
+ Assert.assertEquals(2, rows.size());
+ Assert.assertEquals(1, ((Number) rows.get(0)[0]).intValue());
+ Assert.assertEquals("Alice", rows.get(0)[1]);
+ Assert.assertEquals(30, ((Number) rows.get(0)[2]).intValue());
+
+ Assert.assertEquals(2, ((Number) rows.get(1)[0]).intValue());
+ Assert.assertEquals("Bob", rows.get(1)[1]);
+ Assert.assertNull(rows.get(1)[2]);
+
+ Table icebergTable = testTables.loadTable(table);
+ Types.NestedField variantField = icebergTable.schema().findField("payload");
+ Assert.assertNotNull("Variant column should exist", variantField);
+ DataFile dataFile =
+ StreamSupport.stream(
+ icebergTable.currentSnapshot().addedDataFiles(icebergTable.io()).spliterator(), false)
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("No data files written for test table"));
+
+ Path parquetPath = new Path(dataFile.path().toString());
+ try (ParquetFileReader reader =
+ ParquetFileReader.open(HadoopInputFile.fromPath(parquetPath, shell.getHiveConf()))) {
+ MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema();
+ GroupType variantType = parquetSchema.getType(variantField.name()).asGroupType();
+ assertThat(variantType.containsField("typed_value")).isTrue();
+ }
+
+ shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table));
+ }
+
@Test
public void testScanTableCaseInsensitive() throws IOException {
testTables.createTable(shell, "customers",
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/variant_type_shredding.q b/iceberg/iceberg-handler/src/test/queries/positive/variant_type_shredding.q
new file mode 100644
index 00000000000..25d84dd0c0a
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/variant_type_shredding.q
@@ -0,0 +1,39 @@
+-- Mask random uuid
+--! qt:replace:/(\s+'uuid'=')\S+('\s*)/$1#Masked#$2/
+-- Mask random snapshot id
+--! qt:replace:/('current-snapshot-id'=')\d+/$1#SnapshotId#/
+-- Mask current-snapshot-timestamp-ms
+--! qt:replace:/('current-snapshot-timestamp-ms'=')\d+/$1#Masked#/
+
+-- SORT_QUERY_RESULTS
+set hive.explain.user=false;
+set hive.fetch.task.conversion=none;
+
+drop table if exists tbl_shredded_variant;
+
+-- Create test table
+CREATE EXTERNAL TABLE tbl_shredded_variant (
+ id INT,
+ data VARIANT
+) STORED BY ICEBERG
+tblproperties(
+ 'format-version'='3',
+ 'variant.shredding.enabled'='true'
+);
+
+-- Insert JSON structures
+INSERT INTO tbl_shredded_variant VALUES
+(1, parse_json('{"name": "John", "age": 30, "active": true}')),
+(2, parse_json('{"name": "Bill", "active": false}')),
+(3, parse_json('{"name": "Henry", "age": 20}'));
+
+-- Disable vectorized execution until Variant type is supported
+set hive.vectorized.execution.enabled=false;
+
+-- Retrieve and verify
+SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant
+WHERE variant_get(data, '$.age') > 25;
+
+EXPLAIN
+SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant
+WHERE variant_get(data, '$.age') > 25;
diff --git a/iceberg/iceberg-handler/src/test/results/positive/variant_type_shredding.q.out b/iceberg/iceberg-handler/src/test/results/positive/variant_type_shredding.q.out
new file mode 100644
index 00000000000..b51bc749525
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/variant_type_shredding.q.out
@@ -0,0 +1,101 @@
+PREHOOK: query: drop table if exists tbl_shredded_variant
+PREHOOK: type: DROPTABLE
+PREHOOK: Output: database:default
+POSTHOOK: query: drop table if exists tbl_shredded_variant
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Output: database:default
+PREHOOK: query: CREATE EXTERNAL TABLE tbl_shredded_variant (
+ id INT,
+ data VARIANT
+) STORED BY ICEBERG
+tblproperties(
+ 'format-version'='3',
+ 'variant.shredding.enabled'='true'
+)
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_shredded_variant
+POSTHOOK: query: CREATE EXTERNAL TABLE tbl_shredded_variant (
+ id INT,
+ data VARIANT
+) STORED BY ICEBERG
+tblproperties(
+ 'format-version'='3',
+ 'variant.shredding.enabled'='true'
+)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_shredded_variant
+PREHOOK: query: INSERT INTO tbl_shredded_variant VALUES
+(1, parse_json('{"name": "John", "age": 30, "active": true}')),
+(2, parse_json('{"name": "Bill", "active": false}')),
+(3, parse_json('{"name": "Henry", "age": 20}'))
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_shredded_variant
+POSTHOOK: query: INSERT INTO tbl_shredded_variant VALUES
+(1, parse_json('{"name": "John", "age": 30, "active": true}')),
+(2, parse_json('{"name": "Bill", "active": false}')),
+(3, parse_json('{"name": "Henry", "age": 20}'))
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_shredded_variant
+PREHOOK: query: SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant
+WHERE variant_get(data, '$.age') > 25
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_shredded_variant
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant
+WHERE variant_get(data, '$.age') > 25
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_shredded_variant
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1 John
+PREHOOK: query: EXPLAIN
+SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant
+WHERE variant_get(data, '$.age') > 25
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_shredded_variant
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: EXPLAIN
+SELECT id, try_variant_get(data, '$.name') FROM tbl_shredded_variant
+WHERE variant_get(data, '$.age') > 25
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_shredded_variant
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
+ alias: tbl_shredded_variant
+ filterExpr: (UDFToDouble(variant_get(data, '$.age')) > 25.0D) (type: boolean)
+ Statistics: Num rows: 3 Data size: 1020 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (UDFToDouble(variant_get(data, '$.age')) > 25.0D) (type: boolean)
+ Statistics: Num rows: 1 Data size: 340 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: id (type: int), try_variant_get(data, '$.name') (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 1 Data size: 340 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 1 Data size: 340 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
diff --git a/iceberg/iceberg-shading/pom.xml b/iceberg/iceberg-shading/pom.xml
index 17d50dd91d6..ac4bf4a9b10 100644
--- a/iceberg/iceberg-shading/pom.xml
+++ b/iceberg/iceberg-shading/pom.xml
@@ -117,6 +117,7 @@
</excludes>
</artifactSet>
<filters>
+ <!-- Global exclusions -->
<filter>
<artifact>*:*</artifact>
<excludes>
@@ -127,6 +128,13 @@
<exclude>static/</exclude>
</excludes>
</filter>
+ <!-- Exclude org.apache.iceberg.parquet.Parquet.class from iceberg-parquet -->
+ <filter>
+ <artifact>org.apache.iceberg:iceberg-parquet</artifact>
+ <excludes>
+ <exclude>org/apache/iceberg/parquet/Parquet.class</exclude>
+ </excludes>
+ </filter>
</filters>
</configuration>
</execution>
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/variant/VariantBuilder.java b/serde/src/java/org/apache/hadoop/hive/serde2/variant/VariantBuilder.java
index 88e9240140d..3c92879ba0c 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/variant/VariantBuilder.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/variant/VariantBuilder.java
@@ -301,7 +301,7 @@ public int getWritePos() {
// - when `allowDuplicateKeys` is true, the field with the greatest offset value (the last
// appended one) is kept.
// - otherwise, throw an exception.
- public void finishWritingObject(int start, ArrayList<FieldEntry> fields) {
+ public void finishWritingObject(int start, List<FieldEntry> fields) {
int size = fields.size();
Collections.sort(fields);
int maxId = size == 0 ? 0 : fields.getFirst().id;