This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 85cf79de01 Flink: deprecate ReaderFunction with a new Converter 
interface to simplify user experience (#10956)
85cf79de01 is described below

commit 85cf79de0156e4bb4d02931c8d8e10e1d6b478c5
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Wed Aug 21 14:56:32 2024 -0700

    Flink: deprecate ReaderFunction with a new Converter interface to simplify 
user experience (#10956)
---
 .../apache/iceberg/flink/source/IcebergSource.java |  94 +++++++++++++++----
 .../source/reader/AvroGenericRecordConverter.java  |  69 ++++++++++++++
 .../reader/AvroGenericRecordReaderFunction.java    |  10 +-
 ...rFunction.java => ConverterReaderFunction.java} | 101 ++++++++++++---------
 .../source/reader/IcebergSourceSplitReader.java    |   2 +-
 .../flink/source/reader/RowDataConverter.java      |  32 +++++++
 .../TestIcebergSourceBoundedGenericRecord.java     |  70 +++++++++-----
 7 files changed, 292 insertions(+), 86 deletions(-)

diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index e629cc19bb..351ba54e5c 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -61,10 +61,12 @@ import 
org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
 import 
org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
 import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
 import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
+import org.apache.iceberg.flink.source.reader.ConverterReaderFunction;
 import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
 import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
 import org.apache.iceberg.flink.source.reader.MetaDataReaderFunction;
 import org.apache.iceberg.flink.source.reader.ReaderFunction;
+import org.apache.iceberg.flink.source.reader.RowDataConverter;
 import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
 import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter;
 import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
@@ -211,20 +213,40 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
     }
   }
 
+  /**
+   * Create a source builder.
+   *
+   * @deprecated since 1.7.0. Will be removed in 2.0.0; use {@link 
IcebergSource#forRowData()} or
+   *     {@link IcebergSource#forOutputType(RowDataConverter)} instead
+   */
+  @Deprecated
   public static <T> Builder<T> builder() {
     return new Builder<>();
   }
 
+  /** Create a source builder for RowData output type. */
   public static Builder<RowData> forRowData() {
     return new Builder<>();
   }
 
+  /**
+   * Create a source builder that would convert {@link RowData} to the output 
type {@code T}.
+   *
+   * @param converter convert {@link RowData} to output type {@code T}
+   * @param <T> output type
+   * @return an IcebergSource builder
+   */
+  public static <T> Builder<T> forOutputType(RowDataConverter<T> converter) {
+    return new Builder<T>().converter(converter);
+  }
+
   public static class Builder<T> {
     private TableLoader tableLoader;
     private Table table;
     private SplitAssignerFactory splitAssignerFactory;
     private SerializableComparator<IcebergSourceSplit> splitComparator;
     private ReaderFunction<T> readerFunction;
+    private RowDataConverter<T> converter;
     private ReadableConfig flinkConfig = new Configuration();
     private final ScanContext.Builder contextBuilder = ScanContext.builder();
     private TableSchema projectedFlinkSchema;
@@ -255,11 +277,28 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
       return this;
     }
 
