[ https://issues.apache.org/jira/browse/FLINK-38201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18014058#comment-18014058 ]
Timo Walther edited comment on FLINK-38201 at 8/15/25 7:45 AM:
---------------------------------------------------------------
Hi [~xuyangzhong], thanks for taking the time and diving deeper into this
topic. Let me try to give some explanation:
Retract sinks work differently from upsert sinks. Usually they need to shuffle
either on the entire row or a subset of columns. The subset of columns does not
have to be related to the upsert key. In general, retract sinks do not have
problems with changelog disorder as they can support duplicates. After
materialization of these duplicates, the changelog disorder is resolved. But
this materialization does not need to happen within Flink. In fact, a retract
source in Flink can simply consume these changes without any issues.
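To illustrate why retract changelogs tolerate disorder, here is a minimal conceptual sketch in plain Python. It is not Flink code: the function names, the (kind, row) tuple encoding, and the sample rows are assumptions made for illustration only. It materializes a retract changelog as a multiset of rows and compares that with last-writer-wins upsert materialization on a key.

```python
from collections import Counter

def materialize_retract(changes):
    """Apply a retract changelog to a multiset of rows (hypothetical helper).

    +I / +U add one copy of the row, -U / -D remove one copy. Counts may go
    negative temporarily when a retraction arrives before its insertion.
    """
    bag = Counter()
    for kind, row in changes:
        if kind in ("+I", "+U"):
            bag[row] += 1
        elif kind in ("-U", "-D"):
            bag[row] -= 1
        else:
            raise ValueError(f"unknown change kind: {kind}")
    # Keep only rows whose net count is non-zero.
    return {row: n for row, n in bag.items() if n != 0}

def materialize_upsert(changes, key_index=0):
    """Apply an upsert changelog keyed on one column (hypothetical helper).

    The last +I / +U seen for a key wins, so the result depends on order.
    """
    table = {}
    for kind, row in changes:
        key = row[key_index]
        if kind in ("+I", "+U"):
            table[key] = row
        elif kind == "-D":
            table.pop(key, None)
        # -U carries no information in upsert mode and is ignored here.
    return table

# Retract changelog for key 1 going a -> b -> c, in order and shuffled.
retract_ordered = [
    ("+I", (1, "a")), ("-U", (1, "a")), ("+U", (1, "b")),
    ("-U", (1, "b")), ("+U", (1, "c")),
]
retract_disordered = [
    ("+U", (1, "c")), ("-U", (1, "a")), ("+I", (1, "a")),
    ("+U", (1, "b")), ("-U", (1, "b")),
]

# The same updates as an upsert changelog, in order and with two swapped.
upsert_ordered = [("+I", (1, "a")), ("+U", (1, "b")), ("+U", (1, "c"))]
upsert_disordered = [("+I", (1, "a")), ("+U", (1, "c")), ("+U", (1, "b"))]

retract_a = materialize_retract(retract_ordered)
retract_b = materialize_retract(retract_disordered)
upsert_a = materialize_upsert(upsert_ordered)
upsert_b = materialize_upsert(upsert_disordered)
```

In this sketch both retract runs end with a single copy of (1, "c"), because additions and retractions commute. The two upsert runs disagree on the final row for key 1, which is the kind of disorder an upsert sink has to guard against.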
Retract is actually more related to append than to upsert mode. Retract sinks
should behave like append sinks. For the append case, we also just blindly
accept the PRIMARY KEY declaration without checking with an upsert key. And
this is fine because the key is marked as NOT ENFORCED. What the PR did is to
just remove the SinkUpsertMaterializer, because it did not fit into the
pipeline.
In general, I would not advise users to set the primary key on retract sinks.
But if they do, the behavior is just as inconsistent as with append sinks.
> SinkUpsertMaterializer should not be inserted for retract sinks
> ---------------------------------------------------------------
>
> Key: FLINK-38201
> URL: https://issues.apache.org/jira/browse/FLINK-38201
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 2.1.0
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.2.0
>
>
> Currently, the logic in FlinkChangelogModeInferenceProgram for enabling
> upsert materialize does not distinguish between retract and upsert. Thus,
> SinkUpsertMaterializer is added for retract sinks. The resulting changelog is
> incorrect as it misses -U. Retract is flexible enough to simply be passed to
> the sink without further changes.