openinx commented on a change in pull request #2229:
URL: https://github.com/apache/iceberg/pull/2229#discussion_r586179283
##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -20,66 +20,68 @@
package org.apache.iceberg.flink;
import java.util.Map;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.OverwritableTableSink;
-import org.apache.flink.table.sinks.PartitionableTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.flink.sink.FlinkSink;
-public class IcebergTableSink implements AppendStreamTableSink<RowData>, OverwritableTableSink, PartitionableTableSink {
- private final boolean isBounded;
+public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
private final TableLoader tableLoader;
private final TableSchema tableSchema;
private boolean overwrite = false;
- public IcebergTableSink(boolean isBounded, TableLoader tableLoader, TableSchema tableSchema) {
- this.isBounded = isBounded;
+ public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema) {
this.tableLoader = tableLoader;
this.tableSchema = tableSchema;
}
@Override
- public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
- Preconditions.checkState(!overwrite || isBounded, "Unbounded data stream doesn't support overwrite operation.");
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ Preconditions
Review comment:
Nit: It's better to format this code like this:
```java
Preconditions.checkState(!overwrite || context.isBounded(),
    "Unbounded data stream doesn't support overwrite operation.");
```
##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -20,66 +20,68 @@
package org.apache.iceberg.flink;
import java.util.Map;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.OverwritableTableSink;
-import org.apache.flink.table.sinks.PartitionableTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.flink.sink.FlinkSink;
-public class IcebergTableSink implements AppendStreamTableSink<RowData>, OverwritableTableSink, PartitionableTableSink {
- private final boolean isBounded;
+public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
private final TableLoader tableLoader;
private final TableSchema tableSchema;
private boolean overwrite = false;
- public IcebergTableSink(boolean isBounded, TableLoader tableLoader, TableSchema tableSchema) {
- this.isBounded = isBounded;
+ public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema) {
this.tableLoader = tableLoader;
this.tableSchema = tableSchema;
}
@Override
- public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
- Preconditions.checkState(!overwrite || isBounded, "Unbounded data stream doesn't support overwrite operation.");
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ Preconditions
+ .checkState(!overwrite || context.isBounded(), "Unbounded data stream doesn't support overwrite operation.");
- return FlinkSink.forRowData(dataStream)
+ return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.overwrite(overwrite)
Review comment:
In your MySQL CDC -> Iceberg test, did you provide the `equalityColumns` in the builder? I think we will need the equality columns to make this whole workflow work. We might also need fully covered SQL unit tests in a future PR to ensure the whole flow works end to end.
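For example, a rough sketch of what that could look like (assuming the builder exposes something like `equalityFieldColumns`, and `id` stands in for the table's key column):
```java
// Sketch only: pass the equality (key) columns so the sink can write equality deletes
// for the CDC UPDATE_BEFORE/DELETE rows. The column name is a placeholder.
List<String> equalityColumns = ImmutableList.of("id");

return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
    .tableLoader(tableLoader)
    .tableSchema(tableSchema)
    .overwrite(overwrite)
    .equalityFieldColumns(equalityColumns)
    .build();
```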
##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -20,66 +20,68 @@
package org.apache.iceberg.flink;
import java.util.Map;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.OverwritableTableSink;
-import org.apache.flink.table.sinks.PartitionableTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.flink.sink.FlinkSink;
-public class IcebergTableSink implements AppendStreamTableSink<RowData>, OverwritableTableSink, PartitionableTableSink {
- private final boolean isBounded;
+public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
private final TableLoader tableLoader;
private final TableSchema tableSchema;
private boolean overwrite = false;
- public IcebergTableSink(boolean isBounded, TableLoader tableLoader, TableSchema tableSchema) {
- this.isBounded = isBounded;
+ public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema) {
this.tableLoader = tableLoader;
this.tableSchema = tableSchema;
}
@Override
- public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
- Preconditions.checkState(!overwrite || isBounded, "Unbounded data stream doesn't support overwrite operation.");
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ Preconditions
+ .checkState(!overwrite || context.isBounded(), "Unbounded data stream doesn't support overwrite operation.");
- return FlinkSink.forRowData(dataStream)
+ return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.overwrite(overwrite)
.build();
}
@Override
- public DataType getConsumedDataType() {
- return tableSchema.toRowDataType().bridgedTo(RowData.class);
+ public void applyStaticPartition(Map<String, String> partition) {
+ // The flink's PartitionFanoutWriter will handle the static partition write policy automatically.
}
@Override
- public TableSchema getTableSchema() {
- return this.tableSchema;
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_BEFORE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
}
@Override
- public TableSink<RowData> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
- // This method has been deprecated and it will be removed in future version, so left the empty implementation here.
- return this;
+ public DynamicTableSink copy() {
+ IcebergTableSink icebergTableSink = new IcebergTableSink(tableLoader, tableSchema);
+ icebergTableSink.overwrite = overwrite;
+ return icebergTableSink;
}
@Override
- public void setOverwrite(boolean overwrite) {
- this.overwrite = overwrite;
+ public String asSummaryString() {
+ return "iceberg table sink";
Review comment:
Nit: It's better to use "Iceberg table sink" here (to align with the other Flink table sinks).
##########
File path:
flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -40,14 +40,12 @@
private static final String CATALOG_NAME = "test_catalog";
private static final String DATABASE_NAME = "test_db";
private static final String TABLE_NAME = "test_table";
- private final String expectedFilterPushDownExplain = "FilterPushDown";
Review comment:
Yeah, that sounds reasonable. The `lastScanEvent.filter().toString()` is enough to validate the pushed-down filters.
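For example, something like this (the expected expression depends on the query under test):
```java
// Sketch: validate the pushed-down filter via the captured ScanEvent rather than the explain string.
// The expected predicate here is just a placeholder for whatever the test query filters on.
Expression expected = Expressions.equal("id", 1);
Assert.assertEquals("Should push down the expected filter",
    expected.toString(), lastScanEvent.filter().toString());
```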
##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -78,17 +77,11 @@ private IcebergTableSource(TableLoader loader, TableSchema schema, Map<String, S
}
@Override
- public boolean isBounded() {
- return FlinkSource.isBounded(properties);
+ public void applyProjection(int[][] projectFields) {
Review comment:
As we don't implement nested projection in the Iceberg source, I think we could add a check here to ensure that the projections are not nested:
```java
@Override
public void applyProjection(int[][] projectFields) {
  this.projectedFields = new int[projectFields.length];
  for (int i = 0; i < projectFields.length; i++) {
    Preconditions.checkArgument(projectFields[i].length == 1,
        "Don't support nested projection in iceberg source now.");
    this.projectedFields[i] = projectFields[i][0];
  }
}
```
##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -78,17 +77,11 @@ private IcebergTableSource(TableLoader loader, TableSchema schema, Map<String, S
}
@Override
- public boolean isBounded() {
- return FlinkSource.isBounded(properties);
+ public void applyProjection(int[][] projectFields) {
+ this.projectedFields = Arrays.stream(projectFields).mapToInt(value -> value[0]).toArray();
}
- @Override
- public TableSource<RowData> projectFields(int[] fields) {
- return new IcebergTableSource(loader, schema, properties, fields, isLimitPushDown, limit, filters, readableConfig);
- }
-
- @Override
- public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv)
{
+ private DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
Review comment:
Nit: It's better to rename it to `createDataStream`.
##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -20,66 +20,68 @@
package org.apache.iceberg.flink;
import java.util.Map;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.OverwritableTableSink;
-import org.apache.flink.table.sinks.PartitionableTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.flink.sink.FlinkSink;
-public class IcebergTableSink implements AppendStreamTableSink<RowData>, OverwritableTableSink, PartitionableTableSink {
- private final boolean isBounded;
+public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
private final TableLoader tableLoader;
private final TableSchema tableSchema;
private boolean overwrite = false;
- public IcebergTableSink(boolean isBounded, TableLoader tableLoader, TableSchema tableSchema) {
- this.isBounded = isBounded;
+ public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema) {
this.tableLoader = tableLoader;
this.tableSchema = tableSchema;
}
@Override
- public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
- Preconditions.checkState(!overwrite || isBounded, "Unbounded data stream doesn't support overwrite operation.");
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ Preconditions
+ .checkState(!overwrite || context.isBounded(), "Unbounded data stream doesn't support overwrite operation.");
- return FlinkSink.forRowData(dataStream)
+ return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.overwrite(overwrite)
.build();
}
@Override
- public DataType getConsumedDataType() {
- return tableSchema.toRowDataType().bridgedTo(RowData.class);
+ public void applyStaticPartition(Map<String, String> partition) {
+ // The flink's PartitionFanoutWriter will handle the static partition write policy automatically.
}
@Override
- public TableSchema getTableSchema() {
- return this.tableSchema;
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ return ChangelogMode.newBuilder()
+ .addContainedKind(RowKind.INSERT)
+ .addContainedKind(RowKind.UPDATE_BEFORE)
+ .addContainedKind(RowKind.UPDATE_AFTER)
+ .addContainedKind(RowKind.DELETE)
+ .build();
}
@Override
- public TableSink<RowData> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
- // This method has been deprecated and it will be removed in future version, so left the empty implementation here.
- return this;
+ public DynamicTableSink copy() {
+ IcebergTableSink icebergTableSink = new IcebergTableSink(tableLoader, tableSchema);
Review comment:
How about introducing a new IcebergTableSink constructor like the following?
```java
private IcebergTableSink(IcebergTableSink toCopy) {
  this.tableLoader = toCopy.tableLoader;
  this.tableSchema = toCopy.tableSchema;
  this.overwrite = toCopy.overwrite;
}
```
Then we could just return `new IcebergTableSink(this)` here. We usually use this pattern in Iceberg to copy an object; see also [StructCopy](https://github.com/apache/iceberg/blob/cde7ec33a075bba95583eb1a5d393880d141b04f/core/src/main/java/org/apache/iceberg/io/StructCopy.java#L28).
##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -100,70 +93,73 @@ public boolean isBounded() {
.build();
}
- @Override
- public TableSchema getTableSchema() {
- return schema;
- }
-
- @Override
- public DataType getProducedDataType() {
- return getProjectedSchema().toRowDataType().bridgedTo(RowData.class);
- }
-
private TableSchema getProjectedSchema() {
- TableSchema fullSchema = getTableSchema();
if (projectedFields == null) {
- return fullSchema;
+ return schema;
} else {
- String[] fullNames = fullSchema.getFieldNames();
- DataType[] fullTypes = fullSchema.getFieldDataTypes();
+ String[] fullNames = schema.getFieldNames();
+ DataType[] fullTypes = schema.getFieldDataTypes();
return TableSchema.builder().fields(
Arrays.stream(projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new),
Arrays.stream(projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new)).build();
}
}
@Override
- public String explainSource() {
- String explain = "Iceberg table: " + loader.toString();
- if (projectedFields != null) {
- explain += ", ProjectedFields: " + Arrays.toString(projectedFields);
- }
-
- if (isLimitPushDown) {
- explain += String.format(", LimitPushDown : %d", limit);
- }
+ public void applyLimit(long newLimit) {
+ this.limit = newLimit;
+ }
- if (isFilterPushedDown()) {
- explain += String.format(", FilterPushDown: %s", COMMA.join(filters));
+ @Override
+ public Result applyFilters(List<ResolvedExpression> flinkFilters) {
+ List<ResolvedExpression> acceptedFilters = Lists.newArrayList();
+ List<Expression> expressions = Lists.newArrayList();
+
+ for (ResolvedExpression resolvedExpression : flinkFilters) {
+ Optional<Expression> icebergExpression = FlinkFilters.convert(resolvedExpression);
+ if (icebergExpression.isPresent()) {
+ expressions.add(icebergExpression.get());
+ acceptedFilters.add(resolvedExpression);
+ }
}
- return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames()) + explain;
+ this.filters = expressions;
+ return Result.of(acceptedFilters, flinkFilters);
}
@Override
- public boolean isLimitPushedDown() {
- return isLimitPushDown;
+ public boolean supportsNestedProjection() {
+ return false;
}
@Override
- public TableSource<RowData> applyLimit(long newLimit) {
- return new IcebergTableSource(loader, schema, properties, projectedFields, true, newLimit, filters, readableConfig);
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
}
@Override
- public TableSource<RowData> applyPredicate(List<Expression> predicates) {
- List<org.apache.iceberg.expressions.Expression> expressions = Lists.newArrayList();
- for (Expression predicate : predicates) {
- FlinkFilters.convert(predicate).ifPresent(expressions::add);
- }
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+ return new DataStreamScanProvider() {
+ @Override
+ public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
+ return getDataStream(execEnv);
+ }
+
+ @Override
+ public boolean isBounded() {
+ return FlinkSource.isBounded(properties);
+ }
+ };
+ }
- return new IcebergTableSource(loader, schema, properties, projectedFields, isLimitPushDown, limit, expressions,
+ @Override
+ public DynamicTableSource copy() {
+ return new IcebergTableSource(loader, schema, properties, projectedFields, isLimitPushDown, limit, filters,
Review comment:
Nit: we could use a similar `IcebergTableSource(toCopy)` copy constructor to copy the object.
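For example, a rough sketch (field names taken from the surrounding diff):
```java
private IcebergTableSource(IcebergTableSource toCopy) {
  this.loader = toCopy.loader;
  this.schema = toCopy.schema;
  this.properties = toCopy.properties;
  this.projectedFields = toCopy.projectedFields;
  this.isLimitPushDown = toCopy.isLimitPushDown;
  this.limit = toCopy.limit;
  this.filters = toCopy.filters;
  this.readableConfig = toCopy.readableConfig;
}

@Override
public DynamicTableSource copy() {
  return new IcebergTableSource(this);
}
```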
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java
##########
@@ -19,48 +19,53 @@
package org.apache.iceberg.flink;
-import java.util.List;
-import java.util.Map;
+import java.util.Set;
+import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.TableSinkFactory;
-import org.apache.flink.table.factories.TableSourceFactory;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.utils.TableSchemaUtils;
-public class FlinkTableFactory implements TableSinkFactory<RowData>, TableSourceFactory<RowData> {
+public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
private final FlinkCatalog catalog;
- public FlinkTableFactory(FlinkCatalog catalog) {
+ public FlinkDynamicTableFactory(FlinkCatalog catalog) {
this.catalog = catalog;
}
@Override
- public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
+ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
Review comment:
Nit: Here we could just use `Context` instead of the fully qualified `DynamicTableFactory.Context`.
##########
File path:
flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java
##########
@@ -56,6 +58,28 @@ public TestFlinkTableSource() {
}, ScanEvent.class);
}
+ @Override
+ protected TableEnvironment getTableEnv() {
+ if (tEnv == null) {
+ synchronized (this) {
+ if (tEnv == null) {
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inBatchMode()
+ .build();
+
+ TableEnvironment env = TableEnvironment.create(settings);
+ env.getConfig().getConfiguration()
+ .set(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false)
+ .set(CoreOptions.DEFAULT_PARALLELISM, 1);
Review comment:
The default parallelism is already 1, so why do we need to override `getTableEnv` and set `DEFAULT_PARALLELISM` to 1 again?
```java
public static final ConfigOption<Integer> DEFAULT_PARALLELISM =
    ConfigOptions.key("parallelism.default")
        .defaultValue(1)
        .withDescription("Default parallelism for jobs.");
```
##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -100,70 +93,73 @@ public boolean isBounded() {
.build();
}
- @Override
- public TableSchema getTableSchema() {
- return schema;
- }
-
- @Override
- public DataType getProducedDataType() {
- return getProjectedSchema().toRowDataType().bridgedTo(RowData.class);
- }
-
private TableSchema getProjectedSchema() {
- TableSchema fullSchema = getTableSchema();
if (projectedFields == null) {
- return fullSchema;
+ return schema;
} else {
- String[] fullNames = fullSchema.getFieldNames();
- DataType[] fullTypes = fullSchema.getFieldDataTypes();
+ String[] fullNames = schema.getFieldNames();
+ DataType[] fullTypes = schema.getFieldDataTypes();
return TableSchema.builder().fields(
Arrays.stream(projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new),
Arrays.stream(projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new)).build();
}
}
@Override
- public String explainSource() {
- String explain = "Iceberg table: " + loader.toString();
- if (projectedFields != null) {
- explain += ", ProjectedFields: " + Arrays.toString(projectedFields);
- }
-
- if (isLimitPushDown) {
- explain += String.format(", LimitPushDown : %d", limit);
- }
+ public void applyLimit(long newLimit) {
+ this.limit = newLimit;
+ }
- if (isFilterPushedDown()) {
- explain += String.format(", FilterPushDown: %s", COMMA.join(filters));
+ @Override
+ public Result applyFilters(List<ResolvedExpression> flinkFilters) {
+ List<ResolvedExpression> acceptedFilters = Lists.newArrayList();
+ List<Expression> expressions = Lists.newArrayList();
+
+ for (ResolvedExpression resolvedExpression : flinkFilters) {
+ Optional<Expression> icebergExpression = FlinkFilters.convert(resolvedExpression);
+ if (icebergExpression.isPresent()) {
+ expressions.add(icebergExpression.get());
+ acceptedFilters.add(resolvedExpression);
+ }
}
- return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames()) + explain;
+ this.filters = expressions;
+ return Result.of(acceptedFilters, flinkFilters);
Review comment:
I read the javadoc of [SupportsFilterPushDown](https://github.com/apache/flink/blob/97bfd049951f8d52a2e0aed14265074c4255ead0/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsFilterPushDown.java#L63): the remaining filters don't have to be the full list of the original `flinkFilters`; they could be just the filters that were not pushed down into the Iceberg source. Passing the complete list as `remainingFilters` is certainly correct, but it makes the planner apply the `acceptedFilters` a second time, which wastes resources.
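A sketch of what that could look like (assuming the converted Iceberg expressions fully cover the accepted filters):
```java
@Override
public Result applyFilters(List<ResolvedExpression> flinkFilters) {
  List<ResolvedExpression> acceptedFilters = Lists.newArrayList();
  List<ResolvedExpression> remainingFilters = Lists.newArrayList();
  List<Expression> expressions = Lists.newArrayList();

  for (ResolvedExpression resolvedExpression : flinkFilters) {
    Optional<Expression> icebergExpression = FlinkFilters.convert(resolvedExpression);
    if (icebergExpression.isPresent()) {
      expressions.add(icebergExpression.get());
      acceptedFilters.add(resolvedExpression);
    } else {
      // Only the filters we could not convert are left for the planner to evaluate.
      remainingFilters.add(resolvedExpression);
    }
  }

  this.filters = expressions;
  return Result.of(acceptedFilters, remainingFilters);
}
```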
##########
File path:
flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java
##########
@@ -280,15 +280,15 @@ public void testRewriteLargeTableHasResiduals() throws IOException {
Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), task.residual());
}
List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
- Assert.assertEquals("Should have 2 data files before rewrite", 2, dataFiles.size());
Review comment:
Where did we change the default parallelism from 1 to 4? I did not find that change.
##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -100,70 +93,73 @@ public boolean isBounded() {
.build();
}
- @Override
- public TableSchema getTableSchema() {
- return schema;
- }
-
- @Override
- public DataType getProducedDataType() {
- return getProjectedSchema().toRowDataType().bridgedTo(RowData.class);
- }
-
private TableSchema getProjectedSchema() {
- TableSchema fullSchema = getTableSchema();
if (projectedFields == null) {
- return fullSchema;
+ return schema;
} else {
- String[] fullNames = fullSchema.getFieldNames();
- DataType[] fullTypes = fullSchema.getFieldDataTypes();
+ String[] fullNames = schema.getFieldNames();
+ DataType[] fullTypes = schema.getFieldDataTypes();
return TableSchema.builder().fields(
Arrays.stream(projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new),
Arrays.stream(projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new)).build();
}
}
@Override
- public String explainSource() {
- String explain = "Iceberg table: " + loader.toString();
- if (projectedFields != null) {
- explain += ", ProjectedFields: " + Arrays.toString(projectedFields);
- }
-
- if (isLimitPushDown) {
- explain += String.format(", LimitPushDown : %d", limit);
- }
+ public void applyLimit(long newLimit) {
+ this.limit = newLimit;
+ }
- if (isFilterPushedDown()) {
- explain += String.format(", FilterPushDown: %s", COMMA.join(filters));
+ @Override
+ public Result applyFilters(List<ResolvedExpression> flinkFilters) {
+ List<ResolvedExpression> acceptedFilters = Lists.newArrayList();
+ List<Expression> expressions = Lists.newArrayList();
+
+ for (ResolvedExpression resolvedExpression : flinkFilters) {
+ Optional<Expression> icebergExpression = FlinkFilters.convert(resolvedExpression);
+ if (icebergExpression.isPresent()) {
+ expressions.add(icebergExpression.get());
+ acceptedFilters.add(resolvedExpression);
+ }
}
- return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames()) + explain;
+ this.filters = expressions;
+ return Result.of(acceptedFilters, flinkFilters);
}
@Override
- public boolean isLimitPushedDown() {
- return isLimitPushDown;
+ public boolean supportsNestedProjection() {
+ return false;
Review comment:
Could you please add a TODO indicating that nested projection should be supported in a future PR? Iceberg actually has the ability to support nested projection already, so we could address it in a follow-up PR.
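For example, just a sketch of the requested TODO:
```java
@Override
public boolean supportsNestedProjection() {
  // TODO: support nested projection; Iceberg can already prune nested schemas.
  return false;
}
```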
##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
##########
@@ -100,70 +93,73 @@ public boolean isBounded() {
.build();
}
- @Override
- public TableSchema getTableSchema() {
- return schema;
- }
-
- @Override
- public DataType getProducedDataType() {
- return getProjectedSchema().toRowDataType().bridgedTo(RowData.class);
- }
-
private TableSchema getProjectedSchema() {
- TableSchema fullSchema = getTableSchema();
if (projectedFields == null) {
- return fullSchema;
+ return schema;
} else {
- String[] fullNames = fullSchema.getFieldNames();
- DataType[] fullTypes = fullSchema.getFieldDataTypes();
+ String[] fullNames = schema.getFieldNames();
+ DataType[] fullTypes = schema.getFieldDataTypes();
return TableSchema.builder().fields(
Arrays.stream(projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new),
Arrays.stream(projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new)).build();
}
}
@Override
- public String explainSource() {
- String explain = "Iceberg table: " + loader.toString();
- if (projectedFields != null) {
- explain += ", ProjectedFields: " + Arrays.toString(projectedFields);
- }
-
- if (isLimitPushDown) {
- explain += String.format(", LimitPushDown : %d", limit);
- }
+ public void applyLimit(long newLimit) {
+ this.limit = newLimit;
+ }
- if (isFilterPushedDown()) {
- explain += String.format(", FilterPushDown: %s", COMMA.join(filters));
+ @Override
+ public Result applyFilters(List<ResolvedExpression> flinkFilters) {
+ List<ResolvedExpression> acceptedFilters = Lists.newArrayList();
+ List<Expression> expressions = Lists.newArrayList();
+
+ for (ResolvedExpression resolvedExpression : flinkFilters) {
+ Optional<Expression> icebergExpression = FlinkFilters.convert(resolvedExpression);
+ if (icebergExpression.isPresent()) {
+ expressions.add(icebergExpression.get());
+ acceptedFilters.add(resolvedExpression);
+ }
}
- return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames()) + explain;
+ this.filters = expressions;
+ return Result.of(acceptedFilters, flinkFilters);
}
@Override
- public boolean isLimitPushedDown() {
- return isLimitPushDown;
+ public boolean supportsNestedProjection() {
+ return false;
}
@Override
- public TableSource<RowData> applyLimit(long newLimit) {
- return new IcebergTableSource(loader, schema, properties, projectedFields, true, newLimit, filters, readableConfig);
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
}
@Override
- public TableSource<RowData> applyPredicate(List<Expression> predicates) {
- List<org.apache.iceberg.expressions.Expression> expressions = Lists.newArrayList();
- for (Expression predicate : predicates) {
- FlinkFilters.convert(predicate).ifPresent(expressions::add);
- }
+ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+ return new DataStreamScanProvider() {
+ @Override
+ public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
+ return getDataStream(execEnv);
+ }
+
+ @Override
+ public boolean isBounded() {
+ return FlinkSource.isBounded(properties);
+ }
+ };
+ }
- return new IcebergTableSource(loader, schema, properties, projectedFields, isLimitPushDown, limit, expressions,
+ @Override
+ public DynamicTableSource copy() {
+ return new IcebergTableSource(loader, schema, properties, projectedFields, isLimitPushDown, limit, filters,
readableConfig);
}
@Override
- public boolean isFilterPushedDown() {
- return this.filters != null && this.filters.size() > 0;
+ public String asSummaryString() {
+ return "iceberg table source";
Review comment:
Nit: use `Iceberg table source` (to align with the other existing Flink table sources).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]