[https://issues.apache.org/jira/browse/FLINK-38201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18013666#comment-18013666]

Xuyang Zhong commented on FLINK-38201:
--------------------------------------

Hi [~twalthr], I have some doubts about whether we can remove the sink
materializer from a retract sink without losing correctness. Assume we have an
upstream node with upsert key `b` writing to a retract sink whose primary key
is column `a`.

Suppose two parallel subtasks of the upstream node produce the following data:
subtask1 sends `+I(a1, b1)` and `-U(a1, b1)`; subtask2 sends `+U(a1, b2)`.
What will the final result be?

Because records from different subtasks are not ordered relative to each
other, the sink can receive this data in several possible sequences:
 * If the sink receives `+I(a1, b1)`, `-U(a1, b1)`, and `+U(a1, b2)` in that 
order, then regardless of whether there is a sink materializer node, we will 
ultimately get `a1, b2`.
 * If the sink receives `+I(a1, b1)`, `+U(a1, b2)`, and `-U(a1, b1)` in that
order and there is a sink materializer, the result will be `a1, b2`. But what
is the result once the sink materializer is removed?
 * If the sink receives `+U(a1, b2)`, `+I(a1, b1)`, and `-U(a1, b1)` in that
order and there is a sink materializer, the result will be `a1, b2`. But what
is the result once the sink materializer is removed?
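To make the three orderings concrete, here is a minimal, hypothetical Python model (not Flink code). It assumes that, without a materializer, the external system applies `+I`/`+U` as an upsert on key `a` and `-U`/`-D` as a delete of key `a`; the "materializer" side is a simplified model that keeps every live row per key and retracts only the exact row named by `-U`. The function names `apply_directly` and `apply_with_materializer` are illustrative only.

```python
# Hypothetical, simplified model of the scenario above; not Flink code.
# Records are (kind, a, b) tuples; the sink table is keyed on column `a`.

def apply_directly(records):
    """Write records straight into a table keyed on `a` (no materializer).

    Assumption: +I/+U upsert the row for key a, -U/-D delete key a.
    """
    table = {}
    for kind, a, b in records:
        if kind in ("+I", "+U"):
            table[a] = b          # insert or overwrite the row for key a
        else:                     # "-U" or "-D"
            table.pop(a, None)    # deletes whatever row key a currently holds
    return table


def apply_with_materializer(records):
    """Keep every live row per sink key and write the most recently added
    row that has not been retracted yet (simplified materializer idea)."""
    live = {}   # sink key a -> list of full rows (a, b) still alive
    table = {}
    for kind, a, b in records:
        rows = live.setdefault(a, [])
        if kind in ("+I", "+U"):
            rows.append((a, b))
        elif (a, b) in rows:
            rows.remove((a, b))   # retract only this exact row, not the key
        if rows:
            table[a] = rows[-1][1]
        else:
            table.pop(a, None)
    return table


ORDERS = [
    [("+I", "a1", "b1"), ("-U", "a1", "b1"), ("+U", "a1", "b2")],
    [("+I", "a1", "b1"), ("+U", "a1", "b2"), ("-U", "a1", "b1")],
    [("+U", "a1", "b2"), ("+I", "a1", "b1"), ("-U", "a1", "b1")],
]

for order in ORDERS:
    print(apply_directly(order), apply_with_materializer(order))
```

Under this model, all three orders end with `{'a1': 'b2'}` when the materializer is present, while the second and third orders leave no row for `a1` without it. Whether a real retract sink behaves like `apply_directly` depends on how the external system interprets `-U`.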

> SinkUpsertMaterializer should not be inserted for retract sinks
> ---------------------------------------------------------------
>
>                 Key: FLINK-38201
>                 URL: https://issues.apache.org/jira/browse/FLINK-38201
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.1.0
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.2.0
>
>
> Currently, the logic in FlinkChangelogModeInferenceProgram for enabling
> upsert materialization does not distinguish between retract and upsert sinks.
> Thus, SinkUpsertMaterializer is added for retract sinks. The resulting
> changelog is incorrect because it lacks -U messages. A retract changelog is
> flexible enough to simply be passed to the sink without further changes.
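The "misses -U" point in the issue can be illustrated with another hypothetical sketch: a simplified model of what an upsert materializer emits per sink key (not the actual SinkUpsertMaterializer code). It assumes the materializer emits `+I` for the first live row of a key, `+U` for every later row, `-D` when the last live row is retracted, and a `+U` of the previous row when the latest row is retracted. The name `materializer_output` is illustrative only.

```python
# Hypothetical, simplified model of an upsert materializer's output;
# not the actual Flink implementation. Records are (kind, a, b); key is `a`.

def materializer_output(records):
    live = {}   # sink key a -> list of live full rows (a, b)
    out = []
    for kind, a, b in records:
        rows = live.setdefault(a, [])
        if kind in ("+I", "+U"):
            # The first row for a key is an insert; later rows are upserts
            # that overwrite the key, so no -U for the previous row appears.
            out.append(("+I" if not rows else "+U", a, b))
            rows.append((a, b))
        elif (a, b) in rows:
            was_latest = rows[-1] == (a, b)
            rows.remove((a, b))
            if not rows:
                out.append(("-D", a, b))        # key has no live rows left
            elif was_latest:
                out.append(("+U",) + rows[-1])  # fall back to previous row
    return out


retract_input = [("+I", "a1", "b1"), ("+U", "a1", "b2"), ("-U", "a1", "b1")]
print(materializer_output(retract_input))
```

In this model the output is `+I(a1, b1)` followed by `+U(a1, b2)` with no `-U(a1, b1)` in between, so a consumer that relies on retractions never sees `(a1, b1)` withdrawn.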



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
