This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new c2fd77a3f4 Flink: Add RowConverter for Iceberg Source (#11301)
c2fd77a3f4 is described below

commit c2fd77a3f4c804572c4fbe6081e150510ca9e262
Author: abharath9 <[email protected]>
AuthorDate: Sat Dec 14 16:49:11 2024 -0600

    Flink: Add RowConverter for Iceberg Source (#11301)
    
    Co-authored-by: Bharath Kumar Avusherla <[email protected]>
---
 .../iceberg/flink/source/reader/RowConverter.java  |  59 +++++++
 ... => TestIcebergSourceBoundedConverterBase.java} | 151 ++++++++--------
 .../TestIcebergSourceBoundedGenericRecord.java     | 192 ++++-----------------
 .../flink/source/TestIcebergSourceBoundedRow.java  |  52 ++++++
 4 files changed, 220 insertions(+), 234 deletions(-)

diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowConverter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowConverter.java
new file mode 100644
index 0000000000..a84384fe17
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowConverter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+
+public class RowConverter implements RowDataConverter<Row> {
+  private final DataStructureConverter<Object, Object> converter;
+  private final TypeInformation<Row> outputTypeInfo;
+
+  private RowConverter(RowType rowType, TypeInformation<Row> rowTypeInfo) {
+    this.converter =
+        DataStructureConverters.getConverter(TypeConversions.fromLogicalToDataType(rowType));
+    this.outputTypeInfo = rowTypeInfo;
+  }
+
+  public static RowConverter fromIcebergSchema(org.apache.iceberg.Schema icebergSchema) {
+    RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
+    TableSchema tableSchema = FlinkSchemaUtil.toSchema(icebergSchema);
+    RowTypeInfo rowTypeInfo =
+        new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+    return new RowConverter(rowType, rowTypeInfo);
+  }
+
+  @Override
+  public Row apply(RowData rowData) {
+    return (Row) converter.toExternal(rowData);
+  }
+
+  @Override
+  public TypeInformation<Row> getProducedType() {
+    return outputTypeInfo;
+  }
+}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedConverterBase.java
similarity index 62%
copy from flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
copy to flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedConverterBase.java
index 4e649d15b1..5ef387864b 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedConverterBase.java
@@ -18,18 +18,17 @@
  */
 package org.apache.iceberg.flink.source;
 
+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.iceberg.FileFormat;
@@ -38,22 +37,17 @@ import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.FlinkConfigOptions;
-import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.HadoopCatalogExtension;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.TestHelpers;
-import org.apache.iceberg.flink.data.RowDataToRowMapper;
-import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
 import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
-import org.apache.iceberg.flink.source.reader.AvroGenericRecordConverter;
-import org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction;
+import org.apache.iceberg.flink.source.reader.ReaderFunction;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
@@ -63,11 +57,11 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.api.io.TempDir;
 
 @ExtendWith(ParameterizedTestExtension.class)
-public class TestIcebergSourceBoundedGenericRecord {
+public abstract class TestIcebergSourceBoundedConverterBase<T> {
   @TempDir protected Path temporaryFolder;
 
   @RegisterExtension
-  private static final HadoopCatalogExtension CATALOG_EXTENSION =
+  static final HadoopCatalogExtension CATALOG_EXTENSION =
       new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);
 
   @Parameters(name = "format={0}, parallelism = {1}, useConverter = {2}")
@@ -75,19 +69,18 @@ public class TestIcebergSourceBoundedGenericRecord {
     return new Object[][] {
       {FileFormat.AVRO, 2, true},
       {FileFormat.PARQUET, 2, true},
-      {FileFormat.PARQUET, 2, false},
       {FileFormat.ORC, 2, true}
     };
   }
 
   @Parameter(index = 0)
-  private FileFormat fileFormat;
+  FileFormat fileFormat;
 
   @Parameter(index = 1)
-  private int parallelism;
+  int parallelism;
 
   @Parameter(index = 2)
-  private boolean useConverter;
+  boolean useConverter;
 
   @TestTemplate
   public void testUnpartitionedTable() throws Exception {
@@ -101,29 +94,20 @@ public class TestIcebergSourceBoundedGenericRecord {
   @TestTemplate
   public void testPartitionedTable() throws Exception {
     String dateStr = "2020-03-20";
-    Table table =
-        CATALOG_EXTENSION
-            .catalog()
-            .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, 
TestFixtures.SPEC);
+    Table table = getPartitionedTable();
     List<Record> expectedRecords = 
RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
-    for (int i = 0; i < expectedRecords.size(); ++i) {
-      expectedRecords.get(i).setField("dt", dateStr);
+    for (Record expectedRecord : expectedRecords) {
+      expectedRecord.setField("dt", dateStr);
     }
-
-    new GenericAppenderHelper(table, fileFormat, temporaryFolder)
-        .appendToTable(org.apache.iceberg.TestHelpers.Row.of(dateStr, 0), 
expectedRecords);
+    addRecordsToPartitionedTable(table, dateStr, expectedRecords);
     TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA);
   }
 
   @TestTemplate
   public void testProjection() throws Exception {
-    Table table =
-        CATALOG_EXTENSION
-            .catalog()
-            .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, 
TestFixtures.SPEC);
+    Table table = getPartitionedTable();
     List<Record> expectedRecords = 
RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
-    new GenericAppenderHelper(table, fileFormat, temporaryFolder)
-        .appendToTable(org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), 
expectedRecords);
+    addRecordsToPartitionedTable(table, "2020-03-20", expectedRecords);
     // select the "data" field (fieldId == 1)
     Schema projectedSchema = TypeUtil.select(TestFixtures.SCHEMA, 
Sets.newHashSet(1));
     List<Row> expectedRows =
@@ -132,6 +116,22 @@ public class TestIcebergSourceBoundedGenericRecord {
         run(projectedSchema, Collections.emptyList(), Collections.emptyMap()), 
expectedRows);
   }
 
+  static Table getPartitionedTable() {
+    return CATALOG_EXTENSION
+        .catalog()
+        .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, 
TestFixtures.SPEC);
+  }
+
+  static TableLoader tableLoader() {
+    return CATALOG_EXTENSION.tableLoader();
+  }
+
+  private void addRecordsToPartitionedTable(
+      Table table, String dateStr, List<Record> expectedRecords) throws 
IOException {
+    new GenericAppenderHelper(table, fileFormat, temporaryFolder)
+        .appendToTable(org.apache.iceberg.TestHelpers.Row.of(dateStr, 0), 
expectedRecords);
+  }
+
   private List<Row> run() throws Exception {
     return run(null, Collections.emptyList(), Collections.emptyMap());
   }
