Reo-LEI commented on a change in pull request #2898:
URL: https://github.com/apache/iceberg/pull/2898#discussion_r681841099



##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -59,12 +60,22 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         .map(UniqueConstraint::getColumns)
         .orElseGet(ImmutableList::of);
 
-    return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
-        .tableLoader(tableLoader)
-        .tableSchema(tableSchema)
-        .equalityFieldColumns(equalityColumns)
-        .overwrite(overwrite)
-        .build();
+    return (DataStreamSinkProvider) dataStream ->  {
+      // For CDC case in FlinkSQL, change log will be rebalanced(default partition strategy) distributed to Filter opr
+      // when set job default parallelism greater than 1. That will make change log data disorder and produce a wrong
+      // result for iceberg(e.g. +U comes before -U). Here try to specific the Filter opr parallelism same as it's
+      // input to keep Filter chaining it's input and avoid rebalance.
+      Transformation<?> forwardOpr = dataStream.getTransformation();
+      if (forwardOpr.getName().equals("Filter") && forwardOpr.getInputs().size() == 1) {

Review comment:
       Flink appends the `Filter` operator to prevent null data from being sent
downstream, so the `Filter` operator will always exist when we use FlinkSQL to
construct our job. I also think the operator name cannot be changed by the user
(so far I have not found any API for renaming an operator through SQL).
   
   I think we can simply set the `Filter` operator's parallelism to that of its
input, so that `Filter` stays chained to its input operator in all cases,
without worrying about side effects, because `Filter` does not affect the
input stream.
   
   Here we only change the forward operator's parallelism when the forward
operator is `Filter`. I think that condition is general enough and safe for
keeping `Filter` chained to its input.
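
   To make the condition above concrete, here is a minimal, self-contained
sketch of the idea. It does not use Flink's API: `Op`, `canForward`, and
`alignFilterWithInput` are hypothetical names invented for this illustration,
and `canForward` models only the parallelism part of Flink's chaining rules
(real Flink checks further conditions). The point is only that an edge with
unequal parallelism gets redistributed, while matching the `Filter`
parallelism to its single input lets records flow forward in order.

```java
import java.util.List;

public class FilterChainingSketch {

  /** Hypothetical stand-in for an operator node: a name, a parallelism, and its inputs. */
  static final class Op {
    final String name;
    final List<Op> inputs;
    int parallelism;

    Op(String name, int parallelism, List<Op> inputs) {
      this.name = name;
      this.parallelism = parallelism;
      this.inputs = inputs;
    }
  }

  /**
   * Simplified rule (an assumption of this sketch): records can be forwarded
   * one-to-one, preserving order, only when both sides have the same parallelism.
   * Otherwise the edge is redistributed, which can reorder +U and -U records.
   */
  static boolean canForward(Op upstream, Op downstream) {
    return upstream.parallelism == downstream.parallelism;
  }

  /**
   * Mirrors the condition discussed in the review: only touch the operator when
   * it is named "Filter" and has exactly one input. Returns true if it was aligned.
   */
  static boolean alignFilterWithInput(Op op) {
    if ("Filter".equals(op.name) && op.inputs.size() == 1) {
      op.parallelism = op.inputs.get(0).parallelism;
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    Op cdcSource = new Op("Source", 1, List.of());
    // The job default parallelism (4 here) is applied to Filter, so it differs from its input.
    Op filter = new Op("Filter", 4, List.of(cdcSource));

    System.out.println("forward before: " + canForward(cdcSource, filter));
    alignFilterWithInput(filter);
    System.out.println("forward after: " + canForward(cdcSource, filter));
  }
}
```

   Operators with any other name, or with more than one input, are left
untouched in this sketch, which is what keeps the check narrow.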




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
