Reo-LEI commented on a change in pull request #2898:
URL: https://github.com/apache/iceberg/pull/2898#discussion_r693335688
##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -59,12 +60,22 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
.map(UniqueConstraint::getColumns)
.orElseGet(ImmutableList::of);
- return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
- .tableLoader(tableLoader)
- .tableSchema(tableSchema)
- .equalityFieldColumns(equalityColumns)
- .overwrite(overwrite)
- .build();
+ return (DataStreamSinkProvider) dataStream -> {
+ // For CDC case in FlinkSQL, change log will be rebalanced(default partition strategy) distributed to Filter opr
+ // when set job default parallelism greater than 1. That will make change log data disorder and produce a wrong
+ // result for iceberg(e.g. +U comes before -U). Here try to specific the Filter opr parallelism same as it's
+ // input to keep Filter chaining it's input and avoid rebalance.
+ Transformation<?> forwardOpr = dataStream.getTransformation();
+ if (forwardOpr.getName().equals("Filter") && forwardOpr.getInputs().size() == 1) {
Review comment:
I found that we can capture the input stream's changelog mode when A is
called, and then decide whether `Filter` needs to be chained to its input by
checking that mode.
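
A rough sketch of that idea, not the actual PR code: the sink remembers the
changelog mode it is handed, and only asks for `Filter` to be chained to its
input when the mode carries more than plain inserts. Everything here is a
hypothetical stand-in with no Flink dependency: `ChangelogAwareSinkSketch`,
its nested `Kind` enum, and `captureInputMode` (which plays the role of the
call referred to as A above) are names invented for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch: capture the input changelog mode once, consult it later.
class ChangelogAwareSinkSketch {

  // Stand-in for the row kinds of a changelog stream (+I, -U, +U, -D).
  enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

  // Assume an insert-only input until a mode has been captured.
  private final Set<Kind> inputMode = EnumSet.of(Kind.INSERT);

  // Plays the role of "A": record the mode the input stream will produce.
  Set<Kind> captureInputMode(Set<Kind> requestedMode) {
    inputMode.clear();
    inputMode.addAll(requestedMode);
    return requestedMode;
  }

  // Ordering only matters when updates or deletes are present, so only then
  // does Filter need to stay chained to its input to avoid a rebalance.
  boolean shouldChainFilterToInput() {
    return inputMode.stream().anyMatch(kind -> kind != Kind.INSERT);
  }
}
```

With this shape, the check on the transformation name in the diff would only
need to run when `shouldChainFilterToInput()` returns true, so append-only
jobs keep their default behaviour.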
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]