[ 
https://issues.apache.org/jira/browse/FLINK-39696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39696:
-----------------------------------
    Labels: pull-request-available  (was: )

> Distributed SchemaOperator emits FlushEvent with schema operator subtaskId 
> instead of upstream sourcePartition
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39696
>                 URL: https://issues.apache.org/jira/browse/FLINK-39696
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: Ran Tao
>            Priority: Major
>              Labels: pull-request-available
>
> In the distributed schema evolution topology, the schema-change path and the 
> flush path identify the same source with different keys.
> {*}Current behavior{*}:
> 1. DistributedPrePartitionOperator preserves the original upstream source 
> partition when it broadcasts a schema change event.
> 2. Distributed SchemaOperator forwards that identity to the coordinator via 
> {*}SchemaChangeRequest(sourcePartition, sinkSubTaskId, schemaChangeEvent){*}.
> 3. SchemaCoordinator also deduplicates distributed schema change events by 
> (sourcePartition, schemaChangeEvent).
> 4. However, when SchemaOperator emits the downstream FlushEvent, it currently 
> uses the SchemaOperator subtask id instead of the original source partition.
> As a result, the same distributed schema change is identified by different 
> keys in different parts of the pipeline:
>  * SchemaChangeRequest / coordinator deduplication use the original source 
> partition
>  * FlushEvent / FlushSuccessEvent use the SchemaOperator subtask id
> This makes the flush lineage inconsistent with the schema-change lineage in 
> the distributed topology: the duplicated broadcast branches of a single 
> upstream schema change cannot be reliably aligned by the original source 
> partition.
> {*}Expected behavior{*}:
> FlushEvent in distributed SchemaOperator should carry the original upstream 
> source partition, i.e. the same source identity already used in 
> SchemaChangeRequest.
> {*}Suggested fix{*}:
> In 
> {*}org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaOperator#requestSchemaChange{*},
> construct FlushEvent with schemaChangeRequest.getSourceSubTaskId() instead 
> of the current SchemaOperator subtask id.
> A simple regression test can reproduce the issue:
>  * source partition = 0
>  * distributed SchemaOperator subtask = 1
>  * process a CreateTableEvent
>  * the first emitted FlushEvent should carry source partition 0, not the 
> SchemaOperator subtask id 1
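
The mismatch described in the report can be sketched in isolation. The snippet below is a minimal model only, not the Flink CDC implementation: `Request`, `Flush`, and the two factory methods are hypothetical stand-ins invented for illustration, and they keep only the source identity field that the report discusses. It uses the values from the regression scenario above (source partition 0, SchemaOperator subtask 1).

```java
// Minimal, self-contained model of the key mismatch. All types here are
// hypothetical stand-ins, NOT the real Flink CDC classes.
class FlushLineageSketch {

    /** Stand-in for the request sent to the coordinator; other fields omitted. */
    static final class Request {
        final int sourcePartition;

        Request(int sourcePartition) {
            this.sourcePartition = sourcePartition;
        }
    }

    /** Stand-in for the flush event; only the source identity is modeled. */
    static final class Flush {
        final int sourceId;

        Flush(int sourceId) {
            this.sourceId = sourceId;
        }
    }

    /** Behavior described as current: the flush is keyed by the operator's own subtask id. */
    static Flush flushWithOperatorSubtask(Request request, int operatorSubtaskId) {
        return new Flush(operatorSubtaskId);
    }

    /** Behavior described as expected: the flush reuses the request's source partition. */
    static Flush flushWithSourcePartition(Request request, int operatorSubtaskId) {
        return new Flush(request.sourcePartition);
    }

    /** True when the flush event and the schema change request share one key. */
    static boolean sameLineage(Request request, Flush flush) {
        return request.sourcePartition == flush.sourceId;
    }

    public static void main(String[] args) {
        Request request = new Request(0); // upstream source partition = 0
        int operatorSubtaskId = 1;        // SchemaOperator subtask = 1

        Flush current = flushWithOperatorSubtask(request, operatorSubtaskId);
        Flush expected = flushWithSourcePartition(request, operatorSubtaskId);

        System.out.println("current aligned:  " + sameLineage(request, current));
        System.out.println("expected aligned: " + sameLineage(request, expected));
    }
}
```

In this model the two keys only diverge when the source partition and the operator subtask id differ, which is presumably why the regression scenario picks 0 and 1 rather than equal values.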



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