+    /**
+     * @deprecated since 1.7.0. Will be removed in 2.0.0; use {@link
+     *     IcebergSource#forOutputType(RowDataConverter)} instead to produce 
output type other than
+     *     {@link RowData}.
+     */
+    @Deprecated
     public Builder<T> readerFunction(ReaderFunction<T> newReaderFunction) {
+      Preconditions.checkState(
+          converter == null,
+          "Cannot set reader function when builder was created via 
IcebergSource.forOutputType(Converter)");
       this.readerFunction = newReaderFunction;
       return this;
     }
 
+    /**
+     * Doesn't need to be public. It is set by {@link 
IcebergSource#forOutputType(RowDataConverter)}.
+     */
+    private Builder<T> converter(RowDataConverter<T> newConverter) {
+      this.converter = newConverter;
+      return this;
+    }
+
     public Builder<T> flinkConfig(ReadableConfig config) {
       this.flinkConfig = config;
       return this;
@@ -510,25 +549,7 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
       ScanContext context = contextBuilder.build();
       context.validate();
       if (readerFunction == null) {
-        if (table instanceof BaseMetadataTable) {
-          MetaDataReaderFunction rowDataReaderFunction =
-              new MetaDataReaderFunction(
-                  flinkConfig, table.schema(), context.project(), table.io(), 
table.encryption());
-          this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
-        } else {
-          RowDataReaderFunction rowDataReaderFunction =
-              new RowDataReaderFunction(
-                  flinkConfig,
-                  table.schema(),
-                  context.project(),
-                  context.nameMapping(),
-                  context.caseSensitive(),
-                  table.io(),
-                  table.encryption(),
-                  context.filters(),
-                  context.limit());
-          this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
-        }
+        this.readerFunction = readerFunction(context);
       }
 
       if (splitAssignerFactory == null) {
@@ -549,5 +570,40 @@ public class IcebergSource<T> implements Source<T, 
IcebergSourceSplit, IcebergEn
           table,
           emitter);
     }
+
+    private ReaderFunction<T> readerFunction(ScanContext context) {
+      if (table instanceof BaseMetadataTable) {
+        MetaDataReaderFunction rowDataReaderFunction =
+            new MetaDataReaderFunction(
+                flinkConfig, table.schema(), context.project(), table.io(), 
table.encryption());
+        return (ReaderFunction<T>) rowDataReaderFunction;
+      } else {
+        if (converter == null) {
+          return (ReaderFunction<T>)
+              new RowDataReaderFunction(
+                  flinkConfig,
+                  table.schema(),
+                  context.project(),
+                  context.nameMapping(),
+                  context.caseSensitive(),
+                  table.io(),
+                  table.encryption(),
+                  context.filters(),
+                  context.limit());
+        } else {
+          return new ConverterReaderFunction<>(
+              converter,
+              flinkConfig,
+              table.schema(),
+              context.project(),
+              context.nameMapping(),
+              context.caseSensitive(),
+              table.io(),
+              table.encryption(),
+              context.filters(),
+              context.limit());
+        }
+      }
+    }
   }
 }
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordConverter.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordConverter.java
new file mode 100644
index 0000000000..b158b0871a
--- /dev/null
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordConverter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+
+public class AvroGenericRecordConverter implements 
RowDataConverter<GenericRecord> {
+  private final Schema avroSchema;
+  private final RowDataToAvroConverters.RowDataToAvroConverter flinkConverter;
+  private final TypeInformation<GenericRecord> outputTypeInfo;
+
+  private AvroGenericRecordConverter(Schema avroSchema, RowType rowType) {
+    this.avroSchema = avroSchema;
+    this.flinkConverter = RowDataToAvroConverters.createConverter(rowType);
+    this.outputTypeInfo = new GenericRecordAvroTypeInfo(avroSchema);
+  }
+
+  public static AvroGenericRecordConverter fromIcebergSchema(
+      org.apache.iceberg.Schema icebergSchema, String tableName) {
+    RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
+    Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, tableName);
+    return new AvroGenericRecordConverter(avroSchema, rowType);
+  }
+
+  public static AvroGenericRecordConverter fromAvroSchema(Schema avroSchema, 
String tableName) {
+    DataType dataType = 
AvroSchemaConverter.convertToDataType(avroSchema.toString());
+    LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
+    RowType rowType = RowType.of(logicalType.getChildren().toArray(new 
LogicalType[0]));
+    return new AvroGenericRecordConverter(avroSchema, rowType);
+  }
+
+  @Override
+  public GenericRecord apply(RowData rowData) {
+    return (GenericRecord) flinkConverter.convert(avroSchema, rowData);
+  }
+
+  @Override
+  public TypeInformation<GenericRecord> getProducedType() {
+    return outputTypeInfo;
+  }
+}
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
index 66e59633ff..f89e5ce134 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
@@ -28,13 +28,21 @@ import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
 import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.IcebergSource;
 import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
 import org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
-/** Read Iceberg rows as {@link GenericRecord}. */
+/**
+ * Read Iceberg rows as {@link GenericRecord}.
+ *
+ * @deprecated since 1.7.0. Will be removed in 2.0.0; use {@link
+ *     IcebergSource#forOutputType(RowDataConverter)} and {@link 
AvroGenericRecordConverter}
+ *     instead.
+ */
+@Deprecated
 public class AvroGenericRecordReaderFunction extends 
