[ 
https://issues.apache.org/jira/browse/FLINK-38201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18014058#comment-18014058
 ] 

Timo Walther edited comment on FLINK-38201 at 8/15/25 7:45 AM:
---------------------------------------------------------------

Hi [~xuyangzhong], thanks for taking the time and diving deeper into this 
topic. Let me try to give some explanation:

Retract sinks work different than upsert sinks. Usually they need to shuffle 
either on the entire row or a subset of columns. The subset of columns does not 
have to be related to the upsert key. In general, retract sinks do not have 
problems with changelog disorder as they can support duplicates. After 
materialization of these duplicates, the changelog disorder is resolved. But 
this materialization does not need to happen within Flink. In fact a retract 
source in Flink, can simplify consume these changes without any issues.

Retract is actually more related to append than to upsert mode. Retract sinks 
should behave like append sinks. For the append case, we also just blindly 
accept the PRIMARY KEY declaration without checking with an upsert key. And 
this is fine because the key is marked as NOT ENFORCED. What the PR did is to 
just remove the SinkUpsertMaterializer, because it did not fit into the 
pipeline.

In general, I would not advise users to set the primary key on retract sinks. 
But if they want, the behavior is as inconsistent as with append sinks.


was (Author: twalthr):
Hi [~xuyangzhong], thanks for taking the time and diving deeper into this 
topic. Let me try to give some explanation:

Retract sinks work different than upsert sinks. Usually they need to shuffle 
either on the entire row or a subset of columns. The subset of columns does not 
have to be related to the upsert key. In general, retract sinks do not have 
problems with changelog disorder as they can support duplicates. After 
materialization of these duplicates, the changelog disorder is resolved. But 
this materialization does not need to happen within Flink.

Retract is actually more related to append than to upsert mode. Retract sinks 
should behave like append sinks. For the append case, we also just blindly 
accept the PRIMARY KEY declaration without checking with an upsert key. And 
this is fine because the key is marked as NOT ENFORCED. What the PR did is to 
just remove the SinkUpsertMaterializer, because it did not fit into the 
pipeline.

In general, I would not advise users to set the primary key on retract sinks. 
But if they want, the behavior is as inconsistent as with append sinks.

> SinkUpsertMaterializer should not be inserted for retract sinks
> ---------------------------------------------------------------
>
>                 Key: FLINK-38201
>                 URL: https://issues.apache.org/jira/browse/FLINK-38201
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.1.0
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.2.0
>
>
> Currently, the logic in FlinkChangelogModeInferenceProgram for enabling 
> upsert materialize does not distinguish between retract and upsert. Thus, 
> SinkUpsertMaterializer is added for retract sinks. The resulting changelog is 
> incorrect as it misses -U. Retract is flexible enough to simply be passed to 
> the sink without further changes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to