Xin Gao created FLINK-39944:
-------------------------------

             Summary: Dynamic Kafka source cluster removal is not resilient enough
                 Key: FLINK-39944
                 URL: https://issues.apache.org/jira/browse/FLINK-39944
             Project: Flink
          Issue Type: Bug
            Reporter: Xin Gao


{{DynamicKafkaSource}} supports reading from a set of Kafka clusters returned 
by a metadata service. It has:
 * one top-level enumerator that periodically queries the metadata service to 
find the current set of active clusters and topics;
 * one per-cluster {{KafkaSourceEnumerator}} for each active Kafka 
cluster, which periodically discovers topic partitions on that cluster.

When the metadata service removes cluster {{A}}, the next top-level 
metadata refresh should detect that {{A}} is no longer active and remove its 
per-cluster enumerator/readers.
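
A minimal sketch of that reconciliation step, assuming the top-level enumerator keeps its per-cluster state in a map keyed by cluster id. {{ClusterReconciler}} and {{applyActiveClusters}} are hypothetical names, and a {{String}} stands in for a per-cluster enumerator; this is an illustration, not the actual {{DynamicKafkaSource}} code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the top-level reconciliation step; not Flink's actual enumerator code. */
public class ClusterReconciler {
    // Per-cluster state keyed by cluster id; a String stands in for a per-cluster enumerator.
    private final Map<String, String> perClusterEnumerators = new HashMap<>();

    /** Applies one metadata-service snapshot and returns the ids of clusters that were removed. */
    public Set<String> applyActiveClusters(Set<String> activeClusters) {
        // Removed = currently tracked clusters minus the new active set.
        Set<String> removed = new HashSet<>(perClusterEnumerators.keySet());
        removed.removeAll(activeClusters);
        for (String cluster : removed) {
            // In the real source, this is where the cluster's enumerator/readers would be closed.
            perClusterEnumerators.remove(cluster);
        }
        // Newly active clusters get a (stand-in) per-cluster enumerator.
        for (String cluster : activeClusters) {
            perClusterEnumerators.putIfAbsent(cluster, "enumerator-" + cluster);
        }
        return removed;
    }

    public Set<String> currentClusters() {
        return new HashSet<>(perClusterEnumerators.keySet());
    }

    public static void main(String[] args) {
        ClusterReconciler reconciler = new ClusterReconciler();
        reconciler.applyActiveClusters(Set.of("A", "B"));
        System.out.println(reconciler.applyActiveClusters(Set.of("B"))); // prints [A]
        System.out.println(reconciler.currentClusters()); // prints [B]
    }
}
```

The point of the issue is that this step only helps if the refresh that produces the new active set actually gets to run.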

The problem is that both operations use 
{{SplitEnumeratorContext.callAsync}}:
 * per-cluster partition discovery for {{A}};
 * top-level metadata refresh that should remove {{A}}.

*If cluster {{A}} has already been removed or is unreachable*, its Kafka AdminClient 
metadata request can block until it times out. That stale per-cluster async work 
can delay the top-level metadata refresh task queued on the same {{callAsync}} 
worker. The callback that applies the new active cluster set then does not run 
in time, *so {{DynamicKafkaSource}} keeps consuming from, or waiting on, the 
removed cluster longer than expected, and the job may restart before 
reconciliation completes.*
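
A small self-contained model of this head-of-line blocking. It assumes, for illustration only, that {{callAsync}} work shares a single worker thread, modeled here with {{Executors.newSingleThreadExecutor()}}. {{HeadOfLineBlockingDemo}} is a hypothetical name, and {{Thread.sleep}} stands in for an AdminClient request that blocks until timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of stale per-cluster work delaying the metadata refresh on a shared worker. */
public class HeadOfLineBlockingDemo {

    /** Returns how long (ms) the metadata refresh waited before it started running. */
    public static long refreshStartDelayMillis(long staleDiscoveryMillis) throws InterruptedException {
        // Assumption: both kinds of async work are queued on ONE worker thread.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            long submittedAt = System.nanoTime();
            long[] refreshStartedAt = new long[1];
            CountDownLatch refreshStarted = new CountDownLatch(1);

            // 1. Stale partition discovery for removed cluster A (simulated blocking request).
            worker.submit(() -> {
                try {
                    Thread.sleep(staleDiscoveryMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // 2. Top-level metadata refresh that would remove A: queued behind task 1.
            worker.submit(() -> {
                refreshStartedAt[0] = System.nanoTime();
                refreshStarted.countDown(); // publishes refreshStartedAt[0] to the waiting thread
            });

            refreshStarted.await();
            return TimeUnit.NANOSECONDS.toMillis(refreshStartedAt[0] - submittedAt);
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("refresh delayed by ~" + refreshStartDelayMillis(300) + " ms");
    }
}
```

With a 300 ms stale task, the refresh cannot start until that task finishes, so the reported delay is at least roughly 300 ms; with a real AdminClient timeout the delay scales accordingly.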


Instead, we *expect* that a stale per-cluster metadata request does not block 
the control-plane metadata refresh needed to remove that cluster.

*Proposal*: 
Add cancellation and/or priority/isolation support to 
{{SplitEnumeratorContext.callAsync}}, while keeping result handlers 
serialized on the coordinator thread.
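
One possible shape for this, sketched under stated assumptions: control-plane work and per-cluster work get separate executors (isolation), per-cluster futures are tracked so they can be cancelled when a cluster is removed (cancellation), and every result handler is still handed to one single-threaded executor standing in for the coordinator thread. {{IsolatedAsyncContext}}, {{callAsyncControlPlane}}, {{callAsyncForCluster}} and {{cancelCluster}} are hypothetical names, not part of Flink's {{SplitEnumeratorContext}} API.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/** Hypothetical sketch of isolated, cancellable async calls; not Flink's SplitEnumeratorContext API. */
public class IsolatedAsyncContext implements AutoCloseable {
    // Stand-in for the coordinator thread: all result handlers run here, one at a time.
    private final ExecutorService coordinator = Executors.newSingleThreadExecutor();
    // Isolated worker for control-plane work such as the metadata-service refresh.
    private final ExecutorService controlPlane = Executors.newSingleThreadExecutor();
    // Separate pool for per-cluster partition discovery.
    private final ExecutorService discovery = Executors.newCachedThreadPool();
    // In-flight discovery work per cluster, kept so it can be cancelled on removal.
    private final Map<String, List<Future<?>>> inFlight = new ConcurrentHashMap<>();

    public <T> void callAsyncControlPlane(Callable<T> call, BiConsumer<T, Throwable> handler) {
        controlPlane.execute(() -> runAndHandOff(call, handler));
    }

    public <T> void callAsyncForCluster(String cluster, Callable<T> call, BiConsumer<T, Throwable> handler) {
        Future<?> future = discovery.submit(() -> runAndHandOff(call, handler));
        List<Future<?>> futures = inFlight.computeIfAbsent(cluster, k -> new CopyOnWriteArrayList<>());
        futures.removeIf(Future::isDone); // drop finished work so the list stays small
        futures.add(future);
    }

    /** Cancels and interrupts all in-flight discovery work for a removed cluster. */
    public void cancelCluster(String cluster) {
        List<Future<?>> futures = inFlight.remove(cluster);
        if (futures != null) {
            futures.forEach(f -> f.cancel(true));
        }
    }

    private <T> void runAndHandOff(Callable<T> call, BiConsumer<T, Throwable> handler) {
        T result = null;
        Throwable error = null;
        try {
            result = call.call();
        } catch (InterruptedException cancelled) {
            Thread.currentThread().interrupt(); // cancelled: restore the flag and drop the result
            return;
        } catch (Throwable t) {
            error = t;
        }
        T r = result;
        Throwable e = error;
        coordinator.execute(() -> handler.accept(r, e)); // handlers stay serialized on one thread
    }

    @Override
    public void close() {
        discovery.shutdownNow();
        controlPlane.shutdownNow();
        coordinator.shutdownNow();
    }

    /** Returns how long (ms) the refresh handler took to run while cluster A's discovery hangs. */
    public static long demoRefreshLatencyMillis() throws InterruptedException {
        try (IsolatedAsyncContext ctx = new IsolatedAsyncContext()) {
            CountDownLatch applied = new CountDownLatch(1);
            long start = System.nanoTime();
            // Stale discovery for removed cluster A: blocks far longer than the refresh needs.
            ctx.callAsyncForCluster("A", () -> {
                Thread.sleep(10_000);
                return "partitions-of-A";
            }, (partitions, e) -> System.out.println("discovery for A: " + partitions));
            // Control-plane refresh: the new active set no longer contains A, so cancel A's work.
            ctx.callAsyncControlPlane(() -> Set.of("B"), (active, e) -> {
                if (e == null && !active.contains("A")) {
                    ctx.cancelCluster("A");
                }
                applied.countDown();
            });
            applied.await();
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("refresh applied after " + demoRefreshLatencyMillis() + " ms");
    }
}
```

In this sketch isolation alone keeps the refresh off the stalled worker; cancellation via {{Future.cancel(true)}} additionally frees the discovery thread, but only for requests that respond to thread interruption.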



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