DataIteratorReaderFunction<GenericRecord> {
   private final String tableName;
   private final Schema readSchema;
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/ConverterReaderFunction.java
similarity index 51%
copy from 
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
copy to 
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/ConverterReaderFunction.java
index 66e59633ff..e1e7c17d63 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/ConverterReaderFunction.java
@@ -19,49 +19,37 @@
 package org.apache.iceberg.flink.source.reader;
 
 import java.util.List;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
 import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.InputFilesDecryptor;
 import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
 import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.FileScanTaskReader;
 import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
-import org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
-/** Read Iceberg rows as {@link GenericRecord}. */
-public class AvroGenericRecordReaderFunction extends 
DataIteratorReaderFunction<GenericRecord> {
-  private final String tableName;
+@Internal
+public class ConverterReaderFunction<T> extends DataIteratorReaderFunction<T> {
+  private final RowDataConverter<T> converter;
+  private final Schema tableSchema;
   private final Schema readSchema;
+  private final String nameMapping;
+  private final boolean caseSensitive;
   private final FileIO io;
   private final EncryptionManager encryption;
-  private final RowDataFileScanTaskReader rowDataReader;
+  private final List<Expression> filters;
+  private final long limit;
 
-  private transient RowDataToAvroGenericRecordConverter converter;
+  private transient RecordLimiter recordLimiter = null;
 
-  /**
-   * Create a reader function without projection and name mapping. Column name 
is case-insensitive.
-   */
-  public static AvroGenericRecordReaderFunction fromTable(Table table) {
-    return new AvroGenericRecordReaderFunction(
-        table.name(),
-        new Configuration(),
-        table.schema(),
-        null,
-        null,
-        false,
-        table.io(),
-        table.encryption(),
-        null);
-  }
-
-  public AvroGenericRecordReaderFunction(
-      String tableName,
+  public ConverterReaderFunction(
+      RowDataConverter<T> converter,
       ReadableConfig config,
       Schema tableSchema,
       Schema projectedSchema,
@@ -69,34 +57,61 @@ public class AvroGenericRecordReaderFunction extends 
DataIteratorReaderFunction<
       boolean caseSensitive,
       FileIO io,
       EncryptionManager encryption,
-      List<Expression> filters) {
+      List<Expression> filters,
+      long limit) {
     super(new ListDataIteratorBatcher<>(config));
-    this.tableName = tableName;
+    this.converter = converter;
+    this.tableSchema = tableSchema;
     this.readSchema = readSchema(tableSchema, projectedSchema);
+    this.nameMapping = nameMapping;
+    this.caseSensitive = caseSensitive;
     this.io = io;
     this.encryption = encryption;
-    this.rowDataReader =
-        new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, 
caseSensitive, filters);
+    this.filters = filters;
+    this.limit = limit;
   }
 
   @Override
-  protected DataIterator<GenericRecord> createDataIterator(IcebergSourceSplit 
split) {
-    return new DataIterator<>(
-        new AvroGenericRecordFileScanTaskReader(rowDataReader, 
lazyConverter()),
+  protected DataIterator<T> createDataIterator(IcebergSourceSplit split) {
+    RowDataFileScanTaskReader rowDataReader =
+        new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, 
caseSensitive, filters);
+    return new LimitableDataIterator<>(
+        new ConverterFileScanTaskReader<>(rowDataReader, converter),
         split.task(),
         io,
-        encryption);
-  }
-
-  private RowDataToAvroGenericRecordConverter lazyConverter() {
-    if (converter == null) {
-      this.converter = 
RowDataToAvroGenericRecordConverter.fromIcebergSchema(tableName, readSchema);
-    }
-    return converter;
+        encryption,
+        lazyLimiter());
   }
 
   private static Schema readSchema(Schema tableSchema, Schema projectedSchema) 
{
     Preconditions.checkNotNull(tableSchema, "Table schema can't be null");
     return projectedSchema == null ? tableSchema : projectedSchema;
   }
+
+  /** Lazily create RecordLimiter to avoid the need to make it serializable */
+  private RecordLimiter lazyLimiter() {
+    if (recordLimiter == null) {
+      this.recordLimiter = RecordLimiter.create(limit);
+    }
+
+    return recordLimiter;
+  }
+
+  private static class ConverterFileScanTaskReader<T> implements 
FileScanTaskReader<T> {
+    private final RowDataFileScanTaskReader rowDataReader;
+    private final RowDataConverter<T> converter;
+
+    ConverterFileScanTaskReader(
+        RowDataFileScanTaskReader rowDataReader, RowDataConverter<T> 
converter) {
+      this.rowDataReader = rowDataReader;
+      this.converter = converter;
+    }
+
+    @Override
+    public CloseableIterator<T> open(
+        FileScanTask fileScanTask, InputFilesDecryptor inputFilesDecryptor) {
+      return CloseableIterator.transform(
+          rowDataReader.open(fileScanTask, inputFilesDecryptor), converter);
+    }
+  }
 }
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
index 9c20494fdb..bcd72e2503 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
@@ -85,7 +85,7 @@ class IcebergSourceSplitReader<T> implements 
SplitReader<RecordAndPosition<T>, I
       } else {
         // return an empty result, which will lead to split fetch to be idle.
         // SplitFetcherManager will then close idle fetcher.
-        return new RecordsBySplits(Collections.emptyMap(), 
Collections.emptySet());
+        return new RecordsBySplits<>(Collections.emptyMap(), 
Collections.emptySet());
       }
     }
 
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataConverter.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataConverter.java
new file mode 100644
index 0000000000..98bb7e9818
--- /dev/null
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataConverter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import java.util.function.Function;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Convert RowData to a different output type.
+ *
+ * @param <T> output type
+ */
+public interface RowDataConverter<T>
+    extends Function<RowData, T>, ResultTypeQueryable<T>, Serializable {}
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
index 7bfed00a9e..4e649d15b1 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
@@ -52,6 +52,7 @@ import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.flink.data.RowDataToRowMapper;
 import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
 import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
