Hey all,
If no further concerns are raised in the upcoming week, I will open the FLIP for voting.
Thank you for your feedback,
Efrat

On Sun, 17 May 2026 at 20:24, Efrat Levitan <[email protected]> wrote:
>
> Hi Hongshun and Aleksandr,
> Thank you for taking another pass at the FLIP.
> I updated it with additional context regarding state, testing, and the default state of the feature (disabled).
>
> State & Migration
> I plan to introduce a `topicIntegrityMapping` property to `KafkaSourceEnumState`.
> When topic integrity is enabled, that field will hold the topic name -> id mapping.
> Otherwise the map is empty, so there's no excess state by default.
> Additionally, if the user enables the feature and later decides to disable it for a job, the additional state is wiped.
> I plan to add migration tests to KafkaSourceEnumStateSerializerTest to ensure backward compatibility.
>
> Pattern-based subscription
> Considering the new mechanism for obtaining topicIds from the Kafka server, I agree we can include pattern-based subscription in the FLIP scope.
> I removed it from the out-of-scope section.
>
> Bounded sources
> As pointed out in the FLIP, continuous integrity is coupled to partition discovery because it reuses the fetched metadata; decoupling the two would come with the trade-off of an additional periodic metadata call.
> However, the first metadata call during job startup (from either scratch or recovery) is performed regardless, so for a bounded source the topic integrity check is performed at least once if enabled (and never otherwise).
>
> > 5. What happens if the user: 1. remove the topic from `topic` or `topic-pattern` 2. remove and recreate the topic in kafka 3. In the next start, re-add this `topic` or `topic-pattern`? And what the state change in each step?
>
> You are asking about tracking IDs for topics the app no longer uses.
> Reliability-wise, we could claim that topics that were used by the job should be verified forever.
> However, while topic recreation is a common scenario, expecting the app to track a growing list of unused topics is an expensive corner case to handle, implying multiple metadata calls per interval in case of pattern-based subscription, or if the cluster was replaced.
> Therefore, to answer the question: if a user removes topics from the job source declaration, topic integrity will no longer track them, hence after step 3 the job will not fail.
> State changes (given the job committed at least one checkpoint before/while stopped; otherwise the state will not be refreshed to catch up with the new configuration):
> 1: The topic name -> id entry for the topic is removed from the checkpointed state (as irrelevant)
> 2: -
> 3: A new topic name -> id entry is stored for the registered topic
> Eager to know your thoughts on this.
>
> Thanks,
> Efrat
>
>
> On Wed, 6 May 2026 at 10:59, Hongshun Wang <[email protected]> wrote:
>>
>> Hi Efrat,
>> 1. Please add the default value of scan.topic-integrity-check.enabled in the public APIs. The default value should be the same as the current behavior.
>> 2. I don't know why `Topic pattern` is out of scope? If a topic is dropped and then created, I think every source pattern keeps the same behavior (still reading new topics or throwing exceptions based on scan.topic-integrity-check.enabled).
>> 3. Same as Savonin, I am also curious about state migration.
>> 4. "partition discovery must be enabled (KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS set to a positive number)". I don't think scan.topic-integrity-check.enabled should be bound together with partition discovery. Even with PARTITION_DISCOVERY_INTERVAL_MS = -1, each time the job restarts it will still try to discover new partitions, and thus still needs to check topic integrity.
>> 5. What happens if the user: 1. remove the topic from `topic` or `topic-pattern` 2. remove and recreate the topic in kafka 3.
>> In the next start, re-add this `topic` or `topic-pattern`? And what is the state change in each step?
>>
>> Best,
>> Hongshun
>>
>> On Mon, May 4, 2026 at 8:10 PM Aleksandr Savonin <[email protected]> wrote:
>>
>> > Hi Efrat,
>> > Thanks for the amazing work on the FLIP, the update, and for descoping the FLIP! I think the new direction looks much cleaner.
>> > I have a few follow-up questions:
>> > 1. State migration: restoring from a pre-FLIP checkpoint. What happens when a user upgrades to a connector version that supports this FLIP, enables enableTopicIntegrityCheck(), and resumes from an existing checkpoint that has no persisted topic IDs?
>> > 2. Could you please clarify about excluded topic-pattern subscribers? The FLIP rationale ("pattern-based subscriptions inherently accept any matching topic") justifies tolerating new topics that begin matching the pattern, but I don't think it justifies accepting a previously-known topic being deleted and recreated under the same name. From a user's perspective, a checkpointed offset for a previously-known topic is just as invalid after recreation regardless of whether the topic was reached via a list or a pattern. Could pattern subscribers reuse the same logic to allow new names freely, but enforce ID stability for already-known names?
>> > 3. Where will topic IDs live in the enumerator state? I think the FLIP would benefit from a concrete data model "sketch".
>> > 4. Bounded sources: Doesn't KafkaSourceBuilder override partition.discovery.interval.ms to -1 whenever setBounded(...) is called, with no opt-out? The FLIP states that with discovery disabled, the integrity check runs only once on startup. For long-running bounded jobs (e.g. backfill against a snapshot up to `latest()`), recreation mid-job is exactly the scenario this FLIP exists to detect, and a single startup check feels insufficient.
>> > Should bounded sources be allowed to opt into periodic checks when integrity verification is enabled, or is this an explicitly accepted limitation? Maybe I'm missing something.
>> >
>> > Thanks again,
>> > Aleksandr Savonin
>> >
>> > On Tue, 21 Apr 2026 at 13:42, Efrat Levitan <[email protected]> wrote:
>> > >
>> > > Hi everyone,
>> > > Following the feedback and offline discussions, I came to realize that descoping FLIP-562 to source topics will have better ROI, as source topic integrity presents the pressing risk to job recovery, while sink topic recreation, though it inflicts inevitable incompleteness, does not.
>> > > I'd like to defer the sink topic integrity work until a sink coordinator is introduced, which can be discussed orthogonally to FLIP-562.
>> > >
>> > > I updated the FLIP to reflect the API changes suggested by Rui and Leonard, so a user is not asked to provide a topicId anymore [1], and descoped it to source topics, with a note about sink topics [2].
>> > >
>> > > I'd appreciate your feedback on the updated proposal.
>> > >
>> > > Thanks,
>> > > Efrat.
>> > >
>> > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406619238#FLIP562:TopicintegritychecksinKafkaConnector-PublicInterfaces
>> > > [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406619238#FLIP562:TopicintegritychecksinKafkaConnector-Sinktopicsintegrity
>> > >
>> > >
>> > > On Sun, 8 Feb 2026 at 20:30, Efrat Levitan <[email protected]> wrote:
>> > >
>> > > > Hi everyone, thank you for reviewing the FLIP.
>> > > >
>> > > > > 1. Manually specifying the topic ID seems to be a bit costly for users. Why does it require flink users to pass the topic id manually or explicitly? It would be helpful for users if flink source or sink are able to fetch topic id automatically in the beginning, right?
>> > > >
>> > > > Thanks Rui for your suggestion.
>> > > > Topic ID auto-retrieval indeed sounds like a better UX; I will update the FLIP accordingly.
>> > > > However, a downside to be aware of is that the topic name to ID mapping must be checkpointed to retain the same ID across recoveries, inflicting statefulness on so-far stateless operators like the non-transactional sink.
>> > > >
>> > > > > 2. On the Sink side, the proposal uses "setPartitionDiscoveryIntervalMs". Since "Partition Discovery" in Flink implies Source behavior (finding splits), and the Sink is doing a metadata update, have you considered "setMetadataRefreshIntervalMs"? Maybe that aligns better with the underlying Kafka Client terminology (Metadata Fetch/Lookup/Refresh).
>> > > >
>> > > > > introducing “Partition Discovery” for sinks feels conceptually inconsistent with Flink’s existing design.
>> > > >
>> > > > Thanks Aleksandr and Leonard.
>> > > > The concept of partition discovery isn’t tied to the source.
>> > > > It was introduced to address increases in partition count, which may happen on both source and sink topics.
>> > > > Assigning splits to the newly found partitions is a source implementation detail, and is also not in the docs.
>> > > > I decided to advertise the feature as "sink partition discovery" to help folks already familiar with the concept better understand the implications.
>> > > > What do you think?
>> > > >
>> > > > > 3. Is it correct that enabling partition discovery is strictly required for the integrity check?
>> > > >
>> > > > The integrity verification is performed upon metadata refresh.
>> > > > On both source and sink, disabling partition discovery means metadata isn't refreshed, so the integrity check will be performed once on startup.
>> > > >
>> > > > > 4.
>> > > > > The disclaimer mentions an "inevitable short period of time" where the job reads/writes to the new topic before detection. If a checkpoint completes successfully during this window, do we risk having a corrupted state? Is this a known limitation?
>> > > >
>> > > > The risk of data corruption upon topic recreation is discussed in the FLIP.
>> > > > We cannot save users from themselves. After all, if they decide to recreate a topic, the data is lost.
>> > > > Indeed, the proposed check cannot guarantee 100% protection due to its periodic nature.
>> > > > Nevertheless, the job will fail shortly after, and jobs restored from that checkpoint will not be allowed to start.
>> > > > For more sensitive jobs you could decrease the interval (trading performance).
>> > > >
>> > > > > 5. Will there be metrics exposed for monitoring the integrity checks?
>> > > > We will not measure integrity checks, as a failure immediately triggers a global unrecoverable failure.
>> > > >
>> > > > Efrat
>> > > >
>> > > > On Tue, 20 Jan 2026 at 05:58, Leonard Xu <[email protected]> wrote:
>> > > >
>> > > >> Hi Efrat,
>> > > >>
>> > > >> Thanks for kicking off this discussion; I’m also in favor of adding a check for the Kafka Connector.
>> > > >>
>> > > >>
>> > > >> > My only concern is that manually specifying the topic ID seems to be a bit costly for users. Why does it require flink users to pass the topic id manually or explicitly? It would be helpful for users if flink source or sink are able to fetch topic id automatically in the beginning, right?
>> > > >>
>> > > >> I share Rui’s concern about requiring users to manually specify the topicId.
>> > > >> Given that topicId-based communication isn’t part of Kafka’s current roadmap (as noted in your “Future Plan” section) and is starting to become available in Kafka 4.0–4.2, it seems premature, and potentially burdensome, to expose it in Flink’s user-facing API. Ideally, if needed at all, the source or sink should be able to resolve the topicId automatically during initialization.
>> > > >>
>> > > >> Additionally, I agree with Aleksandr that introducing “Partition Discovery” for sinks feels conceptually inconsistent with Flink’s existing design.
>> > > >>
>> > > >> Best,
>> > > >> Leonard
>> > > >>
>> > > >> >
>> > > >> > On Thu, Jan 15, 2026 at 12:38 PM Efrat Levitan <[email protected]> wrote:
>> > > >> >
>> > > >> >> Hi everyone, I'd like to start a discussion on FLIP-562 [1] to implement topic integrity checks in the Kafka connector, as currently the connector is blind to topic recreations, presenting risks to job consistency.
>> > > >> >>
>> > > >> >> Kafka APIs traditionally rely on topic names, which do not guarantee uniqueness over time.
>> > > >> >> Though both KIP-516 [2] and KIP-848 [3] discuss topicId-based communication, client support is not on the roadmap [4].
>> > > >> >>
>> > > >> >> The FLIP contains the new proposal to make the Flink Kafka connector sensitive to topicId changes through integrity checks over the periodic metadata fetching (AKA topic partition discovery), and lays the groundwork for future topicId-based communication with both the user and the Kafka server.
>> > > >> >>
>> > > >> >> I'd appreciate your feedback on the proposed changes.
>> > > >> >>
>> > > >> >> Thanks,
>> > > >> >> Efrat.
>> > > >> >>
>> > > >> >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-562%3A+Topic+integrity+checks+in+Kafka+Connector
>> > > >> >> [2] https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers
>> > > >> >> [3] https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol
>> > > >> >> [4] https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers/#KIP516:TopicIdentifiers-Clients
>> > > >> >>
>> >
>> > --
>> > Kind regards,
>> > Aleksandr
>> >
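
For readers following the thread, the state handling described in the 17 May reply (the "State & Migration" part and the three-step answer to question 5) can be sketched roughly as below. This is a minimal illustration under stated assumptions, not code from the FLIP or the connector: the class `TopicIntegritySketch`, the method `reconcile`, the use of plain strings as topic IDs, and the choice to skip topics missing from metadata are all hypothetical. Only the idea of a topic name -> id map (`topicIntegrityMapping`) that is empty when the feature is disabled comes from the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration only; not the FLIP's or the connector's actual code. */
final class TopicIntegritySketch {

    /**
     * Returns the topic name -> id mapping to store in the next checkpoint.
     *
     * @param enabled      whether the integrity check is switched on
     * @param checkpointed mapping restored from the last checkpoint (may be empty)
     * @param subscribed   topics the source currently subscribes to
     * @param fetched      topic name -> id as reported by the latest metadata call
     */
    static Map<String, String> reconcile(
            boolean enabled,
            Map<String, String> checkpointed,
            Set<String> subscribed,
            Map<String, String> fetched) {
        Map<String, String> next = new HashMap<>();
        if (!enabled) {
            return next; // feature disabled: any previously stored mapping is wiped
        }
        for (String topic : subscribed) {
            String currentId = fetched.get(topic);
            if (currentId == null) {
                continue; // assumption: a topic absent from metadata is simply not recorded
            }
            String knownId = checkpointed.get(topic);
            if (knownId != null && !knownId.equals(currentId)) {
                // same name, different id: the topic was deleted and recreated
                throw new IllegalStateException(
                        "Topic '" + topic + "' was recreated: expected id "
                                + knownId + " but found " + currentId);
            }
            next.put(topic, currentId);
        }
        return next; // entries for topics no longer subscribed are dropped
    }

    public static void main(String[] args) {
        Map<String, String> state =
                reconcile(true, Map.of(), Set.of("orders"), Map.of("orders", "id-1"));
        // Step 1: topic removed from the source declaration, so its entry is dropped
        state = reconcile(true, state, Set.of(), Map.of());
        // Step 2: topic recreated in Kafka (new id) while untracked, no state change
        // Step 3: topic re-added, the new id is stored and the job does not fail
        state = reconcile(true, state, Set.of("orders"), Map.of("orders", "id-2"));
        System.out.println(state);
    }
}
```

Restoring from a pre-FLIP checkpoint corresponds to calling `reconcile` with an empty `checkpointed` map: no stored ID exists to compare against, so in this sketch the first fetched ID is simply recorded.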
