twalthr commented on PR #26879:
URL: https://github.com/apache/flink/pull/26879#issuecomment-3173996255

   Thanks for the quick reply @xuyangzhong. I tried to apply your suggestion to 
`DuplicateChangesInferRule#canConsumeDuplicateChanges`:
   ```
       private boolean canConsumeDuplicateChanges(StreamPhysicalSink sink) {
           try {
               final ChangelogMode sinkChangelogMode =
                       sink.tableSink().getChangelogMode(ChangelogMode.all());
               final boolean sinkIsAppend = sinkChangelogMode.containsOnly(RowKind.INSERT);
               final boolean sinkIsRetract = sinkChangelogMode.contains(RowKind.UPDATE_BEFORE);
               if (sinkIsAppend || sinkIsRetract) {
                   return false;
               }
           } catch (Throwable t) {
               return false;
           }
           return sink.contextResolvedTable().getResolvedSchema().getPrimaryKey().isPresent();
       }
   ```
   A lot of tests behave differently with this change. I guess this is also because 
`pk_snk` in `DuplicateChangesInferRuleTest` should be declared with 
`'sink-changelog-mode-enforced' = 'I,UA,D'`? In any case, it might be best if 
you take another look at this issue. I will open a follow-up issue and disable 
the affected tests temporarily.
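
To make the effect of the check easier to see, here is a minimal standalone sketch of the same decision logic. It does not use any Flink classes: `DuplicateChangesSketch`, the `Kind` enum, and the `hasPrimaryKey` flag are hypothetical stand-ins for `ChangelogMode`, `RowKind`, and the resolved schema lookup, and the `try`/`catch` from the snippet above is left out.

```java
import java.util.EnumSet;
import java.util.Set;

/** Standalone model of the check; none of these types are Flink API. */
final class DuplicateChangesSketch {

    /** Hypothetical stand-in for the row kinds of a changelog mode. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * Mirrors the snippet: append-only and retract sinks never accept
     * duplicate changes; any other sink accepts them only with a primary key.
     */
    static boolean canConsumeDuplicateChanges(Set<Kind> sinkMode, boolean hasPrimaryKey) {
        // Append-only: the mode consists of INSERT and nothing else.
        final boolean sinkIsAppend = sinkMode.size() == 1 && sinkMode.contains(Kind.INSERT);
        // Retract: the sink expects UPDATE_BEFORE rows.
        final boolean sinkIsRetract = sinkMode.contains(Kind.UPDATE_BEFORE);
        if (sinkIsAppend || sinkIsRetract) {
            return false;
        }
        return hasPrimaryKey;
    }

    public static void main(String[] args) {
        final Set<Kind> append = EnumSet.of(Kind.INSERT);
        final Set<Kind> upsert = EnumSet.of(Kind.INSERT, Kind.UPDATE_AFTER, Kind.DELETE);
        final Set<Kind> retract = EnumSet.allOf(Kind.class);

        System.out.println(canConsumeDuplicateChanges(append, true));   // false
        System.out.println(canConsumeDuplicateChanges(upsert, true));   // true
        System.out.println(canConsumeDuplicateChanges(upsert, false));  // false
        System.out.println(canConsumeDuplicateChanges(retract, true));  // false
    }
}
```

In this model, only a sink whose mode corresponds to `I,UA,D` and that has a primary key returns `true`, which is consistent with the guess that `pk_snk` needs an enforced `'I,UA,D'` mode for the existing test expectations to hold.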


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
