[ 
https://issues.apache.org/jira/browse/FLINK-39979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39979:
-----------------------------------
    Labels: pull-request-available  (was: )

> DynamicKafkaSource does not mark reader idle when metadata removal leaves a 
> subtask with no active subreaders
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39979
>                 URL: https://issues.apache.org/jira/browse/FLINK-39979
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 2.2.0
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>              Labels: pull-request-available
>
> When DynamicKafkaSource consumes a metadata update that removes all assigned 
> splits for a source subtask, the subtask can end up with an empty 
> {{clusterReaderMap}}. The reader releases removed split outputs, but 
> because there is no child {{KafkaSourceReader}} left, no child can call 
> {{markIdle()}}.
> The aggregate idleness logic currently only treats the reader as idle when 
> {{clusterReaderMap}} is non-empty and every child reader is idle. That misses 
> the “all children removed after metadata update” case, so the source subtask 
> can remain non-idle and hold back watermarks. In production this manifested 
> as frozen watermarks and delayed partition release after a large analytics 
> topic migration removed old clusters.
> *Expected Behavior*
> After metadata removal leaves a DynamicKafkaSourceReader with no active 
> subreaders, the reader should mark its aggregate output idle so it does not 
> block downstream watermarks.
> *Actual Behavior*
> The reader returns {{NOTHING_AVAILABLE}} but never marks itself idle, because 
> {{clusterReaderMap.isEmpty()}} makes the aggregate idleness check false.
> *Impact*
> Large cluster/topic removals can freeze watermarks for affected subtasks even 
> when Kafka lag is low, delaying StreamFlink partition release and triggering 
> false/high-severity data freshness alerts.
> *Proposed Fix*
> Treat an empty {{clusterReaderMap}} as idle once the reader has already 
> received metadata / started active consumption. Keep startup behavior 
> unchanged so a fresh reader with no metadata yet does not incorrectly become 
> idle.
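
The difference between the current check and the proposed one can be sketched as below. This is only an illustration of the logic described above, not the connector's code: the class name, both method names, the metadataReceived flag, and the use of a plain Map<String, Boolean> (cluster id to "child reader is idle") in place of clusterReaderMap are all assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the reader's aggregate idleness decision. */
final class AggregateIdlenessSketch {

    /** Behavior as described in the report: idle only if non-empty and every child is idle. */
    static boolean isIdleCurrent(Map<String, Boolean> childIdleByCluster) {
        return !childIdleByCluster.isEmpty()
                && childIdleByCluster.values().stream().allMatch(Boolean::booleanValue);
    }

    /**
     * Proposed behavior: an empty map counts as idle, but only once metadata has
     * been received, so a freshly started reader is not marked idle prematurely.
     * The metadataReceived flag is an assumed name for that condition.
     */
    static boolean isIdleProposed(Map<String, Boolean> childIdleByCluster, boolean metadataReceived) {
        if (childIdleByCluster.isEmpty()) {
            return metadataReceived;
        }
        return childIdleByCluster.values().stream().allMatch(Boolean::booleanValue);
    }

    public static void main(String[] args) {
        Map<String, Boolean> readers = new LinkedHashMap<>();

        // Fresh reader, no metadata yet: must stay non-idle under both checks.
        System.out.println("startup, proposed: " + isIdleProposed(readers, false));

        // One active child reader that is not idle.
        readers.put("cluster-a", false);
        System.out.println("active child, proposed: " + isIdleProposed(readers, true));

        // Metadata update removes the only cluster: the map becomes empty.
        readers.clear();
        System.out.println("after removal, current: " + isIdleCurrent(readers));
        System.out.println("after removal, proposed: " + isIdleProposed(readers, true));
    }
}
```

In this sketch the only case where the two checks disagree is an empty map after metadata has arrived, which is the case the report describes.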



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