+import org.apache.iceberg.flink.source.reader.AvroGenericRecordConverter;
 import org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -69,12 +70,13 @@ public class TestIcebergSourceBoundedGenericRecord {
   private static final HadoopCatalogExtension CATALOG_EXTENSION =
       new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);
 
-  @Parameters(name = "format={0}, parallelism = {1}")
+  @Parameters(name = "format={0}, parallelism = {1}, useConverter = {2}")
   public static Object[][] parameters() {
     return new Object[][] {
-      {FileFormat.AVRO, 2},
-      {FileFormat.PARQUET, 2},
-      {FileFormat.ORC, 2}
+      {FileFormat.AVRO, 2, true},
+      {FileFormat.PARQUET, 2, true},
+      {FileFormat.PARQUET, 2, false},
+      {FileFormat.ORC, 2, true}
     };
   }
 
@@ -84,6 +86,9 @@ public class TestIcebergSourceBoundedGenericRecord {
   @Parameter(index = 1)
   private int parallelism;
 
+  @Parameter(index = 2)
+  private boolean useConverter;
+
   @TestTemplate
   public void testUnpartitionedTable() throws Exception {
     Table table =
@@ -147,24 +152,15 @@ public class TestIcebergSourceBoundedGenericRecord {
       table = tableLoader.loadTable();
     }
 
-    AvroGenericRecordReaderFunction readerFunction =
-        new AvroGenericRecordReaderFunction(
-            TestFixtures.TABLE_IDENTIFIER.name(),
-            new Configuration(),
-            table.schema(),
-            null,
-            null,
-            false,
-            table.io(),
-            table.encryption(),
-            filters);
+    Schema readSchema = projectedSchema != null ? projectedSchema : 
table.schema();
+    IcebergSource.Builder<GenericRecord> sourceBuilder;
+    if (useConverter) {
+      sourceBuilder = createSourceBuilderWithConverter(table, readSchema, 
config);
+    } else {
+      sourceBuilder =
+          createSourceBuilderWithReaderFunction(table, projectedSchema, 
filters, config);
+    }
 
-    IcebergSource.Builder<GenericRecord> sourceBuilder =
-        IcebergSource.<GenericRecord>builder()
-            .tableLoader(CATALOG_EXTENSION.tableLoader())
-            .readerFunction(readerFunction)
-            .assignerFactory(new SimpleSplitAssignerFactory())
-            .flinkConfig(config);
     if (projectedSchema != null) {
       sourceBuilder.project(projectedSchema);
     }
@@ -172,7 +168,6 @@ public class TestIcebergSourceBoundedGenericRecord {
     sourceBuilder.filters(filters);
     sourceBuilder.setAll(options);
 
-    Schema readSchema = projectedSchema != null ? projectedSchema : 
table.schema();
     RowType rowType = FlinkSchemaUtil.convert(readSchema);
     org.apache.avro.Schema avroSchema =
         AvroSchemaUtil.convert(readSchema, 
TestFixtures.TABLE_IDENTIFIER.name());
@@ -193,4 +188,35 @@ public class TestIcebergSourceBoundedGenericRecord {
       return Lists.newArrayList(iter);
     }
   }
+
+  private IcebergSource.Builder<GenericRecord> 
createSourceBuilderWithReaderFunction(
+      Table table, Schema projected, List<Expression> filters, Configuration 
config) {
+    AvroGenericRecordReaderFunction readerFunction =
+        new AvroGenericRecordReaderFunction(
+            TestFixtures.TABLE_IDENTIFIER.name(),
+            new Configuration(),
+            table.schema(),
+            projected,
+            null,
+            false,
+            table.io(),
+            table.encryption(),
+            filters);
+
+    return IcebergSource.<GenericRecord>builder()
+        .tableLoader(CATALOG_EXTENSION.tableLoader())
+        .readerFunction(readerFunction)
+        .assignerFactory(new SimpleSplitAssignerFactory())
+        .flinkConfig(config);
+  }
+
+  private IcebergSource.Builder<GenericRecord> 
createSourceBuilderWithConverter(
+      Table table, Schema readSchema, Configuration config) {
+    AvroGenericRecordConverter converter =
+        AvroGenericRecordConverter.fromIcebergSchema(readSchema, table.name());
+    return IcebergSource.forOutputType(converter)
+        .tableLoader(CATALOG_EXTENSION.tableLoader())
+        .assignerFactory(new SimpleSplitAssignerFactory())
+        .flinkConfig(config);
+  }
 }

Reply via email to