Ran Tao created FLINK-39696:
-------------------------------

             Summary: Distributed SchemaOperator emits FlushEvent with schema operator subtaskId instead of upstream sourcePartition
                 Key: FLINK-39696
                 URL: https://issues.apache.org/jira/browse/FLINK-39696
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: Ran Tao


In the distributed schema evolution topology, the source identity used by the 
schema-change path and the flush path is inconsistent.

{*}Current behavior{*}:
1. DistributedPrePartitionOperator preserves the original upstream source 
partition when it broadcasts a schema change event.
2. Distributed SchemaOperator forwards that identity to the coordinator via 
{*}SchemaChangeRequest(sourcePartition, sinkSubTaskId, schemaChangeEvent){*}.
3. SchemaCoordinator also deduplicates distributed schema change events by 
(sourcePartition, schemaChangeEvent).
4. However, when SchemaOperator emits the downstream FlushEvent, it currently 
uses the SchemaOperator subtask id instead of the original source partition.
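Here is a minimal Java sketch of the mismatch in step 4. The records are hypothetical, stripped-down stand-ins that keep only the ids, not the real Flink CDC classes. The method name emitFlushCurrent is invented for illustration. For this sketch only, sinkSubTaskId is assumed to be the SchemaOperator subtask index.

```java
// Hypothetical, simplified stand-ins for the types named in this issue. They are
// NOT the real org.apache.flink.cdc classes and keep only the ids relevant here.
final class CurrentFlushIdentity {

    // sinkSubTaskId is assumed, for this sketch only, to be the SchemaOperator subtask index.
    record SchemaChangeRequest(int sourceSubTaskId, int sinkSubTaskId, String schemaChangeEvent) {}

    record FlushEvent(int sourceSubTaskId) {}

    /** Models step 4: the FlushEvent is stamped with this SchemaOperator's own subtask id. */
    static FlushEvent emitFlushCurrent(SchemaChangeRequest request, int schemaOperatorSubTaskId) {
        // request.sourceSubTaskId() (the upstream partition) is ignored here: that is the bug.
        return new FlushEvent(schemaOperatorSubTaskId);
    }

    public static void main(String[] args) {
        // Upstream source partition 0, handled by distributed SchemaOperator subtask 1.
        SchemaChangeRequest request = new SchemaChangeRequest(0, 1, "CreateTableEvent(db.t1)");
        FlushEvent flush = emitFlushCurrent(request, 1);
        System.out.println("request source = " + request.sourceSubTaskId()
                + ", flush source = " + flush.sourceSubTaskId());
        // prints: request source = 0, flush source = 1
    }
}
```

The request and the flush describe the same schema change but carry different source ids.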

As a result, the same distributed schema change is identified by different keys 
in different parts of the pipeline:
 * SchemaChangeRequest / coordinator deduplication use the original source 
partition
 * FlushEvent / FlushSuccessEvent use the SchemaOperator subtask id

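This makes the flush lineage inconsistent with the schema-change lineage in the 
distributed topology. The broadcast copies of one upstream schema change all 
carry the same source partition in their SchemaChangeRequests, but their 
FlushEvents carry different SchemaOperator subtask ids, so they cannot be 
consistently aligned by the original source partition.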
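The alignment problem can be illustrated with a small Java sketch. It assumes one schema change from source partition 0 is broadcast to three SchemaOperator subtasks. DedupKey, coordinatorKeys and currentFlushIds are hypothetical names. They only mimic the (sourcePartition, schemaChangeEvent) deduplication described above and are not Flink CDC code.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical model of the two identity spaces; NOT real Flink CDC code.
final class BroadcastAlignment {

    // Mirrors the coordinator's (sourcePartition, schemaChangeEvent) dedup key.
    record DedupKey(int sourcePartition, String schemaChangeEvent) {}

    /** Number of distinct keys the coordinator sees for one broadcast schema change. */
    static int coordinatorKeys(int sourcePartition, String event, int schemaOperatorParallelism) {
        Set<DedupKey> seen = new HashSet<>();
        for (int subtask = 0; subtask < schemaOperatorParallelism; subtask++) {
            // Every broadcast branch forwards the ORIGINAL source partition.
            seen.add(new DedupKey(sourcePartition, event));
        }
        return seen.size();
    }

    /** Distinct source ids carried by the FlushEvents under the current behavior. */
    static Set<Integer> currentFlushIds(int schemaOperatorParallelism) {
        // Each branch stamps its FlushEvent with its own SchemaOperator subtask id.
        return IntStream.range(0, schemaOperatorParallelism).boxed().collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println(coordinatorKeys(0, "AddColumnEvent(db.t1)", 3)); // 1
        System.out.println(currentFlushIds(3).size()); // 3
    }
}
```

The coordinator collapses the three branches into one key. The flush side sees three distinct ids, so the two views share no key to align on.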

{*}Expected behavior{*}:
FlushEvent in distributed SchemaOperator should carry the original upstream 
source partition, i.e. the same source identity already used in 
SchemaChangeRequest.

{*}Suggested fix{*}:
In 
{*}org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaOperator#requestSchemaChange{*},
construct the FlushEvent with schemaChangeRequest.getSourceSubTaskId() instead of 
the SchemaOperator's own subtask id.
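Here is a minimal sketch of the suggested change, using hypothetical simplified records rather than the real classes. It shows only one thing: the FlushEvent takes its id from the request's getSourceSubTaskId(), not from the operator's own subtask index. The real requestSchemaChange signature and surrounding logic are not reproduced here.

```java
// Hypothetical, simplified stand-ins; NOT the real org.apache.flink.cdc classes.
final class FixedFlushIdentity {

    record SchemaChangeRequest(int sourceSubTaskId, int sinkSubTaskId, String schemaChangeEvent) {
        // Getter named after the one cited in the issue, so the fix reads the same way.
        int getSourceSubTaskId() {
            return sourceSubTaskId;
        }
    }

    record FlushEvent(int sourceSubTaskId) {}

    /** Fixed flush path: reuse the identity already sent to the coordinator. */
    static FlushEvent emitFlushFixed(SchemaChangeRequest schemaChangeRequest) {
        return new FlushEvent(schemaChangeRequest.getSourceSubTaskId());
    }
}
```

Reusing the request's id keeps a single source of truth. The flush path cannot drift from the key the coordinator deduplicates on.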

A simple regression test can reproduce the issue:
 * source partition = 0
 * distributed SchemaOperator subtask = 1
 * process a CreateTableEvent
 * the first emitted FlushEvent should use 0, not 1
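The regression scenario can be sketched in plain Java without the Flink test harness. SchemaOperatorModel, runScenario and the flush-before-forward ordering are assumptions made for illustration. A real test would drive the distributed SchemaOperator through Flink's operator test utilities instead. The useSourcePartition flag switches between the current and the suggested behavior, so one scenario shows both the failure and the fix.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the regression scenario. SchemaOperatorModel is a
// hypothetical stand-in, NOT the real distributed SchemaOperator or a Flink test harness.
final class FlushRegressionSketch {

    record CreateTableEvent(String tableId) {}

    record FlushEvent(int sourceSubTaskId) {}

    static final class SchemaOperatorModel {
        private final int subTaskId;
        private final boolean useSourcePartition; // true = suggested fix, false = current behavior
        private final List<Object> output = new ArrayList<>();

        SchemaOperatorModel(int subTaskId, boolean useSourcePartition) {
            this.subTaskId = subTaskId;
            this.useSourcePartition = useSourcePartition;
        }

        void processElement(int sourcePartition, CreateTableEvent event) {
            // Assumed ordering for the sketch: flush first, then forward the schema change.
            int flushSource = useSourcePartition ? sourcePartition : subTaskId;
            output.add(new FlushEvent(flushSource));
            output.add(event);
        }

        List<Object> output() {
            return output;
        }
    }

    /** Returns the source id carried by the first FlushEvent in the emitted output. */
    static int firstFlushSource(List<Object> output) {
        for (Object element : output) {
            if (element instanceof FlushEvent flush) {
                return flush.sourceSubTaskId();
            }
        }
        throw new IllegalStateException("no FlushEvent emitted");
    }

    /** Source partition 0, SchemaOperator subtask 1, one CreateTableEvent. */
    static int runScenario(boolean useSourcePartition) {
        SchemaOperatorModel operator = new SchemaOperatorModel(1, useSourcePartition);
        operator.processElement(0, new CreateTableEvent("db.t1"));
        return firstFlushSource(operator.output());
    }

    public static void main(String[] args) {
        int fixed = runScenario(true);
        if (fixed != 0) {
            throw new AssertionError("expected FlushEvent source 0 but got " + fixed);
        }
        System.out.println("current = " + runScenario(false) + ", fixed = " + fixed);
        // prints: current = 1, fixed = 0
    }
}
```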



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
