Hi Efrat,

1. Please add the default value of scan.topic-integrity-check.enabled to the
public APIs. The default value should preserve the current behavior.
2. I don't understand why `topic-pattern` is out of scope. If a topic is
dropped and then recreated, I think every source subscription mode should
keep the same behavior (still read the new topic, or throw an exception,
depending on scan.topic-integrity-check.enabled).
3. Like Aleksandr Savonin, I am also curious about state migration.
4. Regarding "partition discovery must be enabled
(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS set to a positive
number)": I don't think scan.topic-integrity-check.enabled should be bound
to partition discovery. Even with PARTITION_DISCOVERY_INTERVAL_MS = -1, each
time the job restarts it will still try to discover new partitions, so topic
integrity still needs to be checked.
5. What happens if the user:
   1. removes the topic from `topic` or `topic-pattern`,
   2. deletes and recreates the topic in Kafka, and
   3. re-adds the `topic` or `topic-pattern` on the next start?
   What is the state change at each step?
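To make point (1) concrete, here is a minimal sketch of resolving the flag with an explicit default of false, so jobs that do not set it keep today's behavior. The option key comes from this thread; the helper class and method names are assumptions for illustration, not the FLIP's actual API:

```java
import java.util.Map;

public class IntegrityCheckDefault {
    // Option key as discussed in the thread; default chosen to match
    // current behavior (recreated topics are tolerated unless opted in).
    static final String KEY = "scan.topic-integrity-check.enabled";
    static final boolean DEFAULT = false;

    // Hypothetical resolution helper: unset means "off".
    static boolean isIntegrityCheckEnabled(Map<String, String> tableOptions) {
        return Boolean.parseBoolean(
                tableOptions.getOrDefault(KEY, String.valueOf(DEFAULT)));
    }

    public static void main(String[] args) {
        // Unset -> default off, i.e. current behavior.
        System.out.println(isIntegrityCheckEnabled(Map.of()));            // false
        // Explicitly enabled.
        System.out.println(isIntegrityCheckEnabled(Map.of(KEY, "true"))); // true
    }
}
```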
Best,
Hongshun

On Mon, May 4, 2026 at 8:10 PM Aleksandr Savonin <[email protected]> wrote:

> Hi Efrat,
> Thanks for the amazing work on the FLIP, the update, and for descoping
> the FLIP! I think the new direction looks much cleaner.
> I have a few follow-up questions:
> 1. State migration: restoring from a pre-FLIP checkpoint. What happens
> when a user upgrades to a connector version that supports this FLIP,
> enables enableTopicIntegrityCheck(), and resumes from an existing
> checkpoint that has no persisted topic IDs?
> 2. Could you please clarify the excluded topic-pattern subscribers?
> The FLIP rationale ("pattern-based subscriptions inherently accept any
> matching topic") justifies tolerating new topics that begin matching
> the pattern, but I don't think it justifies accepting a
> previously-known topic being deleted and recreated under the same
> name. From a user's perspective, a checkpointed offset for a
> previously-known topic is just as invalid after recreation regardless
> of whether the topic was reached via a list or a pattern. Could
> pattern subscribers reuse the same logic to allow new names freely,
> but enforce ID stability for already-known names?
> 3. Where will topic IDs live in the enumerator state? I think the FLIP
> would benefit from a concrete datamodel "sketch".
> 4. Bounded sources: Doesn't KafkaSourceBuilder override
> partition.discovery.interval.ms to -1 whenever setBounded(...) is
> called, with no opt-out? The FLIP states that with discovery disabled,
> the integrity check runs only once on startup. For long-running
> bounded jobs (e.g. backfill against a snapshot up to `latest()`),
> recreation mid-job is exactly the scenario this FLIP exists to detect,
> and a single startup check feels insufficient. Should bounded sources
> be allowed to opt into periodic checks when integrity verification is
> enabled, or is this an explicitly accepted limitation? Maybe I'm
> missing something.
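Aleksandr's question (3) asks for a concrete datamodel sketch. One possibility, purely as an illustration (all class, field, and method names below are assumptions, not the FLIP's datamodel): the enumerator checkpoint state gains a map from topic name to the topic ID observed at first discovery, which each later metadata fetch is verified against:

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch of enumerator checkpoint state with topic IDs added.
public class KafkaEnumStateSketch {
    // Existing state: partitions already assigned to readers.
    final Set<String> assignedTopicPartitions;
    // New state: topic name -> topic ID recorded at first discovery;
    // would be serialized under a bumped enumerator-state version.
    final Map<String, UUID> knownTopicIds;

    KafkaEnumStateSketch(Set<String> assigned, Map<String, UUID> ids) {
        this.assignedTopicPartitions = assigned;
        this.knownTopicIds = ids;
    }

    /** True when a freshly fetched (name, id) pair contradicts the recorded ID. */
    boolean isRecreated(String topic, UUID fetchedId) {
        UUID known = knownTopicIds.get(topic);
        return known != null && !known.equals(fetchedId);
    }
}
```

Under this shape, pattern subscribers could also keep ID stability for already-known names: a name absent from `knownTopicIds` is accepted freely, while a known name with a changed ID fails the check.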
>
> Thanks again,
> Aleksandr Savonin
>
> On Tue, 21 Apr 2026 at 13:42, Efrat Levitan <[email protected]> wrote:
> >
> > Hi everyone,
> > Following the feedback and offline discussions, I came to realize descoping
> > FLIP-562 to source topics will have better ROI.
> > As source topic integrity presents the pressing risk to job recovery, and
> > sink topics recreation, while inflicting inevitable incompleteness, does
> > not.
> > I'd like to defer the sink topics integrity work until a sink coordinator
> > is introduced, which can be discussed orthogonal to FLIP-562.
> >
> > I updated the FLIP to reflect the API changes suggested by Rui and Leonard,
> > so a user is not asked to provide a topicId anymore [1], and descoped to
> > source topics, with a note about sink topics [2].
> >
> > I'd appreciate your feedback on the updated proposal.
> >
> > Thanks,
> > Efrat.
> >
> > [1]
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406619238#FLIP562:TopicintegritychecksinKafkaConnector-PublicInterfaces
> > [2]
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406619238#FLIP562:TopicintegritychecksinKafkaConnector-Sinktopicsintegrity
> >
> > On Sun, 8 Feb 2026 at 20:30, Efrat Levitan <[email protected]> wrote:
> >
> > > Hi everyone, thank you for reviewing the FLIP.
> > >
> > > > 1. Manually specifying the topic ID seems to be
> > > > a bit costly for users. Why does it require flink users to pass the topic id
> > > > manually or explicitly? It would be helpful for users if flink source or sink
> > > > are able to fetch topic id automatically in the beginning, right?
> > >
> > > Thanks Rui for your suggestion.
> > > Topic ID auto-retrieval indeed sounds like a better UX, I will update the
> > > FLIP accordingly.
> > > However, a downside to be aware of is that the topic-name-to-ID mapping must
> > > be checkpointed to retain the same ID across recoveries,
> > > inflicting statefulness on so-far stateless operators like the
> > > non-transactional sink.
> > >
> > > > 2. On the Sink side, the proposal uses "setPartitionDiscoveryIntervalMs".
> > > > Since "Partition Discovery" in Flink implies Source behavior (finding
> > > > splits), and the Sink is doing a metadata update, have you considered
> > > > "setMetadataRefreshIntervalMs"? Maybe that aligns better with the
> > > > underlying Kafka Client terminology (Metadata Fetch/Lookup/Refresh).
> > >
> > > > introducing "Partition Discovery" for sinks feels conceptually
> > > > inconsistent with Flink's existing design.
> > >
> > > Thanks Aleksandr and Leonard.
> > > The concept of partition discovery isn't tied to the source.
> > > It was introduced to address an increase in partition count, which may
> > > happen on both source and sink topics.
> > > Assigning splits to the newly found partitions is a source implementation
> > > detail, and is also not in the docs.
> > > I decided to advertise the feature as "sink partition discovery" to help
> > > folks already familiar with the concept better understand the implications.
> > > What do you think?
> > >
> > > > 3. Is it correct that enabling partition discovery is strictly required
> > > > for the integrity check?
> > >
> > > The integrity verification is performed upon metadata refresh.
> > > On both source and sink, disabling partition discovery means metadata
> > > isn't refreshed,
> > > so the integrity check will be performed once on startup.
> > >
> > > > 4. The disclaimer mentions an "inevitable short period of time" where the
> > > > job reads/writes to the new topic before detection.
> > > > If a checkpoint
> > > > completes successfully during this window, do we risk to have a corrupted
> > > > state? Is this a known limitation?
> > >
> > > The risk of data corruption upon topic recreation is discussed on the FLIP.
> > > We can not save users from themselves. After all, if they decide to
> > > recreate a topic, the data is lost.
> > > Indeed the proposed check can not guarantee 100% protection due to its
> > > periodical nature.
> > > Nevertheless the job will fail shortly after, and jobs restored from that
> > > checkpoint will not be allowed to start.
> > > For more sensitive jobs you could obviously decrease the interval
> > > (trading performance).
> > >
> > > > 5. Will there be metrics exposed for monitoring the integrity checks?
> > >
> > > We will not measure integrity checks, as a failure immediately triggers a
> > > global unrecoverable failure.
> > >
> > > Efrat
> > >
> > > On Tue, 20 Jan 2026 at 05:58, Leonard Xu <[email protected]> wrote:
> > >
> > >> Hi Efrat,
> > >>
> > >> Thanks for kicking off this discussion — I'm also in favor of adding a
> > >> check for the Kafka Connector.
> > >>
> > >> > My only concern is that manually specifying the topic ID seems to be
> > >> > a bit costly for users. Why does it require flink users to pass the topic id
> > >> > manually or explicitly? It would be helpful for users if flink source or sink
> > >> > are able to fetch topic id automatically in the beginning, right?
> > >>
> > >> I share Rui's concern about requiring users to manually specify the
> > >> topicId. Given that topicId-based communication isn't part of Kafka's
> > >> current roadmap (as noted in your "Future Plan" section) and is only starting
> > >> to become available in Kafka 4.0–4.2, it seems premature—and potentially
> > >> burdensome—to expose it in Flink's user-facing API.
> > >> Ideally, if needed at
> > >> all, the source or sink should be able to resolve the topicId automatically
> > >> during initialization.
> > >>
> > >> Additionally, I agree with Aleksandr that introducing "Partition
> > >> Discovery" for sinks feels conceptually inconsistent with Flink's existing
> > >> design.
> > >>
> > >> Best,
> > >> Leonard
> > >>
> > >> > On Thu, Jan 15, 2026 at 12:38 PM Efrat Levitan <[email protected]> wrote:
> > >> >
> > >> >> Hi everyone, I'd like to start a discussion on FLIP-562 [1] to implement
> > >> >> topic integrity checks in the Kafka connector, as currently the connector is
> > >> >> blind to topic recreations, presenting risks to job consistency.
> > >> >>
> > >> >> Kafka APIs traditionally rely on topic names, which do not guarantee
> > >> >> uniqueness over time.
> > >> >> Though both KIP-516 [2] and KIP-848 [3] discuss topicId-based
> > >> >> communication, client support is not on the roadmap [4].
> > >> >>
> > >> >> The FLIP contains the new proposal to make the Flink Kafka connector
> > >> >> sensitive to topicId changes through integrity checks over the periodical
> > >> >> metadata fetching (AKA topic partition discovery), and sets the grounds for
> > >> >> future topicId-based communication with both the user and the Kafka server.
> > >> >>
> > >> >> I'd appreciate your feedback on the proposed changes.
> > >> >>
> > >> >> Thanks,
> > >> >> Efrat.
> > >> >>
> > >> >> [1]
> > >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-562%3A+Topic+integrity+checks+in+Kafka+Connector
> > >> >>
> > >> >> [2]
> > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers
> > >> >>
> > >> >> [3]
> > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol
> > >> >>
> > >> >> [4]
> > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers/#KIP516:TopicIdentifiers-Clients
> > >> >>
> > >>
>
> --
> Kind regards,
> Aleksandr
>
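On the state-migration question raised by both Aleksandr and Hongshun, one conceivable behavior, sketched here purely as an assumption and not something the FLIP specifies, is to seed the ID baseline from the first post-restore metadata fetch when the checkpoint carries no topic IDs, since recreations that happened before the restore cannot be detected anyway:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch: checker restored from a pre-FLIP checkpoint.
// The baseline starts empty; the first fetch of each topic seeds it,
// and only ID changes observed afterwards are reported as violations.
public class RestoredTopicIdBaseline {
    private final Map<String, UUID> baseline = new HashMap<>();

    /** Returns a violation message if the topic's ID changed since first seen. */
    public Optional<String> onMetadataFetch(String topic, UUID fetchedId) {
        UUID previous = baseline.putIfAbsent(topic, fetchedId);
        if (previous != null && !previous.equals(fetchedId)) {
            return Optional.of("Topic '" + topic + "' was recreated: "
                    + previous + " -> " + fetchedId);
        }
        return Optional.empty();
    }
}
```

This also illustrates Hongshun's step-by-step scenario (5): if the name is dropped from the baseline when it leaves `topic`/`topic-pattern`, a later re-add behaves like a first sighting and silently adopts the new ID; if the entry is retained, the re-add is flagged as a recreation.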
