[ 
https://issues.apache.org/jira/browse/FLINK-36701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17897856#comment-17897856
 ] 

zjjiang edited comment on FLINK-36701 at 11/13/24 8:48 AM:
-----------------------------------------------------------

 I'd like to work on this issue, would you please assign it to me?

I intend to request Schema Manager's latest evolved schema when the flushevent 
is processed by the SinkWriterOperator(step 5. in the picture above) if the 
operator doesn't have a local cache of the schema. At this point the schema 
manager hasn't confirmed that the flush was successful, so the schema change 
event hasn't been applied to the evolved schema yet (step 6.3).


was (Author: JIRAUSER307560):
 I'd like to work on this issue, would you please assign it to me?

I intend to request the latest evolved schema when the flushevent is processed 
by the SinkWriterOperator if the sink operator doesn't have a local cache of 
the schema. At this point the schema manager hasn't confirmed that the flush 
was successful, so the schema change event hasn't been applied to the evolved 
schema yet.

> Pipeline failover again after handling a schema change event as the first 
> event after a failover
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36701
>                 URL: https://issues.apache.org/jira/browse/FLINK-36701
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.2.0, cdc-3.3.0
>            Reporter: zjjiang
>            Priority: Major
>         Attachments: image-2024-11-13-16-29-30-065.png
>
>
> Currently, directly after a failover, when the pipeline first handles a 
> schema change event (e.g. addColumnEvent) and then a DataChangeEvent, it may 
> cause the job to fail again as sink has repeatedly applied that schema change.
> The cause of the problem can be explained as follows:
> 1. SinkWriterOperator now requests the latest schema when it receives the 
> first non-createTableEvent schema change event (assuming there is no schema 
> in the local cache).
> 2. The schema manager applies the schema change after confirming flush 
> success.
> 3. Assume that the sequence after failover is to process a schema change 
> event first, followed by a data change event.
> On the schema manager side, the schema manager will apply the schema change 
> event to its cached schema(i.e. evolvedSchema) after confirming a successful 
> flush.
> On the SinkWriterOperator side, the processing flow is:
> 1) Handle the flushEvent;
> 2) Handle the schema change event (in this step, the latest schema will be 
> fetched from the schema manager and sent downstream; then the schema change 
> event will be emitted) – note that this step does not report an error;
> 3) Handle the data change – here the failover occurs because the data record 
> column size does not match the schema.
> !image-2024-11-13-16-29-30-065.png|width=1436,height=960!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to