This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new c2fd77a3f4 Flink: Add RowConverter for Iceberg Source (#11301)
c2fd77a3f4 is described below
commit c2fd77a3f4c804572c4fbe6081e150510ca9e262
Author: abharath9 <[email protected]>
AuthorDate: Sat Dec 14 16:49:11 2024 -0600
Flink: Add RowConverter for Iceberg Source (#11301)
Co-authored-by: Bharath Kumar Avusherla <[email protected]>
---
.../iceberg/flink/source/reader/RowConverter.java | 59 +++++++
... => TestIcebergSourceBoundedConverterBase.java} | 151 ++++++++--------
.../TestIcebergSourceBoundedGenericRecord.java | 192 ++++-----------------
.../flink/source/TestIcebergSourceBoundedRow.java | 52 ++++++
4 files changed, 220 insertions(+), 234 deletions(-)
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowConverter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowConverter.java
new file mode 100644
index 0000000000..a84384fe17
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowConverter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+
+public class RowConverter implements RowDataConverter<Row> {
+ private final DataStructureConverter<Object, Object> converter;
+ private final TypeInformation<Row> outputTypeInfo;
+
+ private RowConverter(RowType rowType, TypeInformation<Row> rowTypeInfo) {
+ this.converter =
+ DataStructureConverters.getConverter(TypeConversions.fromLogicalToDataType(rowType));
+ this.outputTypeInfo = rowTypeInfo;
+ }
+
+ public static RowConverter fromIcebergSchema(org.apache.iceberg.Schema icebergSchema) {
+ RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
+ TableSchema tableSchema = FlinkSchemaUtil.toSchema(icebergSchema);
+ RowTypeInfo rowTypeInfo =
+ new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+ return new RowConverter(rowType, rowTypeInfo);
+ }
+
+ @Override
+ public Row apply(RowData rowData) {
+ return (Row) converter.toExternal(rowData);
+ }
+
+ @Override
+ public TypeInformation<Row> getProducedType() {
+ return outputTypeInfo;
+ }
+}
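For context, a minimal sketch of how the new converter wires into a pipeline, mirroring the createSourceBuilderWithConverter pattern exercised by the tests below. The environment/TableLoader setup and the helper name rowsFor are assumptions for illustration, not part of this commit:

    // Sketch only: `env` and `loader` are assumed to be set up by the caller.
    static DataStream<Row> rowsFor(StreamExecutionEnvironment env, TableLoader loader, Table table) {
      RowConverter converter = RowConverter.fromIcebergSchema(table.schema());
      IcebergSource<Row> source =
          IcebergSource.forOutputType(converter)
              .tableLoader(loader)
              .assignerFactory(new SimpleSplitAssignerFactory())
              .build();
      // The converter also reports the produced TypeInformation<Row> for the stream.
      return env.fromSource(
          source, WatermarkStrategy.noWatermarks(), "iceberg-rows", converter.getProducedType());
    }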
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedConverterBase.java
similarity index 62%
copy from flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
copy to flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedConverterBase.java
index 4e649d15b1..5ef387864b 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedConverterBase.java
@@ -18,18 +18,17 @@
*/
package org.apache.iceberg.flink.source;
+import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.FileFormat;
@@ -38,22 +37,17 @@ import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
-import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.TestHelpers;
-import org.apache.iceberg.flink.data.RowDataToRowMapper;
-import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
-import org.apache.iceberg.flink.source.reader.AvroGenericRecordConverter;
-import org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction;
+import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
@@ -63,11 +57,11 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
@ExtendWith(ParameterizedTestExtension.class)
-public class TestIcebergSourceBoundedGenericRecord {
+public abstract class TestIcebergSourceBoundedConverterBase<T> {
@TempDir protected Path temporaryFolder;
@RegisterExtension
- private static final HadoopCatalogExtension CATALOG_EXTENSION =
+ static final HadoopCatalogExtension CATALOG_EXTENSION =
new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);
@Parameters(name = "format={0}, parallelism = {1}, useConverter = {2}")
@@ -75,19 +69,18 @@ public class TestIcebergSourceBoundedGenericRecord {
return new Object[][] {
{FileFormat.AVRO, 2, true},
{FileFormat.PARQUET, 2, true},
- {FileFormat.PARQUET, 2, false},
{FileFormat.ORC, 2, true}
};
}
@Parameter(index = 0)
- private FileFormat fileFormat;
+ FileFormat fileFormat;
@Parameter(index = 1)
- private int parallelism;
+ int parallelism;
@Parameter(index = 2)
- private boolean useConverter;
+ boolean useConverter;
@TestTemplate
public void testUnpartitionedTable() throws Exception {
@@ -101,29 +94,20 @@ public class TestIcebergSourceBoundedGenericRecord {
@TestTemplate
public void testPartitionedTable() throws Exception {
String dateStr = "2020-03-20";
- Table table =
- CATALOG_EXTENSION
- .catalog()
- .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC);
+ Table table = getPartitionedTable();
List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
- for (int i = 0; i < expectedRecords.size(); ++i) {
- expectedRecords.get(i).setField("dt", dateStr);
+ for (Record expectedRecord : expectedRecords) {
+ expectedRecord.setField("dt", dateStr);
}
-
- new GenericAppenderHelper(table, fileFormat, temporaryFolder)
- .appendToTable(org.apache.iceberg.TestHelpers.Row.of(dateStr, 0), expectedRecords);
+ addRecordsToPartitionedTable(table, dateStr, expectedRecords);
TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA);
}
@TestTemplate
public void testProjection() throws Exception {
- Table table =
- CATALOG_EXTENSION
- .catalog()
- .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC);
+ Table table = getPartitionedTable();
List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
- new GenericAppenderHelper(table, fileFormat, temporaryFolder)
- .appendToTable(org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords);
+ addRecordsToPartitionedTable(table, "2020-03-20", expectedRecords);
// select the "data" field (fieldId == 1)
Schema projectedSchema = TypeUtil.select(TestFixtures.SCHEMA, Sets.newHashSet(1));
List<Row> expectedRows =
@@ -132,6 +116,22 @@ public class TestIcebergSourceBoundedGenericRecord {
run(projectedSchema, Collections.emptyList(), Collections.emptyMap()), expectedRows);
}
+ static Table getPartitionedTable() {
+ return CATALOG_EXTENSION
+ .catalog()
+ .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC);
+ }
+
+ static TableLoader tableLoader() {
+ return CATALOG_EXTENSION.tableLoader();
+ }
+
+ private void addRecordsToPartitionedTable(
+ Table table, String dateStr, List<Record> expectedRecords) throws IOException {
+ new GenericAppenderHelper(table, fileFormat, temporaryFolder)
+ .appendToTable(org.apache.iceberg.TestHelpers.Row.of(dateStr, 0), expectedRecords);
+ }
+
private List<Row> run() throws Exception {
return run(null, Collections.emptyList(), Collections.emptyMap());
}
@@ -147,19 +147,14 @@ public class TestIcebergSourceBoundedGenericRecord {
Configuration config = new Configuration();
config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);
Table table;
- try (TableLoader tableLoader = CATALOG_EXTENSION.tableLoader()) {
+ try (TableLoader tableLoader = tableLoader()) {
tableLoader.open();
table = tableLoader.loadTable();
}
Schema readSchema = projectedSchema != null ? projectedSchema : table.schema();
- IcebergSource.Builder<GenericRecord> sourceBuilder;
- if (useConverter) {
- sourceBuilder = createSourceBuilderWithConverter(table, readSchema, config);
- } else {
- sourceBuilder =
- createSourceBuilderWithReaderFunction(table, projectedSchema, filters, config);
- }
+ IcebergSource.Builder<T> sourceBuilder =
+ getSourceBuilder(projectedSchema, filters, readSchema, config, table);
if (projectedSchema != null) {
sourceBuilder.project(projectedSchema);
@@ -168,55 +163,61 @@ public class TestIcebergSourceBoundedGenericRecord {
sourceBuilder.filters(filters);
sourceBuilder.setAll(options);
- RowType rowType = FlinkSchemaUtil.convert(readSchema);
- org.apache.avro.Schema avroSchema =
- AvroSchemaUtil.convert(readSchema, TestFixtures.TABLE_IDENTIFIER.name());
-
- DataStream<Row> stream =
+ DataStream<T> inputStream =
env.fromSource(
- sourceBuilder.build(),
- WatermarkStrategy.noWatermarks(),
- "testBasicRead",
- new GenericRecordAvroTypeInfo(avroSchema))
- // There are two reasons for converting GenericRecord back to Row.
- // 1. Avro GenericRecord/Schema is not serializable.
- // 2. leverage the TestHelpers.assertRecords for validation.
- .map(AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema))
- .map(new RowDataToRowMapper(rowType));
+ sourceBuilder.build(),
+ WatermarkStrategy.noWatermarks(),
+ "testBasicRead",
+ getTypeInfo(readSchema));
+
+ DataStream<Row> stream = mapToRow(inputStream, readSchema);
try (CloseableIterator<Row> iter = stream.executeAndCollect()) {
return Lists.newArrayList(iter);
}
}
- private IcebergSource.Builder<GenericRecord> createSourceBuilderWithReaderFunction(
- Table table, Schema projected, List<Expression> filters, Configuration config) {
- AvroGenericRecordReaderFunction readerFunction =
- new AvroGenericRecordReaderFunction(
- TestFixtures.TABLE_IDENTIFIER.name(),
- new Configuration(),
- table.schema(),
- projected,
- null,
- false,
- table.io(),
- table.encryption(),
- filters);
-
- return IcebergSource.<GenericRecord>builder()
- .tableLoader(CATALOG_EXTENSION.tableLoader())
- .readerFunction(readerFunction)
+ private IcebergSource.Builder<T> getSourceBuilder(
+ Schema projectedSchema,
+ List<Expression> filters,
+ Schema readSchema,
+ Configuration config,
+ Table table)
+ throws Exception {
+ if (useConverter) {
+ return createSourceBuilderWithConverter(readSchema, config, table);
+ }
+ return createSourceBuilderWithReaderFunction(table, projectedSchema, filters, config);
+ }
+
+ private IcebergSource.Builder<T> createSourceBuilderWithConverter(
+ Schema readSchema, Configuration config, Table table) throws Exception {
+ return IcebergSource.forOutputType(getConverter(readSchema, table))
+ .tableLoader(tableLoader())
.assignerFactory(new SimpleSplitAssignerFactory())
.flinkConfig(config);
}
- private IcebergSource.Builder<GenericRecord> createSourceBuilderWithConverter(
- Table table, Schema readSchema, Configuration config) {
- AvroGenericRecordConverter converter =
- AvroGenericRecordConverter.fromIcebergSchema(readSchema, table.name());
- return IcebergSource.forOutputType(converter)
- .tableLoader(CATALOG_EXTENSION.tableLoader())
+ private IcebergSource.Builder<T> createSourceBuilderWithReaderFunction(
+ Table table, Schema projected, List<Expression> filters, Configuration config)
+ throws Exception {
+ return IcebergSource.<T>builder()
+ .tableLoader(tableLoader())
+ .readerFunction(getReaderFunction(projected, table, filters))
.assignerFactory(new SimpleSplitAssignerFactory())
.flinkConfig(config);
}
+
+ protected abstract org.apache.iceberg.flink.source.reader.RowDataConverter<T> getConverter(
+ org.apache.iceberg.Schema icebergSchema, Table table) throws Exception;
+
+ protected ReaderFunction<T> getReaderFunction(
+ org.apache.iceberg.Schema icebergSchema, Table table, List<Expression> filters)
+ throws Exception {
+ throw new UnsupportedOperationException("No default implementation for getReaderFunction");
+ }
+
+ protected abstract TypeInformation<T> getTypeInfo(Schema icebergSchema);
+
+ protected abstract DataStream<Row> mapToRow(DataStream<T> inputStream, Schema icebergSchema);
}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
index 4e649d15b1..faddce5422 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
@@ -18,57 +18,34 @@
*/
package org.apache.iceberg.flink.source;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
-import java.util.Map;
import org.apache.avro.generic.GenericRecord;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.AvroSchemaUtil;
-import org.apache.iceberg.data.GenericAppenderHelper;
-import org.apache.iceberg.data.RandomGenericData;
-import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkSchemaUtil;
-import org.apache.iceberg.flink.HadoopCatalogExtension;
-import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
-import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.data.RowDataToRowMapper;
import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
-import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
import org.apache.iceberg.flink.source.reader.AvroGenericRecordConverter;
import org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.TypeUtil;
-import org.junit.jupiter.api.TestTemplate;
+import org.apache.iceberg.flink.source.reader.ReaderFunction;
+import org.apache.iceberg.flink.source.reader.RowDataConverter;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.extension.RegisterExtension;
-import org.junit.jupiter.api.io.TempDir;
@ExtendWith(ParameterizedTestExtension.class)
-public class TestIcebergSourceBoundedGenericRecord {
- @TempDir protected Path temporaryFolder;
-
- @RegisterExtension
- private static final HadoopCatalogExtension CATALOG_EXTENSION =
- new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);
+public class TestIcebergSourceBoundedGenericRecord
+ extends TestIcebergSourceBoundedConverterBase<GenericRecord> {
@Parameters(name = "format={0}, parallelism = {1}, useConverter = {2}")
public static Object[][] parameters() {
@@ -80,143 +57,40 @@ public class TestIcebergSourceBoundedGenericRecord {
};
}
- @Parameter(index = 0)
- private FileFormat fileFormat;
-
- @Parameter(index = 1)
- private int parallelism;
-
- @Parameter(index = 2)
- private boolean useConverter;
-
- @TestTemplate
- public void testUnpartitionedTable() throws Exception {
- Table table =
- CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
- List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
- new GenericAppenderHelper(table, fileFormat, temporaryFolder).appendToTable(expectedRecords);
- TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA);
+ @Override
+ protected RowDataConverter<GenericRecord> getConverter(Schema icebergSchema, Table table) {
+ return AvroGenericRecordConverter.fromIcebergSchema(icebergSchema, table.name());
}
- @TestTemplate
- public void testPartitionedTable() throws Exception {
- String dateStr = "2020-03-20";
- Table table =
- CATALOG_EXTENSION
- .catalog()
- .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC);
- List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
- for (int i = 0; i < expectedRecords.size(); ++i) {
- expectedRecords.get(i).setField("dt", dateStr);
- }
-
- new GenericAppenderHelper(table, fileFormat, temporaryFolder)
- .appendToTable(org.apache.iceberg.TestHelpers.Row.of(dateStr, 0), expectedRecords);
- TestHelpers.assertRecords(run(), expectedRecords, TestFixtures.SCHEMA);
+ @Override
+ protected ReaderFunction<GenericRecord> getReaderFunction(
+ Schema icebergSchema, Table table, List<Expression> filters) throws Exception {
+ return new AvroGenericRecordReaderFunction(
+ TestFixtures.TABLE_IDENTIFIER.name(),
+ new Configuration(),
+ table.schema(),
+ icebergSchema,
+ null,
+ false,
+ table.io(),
+ table.encryption(),
+ filters);
}
- @TestTemplate
- public void testProjection() throws Exception {
- Table table =
- CATALOG_EXTENSION
- .catalog()
- .createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA, TestFixtures.SPEC);
- List<Record> expectedRecords = RandomGenericData.generate(TestFixtures.SCHEMA, 2, 0L);
- new GenericAppenderHelper(table, fileFormat, temporaryFolder)
- .appendToTable(org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords);
- // select the "data" field (fieldId == 1)
- Schema projectedSchema = TypeUtil.select(TestFixtures.SCHEMA, Sets.newHashSet(1));
- List<Row> expectedRows =
- Arrays.asList(Row.of(expectedRecords.get(0).get(0)), Row.of(expectedRecords.get(1).get(0)));
- TestHelpers.assertRows(
- run(projectedSchema, Collections.emptyList(), Collections.emptyMap()), expectedRows);
- }
-
- private List<Row> run() throws Exception {
- return run(null, Collections.emptyList(), Collections.emptyMap());
- }
-
- private List<Row> run(
- Schema projectedSchema, List<Expression> filters, Map<String, String> options)
- throws Exception {
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(parallelism);
- env.getConfig().enableObjectReuse();
-
- Configuration config = new Configuration();
- config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);
- Table table;
- try (TableLoader tableLoader = CATALOG_EXTENSION.tableLoader()) {
- tableLoader.open();
- table = tableLoader.loadTable();
- }
-
- Schema readSchema = projectedSchema != null ? projectedSchema : table.schema();
- IcebergSource.Builder<GenericRecord> sourceBuilder;
- if (useConverter) {
- sourceBuilder = createSourceBuilderWithConverter(table, readSchema, config);
- } else {
- sourceBuilder =
- createSourceBuilderWithReaderFunction(table, projectedSchema, filters, config);
- }
-
- if (projectedSchema != null) {
- sourceBuilder.project(projectedSchema);
- }
-
- sourceBuilder.filters(filters);
- sourceBuilder.setAll(options);
-
- RowType rowType = FlinkSchemaUtil.convert(readSchema);
+ @Override
+ protected TypeInformation<GenericRecord> getTypeInfo(Schema icebergSchema) {
org.apache.avro.Schema avroSchema =
- AvroSchemaUtil.convert(readSchema, TestFixtures.TABLE_IDENTIFIER.name());
-
- DataStream<Row> stream =
- env.fromSource(
- sourceBuilder.build(),
- WatermarkStrategy.noWatermarks(),
- "testBasicRead",
- new GenericRecordAvroTypeInfo(avroSchema))
- // There are two reasons for converting GenericRecord back to Row.
- // 1. Avro GenericRecord/Schema is not serializable.
- // 2. leverage the TestHelpers.assertRecords for validation.
- .map(AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema))
- .map(new RowDataToRowMapper(rowType));
-
- try (CloseableIterator<Row> iter = stream.executeAndCollect()) {
- return Lists.newArrayList(iter);
- }
+ AvroSchemaUtil.convert(icebergSchema, TestFixtures.TABLE_IDENTIFIER.name());
+ return new GenericRecordAvroTypeInfo(avroSchema);
}
- private IcebergSource.Builder<GenericRecord> createSourceBuilderWithReaderFunction(
- Table table, Schema projected, List<Expression> filters, Configuration config) {
- AvroGenericRecordReaderFunction readerFunction =
- new AvroGenericRecordReaderFunction(
- TestFixtures.TABLE_IDENTIFIER.name(),
- new Configuration(),
- table.schema(),
- projected,
- null,
- false,
- table.io(),
- table.encryption(),
- filters);
-
- return IcebergSource.<GenericRecord>builder()
- .tableLoader(CATALOG_EXTENSION.tableLoader())
- .readerFunction(readerFunction)
- .assignerFactory(new SimpleSplitAssignerFactory())
- .flinkConfig(config);
- }
-
- private IcebergSource.Builder<GenericRecord> createSourceBuilderWithConverter(
- Table table, Schema readSchema, Configuration config) {
- AvroGenericRecordConverter converter =
- AvroGenericRecordConverter.fromIcebergSchema(readSchema, table.name());
- return IcebergSource.forOutputType(converter)
- .tableLoader(CATALOG_EXTENSION.tableLoader())
- .assignerFactory(new SimpleSplitAssignerFactory())
- .flinkConfig(config);
+ @Override
+ protected DataStream<Row> mapToRow(DataStream<GenericRecord> inputStream, Schema icebergSchema) {
+ RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
+ org.apache.avro.Schema avroSchema =
+ AvroSchemaUtil.convert(icebergSchema, TestFixtures.TABLE_IDENTIFIER.name());
+ return inputStream
+ .map(AvroGenericRecordToRowDataMapper.forAvroSchema(avroSchema))
+ .map(new RowDataToRowMapper(rowType));
}
}
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedRow.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedRow.java
new file mode 100644
index 0000000000..170069fecb
--- /dev/null
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedRow.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.source.reader.RowConverter;
+import org.apache.iceberg.flink.source.reader.RowDataConverter;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestIcebergSourceBoundedRow extends TestIcebergSourceBoundedConverterBase<Row> {
+
+ @Override
+ protected RowDataConverter<Row> getConverter(Schema icebergSchema, Table table) {
+ return RowConverter.fromIcebergSchema(icebergSchema);
+ }
+
+ @Override
+ protected TypeInformation<Row> getTypeInfo(Schema icebergSchema) {
+ TableSchema tableSchema = FlinkSchemaUtil.toSchema(icebergSchema);
+ return new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames());
+ }
+
+ @Override
+ protected DataStream<Row> mapToRow(DataStream<Row> inputStream, Schema icebergSchema) {
+ return inputStream;
+ }
+}
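For completeness, a tiny standalone sketch of the conversion itself: RowConverter.apply takes Flink's internal RowData and hands back an external Row through Flink's DataStructureConverters. The two-field schema and sample values below are made up for illustration (assumes imports of org.apache.iceberg.types.Types, org.apache.flink.table.data.GenericRowData, and org.apache.flink.table.data.StringData):

    // Illustrative only: this schema and these values are assumptions, not from this commit.
    org.apache.iceberg.Schema icebergSchema =
        new org.apache.iceberg.Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));
    RowConverter converter = RowConverter.fromIcebergSchema(icebergSchema);
    // GenericRowData is the internal representation; apply() returns org.apache.flink.types.Row.
    Row row = converter.apply(GenericRowData.of(1L, StringData.fromString("a")));
    // Expect: row.getField(0) == 1L and "a".equals(row.getField(1))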