kbendick commented on a change in pull request #2898:
URL: https://github.com/apache/iceberg/pull/2898#discussion_r685772264



##########
File path: flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
##########
@@ -59,12 +60,22 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         .map(UniqueConstraint::getColumns)
         .orElseGet(ImmutableList::of);
 
-    return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
-        .tableLoader(tableLoader)
-        .tableSchema(tableSchema)
-        .equalityFieldColumns(equalityColumns)
-        .overwrite(overwrite)
-        .build();
+    return (DataStreamSinkProvider) dataStream ->  {
+      // For CDC case in FlinkSQL, change log will be rebalanced(default partition strategy) distributed to Filter opr
+      // when set job default parallelism greater than 1. That will make change log data disorder and produce a wrong
+      // result for iceberg(e.g. +U comes before -U). Here try to specific the Filter opr parallelism same as it's
+      // input to keep Filter chaining it's input and avoid rebalance.
+      Transformation<?> forwardOpr = dataStream.getTransformation();
+      if (forwardOpr.getName().equals("Filter") && forwardOpr.getInputs().size() == 1) {

Review comment:
       I'm still not comfortable relying on this `Filter` operation being appended. It assumes that Flink won't change that behavior, or that we'll catch any such change between releases.
   
   Is there another way to detect the CDC case without relying on the `Filter` operation?
   
   Also, I think matching on `Filter` will potentially catch more cases than just CDC, since that's a pretty generic operation name.
   
   Is there no way to detect that the source is a change data capture source? Or, more generally, that we have a `RetractStream`? (Unless I'm mistaken, the +U / -U records are in play because this is a `RetractStream`.)
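   
   To make the concern concrete, here is a rough, Flink-free sketch of the difference between matching on the operator name and checking which row kinds the sink is asked to accept. Everything in it is hypothetical: `RowKind` is a local stand-in for Flink's `org.apache.flink.types.RowKind`, and `looksLikeCdcByName` / `isChangelog` are illustrative helpers, not code from this PR. In Flink itself the requested `ChangelogMode` is passed to `DynamicTableSink#getChangelogMode` and can be inspected with `ChangelogMode#contains(RowKind)`; whether that information can be carried into `getSinkRuntimeProvider` here is an assumption I have not verified.

```java
import java.util.EnumSet;
import java.util.Set;

public class ChangelogDetectionSketch {

  // Local stand-in for Flink's RowKind so the sketch compiles without Flink on the classpath.
  public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

  // Name-based check, mirroring the condition in the diff above.
  // It matches any single-input operator named "Filter", CDC or not.
  public static boolean looksLikeCdcByName(String operatorName, int inputCount) {
    return "Filter".equals(operatorName) && inputCount == 1;
  }

  // Hypothetical alternative: treat the input as a changelog only if it can
  // carry something other than plain inserts.
  public static boolean isChangelog(Set<RowKind> requestedKinds) {
    return requestedKinds.contains(RowKind.UPDATE_BEFORE)
        || requestedKinds.contains(RowKind.UPDATE_AFTER)
        || requestedKinds.contains(RowKind.DELETE);
  }

  public static void main(String[] args) {
    // An append-only query with a WHERE clause: upstream operator is a Filter,
    // but only INSERT rows flow through it.
    Set<RowKind> appendOnly = EnumSet.of(RowKind.INSERT);
    // A CDC query: all four row kinds can appear.
    Set<RowKind> cdc = EnumSet.allOf(RowKind.class);

    System.out.println("append-only, by name:      " + looksLikeCdcByName("Filter", 1));
    System.out.println("append-only, by row kinds: " + isChangelog(appendOnly));
    System.out.println("cdc, by row kinds:         " + isChangelog(cdc));
  }
}
```

   In this toy model the name-based check fires for the append-only pipeline as well, which is the over-matching I'm worried about, while the row-kind check only fires when updates or deletes can actually occur.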




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]