This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 85cf79de01 Flink: deprecate ReaderFunction with a new Converter
interface to simplify user experience (#10956)
85cf79de01 is described below
commit 85cf79de0156e4bb4d02931c8d8e10e1d6b478c5
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Wed Aug 21 14:56:32 2024 -0700
Flink: deprecate ReaderFunction with a new Converter interface to simplify
user experience (#10956)
---
.../apache/iceberg/flink/source/IcebergSource.java | 94 +++++++++++++++----
.../source/reader/AvroGenericRecordConverter.java | 69 ++++++++++++++
.../reader/AvroGenericRecordReaderFunction.java | 10 +-
...rFunction.java => ConverterReaderFunction.java} | 101 ++++++++++++---------
.../source/reader/IcebergSourceSplitReader.java | 2 +-
.../flink/source/reader/RowDataConverter.java | 32 +++++++
.../TestIcebergSourceBoundedGenericRecord.java | 70 +++++++++-----
7 files changed, 292 insertions(+), 86 deletions(-)
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index e629cc19bb..351ba54e5c 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -61,10 +61,12 @@ import
org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
import
org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
+import org.apache.iceberg.flink.source.reader.ConverterReaderFunction;
import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
import org.apache.iceberg.flink.source.reader.MetaDataReaderFunction;
import org.apache.iceberg.flink.source.reader.ReaderFunction;
+import org.apache.iceberg.flink.source.reader.RowDataConverter;
import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter;
import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
@@ -211,20 +213,40 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
}
}
+ /**
+ * Create a source builder.
+ *
+ * @deprecated since 1.7.0. Will be removed in 2.0.0; use {@link
IcebergSource#forRowData()} or
+ * {@link IcebergSource#forOutputType(RowDataConverter)} instead
+ */
+ @Deprecated
public static <T> Builder<T> builder() {
return new Builder<>();
}
+ /** Create a source builder for RowData output type. */
public static Builder<RowData> forRowData() {
return new Builder<>();
}
+ /**
+ * Create a source builder that converts {@link RowData} to the output
type {@code T}.
+ *
+ * @param converter convert {@link RowData} to output type {@code T}
+ * @param <T> output type
+ * @return an IcebergSource builder
+ */
+ public static <T> Builder<T> forOutputType(RowDataConverter<T> converter) {
+ return new Builder<T>().converter(converter);
+ }
+
public static class Builder<T> {
private TableLoader tableLoader;
private Table table;
private SplitAssignerFactory splitAssignerFactory;
private SerializableComparator<IcebergSourceSplit> splitComparator;
private ReaderFunction<T> readerFunction;
+ private RowDataConverter<T> converter;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
private TableSchema projectedFlinkSchema;
@@ -255,11 +277,28 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
return this;
}
+ /**
+ * @deprecated since 1.7.0. Will be removed in 2.0.0; use {@link
+ * IcebergSource#forOutputType(RowDataConverter)} instead to produce
an output type other than
+ * {@link RowData}.
+ */
+ @Deprecated
public Builder<T> readerFunction(ReaderFunction<T> newReaderFunction) {
+ Preconditions.checkState(
+ converter == null,
+ "Cannot set reader function when builder was created via
IcebergSource.forOutputType(Converter)");
this.readerFunction = newReaderFunction;
return this;
}
+ /**
+ * Doesn't need to be public. It is set by {@link
IcebergSource#forOutputType(RowDataConverter)}.
+ */
+ private Builder<T> converter(RowDataConverter<T> newConverter) {
+ this.converter = newConverter;
+ return this;
+ }
+
public Builder<T> flinkConfig(ReadableConfig config) {
this.flinkConfig = config;
return this;
@@ -510,25 +549,7 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
ScanContext context = contextBuilder.build();
context.validate();
if (readerFunction == null) {
- if (table instanceof BaseMetadataTable) {
- MetaDataReaderFunction rowDataReaderFunction =
- new MetaDataReaderFunction(
- flinkConfig, table.schema(), context.project(), table.io(),
table.encryption());
- this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
- } else {
- RowDataReaderFunction rowDataReaderFunction =
- new RowDataReaderFunction(
- flinkConfig,
- table.schema(),
- context.project(),
- context.nameMapping(),
- context.caseSensitive(),
- table.io(),
- table.encryption(),
- context.filters(),
- context.limit());
- this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
- }
+ this.readerFunction = readerFunction(context);
}
if (splitAssignerFactory == null) {
@@ -549,5 +570,40 @@ public class IcebergSource<T> implements Source<T,
IcebergSourceSplit, IcebergEn
table,
emitter);
}
+
+ private ReaderFunction<T> readerFunction(ScanContext context) {
+ if (table instanceof BaseMetadataTable) {
+ MetaDataReaderFunction rowDataReaderFunction =
+ new MetaDataReaderFunction(
+ flinkConfig, table.schema(), context.project(), table.io(),
table.encryption());
+ return (ReaderFunction<T>) rowDataReaderFunction;
+ } else {
+ if (converter == null) {
+ return (ReaderFunction<T>)
+ new RowDataReaderFunction(
+ flinkConfig,
+ table.schema(),
+ context.project(),
+ context.nameMapping(),
+ context.caseSensitive(),
+ table.io(),
+ table.encryption(),
+ context.filters(),
+ context.limit());
+ } else {
+ return new ConverterReaderFunction<>(
+ converter,
+ flinkConfig,
+ table.schema(),
+ context.project(),
+ context.nameMapping(),
+ context.caseSensitive(),
+ table.io(),
+ table.encryption(),
+ context.filters(),
+ context.limit());
+ }
+ }
+ }
}
}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordConverter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordConverter.java
new file mode 100644
index 0000000000..b158b0871a
--- /dev/null
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordConverter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+
+public class AvroGenericRecordConverter implements
RowDataConverter<GenericRecord> {
+ private final Schema avroSchema;
+ private final RowDataToAvroConverters.RowDataToAvroConverter flinkConverter;
+ private final TypeInformation<GenericRecord> outputTypeInfo;
+
+ private AvroGenericRecordConverter(Schema avroSchema, RowType rowType) {
+ this.avroSchema = avroSchema;
+ this.flinkConverter = RowDataToAvroConverters.createConverter(rowType);
+ this.outputTypeInfo = new GenericRecordAvroTypeInfo(avroSchema);
+ }
+
+ public static AvroGenericRecordConverter fromIcebergSchema(
+ org.apache.iceberg.Schema icebergSchema, String tableName) {
+ RowType rowType = FlinkSchemaUtil.convert(icebergSchema);
+ Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, tableName);
+ return new AvroGenericRecordConverter(avroSchema, rowType);
+ }
+
+ public static AvroGenericRecordConverter fromAvroSchema(Schema avroSchema,
String tableName) {
+ DataType dataType =
AvroSchemaConverter.convertToDataType(avroSchema.toString());
+ LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
+ RowType rowType = RowType.of(logicalType.getChildren().toArray(new
LogicalType[0]));
+ return new AvroGenericRecordConverter(avroSchema, rowType);
+ }
+
+ @Override
+ public GenericRecord apply(RowData rowData) {
+ return (GenericRecord) flinkConverter.convert(avroSchema, rowData);
+ }
+
+ @Override
+ public TypeInformation<GenericRecord> getProducedType() {
+ return outputTypeInfo;
+ }
+}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
index 66e59633ff..f89e5ce134 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
@@ -28,13 +28,21 @@ import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
import org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-/** Read Iceberg rows as {@link GenericRecord}. */
+/**
+ * Read Iceberg rows as {@link GenericRecord}.
+ *
+ * @deprecated since 1.7.0. Will be removed in 2.0.0; use {@link
+ * IcebergSource#forOutputType(RowDataConverter)} and {@link
AvroGenericRecordConverter}
+ * instead.
+ */
+@Deprecated
public class AvroGenericRecordReaderFunction extends
DataIteratorReaderFunction<GenericRecord> {
private final String tableName;
private final Schema readSchema;
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/ConverterReaderFunction.java
similarity index 51%
copy from
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
copy to
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/ConverterReaderFunction.java
index 66e59633ff..e1e7c17d63 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordReaderFunction.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/ConverterReaderFunction.java
@@ -19,49 +19,37 @@
package org.apache.iceberg.flink.source.reader;
import java.util.List;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.InputFilesDecryptor;
import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.flink.source.AvroGenericRecordFileScanTaskReader;
import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.FileScanTaskReader;
import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
-import org.apache.iceberg.flink.source.RowDataToAvroGenericRecordConverter;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-/** Read Iceberg rows as {@link GenericRecord}. */
-public class AvroGenericRecordReaderFunction extends
DataIteratorReaderFunction<GenericRecord> {
- private final String tableName;
+@Internal
+public class ConverterReaderFunction<T> extends DataIteratorReaderFunction<T> {
+ private final RowDataConverter<T> converter;
+ private final Schema tableSchema;
private final Schema readSchema;
+ private final String nameMapping;
+ private final boolean caseSensitive;
private final FileIO io;
private final EncryptionManager encryption;
- private final RowDataFileScanTaskReader rowDataReader;
+ private final List<Expression> filters;
+ private final long limit;
- private transient RowDataToAvroGenericRecordConverter converter;
+ private transient RecordLimiter recordLimiter = null;
- /**
- * Create a reader function without projection and name mapping. Column name
is case-insensitive.
- */
- public static AvroGenericRecordReaderFunction fromTable(Table table) {
- return new AvroGenericRecordReaderFunction(
- table.name(),
- new Configuration(),
- table.schema(),
- null,
- null,
- false,
- table.io(),
- table.encryption(),
- null);
- }
-
- public AvroGenericRecordReaderFunction(
- String tableName,
+ public ConverterReaderFunction(
+ RowDataConverter<T> converter,
ReadableConfig config,
Schema tableSchema,
Schema projectedSchema,
@@ -69,34 +57,61 @@ public class AvroGenericRecordReaderFunction extends
DataIteratorReaderFunction<
boolean caseSensitive,
FileIO io,
EncryptionManager encryption,
- List<Expression> filters) {
+ List<Expression> filters,
+ long limit) {
super(new ListDataIteratorBatcher<>(config));
- this.tableName = tableName;
+ this.converter = converter;
+ this.tableSchema = tableSchema;
this.readSchema = readSchema(tableSchema, projectedSchema);
+ this.nameMapping = nameMapping;
+ this.caseSensitive = caseSensitive;
this.io = io;
this.encryption = encryption;
- this.rowDataReader =
- new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping,
caseSensitive, filters);
+ this.filters = filters;
+ this.limit = limit;
}
@Override
- protected DataIterator<GenericRecord> createDataIterator(IcebergSourceSplit
split) {
- return new DataIterator<>(
- new AvroGenericRecordFileScanTaskReader(rowDataReader,
lazyConverter()),
+ protected DataIterator<T> createDataIterator(IcebergSourceSplit split) {
+ RowDataFileScanTaskReader rowDataReader =
+ new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping,
caseSensitive, filters);
+ return new LimitableDataIterator<>(
+ new ConverterFileScanTaskReader<>(rowDataReader, converter),
split.task(),
io,
- encryption);
- }
-
- private RowDataToAvroGenericRecordConverter lazyConverter() {
- if (converter == null) {
- this.converter =
RowDataToAvroGenericRecordConverter.fromIcebergSchema(tableName, readSchema);
- }
- return converter;
+ encryption,
+ lazyLimiter());
}
private static Schema readSchema(Schema tableSchema, Schema projectedSchema)
{
Preconditions.checkNotNull(tableSchema, "Table schema can't be null");
return projectedSchema == null ? tableSchema : projectedSchema;
}
+
+ /** Lazily create RecordLimiter to avoid the need to make it serializable. */
+ private RecordLimiter lazyLimiter() {
+ if (recordLimiter == null) {
+ this.recordLimiter = RecordLimiter.create(limit);
+ }
+
+ return recordLimiter;
+ }
+
+ private static class ConverterFileScanTaskReader<T> implements
FileScanTaskReader<T> {
+ private final RowDataFileScanTaskReader rowDataReader;
+ private final RowDataConverter<T> converter;
+
+ ConverterFileScanTaskReader(
+ RowDataFileScanTaskReader rowDataReader, RowDataConverter<T>
converter) {
+ this.rowDataReader = rowDataReader;
+ this.converter = converter;
+ }
+
+ @Override
+ public CloseableIterator<T> open(
+ FileScanTask fileScanTask, InputFilesDecryptor inputFilesDecryptor) {
+ return CloseableIterator.transform(
+ rowDataReader.open(fileScanTask, inputFilesDecryptor), converter);
+ }
+ }
}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
index 9c20494fdb..bcd72e2503 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
@@ -85,7 +85,7 @@ class IcebergSourceSplitReader<T> implements
SplitReader<RecordAndPosition<T>, I
} else {
// return an empty result, which will lead to split fetch to be idle.
// SplitFetcherManager will then close idle fetcher.
- return new RecordsBySplits(Collections.emptyMap(),
Collections.emptySet());
+ return new RecordsBySplits<>(Collections.emptyMap(),
Collections.emptySet());
}
}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataConverter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataConverter.java
new file mode 100644
index 0000000000..98bb7e9818
--- /dev/null
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataConverter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.io.Serializable;
+import java.util.function.Function;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Convert RowData to a different output type.
+ *
+ * @param <T> output type
+ */
+public interface RowDataConverter<T>
+ extends Function<RowData, T>, ResultTypeQueryable<T>, Serializable {}
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
index 7bfed00a9e..4e649d15b1 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedGenericRecord.java
@@ -52,6 +52,7 @@ import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.flink.data.RowDataToRowMapper;
import org.apache.iceberg.flink.sink.AvroGenericRecordToRowDataMapper;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;
+import org.apache.iceberg.flink.source.reader.AvroGenericRecordConverter;
import org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -69,12 +70,13 @@ public class TestIcebergSourceBoundedGenericRecord {
private static final HadoopCatalogExtension CATALOG_EXTENSION =
new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);
- @Parameters(name = "format={0}, parallelism = {1}")
+ @Parameters(name = "format={0}, parallelism = {1}, useConverter = {2}")
public static Object[][] parameters() {
return new Object[][] {
- {FileFormat.AVRO, 2},
- {FileFormat.PARQUET, 2},
- {FileFormat.ORC, 2}
+ {FileFormat.AVRO, 2, true},
+ {FileFormat.PARQUET, 2, true},
+ {FileFormat.PARQUET, 2, false},
+ {FileFormat.ORC, 2, true}
};
}
@@ -84,6 +86,9 @@ public class TestIcebergSourceBoundedGenericRecord {
@Parameter(index = 1)
private int parallelism;
+ @Parameter(index = 2)
+ private boolean useConverter;
+
@TestTemplate
public void testUnpartitionedTable() throws Exception {
Table table =
@@ -147,24 +152,15 @@ public class TestIcebergSourceBoundedGenericRecord {
table = tableLoader.loadTable();
}
- AvroGenericRecordReaderFunction readerFunction =
- new AvroGenericRecordReaderFunction(
- TestFixtures.TABLE_IDENTIFIER.name(),
- new Configuration(),
- table.schema(),
- null,
- null,
- false,
- table.io(),
- table.encryption(),
- filters);
+ Schema readSchema = projectedSchema != null ? projectedSchema :
table.schema();
+ IcebergSource.Builder<GenericRecord> sourceBuilder;
+ if (useConverter) {
+ sourceBuilder = createSourceBuilderWithConverter(table, readSchema,
config);
+ } else {
+ sourceBuilder =
+ createSourceBuilderWithReaderFunction(table, projectedSchema,
filters, config);
+ }
- IcebergSource.Builder<GenericRecord> sourceBuilder =
- IcebergSource.<GenericRecord>builder()
- .tableLoader(CATALOG_EXTENSION.tableLoader())
- .readerFunction(readerFunction)
- .assignerFactory(new SimpleSplitAssignerFactory())
- .flinkConfig(config);
if (projectedSchema != null) {
sourceBuilder.project(projectedSchema);
}
@@ -172,7 +168,6 @@ public class TestIcebergSourceBoundedGenericRecord {
sourceBuilder.filters(filters);
sourceBuilder.setAll(options);
- Schema readSchema = projectedSchema != null ? projectedSchema :
table.schema();
RowType rowType = FlinkSchemaUtil.convert(readSchema);
org.apache.avro.Schema avroSchema =
AvroSchemaUtil.convert(readSchema,
TestFixtures.TABLE_IDENTIFIER.name());
@@ -193,4 +188,35 @@ public class TestIcebergSourceBoundedGenericRecord {
return Lists.newArrayList(iter);
}
}
+
+ private IcebergSource.Builder<GenericRecord>
createSourceBuilderWithReaderFunction(
+ Table table, Schema projected, List<Expression> filters, Configuration
config) {
+ AvroGenericRecordReaderFunction readerFunction =
+ new AvroGenericRecordReaderFunction(
+ TestFixtures.TABLE_IDENTIFIER.name(),
+ new Configuration(),
+ table.schema(),
+ projected,
+ null,
+ false,
+ table.io(),
+ table.encryption(),
+ filters);
+
+ return IcebergSource.<GenericRecord>builder()
+ .tableLoader(CATALOG_EXTENSION.tableLoader())
+ .readerFunction(readerFunction)
+ .assignerFactory(new SimpleSplitAssignerFactory())
+ .flinkConfig(config);
+ }
+
+ private IcebergSource.Builder<GenericRecord>
createSourceBuilderWithConverter(
+ Table table, Schema readSchema, Configuration config) {
+ AvroGenericRecordConverter converter =
+ AvroGenericRecordConverter.fromIcebergSchema(readSchema, table.name());
+ return IcebergSource.forOutputType(converter)
+ .tableLoader(CATALOG_EXTENSION.tableLoader())
+ .assignerFactory(new SimpleSplitAssignerFactory())
+ .flinkConfig(config);
+ }
}