@@ -147,19 +147,14 @@ public class TestIcebergSourceBoundedGenericRecord {
     Configuration config = new Configuration();
     
config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 
128);
     Table table;
-    try (TableLoader tableLoader = CATALOG_EXTENSION.tableLoader()) {
+    try (TableLoader tableLoader = tableLoader()) {
       tableLoader.open();
       table = tableLoader.loadTable();
     }
 
     Schema readSchema = projectedSchema != null ? projectedSchema : 
table.schema();
-    IcebergSource.Builder<GenericRecord> sourceBuilder;
-    if (useConverter) {
-      sourceBuilder = createSourceBuilderWithConverter(table, readSchema, 
config);
-    } else {
-      sourceBuilder =
-          createSourceBuilderWithReaderFunction(table, projectedSchema, 
filters, config);
-    }
+    IcebergSource.Builder<T> sourceBuilder =
+        getSourceBuilder(projectedSchema, filters, readSchema, config, table);
 
     if (projectedSchema != null) {
       sourceBuilder.project(projectedSchema);
@@ -168,55 +163,61 @@ public class TestIcebergSourceBoundedGenericRecord {
     sourceBuilder.filters(filters);
     sourceBuilder.setAll(options);
 
-    RowType rowType = FlinkSchemaUtil.convert(readSchema);
-    org.apache.avro.Schema avroSchema =
-        AvroSchemaUtil.convert(readSchema, 
TestFixtures.TABLE_IDENTIFIER.name());
-
-    DataStream<Row> stream =
+    DataStream<T> inputStream =
         env.fromSource(
-                sourceBuilder.build(),
-                WatermarkStrategy.noWatermarks(),
-                "testBasicRead",
-                new GenericRecordAvroTypeInfo(avroSchema))
-            // There are two reasons for converting GenericRecord back to Row.
-            // 1. Avro GenericRecord/Schema is not serializable.
-            // 2. leverage the TestHelpers.assertRecords for validation.
-            .map(AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema))
-            .map(new RowDataToRowMapper(rowType));
+            sourceBuilder.build(),
+            WatermarkStrategy.noWatermarks(),
+            "testBasicRead",
+            getTypeInfo(readSchema));
+
+    DataStream<Row> stream = mapToRow(inputStream, readSchema);
 
     try (CloseableIterator<Row> iter = stream.executeAndCollect()) {
       return Lists.newArrayList(iter);
     }
   }
 
