[
https://issues.apache.org/jira/browse/FLINK-39435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18079613#comment-18079613
]
Hongshun Wang commented on FLINK-39435:
---------------------------------------
I don't think this is an issue. As we discussed previously, if you set the
starting offset to "latest", the expectation is that consumption starts from
the latest offset at the time of the initial launch, not from a "latest"
position that shifts with every subsequent restart. Furthermore, if you switch
topics, why not simply start without restoring state? I don't see the rationale
for state recovery in this scenario. If you want to change this behavior, we
can discuss it on the dev mailing list.
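
The expectation described above can be illustrated with a minimal plain-Java
sketch. It is not connector code: the class, the method, and the maps standing
in for savepoint state and broker end offsets are all hypothetical, and the
offsets are made-up numbers. It only shows the intended precedence, namely
that restored state wins over a freshly resolved "latest".

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration only; none of these names exist in the Flink Kafka connector.
class LatestOffsetSketch {

    /**
     * Resolves the starting offset for each partition of a job launch.
     * restoredOffsets: offsets saved in a (hypothetical) savepoint; empty on first launch.
     * logEndOffsets: the current end offset per partition, i.e. "latest" right now.
     */
    static Map<String, Long> resolveStartingOffsets(
            Map<String, Long> restoredOffsets, Map<String, Long> logEndOffsets) {
        Map<String, Long> starting = new HashMap<>();
        for (Map.Entry<String, Long> entry : logEndOffsets.entrySet()) {
            Long restored = restoredOffsets.get(entry.getKey());
            // Restored state wins; "latest" is only consulted for partitions without state.
            starting.put(entry.getKey(), restored != null ? restored : entry.getValue());
        }
        return starting;
    }

    public static void main(String[] args) {
        Map<String, Long> endOffsets = new HashMap<>();
        endOffsets.put("topic-A-0", 100L);

        // Initial launch: no state, so "latest" resolves to the end offset at that moment.
        Map<String, Long> firstRun = resolveStartingOffsets(new HashMap<>(), endOffsets);
        System.out.println(firstRun.get("topic-A-0")); // 100

        // The job reads up to offset 150 and takes a savepoint; the topic grows to 500.
        Map<String, Long> savepoint = new HashMap<>();
        savepoint.put("topic-A-0", 150L);
        endOffsets.put("topic-A-0", 500L);

        // Restart from the savepoint: resume at 150, not at the new "latest" of 500.
        Map<String, Long> restored = resolveStartingOffsets(savepoint, endOffsets);
        System.out.println(restored.get("topic-A-0")); // 150
    }
}
```

Under this model, a job that should begin at the current "latest" of a
different topic would be started without restored state.
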
> [flink-connector-kafka] Switching Kafka topics via Savepoint restore causes
> dual-topic consumption AND new topic consumed from earliest instead of
> configured offset
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39435
> URL: https://issues.apache.org/jira/browse/FLINK-39435
> Project: Flink
> Issue Type: Bug
> Environment: - Flink version: 1.17.0
> Reporter: sherlock-lin
> Priority: Major
> Labels: bug, flink-connector-kafka
> Attachments: image-2026-04-14-17-15-57-350.png,
> image-2026-04-14-17-16-33-680.png, image-2026-04-30-11-37-02-642.png
>
>
> In `KafkaSourceEnumerator.handlePartitionSplitChanges()`, there is a
> long-standing `TODO` comment:
> !image-2026-04-14-17-16-33-680.png!
>
> Two separate bugs are triggered simultaneously:
> *Bug 1 — Dual-topic consumption (old topic is never stopped)*
> The job consumes from both `topic-A` and `topic-B` simultaneously:
> - `topic-A` splits come from the Reader State restored from Savepoint
> (`SourceOperator.open()` → `sourceReader.addSplits(restoredSplits)`).
> - `topic-B` splits come from the Enumerator discovering new partitions
> (`initializePartitionSplits()` → `assignSplits()`).
> - In `KafkaPartitionSplitReader.handleSplitsChanges()`, line:
> ```java
> // Merges with the existing assignment; does NOT replace it.
> newPartitionAssignments.addAll(consumer.assignment());
> consumer.assign(newPartitionAssignments);
> ```
> The old topic's partitions remain in the consumer assignment permanently. The
> Enumerator detects the removed partitions in `getPartitionChange()` but does
> nothing with them (`// TODO: Handle removed partitions.`).
> *Bug 2 — New topic consumed from earliest instead of configured `latest()`*
> Even though the job is configured with
> `.setStartingOffsets(OffsetsInitializer.latest())`, the new topic (`topic-B`)
> is consumed from the earliest available offset.
> Log evidence:
> ```
> INFO KafkaSourceEnumerator - Assigning splits to readers {57=[[Partition: topic-B-0, StartingOffset: -2, StoppingOffset: -9223372036854775808], ...]}
> ```
> The sentinel `-2` is `EARLIEST_OFFSET`, not the configured `-1` (`LATEST_OFFSET`).
> Root cause: `setStartingOffsets()` only takes effect for truly new partitions
> discovered by the Enumerator. During Savepoint restore, the Reader State
> injects old splits directly into the Reader via `SourceOperator.open() →
> sourceReader.addSplits(restoredSplits)`, bypassing `setStartingOffsets()`
> entirely. The default `startingOffsetsInitializer` in `KafkaSourceBuilder`
> (line 106) is `OffsetsInitializer.earliest()`, and for partitions not found
> in Reader State this default is used instead of the user-configured value.
> *Expected behavior*
> **For Bug 1:** When `getPartitionChange()` detects `removedPartitions`, the
> Enumerator should send a signal to the affected Readers to **unassign** those
> partitions from their `KafkaConsumer`, so the job only consumes from the
> newly subscribed topic.
> **For Bug 2:** When the Enumerator assigns splits for newly discovered
> partitions (i.e., partitions not present in any restored Reader State), the
> `startingOffset` in those splits should reflect the **user-configured**
> `setStartingOffsets()` value (e.g., `-1` for `latest()`), rather than falling
> back to the `KafkaSourceBuilder` default (`earliest()`). The user-configured
> initializer should take precedence over the builder default at all times,
> including during Savepoint restore.
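
For reference, the merge behavior described under Bug 1 in the quoted report,
and the unassignment it asks for, can be sketched in plain Java. This is only
an illustration under simplifying assumptions: partitions are modelled as
strings rather than Kafka `TopicPartition` objects, the class and method names
are hypothetical, and how a reader would actually learn about removed
partitions is left open.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java illustration only; no Kafka or Flink APIs are used here.
class AssignmentSketch {

    /** Mirrors the merge quoted in the report: the existing assignment is always carried over. */
    static Set<String> mergeAssignment(Set<String> currentAssignment, Set<String> newSplits) {
        Set<String> merged = new TreeSet<>(newSplits);
        merged.addAll(currentAssignment);
        return merged;
    }

    /** Hypothetical handling of removed partitions: drop them before re-assigning. */
    static Set<String> mergeWithoutRemoved(
            Set<String> currentAssignment, Set<String> newSplits, Set<String> removedPartitions) {
        Set<String> merged = mergeAssignment(currentAssignment, newSplits);
        merged.removeAll(removedPartitions);
        return merged;
    }

    public static void main(String[] args) {
        // topic-A-0 comes from restored reader state, topic-B-0 from discovery.
        Set<String> restored = new TreeSet<>(Arrays.asList("topic-A-0"));
        Set<String> discovered = new TreeSet<>(Arrays.asList("topic-B-0"));

        // Merge only: both topics stay assigned.
        System.out.println(mergeAssignment(restored, discovered)); // [topic-A-0, topic-B-0]

        // Merge, then drop the partitions reported as removed: only the new topic remains.
        System.out.println(mergeWithoutRemoved(restored, discovered, restored)); // [topic-B-0]
    }
}
```
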
--
This message was sent by Atlassian Jira
(v8.20.10#820010)