[ https://issues.apache.org/jira/browse/FLINK-39728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18082834#comment-18082834 ]

Aleksandr Savonin commented on FLINK-39728:
-------------------------------------------

Hi, thank you for the fix. 

One small observation on the description, in case it helps future readers:

The pause/resume command is enqueued by the SourceOperator (mailbox thread) and 
executed later by the SplitFetcher thread. Between those two events, the 
SplitFetcher itself may run a {{fetch()}} iteration that unassigns finished 
splits, or process an {{AddSplitsTask}} whose {{removeEmptySplits()}} unassigns 
empty ones. When the queued command then executes, the targeted partition may 
no longer be in the consumer's assignment.

That’s why I find “concurrently” in the summary a bit misleading. The unassign 
and the pause/resume call run sequentially on the SplitFetcher thread, not in 
parallel. The race is between the mailbox thread’s view of the splits at 
enqueue time and the SplitFetcher’s state at execution time.
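To make the ordering concrete, here is a toy Java model. It is hypothetical: none of the class or method names are Flink APIs. One fetcher drains a FIFO task queue, so a task that unassigns a partition and a pause task queued after it never overlap. The pause simply sees newer state than the mailbox thread saw when it enqueued the command. Treating the unassigning fetch() iteration or AddSplitsTask as a single queued task is a simplification.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

/**
 * Hypothetical toy model, not Flink code: a single fetcher drains a FIFO task
 * queue, so an unassign and a later pause run one after the other, never in parallel.
 */
public class FetcherTaskOrdering {

    // Partitions the simulated consumer currently has assigned.
    private final Set<String> assignment = new HashSet<>();
    // Tasks for the single fetcher, executed strictly in FIFO order.
    private final Queue<Runnable> taskQueue = new ArrayDeque<>();
    // What happened, in execution order.
    final List<String> log = new ArrayList<>();

    FetcherTaskOrdering(Set<String> initialAssignment) {
        assignment.addAll(initialAssignment);
    }

    /** Stand-in for a fetch() iteration or AddSplitsTask that unassigns a partition. */
    void enqueueUnassign(String partition) {
        taskQueue.add(() -> {
            assignment.remove(partition);
            log.add("unassigned " + partition);
        });
    }

    /** Stand-in for the pause command enqueued from the mailbox thread. */
    void enqueuePause(Set<String> partitionsSeenByMailbox) {
        // Captured at enqueue time, but only checked against the assignment at execution time.
        Set<String> snapshot = new HashSet<>(partitionsSeenByMailbox);
        taskQueue.add(() -> {
            for (String p : snapshot) {
                if (assignment.contains(p)) {
                    log.add("paused " + p);
                } else {
                    // A real KafkaConsumer.pause() would throw IllegalStateException here.
                    log.add("pause " + p + " would throw: No current assignment for partition " + p);
                }
            }
        });
    }

    /** Runs queued tasks one at a time on the calling thread. */
    void runFetcher() {
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
        }
    }

    public static void main(String[] args) {
        FetcherTaskOrdering fetcher = new FetcherTaskOrdering(Set.of("topic-0"));
        // The fetcher already has work queued that will unassign topic-0 ...
        fetcher.enqueueUnassign("topic-0");
        // ... when the mailbox thread, still seeing topic-0 as active, asks to pause it.
        fetcher.enqueuePause(Set.of("topic-0"));
        fetcher.runFetcher();
        fetcher.log.forEach(System.out::println);
    }
}
```

Swapping the two `enqueue…` calls makes the pause succeed. The outcome depends only on queue order, not on thread interleaving.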

But it does not affect the proposed fix.

> KafkaPartitionSplitReader.pauseOrResumeSplits throws IllegalStateException on 
> concurrently unassigned partitions
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39728
>                 URL: https://issues.apache.org/jira/browse/FLINK-39728
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: Jim Hughes
>            Assignee: Jim Hughes
>            Priority: Major
>              Labels: pull-request-available
>
> KafkaPartitionSplitReader.pauseOrResumeSplits() calls consumer.pause() and 
> consumer.resume() with the full set of requested partitions without checking 
> whether those partitions are still part of the consumer's current assignment.
> Between the time the source reader decides which splits to pause/resume and 
> the time pauseOrResumeSplits() executes, a partition can be unassigned by a 
> concurrent fetch() call (which removes finished splits) or by 
> removeEmptySplits(). When this happens, the Kafka consumer throws:
> {code}
> java.lang.IllegalStateException: No current assignment for partition 
> <topic>-<partition>
> {code}
> This crashes the source reader even though the correct behavior is to 
> silently skip partitions that are no longer assigned — pausing or resuming an 
> unassigned partition is a no-op by definition.
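Below is a minimal sketch of the skip behaviour the description asks for. It uses a hypothetical `SimulatedConsumer` stand-in, which is not a Kafka or Flink class. The stand-in reproduces only the relevant behaviour of `KafkaConsumer`: `pause()` and `resume()` throw `IllegalStateException` for partitions outside the current assignment. With the real client, the same filter would apply to `TopicPartition`s obtained from `consumer.assignment()`. This is an illustration, not necessarily what the merged patch does.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Sketch: pause/resume only partitions that are still assigned. */
public class PauseResumeFilter {

    /** Hypothetical stand-in for KafkaConsumer; mimics only the assignment check. */
    static final class SimulatedConsumer {
        final Set<String> assignment = new HashSet<>();
        final Set<String> paused = new HashSet<>();

        Set<String> assignment() {
            return Set.copyOf(assignment);
        }

        void pause(Collection<String> partitions) {
            partitions.forEach(this::requireAssigned);
            paused.addAll(partitions);
        }

        void resume(Collection<String> partitions) {
            partitions.forEach(this::requireAssigned);
            paused.removeAll(partitions);
        }

        private void requireAssigned(String p) {
            if (!assignment.contains(p)) {
                throw new IllegalStateException("No current assignment for partition " + p);
            }
        }
    }

    /** Drops partitions that are no longer assigned, then pauses/resumes the rest. */
    static void pauseOrResume(SimulatedConsumer consumer,
                              Collection<String> toPause,
                              Collection<String> toResume) {
        Set<String> assigned = consumer.assignment();
        List<String> pauseAssigned =
                toPause.stream().filter(assigned::contains).collect(Collectors.toList());
        List<String> resumeAssigned =
                toResume.stream().filter(assigned::contains).collect(Collectors.toList());
        consumer.pause(pauseAssigned);
        consumer.resume(resumeAssigned);
    }

    public static void main(String[] args) {
        SimulatedConsumer consumer = new SimulatedConsumer();
        consumer.assignment.add("topic-1");
        // topic-0 was already unassigned by the fetcher; topic-1 is still assigned.
        pauseOrResume(consumer, List.of("topic-0", "topic-1"), List.of());
        System.out.println(consumer.paused); // prints [topic-1]
    }
}
```

Filtering against a fresh `assignment()` snapshot is race-free under one assumption, the one the comment describes: only the SplitFetcher thread touches the consumer. In that case nothing can change the assignment between the filter and the `pause()`/`resume()` calls.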



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
