This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 235ab0f4fd Parquet, Data: Implementation of ParquetFormatModel (#15253)
235ab0f4fd is described below
commit 235ab0f4fd34f5e71642030577a7ea996e2c4489
Author: pvary <[email protected]>
AuthorDate: Sun Feb 15 10:53:20 2026 +0100
Parquet, Data: Implementation of ParquetFormatModel (#15253)
---
.../apache/iceberg/data/GenericFormatModels.java | 14 +
.../iceberg/data/TestGenericFormatModels.java | 3 +-
.../java/org/apache/iceberg/parquet/Parquet.java | 47 ++-
.../apache/iceberg/parquet/ParquetFormatModel.java | 317 +++++++++++++++++++++
4 files changed, 373 insertions(+), 8 deletions(-)
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java
index e2e13861ca..6fde8bbeba 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericFormatModels.java
@@ -21,7 +21,10 @@ package org.apache.iceberg.data;
import org.apache.iceberg.avro.AvroFormatModel;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.data.avro.PlannedDataReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.parquet.ParquetFormatModel;
public class GenericFormatModels {
public static void register() {
@@ -34,6 +37,17 @@ public class GenericFormatModels {
PlannedDataReader.create(icebergSchema, idToConstant)));
FormatModelRegistry.register(AvroFormatModel.forPositionDeletes());
+
+ FormatModelRegistry.register(
+ ParquetFormatModel.create(
+ Record.class,
+ Void.class,
+ (icebergSchema, fileSchema, engineSchema) ->
+ GenericParquetWriter.create(icebergSchema, fileSchema),
+ (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+ GenericParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+
+ FormatModelRegistry.register(ParquetFormatModel.forPositionDeletes());
}
private GenericFormatModels() {}
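The registration above wires generic Records to Parquet through the same writer/reader factory lambdas used for Avro. A minimal read-path sketch using only the API added in this commit; inputFile and projectedSchema are hypothetical values supplied by the caller:

    // Build (or register and reuse) the generic Parquet model, then read Records.
    ParquetFormatModel<Record, Void, ParquetValueReader<?>> model =
        ParquetFormatModel.create(
            Record.class,
            Void.class,
            (icebergSchema, fileSchema, engineSchema) ->
                GenericParquetWriter.create(icebergSchema, fileSchema),
            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
                GenericParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant));

    // inputFile and projectedSchema come from the caller (hypothetical here)
    try (CloseableIterable<Record> records =
        model.readBuilder(inputFile).project(projectedSchema).build()) {
      records.forEach(System.out::println);
    }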
diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java
index ca3dda30ab..ab5968da8b 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestGenericFormatModels.java
@@ -54,7 +54,8 @@ public class TestGenericFormatModels {
private static final List<Record> TEST_RECORDS =
RandomGenericData.generate(TestBase.SCHEMA, 10, 1L);
- private static final FileFormat[] FILE_FORMATS = new FileFormat[] {FileFormat.AVRO};
+ private static final FileFormat[] FILE_FORMATS =
+ new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET};
@TempDir protected Path temp;
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index ae049d0875..2387d52edf 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -302,8 +302,7 @@ public class Parquet {
}
// supposed to always be a private method used strictly by data and delete write builders
- private WriteBuilder createContextFunc(
- Function<Map<String, String>, Context> newCreateContextFunc) {
+ WriteBuilder createContextFunc(Function<Map<String, String>, Context> newCreateContextFunc) {
this.createContextFunc = newCreateContextFunc;
return this;
}
@@ -498,7 +497,7 @@ public class Parquet {
}
}
- private static class Context {
+ static class Context {
private final int rowGroupSize;
private final int pageSize;
private final int pageRowLimit;
@@ -1176,6 +1175,7 @@ public class Parquet {
private Expression filter = null;
private ReadSupport<?> readSupport = null;
private Function<MessageType, VectorizedReader<?>> batchedReaderFunc = null;
+ private BiFunction<Schema, MessageType, VectorizedReader<?>> batchedReaderFuncWithSchema = null;
private ReaderFunction readerFunction = null;
private boolean filterRecords = true;
private boolean caseSensitive = true;
@@ -1298,6 +1298,9 @@ public class Parquet {
Preconditions.checkArgument(
this.batchedReaderFunc == null,
"Cannot set reader function: batched reader function already set");
+ Preconditions.checkArgument(
+ this.batchedReaderFuncWithSchema == null,
+ "Cannot set reader function: batched reader function with schema
already set");
Preconditions.checkArgument(
this.readerFunction == null, "Cannot set reader function: reader function already set");
this.readerFunction = new UnaryReaderFunction(newReaderFunction);
@@ -1309,6 +1312,9 @@ public class Parquet {
Preconditions.checkArgument(
this.batchedReaderFunc == null,
"Cannot set reader function: batched reader function already set");
+ Preconditions.checkArgument(
+ this.batchedReaderFuncWithSchema == null,
+ "Cannot set reader function: batched reader function with schema
already set");
Preconditions.checkArgument(
this.readerFunction == null, "Cannot set reader function: reader function already set");
this.readerFunction = new BinaryReaderFunction(newReaderFunction);
@@ -1319,6 +1325,9 @@ public class Parquet {
Preconditions.checkArgument(
this.batchedReaderFunc == null,
"Cannot set batched reader function: batched reader function already
set");
+ Preconditions.checkArgument(
+ this.batchedReaderFuncWithSchema == null,
+ "Cannot set reader function: batched reader function with schema
already set");
Preconditions.checkArgument(
this.readerFunction == null,
"Cannot set batched reader function: ReaderFunction already set");
@@ -1326,10 +1335,28 @@ public class Parquet {
return this;
}
+ public ReadBuilder createBatchedReaderFunc(
+ BiFunction<Schema, MessageType, VectorizedReader<?>> func) {
+ Preconditions.checkArgument(
+ this.batchedReaderFunc == null,
+ "Cannot set batched reader function: batched reader function already
set");
+ Preconditions.checkArgument(
+ this.batchedReaderFuncWithSchema == null,
+ "Cannot set reader function: batched reader function with schema
already set");
+ Preconditions.checkArgument(
+ this.readerFunction == null,
+ "Cannot set batched reader function: ReaderFunction already set");
+ this.batchedReaderFuncWithSchema = func;
+ return this;
+ }
+
public ReadBuilder createReaderFunc(ReaderFunction reader) {
Preconditions.checkArgument(
this.batchedReaderFunc == null,
"Cannot set reader function: batched reader function already set");
+ Preconditions.checkArgument(
+ this.batchedReaderFuncWithSchema == null,
+ "Cannot set reader function: batched reader function with schema
already set");
Preconditions.checkArgument(
this.readerFunction == null, "Cannot set reader function: reader function already set");
this.readerFunction = reader;
@@ -1389,7 +1416,7 @@ public class Parquet {
}
@Override
- @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
+ @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity", "checkstyle:MethodLength"})
public <D> CloseableIterable<D> build() {
FileDecryptionProperties fileDecryptionProperties = null;
if (fileEncryptionKey != null) {
@@ -1404,7 +1431,9 @@ public class Parquet {
Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key");
}
- if (batchedReaderFunc != null || readerFunction != null) {
+ if (batchedReaderFunc != null
+ || batchedReaderFuncWithSchema != null
+ || readerFunction != null) {
ParquetReadOptions.Builder optionsBuilder;
if (file instanceof HadoopInputFile) {
// remove read properties already set that may conflict with this read
@@ -1441,12 +1470,16 @@ public class Parquet {
mapping = NameMapping.empty();
}
- if (batchedReaderFunc != null) {
+ Function<MessageType, VectorizedReader<?>> batchedFunc =
+ batchedReaderFuncWithSchema != null
+ ? messageType -> batchedReaderFuncWithSchema.apply(schema, messageType)
+ : batchedReaderFunc;
+ if (batchedFunc != null) {
return new VectorizedParquetReader<>(
file,
schema,
options,
- batchedReaderFunc,
+ batchedFunc,
mapping,
filter,
reuseContainers,
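The new createBatchedReaderFunc overload gives the vectorized reader factory the expected Iceberg Schema in addition to the Parquet MessageType; internally it is adapted to the existing single-argument path by capturing the read schema. A minimal sketch; buildVectorizedReader and MyBatch are hypothetical engine-side names:

    CloseableIterable<MyBatch> batches =
        Parquet.read(inputFile)
            .project(expectedSchema)
            .createBatchedReaderFunc(
                (Schema icebergSchema, MessageType messageType) ->
                    // factory can now consult both schemas (hypothetical helper)
                    buildVectorizedReader(icebergSchema, messageType))
            .recordsPerBatch(10_000)
            .build();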
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java
new file mode 100644
index 0000000000..90d6e3ef41
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.formats.BaseFormatModel;
+import org.apache.iceberg.formats.ModelWriteBuilder;
+import org.apache.iceberg.formats.ReadBuilder;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DeleteSchemaUtil;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.schema.MessageType;
+
+public class ParquetFormatModel<D, S, R>
+ extends BaseFormatModel<D, S, ParquetValueWriter<?>, R, MessageType> {
+ public static final String WRITER_VERSION_KEY = "parquet.writer.version";
+ private final boolean isBatchReader;
+
+ public static <D> ParquetFormatModel<PositionDelete<D>, Void, Object> forPositionDeletes() {
+ return new ParquetFormatModel<>(PositionDelete.deleteClass(), Void.class, null, null, false);
+ }
+
+ public static <D, S> ParquetFormatModel<D, S, ParquetValueReader<?>> create(
+ Class<D> type,
+ Class<S> schemaType,
+ WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction,
+ ReaderFunction<ParquetValueReader<?>, S, MessageType> readerFunction) {
+ return new ParquetFormatModel<>(type, schemaType, writerFunction, readerFunction, false);
+ }
+
+ public static <D, S> ParquetFormatModel<D, S, VectorizedReader<?>> create(
+ Class<? extends D> type,
+ Class<S> schemaType,
+ ReaderFunction<VectorizedReader<?>, S, MessageType> batchReaderFunction) {
+ return new ParquetFormatModel<>(type, schemaType, null, batchReaderFunction, true);
+ }
+
+ private ParquetFormatModel(
+ Class<? extends D> type,
+ Class<S> schemaType,
+ WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction,
+ ReaderFunction<R, S, MessageType> readerFunction,
+ boolean isBatchReader) {
+ super(type, schemaType, writerFunction, readerFunction);
+ this.isBatchReader = isBatchReader;
+ }
+
+ @Override
+ public FileFormat format() {
+ return FileFormat.PARQUET;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> writeBuilder(EncryptedOutputFile outputFile) {
+ return new WriteBuilderWrapper<>(outputFile, writerFunction());
+ }
+
+ @Override
+ public ReadBuilder<D, S> readBuilder(InputFile inputFile) {
+ return new ReadBuilderWrapper<>(inputFile, readerFunction(), isBatchReader);
+ }
+
+ private static class WriteBuilderWrapper<D, S> implements ModelWriteBuilder<D, S> {
+ private final Parquet.WriteBuilder internal;
+ private final WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction;
+ private Schema schema;
+ private S engineSchema;
+ private FileContent content;
+
+ private WriteBuilderWrapper(
+ EncryptedOutputFile outputFile,
+ WriterFunction<ParquetValueWriter<?>, S, MessageType> writerFunction) {
+ this.internal = Parquet.write(outputFile);
+ this.writerFunction = writerFunction;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> schema(Schema newSchema) {
+ this.schema = newSchema;
+ internal.schema(newSchema);
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> engineSchema(S newSchema) {
+ this.engineSchema = newSchema;
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> set(String property, String value) {
+ if (WRITER_VERSION_KEY.equals(property)) {
+ internal.writerVersion(ParquetProperties.WriterVersion.valueOf(value));
+ }
+
+ internal.set(property, value);
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> setAll(Map<String, String> properties) {
+ internal.setAll(properties);
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> meta(String property, String value) {
+ internal.meta(property, value);
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> meta(Map<String, String> properties) {
+ internal.meta(properties);
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> content(FileContent newContent) {
+ this.content = newContent;
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> metricsConfig(MetricsConfig metricsConfig) {
+ internal.metricsConfig(metricsConfig);
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> overwrite() {
+ internal.overwrite();
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> withFileEncryptionKey(ByteBuffer encryptionKey) {
+ internal.withFileEncryptionKey(encryptionKey);
+ return this;
+ }
+
+ @Override
+ public ModelWriteBuilder<D, S> withAADPrefix(ByteBuffer aadPrefix) {
+ internal.withAADPrefix(aadPrefix);
+ return this;
+ }
+
+ @Override
+ public FileAppender<D> build() throws IOException {
+ switch (content) {
+ case DATA:
+ internal.createContextFunc(Parquet.WriteBuilder.Context::dataContext);
+ internal.createWriterFunc(
+ (icebergSchema, messageType) ->
+ writerFunction.write(icebergSchema, messageType, engineSchema));
+ break;
+ case EQUALITY_DELETES:
+ internal.createContextFunc(Parquet.WriteBuilder.Context::deleteContext);
+ internal.createWriterFunc(
+ (icebergSchema, messageType) ->
+ writerFunction.write(icebergSchema, messageType, engineSchema));
+ break;
+ case POSITION_DELETES:
+ Preconditions.checkState(
+ schema == null,
+ "Invalid schema: %s. Position deletes with schema are not
supported by the API.",
+ schema);
+ Preconditions.checkState(
+ engineSchema == null,
+ "Invalid engineSchema: %s. Position deletes with schema are not
supported by the API.",
+ engineSchema);
+
+ internal.createContextFunc(Parquet.WriteBuilder.Context::deleteContext);
+ internal.createWriterFunc(
+ (icebergSchema, messageType) ->
+ new ParquetValueWriters.PositionDeleteStructWriter<D>(
+ (ParquetValueWriters.StructWriter<?>)
+ GenericParquetWriter.create(icebergSchema, messageType),
+ Function.identity()));
+ internal.schema(DeleteSchemaUtil.pathPosSchema());
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown file content: " + content);
+ }
+
+ return internal.build();
+ }
+ }
+
+ private static class ReadBuilderWrapper<R, D, S> implements ReadBuilder<D, S> {
+ private final Parquet.ReadBuilder internal;
+ private final ReaderFunction<R, S, MessageType> readerFunction;
+ private final boolean isBatchReader;
+ private S engineSchema;
+ private Map<Integer, ?> idToConstant = ImmutableMap.of();
+
+ private ReadBuilderWrapper(
+ InputFile inputFile,
+ ReaderFunction<R, S, MessageType> readerFunction,
+ boolean isBatchReader) {
+ this.internal = Parquet.read(inputFile);
+ this.readerFunction = readerFunction;
+ this.isBatchReader = isBatchReader;
+ }
+
+ @Override
+ public ReadBuilder<D, S> split(long newStart, long newLength) {
+ internal.split(newStart, newLength);
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> project(Schema schema) {
+ internal.project(schema);
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> engineProjection(S schema) {
+ this.engineSchema = schema;
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> caseSensitive(boolean caseSensitive) {
+ internal.caseSensitive(caseSensitive);
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> filter(Expression filter) {
+ internal.filter(filter);
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> set(String key, String value) {
+ internal.set(key, value);
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> reuseContainers() {
+ internal.reuseContainers();
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> recordsPerBatch(int numRowsPerBatch) {
+ internal.recordsPerBatch(numRowsPerBatch);
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> idToConstant(Map<Integer, ?> newIdToConstant) {
+ this.idToConstant = newIdToConstant;
+ return this;
+ }
+
+ @Override
+ public ReadBuilder<D, S> withNameMapping(NameMapping nameMapping) {
+ internal.withNameMapping(nameMapping);
+ return this;
+ }
+
+ @Override
+ public CloseableIterable<D> build() {
+ if (isBatchReader) {
+ return internal
+ .createBatchedReaderFunc(
+ (icebergSchema, messageType) ->
+ (VectorizedReader<?>)
+ readerFunction.read(icebergSchema, messageType, engineSchema, idToConstant))
+ .build();
+ } else {
+ return internal
+ .createReaderFunc(
+ (icebergSchema, messageType) ->
+ (ParquetValueReader<?>)
+ readerFunction.read(icebergSchema, messageType, engineSchema, idToConstant))
+ .build();
+ }
+ }
+ }
+}
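On the write side, WriteBuilderWrapper picks the write context and writer function from the declared FileContent, and routes the parquet.writer.version property to writerVersion(). A minimal data-file sketch; encryptedOutputFile, tableSchema, and records are hypothetical caller-supplied values, and model is the generic Record model from the registration above:

    try (FileAppender<Record> appender =
        model
            .writeBuilder(encryptedOutputFile)
            .schema(tableSchema)
            .content(FileContent.DATA)
            // WRITER_VERSION_KEY is translated to writerVersion() by set()
            .set(ParquetFormatModel.WRITER_VERSION_KEY, "PARQUET_2_0")
            .overwrite()
            .build()) {
      appender.addAll(records);
    }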