openinx commented on a change in pull request #2229:
URL: https://github.com/apache/iceberg/pull/2229#discussion_r591279196
##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -20,66 +20,68 @@
package org.apache.iceberg.flink;
import java.util.Map;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.OverwritableTableSink;
-import org.apache.flink.table.sinks.PartitionableTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.flink.sink.FlinkSink;
-public class IcebergTableSink implements AppendStreamTableSink<RowData>,
OverwritableTableSink, PartitionableTableSink {
- private final boolean isBounded;
+public class IcebergTableSink implements DynamicTableSink,
SupportsPartitioning, SupportsOverwrite {
private final TableLoader tableLoader;
private final TableSchema tableSchema;
private boolean overwrite = false;
- public IcebergTableSink(boolean isBounded, TableLoader tableLoader,
TableSchema tableSchema) {
- this.isBounded = isBounded;
+ public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema) {
this.tableLoader = tableLoader;
this.tableSchema = tableSchema;
}
@Override
- public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
- Preconditions.checkState(!overwrite || isBounded, "Unbounded data stream
doesn't support overwrite operation.");
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ Preconditions
+ .checkState(!overwrite || context.isBounded(), "Unbounded data stream
doesn't support overwrite operation.");
- return FlinkSink.forRowData(dataStream)
+ return (DataStreamSinkProvider) dataStream ->
FlinkSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.overwrite(overwrite)
Review comment:
Filed an issue for this: https://github.com/apache/iceberg/issues/2313
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]