-  private IcebergSource.Builder<GenericRecord> 
createSourceBuilderWithReaderFunction(
-      Table table, Schema projected, List<Expression> filters, Configuration 
config) {
-    AvroGenericRecordReaderFunction readerFunction =
-        new AvroGenericRecordReaderFunction(
-            TestFixtures.TABLE_IDENTIFIER.name(),
-            new Configuration(),
-            table.schema(),
-            projected,
-            null,
-            false,
-            table.io(),
-            table.encryption(),
-            filters);
-
-    return IcebergSource.<GenericRecord>builder()
-        .tableLoader(CATALOG_EXTENSION.tableLoader())
-        .readerFunction(readerFunction)
+  private IcebergSource.Builder<T> getSourceBuilder(
+      Schema projectedSchema,
+      List<Expression> filters,
+      Schema readSchema,
+      Configuration config,
+      Table table)
+      throws Exception {
+    if (useConverter) {
+      return createSourceBuilderWithConverter(readSchema, config, table);
+    }
+    return createSourceBuilderWithReaderFunction(table, projectedSchema, 
filters, config);
+  }
+
+  private IcebergSource.Builder<T> createSourceBuilderWithConverter(
+      Schema readSchema, Configuration config, Table table) throws Exception {
+    return IcebergSource.forOutputType(getConverter(readSchema, table))
+        .tableLoader(tableLoader())
         .assignerFactory(new SimpleSplitAssignerFactory())
         .flinkConfig(config);
   }
 
-  private IcebergSource.Builder<GenericRecord> 
createSourceBuilderWithConverter(
-      Table table, Schema readSchema, Configuration config) {
-    AvroGenericRecordConverter converter =
-        AvroGenericRecordConverter.fromIcebergSchema(readSchema, table.name());
-    return IcebergSource.forOutputType(converter)
-        .tableLoader(CATALOG_EXTENSION.tableLoader())
+  private IcebergSource.Builder<T> createSourceBuilderWithReaderFunction(
+      Table table, Schema projected, List<Expression> filters, Configuration 
config)
+      throws Exception {
+    return IcebergSource.<T>builder()
+        .tableLoader(tableLoader())
+        .readerFunction(getReaderFunction(projected, table, filters))
         .assignerFactory(new SimpleSplitAssignerFactory())
         .flinkConfig(config);
   }
+
+  protected abstract 
org.apache.iceberg.flink.source.reader.RowDataConverter<T> getConverter(
+      org.apache.iceberg.Schema icebergSchema, Table table) throws Exception;
+
+  protected ReaderFunction<T> getReaderFunction(
+      org.apache.iceberg.Schema icebergSchema, Table table, List<Expression> 
filters)
+      throws Exception {
+    throw new UnsupportedOperationException("No default implementation for 
getReaderFunction");
+  }
+
+  protected abstract TypeInformation<T> getTypeInfo(Schema icebergSchema);
+
+  protected abstract DataStream<Row> mapToRow(DataStream<T> inputStream, 
Schema icebergSchema);
 }
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
index 4e649d15b1..faddce5422 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
@@ -18,57 +18,34 @@
  */
 package org.apache.iceberg.flink.source;
 
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Parameter;
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.avro.AvroSchemaUtil;
-import org.apache.iceberg.data.GenericAppenderHelper;
-import org.apache.iceberg.data.RandomGenericData;
-import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.flink.HadoopCatalogExtension;
-import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.TestFixtures;
-import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.flink.data.RowDataToRowMapper;
 import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
-import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
 import org.apache.iceberg.flink.source.reader.AvroGenericRecordConverter;
 import org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.TypeUtil;
-import org.junit.jupiter.api.TestTemplate;
+import org.apache.iceberg.flink.source.reader.ReaderFunction;
+import org.apache.iceberg.flink.source.reader.RowDataConverter;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.extension.RegisterExtension;
-import org.junit.jupiter.api.io.TempDir;
 
 @ExtendWith(ParameterizedTestExtension.class)
-public class TestIcebergSourceBoundedGenericRecord {
-  @TempDir protected Path temporaryFolder;
-
-  @RegisterExtension
-  private static final HadoopCatalogExtension CATALOG_EXTENSION =
-      new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);
+public class TestIcebergSourceBoundedGenericRecord
+    extends TestIcebergSourceBoundedConverterBase<GenericRecord> {
 
   @Parameters(name = "format={0}, parallelism = {1}, useConverter = {2}")
   public static Object[][] parameters() {
@@ -80,143 +57,40 @@ public class TestIcebergSourceBoundedGenericRecord {
     };
   }
 
-  @Parameter(index = 0)
-  private FileFormat fileFormat;
-
-  @Parameter(index = 1)
-  private int parallelism;
-
-  @Parameter(index = 2)
-  private boolean useConverter;
-
-  @TestTemplate
-  public void testUnpartitionedTable() throws Exception {
-    Table table =
-        CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, 
TestFixtures.SCHEMA);
-    List<Record> expectedRecords = 
RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
-    new GenericAppenderHelper(table, fileFormat, 
temporaryFolder).appendToTable(expectedRecords);
-    TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA);
+  @Override
+  protected RowDataConverter<GenericRecord> getConverter(Schema icebergSchema, 
Table table) {
+    return AvroGenericRecordConverter.fromIcebergSchema(icebergSchema, 
table.name());
   }
 
-  @TestTemplate
-  public void testPartitionedTable() throws Exception {
-    String dateStr = "2020-03-20";
-    Table table =
-        CATALOG_EXTENSION
-            .catalog()
-            .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, 
TestFixtures.SPEC);
-    List<Record> expectedRecords = 
RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
-    for (int i = 0; i < expectedRecords.size(); ++i) {
-      expectedRecords.get(i).setField("dt", dateStr);
-    }
-
-    new GenericAppenderHelper(table, fileFormat, temporaryFolder)
-        .appendToTable(org.apache.iceberg.TestHelpers.Row.of(dateStr, 0), 
expectedRecords);
-    TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA);
+  @Override
+  protected ReaderFunction<GenericRecord> getReaderFunction(
+      Schema icebergSchema, Table table, List<Expression> filters) throws 
Exception {
+    return new AvroGenericRecordReaderFunction(
+        TestFixtures.TABLE_IDENTIFIER.name(),
+        new Configuration(),
+        table.schema(),
+        icebergSchema,
+        null,
+        false,
+        table.io(),
+        table.encryption(),
+        filters);
   }
 
-  @TestTemplate
-  public void testProjection() throws Exception {
-    Table table =
-        CATALOG_EXTENSION
-            .catalog()
-            .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, 
TestFixtures.SPEC);
-    List<Record> expectedRecords = 
RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
-    new GenericAppenderHelper(table, fileFormat, temporaryFolder)
-        .appendToTable(org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), 
expectedRecords);
-    // select the "data" field (fieldId == 1)
-    Schema projectedSchema = TypeUtil.select(TestFixtures.SCHEMA, 
Sets.newHashSet(1));
-    List<Row> expectedRows =
-        Arrays.asList(Row.of(expectedRecords.get(0).get(0)), 
Row.of(expectedRecords.get(1).get(0)));
-    TestHelpers.assertRows(
-        run(projectedSchema, Collections.emptyList(), Collections.emptyMap()), 
expectedRows);
-  }
-
-  private List<Row> run() throws Exception {
-    return run(null, Collections.emptyList(), Collections.emptyMap());
-  }
-
-  private List<Row> run(
-      Schema projectedSchema, List<Expression> filters, Map<String, String> 
options)
-      throws Exception {
-
-    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-    env.setParallelism(parallelism);
-    env.getConfig().enableObjectReuse();
-
-    Configuration config = new Configuration();
-    
config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 
128);
-    Table table;
-    try (TableLoader tableLoader = CATALOG_EXTENSION.tableLoader()) {
-      tableLoader.open();
-      table = tableLoader.loadTable();
-    }
-
-    Schema readSchema = projectedSchema != null ? projectedSchema : 
table.schema();
-    IcebergSource.Builder<GenericRecord> sourceBuilder;
-    if (useConverter) {
-      sourceBuilder = createSourceBuilderWithConverter(table, readSchema, 
config);
-    } else {
-      sourceBuilder =
-          createSourceBuilderWithReaderFunction(table, projectedSchema, 
filters, config);
-    }
-
-    if (projectedSchema != null) {
-      sourceBuilder.project(projectedSchema);
-    }
-
-    sourceBuilder.filters(filters);
-    sourceBuilder.setAll(options);
-
-    RowType rowType = FlinkSchemaUtil.convert(readSchema);
+  @Override
+  protected TypeInformation<GenericRecord> getTypeInfo(Schema icebergSchema) {
     org.apache.avro.Schema avroSchema =
-        AvroSchemaUtil.convert(readSchema, 
TestFixtures.TABLE_IDENTIFIER.name());
-
-    DataStream<Row> stream =
-        env.fromSource(
-                sourceBuilder.build(),
-                WatermarkStrategy.noWatermarks(),
-                "testBasicRead",
-                new GenericRecordAvroTypeInfo(avroSchema))
-            // There are two reasons for converting GenericRecord back to Row.
-            // 1. Avro GenericRecord/Schema is not serializable.
-            // 2. leverage the TestHelpers.assertRecords for validation.
-            .map(AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema))
-            .map(new RowDataToRowMapper(rowType));
-
-    try (CloseableIterator<Row> iter = stream.executeAndCollect()) {
-      return Lists.newArrayList(iter);
-    }
+        AvroSchemaUtil.convert(icebergSchema, 
TestFixtures.TABLE_IDENTIFIER.name());
+    return new GenericRecordAvroTypeInfo(avroSchema);
   }
 
