[ https://issues.apache.org/jira/browse/FLINK-39435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18079302#comment-18079302 ]
sherlock-lin commented on FLINK-39435:
--------------------------------------
I understand that the intention behind this design is good; however, the
drawback is the lack of topic isolation. When the source topic is switched and
the job is restored from state, the new topic still starts consuming from the
earliest offset even if setStartingOffsets() is configured to latest(), because
the stale state from the old topic gets picked up and applied. This is the root
cause of the issue.
> [flink-connector-kafka] Switching Kafka topics via Savepoint restore causes
> dual-topic consumption AND new topic consumed from earliest instead of
> configured offset
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39435
> URL: https://issues.apache.org/jira/browse/FLINK-39435
> Project: Flink
> Issue Type: Bug
> Environment: - Flink version: 1.17.0
> Reporter: sherlock-lin
> Priority: Major
> Labels: bug, flink-connector-kafka
> Attachments: image-2026-04-14-17-15-57-350.png,
> image-2026-04-14-17-16-33-680.png, image-2026-04-30-11-37-02-642.png
>
>
> In `KafkaSourceEnumerator.handlePartitionSplitChanges()`, there is a
> long-standing `TODO` comment (`// TODO: Handle removed partitions.`):
> !image-2026-04-14-17-16-33-680.png!
>
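> When the job's subscription is switched from `topic-A` to `topic-B` and the
> job is restored from a Savepoint, two separate bugs are triggered simultaneously: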
> *Bug 1 — Dual-topic consumption (old topic is never stopped)*
> The job consumes from both `topic-A` and `topic-B` simultaneously:
> - `topic-A` splits come from the Reader State restored from Savepoint
> (`SourceOperator.open()` → `sourceReader.addSplits(restoredSplits)`).
> - `topic-B` splits come from the Enumerator discovering new partitions
> (`initializePartitionSplits()` → `assignSplits()`).
> - In `KafkaPartitionSplitReader.handleSplitsChanges()`, the current assignment
> is merged into the new one:
> ```java
> newPartitionAssignments.addAll(consumer.assignment()); // merges, does NOT replace
> consumer.assign(newPartitionAssignments);
> ```
> The old topic's partitions remain in the consumer assignment permanently. The
> Enumerator detects the removed partitions in `getPartitionChange()` but does
> nothing with them (`// TODO: Handle removed partitions.`).
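>
> A minimal, self-contained sketch of the merge behavior described above.
> Assumptions: partitions are modelled as plain `topic-partition` strings rather
> than Kafka's `TopicPartition`, and `mergeAssignment` is a hypothetical helper,
> not connector code. It shows that unioning the new splits with the current
> assignment can never drop the restored `topic-A` partitions:
>
> ```java
> import java.util.LinkedHashSet;
> import java.util.Set;
>
> // Hypothetical model: partitions are "topic-partition" strings, not Kafka TopicPartition objects.
> public class AssignmentMergeDemo {
>
>     // Mirrors the reported pattern: new splits are unioned with the current
>     // assignment, so a partition that is already assigned is never dropped.
>     static Set<String> mergeAssignment(Set<String> currentAssignment, Set<String> newSplits) {
>         Set<String> next = new LinkedHashSet<>(newSplits);
>         next.addAll(currentAssignment); // same shape as newPartitionAssignments.addAll(consumer.assignment())
>         return next;
>     }
>
>     public static void main(String[] args) {
>         // topic-A partitions restored from the Savepoint's Reader State
>         Set<String> restored = Set.of("topic-A-0", "topic-A-1");
>         // topic-B partition assigned later by the Enumerator
>         Set<String> discovered = Set.of("topic-B-0");
>
>         Set<String> assignment = mergeAssignment(restored, discovered);
>         // Both topics remain assigned; the stale topic-A partitions are kept.
>         System.out.println(assignment.containsAll(restored)); // true
>         System.out.println(assignment.size());                // 3
>     }
> }
> ```
>
> Assigning only the splits the Reader should currently own, instead of merging,
> would drop `topic-A`. That is the change asked for under *Expected behavior*.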
> *Bug 2 — New topic consumed from earliest instead of configured `latest()`*
> Even though the job is configured with
> `.setStartingOffsets(OffsetsInitializer.latest())`, the new topic (`topic-B`)
> is consumed from the earliest available offset.
> Log evidence:
> ```
> INFO KafkaSourceEnumerator - Assigning splits to readers
> {57=[[Partition: topic-B-0, StartingOffset: -2, StoppingOffset: -9223372036854775808], ...]}
> ```
> The sentinel `-2` is `EARLIEST_OFFSET`, not `-1` (`LATEST_OFFSET`) as configured.
> The stopping offset `-9223372036854775808` is `Long.MIN_VALUE` (`NO_STOPPING_OFFSET`).
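>
> For reference, here is a small decoder for the sentinel values in the log line
> above. The constants are copied into the sketch so it runs without Flink on the
> classpath. They are intended to mirror the ones in `KafkaPartitionSplit`
> (`LATEST_OFFSET`, `EARLIEST_OFFSET`, `COMMITTED_OFFSET`, `NO_STOPPING_OFFSET`).
> `describe` is a hypothetical helper:
>
> ```java
> // Constants below are local copies intended to mirror Flink's KafkaPartitionSplit.
> public class SplitOffsetSentinels {
>     static final long LATEST_OFFSET = -1L;
>     static final long EARLIEST_OFFSET = -2L;
>     static final long COMMITTED_OFFSET = -3L;
>     static final long NO_STOPPING_OFFSET = Long.MIN_VALUE;
>
>     // Hypothetical helper: maps a split offset to the sentinel it encodes, if any.
>     static String describe(long offset) {
>         if (offset == LATEST_OFFSET) return "LATEST_OFFSET";
>         if (offset == EARLIEST_OFFSET) return "EARLIEST_OFFSET";
>         if (offset == COMMITTED_OFFSET) return "COMMITTED_OFFSET";
>         if (offset == NO_STOPPING_OFFSET) return "NO_STOPPING_OFFSET";
>         return offset >= 0 ? "explicit offset " + offset : "unknown sentinel " + offset;
>     }
>
>     public static void main(String[] args) {
>         // Values taken from the log line for topic-B-0
>         System.out.println("StartingOffset: " + describe(-2L));                   // EARLIEST_OFFSET
>         System.out.println("StoppingOffset: " + describe(-9223372036854775808L)); // NO_STOPPING_OFFSET
>     }
> }
> ```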
> Root cause: `setStartingOffsets()` only takes effect for truly new partitions
> discovered by the Enumerator. During Savepoint restore, the Reader State
> injects old splits directly into the Reader via `SourceOperator.open() →
> sourceReader.addSplits(restoredSplits)`, bypassing `setStartingOffsets()`
> entirely. The default `startingOffsetsInitializer` in `KafkaSourceBuilder`
> (line 106) is `OffsetsInitializer.earliest()`, and for partitions not found
> in Reader State this default is used instead of the user-configured value.
> *Expected behavior*
> **For Bug 1:** When `getPartitionChange()` detects `removedPartitions`, the
> Enumerator should send a signal to the affected Readers to **unassign** those
> partitions from their `KafkaConsumer`, so the job only consumes from the
> newly subscribed topic.
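>
> A rough sketch of the expected Bug 1 behavior. It assumes the Enumerator knows
> the previously assigned partitions and that the Reader can act on an unassign
> signal. `removedPartitions` and `unassign` are hypothetical names used for
> illustration only. As the `// TODO` notes, no such handling exists in the
> connector today:
>
> ```java
> import java.util.HashSet;
> import java.util.Set;
>
> // Hypothetical sketch; method names are illustrative, not Flink API.
> public class RemovedPartitionHandling {
>
>     // Enumerator side: partitions that were assigned before but are no longer
>     // part of the subscription (the "removedPartitions" set).
>     static Set<String> removedPartitions(Set<String> previouslyAssigned, Set<String> currentlySubscribed) {
>         Set<String> removed = new HashSet<>(previouslyAssigned);
>         removed.removeAll(currentlySubscribed);
>         return removed;
>     }
>
>     // Reader side: apply the unassign signal by removing partitions
>     // instead of merging them back into the assignment.
>     static Set<String> unassign(Set<String> readerAssignment, Set<String> toRemove) {
>         Set<String> next = new HashSet<>(readerAssignment);
>         next.removeAll(toRemove);
>         return next;
>     }
>
>     public static void main(String[] args) {
>         Set<String> removed = removedPartitions(Set.of("topic-A-0", "topic-A-1"), Set.of("topic-B-0"));
>         // Reader currently holds the restored topic-A splits plus the new topic-B split
>         Set<String> readerAssignment = Set.of("topic-A-0", "topic-A-1", "topic-B-0");
>         System.out.println(unassign(readerAssignment, removed)); // [topic-B-0]
>     }
> }
> ```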
> **For Bug 2:** When the Enumerator assigns splits for newly discovered
> partitions (i.e., partitions not present in any restored Reader State), the
> `startingOffset` in those splits should reflect the **user-configured**
> `setStartingOffsets()` value (e.g., `-1` for `latest()`), rather than falling
> back to the `KafkaSourceBuilder` default (`earliest()`). The user-configured
> initializer should take precedence over the builder default at all times,
> including during Savepoint restore.
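>
> A minimal sketch of the precedence rule proposed for Bug 2, assuming offsets are
> resolved per partition. A split restored from Reader State keeps its checkpointed
> position. Any other partition uses the user-configured value, and the builder
> default applies only when nothing was configured. `resolveStartingOffsets` and
> the `Optional<Long>` parameter are hypothetical stand-ins for `OffsetsInitializer`,
> not the connector's API:
>
> ```java
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Optional;
> import java.util.Set;
>
> // Hypothetical sketch: plain long offsets stand in for OffsetsInitializer results.
> public class StartingOffsetPrecedence {
>     static final long LATEST_OFFSET = -1L;   // mirrors KafkaPartitionSplit.LATEST_OFFSET
>     static final long EARLIEST_OFFSET = -2L; // mirrors KafkaPartitionSplit.EARLIEST_OFFSET
>
>     // Stand-in for the builder default, OffsetsInitializer.earliest()
>     static final long BUILDER_DEFAULT = EARLIEST_OFFSET;
>
>     // Restored splits keep their checkpointed position; every other partition
>     // uses the user-configured value, falling back to the builder default.
>     static Map<String, Long> resolveStartingOffsets(
>             Set<String> partitions, Map<String, Long> restoredState, Optional<Long> userConfigured) {
>         Map<String, Long> resolved = new HashMap<>();
>         for (String partition : partitions) {
>             Long restored = restoredState.get(partition);
>             resolved.put(partition, restored != null ? restored : userConfigured.orElse(BUILDER_DEFAULT));
>         }
>         return resolved;
>     }
>
>     public static void main(String[] args) {
>         // topic-B-0 is newly discovered: it is not in any restored Reader State
>         Map<String, Long> configured =
>                 resolveStartingOffsets(Set.of("topic-B-0"), Map.of(), Optional.of(LATEST_OFFSET));
>         System.out.println(configured.get("topic-B-0")); // -1 (LATEST_OFFSET, as configured)
>
>         // Only without a user setting does the builder default apply
>         Map<String, Long> fallback =
>                 resolveStartingOffsets(Set.of("topic-B-0"), Map.of(), Optional.empty());
>         System.out.println(fallback.get("topic-B-0")); // -2 (EARLIEST_OFFSET)
>     }
> }
> ```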
--
This message was sent by Atlassian Jira
(v8.20.10#820010)