[ https://issues.apache.org/jira/browse/FLINK-39728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jim Hughes updated FLINK-39728:
-------------------------------
    Description: 
KafkaPartitionSplitReader.pauseOrResumeSplits throws IllegalStateException on 
concurrently unassigned partitions

KafkaPartitionSplitReader.pauseOrResumeSplits() calls consumer.pause() and 
consumer.resume() with the full set of requested partitions without checking 
whether those partitions are still part of the consumer's current assignment.

Between the time the source reader decides which splits to pause/resume and the 
time pauseOrResumeSplits() executes, a partition can be unassigned by a 
concurrent fetch() call (which removes finished splits) or by 
removeEmptySplits(). When this happens, the Kafka consumer throws:

{code}
java.lang.IllegalStateException: No current assignment for partition <topic>-<partition>
{code}

This crashes the source reader, even though the correct behavior is to silently 
skip partitions that are no longer assigned: pausing or resuming an unassigned 
partition is a no-op by definition.
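
A minimal sketch of skipping unassigned partitions, assuming the fix filters each 
request against the consumer's current assignment before calling pause()/resume(). 
To keep it runnable without kafka-clients, Partition, FakeConsumer, stillAssigned() 
and pauseOrResume() are hypothetical stand-ins for TopicPartition, the Kafka 
Consumer (whose real assignment(), pause() and resume() methods it mimics) and 
KafkaPartitionSplitReader.pauseOrResumeSplits(). This is an illustration, not the 
actual Flink patch.

{code:java}
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PauseResumeSketch {

    /** Hypothetical stand-in for Kafka's TopicPartition. */
    record Partition(String topic, int partition) {
        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /** Hypothetical stand-in for the Kafka consumer: pause()/resume() reject unassigned partitions. */
    static final class FakeConsumer {
        private final Set<Partition> assignment = new HashSet<>();
        private final Set<Partition> paused = new HashSet<>();

        void assign(Collection<Partition> partitions) {
            assignment.clear();
            assignment.addAll(partitions);
            paused.retainAll(assignment); // forget pause state of dropped partitions
        }

        Set<Partition> assignment() { return new HashSet<>(assignment); }

        Set<Partition> paused() { return new HashSet<>(paused); }

        void pause(Collection<Partition> partitions) {
            requireAssigned(partitions);
            paused.addAll(partitions);
        }

        void resume(Collection<Partition> partitions) {
            requireAssigned(partitions);
            paused.removeAll(partitions);
        }

        private void requireAssigned(Collection<Partition> partitions) {
            for (Partition p : partitions) {
                if (!assignment.contains(p)) {
                    throw new IllegalStateException("No current assignment for partition " + p);
                }
            }
        }
    }

    /** Keeps only the requested partitions that are still in the given assignment. */
    static List<Partition> stillAssigned(Collection<Partition> requested, Set<Partition> assignment) {
        List<Partition> result = new ArrayList<>();
        for (Partition p : requested) {
            if (assignment.contains(p)) {
                result.add(p);
            }
        }
        return result;
    }

    /** Guarded pause/resume: partitions unassigned in the meantime are skipped as no-ops. */
    static void pauseOrResume(FakeConsumer consumer, Collection<Partition> toPause,
                              Collection<Partition> toResume) {
        Set<Partition> assignment = consumer.assignment(); // one snapshot for both calls
        consumer.resume(stillAssigned(toResume, assignment));
        consumer.pause(stillAssigned(toPause, assignment));
    }

    public static void main(String[] args) {
        Partition p0 = new Partition("orders", 0);
        Partition p1 = new Partition("orders", 1);
        FakeConsumer consumer = new FakeConsumer();
        consumer.assign(List.of(p0, p1));

        // Simulate the race: p1's split finishes and p1 is unassigned before pausing.
        consumer.assign(List.of(p0));

        try {
            consumer.pause(List.of(p0, p1)); // unguarded call, as in the report
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // No current assignment for partition orders-1
        }

        pauseOrResume(consumer, List.of(p0, p1), List.of());
        System.out.println("paused = " + consumer.paused()); // paused = [orders-0]
    }
}
{code}

The guard takes a single assignment() snapshot and uses it for both calls. This 
assumes nothing reassigns partitions between the snapshot and the pause()/resume() 
calls, which holds when every call runs on the thread that owns the consumer 
(KafkaConsumer is not safe for multi-threaded access).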

> KafkaPartitionSplitReader.pauseOrResumeSplits throws IllegalStateException on 
> concurrently unassigned partitions
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39728
>                 URL: https://issues.apache.org/jira/browse/FLINK-39728
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: Jim Hughes
>            Assignee: Jim Hughes
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
