openinx commented on a change in pull request #2229:
URL: https://github.com/apache/iceberg/pull/2229#discussion_r589905537
##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -20,66 +20,72 @@
package org.apache.iceberg.flink;
import java.util.Map;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.sinks.OverwritableTableSink;
-import org.apache.flink.table.sinks.PartitionableTableSink;
-import org.apache.flink.table.sinks.TableSink;
-import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.flink.sink.FlinkSink;
-public class IcebergTableSink implements AppendStreamTableSink<RowData>, OverwritableTableSink, PartitionableTableSink {
- private final boolean isBounded;
+public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
private final TableLoader tableLoader;
private final TableSchema tableSchema;
private boolean overwrite = false;
- public IcebergTableSink(boolean isBounded, TableLoader tableLoader, TableSchema tableSchema) {
- this.isBounded = isBounded;
+ private IcebergTableSink(IcebergTableSink toCopy) {
+ this.tableLoader = toCopy.tableLoader;
+ this.tableSchema = toCopy.tableSchema;
+ this.overwrite = toCopy.overwrite;
+ }
+
+ public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema) {
this.tableLoader = tableLoader;
this.tableSchema = tableSchema;
}
@Override
- public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
- Preconditions.checkState(!overwrite || isBounded, "Unbounded data stream doesn't support overwrite operation.");
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+ Preconditions.checkState(!overwrite || context.isBounded(),
+ "Unbounded data stream doesn't support overwrite operation.");
- return FlinkSink.forRowData(dataStream)
+ return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.overwrite(overwrite)
.build();
}
@Override
- public DataType getConsumedDataType() {
- return tableSchema.toRowDataType().bridgedTo(RowData.class);
+ public void applyStaticPartition(Map<String, String> partition) {
+ // The flink's PartitionFanoutWriter will handle the static partition write policy automatically.
}
@Override
- public TableSchema getTableSchema() {
- return this.tableSchema;
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ return ChangelogMode.newBuilder()
Review comment:
As the javadoc says:
```java
@PublicEvolving
public interface DynamicTableSink {
/**
* Returns the set of changes that the sink accepts during runtime.
*
* <p>The planner can make suggestions but the sink has the final decision what it requires. If
* the planner does not support this mode, it will throw an error. For example, the sink can
* return that it only supports {@link ChangelogMode#insertOnly()}.
*
* @param requestedMode expected set of changes by the current plan
*/
ChangelogMode getChangelogMode(ChangelogMode requestedMode);
```
We need to ensure that the returned `ChangelogMode` is a subset of
`requestedMode`. The correct implementation should be:
```java
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
ChangelogMode.Builder builder = ChangelogMode.newBuilder();
for (RowKind kind : requestedMode.getContainedKinds()) {
builder.addContainedKind(kind);
}
return builder.build();
}
```
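To illustrate the subset requirement on its own, here is a minimal sketch that compiles without Flink. Everything in it is an assumption for illustration: `Kind` is a local stand-in for `org.apache.flink.types.RowKind`, and `accepted` is a hypothetical helper, not part of Flink or Iceberg. Unlike the snippet above, which accepts every requested kind, this variant also filters by a set of kinds the sink supports; either way the result can never contain a kind the planner did not request.

```java
import java.util.EnumSet;
import java.util.Set;

final class ChangelogSubsetSketch {
  // Stand-in for org.apache.flink.types.RowKind, declared locally so the sketch compiles without Flink.
  enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

  // Hypothetical helper: keep only the requested kinds that the sink also supports.
  // The result is always a subset of `requested`.
  static Set<Kind> accepted(Set<Kind> requested, Set<Kind> supported) {
    EnumSet<Kind> result = EnumSet.noneOf(Kind.class);
    for (Kind kind : requested) {
      if (supported.contains(kind)) {
        result.add(kind);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Set<Kind> requested = EnumSet.of(Kind.INSERT, Kind.DELETE);
    Set<Kind> supported = EnumSet.allOf(Kind.class);
    Set<Kind> result = accepted(requested, supported);
    System.out.println(result);                        // the accepted kinds
    System.out.println(requested.containsAll(result)); // subset check
  }
}
```

With `supported` set to all kinds, this reduces to echoing the requested kinds, which is what the suggested `getChangelogMode` does.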