openinx commented on a change in pull request #2229:
URL: https://github.com/apache/iceberg/pull/2229#discussion_r589893045
##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -20,66 +20,68 @@
package org.apache.iceberg.flink;
import java.util.Map;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.OverwritableTableSink;
-import org.apache.flink.table.sinks.PartitionableTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.flink.sink.FlinkSink;
-public class IcebergTableSink implements AppendStreamTableSink<RowData>, OverwritableTableSink, PartitionableTableSink {
- private final boolean isBounded;
+public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
private final TableLoader tableLoader;
private final TableSchema tableSchema;
private boolean overwrite = false;
-  public IcebergTableSink(boolean isBounded, TableLoader tableLoader, TableSchema tableSchema) {
- this.isBounded = isBounded;
+ public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema) {
this.tableLoader = tableLoader;
this.tableSchema = tableSchema;
}
@Override
- public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
-    Preconditions.checkState(!overwrite || isBounded, "Unbounded data stream doesn't support overwrite operation.");
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ Preconditions
+        .checkState(!overwrite || context.isBounded(), "Unbounded data stream doesn't support overwrite operation.");
- return FlinkSink.forRowData(dataStream)
+    return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.overwrite(overwrite)
Review comment:
I mean we will need an integration test case to verify that the whole
path works as expected: create a CDC source table, create an Iceberg sink
table, and then run the `INSERT INTO` streaming job. In the end, the source
table and the sink table should hold consistent results. That could come in
future PRs.
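
A rough sketch of such a test using Flink's SQL API might look like the
following. This is only an illustration of the shape of the test, not the
actual test from a later PR: the table names, the `mysql-cdc` connector
choice, and all connector options are hypothetical placeholders.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CdcToIcebergExample {
  public static void main(String[] args) throws Exception {
    // Streaming-mode table environment, since the CDC source is unbounded.
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inStreamingMode()
        .build();
    TableEnvironment tEnv = TableEnvironment.create(settings);

    // Hypothetical CDC source table; in a real test this could be backed by
    // flink-cdc-connectors, or replaced by a bounded changelog source.
    tEnv.executeSql(
        "CREATE TABLE cdc_source (id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED) "
            + "WITH ('connector' = 'mysql-cdc', 'hostname' = 'localhost', "
            + "'database-name' = 'db', 'table-name' = 't')");

    // Iceberg sink table; catalog and warehouse options are illustrative.
    tEnv.executeSql(
        "CREATE TABLE iceberg_sink (id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED) "
            + "WITH ('connector' = 'iceberg', 'catalog-name' = 'hadoop_catalog', "
            + "'catalog-type' = 'hadoop', 'warehouse' = 'file:///tmp/warehouse')");

    // The streaming job under test: stream the CDC changelog into Iceberg.
    tEnv.executeSql("INSERT INTO iceberg_sink SELECT * FROM cdc_source");

    // Verification step (in the eventual test): scan both tables and assert
    // that the materialized row sets are identical.
  }
}
```

Because the job is unbounded, the eventual test would likely bound the
source or stop the job after a checkpoint before comparing the two tables.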