This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new ccdf23ca25 Flink: Backport moving Flink to use the new FormatModel API (#15354)
ccdf23ca25 is described below

commit ccdf23ca25ce16984684884b36fb94096ccda096
Author: pvary <[email protected]>
AuthorDate: Wed Feb 18 16:55:00 2026 +0100

    Flink: Backport moving Flink to use the new FormatModel API (#15354)
    
    Backports #15329
---
 .../iceberg/flink/data/FlinkFormatModels.java      |  58 +++++++++++
 .../iceberg/flink/sink/FlinkFileWriterFactory.java |  98 ++++--------------
 .../flink/source/RowDataFileScanTaskReader.java    | 112 ++++-----------------
 .../flink/sink/TestCompressionSettings.java        |  10 +-
 .../flink/sink/dynamic/TestDynamicWriter.java      |   6 +-
 .../iceberg/flink/data/FlinkFormatModels.java      |  58 +++++++++++
 .../iceberg/flink/sink/FlinkFileWriterFactory.java |  98 ++++--------------
 .../flink/source/RowDataFileScanTaskReader.java    | 112 ++++-----------------
 .../flink/sink/TestCompressionSettings.java        |  10 +-
 .../flink/sink/dynamic/TestDynamicWriter.java      |   6 +-
 10 files changed, 204 insertions(+), 364 deletions(-)
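
At a glance, the patch replaces the per-format switch in RowDataFileScanTaskReader (and the per-format configure* overrides in FlinkFileWriterFactory) with a single lookup in the new FormatModelRegistry, fed by the Parquet, Avro and ORC models registered in FlinkFormatModels. A minimal sketch of the resulting read path is below; the class and method names are placeholders, and it assumes FlinkFormatModels.register() has already run during engine setup (the call site is not part of this diff):

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.formats.FormatModelRegistry;
    import org.apache.iceberg.formats.ReadBuilder;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.io.InputFile;

    class RegistryReadSketch {
      // Hypothetical helper: build a RowData iterable for one scan task via the registry.
      static CloseableIterable<RowData> rows(FileScanTask task, InputFile file, Schema projection) {
        // The registry resolves the reader from the data file's format and the requested in-memory type.
        ReadBuilder<RowData, RowType> builder =
            FormatModelRegistry.readBuilder(task.file().format(), RowData.class, file);

        return builder
            .project(projection)                // Iceberg schema to project
            .split(task.start(), task.length()) // read only this task's byte range
            .filter(task.residual())            // residual filter from scan planning
            .reuseContainers()
            .build();
      }
    }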

diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
new file mode 100644
index 0000000000..0026c8a302
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.avro.AvroFormatModel;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.orc.ORCFormatModel;
+import org.apache.iceberg.parquet.ParquetFormatModel;
+
+public class FlinkFormatModels {
+  public static void register() {
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            RowData.class,
+            RowType.class,
+            (icebergSchema, fileSchema, engineSchema) ->
+                FlinkParquetWriters.buildWriter(engineSchema, fileSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        AvroFormatModel.create(
+            RowData.class,
+            RowType.class,
+            (icebergSchema, fileSchema, engineSchema) -> new FlinkAvroWriter(engineSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                FlinkPlannedAvroReader.create(icebergSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ORCFormatModel.create(
+            RowData.class,
+            RowType.class,
+            (icebergSchema, fileSchema, engineSchema) ->
+                FlinkOrcWriter.buildWriter(engineSchema, icebergSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                new FlinkOrcReader(icebergSchema, fileSchema, idToConstant)));
+  }
+
+  private FlinkFormatModels() {}
+}
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
index b3ada41737..d5247941d8 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
@@ -25,28 +25,19 @@ import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
 import java.io.Serializable;
 import java.util.Map;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.data.RegistryBasedFileWriterFactory;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.flink.data.FlinkAvroWriter;
-import org.apache.iceberg.flink.data.FlinkOrcWriter;
-import org.apache.iceberg.flink.data.FlinkParquetWriters;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 
-public class FlinkFileWriterFactory extends BaseFileWriterFactory<RowData> implements Serializable {
-  private RowType dataFlinkType;
-  private RowType equalityDeleteFlinkType;
-
-  private FlinkFileWriterFactory(
+public class FlinkFileWriterFactory extends RegistryBasedFileWriterFactory<RowData, RowType>
+    implements Serializable {
+  FlinkFileWriterFactory(
       Table table,
       FileFormat dataFileFormat,
       Schema dataSchema,
@@ -62,85 +53,30 @@ public class FlinkFileWriterFactory extends BaseFileWriterFactory<RowData> imple
     super(
         table,
         dataFileFormat,
+        RowData.class,
         dataSchema,
         dataSortOrder,
         deleteFileFormat,
         equalityFieldIds,
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
-        writeProperties);
-
-    this.dataFlinkType = dataFlinkType;
-    this.equalityDeleteFlinkType = equalityDeleteFlinkType;
-  }
-
-  static Builder builderFor(Table table) {
-    return new Builder(table);
-  }
-
-  @Override
-  protected void configureDataWrite(Avro.DataWriteBuilder builder) {
-    builder.createWriterFunc(ignore -> new FlinkAvroWriter(dataFlinkType()));
-  }
-
-  @Override
-  protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(ignored -> new FlinkAvroWriter(equalityDeleteFlinkType()));
-  }
-
-  @Override
-  protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {}
-
-  @Override
-  protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
-    builder.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(dataFlinkType(), msgType));
-  }
-
-  @Override
-  protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        msgType -> FlinkParquetWriters.buildWriter(equalityDeleteFlinkType(), msgType));
-  }
-
-  @Override
-  protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.transformPaths(path -> StringData.fromString(path.toString()));
-  }
-
-  @Override
-  protected void configureDataWrite(ORC.DataWriteBuilder builder) {
-    builder.createWriterFunc(
-        (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(dataFlinkType(), iSchema));
-  }
-
-  @Override
-  protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(equalityDeleteFlinkType(), iSchema));
+        writeProperties,
+        dataFlinkType == null ? FlinkSchemaUtil.convert(dataSchema) : dataFlinkType,
+        equalityDeleteInputSchema(equalityDeleteFlinkType, equalityDeleteRowSchema));
   }
 
-  @Override
-  protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
-    builder.transformPaths(path -> StringData.fromString(path.toString()));
-  }
-
-  private RowType dataFlinkType() {
-    if (dataFlinkType == null) {
-      Preconditions.checkNotNull(dataSchema(), "Data schema must not be null");
-      this.dataFlinkType = FlinkSchemaUtil.convert(dataSchema());
+  private static RowType equalityDeleteInputSchema(RowType rowType, Schema rowSchema) {
+    if (rowType != null) {
+      return rowType;
+    } else if (rowSchema != null) {
+      return FlinkSchemaUtil.convert(rowSchema);
+    } else {
+      return null;
     }
-
-    return dataFlinkType;
   }
 
-  private RowType equalityDeleteFlinkType() {
-    if (equalityDeleteFlinkType == null) {
-      Preconditions.checkNotNull(
-          equalityDeleteRowSchema(), "Equality delete schema must not be 
null");
-      this.equalityDeleteFlinkType = 
FlinkSchemaUtil.convert(equalityDeleteRowSchema());
-    }
-
-    return equalityDeleteFlinkType;
+  static Builder builderFor(Table table) {
+    return new Builder(table);
   }
 
   public static class Builder {
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index b8fb1ba32e..ee4aaf4a3d 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -24,10 +24,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
-import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.encryption.InputFilesDecryptor;
 import org.apache.iceberg.expressions.Expression;
@@ -35,19 +33,14 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.FlinkSourceFilter;
 import org.apache.iceberg.flink.RowDataWrapper;
-import org.apache.iceberg.flink.data.FlinkOrcReader;
-import org.apache.iceberg.flink.data.FlinkParquetReaders;
-import org.apache.iceberg.flink.data.FlinkPlannedAvroReader;
 import org.apache.iceberg.flink.data.RowDataProjection;
 import org.apache.iceberg.flink.data.RowDataUtil;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PartitionUtil;
 
 @Internal
@@ -73,8 +66,7 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
     if (filters != null && !filters.isEmpty()) {
       Expression combinedExpression =
           filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
-      this.rowFilter =
-          new FlinkSourceFilter(this.projectedSchema, combinedExpression, this.caseSensitive);
+      this.rowFilter = new FlinkSourceFilter(projectedSchema, combinedExpression, caseSensitive);
     } else {
       this.rowFilter = null;
     }
@@ -112,23 +104,23 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
     if (task.isDataTask()) {
       throw new UnsupportedOperationException("Cannot read data task.");
     } else {
-      switch (task.file().format()) {
-        case PARQUET:
-          iter = newParquetIterable(task, schema, idToConstant, inputFilesDecryptor);
-          break;
-
-        case AVRO:
-          iter = newAvroIterable(task, schema, idToConstant, inputFilesDecryptor);
-          break;
-
-        case ORC:
-          iter = newOrcIterable(task, schema, idToConstant, inputFilesDecryptor);
-          break;
-
-        default:
-          throw new UnsupportedOperationException(
-              "Cannot read unknown format: " + task.file().format());
+      ReadBuilder<RowData, RowType> builder =
+          FormatModelRegistry.readBuilder(
+              task.file().format(), RowData.class, inputFilesDecryptor.getInputFile(task));
+
+      if (nameMapping != null) {
+        builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
       }
+
+      iter =
+          builder
+              .project(schema)
+              .idToConstant(idToConstant)
+              .split(task.start(), task.length())
+              .caseSensitive(caseSensitive)
+              .filter(task.residual())
+              .reuseContainers()
+              .build();
     }
 
     if (rowFilter != null) {
@@ -137,72 +129,6 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
     return iter;
   }
 
-  private CloseableIterable<RowData> newAvroIterable(
-      FileScanTask task,
-      Schema schema,
-      Map<Integer, ?> idToConstant,
-      InputFilesDecryptor inputFilesDecryptor) {
-    Avro.ReadBuilder builder =
-        Avro.read(inputFilesDecryptor.getInputFile(task))
-            .reuseContainers()
-            .project(schema)
-            .split(task.start(), task.length())
-            .createReaderFunc(readSchema -> FlinkPlannedAvroReader.create(schema, idToConstant));
-
-    if (nameMapping != null) {
-      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-    }
-
-    return builder.build();
-  }
-
-  private CloseableIterable<RowData> newParquetIterable(
-      FileScanTask task,
-      Schema schema,
-      Map<Integer, ?> idToConstant,
-      InputFilesDecryptor inputFilesDecryptor) {
-    Parquet.ReadBuilder builder =
-        Parquet.read(inputFilesDecryptor.getInputFile(task))
-            .split(task.start(), task.length())
-            .project(schema)
-            .createReaderFunc(
-                fileSchema -> FlinkParquetReaders.buildReader(schema, fileSchema, idToConstant))
-            .filter(task.residual())
-            .caseSensitive(caseSensitive)
-            .reuseContainers();
-
-    if (nameMapping != null) {
-      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-    }
-
-    return builder.build();
-  }
-
-  private CloseableIterable<RowData> newOrcIterable(
-      FileScanTask task,
-      Schema schema,
-      Map<Integer, ?> idToConstant,
-      InputFilesDecryptor inputFilesDecryptor) {
-    Schema readSchemaWithoutConstantAndMetadataFields =
-        TypeUtil.selectNot(
-            schema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
-
-    ORC.ReadBuilder builder =
-        ORC.read(inputFilesDecryptor.getInputFile(task))
-            .project(readSchemaWithoutConstantAndMetadataFields)
-            .split(task.start(), task.length())
-            .createReaderFunc(
-                readOrcSchema -> new FlinkOrcReader(schema, readOrcSchema, idToConstant))
-            .filter(task.residual())
-            .caseSensitive(caseSensitive);
-
-    if (nameMapping != null) {
-      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-    }
-
-    return builder.build();
-  }
-
   private static class FlinkDeleteFilter extends DeleteFilter<RowData> {
     private final RowType requiredRowType;
     private final RowDataWrapper asStructLike;
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java
index da5b5f6c28..339cd0510e 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java
@@ -35,7 +35,7 @@ import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.common.DynFields;
-import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.data.RegistryBasedFileWriterFactory;
 import org.apache.iceberg.flink.FlinkWriteConf;
 import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.SimpleDataUtil;
@@ -238,21 +238,21 @@ public class TestCompressionSettings {
       testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1);
 
       testHarness.prepareSnapshotPreBarrier(1L);
-      DynFields.BoundField<IcebergStreamWriter> operatorField =
+      DynFields.BoundField<IcebergStreamWriter<?>> operatorField =
           DynFields.builder()
               .hiddenImpl(testHarness.getOperatorFactory().getClass(), "operator")
               .build(testHarness.getOperatorFactory());
-      DynFields.BoundField<TaskWriter> writerField =
+      DynFields.BoundField<TaskWriter<?>> writerField =
           DynFields.builder()
               .hiddenImpl(IcebergStreamWriter.class, "writer")
               .build(operatorField.get());
-      DynFields.BoundField<FileWriterFactory> writerFactoryField =
+      DynFields.BoundField<FileWriterFactory<?>> writerFactoryField =
           DynFields.builder()
               .hiddenImpl(BaseTaskWriter.class, "writerFactory")
               .build(writerField.get());
       DynFields.BoundField<Map<String, String>> propsField =
           DynFields.builder()
-              .hiddenImpl(BaseFileWriterFactory.class, "writerProperties")
+              .hiddenImpl(RegistryBasedFileWriterFactory.class, "writerProperties")
               .build(writerFactoryField.get());
       return propsField.get();
     }
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
index 8e346cd8a1..f604f639f2 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
@@ -34,7 +34,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.common.DynFields;
-import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.data.RegistryBasedFileWriterFactory;
 import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
@@ -365,13 +365,13 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
     DynFields.BoundField<Map<WriteTarget, TaskWriter<RowData>>> writerField =
         DynFields.builder().hiddenImpl(dynamicWriter.getClass(), "writers").build(dynamicWriter);
 
-    DynFields.BoundField<FileWriterFactory> writerFactoryField =
+    DynFields.BoundField<FileWriterFactory<?>> writerFactoryField =
         DynFields.builder()
             .hiddenImpl(BaseTaskWriter.class, "writerFactory")
             .build(writerField.get().values().iterator().next());
     DynFields.BoundField<Map<String, String>> propsField =
         DynFields.builder()
-            .hiddenImpl(BaseFileWriterFactory.class, "writerProperties")
+            .hiddenImpl(RegistryBasedFileWriterFactory.class, "writerProperties")
             .build(writerFactoryField.get());
     return propsField.get();
   }
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
new file mode 100644
index 0000000000..0026c8a302
--- /dev/null
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/data/FlinkFormatModels.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.data;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.avro.AvroFormatModel;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.orc.ORCFormatModel;
+import org.apache.iceberg.parquet.ParquetFormatModel;
+
+public class FlinkFormatModels {
+  public static void register() {
+    FormatModelRegistry.register(
+        ParquetFormatModel.create(
+            RowData.class,
+            RowType.class,
+            (icebergSchema, fileSchema, engineSchema) ->
+                FlinkParquetWriters.buildWriter(engineSchema, fileSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                FlinkParquetReaders.buildReader(icebergSchema, fileSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        AvroFormatModel.create(
+            RowData.class,
+            RowType.class,
+            (icebergSchema, fileSchema, engineSchema) -> new FlinkAvroWriter(engineSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                FlinkPlannedAvroReader.create(icebergSchema, idToConstant)));
+
+    FormatModelRegistry.register(
+        ORCFormatModel.create(
+            RowData.class,
+            RowType.class,
+            (icebergSchema, fileSchema, engineSchema) ->
+                FlinkOrcWriter.buildWriter(engineSchema, icebergSchema),
+            (icebergSchema, fileSchema, engineSchema, idToConstant) ->
+                new FlinkOrcReader(icebergSchema, fileSchema, idToConstant)));
+  }
+
+  private FlinkFormatModels() {}
+}
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
index b3ada41737..d5247941d8 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkFileWriterFactory.java
@@ -25,28 +25,19 @@ import static org.apache.iceberg.TableProperties.DELETE_DEFAULT_FILE_FORMAT;
 import java.io.Serializable;
 import java.util.Map;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.data.RegistryBasedFileWriterFactory;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.flink.data.FlinkAvroWriter;
-import org.apache.iceberg.flink.data.FlinkOrcWriter;
-import org.apache.iceberg.flink.data.FlinkParquetWriters;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 
-public class FlinkFileWriterFactory extends BaseFileWriterFactory<RowData> implements Serializable {
-  private RowType dataFlinkType;
-  private RowType equalityDeleteFlinkType;
-
-  private FlinkFileWriterFactory(
+public class FlinkFileWriterFactory extends RegistryBasedFileWriterFactory<RowData, RowType>
+    implements Serializable {
+  FlinkFileWriterFactory(
       Table table,
       FileFormat dataFileFormat,
       Schema dataSchema,
@@ -62,85 +53,30 @@ public class FlinkFileWriterFactory extends BaseFileWriterFactory<RowData> imple
     super(
         table,
         dataFileFormat,
+        RowData.class,
         dataSchema,
         dataSortOrder,
         deleteFileFormat,
         equalityFieldIds,
         equalityDeleteRowSchema,
         equalityDeleteSortOrder,
-        writeProperties);
-
-    this.dataFlinkType = dataFlinkType;
-    this.equalityDeleteFlinkType = equalityDeleteFlinkType;
-  }
-
-  static Builder builderFor(Table table) {
-    return new Builder(table);
-  }
-
-  @Override
-  protected void configureDataWrite(Avro.DataWriteBuilder builder) {
-    builder.createWriterFunc(ignore -> new FlinkAvroWriter(dataFlinkType()));
-  }
-
-  @Override
-  protected void configureEqualityDelete(Avro.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(ignored -> new FlinkAvroWriter(equalityDeleteFlinkType()));
-  }
-
-  @Override
-  protected void configurePositionDelete(Avro.DeleteWriteBuilder builder) {}
-
-  @Override
-  protected void configureDataWrite(Parquet.DataWriteBuilder builder) {
-    builder.createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(dataFlinkType(), msgType));
-  }
-
-  @Override
-  protected void configureEqualityDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        msgType -> FlinkParquetWriters.buildWriter(equalityDeleteFlinkType(), msgType));
-  }
-
-  @Override
-  protected void configurePositionDelete(Parquet.DeleteWriteBuilder builder) {
-    builder.transformPaths(path -> StringData.fromString(path.toString()));
-  }
-
-  @Override
-  protected void configureDataWrite(ORC.DataWriteBuilder builder) {
-    builder.createWriterFunc(
-        (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(dataFlinkType(), iSchema));
-  }
-
-  @Override
-  protected void configureEqualityDelete(ORC.DeleteWriteBuilder builder) {
-    builder.createWriterFunc(
-        (iSchema, typDesc) -> FlinkOrcWriter.buildWriter(equalityDeleteFlinkType(), iSchema));
+        writeProperties,
+        dataFlinkType == null ? FlinkSchemaUtil.convert(dataSchema) : dataFlinkType,
+        equalityDeleteInputSchema(equalityDeleteFlinkType, equalityDeleteRowSchema));
   }
 
-  @Override
-  protected void configurePositionDelete(ORC.DeleteWriteBuilder builder) {
-    builder.transformPaths(path -> StringData.fromString(path.toString()));
-  }
-
-  private RowType dataFlinkType() {
-    if (dataFlinkType == null) {
-      Preconditions.checkNotNull(dataSchema(), "Data schema must not be null");
-      this.dataFlinkType = FlinkSchemaUtil.convert(dataSchema());
+  private static RowType equalityDeleteInputSchema(RowType rowType, Schema rowSchema) {
+    if (rowType != null) {
+      return rowType;
+    } else if (rowSchema != null) {
+      return FlinkSchemaUtil.convert(rowSchema);
+    } else {
+      return null;
     }
-
-    return dataFlinkType;
   }
 
-  private RowType equalityDeleteFlinkType() {
-    if (equalityDeleteFlinkType == null) {
-      Preconditions.checkNotNull(
-          equalityDeleteRowSchema(), "Equality delete schema must not be 
null");
-      this.equalityDeleteFlinkType = 
FlinkSchemaUtil.convert(equalityDeleteRowSchema());
-    }
-
-    return equalityDeleteFlinkType;
+  static Builder builderFor(Table table) {
+    return new Builder(table);
   }
 
   public static class Builder {
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
index b8fb1ba32e..ee4aaf4a3d 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
@@ -24,10 +24,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
-import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.encryption.InputFilesDecryptor;
 import org.apache.iceberg.expressions.Expression;
@@ -35,19 +33,14 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.FlinkSourceFilter;
 import org.apache.iceberg.flink.RowDataWrapper;
-import org.apache.iceberg.flink.data.FlinkOrcReader;
-import org.apache.iceberg.flink.data.FlinkParquetReaders;
-import org.apache.iceberg.flink.data.FlinkPlannedAvroReader;
 import org.apache.iceberg.flink.data.RowDataProjection;
 import org.apache.iceberg.flink.data.RowDataUtil;
+import org.apache.iceberg.formats.FormatModelRegistry;
+import org.apache.iceberg.formats.ReadBuilder;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMappingParser;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PartitionUtil;
 
 @Internal
@@ -73,8 +66,7 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
     if (filters != null && !filters.isEmpty()) {
       Expression combinedExpression =
           filters.stream().reduce(Expressions.alwaysTrue(), Expressions::and);
-      this.rowFilter =
-          new FlinkSourceFilter(this.projectedSchema, combinedExpression, this.caseSensitive);
+      this.rowFilter = new FlinkSourceFilter(projectedSchema, combinedExpression, caseSensitive);
     } else {
       this.rowFilter = null;
     }
@@ -112,23 +104,23 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
     if (task.isDataTask()) {
       throw new UnsupportedOperationException("Cannot read data task.");
     } else {
-      switch (task.file().format()) {
-        case PARQUET:
-          iter = newParquetIterable(task, schema, idToConstant, inputFilesDecryptor);
-          break;
-
-        case AVRO:
-          iter = newAvroIterable(task, schema, idToConstant, inputFilesDecryptor);
-          break;
-
-        case ORC:
-          iter = newOrcIterable(task, schema, idToConstant, inputFilesDecryptor);
-          break;
-
-        default:
-          throw new UnsupportedOperationException(
-              "Cannot read unknown format: " + task.file().format());
+      ReadBuilder<RowData, RowType> builder =
+          FormatModelRegistry.readBuilder(
+              task.file().format(), RowData.class, inputFilesDecryptor.getInputFile(task));
+
+      if (nameMapping != null) {
+        builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
       }
+
+      iter =
+          builder
+              .project(schema)
+              .idToConstant(idToConstant)
+              .split(task.start(), task.length())
+              .caseSensitive(caseSensitive)
+              .filter(task.residual())
+              .reuseContainers()
+              .build();
     }
 
     if (rowFilter != null) {
@@ -137,72 +129,6 @@ public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
     return iter;
   }
 
-  private CloseableIterable<RowData> newAvroIterable(
-      FileScanTask task,
-      Schema schema,
-      Map<Integer, ?> idToConstant,
-      InputFilesDecryptor inputFilesDecryptor) {
-    Avro.ReadBuilder builder =
-        Avro.read(inputFilesDecryptor.getInputFile(task))
-            .reuseContainers()
-            .project(schema)
-            .split(task.start(), task.length())
-            .createReaderFunc(readSchema -> FlinkPlannedAvroReader.create(schema, idToConstant));
-
-    if (nameMapping != null) {
-      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-    }
-
-    return builder.build();
-  }
-
-  private CloseableIterable<RowData> newParquetIterable(
-      FileScanTask task,
-      Schema schema,
-      Map<Integer, ?> idToConstant,
-      InputFilesDecryptor inputFilesDecryptor) {
-    Parquet.ReadBuilder builder =
-        Parquet.read(inputFilesDecryptor.getInputFile(task))
-            .split(task.start(), task.length())
-            .project(schema)
-            .createReaderFunc(
-                fileSchema -> FlinkParquetReaders.buildReader(schema, fileSchema, idToConstant))
-            .filter(task.residual())
-            .caseSensitive(caseSensitive)
-            .reuseContainers();
-
-    if (nameMapping != null) {
-      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-    }
-
-    return builder.build();
-  }
-
-  private CloseableIterable<RowData> newOrcIterable(
-      FileScanTask task,
-      Schema schema,
-      Map<Integer, ?> idToConstant,
-      InputFilesDecryptor inputFilesDecryptor) {
-    Schema readSchemaWithoutConstantAndMetadataFields =
-        TypeUtil.selectNot(
-            schema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));
-
-    ORC.ReadBuilder builder =
-        ORC.read(inputFilesDecryptor.getInputFile(task))
-            .project(readSchemaWithoutConstantAndMetadataFields)
-            .split(task.start(), task.length())
-            .createReaderFunc(
-                readOrcSchema -> new FlinkOrcReader(schema, readOrcSchema, idToConstant))
-            .filter(task.residual())
-            .caseSensitive(caseSensitive);
-
-    if (nameMapping != null) {
-      builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-    }
-
-    return builder.build();
-  }
-
   private static class FlinkDeleteFilter extends DeleteFilter<RowData> {
     private final RowType requiredRowType;
     private final RowDataWrapper asStructLike;
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java
index da5b5f6c28..339cd0510e 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java
@@ -35,7 +35,7 @@ import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.common.DynFields;
-import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.data.RegistryBasedFileWriterFactory;
 import org.apache.iceberg.flink.FlinkWriteConf;
 import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.SimpleDataUtil;
@@ -238,21 +238,21 @@ public class TestCompressionSettings {
       testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1);
 
       testHarness.prepareSnapshotPreBarrier(1L);
-      DynFields.BoundField<IcebergStreamWriter> operatorField =
+      DynFields.BoundField<IcebergStreamWriter<?>> operatorField =
           DynFields.builder()
               .hiddenImpl(testHarness.getOperatorFactory().getClass(), "operator")
               .build(testHarness.getOperatorFactory());
-      DynFields.BoundField<TaskWriter> writerField =
+      DynFields.BoundField<TaskWriter<?>> writerField =
           DynFields.builder()
               .hiddenImpl(IcebergStreamWriter.class, "writer")
               .build(operatorField.get());
-      DynFields.BoundField<FileWriterFactory> writerFactoryField =
+      DynFields.BoundField<FileWriterFactory<?>> writerFactoryField =
           DynFields.builder()
               .hiddenImpl(BaseTaskWriter.class, "writerFactory")
               .build(writerField.get());
       DynFields.BoundField<Map<String, String>> propsField =
           DynFields.builder()
-              .hiddenImpl(BaseFileWriterFactory.class, "writerProperties")
+              .hiddenImpl(RegistryBasedFileWriterFactory.class, "writerProperties")
               .build(writerFactoryField.get());
       return propsField.get();
     }
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
index 8e346cd8a1..f604f639f2 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicWriter.java
@@ -34,7 +34,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.common.DynFields;
-import org.apache.iceberg.data.BaseFileWriterFactory;
+import org.apache.iceberg.data.RegistryBasedFileWriterFactory;
 import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
@@ -365,13 +365,13 @@ class TestDynamicWriter extends TestFlinkIcebergSinkBase {
     DynFields.BoundField<Map<WriteTarget, TaskWriter<RowData>>> writerField =
         DynFields.builder().hiddenImpl(dynamicWriter.getClass(), "writers").build(dynamicWriter);
 
-    DynFields.BoundField<FileWriterFactory> writerFactoryField =
+    DynFields.BoundField<FileWriterFactory<?>> writerFactoryField =
         DynFields.builder()
             .hiddenImpl(BaseTaskWriter.class, "writerFactory")
             .build(writerField.get().values().iterator().next());
     DynFields.BoundField<Map<String, String>> propsField =
         DynFields.builder()
-            .hiddenImpl(BaseFileWriterFactory.class, "writerProperties")
+            .hiddenImpl(RegistryBasedFileWriterFactory.class, "writerProperties")
             .build(writerFactoryField.get());
     return propsField.get();
   }

