Xin Gao created FLINK-39944:
-------------------------------
Summary: Dynamic Kafka source cluster removal is not resilient enough
Key: FLINK-39944
URL: https://issues.apache.org/jira/browse/FLINK-39944
Project: Flink
Issue Type: Bug
Reporter: Xin Gao
{{DynamicKafkaSource}} supports reading from a set of Kafka clusters returned
by a metadata service. It has:
* one top-level enumerator that periodically queries the metadata service to
find the current set of active clusters and topics;
* one per-cluster {{KafkaSourceEnumerator}} for each active Kafka
cluster, which periodically discovers topic partitions from that cluster.
When the metadata service removes cluster {{A}}, the next top-level
metadata refresh should detect that {{A}} is no longer active and remove its
per-cluster enumerator and readers.
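A minimal sketch of that reconciliation step, assuming the top-level enumerator keeps a map from cluster ID to per-cluster enumerator. {{ClusterReconciler}} and {{PerClusterEnumerator}} are hypothetical names used only for illustration; they are not {{DynamicKafkaSource}} internals:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: reconciling per-cluster enumerators against the active cluster set. */
class ClusterReconciler {
    /** Hypothetical stand-in for a per-cluster KafkaSourceEnumerator. */
    static final class PerClusterEnumerator {
        final String clusterId;
        boolean closed;

        PerClusterEnumerator(String clusterId) {
            this.clusterId = clusterId;
        }

        void close() {
            closed = true; // in this sketch, closing only flips a flag
        }
    }

    final Map<String, PerClusterEnumerator> enumerators = new HashMap<>();

    /** Applies the latest metadata refresh result and returns the clusters that were removed. */
    Set<String> reconcile(Set<String> activeClusters) {
        // Clusters we currently track that are missing from the new active set.
        Set<String> removed = new HashSet<>(enumerators.keySet());
        removed.removeAll(activeClusters);
        for (String clusterId : removed) {
            enumerators.remove(clusterId).close(); // e.g. cluster A after it leaves the metadata
        }
        // Newly active clusters get a fresh per-cluster enumerator.
        for (String clusterId : activeClusters) {
            enumerators.computeIfAbsent(clusterId, PerClusterEnumerator::new);
        }
        return removed;
    }
}
```

The point that matters for this issue is that {{reconcile}} only runs once the refresh result reaches its callback; anything that delays that callback also delays the removal of {{A}}.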
The problem is that both of the following operations go through
{{SplitEnumeratorContext.callAsync}}:
* per-cluster partition discovery for {{A}};
* the top-level metadata refresh that should remove {{A}}.
*If cluster {{A}} has already been removed or is unreachable*, its Kafka
AdminClient metadata request can block until it times out. That stale
per-cluster async work can delay the top-level metadata refresh task, which
runs on the same {{callAsync}} worker path. The callback that applies the new
active cluster set then does not run in time, so *{{DynamicKafkaSource}} keeps
consuming from, or waiting on, the removed cluster longer than expected, and may
restart before reconciliation completes.*
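The head-of-line blocking described above can be reproduced with plain {{java.util.concurrent}} primitives. This sketch assumes, for illustration only, that all {{callAsync}} work shares one worker thread; {{Thread.sleep}} stands in for the AdminClient request to the removed cluster, and {{HeadOfLineBlockingDemo}} is a hypothetical name:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class HeadOfLineBlockingDemo {
    /**
     * Queues a blocking "discovery" task and then a cheap "refresh" task on the same
     * single-threaded worker; returns how long the refresh waited before it ran, in ms.
     */
    static long refreshDelayMillis(long discoveryBlockMillis) {
        // Assumption for illustration: one worker thread shared by all callAsync work.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // Stale per-cluster discovery for removed cluster A; the sleep stands in
            // for an AdminClient request that only returns when it times out.
            worker.submit(() -> {
                Thread.sleep(discoveryBlockMillis);
                return null;
            });
            long queuedAt = System.nanoTime();
            // Top-level metadata refresh: trivial work, but queued behind the stale task.
            Future<Long> refresh = worker.submit(() -> System.nanoTime() - queuedAt);
            return TimeUnit.NANOSECONDS.toMillis(refresh.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
        }
    }
}
```

{{refreshDelayMillis(300)}} returns a value close to the full 300 ms: the refresh cannot start until the stale task gives up, no matter how cheap the refresh itself is.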
Instead, we *expect* that a stale per-cluster metadata request does not block
the control-plane metadata refresh needed to remove that cluster.
*Proposal*:
Add cancellation and/or priority/isolation support to
{{SplitEnumeratorContext.callAsync}}, while keeping result handlers
serialized on the coordinator thread.
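One possible shape for this, sketched outside Flink: {{IsolatedAsyncCaller}}, {{Lane}} and {{runScenario}} are hypothetical and are not part of the Flink API. Control-plane work gets its own worker so it cannot queue behind per-cluster discovery, {{callAsync}} returns a {{Future}} whose cancellation suppresses the handler, and every handler still runs on a single coordinator thread. Whether cancelling actually unblocks a stuck AdminClient call depends on how that call reacts to thread interruption; that is an assumption here.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/** Hypothetical sketch, not Flink API: isolated lanes, cancellation, serialized handlers. */
class IsolatedAsyncCaller implements AutoCloseable {
    enum Lane { CONTROL_PLANE, PER_CLUSTER }

    private final ExecutorService controlPlaneWorker = Executors.newSingleThreadExecutor();
    private final ExecutorService perClusterWorker = Executors.newSingleThreadExecutor();
    // Stands in for the coordinator thread: every result handler runs here, one at a time.
    private final ExecutorService coordinator = Executors.newSingleThreadExecutor();

    /** Runs {@code callable} on its lane; cancelling the returned Future skips the handler. */
    <T> Future<T> callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler, Lane lane) {
        FutureTask<T> task = new FutureTask<T>(callable) {
            @Override
            protected void done() {
                if (isCancelled()) {
                    return; // a cancelled (stale) call never reaches the coordinator
                }
                coordinator.execute(() -> {
                    try {
                        handler.accept(get(), null); // already complete, so get() does not block
                    } catch (ExecutionException e) {
                        handler.accept(null, e.getCause());
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        };
        (lane == Lane.CONTROL_PLANE ? controlPlaneWorker : perClusterWorker).execute(task);
        return task;
    }

    @Override
    public void close() {
        controlPlaneWorker.shutdownNow();
        perClusterWorker.shutdownNow();
        coordinator.shutdownNow();
    }

    /** Scenario from this issue: stuck discovery for removed cluster A must not delay the refresh. */
    static List<String> runScenario() {
        List<String> applied = new CopyOnWriteArrayList<>();
        CountDownLatch refreshApplied = new CountDownLatch(1);
        try (IsolatedAsyncCaller caller = new IsolatedAsyncCaller()) {
            // Stale per-cluster discovery: stands in for an AdminClient call hanging until timeout.
            Future<Set<String>> stale = caller.callAsync(() -> {
                Thread.sleep(10_000);
                return Set.of("A-partitions");
            }, (r, e) -> applied.add("discovery:A"), Lane.PER_CLUSTER);
            // Control-plane refresh: cluster A is no longer in the active set.
            caller.callAsync(() -> Set.of("B"), (active, e) -> {
                applied.add("refresh:" + active);
                refreshApplied.countDown();
            }, Lane.CONTROL_PLANE);
            refreshApplied.await(2, TimeUnit.SECONDS);
            // Once the refresh has dropped A, its pending discovery is stale: cancel it.
            stale.cancel(true);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return applied;
    }
}
```

{{runScenario()}} should return {{[refresh:[B]]}}: the refresh for the new active set is applied while discovery for {{A}} is still blocked, and the cancelled discovery never reaches the coordinator.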
--
This message was sent by Atlassian Jira
(v8.20.10#820010)