This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 68b7a2a710 Core, Data, Flink: Moving Flink to use the new FormatModel API (#15329)
68b7a2a710 is described below
commit 68b7a2a71056aafd982b16e45e7f7ec254802e4a
Author: pvary <[email protected]>
AuthorDate: Tue Feb 17 12:50:03 2026 +0100
Core, Data, Flink: Moving Flink to use the new FormatModel API (#15329)
---
.../iceberg/formats/FormatModelRegistry.java | 3 +-
.../iceberg/flink/data/FlinkFormatModels.java | 58 +++++++++++
.../iceberg/flink/sink/FlinkFileWriterFactory.java | 98 ++++--------------
.../flink/source/RowDataFileScanTaskReader.java | 112 ++++-----------------
.../flink/sink/TestCompressionSettings.java | 10 +-
.../flink/sink/dynamic/TestDynamicWriter.java | 6 +-
6 files changed, 104 insertions(+), 183 deletions(-)
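At a glance, the write path now goes through RegistryBasedFileWriterFactory and the read path in RowDataFileScanTaskReader replaces its per-format switch with a FormatModelRegistry lookup. A minimal sketch of the new read flow, lifted from the diff below (task, schema, idToConstant, caseSensitive, nameMapping and inputFile stand in for the reader's local state; this is an illustration only, not additional code in the commit):

    // RowData format models must be registered first; the commit wires
    // org.apache.iceberg.flink.data.FlinkFormatModels into the registry's class list.
    ReadBuilder<RowData, RowType> builder =
        FormatModelRegistry.readBuilder(task.file().format(), RowData.class, inputFile);

    if (nameMapping != null) {
      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
    }

    CloseableIterable<RowData> rows =
        builder
            .project(schema)                     // Iceberg read schema
            .idToConstant(idToConstant)          // constant and metadata column values
            .split(task.start(), task.length())  // restrict to this task's split
            .caseSensitive(caseSensitive)
            .filter(task.residual())             // residual filter pushed to the format
            .reuseContainers()
            .build();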
diff --git a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
index e86dd9f97a..7e944510a8 100644
--- a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
+++ b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java
@@ -57,7 +57,8 @@ public final class FormatModelRegistry {
private static final List<String> CLASSES_TO_REGISTER =
ImmutableList.of(
"org.apache.iceberg.data.GenericFormatModels",
- "org.apache.iceberg.arrow.vectorized.ArrowFormatModels");
+ "org.apache.iceberg.arrow.vectorized.ArrowFormatModels",
+ "org.apache.iceberg.flink.data.FlinkFormatModels");
// Format models indexed by file format and object model class
private static final Map<Pair<FileFormat, Class<?>>, FormatModel<?, ?>> MODELS =
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
new file mode 100644
index 0000000000..0026c8a302
--- /dev/null
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.avro.AvroFormatModel;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.orc.ORCFormatModel;
+import org.apache.iceberg.parquet.ParquetFormatModel;
+
+public class FlinkFormatModels {
+ public static void register() {
+ FormatModelRegistry.register(
+ ParquetFormatModel.create(
+ RowData.class,
+ RowType.class,
+ (icebergSchema, fileSchema, engineSchema) ->
+ FlinkParquetWriters.buildWriter(engineSchema, fileSchema),
+ (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+ FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+
+ FormatModelRegistry.register(
+ AvroFormatModel.create(
+ RowData.class,
+ RowType.class,
+ (icebergSchema, fileSchema, engineSchema) -> new FlinkAvroWriter(engineSchema),
+ (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+ FlinkPlannedAvroReader.create(icebergSchema, idToConstant)));
+
+ FormatModelRegistry.register(
+ ORCFormatModel.create(
+ RowData.class,
+ RowType.class,
+ (icebergSchema, fileSchema, engineSchema) ->
+ FlinkOrcWriter.buildWriter(engineSchema, icebergSchema),
+ (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+ new FlinkOrcReader(icebergSchema, fileSchema, idToConstant)));
+ }
+
+ private FlinkFormatModels() {}
+}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
index b3ada41737..d5247941d8 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
@@ -25,28 +25,19 @@ import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
import java.io.Serializable;
import java.util.Map;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.data.RegistryBasedFileWriterFactory;
import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.flink.data.FlinkAvroWriter;
-import org.apache.iceberg.flink.data.FlinkOrcWriter;
-import org.apache.iceberg.flink.data.FlinkParquetWriters;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-public class FlinkFileWriterFactory extends BaseFileWriterFactory<RowData> implements Serializable {
- private RowType dataFlinkType;
- private RowType equalityDeleteFlinkType;
-
- private FlinkFileWriterFactory(
+public class FlinkFileWriterFactory extends RegistryBasedFileWriterFactory<RowData, RowType>
+ implements Serializable {
+ FlinkFileWriterFactory(
Table table,
FileFormat dataFileFormat,
Schema dataSchema,
@@ -62,85 +53,30 @@ public class FlinkFileWriterFactory extends BaseFileWriterFactory<RowData> imple
super(
table,
dataFileFormat,
+ RowData.class,
dataSchema,
dataSortOrder,
deleteFileFormat,
equalityFieldIds,
equalityDeleteRowSchema,
equalityDeleteSortOrder,
- writeProperties);
-
- this.dataFlinkType = dataFlinkType;
- this.equalityDeleteFlinkType = equalityDeleteFlinkType;
- }
-
- static Builder builderFor(Table table) {
- return new Builder(table);
- }
-
- @Override
- protected void configureDataWrite(Avro.DataWriteBuilder builder) {
- builder.createWriterFunc(ignore -> new FlinkAvroWriter(dataFlinkType()));
- }
-
- @Override
- protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
- builder.createWriterFunc(ignored -> new FlinkAvroWriter(equalityDeleteFlinkType()));
- }
-
- @Override
- protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {}
-
- @Override
- protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
- builder.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(dataFlinkType(), msgType));
- }
-
- @Override
- protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
- builder.createWriterFunc(
- msgType -> FlinkParquetWriters.buildWriter(equalityDeleteFlinkType(), msgType));
- }
-
- @Override
- protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
- builder.transformPaths(path -> StringData.fromString(path.toString()));
- }
-
- @Override
- protected void configureDataWrite(ORC.DataWriteBuilder builder) {
- builder.createWriterFunc(
- (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(dataFlinkType(), iSchema));
- }
-
- @Override
- protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
- builder.createWriterFunc(
- (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(equalityDeleteFlinkType(), iSchema));
+ writeProperties,
+ dataFlinkType == null ? FlinkSchemaUtil.convert(dataSchema) : dataFlinkType,
+ equalityDeleteInputSchema(equalityDeleteFlinkType, equalityDeleteRowSchema));
}
- @Override
- protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
- builder.transformPaths(path -> StringData.fromString(path.toString()));
- }
-
- private RowType dataFlinkType() {
- if (dataFlinkType == null) {
- Preconditions.checkNotNull(dataSchema(), "Data schema must not be null");
- this.dataFlinkType = FlinkSchemaUtil.convert(dataSchema());
+ private static RowType equalityDeleteInputSchema(RowType rowType, Schema rowSchema) {
+ if (rowType != null) {
+ return rowType;
+ } else if (rowSchema != null) {
+ return FlinkSchemaUtil.convert(rowSchema);
+ } else {
+ return null;
}
-
- return dataFlinkType;
}
- private RowType equalityDeleteFlinkType() {
- if (equalityDeleteFlinkType == null) {
- Preconditions.checkNotNull(
- equalityDeleteRowSchema(), "Equality delete schema must not be null");
- this.equalityDeleteFlinkType = FlinkSchemaUtil.convert(equalityDeleteRowSchema());
- }
-
- return equalityDeleteFlinkType;
+ static Builder builderFor(Table table) {
+ return new Builder(table);
}
public static class Builder {
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index b8fb1ba32e..ee4aaf4a3d 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -24,10 +24,8 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
-import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.encryption.InputFilesDecryptor;
import org.apache.iceberg.expressions.Expression;
@@ -35,19 +33,14 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.FlinkSourceFilter;
import org.apache.iceberg.flink.RowDataWrapper;
-import org.apache.iceberg.flink.data.FlinkOrcReader;
-import org.apache.iceberg.flink.data.FlinkParquetReaders;
-import org.apache.iceberg.flink.data.FlinkPlannedAvroReader;
import org.apache.iceberg.flink.data.RowDataProjection;
import org.apache.iceberg.flink.data.RowDataUtil;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;
@Internal
@@ -73,8 +66,7 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
if (filters != null && !filters.isEmpty()) {
Expression combinedExpression =
filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
- this.rowFilter =
- new FlinkSourceFilter(this.projectedSchema, combinedExpression, this.caseSensitive);
+ this.rowFilter = new FlinkSourceFilter(projectedSchema, combinedExpression, caseSensitive);
} else {
this.rowFilter = null;
}
@@ -112,23 +104,23 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
if (task.isDataTask()) {
throw new UnsupportedOperationException("Cannot read data task.");
} else {
- switch (task.file().format()) {
- case PARQUET:
- iter = newParquetIterable(task, schema, idToConstant, inputFilesDecryptor);
- break;
-
- case AVRO:
- iter = newAvroIterable(task, schema, idToConstant, inputFilesDecryptor);
- break;
-
- case ORC:
- iter = newOrcIterable(task, schema, idToConstant, inputFilesDecryptor);
- break;
-
- default:
- throw new UnsupportedOperationException(
- "Cannot read unknown format: " + task.file().format());
+ ReadBuilder<RowData, RowType> builder =
+ FormatModelRegistry.readBuilder(
+ task.file().format(), RowData.class, inputFilesDecryptor.getInputFile(task));
+
+ if (nameMapping != null) {
+ builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
}
+
+ iter =
+ builder
+ .project(schema)
+ .idToConstant(idToConstant)
+ .split(task.start(), task.length())
+ .caseSensitive(caseSensitive)
+ .filter(task.residual())
+ .reuseContainers()
+ .build();
}
if (rowFilter != null) {
@@ -137,72 +129,6 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
return iter;
}
- private CloseableIterable<RowData> newAvroIterable(
- FileScanTask task,
- Schema schema,
- Map<Integer, ?> idToConstant,
- InputFilesDecryptor inputFilesDecryptor) {
- Avro.ReadBuilder builder =
- Avro.read(inputFilesDecryptor.getInputFile(task))
- .reuseContainers()
- .project(schema)
- .split(task.start(), task.length())
- .createReaderFunc(readSchema -> FlinkPlannedAvroReader.create(schema, idToConstant));
-
- if (nameMapping != null) {
- builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
- }
-
- return builder.build();
- }
-
- private CloseableIterable<RowData> newParquetIterable(
- FileScanTask task,
- Schema schema,
- Map<Integer, ?> idToConstant,
- InputFilesDecryptor inputFilesDecryptor) {
- Parquet.ReadBuilder builder =
- Parquet.read(inputFilesDecryptor.getInputFile(task))
- .split(task.start(), task.length())
- .project(schema)
- .createReaderFunc(
- fileSchema -> FlinkParquetReaders.buildReader(schema, fileSchema, idToConstant))
- .filter(task.residual())
- .caseSensitive(caseSensitive)
- .reuseContainers();
-
- if (nameMapping != null) {
- builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
- }
-
- return builder.build();
- }
-
- private CloseableIterable<RowData> newOrcIterable(
- FileScanTask task,
- Schema schema,
- Map<Integer, ?> idToConstant,
- InputFilesDecryptor inputFilesDecryptor) {
- Schema readSchemaWithoutConstantAndMetadataFields =
- TypeUtil.selectNot(
- schema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
-
- ORC.ReadBuilder builder =
- ORC.read(inputFilesDecryptor.getInputFile(task))
- .project(readSchemaWithoutConstantAndMetadataFields)
- .split(task.start(), task.length())
- .createReaderFunc(
- readOrcSchema -> new FlinkOrcReader(schema, readOrcSchema, idToConstant))
- .filter(task.residual())
- .caseSensitive(caseSensitive);
-
- if (nameMapping != null) {
- builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
- }
-
- return builder.build();
- }
-
private static class FlinkDeleteFilter extends DeleteFilter<RowData> {
private final RowType requiredRowType;
private final RowDataWrapper asStructLike;
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java
index da5b5f6c28..339cd0510e 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java
@@ -35,7 +35,7 @@ import org.apache.iceberg.Parameters;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.common.DynFields;
-import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.data.RegistryBasedFileWriterFactory;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.SimpleDataUtil;
@@ -238,21 +238,21 @@ public class TestCompressionSettings {
testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1);
testHarness.prepareSnapshotPreBarrier(1L);
- DynFields.BoundField<IcebergStreamWriter> operatorField =
+ DynFields.BoundField<IcebergStreamWriter<?>> operatorField =
DynFields.builder()
.hiddenImpl(testHarness.getOperatorFactory().getClass(), "operator")
.build(testHarness.getOperatorFactory());
- DynFields.BoundField<TaskWriter> writerField =
+ DynFields.BoundField<TaskWriter<?>> writerField =
DynFields.builder()
.hiddenImpl(IcebergStreamWriter.class, "writer")
.build(operatorField.get());
- DynFields.BoundField<FileWriterFactory> writerFactoryField =
+ DynFields.BoundField<FileWriterFactory<?>> writerFactoryField =
DynFields.builder()
.hiddenImpl(BaseTaskWriter.class, "writerFactory")
.build(writerField.get());
DynFields.BoundField<Map<String, String>> propsField =
DynFields.builder()
- .hiddenImpl(BaseFileWriterFactory.class, "writerProperties")
+ .hiddenImpl(RegistryBasedFileWriterFactory.class, "writerProperties")
.build(writerFactoryField.get());
return propsField.get();
}
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
index 8e346cd8a1..f604f639f2 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
@@ -34,7 +34,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.common.DynFields;
-import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.data.RegistryBasedFileWriterFactory;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
@@ -365,13 +365,13 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
DynFields.BoundField<Map<WriteTarget, TaskWriter<RowData>>> writerField =
DynFields.builder().hiddenImpl(dynamicWriter.getClass(), "writers").build(dynamicWriter);
- DynFields.BoundField<FileWriterFactory> writerFactoryField =
+ DynFields.BoundField<FileWriterFactory<?>> writerFactoryField =
DynFields.builder()
.hiddenImpl(BaseTaskWriter.class, "writerFactory")
.build(writerField.get().values().iterator().next());
DynFields.BoundField<Map<String, String>> propsField =
DynFields.builder()
- .hiddenImpl(BaseFileWriterFactory.class, "writerProperties")
+ .hiddenImpl(RegistryBasedFileWriterFactory.class, "writerProperties")
.build(writerFactoryField.get());
return propsField.get();
}