[
https://issues.apache.org/jira/browse/FLINK-39979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39979:
-----------------------------------
Labels: pull-request-available (was: )
> DynamicKafkaSource does not mark reader idle when metadata removal leaves a
> subtask with no active subreaders
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39979
> URL: https://issues.apache.org/jira/browse/FLINK-39979
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 2.2.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Major
> Labels: pull-request-available
>
> When DynamicKafkaSource consumes a metadata update that removes all assigned
> splits for a source subtask, the subtask can end up with an empty
> {{clusterReaderMap}}. The reader releases removed split outputs, but
> because there is no child {{KafkaSourceReader}} left, no child can call
> {{markIdle()}}.
> The aggregate idleness logic currently only treats the reader as idle when
> {{clusterReaderMap}} is non-empty and every child reader is idle. That misses
> the “all children removed after metadata update” case, so the source subtask
> can remain non-idle and hold back watermarks. In production this manifested
> as frozen watermarks and delayed partition release after a large analytics
> topic migration removed old clusters.
> *Expected Behavior*
> After metadata removal leaves a DynamicKafkaSourceReader with no active
> subreaders, the reader should mark its aggregate output idle so it does not
> block downstream watermarks.
> *Actual Behavior*
> The reader returns {{NOTHING_AVAILABLE}}, but does not mark idle because
> {{clusterReaderMap.isEmpty()}} makes the aggregate idleness check false.
> *Impact*
> Large cluster/topic removals can freeze watermarks for affected subtasks even
> when Kafka lag is low, delaying StreamFlink partition release and triggering
> false/high-severity data freshness alerts.
> *Proposed Fix*
> Treat an empty {{clusterReaderMap}} as idle once the reader has already
> received metadata / started active consumption. Keep startup behavior
> unchanged so a fresh reader with no metadata yet does not incorrectly become
> idle.
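
The difference between the current check and the proposed one can be sketched in a few lines of plain Java. This is only an illustration under stated assumptions, not the connector's actual code: the class name, both method names, and the {{metadataReceived}} flag are hypothetical, and a {{Map<String, Boolean>}} (cluster id to "child reader is idle") stands in for {{clusterReaderMap}}.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the aggregate idleness decision; not Flink code.
public class AggregateIdleness {

    // Behavior as described in the issue: idle only when the map is
    // non-empty and every child reader is idle.
    static boolean isIdleCurrent(Map<String, Boolean> childIdleByCluster) {
        return !childIdleByCluster.isEmpty()
                && childIdleByCluster.values().stream().allMatch(Boolean::booleanValue);
    }

    // Proposed behavior: an empty map also counts as idle, but only after
    // metadata has been received, so a fresh reader at startup stays non-idle.
    static boolean isIdleProposed(
            Map<String, Boolean> childIdleByCluster, boolean metadataReceived) {
        if (childIdleByCluster.isEmpty()) {
            return metadataReceived;
        }
        return childIdleByCluster.values().stream().allMatch(Boolean::booleanValue);
    }

    public static void main(String[] args) {
        Map<String, Boolean> readers = new HashMap<>();

        // Startup: no metadata yet, so the reader must not become idle.
        System.out.println(isIdleProposed(readers, false)); // false

        // Active consumption from one cluster.
        readers.put("cluster-a", false);
        System.out.println(isIdleProposed(readers, true)); // false

        // A metadata update removes the only cluster.
        readers.remove("cluster-a");
        System.out.println(isIdleCurrent(readers)); // false: the reported bug
        System.out.println(isIdleProposed(readers, true)); // true
    }
}
```

Keeping the startup guard as an explicit flag, rather than treating every empty map as idle, preserves the behavior the issue asks to keep unchanged for a reader that has not yet received metadata.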
--
This message was sent by Atlassian Jira
(v8.20.10#820010)