Hi Hongshun and Aleksandr,

Thank you for taking another pass at the FLIP. I updated it with additional context regarding state, testing, and the default state of the feature (disabled).

State & Migration

I plan to introduce a `topicIntegrityMapping` property to `KafkaSourceEnumState`. When topic integrity is enabled, that field holds the topic name -> id mapping. Otherwise the map is empty, so there is no excess state by default. Additionally, if the user enables the feature and later decides to disable it for a job, the additional state is wiped. I plan to add migration tests to KafkaSourceEnumStateSerializerTest to ensure backward compatibility.

Pattern-based subscription

Considering the new mechanism for obtaining topic IDs from the Kafka server, I agree we can include pattern-based subscription in the FLIP scope. I removed it from the out-of-scope section.

Bounded sources

As pointed out in the FLIP, continuous integrity checking is coupled to partition discovery because it reuses the fetched metadata. Decoupling the two would come at the cost of an additional periodic metadata call. However, the first metadata call during job startup (either from scratch or from recovery) is performed regardless, so for a bounded source the topic integrity check is performed at least once if enabled (and never otherwise).

> 5. What happens if the user: 1. remove the topic from `topic` or
> `topic-pattern` 2. remove and recreate the topic in kafka 3. In the
> next start, re-add this `topic` or `topic-pattern`? And what the state
> change in each step?

You are asking about tracking IDs for topics the app no longer uses. Reliability-wise, we could claim that topics that were used by the job should be verified forever. However, while topic recreation is a common scenario, expecting the app to track a growing list of unused topics is an expensive corner case to handle, implying multiple metadata calls per interval in case of pattern-based subscription, or if the cluster was replaced. Therefore, to answer the question: if a user removes topics from the job source declaration, topic integrity will no longer track them, hence after step 3 the job will not fail.

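To make the answer above concrete, here is a minimal sketch of how the checkpointed mapping could be reconciled with freshly fetched metadata. It is an illustration under assumptions, not the connector's implementation: the class `TopicIntegritySketch` and the method `reconcile` are hypothetical names, and topic IDs are modeled as plain strings rather than Kafka's own ID type.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the name -> id reconciliation; not actual connector code. */
final class TopicIntegritySketch {

    private TopicIntegritySketch() {}

    /**
     * @param checkpointed topic name -> id mapping restored from state (may be empty)
     * @param fetched topic name -> id for the currently subscribed topics,
     *     taken from the latest metadata call
     * @return the mapping to store in the next checkpoint
     * @throws IllegalStateException if a tracked topic now has a different id
     */
    static Map<String, String> reconcile(
            Map<String, String> checkpointed, Map<String, String> fetched) {
        Map<String, String> next = new HashMap<>();
        for (Map.Entry<String, String> entry : fetched.entrySet()) {
            String name = entry.getKey();
            String currentId = entry.getValue();
            String knownId = checkpointed.get(name);
            // A tracked name with a different id means the topic was recreated.
            if (knownId != null && !knownId.equals(currentId)) {
                throw new IllegalStateException(
                        "Topic '" + name + "' was recreated: expected id "
                                + knownId + " but found " + currentId);
            }
            // Untracked names (new, or re-added after removal) are simply recorded.
            next.put(name, currentId);
        }
        // Names present in 'checkpointed' but absent from 'fetched' are dropped,
        // so topics removed from the source declaration stop being tracked.
        return next;
    }
}
```

In this sketch, removing a topic from the subscription drops its entry on the next reconciliation, so re-adding the same name later records whatever ID the topic has at that point instead of failing. Only a name that is still tracked and comes back with a different ID raises the error.
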
State changes (given that the job committed at least one checkpoint before or while being stopped; otherwise the state will not be refreshed to catch up with the new configuration):

1: The topic name -> id entry for the topic is removed from the checkpointed state (as irrelevant)
2: -
3: A new topic name -> id entry is stored for the registered topic

Eager to know your thoughts on this.

Thanks,
Efrat

On Wed, 6 May 2026 at 10:59, Hongshun Wang <[email protected]> wrote:

> Hi Efrat,
> 1. Please add the default value of scan.topic-integrity-check.enabled in
> the public apis. The default value should be the same with current
> behavior.
> 2. I don't know why `Topic pattern` is out of pattern? If a topic is
> dropped and then created, I think every source pattern keeps the same
> behavior(still read new topics or throwing exceptions based on
> scan.topic-integrity-check.enabled)
> 3. Same as Savonin , I am also curious about State migration.
> 4. "partition discovery must be enabled
> (KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS set to a positive
> number)". I don't think scan.topic-integrity-check.enabled) should be
> bonded together with partition discovery . Even
> PARTITION_DISCOVERY_INTERVAL_MS = -1, each time restart the job, still will
> try to discover new partitions, thus still need to check topic integrity.
> 5. What happens if the user: 1. remove the topic from `topic` or
> `topic-pattern` 2. remove and recreate the topic in kafka 3. In the
> next start, re-add this `topic` or `topic-pattern`? And what the state
> change in each step?
>
> Best,
> Hongshun
>
> On Mon, May 4, 2026 at 8:10 PM Aleksandr Savonin <[email protected]>
> wrote:
>
> > Hi Efrat,
> > Thanks for the amazing work on the FLIP, the update and for descoping
> > the FLIP! I think the new direction looks much cleaner.
> > I have a few follow-up questions:
> > 1. State migration: restoring from a pre-FLIP checkpoint.
> > What happens when a user upgrades to a connector version that supports
> > this FLIP, enables enableTopicIntegrityCheck(), and resumes from an
> > existing checkpoint that has no persisted topic IDs?
> > 2. Could you please clarify about excluded topic-pattern subscribers?
> > The FLIP rationale ("pattern-based subscriptions inherently accept any
> > matching topic") justifies tolerating new topics that begin matching
> > the pattern, but I don't think it justifies accepting a
> > previously-known topic being deleted and recreated under the same
> > name. From a user's perspective, a checkpointed offset for a
> > previously-known topic is just as invalid after recreation regardless
> > of whether the topic was reached via a list or a pattern. Could
> > pattern subscribers reuse the same logic to allow new names freely,
> > but enforce ID stability for already-known names?
> > 3. Where will topic IDs live in the enumerator state? I think the FLIP
> > would benefit from a concrete datamodel "sketch".
> > 4. Bounded sources: Doesn't KafkaSourceBuilder override
> > partition.discovery.interval.ms to -1 whenever setBounded(...) is
> > called, with no opt-out? The FLIP states that with discovery disabled,
> > the integrity check runs only once on startup. For long-running
> > bounded jobs (e.g. backfill against a snapshot up to `latest()`),
> > recreation mid-job is exactly the scenario this FLIP exists to detect,
> > and a single startup check feels insufficient. Should bounded sources
> > be allowed to opt into periodic checks when integrity verification is
> > enabled, or is this an explicitly accepted limitation? Maybe Im
> > missing something.
> >
> > Thanks again,
> > Aleksandr Savonin
> >
> > On Tue, 21 Apr 2026 at 13:42, Efrat Levitan <[email protected]> wrote:
> >
> > > Hi everyone,
> > > Following the feedback and offline discussions, I came to realize
> > > descoping FLIP-562 to source topics will have better ROI.
> > > As source topic integrity presents the pressing risk to job recovery,
> > > and sink topics recreation, while inflicting inevitable incompleteness,
> > > does not.
> > > I'd like to defer the sink topics integrity work until a sink
> > > coordinator is introduced, which can be discussed orthogonal to
> > > FLIP-562.
> > >
> > > I updated the FLIP to reflect the API changes suggested by Rui and
> > > Leonard, so a user is not asked to provide a topicId anymore [1], and
> > > descoped to source topics, with a note about sink topics [2].
> > >
> > > I'd appreciate your feedback on the updated proposal.
> > >
> > > Thanks,
> > > Efrat.
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406619238#FLIP562:TopicintegritychecksinKafkaConnector-PublicInterfaces
> > > [2]
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406619238#FLIP562:TopicintegritychecksinKafkaConnector-Sinktopicsintegrity
> > >
> > > On Sun, 8 Feb 2026 at 20:30, Efrat Levitan <[email protected]> wrote:
> > >
> > > > Hi everyone, thank you for reviewing the FLIP.
> > > >
> > > > > 1. Manually specifying the topic ID seems to be
> > > > > a bit costly for users. Why does it require flink users to pass
> > > > > the topic id
> > > > > manually or explicitly? It would be helpful for users if flink
> > > > > source or sink
> > > > > are able to fetch topic id automatically in the beginning, right?
> > > >
> > > > Thanks Rui for your suggestion.
> > > > Topic ID auto-retrieval indeed sounds like a better UX, I will update
> > > > the FLIP accordingly
> > > > However a downside to be aware of is that topic names to IDs mapping
> > > > must be checkpointed to retain the same ID across recoveries,
> > > > inflicting statefulness on so-far stateless operators like the
> > > > non-transactional sink.
> > > >
> > > > > 2. On the Sink side, the proposal uses
> > > > > "setPartitionDiscoveryIntervalMs".
> > > > > Since "Partition Discovery" in Flink implies Source behavior
> > > > > (finding splits), and the Sink is doing a metadata update, have you
> > > > > considered "setMetadataRefreshIntervalMs"? Maybe that aligns better
> > > > > with the underlying Kafka Client terminology (Metadata
> > > > > Fetch/Lookup/Refresh).
> > > >
> > > > > introducing “Partition Discovery” for sinks feels conceptually
> > > > > inconsistent with Flink’s existing design.
> > > >
> > > > Thanks Aleksandr and Leonard
> > > > The concept of partition discovery isn’t tied with source.
> > > > It was introduced to address the increase of partitions count, which
> > > > may happen on both source and sink topics.
> > > > Assigning splits to the newly found partitions is a source
> > > > implementation detail, and is also not on the docs.
> > > > I decided to advertise the feature as "sink partition discovery" to
> > > > help folks already familiar with the concept better understand the
> > > > implications.
> > > > What do you think?
> > > >
> > > > > 3. Is it correct that enabling partition discovery is strictly
> > > > > required for the integrity check?
> > > >
> > > > The integrity verification is performed upon metadata refresh.
> > > > On both source and sink, disabling partition discovery means metadata
> > > > isn't refreshed,
> > > > so integrity check will be performed once on startup.
> > > >
> > > > > 4. The disclaimer mentions an "inevitable short period of time"
> > > > > where the job reads/writes to the new topic before detection. If a
> > > > > checkpoint completes successfully during this window, do we risk to
> > > > > have a corrupted state? Is this a known limitation?
> > > >
> > > > The risk of data corruption upon topic recreation is discussed on the
> > > > FLIP.
> > > > We can not save users from themselves. After all, if they decide to
> > > > recreate a topic, the data is lost.
> > > > Indeed the proposed check can not guarantee 100% protection due to its
> > > > periodical nature.
> > > > Nevertheless the job will fail shortly after, and jobs restored from
> > > > that checkpoint will not be allowed to start.
> > > > For more sensitive jobs you could obviously decrease the interval,
> > > > (trading performance)
> > > >
> > > > > 5. Will there be metrics exposed for monitoring the integrity
> > > > > checks?
> > > > We will not measure integrity checks, as a failure immediately
> > > > triggers a global unrecoverable failure.
> > > >
> > > > Efrat
> > > >
> > > > On Tue, 20 Jan 2026 at 05:58, Leonard Xu <[email protected]> wrote:
> > > >
> > > >> Hi Efrat,
> > > >>
> > > >> Thanks for kicking off this discussion - I’m also in favor of adding
> > > >> a check for the Kafka Connector.
> > > >>
> > > >>
> > > >> > My only concern is that manually specifying the topic ID seems to be
> > > >> > a bit costly for users. Why does it require flink users to pass the
> > > >> > topic id
> > > >> > manually or explicitly? It would be helpful for users if flink
> > > >> > source or sink
> > > >> > are able to fetch topic id automatically in the beginning, right?
> > > >>
> > > >> I share Rui’s concern about requiring users to manually specify the
> > > >> topicId. Given that topicId-based communication isn’t part of Kafka’s
> > > >> current roadmap (as noted in your “Future Plan” section) and is
> > > >> starting available in Kafka 4.0–4.2, it seems premature - and
> > > >> potentially burdensome - to expose it in Flink’s user-facing API.
> > > >> Ideally, if needed at all, the source or sink should be able to
> > > >> resolve the topicId automatically during initialization.
> > > >>
> > > >> Additionally, I agree with Aleksandr that introducing “Partition
> > > >> Discovery” for sinks feels conceptually inconsistent with Flink’s
> > > >> existing design.
> > > >>
> > > >> Best,
> > > >> Leonard
> > > >>
> > > >> >
> > > >> > On Thu, Jan 15, 2026 at 12:38 PM Efrat Levitan <[email protected]>
> > > >> > wrote:
> > > >> >
> > > >> >> Hi everyone, I'd like to start a discussion on FLIP-562 [1] to
> > > >> >> implement topic integrity checks on kafka connector, as currently
> > > >> >> the connector is blind to topic recreations, presenting risks to
> > > >> >> job consistency.
> > > >> >>
> > > >> >> Kafka APIs traditionally rely on topic names, which do not
> > > >> >> guarantee uniqueness over time.
> > > >> >> Though both KIP-516 [2] and KIP-848 [3] discuss topicId based
> > > >> >> communication, client support is not on the roadmap [4].
> > > >> >>
> > > >> >> The FLIP contains the new proposal to make flink kafka connector
> > > >> >> sensitive to topicId changes through integrity checks over the
> > > >> >> periodical metadata fetching (AKA topic partition discovery), and
> > > >> >> sets the grounds for future topicId based communication with both
> > > >> >> the user and kafka server.
> > > >> >>
> > > >> >> I'd appreciate your feedback on the proposed changes.
> > > >> >>
> > > >> >> Thanks,
> > > >> >> Efrat.
> > > >> >>
> > > >> >> [1]
> > > >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-562%3A+Topic+integrity+checks+in+Kafka+Connector
> > > >> >> [2]
> > > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers
> > > >> >> [3]
> > > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol
> > > >> >> [4]
> > > >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers/#KIP516:TopicIdentifiers-Clients
> >
> > --
> > Kind regards,
> > Aleksandr
