openinx commented on a change in pull request #2229:
URL: https://github.com/apache/iceberg/pull/2229#discussion_r589905537



##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -20,66 +20,72 @@
 package org.apache.iceberg.flink;
 
 import java.util.Map;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.OverwritableTableSink;
-import org.apache.flink.table.sinks.PartitionableTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 import org.apache.iceberg.flink.sink.FlinkSink;
 
-public class IcebergTableSink implements AppendStreamTableSink<RowData>, 
OverwritableTableSink, PartitionableTableSink {
-  private final boolean isBounded;
+public class IcebergTableSink implements DynamicTableSink, 
SupportsPartitioning, SupportsOverwrite {
   private final TableLoader tableLoader;
   private final TableSchema tableSchema;
 
   private boolean overwrite = false;
 
-  public IcebergTableSink(boolean isBounded, TableLoader tableLoader, 
TableSchema tableSchema) {
-    this.isBounded = isBounded;
+  private IcebergTableSink(IcebergTableSink toCopy) {
+    this.tableLoader = toCopy.tableLoader;
+    this.tableSchema = toCopy.tableSchema;
+    this.overwrite = toCopy.overwrite;
+  }
+
+  public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema) {
     this.tableLoader = tableLoader;
     this.tableSchema = tableSchema;
   }
 
   @Override
-  public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
-    Preconditions.checkState(!overwrite || isBounded, "Unbounded data stream 
doesn't support overwrite operation.");
+  public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+    Preconditions.checkState(!overwrite || context.isBounded(),
+            "Unbounded data stream doesn't support overwrite operation.");
 
-    return FlinkSink.forRowData(dataStream)
+    return (DataStreamSinkProvider) dataStream -> 
FlinkSink.forRowData(dataStream)
         .tableLoader(tableLoader)
         .tableSchema(tableSchema)
         .overwrite(overwrite)
         .build();
   }
 
   @Override
-  public DataType getConsumedDataType() {
-    return tableSchema.toRowDataType().bridgedTo(RowData.class);
+  public void applyStaticPartition(Map<String, String> partition) {
+    // The flink's PartitionFanoutWriter will handle the static partition 
write policy automatically.
   }
 
   @Override
-  public TableSchema getTableSchema() {
-    return this.tableSchema;
+  public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+    return ChangelogMode.newBuilder()

Review comment:
   As the Javadoc says:
   
   ```java
   @PublicEvolving
   public interface DynamicTableSink {
   
       /**
        * Returns the set of changes that the sink accepts during runtime.
        *
        * <p>The planner can make suggestions but the sink has the final decision what it requires. If
        * the planner does not support this mode, it will throw an error. For example, the sink can
        * return that it only supports {@link ChangelogMode#insertOnly()}.
        *
        * @param requestedMode expected set of changes by the current plan
        */
       ChangelogMode getChangelogMode(ChangelogMode requestedMode);
   ```
   
   We need to ensure that the returned `ChangelogMode` is a subset of `requestedMode`. The correct implementation should be:
   
   ```java
           @Override
           public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
               ChangelogMode.Builder builder = ChangelogMode.newBuilder();
               for (RowKind kind : requestedMode.getContainedKinds()) {
                       builder.addContainedKind(kind);
               }
               return builder.build();
           }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to