-  private IcebergSource.Builder<GenericRecord> 
createSourceBuilderWithReaderFunction(
-      Table table, Schema projected, List<Expression> filters, Configuration 
config) {
-    AvroGenericRecordReaderFunction readerFunction =
-        new AvroGenericRecordReaderFunction(
-            TestFixtures.TABLE_IDENTIFIER.name(),
-            new Configuration(),
-            table.schema(),
-            projected,
-            null,
-            false,
-            table.io(),
-            table.encryption(),
-            filters);
-
-    return IcebergSource.<GenericRecord>builder()
-        .tableLoader(CATALOG_EXTENSION.tableLoader())
-        .readerFunction(readerFunction)
-        .assignerFactory(new SimpleSplitAssignerFactory())
-        .flinkConfig(config);
-  }
-
-  private IcebergSource.Builder<GenericRecord> 
createSourceBuilderWithConverter(
-      Table table, Schema readSchema, Configuration config) {
-    AvroGenericRecordConverter converter =
-        AvroGenericRecordConverter.fromIcebergSchema(readSchema, table.name());
-    return IcebergSource.forOutputType(converter)
-        .tableLoader(CATALOG_EXTENSION.tableLoader())
-        .assignerFactory(new SimpleSplitAssignerFactory())
-        .flinkConfig(config);
+  @Override
+  protected DataStream<Row> mapToRow(DataStream<GenericRecord> inputStream, 
Schema icebergSchema) {
+    RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
+    org.apache.avro.Schema avroSchema =
+        AvroSchemaUtil.convert(icebergSchema, 
TestFixtures.TABLE_IDENTIFIER.name());
+    return inputStream
+        .map(AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema))
+        .map(new RowDataToRowMapper(rowType));
   }
 }
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedRow.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedRow.java
new file mode 100644
index 0000000000..170069fecb
--- /dev/null
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedRow.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.source.reader.RowConverter;
+import org.apache.iceberg.flink.source.reader.RowDataConverter;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestIcebergSourceBoundedRow extends TestIcebergSourceBoundedConverterBase<Row> {
+
+  @Override
+  protected RowDataConverter<Row> getConverter(Schema icebergSchema, Table table) {
+    return RowConverter.fromIcebergSchema(icebergSchema);
+  }
+
+  @Override
+  protected TypeInformation<Row> getTypeInfo(Schema icebergSchema) {
+    TableSchema tableSchema = FlinkSchemaUtil.toSchema(icebergSchema);
+    return new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+  }
+
+  @Override
+  protected DataStream<Row> mapToRow(DataStream<Row> inputStream, Schema icebergSchema) {
+    return inputStream;
+  }
+}

Reply via email to