[
https://issues.apache.org/jira/browse/FLINK-39728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18082834#comment-18082834
]
Aleksandr Savonin commented on FLINK-39728:
-------------------------------------------
Hi, thank you for the fix.
One small observation on the description, in case it helps future readers:
The pause/resume command is enqueued by the SourceOperator (mailbox thread) and
executed later by the SplitFetcher thread. Between those two events, the
SplitFetcher itself may run a {{fetch()}} iteration that unassigns finished
splits, or process an {{AddSplitsTask}} whose {{removeEmptySplits()}} unassigns
empty ones. When the queued command then executes, the targeted partition may
no longer be in the consumer's assignment.
That’s why I find “concurrently” a bit misleading: the unassign and the
pause/resume call are sequential on the SplitFetcher thread, not parallel. The
race is between the mailbox thread’s view at enqueue time and the
SplitFetcher’s state at execution time.
None of this affects the proposed fix.
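
For future readers, here is a minimal single-threaded sketch of that ordering. It uses no Flink or Kafka classes: {{PauseRaceSketch}}, {{FakeConsumer}}, the {{filterByAssignment}} flag and the partition names are all hypothetical stand-ins, and the fake consumer only imitates the "No current assignment" check quoted in the description. It is meant to illustrate the enqueue-time versus execution-time gap, not to reproduce the actual patch.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

public class PauseRaceSketch {

    /** Hypothetical stand-in for the Kafka consumer; it only mimics the assignment check. */
    static class FakeConsumer {
        final Set<String> assignment = new HashSet<>();
        final Set<String> paused = new HashSet<>();

        void pause(Set<String> partitions) {
            for (String p : partitions) {
                if (!assignment.contains(p)) {
                    throw new IllegalStateException("No current assignment for partition " + p);
                }
            }
            paused.addAll(partitions);
        }
    }

    /**
     * Replays the three steps in order on one thread.
     * Returns true if the queued pause command completed without throwing.
     */
    public static boolean run(boolean filterByAssignment) {
        FakeConsumer consumer = new FakeConsumer();
        consumer.assignment.addAll(List.of("t-0", "t-1"));

        // Stand-in for the fetcher's task queue.
        Queue<Runnable> fetcherQueue = new ArrayDeque<>();

        // Step 1 (mailbox thread): enqueue a pause command based on the current view.
        Set<String> requested = Set.of("t-0", "t-1");
        fetcherQueue.add(() -> {
            Set<String> target = new HashSet<>(requested);
            if (filterByAssignment) {
                // Guard: drop partitions that were unassigned after the command was enqueued.
                target.retainAll(consumer.assignment);
            }
            consumer.pause(target);
        });

        // Step 2 (fetcher thread): a fetch iteration unassigns a finished split.
        consumer.assignment.remove("t-1");

        // Step 3 (fetcher thread): the queued command finally executes.
        try {
            fetcherQueue.remove().run();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded completes: " + run(false)); // prints false
        System.out.println("guarded completes:   " + run(true));  // prints true
    }
}
```

Everything after step 1 happens on the same (simulated) fetcher thread, so no locking is involved. The failure comes purely from the command carrying a stale partition set.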
> KafkaPartitionSplitReader.pauseOrResumeSplits throws IllegalStateException on
> concurrently unassigned partitions
> ----------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39728
> URL: https://issues.apache.org/jira/browse/FLINK-39728
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Reporter: Jim Hughes
> Assignee: Jim Hughes
> Priority: Major
> Labels: pull-request-available
>
> KafkaPartitionSplitReader.pauseOrResumeSplits() calls consumer.pause() and
> consumer.resume() with the full set of requested partitions without checking
> whether those partitions are still part of the consumer's current assignment.
> Between the time the source reader decides which splits to pause/resume and
> the time pauseOrResumeSplits() executes, a partition can be unassigned by a
> concurrent fetch() call (which removes finished splits) or by
> removeEmptySplits(). When this happens, the Kafka consumer throws:
> {code}
> java.lang.IllegalStateException: No current assignment for partition
> <topic>-<partition>
> {code}
> This crashes the source reader even though the correct behavior is to
> silently skip partitions that are no longer assigned — pausing or resuming an
> unassigned partition is a no-op by definition.