This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new ee7138bf7a1 HIVE-29287: Addendum: Iceberg: [V3] Variant Shredding support (#6234)
ee7138bf7a1 is described below
commit ee7138bf7a1a1ee1de07fa2243fe947a627268cb
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Wed Dec 17 19:37:11 2025 +0200
HIVE-29287: Addendum: Iceberg: [V3] Variant Shredding support (#6234)
---
.../org/apache/iceberg/hive/HiveSchemaUtil.java | 38 +
.../mr/hive/writer/HiveFileWriterFactory.java | 37 +-
.../writer/HiveIcebergCopyOnWriteRecordWriter.java | 24 +-
.../mr/hive/writer/HiveIcebergRecordWriter.java | 24 +-
.../mr/hive/writer/HiveIcebergWriterBase.java | 2 +-
.../hive/writer/SchemaInferringDefaultsWriter.java | 161 +++
.../java/org/apache/iceberg/parquet/Parquet.java | 1524 --------------------
.../org/apache/iceberg/parquet/VariantUtil.java | 164 ++-
.../iceberg/mr/hive/TestHiveIcebergSelects.java | 70 -
.../iceberg/mr/hive/TestHiveIcebergVariant.java | 193 +++
iceberg/iceberg-shading/pom.xml | 8 -
11 files changed, 556 insertions(+), 1689 deletions(-)
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
index f651406df74..8277546b525 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
@@ -431,6 +431,44 @@ public static void setDefaultValues(Record record, List<Types.NestedField> field
}
}
+ /**
+ * Sets a value into a {@link Record} using a struct-only field path (top-level column or nested
+ * through structs). Intermediate struct records are created as needed.
+ *
+ * <p>If the path traverses a non-struct type (e.g. list/map), the operation is ignored.
+ */
+ public static void setStructField(Record root, String[] path, Object value) {
+ if (root == null || path == null || path.length == 0) {
+ return;
+ }
+ Record current = root;
+ Types.StructType currentStruct = root.struct();
+
+ for (int i = 0; i < path.length - 1; i++) {
+ String fieldName = path[i];
+ Types.NestedField field = currentStruct.field(fieldName);
+ if (field == null || !field.type().isStructType()) {
+ return;
+ }
+ Types.StructType nestedStruct = field.type().asStructType();
+ current = getOrCreateStructRecord(current, fieldName, nestedStruct);
+ currentStruct = nestedStruct;
+ }
+
+ current.setField(path[path.length - 1], value);
+ }
+
+ private static Record getOrCreateStructRecord(
+ Record parent, String fieldName, Types.StructType structType) {
+ Object value = parent.getField(fieldName);
+ if (value instanceof Record) {
+ return (Record) value;
+ }
+ Record record = GenericRecord.create(structType);
+ parent.setField(fieldName, record);
+ return record;
+ }
+
// Special method for nested structs that always applies defaults to null fields
private static void setDefaultValuesForNestedStruct(Record record, List<Types.NestedField> fields) {
for (Types.NestedField field : fields) {
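For context, a minimal usage sketch of the new setStructField helper (illustrative only, not part of the commit; the schema and field names below are made up, and only Iceberg data API classes already referenced in this diff are used):

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.types.Types;

public class SetStructFieldSketch {
  public static void main(String[] args) {
    // Hypothetical schema with a nested struct column.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "address", Types.StructType.of(
            Types.NestedField.optional(3, "city", Types.StringType.get()))));

    Record root = GenericRecord.create(schema);
    // The intermediate "address" struct record is created on demand, then "city" is set.
    HiveSchemaUtil.setStructField(root, new String[] {"address", "city"}, "London");
    System.out.println(root);
  }
}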
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
index 6489c78f9ed..234cf928432 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveFileWriterFactory.java
@@ -20,7 +20,6 @@
package org.apache.iceberg.mr.hive.writer;
import java.util.Map;
-import java.util.function.Supplier;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
@@ -33,12 +32,13 @@
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.VariantShreddingFunction;
import org.apache.iceberg.parquet.VariantUtil;
class HiveFileWriterFactory extends BaseFileWriterFactory<Record> {
private final Map<String, String> properties;
- private Supplier<Record> sampleRecord = null;
+ private Record sampleRecord = null;
HiveFileWriterFactory(
Table table,
@@ -85,9 +85,34 @@ protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {
@Override
protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
builder.createWriterFunc(GenericParquetWriter::create);
- // Configure variant shredding function if conditions are met:
- VariantUtil.variantShreddingFunc(dataSchema(), sampleRecord, properties)
- .ifPresent(builder::variantShreddingFunc);
+ // Configure variant shredding if enabled and a sample record is available
+ if (VariantUtil.shouldUseVariantShredding(properties, dataSchema())) {
+ setVariantShreddingFunc(builder, VariantUtil.variantShreddingFunc(sampleRecord, dataSchema()));
+ }
+ }
+
+ /**
+ * Sets a {@link VariantShreddingFunction} on the underlying Parquet write builder.
+ *
+ * <p>{@link Parquet.DataWriteBuilder} does not expose {@code variantShreddingFunc} directly; it is set on an
+ * internal write builder held in the private {@code appenderBuilder} field. This method uses reflection to
+ * access that internal builder and invoke {@code variantShreddingFunc(VariantShreddingFunction)}.
+ *
+ * TODO: Replace with {@code DataWriteBuilder.variantShreddingFunc(VariantShreddingFunction)}
+ * once it becomes publicly available.
+ */
+ private static void setVariantShreddingFunc(Parquet.DataWriteBuilder dataWriteBuilder,
+ VariantShreddingFunction fn) {
+ try {
+ java.lang.reflect.Field field = dataWriteBuilder.getClass().getDeclaredField("appenderBuilder");
+ field.setAccessible(true);
+ Object writeBuilder = field.get(dataWriteBuilder);
+ writeBuilder.getClass()
+ .getMethod("variantShreddingFunc", VariantShreddingFunction.class)
+ .invoke(writeBuilder, fn);
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
@@ -164,7 +189,7 @@ HiveFileWriterFactory build() {
* Set a sample record to use for data-driven variant shredding schema generation.
* Should be called before the Parquet writer is created.
*/
- public void initialize(Supplier<Record> record) {
+ public void initialize(Record record) {
if (sampleRecord == null) {
sampleRecord = record;
}
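As a side note, the reflection workaround above boils down to the following self-contained pattern (a rough sketch using made-up toy classes, not Hive or Iceberg code): read a private field of the builder, then invoke a public method by name on the object it holds.

import java.lang.reflect.Field;

public class ReflectionSketch {
  public static class Inner {
    public void configure(String value) {
      System.out.println("configured with " + value);
    }
  }

  public static class Outer {
    // Stands in for the private appenderBuilder field of Parquet.DataWriteBuilder.
    private final Inner inner = new Inner();
  }

  public static void main(String[] args) throws ReflectiveOperationException {
    Outer outer = new Outer();
    Field field = Outer.class.getDeclaredField("inner");
    field.setAccessible(true);
    Object target = field.get(outer);
    // Invoke by name, mirroring variantShreddingFunc(VariantShreddingFunction) in the patch.
    target.getClass().getMethod("configure", String.class).invoke(target, "shredding");
  }
}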
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
index e8d5ed2a8e4..f48bf21345e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergCopyOnWriteRecordWriter.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.util.List;
-import java.util.Set;
import org.apache.hadoop.io.Writable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
@@ -29,7 +28,6 @@
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.PositionDelete;
-import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.io.DataWriteResult;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mr.hive.FilesForCommit;
@@ -37,32 +35,18 @@
import org.apache.iceberg.mr.hive.writer.WriterBuilder.Context;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.types.Types;
-class HiveIcebergCopyOnWriteRecordWriter extends HiveIcebergWriterBase {
-
- private final int currentSpecId;
- private final Set<String> missingColumns;
- private final List<Types.NestedField> missingOrStructFields;
+class HiveIcebergCopyOnWriteRecordWriter extends SchemaInferringDefaultsWriter {
private final GenericRecord rowDataTemplate;
private final List<DataFile> replacedDataFiles;
- private final HiveFileWriterFactory fileWriterFactory;
-
HiveIcebergCopyOnWriteRecordWriter(Table table, HiveFileWriterFactory writerFactory,
OutputFileFactory deleteFileFactory, Context context) {
- super(table, newDataWriter(table, writerFactory, deleteFileFactory, context));
+ super(table, writerFactory, deleteFileFactory, context);
- this.currentSpecId = table.spec().specId();
this.rowDataTemplate = GenericRecord.create(table.schema());
this.replacedDataFiles = Lists.newArrayList();
-
- this.missingColumns = context.missingColumns();
- this.missingOrStructFields = specs.get(currentSpecId).schema().asStruct().fields().stream()
- .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType())
- .toList();
- this.fileWriterFactory = writerFactory;
}
@Override
@@ -82,9 +66,7 @@ public void write(Writable row) throws IOException {
.build();
replacedDataFiles.add(dataFile);
} else {
- HiveSchemaUtil.setDefaultValues(rowData, missingOrStructFields, missingColumns);
- fileWriterFactory.initialize(() -> rowData);
- writer.write(rowData, specs.get(currentSpecId), partition(rowData, currentSpecId));
+ writeOrBuffer(rowData);
}
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
index ca9d232e3d5..cb9a21e9780 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergRecordWriter.java
@@ -21,45 +21,27 @@
import java.io.IOException;
import java.util.List;
-import java.util.Set;
import org.apache.hadoop.io.Writable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
-import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.io.DataWriteResult;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.mr.hive.FilesForCommit;
import org.apache.iceberg.mr.hive.writer.WriterBuilder.Context;
import org.apache.iceberg.mr.mapred.Container;
-import org.apache.iceberg.types.Types;
-class HiveIcebergRecordWriter extends HiveIcebergWriterBase {
-
- private final int currentSpecId;
- private final Set<String> missingColumns;
- private final List<Types.NestedField> missingOrStructFields;
-
- private final HiveFileWriterFactory fileWriterFactory;
+class HiveIcebergRecordWriter extends SchemaInferringDefaultsWriter {
HiveIcebergRecordWriter(Table table, HiveFileWriterFactory fileWriterFactory,
OutputFileFactory dataFileFactory, Context context) {
- super(table, newDataWriter(table, fileWriterFactory, dataFileFactory, context));
-
- this.currentSpecId = table.spec().specId();
- this.missingColumns = context.missingColumns();
- this.missingOrStructFields = specs.get(currentSpecId).schema().asStruct().fields().stream()
- .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType())
- .toList();
- this.fileWriterFactory = fileWriterFactory;
+ super(table, fileWriterFactory, dataFileFactory, context);
}
@Override
public void write(Writable row) throws IOException {
Record record = ((Container<Record>) row).get();
- HiveSchemaUtil.setDefaultValues(record, missingOrStructFields, missingColumns);
- fileWriterFactory.initialize(() -> record);
- writer.write(record, specs.get(currentSpecId), partition(record, currentSpecId));
+ writeOrBuffer(record);
}
@Override
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java
index f797c37c130..ad026a83040 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergWriterBase.java
@@ -80,7 +80,7 @@ public void close(boolean abort) throws IOException {
.retry(3)
.suppressFailureWhenFinished()
.onFailure((file, exception) -> LOG.debug("Failed on to remove file {} on abort", file, exception))
- .run(file -> io.deleteFile(file.path().toString()));
+ .run(file -> io.deleteFile(file.location()));
LOG.warn("HiveIcebergWriter is closed with abort");
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/SchemaInferringDefaultsWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/SchemaInferringDefaultsWriter.java
new file mode 100644
index 00000000000..487085b6631
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/SchemaInferringDefaultsWriter.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive.writer;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.hive.writer.WriterBuilder.Context;
+import org.apache.iceberg.parquet.VariantUtil;
+import org.apache.iceberg.parquet.VariantUtil.VariantField;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types.NestedField;
+
+abstract class SchemaInferringDefaultsWriter extends HiveIcebergWriterBase {
+
+ private static final int VARIANT_SAMPLE_BUFFER_SIZE = 100;
+
+ private final HiveFileWriterFactory fileWriterFactory;
+
+ private final int currentSpecId;
+ private final Set<String> missingColumns;
+ private final List<NestedField> missingOrStructFields;
+
+ private final List<VariantField> variantFields;
+ private final BitSet sampledVariantFields;
+
+ private final List<Record> buffer;
+ private final Record accumulatedSample;
+ private boolean sampleInitialized = false;
+
+ SchemaInferringDefaultsWriter(
+ Table table,
+ HiveFileWriterFactory fileWriterFactory,
+ OutputFileFactory dataFileFactory,
+ Context context) {
+
+ super(table, newDataWriter(table, fileWriterFactory, dataFileFactory, context));
+ Schema schema = table.schema();
+ this.fileWriterFactory = fileWriterFactory;
+
+ this.currentSpecId = table.spec().specId();
+ this.missingColumns = context.missingColumns();
+ this.missingOrStructFields = schema.columns().stream()
+ .filter(field -> missingColumns.contains(field.name()) || field.type().isStructType())
+ .toList();
+
+ this.variantFields = VariantUtil.variantFieldsForShredding(table.properties(), schema);
+ this.sampledVariantFields = new BitSet(variantFields.size());
+
+ boolean shouldBuffer = !variantFields.isEmpty();
+ this.buffer = shouldBuffer ? Lists.newArrayListWithCapacity(VARIANT_SAMPLE_BUFFER_SIZE) : null;
+ this.accumulatedSample = shouldBuffer ? GenericRecord.create(schema) : null;
+ }
+
+ protected void writeOrBuffer(Record record) {
+ HiveSchemaUtil.setDefaultValues(record, missingOrStructFields, missingColumns);
+
+ if (buffer != null && !sampleInitialized) {
+ accumulateSample(record);
+
+ if (allVariantFieldsSampled() || buffer.size() >= VARIANT_SAMPLE_BUFFER_SIZE) {
+ // Use accumulated sample for schema inference
+ fileWriterFactory.initialize(accumulatedSample);
+ sampleInitialized = true;
+
+ flushBufferedRecords();
+ } else {
+ buffer.add(record.copy());
+ return;
+ }
+ }
+ writeRecord(record);
+ }
+
+ private void writeRecord(Record record) {
+ writer.write(record, specs.get(currentSpecId), partition(record, currentSpecId));
+ }
+
+ private void flushBufferedRecords() {
+ for (Record bufferedRecord : buffer) {
+ writeRecord(bufferedRecord);
+ }
+ buffer.clear();
+ }
+
+ private boolean allVariantFieldsSampled() {
+ return sampledVariantFields.nextClearBit(0) >= variantFields.size();
+ }
+
+ private void accumulateSample(Record record) {
+ if (accumulatedSample == null || allVariantFieldsSampled()) {
+ return;
+ }
+ for (int fieldIndex = sampledVariantFields.nextClearBit(0);
+ fieldIndex < variantFields.size();
+ fieldIndex = sampledVariantFields.nextClearBit(fieldIndex + 1)) {
+ trySampleVariantField(fieldIndex, record);
+ }
+ }
+
+ private void trySampleVariantField(int fieldIndex, Record record) {
+ VariantField variantField = variantFields.get(fieldIndex);
+ Object val = safeGet(variantField, record);
+ if (!VariantUtil.isShreddable(val)) {
+ return;
+ }
+ HiveSchemaUtil.setStructField(accumulatedSample, variantField.path(), val);
+ sampledVariantFields.set(fieldIndex);
+ }
+
+ private static Object safeGet(VariantField variantField, Record record) {
+ try {
+ return variantField.accessor().get(record);
+ } catch (RuntimeException e) {
+ // Treat unexpected access failures as "no sample" and keep scanning.
+ return null;
+ }
+ }
+
+ @Override
+ public void close(boolean abort) throws IOException {
+ if (buffer != null) {
+ if (abort) {
+ // Don't write anything on abort. Just drop any buffered records.
+ buffer.clear();
+ } else if (!buffer.isEmpty()) {
+ if (!sampleInitialized) {
+ // Use whatever we have accumulated so far
+ fileWriterFactory.initialize(accumulatedSample);
+ }
+ flushBufferedRecords();
+ }
+ }
+ super.close(abort);
+ }
+
+}
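To make the control flow above easier to follow, here is a stripped-down model of the buffer-then-flush behaviour (a sketch only; the class name SampleBuffer and its callbacks are hypothetical, and the variant sampling, default filling, and partitioning details of the real writer are omitted):

import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.function.Consumer;

public final class SampleBuffer<T> {
  private final int capacity;          // mirrors VARIANT_SAMPLE_BUFFER_SIZE
  private final int trackedFields;     // mirrors variantFields.size()
  private final BitSet sampled = new BitSet();
  private final List<T> buffered = new ArrayList<>();
  private boolean initialized = false;

  public SampleBuffer(int capacity, int trackedFields) {
    this.capacity = capacity;
    this.trackedFields = trackedFields;
  }

  // Called when a usable sample was extracted for the given tracked field.
  public void markSampled(int fieldIndex) {
    sampled.set(fieldIndex);
  }

  // Buffer records until every tracked field has been sampled or the capacity is reached,
  // then run the one-time initialization, flush the buffer, and write directly afterwards.
  public void writeOrBuffer(T record, Runnable initializeOnce, Consumer<T> sink) {
    if (!initialized) {
      if (sampled.nextClearBit(0) >= trackedFields || buffered.size() >= capacity) {
        initializeOnce.run();
        initialized = true;
        buffered.forEach(sink);
        buffered.clear();
      } else {
        buffered.add(record);
        return;
      }
    }
    sink.accept(record);
  }
}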
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java
deleted file mode 100644
index 56e3f3480c7..00000000000
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ /dev/null
@@ -1,1524 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.parquet;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Files;
-import org.apache.iceberg.InternalData;
-import org.apache.iceberg.MetricsConfig;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.SchemaParser;
-import org.apache.iceberg.SortOrder;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.SystemConfigs;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.AvroSchemaUtil;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
-import org.apache.iceberg.deletes.EqualityDeleteWriter;
-import org.apache.iceberg.deletes.PositionDeleteWriter;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
-import org.apache.iceberg.encryption.EncryptionKeyMetadata;
-import org.apache.iceberg.encryption.NativeEncryptionInputFile;
-import org.apache.iceberg.encryption.NativeEncryptionOutputFile;
-import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.hadoop.HadoopInputFile;
-import org.apache.iceberg.hadoop.HadoopOutputFile;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.DataWriter;
-import org.apache.iceberg.io.DeleteSchemaUtil;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.mapping.NameMapping;
-import org.apache.iceberg.parquet.ParquetValueWriters.PositionDeleteStructWriter;
-import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.util.ArrayUtil;
-import org.apache.iceberg.util.ByteBuffers;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.parquet.HadoopReadOptions;
-import org.apache.parquet.ParquetReadOptions;
-import org.apache.parquet.avro.AvroReadSupport;
-import org.apache.parquet.avro.AvroWriteSupport;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.ParquetProperties.WriterVersion;
-import org.apache.parquet.conf.PlainParquetConfiguration;
-import org.apache.parquet.crypto.FileDecryptionProperties;
-import org.apache.parquet.crypto.FileEncryptionProperties;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.ParquetOutputFormat;
-import org.apache.parquet.hadoop.ParquetReader;
-import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.api.ReadSupport;
-import org.apache.parquet.hadoop.api.WriteSupport;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type.ID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION;
-import static org.apache.iceberg.TableProperties.DELETE_PARQUET_COMPRESSION_LEVEL;
-import static org.apache.iceberg.TableProperties.DELETE_PARQUET_DICT_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_ROW_LIMIT;
-import static org.apache.iceberg.TableProperties.DELETE_PARQUET_PAGE_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
-import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
-import static org.apache.iceberg.TableProperties.DELETE_PARQUET_ROW_GROUP_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
-import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX;
-import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES;
-import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT;
-import static org.apache.iceberg.TableProperties.PARQUET_COLUMN_STATS_ENABLED_PREFIX;
-import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
-import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
-import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
-import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT;
-import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT;
-import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT;
-import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT_DEFAULT;
-import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT;
-import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT;
-import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT;
-import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT;
-import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT;
-import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
-
-// TODO: remove class once upgraded to Iceberg v1.11.0 (https://github.com/apache/iceberg/pull/14153)
-
-public class Parquet {
- private static final Logger LOG = LoggerFactory.getLogger(Parquet.class);
-
- private Parquet() {
- }
-
- private static final Collection<String> READ_PROPERTIES_TO_REMOVE =
- Sets.newHashSet(
- "parquet.read.filter",
- "parquet.private.read.filter.predicate",
- "parquet.read.support.class",
- "parquet.crypto.factory.class");
-
- public static WriteBuilder write(OutputFile file) {
- if (file instanceof EncryptedOutputFile) {
- return write((EncryptedOutputFile) file);
- }
-
- return new WriteBuilder(file);
- }
-
- public static WriteBuilder write(EncryptedOutputFile file) {
- if (file instanceof NativeEncryptionOutputFile) {
- NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile)
file;
- return write(nativeFile.plainOutputFile())
- .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
- .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
- } else {
- return write(file.encryptingOutputFile());
- }
- }
-
- public static class WriteBuilder implements InternalData.WriteBuilder {
- private final OutputFile file;
- private final Configuration conf;
- private final Map<String, String> metadata = Maps.newLinkedHashMap();
- private final Map<String, String> config = Maps.newLinkedHashMap();
- private Schema schema = null;
- private VariantShreddingFunction variantShreddingFunc = null;
- private String name = "table";
- private WriteSupport<?> writeSupport = null;
- private BiFunction<Schema, MessageType, ParquetValueWriter<?>>
createWriterFunc = null;
- private MetricsConfig metricsConfig = MetricsConfig.getDefault();
- private ParquetFileWriter.Mode writeMode = ParquetFileWriter.Mode.CREATE;
- private WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
- private Function<Map<String, String>, Context> createContextFunc =
Context::dataContext;
- private ByteBuffer fileEncryptionKey = null;
- private ByteBuffer fileAADPrefix = null;
-
- private WriteBuilder(OutputFile file) {
- this.file = file;
- if (file instanceof HadoopOutputFile) {
- this.conf = new Configuration(((HadoopOutputFile) file).getConf());
- } else {
- this.conf = new Configuration();
- }
- }
-
- public WriteBuilder forTable(Table table) {
- schema(table.schema());
- setAll(table.properties());
- metricsConfig(MetricsConfig.forTable(table));
- return this;
- }
-
- @Override
- public WriteBuilder schema(Schema newSchema) {
- this.schema = newSchema;
- return this;
- }
-
- /**
- * Set a {@link VariantShreddingFunction} that is called with each variant
field's name and
- * field ID to produce the shredding type as a {@code typed_value} field.
This field is added to
- * the result variant struct alongside the {@code metadata} and {@code
value} fields.
- *
- * @param func {@link VariantShreddingFunction} that produces a shredded
{@code typed_value}
- * @return this for method chaining
- */
- public WriteBuilder variantShreddingFunc(VariantShreddingFunction func) {
- this.variantShreddingFunc = func;
- return this;
- }
-
- @Override
- public WriteBuilder named(String newName) {
- this.name = newName;
- return this;
- }
-
- public WriteBuilder writeSupport(WriteSupport<?> newWriteSupport) {
- this.writeSupport = newWriteSupport;
- return this;
- }
-
- @Override
- public WriteBuilder set(String property, String value) {
- config.put(property, value);
- return this;
- }
-
- public WriteBuilder setAll(Map<String, String> properties) {
- config.putAll(properties);
- return this;
- }
-
- @Override
- public WriteBuilder meta(String property, String value) {
- metadata.put(property, value);
- return this;
- }
-
- public WriteBuilder createWriterFunc(
- Function<MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
- if (newCreateWriterFunc != null) {
- this.createWriterFunc = (icebergSchema, type) ->
newCreateWriterFunc.apply(type);
- }
- return this;
- }
-
- public WriteBuilder createWriterFunc(
- BiFunction<Schema, MessageType, ParquetValueWriter<?>>
newCreateWriterFunc) {
- this.createWriterFunc = newCreateWriterFunc;
- return this;
- }
-
- public WriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
- this.metricsConfig = newMetricsConfig;
- return this;
- }
-
- @Override
- public WriteBuilder overwrite() {
- return overwrite(true);
- }
-
- public WriteBuilder overwrite(boolean enabled) {
- this.writeMode = enabled ? ParquetFileWriter.Mode.OVERWRITE :
ParquetFileWriter.Mode.CREATE;
- return this;
- }
-
- public WriteBuilder writerVersion(WriterVersion version) {
- this.writerVersion = version;
- return this;
- }
-
- public WriteBuilder withFileEncryptionKey(ByteBuffer encryptionKey) {
- this.fileEncryptionKey = encryptionKey;
- return this;
- }
-
- public WriteBuilder withAADPrefix(ByteBuffer aadPrefix) {
- this.fileAADPrefix = aadPrefix;
- return this;
- }
-
- @SuppressWarnings("unchecked")
- private <T> WriteSupport<T> getWriteSupport(MessageType type) {
- if (writeSupport != null) {
- return (WriteSupport<T>) writeSupport;
- } else {
- return new AvroWriteSupport<>(
- type,
- ParquetAvro.parquetAvroSchema(AvroSchemaUtil.convert(schema,
name)),
- ParquetAvro.DEFAULT_MODEL);
- }
- }
-
- /*
- * Sets the writer version. Default value is PARQUET_1_0 (v1).
- */
- @VisibleForTesting
- WriteBuilder withWriterVersion(WriterVersion version) {
- this.writerVersion = version;
- return this;
- }
-
- // supposed to always be a private method used strictly by data and delete
write builders
- private WriteBuilder createContextFunc(
- Function<Map<String, String>, Context> newCreateContextFunc) {
- this.createContextFunc = newCreateContextFunc;
- return this;
- }
-
- private void setBloomFilterConfig(
- Context context,
- Map<String, String> colNameToParquetPathMap,
- BiConsumer<String, Boolean> withBloomFilterEnabled,
- BiConsumer<String, Double> withBloomFilterFPP) {
-
- context
- .columnBloomFilterEnabled()
- .forEach(
- (colPath, isEnabled) -> {
- String parquetColumnPath =
colNameToParquetPathMap.get(colPath);
- if (parquetColumnPath == null) {
- LOG.warn("Skipping bloom filter config for missing field:
{}", colPath);
- return;
- }
-
- withBloomFilterEnabled.accept(parquetColumnPath,
Boolean.valueOf(isEnabled));
- String fpp = context.columnBloomFilterFpp().get(colPath);
- if (fpp != null) {
- withBloomFilterFPP.accept(parquetColumnPath,
Double.parseDouble(fpp));
- }
- });
- }
-
- private void setColumnStatsConfig(
- Context context,
- Map<String, String> colNameToParquetPathMap,
- BiConsumer<String, Boolean> withColumnStatsEnabled) {
-
- context
- .columnStatsEnabled()
- .forEach(
- (colPath, isEnabled) -> {
- String parquetColumnPath =
colNameToParquetPathMap.get(colPath);
- if (parquetColumnPath == null) {
- LOG.warn("Skipping column statistics config for missing
field: {}", colPath);
- return;
- }
- withColumnStatsEnabled.accept(parquetColumnPath,
Boolean.valueOf(isEnabled));
- });
- }
-
- @Override
- public <D> FileAppender<D> build() throws IOException {
- Preconditions.checkNotNull(schema, "Schema is required");
- Preconditions.checkNotNull(name, "Table name is required and cannot be
null");
-
- // add the Iceberg schema to keyValueMetadata
- meta("iceberg.schema", SchemaParser.toJson(schema));
-
- // Map Iceberg properties to pass down to the Parquet writer
- Context context = createContextFunc.apply(config);
-
- int rowGroupSize = context.rowGroupSize();
- int pageSize = context.pageSize();
- int pageRowLimit = context.pageRowLimit();
- int dictionaryPageSize = context.dictionaryPageSize();
- String compressionLevel = context.compressionLevel();
- CompressionCodecName codec = context.codec();
- int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
- int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
- int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
- boolean dictionaryEnabled = context.dictionaryEnabled();
-
- if (compressionLevel != null) {
- switch (codec) {
- case GZIP:
- config.put("zlib.compress.level", compressionLevel);
- break;
- case BROTLI:
- config.put("compression.brotli.quality", compressionLevel);
- break;
- case ZSTD:
- // keep "io.compression.codec.zstd.level" for backwards
compatibility
- config.put("io.compression.codec.zstd.level", compressionLevel);
- config.put("parquet.compression.codec.zstd.level",
compressionLevel);
- break;
- default:
- // compression level is not supported; ignore it
- }
- }
-
- set("parquet.avro.write-old-list-structure", "false");
- MessageType type = ParquetSchemaUtil.convert(schema, name,
variantShreddingFunc);
-
- FileEncryptionProperties fileEncryptionProperties = null;
- if (fileEncryptionKey != null) {
- byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey);
- byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix);
-
- fileEncryptionProperties =
- FileEncryptionProperties.builder(encryptionKeyArray)
- .withAADPrefix(aadPrefixArray)
- .withoutAADPrefixStorage()
- .build();
- } else {
- Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with
null encryption key");
- }
-
- Map<String, String> colNameToParquetPathMap =
- type.getColumns().stream()
- .filter(
- col -> {
- ID id = col.getPrimitiveType().getId();
- return id != null && schema.findColumnName(id.intValue())
!= null;
- })
- .collect(
- Collectors.toMap(
- col ->
schema.findColumnName(col.getPrimitiveType().getId().intValue()),
- col -> String.join(".", col.getPath())));
-
- if (createWriterFunc != null) {
- Preconditions.checkArgument(
- writeSupport == null, "Cannot write with both write support and
Parquet value writer");
-
- for (Map.Entry<String, String> entry : config.entrySet()) {
- conf.set(entry.getKey(), entry.getValue());
- }
-
- ParquetProperties.Builder propsBuilder =
- ParquetProperties.builder()
- .withWriterVersion(writerVersion)
- .withPageSize(pageSize)
- .withPageRowCountLimit(pageRowLimit)
- .withDictionaryEncoding(dictionaryEnabled)
- .withDictionaryPageSize(dictionaryPageSize)
- .withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount)
- .withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
- .withMaxBloomFilterBytes(bloomFilterMaxBytes);
-
- setBloomFilterConfig(
- context,
- colNameToParquetPathMap,
- propsBuilder::withBloomFilterEnabled,
- propsBuilder::withBloomFilterFPP);
-
- setColumnStatsConfig(context, colNameToParquetPathMap,
propsBuilder::withStatisticsEnabled);
-
- ParquetProperties parquetProperties = propsBuilder.build();
-
- return new org.apache.iceberg.parquet.ParquetWriter<>(
- conf,
- file,
- schema,
- type,
- rowGroupSize,
- metadata,
- createWriterFunc,
- codec,
- parquetProperties,
- metricsConfig,
- writeMode,
- fileEncryptionProperties);
- } else {
- ParquetWriteBuilder<D> parquetWriteBuilder =
- new ParquetWriteBuilder<D>(ParquetIO.file(file))
- .withWriterVersion(writerVersion)
- .setType(type)
- .setConfig(config)
- .setKeyValueMetadata(metadata)
- .setWriteSupport(getWriteSupport(type))
- .withCompressionCodec(codec)
- .withWriteMode(writeMode)
- .withRowGroupSize(rowGroupSize)
- .withPageSize(pageSize)
- .withPageRowCountLimit(pageRowLimit)
- .withDictionaryEncoding(dictionaryEnabled)
- .withDictionaryPageSize(dictionaryPageSize)
- .withEncryption(fileEncryptionProperties);
-
- setBloomFilterConfig(
- context,
- colNameToParquetPathMap,
- parquetWriteBuilder::withBloomFilterEnabled,
- parquetWriteBuilder::withBloomFilterFPP);
-
- setColumnStatsConfig(
- context, colNameToParquetPathMap,
parquetWriteBuilder::withStatisticsEnabled);
-
- return new ParquetWriteAdapter<>(parquetWriteBuilder.build(),
metricsConfig);
- }
- }
-
- private static class Context {
- private final int rowGroupSize;
- private final int pageSize;
- private final int pageRowLimit;
- private final int dictionaryPageSize;
- private final CompressionCodecName codec;
- private final String compressionLevel;
- private final int rowGroupCheckMinRecordCount;
- private final int rowGroupCheckMaxRecordCount;
- private final int bloomFilterMaxBytes;
- private final Map<String, String> columnBloomFilterFpp;
- private final Map<String, String> columnBloomFilterEnabled;
- private final Map<String, String> columnStatsEnabled;
- private final boolean dictionaryEnabled;
-
- private Context(
- int rowGroupSize,
- int pageSize,
- int pageRowLimit,
- int dictionaryPageSize,
- CompressionCodecName codec,
- String compressionLevel,
- int rowGroupCheckMinRecordCount,
- int rowGroupCheckMaxRecordCount,
- int bloomFilterMaxBytes,
- Map<String, String> columnBloomFilterFpp,
- Map<String, String> columnBloomFilterEnabled,
- Map<String, String> columnStatsEnabled,
- boolean dictionaryEnabled) {
- this.rowGroupSize = rowGroupSize;
- this.pageSize = pageSize;
- this.pageRowLimit = pageRowLimit;
- this.dictionaryPageSize = dictionaryPageSize;
- this.codec = codec;
- this.compressionLevel = compressionLevel;
- this.rowGroupCheckMinRecordCount = rowGroupCheckMinRecordCount;
- this.rowGroupCheckMaxRecordCount = rowGroupCheckMaxRecordCount;
- this.bloomFilterMaxBytes = bloomFilterMaxBytes;
- this.columnBloomFilterFpp = columnBloomFilterFpp;
- this.columnBloomFilterEnabled = columnBloomFilterEnabled;
- this.columnStatsEnabled = columnStatsEnabled;
- this.dictionaryEnabled = dictionaryEnabled;
- }
-
- static Context dataContext(Map<String, String> config) {
- int rowGroupSize =
- PropertyUtil.propertyAsInt(
- config, PARQUET_ROW_GROUP_SIZE_BYTES,
PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT);
- Preconditions.checkArgument(rowGroupSize > 0, "Row group size must be
> 0");
-
- int pageSize =
- PropertyUtil.propertyAsInt(
- config, PARQUET_PAGE_SIZE_BYTES,
PARQUET_PAGE_SIZE_BYTES_DEFAULT);
- Preconditions.checkArgument(pageSize > 0, "Page size must be > 0");
-
- int pageRowLimit =
- PropertyUtil.propertyAsInt(
- config, PARQUET_PAGE_ROW_LIMIT,
PARQUET_PAGE_ROW_LIMIT_DEFAULT);
- Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit
must be > 0");
-
- int dictionaryPageSize =
- PropertyUtil.propertyAsInt(
- config, PARQUET_DICT_SIZE_BYTES,
PARQUET_DICT_SIZE_BYTES_DEFAULT);
- Preconditions.checkArgument(dictionaryPageSize > 0, "Dictionary page
size must be > 0");
-
- String codecAsString =
- config.getOrDefault(PARQUET_COMPRESSION,
PARQUET_COMPRESSION_DEFAULT);
- CompressionCodecName codec = toCodec(codecAsString);
-
- String compressionLevel =
- config.getOrDefault(PARQUET_COMPRESSION_LEVEL,
PARQUET_COMPRESSION_LEVEL_DEFAULT);
-
- int rowGroupCheckMinRecordCount =
- PropertyUtil.propertyAsInt(
- config,
- PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT,
- PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT_DEFAULT);
- Preconditions.checkArgument(
- rowGroupCheckMinRecordCount > 0, "Row group check minimal record
count must be > 0");
-
- int rowGroupCheckMaxRecordCount =
- PropertyUtil.propertyAsInt(
- config,
- PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT,
- PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT_DEFAULT);
- Preconditions.checkArgument(
- rowGroupCheckMaxRecordCount > 0, "Row group check maximum record
count must be > 0");
- Preconditions.checkArgument(
- rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
- "Row group check maximum record count must be >= minimal record
count");
-
- int bloomFilterMaxBytes =
- PropertyUtil.propertyAsInt(
- config, PARQUET_BLOOM_FILTER_MAX_BYTES,
PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT);
- Preconditions.checkArgument(bloomFilterMaxBytes > 0, "bloom Filter Max
Bytes must be > 0");
-
- Map<String, String> columnBloomFilterFpp =
- PropertyUtil.propertiesWithPrefix(config,
PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX);
-
- Map<String, String> columnBloomFilterEnabled =
- PropertyUtil.propertiesWithPrefix(config,
PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX);
-
- Map<String, String> columnStatsEnabled =
- PropertyUtil.propertiesWithPrefix(config,
PARQUET_COLUMN_STATS_ENABLED_PREFIX);
-
- boolean dictionaryEnabled =
- PropertyUtil.propertyAsBoolean(config,
ParquetOutputFormat.ENABLE_DICTIONARY, true);
-
- return new Context(
- rowGroupSize,
- pageSize,
- pageRowLimit,
- dictionaryPageSize,
- codec,
- compressionLevel,
- rowGroupCheckMinRecordCount,
- rowGroupCheckMaxRecordCount,
- bloomFilterMaxBytes,
- columnBloomFilterFpp,
- columnBloomFilterEnabled,
- columnStatsEnabled,
- dictionaryEnabled);
- }
-
- static Context deleteContext(Map<String, String> config) {
- // default delete config using data config
- Context dataContext = dataContext(config);
-
- int rowGroupSize =
- PropertyUtil.propertyAsInt(
- config, DELETE_PARQUET_ROW_GROUP_SIZE_BYTES,
dataContext.rowGroupSize());
- Preconditions.checkArgument(rowGroupSize > 0, "Row group size must be
> 0");
-
- int pageSize =
- PropertyUtil.propertyAsInt(
- config, DELETE_PARQUET_PAGE_SIZE_BYTES,
dataContext.pageSize());
- Preconditions.checkArgument(pageSize > 0, "Page size must be > 0");
-
- int pageRowLimit =
- PropertyUtil.propertyAsInt(
- config, DELETE_PARQUET_PAGE_ROW_LIMIT,
dataContext.pageRowLimit());
- Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit
must be > 0");
-
- int dictionaryPageSize =
- PropertyUtil.propertyAsInt(
- config, DELETE_PARQUET_DICT_SIZE_BYTES,
dataContext.dictionaryPageSize());
- Preconditions.checkArgument(dictionaryPageSize > 0, "Dictionary page
size must be > 0");
-
- String codecAsString = config.get(DELETE_PARQUET_COMPRESSION);
- CompressionCodecName codec =
- codecAsString != null ? toCodec(codecAsString) :
dataContext.codec();
-
- String compressionLevel =
- config.getOrDefault(DELETE_PARQUET_COMPRESSION_LEVEL,
dataContext.compressionLevel());
-
- int rowGroupCheckMinRecordCount =
- PropertyUtil.propertyAsInt(
- config,
- DELETE_PARQUET_ROW_GROUP_CHECK_MIN_RECORD_COUNT,
- dataContext.rowGroupCheckMinRecordCount());
- Preconditions.checkArgument(
- rowGroupCheckMinRecordCount > 0, "Row group check minimal record
count must be > 0");
-
- int rowGroupCheckMaxRecordCount =
- PropertyUtil.propertyAsInt(
- config,
- DELETE_PARQUET_ROW_GROUP_CHECK_MAX_RECORD_COUNT,
- dataContext.rowGroupCheckMaxRecordCount());
- Preconditions.checkArgument(
- rowGroupCheckMaxRecordCount > 0, "Row group check maximum record
count must be > 0");
- Preconditions.checkArgument(
- rowGroupCheckMaxRecordCount >= rowGroupCheckMinRecordCount,
- "Row group check maximum record count must be >= minimal record
count");
-
- boolean dictionaryEnabled =
- PropertyUtil.propertyAsBoolean(config,
ParquetOutputFormat.ENABLE_DICTIONARY, true);
-
- return new Context(
- rowGroupSize,
- pageSize,
- pageRowLimit,
- dictionaryPageSize,
- codec,
- compressionLevel,
- rowGroupCheckMinRecordCount,
- rowGroupCheckMaxRecordCount,
- PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT,
- ImmutableMap.of(),
- ImmutableMap.of(),
- ImmutableMap.of(),
- dictionaryEnabled);
- }
-
- private static CompressionCodecName toCodec(String codecAsString) {
- try {
- return
CompressionCodecName.valueOf(codecAsString.toUpperCase(Locale.ENGLISH));
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException("Unsupported compression codec: "
+ codecAsString);
- }
- }
-
- int rowGroupSize() {
- return rowGroupSize;
- }
-
- int pageSize() {
- return pageSize;
- }
-
- int pageRowLimit() {
- return pageRowLimit;
- }
-
- int dictionaryPageSize() {
- return dictionaryPageSize;
- }
-
- CompressionCodecName codec() {
- return codec;
- }
-
- String compressionLevel() {
- return compressionLevel;
- }
-
- int rowGroupCheckMinRecordCount() {
- return rowGroupCheckMinRecordCount;
- }
-
- int rowGroupCheckMaxRecordCount() {
- return rowGroupCheckMaxRecordCount;
- }
-
- int bloomFilterMaxBytes() {
- return bloomFilterMaxBytes;
- }
-
- Map<String, String> columnBloomFilterFpp() {
- return columnBloomFilterFpp;
- }
-
- Map<String, String> columnBloomFilterEnabled() {
- return columnBloomFilterEnabled;
- }
-
- Map<String, String> columnStatsEnabled() {
- return columnStatsEnabled;
- }
-
- boolean dictionaryEnabled() {
- return dictionaryEnabled;
- }
- }
- }
-
- public static DataWriteBuilder writeData(OutputFile file) {
- return new DataWriteBuilder(file);
- }
-
- public static DataWriteBuilder writeData(EncryptedOutputFile file) {
- if (file instanceof NativeEncryptionOutputFile) {
- NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile)
file;
- return writeData(nativeFile.plainOutputFile())
- .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
- .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
- } else {
- return writeData(file.encryptingOutputFile());
- }
- }
-
- public static class DataWriteBuilder {
- private final WriteBuilder appenderBuilder;
- private final String location;
- private PartitionSpec spec = null;
- private StructLike partition = null;
- private EncryptionKeyMetadata keyMetadata = null;
- private SortOrder sortOrder = null;
-
- private DataWriteBuilder(OutputFile file) {
- this.appenderBuilder = write(file);
- this.location = file.location();
- }
-
- public DataWriteBuilder forTable(Table table) {
- schema(table.schema());
- withSpec(table.spec());
- setAll(table.properties());
- metricsConfig(MetricsConfig.forTable(table));
- return this;
- }
-
- public DataWriteBuilder schema(Schema newSchema) {
- appenderBuilder.schema(newSchema);
- return this;
- }
-
- public DataWriteBuilder set(String property, String value) {
- appenderBuilder.set(property, value);
- return this;
- }
-
- public DataWriteBuilder setAll(Map<String, String> properties) {
- appenderBuilder.setAll(properties);
- return this;
- }
-
- public DataWriteBuilder meta(String property, String value) {
- appenderBuilder.meta(property, value);
- return this;
- }
-
- public DataWriteBuilder overwrite() {
- return overwrite(true);
- }
-
- public DataWriteBuilder overwrite(boolean enabled) {
- appenderBuilder.overwrite(enabled);
- return this;
- }
-
- public DataWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
- appenderBuilder.metricsConfig(newMetricsConfig);
- return this;
- }
-
- public DataWriteBuilder createWriterFunc(
- Function<MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
- appenderBuilder.createWriterFunc(newCreateWriterFunc);
- return this;
- }
-
- public DataWriteBuilder createWriterFunc(
- BiFunction<Schema, MessageType, ParquetValueWriter<?>>
newCreateWriterFunc) {
- appenderBuilder.createWriterFunc(newCreateWriterFunc);
- return this;
- }
-
- public DataWriteBuilder variantShreddingFunc(VariantShreddingFunction
func) {
- appenderBuilder.variantShreddingFunc(func);
- return this;
- }
-
- public DataWriteBuilder withSpec(PartitionSpec newSpec) {
- this.spec = newSpec;
- return this;
- }
-
- public DataWriteBuilder withPartition(StructLike newPartition) {
- this.partition = newPartition;
- return this;
- }
-
- public DataWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
- this.keyMetadata = metadata;
- return this;
- }
-
- public DataWriteBuilder withFileEncryptionKey(ByteBuffer
fileEncryptionKey) {
- appenderBuilder.withFileEncryptionKey(fileEncryptionKey);
- return this;
- }
-
- public DataWriteBuilder withAADPrefix(ByteBuffer aadPrefix) {
- appenderBuilder.withAADPrefix(aadPrefix);
- return this;
- }
-
- public DataWriteBuilder withSortOrder(SortOrder newSortOrder) {
- this.sortOrder = newSortOrder;
- return this;
- }
-
- public <T> DataWriter<T> build() throws IOException {
- Preconditions.checkArgument(spec != null, "Cannot create data writer
without spec");
- Preconditions.checkArgument(
- spec.isUnpartitioned() || partition != null,
- "Partition must not be null when creating data writer for
partitioned spec");
-
- FileAppender<T> fileAppender = appenderBuilder.build();
- return new DataWriter<>(
- fileAppender, FileFormat.PARQUET, location, spec, partition,
keyMetadata, sortOrder);
- }
- }
-
- public static DeleteWriteBuilder writeDeletes(OutputFile file) {
- return new DeleteWriteBuilder(file);
- }
-
- public static DeleteWriteBuilder writeDeletes(EncryptedOutputFile file) {
- if (file instanceof NativeEncryptionOutputFile) {
- NativeEncryptionOutputFile nativeFile = (NativeEncryptionOutputFile)
file;
- return writeDeletes(nativeFile.plainOutputFile())
- .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
- .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
- } else {
- return writeDeletes(file.encryptingOutputFile());
- }
- }
-
- public static class DeleteWriteBuilder {
- private final WriteBuilder appenderBuilder;
- private final String location;
- private BiFunction<Schema, MessageType, ParquetValueWriter<?>>
createWriterFunc = null;
- private Schema rowSchema = null;
- private PartitionSpec spec = null;
- private StructLike partition = null;
- private EncryptionKeyMetadata keyMetadata = null;
- private int[] equalityFieldIds = null;
- private SortOrder sortOrder;
- private Function<CharSequence, ?> pathTransformFunc = Function.identity();
-
- private DeleteWriteBuilder(OutputFile file) {
- this.appenderBuilder = write(file);
- this.location = file.location();
- }
-
- public DeleteWriteBuilder forTable(Table table) {
- rowSchema(table.schema());
- withSpec(table.spec());
- setAll(table.properties());
- metricsConfig(MetricsConfig.forTable(table));
- return this;
- }
-
- public DeleteWriteBuilder set(String property, String value) {
- appenderBuilder.set(property, value);
- return this;
- }
-
- public DeleteWriteBuilder setAll(Map<String, String> properties) {
- appenderBuilder.setAll(properties);
- return this;
- }
-
- public DeleteWriteBuilder meta(String property, String value) {
- appenderBuilder.meta(property, value);
- return this;
- }
-
- public DeleteWriteBuilder overwrite() {
- return overwrite(true);
- }
-
- public DeleteWriteBuilder overwrite(boolean enabled) {
- appenderBuilder.overwrite(enabled);
- return this;
- }
-
- public DeleteWriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
- appenderBuilder.metricsConfig(newMetricsConfig);
- return this;
- }
-
- public DeleteWriteBuilder createWriterFunc(
- Function<MessageType, ParquetValueWriter<?>> newCreateWriterFunc) {
- this.createWriterFunc = (ignored, fileSchema) ->
newCreateWriterFunc.apply(fileSchema);
- return this;
- }
-
- public DeleteWriteBuilder createWriterFunc(
- BiFunction<Schema, MessageType, ParquetValueWriter<?>>
newCreateWriterFunc) {
- this.createWriterFunc = newCreateWriterFunc;
- return this;
- }
-
- public DeleteWriteBuilder rowSchema(Schema newSchema) {
- this.rowSchema = newSchema;
- return this;
- }
-
- public DeleteWriteBuilder withSpec(PartitionSpec newSpec) {
- this.spec = newSpec;
- return this;
- }
-
- public DeleteWriteBuilder withPartition(StructLike key) {
- this.partition = key;
- return this;
- }
-
- public DeleteWriteBuilder withKeyMetadata(EncryptionKeyMetadata metadata) {
- this.keyMetadata = metadata;
- return this;
- }
-
- public DeleteWriteBuilder withFileEncryptionKey(ByteBuffer
fileEncryptionKey) {
- appenderBuilder.withFileEncryptionKey(fileEncryptionKey);
- return this;
- }
-
- public DeleteWriteBuilder withAADPrefix(ByteBuffer aadPrefix) {
- appenderBuilder.withAADPrefix(aadPrefix);
- return this;
- }
-
- public DeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds) {
- this.equalityFieldIds = ArrayUtil.toIntArray(fieldIds);
- return this;
- }
-
- public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
- this.equalityFieldIds = fieldIds;
- return this;
- }
-
- public DeleteWriteBuilder transformPaths(Function<CharSequence, ?>
newPathTransformFunc) {
- this.pathTransformFunc = newPathTransformFunc;
- return this;
- }
-
- public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) {
- this.sortOrder = newSortOrder;
- return this;
- }
-
- public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws
IOException {
- Preconditions.checkState(
- rowSchema != null, "Cannot create equality delete file without a
schema");
- Preconditions.checkState(
- equalityFieldIds != null, "Cannot create equality delete file
without delete field ids");
- Preconditions.checkState(
- createWriterFunc != null,
- "Cannot create equality delete file unless createWriterFunc is set");
- Preconditions.checkArgument(
- spec != null, "Spec must not be null when creating equality delete
writer");
- Preconditions.checkArgument(
- spec.isUnpartitioned() || partition != null,
- "Partition must not be null for partitioned writes");
-
- meta("delete-type", "equality");
- meta(
- "delete-field-ids",
- IntStream.of(equalityFieldIds)
- .mapToObj(Objects::toString)
- .collect(Collectors.joining(", ")));
-
- // the appender uses the row schema without extra columns
- appenderBuilder.schema(rowSchema);
- appenderBuilder.createWriterFunc(createWriterFunc);
- appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext);
-
- return new EqualityDeleteWriter<>(
- appenderBuilder.build(),
- FileFormat.PARQUET,
- location,
- spec,
- partition,
- keyMetadata,
- sortOrder,
- equalityFieldIds);
- }
-
- public <T> PositionDeleteWriter<T> buildPositionWriter() throws
IOException {
- Preconditions.checkState(
- equalityFieldIds == null, "Cannot create position delete file using
delete field ids");
- Preconditions.checkArgument(
- spec != null, "Spec must not be null when creating position delete
writer");
- Preconditions.checkArgument(
- spec.isUnpartitioned() || partition != null,
- "Partition must not be null for partitioned writes");
- Preconditions.checkArgument(
- rowSchema == null || createWriterFunc != null,
- "Create function should be provided if we write row data");
-
- meta("delete-type", "position");
-
- if (rowSchema != null && createWriterFunc != null) {
- // the appender uses the row schema wrapped with position fields
- appenderBuilder.schema(DeleteSchemaUtil.posDeleteSchema(rowSchema));
-
- appenderBuilder.createWriterFunc(
- (schema, parquetSchema) -> {
- ParquetValueWriter<?> writer = createWriterFunc.apply(schema,
parquetSchema);
- if (writer instanceof StructWriter) {
- return new PositionDeleteStructWriter<T>(
- (StructWriter<?>) writer, pathTransformFunc);
- } else {
- throw new UnsupportedOperationException(
- "Cannot wrap writer for position deletes: " +
writer.getClass());
- }
- });
-
- } else {
- appenderBuilder.schema(DeleteSchemaUtil.pathPosSchema());
-
- // We ignore the 'createWriterFunc' and 'rowSchema' even if is
provided, since we do not
- // write row data itself
- appenderBuilder.createWriterFunc(
- (schema, parquetSchema) ->
- new PositionDeleteStructWriter<T>(
- (StructWriter<?>) GenericParquetWriter.create(schema,
parquetSchema),
- Function.identity()));
- }
-
- appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext);
-
- return new PositionDeleteWriter<>(
- appenderBuilder.build(), FileFormat.PARQUET, location, spec,
partition, keyMetadata);
- }
- }
-
- private static class ParquetWriteBuilder<T>
- extends ParquetWriter.Builder<T, ParquetWriteBuilder<T>> {
- private Map<String, String> keyValueMetadata = Maps.newHashMap();
- private Map<String, String> config = Maps.newHashMap();
- private MessageType type;
- private WriteSupport<T> writeSupport;
-
- private ParquetWriteBuilder(org.apache.parquet.io.OutputFile path) {
- super(path);
- }
-
- @Override
- protected ParquetWriteBuilder<T> self() {
- return this;
- }
-
- public ParquetWriteBuilder<T> setKeyValueMetadata(Map<String, String>
keyValueMetadata) {
- this.keyValueMetadata = keyValueMetadata;
- return self();
- }
-
- public ParquetWriteBuilder<T> setConfig(Map<String, String> config) {
- this.config = config;
- return self();
- }
-
- public ParquetWriteBuilder<T> setType(MessageType type) {
- this.type = type;
- return self();
- }
-
- public ParquetWriteBuilder<T> setWriteSupport(WriteSupport<T>
writeSupport) {
- this.writeSupport = writeSupport;
- return self();
- }
-
- @Override
- protected WriteSupport<T> getWriteSupport(Configuration configuration) {
- for (Map.Entry<String, String> entry : config.entrySet()) {
- configuration.set(entry.getKey(), entry.getValue());
- }
- return new ParquetWriteSupport<>(type, keyValueMetadata, writeSupport);
- }
- }
-
- public static ReadBuilder read(InputFile file) {
- if (file instanceof NativeEncryptionInputFile) {
- NativeEncryptionInputFile nativeFile = (NativeEncryptionInputFile) file;
- return new ReadBuilder(nativeFile.encryptedInputFile())
- .withFileEncryptionKey(nativeFile.keyMetadata().encryptionKey())
- .withAADPrefix(nativeFile.keyMetadata().aadPrefix());
- } else {
- return new ReadBuilder(file);
- }
- }
-
- public static class ReadBuilder implements InternalData.ReadBuilder {
- private final InputFile file;
- private final Map<String, String> properties = Maps.newHashMap();
- private Long start = null;
- private Long length = null;
- private Schema schema = null;
- private Expression filter = null;
- private ReadSupport<?> readSupport = null;
- private Function<MessageType, VectorizedReader<?>> batchedReaderFunc =
null;
- private Function<MessageType, ParquetValueReader<?>> readerFunc = null;
- private BiFunction<Schema, MessageType, ParquetValueReader<?>>
readerFuncWithSchema = null;
- private boolean filterRecords = true;
- private boolean caseSensitive = true;
- private boolean callInit = false;
- private boolean reuseContainers = false;
- private int maxRecordsPerBatch = 10000;
- private NameMapping nameMapping = null;
- private ByteBuffer fileEncryptionKey = null;
- private ByteBuffer fileAADPrefix = null;
-
- private ReadBuilder(InputFile file) {
- this.file = file;
- }
-
- /**
- * Restricts the read to the given range: [start, start + length).
- *
- * @param newStart the start position for this read
- * @param newLength the length of the range this read should scan
- * @return this builder for method chaining
- */
- @Override
- public ReadBuilder split(long newStart, long newLength) {
- this.start = newStart;
- this.length = newLength;
- return this;
- }
-
- @Override
- public ReadBuilder project(Schema newSchema) {
- this.schema = newSchema;
- return this;
- }
-
- public ReadBuilder caseInsensitive() {
- return caseSensitive(false);
- }
-
- public ReadBuilder caseSensitive(boolean newCaseSensitive) {
- this.caseSensitive = newCaseSensitive;
- return this;
- }
-
- public ReadBuilder filterRecords(boolean newFilterRecords) {
- this.filterRecords = newFilterRecords;
- return this;
- }
-
- public ReadBuilder filter(Expression newFilter) {
- this.filter = newFilter;
- return this;
- }
-
- /**
- * @deprecated will be removed in 2.0.0; use {@link
#createReaderFunc(Function)} instead
- */
- @Deprecated
- public ReadBuilder readSupport(ReadSupport<?> newFilterSupport) {
- this.readSupport = newFilterSupport;
- return this;
- }
-
- public ReadBuilder createReaderFunc(
- Function<MessageType, ParquetValueReader<?>> newReaderFunction) {
- Preconditions.checkArgument(
- this.batchedReaderFunc == null,
- "Cannot set reader function: batched reader function already set");
- Preconditions.checkArgument(
- this.readerFuncWithSchema == null,
- "Cannot set reader function: 2-argument reader function already
set");
- this.readerFunc = newReaderFunction;
- return this;
- }
-
- public ReadBuilder createReaderFunc(
- BiFunction<Schema, MessageType, ParquetValueReader<?>>
newReaderFunction) {
- Preconditions.checkArgument(
- this.readerFunc == null,
- "Cannot set 2-argument reader function: reader function already
set");
- Preconditions.checkArgument(
- this.batchedReaderFunc == null,
- "Cannot set 2-argument reader function: batched reader function
already set");
- this.readerFuncWithSchema = newReaderFunction;
- return this;
- }
-
- public ReadBuilder createBatchedReaderFunc(Function<MessageType,
VectorizedReader<?>> func) {
- Preconditions.checkArgument(
- this.readerFunc == null,
- "Cannot set batched reader function: reader function already set");
- Preconditions.checkArgument(
- this.readerFuncWithSchema == null,
- "Cannot set batched reader function: 2-argument reader function
already set");
- this.batchedReaderFunc = func;
- return this;
- }
-
- public ReadBuilder set(String key, String value) {
- properties.put(key, value);
- return this;
- }
-
- /**
- * @deprecated will be removed in 2.0.0; use {@link
#createReaderFunc(Function)} instead
- */
- @Deprecated
- public ReadBuilder callInit() {
- this.callInit = true;
- return this;
- }
-
- @Override
- public ReadBuilder reuseContainers() {
- this.reuseContainers = true;
- return this;
- }
-
- public ReadBuilder recordsPerBatch(int numRowsPerBatch) {
- this.maxRecordsPerBatch = numRowsPerBatch;
- return this;
- }
-
- public ReadBuilder withNameMapping(NameMapping newNameMapping) {
- this.nameMapping = newNameMapping;
- return this;
- }
-
- @Override
- public ReadBuilder setRootType(Class<? extends StructLike> rootClass) {
- throw new UnsupportedOperationException("Custom types are not yet
supported");
- }
-
- @Override
- public ReadBuilder setCustomType(int fieldId, Class<? extends StructLike>
structClass) {
- throw new UnsupportedOperationException("Custom types are not yet
supported");
- }
-
- public ReadBuilder withFileEncryptionKey(ByteBuffer encryptionKey) {
- this.fileEncryptionKey = encryptionKey;
- return this;
- }
-
- public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) {
- this.fileAADPrefix = aadPrefix;
- return this;
- }
-
- @Override
- @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
- public <D> CloseableIterable<D> build() {
- FileDecryptionProperties fileDecryptionProperties = null;
- if (fileEncryptionKey != null) {
- byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey);
- byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix);
- fileDecryptionProperties =
- FileDecryptionProperties.builder()
- .withFooterKey(encryptionKeyArray)
- .withAADPrefix(aadPrefixArray)
- .build();
- } else {
- Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with
null encryption key");
- }
-
- if (readerFunc != null || readerFuncWithSchema != null ||
batchedReaderFunc != null) {
- ParquetReadOptions.Builder optionsBuilder;
- if (file instanceof HadoopInputFile) {
- // remove read properties already set that may conflict with this
read
- Configuration conf = new Configuration(((HadoopInputFile)
file).getConf());
- for (String property : READ_PROPERTIES_TO_REMOVE) {
- conf.unset(property);
- }
- optionsBuilder = HadoopReadOptions.builder(conf);
- } else {
- optionsBuilder = ParquetReadOptions.builder(new
PlainParquetConfiguration());
- }
-
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- optionsBuilder.set(entry.getKey(), entry.getValue());
- }
-
- if (start != null) {
- optionsBuilder.withRange(start, start + length);
- }
-
- if (fileDecryptionProperties != null) {
- optionsBuilder.withDecryption(fileDecryptionProperties);
- }
-
- ParquetReadOptions options = optionsBuilder.build();
-
- NameMapping mapping;
- if (nameMapping != null) {
- mapping = nameMapping;
- } else if
(SystemConfigs.NETFLIX_UNSAFE_PARQUET_ID_FALLBACK_ENABLED.value()) {
- mapping = null;
- } else {
- mapping = NameMapping.empty();
- }
-
- if (batchedReaderFunc != null) {
- return new VectorizedParquetReader<>(
- file,
- schema,
- options,
- batchedReaderFunc,
- mapping,
- filter,
- reuseContainers,
- caseSensitive,
- maxRecordsPerBatch);
- } else {
- Function<MessageType, ParquetValueReader<?>> readBuilder =
- readerFuncWithSchema != null ?
- fileType -> readerFuncWithSchema.apply(schema, fileType) :
- readerFunc;
- return new org.apache.iceberg.parquet.ParquetReader<>(
- file, schema, options, readBuilder, mapping, filter,
reuseContainers, caseSensitive);
- }
- }
-
- ParquetReadBuilder<D> builder = new
ParquetReadBuilder<>(ParquetIO.file(file));
-
- builder.project(schema);
-
- if (readSupport != null) {
- builder.readSupport((ReadSupport<D>) readSupport);
- } else {
- builder.readSupport(new AvroReadSupport<>(ParquetAvro.DEFAULT_MODEL));
- }
-
- // default options for readers
- builder
- .set("parquet.strict.typing", "false") // allow type promotion
- .set("parquet.avro.compatible", "false") // use the new RecordReader
with Utf8 support
- .set(
- "parquet.avro.add-list-element-records",
- "false"); // assume that lists use a 3-level schema
-
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- builder.set(entry.getKey(), entry.getValue());
- }
-
- if (filter != null) {
- // TODO: should not need to get the schema to push down before opening
the file.
- // Parquet should allow setting a filter inside its read support
- ParquetReadOptions decryptOptions =
- ParquetReadOptions.builder(new PlainParquetConfiguration())
- .withDecryption(fileDecryptionProperties)
- .build();
- MessageType type;
- try (ParquetFileReader schemaReader =
- ParquetFileReader.open(ParquetIO.file(file), decryptOptions))
{
- type = schemaReader.getFileMetaData().getSchema();
- } catch (IOException e) {
- throw new RuntimeIOException(e);
- }
- Schema fileSchema = ParquetSchemaUtil.convert(type);
- builder
- .useStatsFilter()
- .useDictionaryFilter()
- .useRecordFilter(filterRecords)
- .useBloomFilter()
- .withFilter(ParquetFilters.convert(fileSchema, filter,
caseSensitive));
- } else {
- // turn off filtering
- builder
- .useStatsFilter(false)
- .useDictionaryFilter(false)
- .useBloomFilter(false)
- .useRecordFilter(false);
- }
-
- if (callInit) {
- builder.callInit();
- }
-
- if (start != null) {
- builder.withFileRange(start, start + length);
- }
-
- if (nameMapping != null) {
- builder.withNameMapping(nameMapping);
- }
-
- if (fileDecryptionProperties != null) {
- builder.withDecryption(fileDecryptionProperties);
- }
-
- return new ParquetIterable<>(builder);
- }
- }
-
- private static class ParquetReadBuilder<T> extends ParquetReader.Builder<T> {
- private Schema schema = null;
- private ReadSupport<T> readSupport = null;
- private boolean callInit = false;
- private NameMapping nameMapping = null;
-
- private ParquetReadBuilder(org.apache.parquet.io.InputFile file) {
- super(file);
- }
-
- public ParquetReadBuilder<T> project(Schema newSchema) {
- this.schema = newSchema;
- return this;
- }
-
- public ParquetReadBuilder<T> withNameMapping(NameMapping newNameMapping) {
- this.nameMapping = newNameMapping;
- return this;
- }
-
- public ParquetReadBuilder<T> readSupport(ReadSupport<T> newReadSupport) {
- this.readSupport = newReadSupport;
- return this;
- }
-
- public ParquetReadBuilder<T> callInit() {
- this.callInit = true;
- return this;
- }
-
- @Override
- protected ReadSupport<T> getReadSupport() {
- return new ParquetReadSupport<>(schema, readSupport, callInit,
nameMapping);
- }
- }
-
- /**
- * Combines several files into one
- *
- * @param inputFiles an {@link Iterable} of parquet files. The order of
iteration determines the
- * order in which content of files are read and written to the {@code
outputFile}
- * @param outputFile the output parquet file containing all the data from
{@code inputFiles}
- * @param rowGroupSize the row group size to use when writing the {@code
outputFile}
- * @param schema the schema of the data
- * @param metadata extra metadata to write in the footer of the {@code
outputFile}
- */
- public static void concat(
- Iterable<File> inputFiles,
- File outputFile,
- int rowGroupSize,
- Schema schema,
- Map<String, String> metadata)
- throws IOException {
- OutputFile file = Files.localOutput(outputFile);
- try (ParquetFileWriter writer =
- new ParquetFileWriter(
- ParquetIO.file(file),
- ParquetSchemaUtil.convert(schema, "table"),
- ParquetFileWriter.Mode.CREATE,
- rowGroupSize,
- 0)) {
- writer.start();
- for (File inputFile : inputFiles) {
- writer.appendFile(ParquetIO.file(Files.localInput(inputFile)));
- }
- writer.end(metadata);
- }
- }
-}
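
The block above removes Hive's forked copy of the Iceberg Parquet read/write
builders, so callers are expected to go through the upstream
org.apache.iceberg.parquet.Parquet entry points instead. As a reminder of the
builder surface being delegated back to upstream, here is a minimal,
hypothetical read sketch based only on the methods visible in the deleted code
(read, project, filter, createReaderFunc, reuseContainers, build);
GenericParquetReaders.buildReader is assumed as the record reader factory.

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;

public class ParquetReadSketch {

  // Builds a record iterable over a single Parquet data file, projecting the
  // given schema and pushing a simple equality filter down to the reader.
  static CloseableIterable<Record> readRecords(InputFile file, Schema schema) {
    return Parquet.read(file)
        .project(schema)
        .filter(Expressions.equal("id", 1))
        .createReaderFunc(fileType ->
            GenericParquetReaders.buildReader(schema, fileType))
        .reuseContainers()
        .build();
  }
}
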
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java
index 7d0f0feec8c..736a39b895c 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/parquet/VariantUtil.java
@@ -19,64 +19,106 @@
package org.apache.iceberg.parquet;
+import java.util.List;
import java.util.Map;
-import java.util.Optional;
-import java.util.function.Supplier;
+import org.apache.iceberg.Accessor;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.variants.PhysicalType;
import org.apache.iceberg.variants.Variant;
+/**
+ * Utilities for variant shredding support in Parquet writers.
+ *
+ * <p>This includes:
+ * <ul>
+ * <li>Detecting whether shredding should be enabled for a schema</li>
+ * <li>Discovering eligible VARIANT fields (top-level or nested in structs;
not in lists/maps)</li>
+ * <li>Building a {@link VariantShreddingFunction} for data-driven
typed_value schema inference</li>
+ * </ul>
+ */
public class VariantUtil {
private VariantUtil() {
}
/**
- * Create a VariantShreddingFunction if variant shredding is enabled and the
schema has variant columns.
+ * A VARIANT field in an Iceberg {@link Schema} that can be used for
shredding.
*
- * @param schema The Iceberg schema
- * @param sampleRecord A sample record to infer variant schemas from actual
data (can be null)
- * @param properties Table properties to check if variant shredding is
enabled
- * @return An Optional containing the VariantShreddingFunction if applicable
+ * <p>Shredding is supported for top-level VARIANT columns or VARIANT fields
nested in structs.
+ * VARIANT fields stored inside lists or maps are excluded because schema
accessors do not retrieve
+ * list/map contents.
+ */
+ public record VariantField(int fieldId, Accessor<StructLike> accessor,
String[] path) {
+ }
+
+ /**
+ * Check if variant shredding is enabled via table properties.
*/
- public static Optional<VariantShreddingFunction> variantShreddingFunc(
- Schema schema,
- Supplier<Record> sampleRecord,
- Map<String, String> properties) {
-
- // Preconditions: must have variant columns + property enabled
- if (!hasVariantColumns(schema) || !isVariantShreddingEnabled(properties)) {
- return Optional.empty();
+ public static boolean isVariantShreddingEnabled(Map<String, String>
properties) {
+ String shreddingEnabled =
properties.get(InputFormatConfig.VARIANT_SHREDDING_ENABLED);
+ return Boolean.parseBoolean(shreddingEnabled);
+ }
+
+ public static boolean isShreddable(Object value) {
+ if (value instanceof Variant variant) {
+ return variant.value().type() != PhysicalType.NULL;
}
+ return false;
+ }
- VariantShreddingFunction fn =
- constructVariantShreddingFunc(sampleRecord.get(), schema);
+ public static List<VariantField> variantFieldsForShredding(
+ Map<String, String> properties, Schema schema) {
+ if (!isVariantShreddingEnabled(properties)) {
+ return List.of();
+ }
+ return variantFieldsForShredding(schema);
+ }
+
+ /**
+ * Returns all VARIANT fields that are eligible for shredding: top-level
VARIANT columns and VARIANT
+ * fields nested in structs (excluding lists/maps).
+ */
+ private static List<VariantField> variantFieldsForShredding(Schema schema) {
+ List<VariantField> results = Lists.newArrayList();
+ new VariantFieldVisitor(schema).collect(results);
+ return results;
+ }
+
+ public static boolean shouldUseVariantShredding(Map<String, String>
properties, Schema schema) {
+ return isVariantShreddingEnabled(properties) && hasVariantFields(schema);
+ }
- return Optional.of(fn);
+ private static boolean hasVariantFields(Schema schema) {
+ return new VariantFieldVisitor(schema).hasVariantField();
}
- private static VariantShreddingFunction constructVariantShreddingFunc(
+ public static VariantShreddingFunction variantShreddingFunc(
Record sampleRecord, Schema schema) {
- return (id, name) -> {
+ return (id, ignoredName) -> {
// Validate the field exists and is a variant type
Types.NestedField field = schema.findField(id);
- if (field == null || !(field.type() instanceof Types.VariantType)) {
+ if (field == null || !field.type().isVariantType()) {
return null; // Not a variant field, no shredding
}
// If we have a sample record, try to generate schema from actual data
if (sampleRecord != null) {
try {
- Object variantValue = sampleRecord.getField(name);
+ // NOTE: Parquet conversion passes the field's local name, not the
full path.
+ // Use an accessor to support variant fields nested in structs.
+ Accessor<StructLike> accessor = schema.accessorForField(id);
+ Object variantValue = accessor != null ? accessor.get(sampleRecord)
: null;
if (variantValue instanceof Variant variant) {
- // Use ParquetVariantUtil to generate schema from actual variant
value
return ParquetVariantUtil.toParquetSchema(variant.value());
}
- } catch (Exception e) {
+ } catch (RuntimeException e) {
// Fall through to default schema
}
}
@@ -84,20 +126,66 @@ private static VariantShreddingFunction
constructVariantShreddingFunc(
};
}
- /**
- * Check if the schema contains any variant columns.
- */
- private static boolean hasVariantColumns(Schema schema) {
- return schema.columns().stream()
- .anyMatch(field -> field.type() instanceof Types.VariantType);
- }
+ private static final class VariantFieldVisitor {
+ private final Schema schema;
- /**
- * Check if variant shredding is enabled via table properties.
- */
- private static boolean isVariantShreddingEnabled(Map<String, String>
properties) {
- String shreddingEnabled =
properties.get(InputFormatConfig.VARIANT_SHREDDING_ENABLED);
- return Boolean.parseBoolean(shreddingEnabled);
- }
+ private VariantFieldVisitor(Schema schema) {
+ this.schema = schema;
+ }
+
+ private boolean hasVariantField() {
+ return hasVariantField(schema.asStruct());
+ }
+
+ private boolean hasVariantField(Types.StructType struct) {
+ for (Types.NestedField field : struct.fields()) {
+ if (field.type().isVariantType()) {
+ // Accessors don't retrieve values contained in lists/maps so this
enforces the "struct-only"
+ // nesting rule for shredding.
+ if (schema.accessorForField(field.fieldId()) != null) {
+ return true;
+ }
+ } else if (field.type().isStructType() &&
hasVariantField(field.type().asStructType())) {
+ return true;
+ }
+ // Do not recurse into List or Map (shredding is not supported there)
+ }
+ return false;
+ }
+ private void collect(List<VariantField> results) {
+ collect(schema.asStruct(), Lists.newArrayList(), results);
+ }
+
+ private void collect(
+ Types.StructType struct,
+ List<String> parents,
+ List<VariantField> results) {
+
+ for (Types.NestedField field : struct.fields()) {
+ if (field.type().isVariantType()) {
+ // Accessors don't retrieve values contained in lists/maps so this
enforces the "struct-only"
+ // nesting rule for shredding.
+ Accessor<StructLike> accessor =
schema.accessorForField(field.fieldId());
+ if (accessor == null) {
+ continue;
+ }
+
+ String[] path = new String[parents.size() + 1];
+ for (int i = 0; i < parents.size(); i++) {
+ path[i] = parents.get(i);
+ }
+ path[parents.size()] = field.name();
+
+ results.add(new VariantField(field.fieldId(), accessor, path));
+
+ } else if (field.type().isStructType()) {
+ parents.add(field.name());
+ collect(field.type().asStructType(), parents, results);
+ parents.removeLast();
+ }
+ // Do not recurse into List or Map (shredding is not supported there)
+ }
+ }
+ }
}
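
For reviewers, a minimal sketch of how the refactored helpers might be combined
by a writer: check shouldUseVariantShredding, enumerate eligible fields with
variantFieldsForShredding, use each field's accessor to decide whether a sample
row actually carries shreddable variant values, and only then build the
shredding function. The actual wiring lives in the writer classes touched by
this commit; the class and method names below are illustrative only.

import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.parquet.VariantShreddingFunction;
import org.apache.iceberg.parquet.VariantUtil;

public class VariantShreddingSketch {

  // Returns a shredding function inferred from the sample row, or null when
  // shredding is disabled or the row carries no shreddable variant values.
  static VariantShreddingFunction shreddingFuncFor(
      Map<String, String> properties, Schema schema, Record sample) {
    if (!VariantUtil.shouldUseVariantShredding(properties, schema)) {
      return null;
    }
    List<VariantUtil.VariantField> variantFields =
        VariantUtil.variantFieldsForShredding(properties, schema);
    boolean shreddable = variantFields.stream()
        .anyMatch(f -> VariantUtil.isShreddable(f.accessor().get(sample)));
    return shreddable
        ? VariantUtil.variantShreddingFunc(sample, schema)
        : null;
  }
}
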
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
index 990d6e782fd..8a1cf740067 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
@@ -22,9 +22,6 @@
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -35,16 +32,11 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.schema.GroupType;
-import org.apache.parquet.schema.MessageType;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
-import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assume.assumeTrue;
/**
@@ -195,68 +187,6 @@ public void testSpecialCharacters() {
Assert.assertArrayEquals(new Object[]{"star", 2L}, result.get(1));
}
- @Test
- public void testVariantSelectProjection() throws IOException {
- assumeTrue(fileFormat == FileFormat.PARQUET);
- assumeTrue(!isVectorized);
-
- TableIdentifier table = TableIdentifier.of("default",
"variant_projection");
- shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table));
-
- shell.executeStatement(
- String.format(
- "CREATE TABLE %s (id INT, payload VARIANT) STORED BY ICEBERG
STORED AS %s %s %s",
- table,
- fileFormat,
- testTables.locationForCreateTableSQL(table),
- testTables.propertiesForCreateTableSQL(
- ImmutableMap.of("format-version", "3",
"variant.shredding.enabled", "true"))));
-
- shell.executeStatement(
- String.format(
- "INSERT INTO %s VALUES " +
- "(1, parse_json('{\"name\":\"Alice\",\"age\":30}'))," +
- "(2, parse_json('{\"name\":\"Bob\"}'))",
- table));
-
- List<Object[]> rows =
- shell.executeStatement(
- String.format(
- "SELECT id, " +
- "variant_get(payload, '$.name') AS name, " +
- "try_variant_get(payload, '$.age', 'int') AS age " +
- "FROM %s ORDER BY id",
- table));
-
- Assert.assertEquals(2, rows.size());
- Assert.assertEquals(1, ((Number) rows.get(0)[0]).intValue());
- Assert.assertEquals("Alice", rows.get(0)[1]);
- Assert.assertEquals(30, ((Number) rows.get(0)[2]).intValue());
-
- Assert.assertEquals(2, ((Number) rows.get(1)[0]).intValue());
- Assert.assertEquals("Bob", rows.get(1)[1]);
- Assert.assertNull(rows.get(1)[2]);
-
- Table icebergTable = testTables.loadTable(table);
- Types.NestedField variantField =
icebergTable.schema().findField("payload");
- Assert.assertNotNull("Variant column should exist", variantField);
- DataFile dataFile =
- StreamSupport.stream(
-
icebergTable.currentSnapshot().addedDataFiles(icebergTable.io()).spliterator(),
false)
- .findFirst()
- .orElseThrow(() -> new IllegalStateException("No data files
written for test table"));
-
- Path parquetPath = new Path(dataFile.path().toString());
- try (ParquetFileReader reader =
- ParquetFileReader.open(HadoopInputFile.fromPath(parquetPath,
shell.getHiveConf()))) {
- MessageType parquetSchema =
reader.getFooter().getFileMetaData().getSchema();
- GroupType variantType =
parquetSchema.getType(variantField.name()).asGroupType();
- assertThat(variantType.containsField("typed_value")).isTrue();
- }
-
- shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table));
- }
-
@Test
public void testScanTableCaseInsensitive() throws IOException {
testTables.createTable(shell, "customers",
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergVariant.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergVariant.java
new file mode 100644
index 00000000000..7ec594a609c
--- /dev/null
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergVariant.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.StreamSupport;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assume.assumeTrue;
+
+public class TestHiveIcebergVariant extends
HiveIcebergStorageHandlerWithEngineBase {
+ private static final String TYPED_VALUE_FIELD = "typed_value";
+
+ @Test
+ public void testVariantSelectProjection() throws IOException {
+ assumeParquetNonVectorized();
+
+ TableIdentifier table = TableIdentifier.of("default",
"variant_projection");
+ shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table));
+
+ shell.executeStatement(
+ String.format(
+ "CREATE TABLE %s (id INT, payload VARIANT) STORED BY ICEBERG
STORED AS %s %s %s",
+ table,
+ fileFormat,
+ testTables.locationForCreateTableSQL(table),
+ testTables.propertiesForCreateTableSQL(
+ ImmutableMap.of("format-version", "3",
"variant.shredding.enabled", "true"))));
+
+ shell.executeStatement(
+ String.format(
+ "INSERT INTO %s VALUES " +
+ "(1, parse_json('null'))," +
+ "(2, parse_json('{\"name\":\"Alice\",\"age\":30}'))," +
+ "(3, parse_json('{\"name\":\"Bob\"}'))",
+ table));
+
+ List<Object[]> rows =
+ shell.executeStatement(
+ String.format(
+ "SELECT id, " +
+ "variant_get(payload, '$.name') AS name, " +
+ "try_variant_get(payload, '$.age', 'int') AS age " +
+ "FROM %s ORDER BY id",
+ table));
+
+ Assert.assertEquals(3, rows.size());
+
+ Assert.assertEquals(1, ((Number) rows.get(0)[0]).intValue());
+ Assert.assertNull(rows.get(0)[1]);
+ Assert.assertNull(rows.get(0)[2]);
+
+ Assert.assertEquals(2, ((Number) rows.get(1)[0]).intValue());
+ Assert.assertEquals("Alice", rows.get(1)[1]);
+ Assert.assertEquals(30, ((Number) rows.get(1)[2]).intValue());
+
+ Assert.assertEquals(3, ((Number) rows.get(2)[0]).intValue());
+ Assert.assertEquals("Bob", rows.get(2)[1]);
+ Assert.assertNull(rows.get(2)[2]);
+
+ Table icebergTable = testTables.loadTable(table);
+ Types.NestedField variantField = requiredField(icebergTable, "payload",
"Variant column should exist");
+ MessageType parquetSchema = readParquetSchema(firstDataFile(icebergTable));
+ assertThat(hasTypedValue(parquetSchema, variantField.name())).isTrue();
+ }
+
+ @Test
+ public void testVariantShreddingInStruct() throws IOException {
+ assumeParquetNonVectorized();
+
+ TableIdentifier table = TableIdentifier.of("default",
"variant_struct_shredding");
+ shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table));
+
+ shell.executeStatement(
+ String.format(
+ "CREATE TABLE %s (id INT, payload STRUCT<info: VARIANT>) STORED BY
ICEBERG STORED AS %s %s %s",
+ table,
+ fileFormat,
+ testTables.locationForCreateTableSQL(table),
+ testTables.propertiesForCreateTableSQL(
+ ImmutableMap.of("format-version", "3",
"variant.shredding.enabled", "true"))));
+
+ shell.executeStatement(
+ String.format(
+ "INSERT INTO %s VALUES " +
+ "(1, named_struct('info', parse_json('null')))," +
+ "(2, named_struct('info',
parse_json('{\"city\":\"Seattle\",\"state\":\"WA\"}')))",
+ table));
+
+ Table icebergTable = testTables.loadTable(table);
+ Types.NestedField payloadField = requiredField(icebergTable, "payload",
"Struct column should exist");
+ MessageType parquetSchema = readParquetSchema(firstDataFile(icebergTable));
+ assertThat(hasTypedValue(parquetSchema, payloadField.name(),
"info")).isTrue();
+ }
+
+ @Test
+ public void testVariantShreddingNotAppliedInArrayOrMap() throws IOException {
+ assumeParquetNonVectorized();
+
+ TableIdentifier table = TableIdentifier.of("default",
"variant_container_no_shredding");
+ shell.executeStatement(String.format("DROP TABLE IF EXISTS %s", table));
+
+ shell.executeStatement(
+ String.format(
+ "CREATE TABLE %s (id INT, arr ARRAY<VARIANT>, mp MAP<STRING,
VARIANT>) " +
+ "STORED BY ICEBERG STORED AS %s %s %s",
+ table,
+ fileFormat,
+ testTables.locationForCreateTableSQL(table),
+ testTables.propertiesForCreateTableSQL(
+ ImmutableMap.of("format-version", "3",
"variant.shredding.enabled", "true"))));
+
+ shell.executeStatement(
+ String.format(
+ "INSERT INTO %s VALUES " +
+ "(1, array(parse_json('{\"a\":1}')), map('k',
parse_json('{\"b\":2}')))",
+ table));
+
+ Table icebergTable = testTables.loadTable(table);
+ MessageType parquetSchema = readParquetSchema(firstDataFile(icebergTable));
+ // The element/value types should remain as the base VARIANT struct (no
typed_value).
+ assertThat(hasTypedValue(parquetSchema, "arr", "list",
"element")).isFalse();
+ assertThat(hasTypedValue(parquetSchema, "mp", "key_value",
"value")).isFalse();
+ }
+
+ private void assumeParquetNonVectorized() {
+ assumeTrue(fileFormat == FileFormat.PARQUET);
+ assumeTrue(!isVectorized);
+ }
+
+ private static Types.NestedField requiredField(Table table, String
fieldName, String message) {
+ Types.NestedField field = table.schema().findField(fieldName);
+ Assert.assertNotNull(message, field);
+ return field;
+ }
+
+ private static DataFile firstDataFile(Table table) {
+ return
StreamSupport.stream(table.currentSnapshot().addedDataFiles(table.io()).spliterator(),
false)
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("No data files written
for test table"));
+ }
+
+ private MessageType readParquetSchema(DataFile dataFile) throws IOException {
+ Path parquetPath = new Path(dataFile.location());
+ try (ParquetFileReader reader =
+ ParquetFileReader.open(HadoopInputFile.fromPath(parquetPath,
shell.getHiveConf()))) {
+ return reader.getFooter().getFileMetaData().getSchema();
+ }
+ }
+
+ private static GroupType groupAt(MessageType parquetSchema, String... path) {
+ org.apache.parquet.schema.Type type = parquetSchema.getType(path[0]);
+ for (int i = 1; i < path.length; i++) {
+ type = type.asGroupType().getType(path[i]);
+ }
+ return type.asGroupType();
+ }
+
+ private static boolean hasTypedValue(MessageType parquetSchema, String...
pathToVariantGroup) {
+ return groupAt(parquetSchema,
pathToVariantGroup).containsField(TYPED_VALUE_FIELD);
+ }
+}
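
The assertions above only check for the inferred typed_value field. If a
stricter check is ever wanted, a helper along the following lines could verify
the full shredded shape; it assumes, per the Parquet variant shredding layout,
that the variant group also carries metadata and value fields, and it reuses
GroupType.containsField exactly as the tests do.

import org.apache.parquet.schema.GroupType;

final class VariantSchemaAssertions {
  private VariantSchemaAssertions() {
  }

  // Returns true when the group exposes the metadata, value, and typed_value
  // fields expected of a shredded variant column.
  static boolean looksShredded(GroupType variantGroup) {
    return variantGroup.containsField("metadata")
        && variantGroup.containsField("value")
        && variantGroup.containsField("typed_value");
  }
}
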
diff --git a/iceberg/iceberg-shading/pom.xml b/iceberg/iceberg-shading/pom.xml
index ac4bf4a9b10..17d50dd91d6 100644
--- a/iceberg/iceberg-shading/pom.xml
+++ b/iceberg/iceberg-shading/pom.xml
@@ -117,7 +117,6 @@
</excludes>
</artifactSet>
<filters>
- <!-- Global exclusions -->
<filter>
<artifact>*:*</artifact>
<excludes>
@@ -128,13 +127,6 @@
<exclude>static/</exclude>
</excludes>
</filter>
- <!-- Exclude org.apache.iceberg.parquet.Parquet.class from
iceberg-parquet -->
- <filter>
- <artifact>org.apache.iceberg:iceberg-parquet</artifact>
- <excludes>
-
<exclude>org/apache/iceberg/parquet/Parquet.class</exclude>
- </excludes>
- </filter>
</filters>
</configuration>
</execution>