Hi Efrat,
Thanks for the amazing work on the FLIP, the update and for descoping
the FLIP! I think the new direction looks much cleaner.
I have a few follow-up questions:
1. State migration: restoring from a pre-FLIP checkpoint. What happens
when a user upgrades to a connector version that supports this FLIP,
enables enableTopicIntegrityCheck(), and resumes from an existing
checkpoint that has no persisted topic IDs?
2. Could you please clarify about excluded topic-pattern subscribers?
The FLIP rationale ("pattern-based subscriptions inherently accept any
matching topic") justifies tolerating new topics that begin matching
the pattern, but I don't think it justifies accepting a
previously-known topic being deleted and recreated under the same
name. From a user's perspective, a checkpointed offset for a
previously-known topic is just as invalid after recreation regardless
of whether the topic was reached via a list or a pattern. Could
pattern subscribers reuse the same logic to allow new names freely,
but enforce ID stability for already-known names?
3. Where will topic IDs live in the enumerator state? I think the FLIP
would benefit from a concrete datamodel "sketch".
4. Bounded sources: Doesn't KafkaSourceBuilder override
partition.discovery.interval.ms to -1 whenever setBounded(...) is
called, with no opt-out? The FLIP states that with discovery disabled,
the integrity check runs only once on startup. For long-running
bounded jobs (e.g. backfill against a snapshot up to `latest()`),
recreation mid-job is exactly the scenario this FLIP exists to detect,
and a single startup check feels insufficient. Should bounded sources
be allowed to opt into periodic checks when integrity verification is
enabled, or is this an explicitly accepted limitation?  Maybe Im
missing something.

Thanks again,
Aleksandr Savonin

On Tue, 21 Apr 2026 at 13:42, Efrat Levitan <[email protected]> wrote:
>
> Hi everyone,
> Following the feedback and offline discussions, I came to realize descoping
> FLIP-562 to source topics will have better ROI.
> As source topic integrity presents the pressing risk to job recovery, and
> sink topics recreation, while inflicting inevitable incompleteness, does
> not.
> I'd like to defer the sink topics integrity work until a sink coordinator
> is introduced, which can be discussed orthogonal to FLIP-562.
>
> I updated the FLIP to reflect the API changes suggested by Rui and Leonard,
> so a user is not asked to provide a topicId anymore [1], and descoped to
> source topics, with a note about sink topics [2].
>
> I'd appreciate your feedback on the updated proposal.
>
> Thanks,
> Efrat.
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406619238#FLIP562:TopicintegritychecksinKafkaConnector-PublicInterfaces
> [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=406619238#FLIP562:TopicintegritychecksinKafkaConnector-Sinktopicsintegrity
>
>
> On Sun, 8 Feb 2026 at 20:30, Efrat Levitan <[email protected]> wrote:
>
> > Hi everyone, thank you for reviewing the FLIP.
> >
> > > 1. Manually specifying the topic ID seems to be
> > > a bit costly for users. Why does it require flink users to pass the topic
> > > id
> > > manually or explicitly? It would be helpful for users if flink source or
> > > sink
> > > are able to fetch topic id automatically in the beginning, right?
> >
> > Thanks Rui for your suggestion.
> > Topic ID auto-retrieval indeed sounds like a better UX, I will update the
> > FLIP accordingly
> > However a downside to be aware of is that topic names to IDs mapping must
> > be checkpointed to retain the same ID across recoveries,
> > inflicting statefulness on so-far stateless operators like the
> > non-transactional sink.
> >
> > > 2. On the Sink side, the proposal uses "setPartitionDiscoveryIntervalMs".
> > > Since "Partition Discovery" in Flink implies Source behavior (finding
> > > splits), and the Sink is doing a metadata update, have you considered
> > > "setMetadataRefreshIntervalMs"? Maybe that aligns better with the
> > > underlying Kafka Client terminology (Metadata Fetch/Lookup/Refresh).
> >
> > >  introducing “Partition Discovery” for sinks feels conceptually
> > inconsistent with Flink’s existing design.
> >
> > Thanks Aleksandr and Leonard
> > The concept of partition discovery isn’t tied with source.
> > It was introduced to address the increase of partitions count, which may
> > happen on both source and sink topics.
> > Assigning splits to the newly found partitions is a source implementation
> > detail, and is also not on the docs.
> > I decided to advertise the feature as "sink partition discovery" to help
> > folks already familiar with the concept better understand the implications.
> > What do you think?
> >
> > > 3. Is it correct that enabling partition discovery is strictly required
> > for
> > > the integrity check?
> >
> > The integrity verification is performed upon metadata refresh.
> > On both source and sink, disabling partition discovery means metadata
> > isn't refreshed,
> > so integrity check will be performed once on startup.
> >
> > > 4. The disclaimer mentions an "inevitable short period of time" where the
> > > job reads/writes to the new topic before detection. If a checkpoint
> > > completes successfully during this window, do we risk to have a corrupted
> > > state? Is this a known limitation?
> >
> > The risk of data corruption upon topic recreation is discussed on the FLIP.
> > We can not save users from themselves. After all, if they decide to
> > recreate a topic, the data is lost.
> > Indeed the proposed check can not guarantee 100% protection due to its
> > periodical nature.
> > Nevertheless the job will fail shortly after, and jobs restored from that
> > checkpoint will not be allowed to start.
> > For more sensitive jobs you could obviously decrease the interval,
> > (trading performance)
> >
> > > 5. Will there be metrics exposed for monitoring the integrity checks?
> > We will not measure integrity checks, as a failure immediately triggers a
> > global unrecoverable failure.
> >
> > Efrat
> >
> > On Tue, 20 Jan 2026 at 05:58, Leonard Xu <[email protected]> wrote:
> >
> >> Hi Efrat,
> >>
> >> Thanks for kicking off this discussion — I’m also in favor of adding a
> >> check for the Kafka Connector.
> >>
> >>
> >> > My only concern is that manually specifying the topic ID seems to be
> >> > a bit costly for users. Why does it require flink users to pass the
> >> topic
> >> > id
> >> > manually or explicitly? It would be helpful for users if flink source or
> >> > sink
> >> > are able to fetch topic id automatically in the beginning, right?
> >>
> >> I share Rui’s concern about requiring users to manually specify the
> >> topicId. Given that topicId-based communication isn’t part of Kafka’s
> >> current roadmap (as noted in your “Future Plan” section) and is starting
> >> available in Kafka 4.0–4.2, it seems premature—and potentially
> >> burdensome—to expose it in Flink’s user-facing API. Ideally, if needed at
> >> all, the source or sink should be able to resolve the topicId automatically
> >> during initialization.
> >>
> >> Additionally, I agree with Aleksandr that introducing “Partition
> >> Discovery” for sinks feels conceptually inconsistent with Flink’s existing
> >> design.
> >>
> >> Best,
> >> Leonard
> >>
> >> >
> >> > On Thu, Jan 15, 2026 at 12:38 PM Efrat Levitan <[email protected]>
> >> > wrote:
> >> >
> >> >> Hi everyone, I'd like to start a discussion on FLIP-562 [1] to
> >> implement
> >> >> topic integrity checks on kafka connector, as currently the connector
> >> is
> >> >> blind to topic recreations, presenting risks to job consistency.
> >> >>
> >> >> Kafka APIs traditionally rely on topic names, which do not guarantee
> >> >> uniqueness over time.
> >> >> Though both KIP-516 [2] and KIP-848 [3] discuss topicId based
> >> >> communication, client support is not on the roadmap [4].
> >> >>
> >> >> The FLIP contains the new proposal to make flink kafka connector
> >> sensitive
> >> >> to topicId changes through integrity checks over the periodical
> >> metadata
> >> >> fetching (AKA topic partition discovery), and sets the grounds for
> >> future
> >> >> topicId based communication with both the user and kafka server.
> >> >>
> >> >> I'd appreciate your feedback on the proposed changes.
> >> >>
> >> >> Thanks,
> >> >> Efrat.
> >> >>
> >> >> [1]
> >> >>
> >> >>
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-562%3A+Topic+integrity+checks+in+Kafka+Connector
> >> >>
> >> >> [2]
> >> >>
> >> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers
> >> >>
> >> >> [3]
> >> >>
> >> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol
> >> >>
> >> >> [4]
> >> >>
> >> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-516%3A+Topic+Identifiers/#KIP516:TopicIdentifiers-Clients
> >> >>
> >>
> >>



-- 
Kind regards,
Aleksandr

Reply via email to