wenlong88 commented on a change in pull request #16096:
URL: https://github.com/apache/flink/pull/16096#discussion_r651616824



##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkMetadata.java
##########
@@ -238,4 +239,20 @@ ImmutableBitSet getUniqueGroups(
             RelWindowProperties getWindowProperties(RelNode r, RelMetadataQuery mq);
         }
     }
+
+    /** Metadata about which combinations of columns are change log upsert identifiers. */
+    public interface ChangeLogUpsertKeys extends Metadata {
+        Method METHOD = Types.lookupMethod(ChangeLogUpsertKeys.class, "getChangeLogUpsertKeys");
+
+        MetadataDef<ChangeLogUpsertKeys> DEF =
+                MetadataDef.of(
+                        ChangeLogUpsertKeys.class, ChangeLogUpsertKeys.Handler.class, METHOD);
+
+        Set<ImmutableBitSet> getChangeLogUpsertKeys();

Review comment:
   As I understand from the description of the issue, the newly added key
describes how the changelog stream is distributed across parallel subtasks, so
there can be only a single key. Correct me if I am wrong.

   On the other hand, consider: changelog source + rank (top 1) + upsert sink.
If both the input unique keys and partitionKey + rank_number are changelog
upsert keys, how can we make the right decision? When the pk of the sink is the
partition key, we don't need any extra processing before writing to the sink.
But when the pk of the sink is the pk of the source, we need to add extra
processing before the sink to take record disorder into account.
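
   To make the concern concrete, here is a minimal sketch. It is not planner
code: the class `UpsertKeySketch`, the helpers `cols` and
`needsExtraProcessing`, the column layout, and the decision rule itself (no
extra processing when the sink pk equals one of the reported upsert keys) are
all assumptions for illustration. It uses `java.util.BitSet` in place of
`ImmutableBitSet` so that it runs on its own, and for top 1 it represents the
partitionKey + rank_number key by the partition key column alone.

```java
import java.util.BitSet;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical illustration only; none of these names exist in Flink. */
class UpsertKeySketch {

    /** Builds a column set from column indexes. */
    static BitSet cols(int... indexes) {
        BitSet bits = new BitSet();
        for (int i : indexes) {
            bits.set(i);
        }
        return bits;
    }

    /**
     * Assumed rule: extra processing before the sink is needed unless the
     * sink primary key equals one of the reported changelog upsert keys.
     */
    static boolean needsExtraProcessing(Set<BitSet> upsertKeys, BitSet sinkPk) {
        return !upsertKeys.contains(sinkPk);
    }

    public static void main(String[] args) {
        // Assumed column layout: 0 = pk of the source, 1 = rank partition key.
        BitSet sourcePk = cols(0);
        BitSet partitionKey = cols(1);

        // Case A: rank reports a single upsert key (the partition key).
        Set<BitSet> singleKey = new HashSet<>();
        singleKey.add(partitionKey);

        // Case B: rank reports both the input unique key and the partition key.
        Set<BitSet> bothKeys = new HashSet<>();
        bothKeys.add(sourcePk);
        bothKeys.add(partitionKey);

        System.out.println("A, sink pk = partition key: "
                + needsExtraProcessing(singleKey, partitionKey));
        System.out.println("A, sink pk = source pk:     "
                + needsExtraProcessing(singleKey, sourcePk));
        System.out.println("B, sink pk = partition key: "
                + needsExtraProcessing(bothKeys, partitionKey));
        System.out.println("B, sink pk = source pk:     "
                + needsExtraProcessing(bothKeys, sourcePk));
    }
}
```

   Under this assumed rule, the single-key set tells the two sink cases apart,
while the two-key set reports "no extra processing" for both, including the
case where the sink pk is the pk of the source.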



