zhangjun0x01 commented on a change in pull request #2229:
URL: https://github.com/apache/iceberg/pull/2229#discussion_r588847716



##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -20,66 +20,68 @@
 package org.apache.iceberg.flink;
 
 import java.util.Map;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.OverwritableTableSink;
-import org.apache.flink.table.sinks.PartitionableTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 import org.apache.iceberg.flink.sink.FlinkSink;
 
-public class IcebergTableSink implements AppendStreamTableSink<RowData>, 
OverwritableTableSink, PartitionableTableSink {
-  private final boolean isBounded;
+public class IcebergTableSink implements DynamicTableSink, 
SupportsPartitioning, SupportsOverwrite {
   private final TableLoader tableLoader;
   private final TableSchema tableSchema;
 
   private boolean overwrite = false;
 
-  public IcebergTableSink(boolean isBounded, TableLoader tableLoader, 
TableSchema tableSchema) {
-    this.isBounded = isBounded;
+  public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema) {
     this.tableLoader = tableLoader;
     this.tableSchema = tableSchema;
   }
 
   @Override
-  public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
-    Preconditions.checkState(!overwrite || isBounded, "Unbounded data stream 
doesn't support overwrite operation.");
+  public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+    Preconditions
+        .checkState(!overwrite || context.isBounded(), "Unbounded data stream 
doesn't support overwrite operation.");
 
-    return FlinkSink.forRowData(dataStream)
+    return (DataStreamSinkProvider) dataStream -> 
FlinkSink.forRowData(dataStream)
         .tableLoader(tableLoader)
         .tableSchema(tableSchema)
         .overwrite(overwrite)

Review comment:
       > We might need a fully covered SQL unit tests to ensure the whole flow 
work in future PR.
   
   yes, I think all data types of mysql should be covered.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to