Ran Tao created FLINK-39696:
-------------------------------
Summary: Distributed SchemaOperator emits FlushEvent with schema
operator subtaskId instead of upstream sourcePartition
Key: FLINK-39696
URL: https://issues.apache.org/jira/browse/FLINK-39696
Project: Flink
Issue Type: Bug
Components: Flink CDC
Reporter: Ran Tao
In the distributed schema evolution topology, the source identity used by the
schema-change path and the flush path is inconsistent.
{*}Current behavior{*}:
1. DistributedPrePartitionOperator preserves the original upstream source
partition when it broadcasts a schema change event.
2. Distributed SchemaOperator forwards that identity to the coordinator via
{*}SchemaChangeRequest(sourcePartition, sinkSubTaskId, schemaChangeEvent){*}.
3. SchemaCoordinator also deduplicates distributed schema change events by
(sourcePartition, schemaChangeEvent).
4. However, when SchemaOperator emits the downstream FlushEvent, it currently
uses the SchemaOperator subtask id instead of the original source partition.
As a result, the same distributed schema change is identified by different keys
in different parts of the pipeline:
* SchemaChangeRequest / coordinator deduplication use the original source
partition
* FlushEvent / FlushSuccessEvent use the SchemaOperator subtask id
This makes the flush lineage inconsistent with the schema-change lineage in
the distributed topology, so duplicated broadcast branches of one upstream
schema change cannot be consistently aligned by the original source partition.
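To illustrate the mismatch, here is a minimal, self-contained sketch. The record types and the flushAlignsWithRequest helper are simplified, hypothetical stand-ins, not the actual Flink CDC classes. The sketch also assumes the coordinator only knows about source partitions it has seen in a SchemaChangeRequest, and that the operator passes its own subtask id as sinkSubTaskId.

```java
import java.util.HashSet;
import java.util.Set;

public class LineageMismatchSketch {
    // Simplified stand-ins for illustration only; the real Flink CDC classes differ.
    record SchemaChangeRequest(int sourcePartition, int sinkSubTaskId, String schemaChangeEvent) {}
    record FlushEvent(int sourceId) {}

    // Returns true when the FlushEvent identity matches a source partition
    // that the (hypothetical) coordinator registered from the request.
    static boolean flushAlignsWithRequest(
            int sourcePartition, int operatorSubTaskId, boolean useSourcePartition) {
        // Assumption: the operator's own subtask id is sent as sinkSubTaskId.
        SchemaChangeRequest request =
                new SchemaChangeRequest(sourcePartition, operatorSubTaskId, "CreateTableEvent");

        // The coordinator side is keyed by the original source partition.
        Set<Integer> knownSources = new HashSet<>();
        knownSources.add(request.sourcePartition());

        // The flush side uses either the operator subtask id (current behavior)
        // or the request's source partition (expected behavior).
        FlushEvent flush =
                new FlushEvent(useSourcePartition ? request.sourcePartition() : operatorSubTaskId);
        return knownSources.contains(flush.sourceId());
    }

    public static void main(String[] args) {
        System.out.println(flushAlignsWithRequest(0, 1, false)); // current behavior: false
        System.out.println(flushAlignsWithRequest(0, 1, true));  // expected behavior: true
    }
}
```

With source partition 0 and operator subtask 1, the flush identity only lines up with the request identity when the source partition is reused.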
{*}Expected behavior{*}:
FlushEvent in distributed SchemaOperator should carry the original upstream
source partition, i.e. the same source identity already used in
SchemaChangeRequest.
{*}Suggested fix{*}:
In
{*}org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaOperator#requestSchemaChange{*},
construct FlushEvent with schemaChangeRequest.getSourceSubTaskId(), i.e. the
source identity carried by the request, instead of the current SchemaOperator
subtask id.
A simple regression test can reproduce the issue:
* source partition = 0
* distributed SchemaOperator subtask = 1
* process a CreateTableEvent
* the first emitted FlushEvent should carry source partition 0, not
SchemaOperator subtask id 1
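The regression scenario above could be sketched roughly as follows. This is a standalone approximation under stated assumptions: MiniSchemaOperator, the simplified record types, and firstFlushSourceId are hypothetical stand-ins, and a real test would exercise the actual distributed SchemaOperator through the project's own test harness.

```java
import java.util.ArrayList;
import java.util.List;

public class FlushEventRegressionSketch {
    // Simplified stand-ins for illustration; the real Flink CDC classes carry more fields.
    record SchemaChangeRequest(int sourceSubTaskId, int sinkSubTaskId, String schemaChangeEvent) {
        int getSourceSubTaskId() {
            return sourceSubTaskId;
        }
    }

    record FlushEvent(int sourceSubTaskId) {}

    // Hypothetical miniature of the distributed SchemaOperator.
    static final class MiniSchemaOperator {
        private final int subTaskId;
        final List<FlushEvent> emitted = new ArrayList<>();

        MiniSchemaOperator(int subTaskId) {
            this.subTaskId = subTaskId;
        }

        void requestSchemaChange(int sourcePartition, String schemaChangeEvent) {
            SchemaChangeRequest request =
                    new SchemaChangeRequest(sourcePartition, subTaskId, schemaChangeEvent);
            // Suggested fix: reuse the request's source identity instead of this.subTaskId.
            emitted.add(new FlushEvent(request.getSourceSubTaskId()));
        }
    }

    // Runs the scenario and returns the source id of the first emitted FlushEvent.
    static int firstFlushSourceId(int sourcePartition, int operatorSubTaskId) {
        MiniSchemaOperator operator = new MiniSchemaOperator(operatorSubTaskId);
        operator.requestSchemaChange(sourcePartition, "CreateTableEvent");
        return operator.emitted.get(0).sourceSubTaskId();
    }

    public static void main(String[] args) {
        int actual = firstFlushSourceId(0, 1);
        if (actual != 0) {
            throw new AssertionError("expected source partition 0 but got " + actual);
        }
        System.out.println("first FlushEvent source id = " + actual);
    }
}
```

If the FlushEvent were built from subTaskId instead, the same scenario would yield 1 and the check would fail, which is the behavior reported here.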