[ https://issues.apache.org/jira/browse/FLINK-39944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger updated FLINK-39944:
-----------------------------------
    Component/s: Connectors / Kafka

> Dynamic Kafka source cluster removal is not resilient enough
> ------------------------------------------------------------
>
>                 Key: FLINK-39944
>                 URL: https://issues.apache.org/jira/browse/FLINK-39944
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: Xin Gao
>            Assignee: Xin Gao
>            Priority: Critical
>
> {{DynamicKafkaSource}} supports reading from a set of Kafka clusters returned 
> by a metadata service. It has:
>  * one top-level enumerator that periodically polls the metadata service 
> to find the current active cluster/topic set;
>  * one per-cluster {{KafkaSourceEnumerator}} for each active Kafka 
> cluster, which periodically discovers topic partitions from that cluster.
> When the metadata service removes cluster {{A}}, the next top-level 
> metadata refresh should detect that {{A}} is no longer active and remove its 
> per-cluster enumerator/readers.
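The reconciliation step after each top-level refresh amounts to a set difference between the clusters that currently have a per-cluster enumerator and the active set returned by the metadata service. This is a minimal sketch under stated assumptions: {{ClusterReconciler}} and {{PerClusterEnumerator}} are hypothetical names, not classes from the Kafka connector, and starting a new per-cluster enumerator is reduced to a placeholder. In {{DynamicKafkaSource}} this kind of logic belongs in the callback that applies the new active cluster set.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical sketch of the top-level reconciliation step: compare clusters that
 * currently have a per-cluster enumerator with the active set from the metadata service.
 */
public final class ClusterReconciler {

    /** Hypothetical stand-in for a per-cluster KafkaSourceEnumerator and its readers. */
    interface PerClusterEnumerator extends AutoCloseable {
        @Override
        void close();
    }

    private final Map<String, PerClusterEnumerator> running = new HashMap<>();

    /** Applies one metadata refresh and returns the removed clusters in sorted order. */
    public Set<String> reconcile(Set<String> activeClusters) {
        // Clusters that still have an enumerator but are no longer reported as active.
        Set<String> removed = new TreeSet<>(running.keySet());
        removed.removeAll(activeClusters);
        for (String cluster : removed) {
            // Stop partition discovery and readers for the removed cluster.
            running.remove(cluster).close();
        }
        for (String cluster : activeClusters) {
            // Placeholder for starting an enumerator for a newly active cluster.
            running.computeIfAbsent(cluster, c -> () -> { });
        }
        return removed;
    }
}
```

If this callback is delayed, cluster {{A}} keeps its enumerator until the callback finally runs, which is the symptom described in this issue.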
> The problem is that both operations use 
> {{SplitEnumeratorContext.callAsync}}:
>  * per-cluster partition discovery for {{A}};
>  * top-level metadata refresh that should remove {{A}}.
> *If cluster {{A}} is already removed or unreachable*, its Kafka 
> AdminClient metadata request can block until it times out. That stale 
> per-cluster async work can delay the top-level metadata refresh task, which 
> runs on the same {{callAsync}} worker path. As a result, the callback that 
> applies the new active cluster set does not run in time, *so 
> {{DynamicKafkaSource}} keeps consuming from or waiting on the removed cluster 
> longer than expected, and the job may restart before reconciliation completes.*
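The delay described above is head-of-line blocking on a shared worker. The sketch below reproduces it with plain {{java.util.concurrent}}, assuming (this issue does not confirm it) that {{callAsync}} work for both operations runs on a single worker thread. The latch stands in for an AdminClient request that only returns when its timeout fires; {{HeadOfLineBlockingDemo}} and its method are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical illustration: one shared single-threaded worker runs both a stale
 * per-cluster discovery call and the top-level metadata refresh.
 */
public final class HeadOfLineBlockingDemo {

    /** True if the refresh stayed queued behind the blocked call, then completed after it. */
    public static boolean refreshDelayedByStaleDiscovery() throws Exception {
        ExecutorService sharedWorker = Executors.newSingleThreadExecutor();
        CountDownLatch adminTimeout = new CountDownLatch(1); // released = request timed out
        CountDownLatch discoveryStarted = new CountDownLatch(1);
        try {
            // Stand-in for partition discovery against removed cluster A.
            Future<Object> staleDiscovery = sharedWorker.submit(() -> {
                discoveryStarted.countDown();
                adminTimeout.await(); // blocks the only worker thread
                return null;
            });
            discoveryStarted.await();

            // The refresh that would drop cluster A can only start after the call above.
            Future<String> refresh = sharedWorker.submit(() -> "active clusters: [B]");
            boolean queuedWhileBlocked = !refresh.isDone();

            adminTimeout.countDown(); // the stale request finally times out
            staleDiscovery.get();
            String newClusterSet = refresh.get(5, TimeUnit.SECONDS);
            return queuedWhileBlocked && newClusterSet.equals("active clusters: [B]");
        } finally {
            sharedWorker.shutdownNow();
        }
    }
}
```

In a real deployment the wait would last as long as the AdminClient timeout rather than until a latch is released, but the ordering is the same: the refresh cannot start until the stale call returns.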
>  
> *Expected behavior*: a stale per-cluster metadata request should not block 
> the control-plane metadata refresh that is needed to remove that cluster.
> *Proposal*: 
> Add cancellation and/or priority/isolation support to 
> {{SplitEnumeratorContext.callAsync}}, while keeping result handlers 
> serialized on the coordinator thread.
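One possible shape for this proposal, sketched with plain {{java.util.concurrent}} rather than Flink's actual interfaces: {{IsolatedAsyncContext}}, its {{group}} parameter and {{cancelGroup}} are hypothetical. Each group (for example the control plane, and each cluster) gets its own worker thread, so a blocked per-cluster call cannot delay the metadata refresh. Removing a cluster cancels its group, and every handler still runs on one coordinator thread, which drops results from groups cancelled in the meantime. The call/handler pair loosely mirrors the callable-plus-handler shape of {{callAsync}}.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

/** Hypothetical sketch, NOT Flink's SplitEnumeratorContext API. */
public final class IsolatedAsyncContext {
    private final ExecutorService coordinator = Executors.newSingleThreadExecutor();
    private final Map<String, ExecutorService> workers = new ConcurrentHashMap<>();

    /** Runs call on the group's own worker; handler runs serialized on the coordinator. */
    public <T> Future<?> callAsync(String group, Callable<T> call, BiConsumer<T, Throwable> handler) {
        ExecutorService worker =
                workers.computeIfAbsent(group, g -> Executors.newSingleThreadExecutor());
        return worker.submit(() -> {
            T result = null;
            Throwable error = null;
            try {
                result = call.call();
            } catch (Exception e) {
                error = e; // includes the InterruptedException caused by cancelGroup
            }
            T r = result;
            Throwable err = error;
            coordinator.execute(() -> {
                // Drop results from a group that was cancelled (or replaced) meanwhile.
                if (workers.get(group) == worker) {
                    handler.accept(r, err);
                }
            });
        });
    }

    /** Removes a group and interrupts its in-flight call, e.g. when a cluster is removed. */
    public void cancelGroup(String group) {
        ExecutorService worker = workers.remove(group);
        if (worker != null) {
            worker.shutdownNow(); // only unblocks calls that honor thread interruption
        }
    }

    public void close() throws InterruptedException {
        workers.values().forEach(ExecutorService::shutdownNow);
        workers.clear();
        coordinator.shutdown();
        coordinator.awaitTermination(5, TimeUnit.SECONDS);
    }

    /** Returns "<refresh result>|<number of cluster-A handler calls>". */
    public static String demo() throws Exception {
        IsolatedAsyncContext ctx = new IsolatedAsyncContext();
        CountDownLatch neverAnswers = new CountDownLatch(1); // unreachable cluster A
        CountDownLatch discoveryStarted = new CountDownLatch(1);
        CountDownLatch refreshHandled = new CountDownLatch(1);
        AtomicInteger clusterAHandlerCalls = new AtomicInteger();
        StringBuilder activeClusters = new StringBuilder();
        try {
            Future<?> staleDiscovery = ctx.callAsync("cluster-A", () -> {
                discoveryStarted.countDown();
                neverAnswers.await();
                return "partitions of A";
            }, (r, e) -> clusterAHandlerCalls.incrementAndGet());
            discoveryStarted.await();
            // The refresh is handled even though cluster A's call is still blocked.
            ctx.callAsync("control-plane", () -> "B",
                    (r, e) -> { activeClusters.append(r); refreshHandled.countDown(); });
            if (!refreshHandled.await(5, TimeUnit.SECONDS)) {
                return "refresh blocked";
            }
            ctx.cancelGroup("cluster-A"); // applying the new cluster set cancels A's work
            staleDiscovery.get(5, TimeUnit.SECONDS); // wait for the interrupted call to unwind
            ctx.coordinator.submit(() -> { }).get(); // drain handlers already queued
            return activeClusters + "|" + clusterAHandlerCalls.get();
        } finally {
            ctx.close();
        }
    }
}
```

A thread per group trades extra threads for isolation; a bounded pool that prioritizes control-plane calls would be an alternative reading of "priority/isolation". In either variant, interruption only helps if the blocking call responds to it, so isolation is what keeps the refresh from waiting.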
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
