> Regarding (2), definitely. This is something we planned to add later on but
> so far keeping things common has been working well.

My main worry with doing this in a later iteration is that it would
probably be a breaking change to the public interface. If that can be
avoided by planning ahead, I'm fine with moving forward with how it is
right now.
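
As a rough illustration of what planning ahead could look like, here is a
minimal sketch in which the common configuration stays the default and
per-cluster overrides are optional, so they could be added later without
changing existing callers. All names in it (PerClusterConfigSketch, resolve,
the override map keyed by cluster id) are hypothetical and are not part of the
FLIP-246 interfaces.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Sketch only: hypothetical names, not the FLIP-246 API. */
final class PerClusterConfigSketch {

    /**
     * Resolves the effective Kafka client properties for one cluster:
     * start from the properties common to all clusters, then apply any
     * overrides registered for that cluster id.
     */
    static Properties resolve(
            String clusterId,
            Properties commonProperties,
            Map<String, Properties> perClusterOverrides) {
        Properties effective = new Properties();
        effective.putAll(commonProperties);
        Properties overrides = perClusterOverrides.get(clusterId);
        if (overrides != null) {
            // Per-cluster values win over the common ones.
            effective.putAll(overrides);
        }
        return effective;
    }

    public static void main(String[] args) {
        Properties common = new Properties();
        common.setProperty("group.id", "myConsumerGroup");
        common.setProperty("max.poll.records", "500");

        // Assumed shape: overrides supplied alongside the cluster metadata.
        Map<String, Properties> overrides = new HashMap<>();
        Properties cluster2 = new Properties();
        cluster2.setProperty("max.poll.records", "100");
        overrides.put("cluster-2", cluster2);

        System.out.println(resolve("cluster-1", common, overrides));
        System.out.println(resolve("cluster-2", common, overrides));
    }
}
```

With an empty override map this behaves exactly like the current
"everything is common" builder, which is the property that would keep a
later addition from being a breaking change.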

> DynamicKafkaSource may be confusing because it is really similar to the
> KafkaDynamicSource/Sink (table connectors).

The Table / SQL Kafka connectors (KafkaDynamicTableFactory,
KafkaDynamicTableSource / KafkaDynamicTableSink) are all internal classes
that are not really meant to be exposed to the user, though.
The similar names can cause some confusion internally for the code
maintainers, but on the actual public surface I don't see this being an issue.

Thanks,
Gordon

On Wed, Jun 7, 2023 at 8:55 PM Mason Chen <mas.chen6...@gmail.com> wrote:

> Hi Gordon,
>
> Thanks for taking a look!
>
> Regarding (1), there is a need from the readers to send this event at
> startup because the reader state may reflect outdated metadata. Thus, the
> reader should not start without fresh metadata. With fresh metadata, the
> reader can filter splits from state--this filtering capability is
> ultimately how we solve the common issue of "I re-configured my Kafka
> source and removed some topic, but it refers to the old topic due to state
> *[1]*". I did not mention this because I thought this is more of a detail
> but I'll make a brief note of it.
>
> Regarding (2), definitely. This is something we planned to add later on but
> so far keeping things common has been working well. In that regard, yes the
> metadata service should expose these configurations but the source should
> not check it into state unlike the other metadata. I'm going to add it to a
> section called "future enhancements". This is also feedback that Ryan, an
> interested user, gave earlier in this thread.
>
> Regarding (3), that's definitely a good point and there are some real use
> cases, in addition to what you mentioned, to use this in single cluster
> mode (see *[1] *above). DynamicKafkaSource may be confusing because it is
> really similar to the KafkaDynamicSource/Sink (table connectors).
>
> Best,
> Mason
>
> On Wed, Jun 7, 2023 at 10:40 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
> > Hi Mason,
> >
> > Thanks for updating the FLIP. In principle, I believe this would be a
> > useful addition. Some comments so far:
> >
> > 1. In this sequence diagram [1], why is there a need for a
> > GetMetadataUpdateEvent from the MultiClusterSourceReader going to the
> > MultiClusterSourceEnumerator? Shouldn't the enumerator simply start
> sending
> > metadata update events to the reader once it is registered at the
> > enumerator?
> >
> > 2. Looking at the new builder API, there's a few configurations that are
> > common across *all *discovered Kafka clusters / topics, specifically the
> > deserialization schema, offset initialization strategy, Kafka client
> > properties, and consumer group ID. Is there any use case that users would
> > want to have these configurations differ across different Kafka clusters?
> > If that's the case, would it make more sense to encapsulate these
> > configurations to be owned by the metadata service?
> >
> > 3. Is MultiClusterKafkaSource the best name for this connector? I find
> that
> > the dynamic aspect of Kafka connectivity to be a more defining
> > characteristic, and that is the main advantage it has compared to the
> > static KafkaSource. A user may want to use this new connector over
> > KafkaSource even if they're just consuming from a single Kafka cluster;
> for
> > example, one immediate use case I can think of is Kafka repartitioning
> with
> > zero Flink job downtime. They create a new topic with higher parallelism
> > and repartition their Kafka records from the old topic to the new topic,
> > and they want the consuming Flink job to be able to move from the old
> topic
> > to the new topic with zero-downtime while retaining exactly-once
> > guarantees. So, perhaps DynamicKafkaSource is a better name for this
> > connector?
> >
> > Thanks,
> > Gordon
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source?preview=/217389320/255072018/image-2023-6-7_2-29-13.png
> >
> > On Wed, Jun 7, 2023 at 3:07 AM Mason Chen <mas.chen6...@gmail.com>
> wrote:
> >
> > > Hi Jing,
> > >
> > > Thanks for the prompt feedback! I had some confusion with how to resize
> > > images in confluence--anyways, I have made the font bigger, added white
> > > background, and also made the diagrams themselves bigger.
> > >
> > > Regarding the exactly once semantics, that's definitely good to point
> out
> > > in the doc. Thus, I have broken out my "Basic Idea" section into:
> > > 1. an intro
> > > 2. details about KafkaMetadataService
> > > 3. details about KafkaStream and KafkaClusterId (the metadata)
> > > 4. details about exactly once semantics and consistency guarantees
> > >
> > > This should give readers enough context about the design goals and
> > > interactions before deep diving into the class interfaces.
> > >
> > > Best,
> > > Mason
> > >
> > > On Tue, Jun 6, 2023 at 1:25 PM Jing Ge <j...@ververica.com.invalid>
> > wrote:
> > >
> > > > Hi Mason,
> > > >
> > > > It is a very practical feature that many users are keen to use.
> Thanks
> > to
> > > > the previous discussion, the FLIP now looks informative. Thanks for
> > your
> > > > proposal. One small suggestion is that the attached images are quite
> > > small
> > > > to read if we don't click and enlarge them. Besides that, It is
> > difficult
> > > > to read the text on the current sequence diagram because it has a
> > > > transparent background. Would you like to replace it with a white
> > > > background?
> > > >
> > > > Exactly-once is one of the key features of Kafka connector. I have the
> > > same
> > > > concern as Qingsheng. Since you have answered questions about it
> > > > previously, would you like to create an extra section in your FLIP to
> > > > explicitly describe scenarios when exactly-once is supported and when
> it
> > > is
> > > > not?
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Mon, Jun 5, 2023 at 11:41 PM Mason Chen <mas.chen6...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I'm working on FLIP-246 again, for the Multi Cluster Kafka Source
> > > > > contribution. The document has been updated with some more context
> > > about
> > > > > how it can solve the Kafka topic removal scenario and a sequence
> > > diagram
> > > > to
> > > > > illustrate how the components interact.
> > > > >
> > > > > Looking forward to any feedback!
> > > > >
> > > > > Best,
> > > > > Mason
> > > > >
> > > > > On Wed, Oct 12, 2022 at 11:12 PM Mason Chen <
> mas.chen6...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Ryan,
> > > > > >
> > > > > > Thanks for the additional context! Yes, the offset initializer
> > would
> > > > need
> > > > > > to take a cluster as a parameter and the
> > MultiClusterKafkaSourceSplit
> > > > can
> > > > > > be exposed in an initializer.
> > > > > >
> > > > > > Best,
> > > > > > Mason
> > > > > >
> > > > > > On Thu, Oct 6, 2022 at 11:00 AM Ryan van Huuksloot <
> > > > > > ryan.vanhuuksl...@shopify.com> wrote:
> > > > > >
> > > > > >> Hi Mason,
> > > > > >>
> > > > > >> Thanks for the clarification! In regards to the addition to the
> > > > > >> OffsetInitializer of this API - this would be an awesome
> addition
> > > and
> > > > I
> > > > > > >> think this entire FLIP would be a great addition to Flink.
> > > > > >>
> > > > > >> To provide more context as to why we need particular offsets, we
> > use
> > > > > >> Hybrid Source to currently backfill from buckets prior to
> reading
> > > from
> > > > > >> Kafka. We have a service that will tell us what offset has last
> > been
> > > > > loaded
> > > > > >> into said bucket which we will use to initialize the KafkaSource
> > > > > >> OffsetsInitializer. We couldn't use a timestamp here and the
> > offset
> > > > > would
> > > > > >> be different for each Cluster.
> > > > > >>
> > > > > >> In pseudocode, we'd want the ability to do something like this
> > with
> > > > > >> HybridSources - if this is possible.
> > > > > >>
> > > > > > >> ```scala
> > > > > > >> val offsetsMetadata: Map[TopicPartition, Long] = // Get current offsets from OffsetReaderService
> > > > > > >> val multiClusterArchiveSource: MultiBucketFileSource[T] = // Data is read from different buckets (multiple topics)
> > > > > > >> val multiClusterKafkaSource: MultiClusterKafkaSource[T] =
> > > > > > >> MultiClusterKafkaSource.builder()
> > > > > > >>   .setKafkaMetadataService(new KafkaMetadataServiceImpl())
> > > > > > >>   .setStreamIds(List.of("my-stream-1", "my-stream-2"))
> > > > > > >>   .setGroupId("myConsumerGroup")
> > > > > > >>   .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
> > > > > > >>   .setStartingOffsets(offsetsMetadata)
> > > > > > >>   .setProperties(properties)
> > > > > > >>   .build()
> > > > > > >> val source =
> > > > > > >>   HybridSource.builder(multiClusterArchiveSource).addSource(multiClusterKafkaSource).build()
> > > > > > >> ```
> > > > > >>
> > > > > >> Few notes:
> > > > > >> - TopicPartition won't work because the topic may be the same
> name
> > > as
> > > > > >> this is something that is supported IIRC
> > > > > >> - I chose to pass a map into starting offsets just for
> > demonstrative
> > > > > >> purposes, I would be fine with whatever data structure would
> work
> > > best
> > > > > >>
> > > > > >> Ryan van Huuksloot
> > > > > >> Data Developer | Production Engineering | Streaming Capabilities
> > > > > >> [image: Shopify]
> > > > > >> <
> > > > >
> > >
> https://www.shopify.com/?utm_medium=salessignatures&utm_source=hs_email>
> > > > > >>
> > > > > >>
> > > > > >> On Mon, Oct 3, 2022 at 11:29 PM Mason Chen <
> > mas.chen6...@gmail.com>
> > > > > >> wrote:
> > > > > >>
> > > > > >>> Hi Ryan,
> > > > > >>>
> > > > > >>> Just copying your message over to the email chain.
> > > > > >>>
> > > > > >>> Hi Mason,
> > > > > >>>> First off, thanks for putting this FLIP together! Sorry for
> the
> > > > delay.
> > > > > >>>> Full disclosure Mason and I chatted a little bit at Flink
> > Forward
> > > > > 2022 but
> > > > > >>>> I have tried to capture the questions I had for him then.
> > > > > >>>> I'll start the conversation with a few questions:
> > > > > >>>> 1. The concept of streamIds is not clear to me in the proposal
> > and
> > > > > >>>> could use some more information. If I understand correctly,
> they
> > > > will
> > > > > be
> > > > > >>>> used in the MetadataService to link KafkaClusters to ones you
> > want
> > > > to
> > > > > use?
> > > > > >>>> If you assign stream ids using `setStreamIds`, how can you
> > > > dynamically
> > > > > >>>> increase the number of clusters you consume if the list of
> > > StreamIds
> > > > > is
> > > > > >>>> static? I am basing this off of your example
> > > .setStreamIds(List.of(
> > > > > >>>> "my-stream-1", "my-stream-2")) so I could be off base with my
> > > > > >>>> assumption. If you don't mind clearing up the intention, that
> > > would
> > > > be
> > > > > >>>> great!
> > > > > >>>> 2. How would offsets work if you wanted to use this
> > > > > >>>> MultiClusterKafkaSource with a file based backfill? In the
> case
> > I
> > > am
> > > > > >>>> thinking of, you have a bucket backed archive of Kafka data
> per
> > > > > cluster.
> > > > > >>>> and you want to pick up from the last offset in the archived
> > > system,
> > > > > how
> > > > > >>>> would you set OffsetInitializers "per cluster" potentially as
> a
> > > > > function or
> > > > > >>>> are you limited to setting an OffsetInitializer for the entire
> > > > Source?
> > > > > >>>> 3. Just to make sure - because this system will layer on top
> of
> > > > > > >>>> FLIP-27 and use KafkaSource for some aspects under the hood,
> > the
> > > > > watermark
> > > > > >>>> alignment that was introduced in FLIP-182 / Flink 1.15 would
> be
> > > > > possible
> > > > > >>>> across multiple clusters if you assign them to the same
> > alignment
> > > > > group?
> > > > > >>>> Thanks!
> > > > > >>>> Ryan
> > > > > >>>
> > > > > >>>
> > > > > >>> 1. The stream ids are static--however, what the physical
> clusters
> > > and
> > > > > >>> topics that they map to can mutate. Let's say my-stream-1 maps
> to
> > > > > cluster-1
> > > > > >>> and topic-1. The KafkaMetadataService can return a different
> > > mapping
> > > > > when
> > > > > >>> metadata is fetched the next time e.g. my-stream-1 mapping to
> > > > > cluster-1 and
> > > > > >>> topic-1, and cluster-2 and topic-2. Let me add more details on
> > how
> > > > the
> > > > > >>> KafkaMetadataService is used.
> > > > > >>> 2. The current design limits itself to a single configured
> > > > > >>> OffsetInitializer that is used for every underlying
> KafkaSource.
> > > > > >>> 3. Yes, it is in our plan to integrate this source with
> watermark
> > > > > >>> alignment in which the user can align watermarks from all
> > clusters
> > > > > within
> > > > > >>> the single. It will leverage the Kafka Source implementation to
> > > > achieve
> > > > > >>> this.
> > > > > >>>
> > > > > >>> With regards to 2, it's an interesting idea. I think we can
> > extend
> > > > the
> > > > > >>> design to support a map of offset initializers to clusters,
> which
> > > > would
> > > > > >>> solve your file based backfill. If you initialize the source
> > with a
> > > > > single
> > > > > >>> timestamp, the current design may work for your usecase, but I
> > > can't
> > > > > tell
> > > > > >>> without more details. Thanks for your interest and sorry for
> the
> > > > delay!
> > > > > >>>
> > > > > >>> Best,
> > > > > >>> Mason
> > > > > >>>
> > > > > >>> On Mon, Aug 29, 2022 at 10:02 AM Mason Chen <
> > > mas.chen6...@gmail.com>
> > > > > >>> wrote:
> > > > > >>>
> > > > > >>>> Hi Max,
> > > > > >>>>
> > > > > >>>> Thanks for taking a look!
> > > > > >>>>
> > > > > >>>>
> > > > > >>>>> I'm wondering whether we can share some of the code of the
> > > existing
> > > > > >>>>> KafkaSource.
> > > > > >>>>>
> > > > > >>>>
> > > > > >>>> That is the intention--let me call it out more explicitly.
> > > > > >>>>
> > > > > >>>> Regarding your questions:
> > > > > >>>>
> > > > > >>>> 1. Indeed, the KafkaMetadataService has the describe stream
> > method
> > > > to
> > > > > >>>> get a particular stream id. We decided to support getting all
> > the
> > > > > streams
> > > > > >>>> for subscribing via a regex pattern (similar to the Kafka
> Source
> > > > > >>>> implementation).
> > > > > >>>>
> > > > > >>>> 2. The idea was that if metadata is removed that it is no
> longer
> > > > > active.
> > > > > >>>>
> > > > > >>>> 3. The MetadataUpdateEvent's format is specifically for
> > > > communicating
> > > > > >>>> to the reader what the clusters and topics it should read
> from.
> > It
> > > > > doesn't
> > > > > >>>> need stream information since it doesn't interact with the
> > > > > >>>> KafkaMetadataService (only the enumerator interacts with it).
> > > > > >>>>
> > > > > >>>> 4. Metrics will be reported per cluster. For example,
> > KafkaSource
> > > > > >>>> already reports pendingRecords and the corresponding metric,
> for
> > > > > example
> > > > > >>>> for cluster0, would be a metric called
> > > > > >>>>
> `MultiClusterKafkaSource.kafkaCluster.cluster0.pendingRecords`.
> > In
> > > > > cluster
> > > > > >>>> removal, these metrics wouldn't be valid so the implementation
> > can
> > > > > close
> > > > > >>>> them.
> > > > > >>>>
> > > > > >>>> 5. I'm fine with that name; however, I got some feedback
> > > internally
> > > > > >>>> since the bulk of the logic is in stopping the scheduled tasks
> > of
> > > > the
> > > > > >>>> underlying enumerators and handling cluster unavailability
> edge
> > > > > cases. I'm
> > > > > >>>> open to changing the name if the design changes (it is an
> > internal
> > > > > class
> > > > > >>>> anyways, so we can make these name changes without breaking
> > > users).
> > > > > >>>>
> > > > > >>>> 6. Yes, there are some limitations but I have not
> > > > > >>>> considered implementing that in the basic ConfigMap
> > > > > >>>> implementation--currently users are allowed to do any changes.
> > For
> > > > > example,
> > > > > >>>> a user should not delete and recreate a topic, on the same
> > > cluster.
> > > > > >>>> Regarding the logic to properly remove a cluster, a user could
> > > > > certainly
> > > > > >>>> support it with a custom KafkaMetadataService--I intended to
> > keep
> > > > the
> > > > > >>>> ConfigMap implementation basic for simple use cases (so users
> > here
> > > > > would
> > > > > >>>> rely on manual monitoring or something built externally).
> > However,
> > > > > I'm open
> > > > > >>>> to the idea if the usage changes and maybe there could be
> > > > > improvements to
> > > > > >>>> the Flink metric API to achieve more seamless integration. And
> > > > > finally,
> > > > > >>>> yes, the semantics are as such and gives reason to my response
> > for
> > > > > question
> > > > > >>>> 2.
> > > > > >>>>
> > > > > >>>> I've updated the doc with more context from my question
> > responses,
> > > > let
> > > > > >>>> me know if there are more questions!
> > > > > >>>>
> > > > > >>>> Best,
> > > > > >>>> Mason
> > > > > >>>>
> > > > > >>>>
> > > > > >>>>
> > > > > >>>>
> > > > > >>>>
> > > > > >>>> On Wed, Aug 17, 2022 at 8:40 AM Maximilian Michels <
> > > m...@apache.org>
> > > > > >>>> wrote:
> > > > > >>>>
> > > > > >>>>> Hey Mason,
> > > > > >>>>>
> > > > > >>>>> I just had a look at the FLIP. If I understand correctly, you
> > are
> > > > > >>>>> proposing a very sophisticated way to read from multiple
> Kafka
> > > > > >>>>> clusters
> > > > > >>>>> / topics.
> > > > > >>>>>
> > > > > >>>>> I'm wondering whether we can share some of the code of the
> > > existing
> > > > > >>>>> KafkaSource. I suppose you don't want to modify KafkaSource
> > > itself
> > > > to
> > > > > >>>>> avoid any breakage. But it would be good not to duplicate too
> > > much
> > > > > >>>>> code,
> > > > > >>>>>   such that functionality that can be shared between the two
> > > > > >>>>> implementations (e.g. the reader implementation).
> > > > > >>>>>
> > > > > >>>>> Some questions that I had when browsing the current version:
> > > > > >>>>>
> > > > > >>>>>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320
> > > > > >>>>>
> > > > > >>>>> 1. Why does KafkaMetadataService have the `describeStream`
> > method
> > > > in
> > > > > >>>>> addition to `getAllStreams`? This may be redundant. Is the
> idea
> > > to
> > > > > get
> > > > > >>>>> the updated metadata for a particular StreamId?
> > > > > >>>>>
> > > > > >>>>> 2. KafkaMetaDataService#isClusterActive serves the purpose to
> > > check
> > > > > >>>>> for
> > > > > >>>>> activeness, couldn't this be included in the metadata of
> > > > KafkaStream?
> > > > > >>>>>
> > > > > >>>>> 3. Shouldn't MetadataUpdateEvent contain a full list of
> > > KafkaStream
> > > > > >>>>> instead of `Map<KafkaClusterIdentifier, Set<String>>`?
> > > > > >>>>>
> > > > > >>>>> 4. "In addition, restarting enumerators involve clearing
> > outdated
> > > > > >>>>> metrics" What metrics are we talking about here?
> > > > > >>>>>
> > > > > >>>>> 5. `StoppableKafkaEnumContextProxy` doesn't ring with me. How
> > > about
> > > > > >>>>> `MultiKafkaSplitEnumeratorContext`?
> > > > > >>>>>
> > > > > >>>>> 6. What about the ConfigMap implementation? Are there any
> > > > limitations
> > > > > >>>>> on
> > > > > >>>>> the type of configuration changes that we want to allow? For
> > > > example,
> > > > > >>>>> is
> > > > > >>>>> it allowed to remove a cluster before it has been drained /
> > > > > >>>>> deactivated?
> > > > > >>>>> Is "not active" semantically identical to having the cluster
> /
> > > > stream
> > > > > >>>>> removed?
> > > > > >>>>>
> > > > > >>>>> This is an exciting new addition!
> > > > > >>>>>
> > > > > >>>>> Cheers,
> > > > > >>>>> Max
> > > > > >>>>>
> > > > > >>>>> On 11.08.22 10:10, Mason Chen wrote:
> > > > > >>>>> > 5. At startup, GetMetadataUpdateEvent is also used to allow
> > the
> > > > > >>>>> > MultiClusterKafkaSourceReader to get the latest metadata
> from
> > > the
> > > > > > >>>>> > enumerator to filter out invalid splits. This is how the
> > reader
> > > > can
> > > > > >>>>> solve
> > > > > >>>>> > "removing" splits/topics in the startup case.
> > > > > >>>>> >
> > > > > >>>>> > Sorry for the late response, really appreciate you taking a
> > > look
> > > > at
> > > > > >>>>> the
> > > > > >>>>> > FLIP!
> > > > > >>>>> >
> > > > > >>>>> > Best,
> > > > > >>>>> > Mason
> > > > > >>>>> >
> > > > > >>>>> > On Thu, Aug 11, 2022 at 1:03 AM Mason Chen <
> > > > mas.chen6...@gmail.com
> > > > > >
> > > > > >>>>> wrote:
> > > > > >>>>> >
> > > > > >>>>> >> Hi Qingsheng,
> > > > > >>>>> >>
> > > > > >>>>> >> Thanks for the feedback--these are great points to raise.
> > > > > >>>>> >>
> > > > > >>>>> >> 1. This is something I missed that is now added. More
> > > generally,
> > > > > it
> > > > > >>>>> can
> > > > > >>>>> >> locate multiple topics in multiple clusters (1 topic on 1
> > > > cluster
> > > > > >>>>> is the
> > > > > >>>>> >> simplest case).
> > > > > >>>>> >>
> > > > > >>>>> >> 2. The KafkaMetadataService doesn't interact with the
> > > > > >>>>> KafkaAdminClients.
> > > > > >>>>> >> This source merely composes the functionality of the
> > > KafkaSource
> > > > > so
> > > > > >>>>> >> KafkaAdminClient interaction is handled by the
> > > KafkaSubscriber.
> > > > > >>>>> >>
> > > > > >>>>> >> 3. There are no requirements for the two
> > clusters--KafkaStream
> > > > > >>>>> should
> > > > > >>>>> >> clarify this question. For example, you could move from
> > > > topicName1
> > > > > >>>>> in
> > > > > >>>>> >> cluster 1 with 11 partitions to topicName2 in cluster 2
> with
> > > 22
> > > > > >>>>> >> partitions--only the KafkaStream id needs to remain the
> > same.
> > > If
> > > > > >>>>> there are
> > > > > >>>>> >> no offsets in checkpoint, the offsets are handled by the
> > > offsets
> > > > > >>>>> >> initializer from KafkaSource and currently the design only
> > > > exposes
> > > > > >>>>> 1 option
> > > > > >>>>> >> for all Kafka clusters, although this could be a valuable
> > > > > extension.
> > > > > >>>>> >>
> > > > > >>>>> >> 4. Regarding topic and cluster removal, metadata is
> > checkpointed
> > > > in
> > > > > >>>>> state
> > > > > >>>>> >> via the splits. Exactly once can be maintained with the
> > > > assumption
> > > > > >>>>> that
> > > > > >>>>> >> required data from the dead cluster lives in the live
> > cluster.
> > > > > This
> > > > > >>>>> can be
> > > > > >>>>> >> solved by not destroying the old Kafka cluster until
> > consumers
> > > > are
> > > > > >>>>> already
> > > > > >>>>> >> drained. In switchover, the consumer would consume from
> both
> > > old
> > > > > >>>>> and new
> > > > > >>>>> >> clusters. And finally, the metadata can be changed to
> point
> > > only
> > > > > to
> > > > > >>>>> the new
> > > > > >>>>> >> cluster when consumers are drained. With the regular
> > > > KafkaSource,
> > > > > >>>>> if Kafka
> > > > > >>>>> >> deletes topic or a cluster is destroyed, the exactly once
> > > > > semantics
> > > > > >>>>> are not
> > > > > >>>>> >> preserved and the semantic is tightly coupled with
> storage.
> > > The
> > > > > >>>>> design
> > > > > >>>>> >> composes and delegates the responsibilities to KafkaSource
> > > > > >>>>> components so it
> > > > > >>>>> >> is limited to whatever KafkaSource can do for exactly once
> > > > > >>>>> semantics.
> > > > > >>>>> >>
> > > > > >>>>> >> 5. Yes, I added more in the FLIP. GetMetadataUpdateEvent
> was
> > > > added
> > > > > >>>>> to make
> > > > > >>>>> >> the order of steps in reader restart during split
> assignment
> > > > > >>>>> deterministic.
> > > > > >>>>> >> StoppableKafkaEnumContextProxy are used by the underlying
> > > > > >>>>> >> KafkaSourceEnumerator to assign splits and do topic
> periodic
> > > > > >>>>> partition
> > > > > >>>>> >> discovery. So, these scheduled thread pools need to be
> > cleaned
> > > > up
> > > > > >>>>> properly
> > > > > >>>>> >> and splits need to be wrapped with cluster information.
> > These
> > > > > >>>>> details are
> > > > > >>>>> >> added to the FLIP.
> > > > > >>>>> >>
> > > > > >>>>> >> Best,
> > > > > >>>>> >> Mason
> > > > > >>>>> >>
> > > > > >>>>> >> On Fri, Jul 29, 2022 at 1:38 AM Qingsheng Ren <
> > > > renqs...@gmail.com
> > > > > >
> > > > > >>>>> wrote:
> > > > > >>>>> >>
> > > > > >>>>> >>> Hi Mason,
> > > > > >>>>> >>>
> > > > > >>>>> >>> Thank you for starting this FLIP!
> > > > > >>>>> >>>
> > > > > >>>>> >>>  From my first glance this FLIP looks like a collection
> of
> > > many
> > > > > new
> > > > > >>>>> >>> interfaces, but I can’t stitch them together. It’ll be
> > great
> > > to
> > > > > >>>>> have some
> > > > > >>>>> >>> brief descriptions about how the source works internally.
> > > Here
> > > > > are
> > > > > >>>>> some
> > > > > >>>>> >>> questions in my mind and please correct me if I
> > misunderstand
> > > > > your
> > > > > >>>>> design.
> > > > > >>>>> >>>
> > > > > >>>>> >>> 1. I can’t find the definition (in code) of KafkaStream.
> > As a
> > > > > part
> > > > > >>>>> of the
> > > > > >>>>> >>> public interface KafkaMetadataService it has to be public
> > > too.
> > > > > If I
> > > > > >>>>> >>> understand correctly it locates a topic on a specific
> > > cluster.
> > > > > >>>>> >>>
> > > > > >>>>> >>> 2. I think there should be a default implementation /
> > example
> > > > for
> > > > > >>>>> >>> KafkaMetadataService for out-of-box usage, for example a
> > > > wrapper
> > > > > of
> > > > > > >>>>> >>> multiple Kafka AdminClients that watch clusters
> > > > periodically.
> > > > > >>>>> >>>
> > > > > >>>>> >>> 3. It looks like the source has the ability to handle
> Kafka
> > > > > cluster
> > > > > >>>>> >>> failures, like switching connections to another cluster
> > > without
> > > > > >>>>> restarting
> > > > > >>>>> >>> the Flink job. Is there any requirement for the two
> > clusters?
> > > > For
> > > > > >>>>> example
> > > > > >>>>> >>> they have to be identical in topic names, number of
> > > partitions
> > > > > and
> > > > > >>>>> offsets
> > > > > >>>>> >>> etc.
> > > > > >>>>> >>>
> > > > > >>>>> >>> 4. Regarding topic and cluster removal, how to handle and
> > > > recover
> > > > > >>>>> from
> > > > > >>>>> >>> checkpoint? Let’s say a topic is removed or migrated to
> > > another
> > > > > >>>>> cluster
> > > > > >>>>> >>> after a successful checkpoint. If the job tries to roll
> > back
> > > to
> > > > > the
> > > > > >>>>> >>> checkpoint which still contains the deleted topic or info
> > of
> > > a
> > > > > dead
> > > > > >>>>> >>> cluster, then how to keep the exactly-once semantic under
> > > this
> > > > > >>>>> case?
> > > > > >>>>> >>>
> > > > > >>>>> >>> 5. I don’t quite get the design of
> > > > StoppableKafkaEnumContextProxy
> > > > > >>>>> and the
> > > > > > >>>>> >>> GetMetadataUpdateEvent. Could you elaborate more in the
> > FLIP?
> > > > > >>>>> >>>
> > > > > >>>>> >>> In a nutshell I think the idea of this FLIP is good,
> which
> > > > > extends
> > > > > >>>>> the
> > > > > >>>>> >>> usage of Kafka source. However as a design doc, some
> > details
> > > > need
> > > > > >>>>> to be
> > > > > >>>>> >>> enriched for other users and developers to better
> > understand
> > > > how
> > > > > >>>>> this
> > > > > >>>>> >>> source works.
> > > > > >>>>> >>>
> > > > > >>>>> >>> Best,
> > > > > >>>>> >>> Qingsheng
> > > > > >>>>> >>>
> > > > > >>>>> >>>> On Jul 21, 2022, at 01:35, Mason Chen <
> > > mas.chen6...@gmail.com
> > > > >
> > > > > >>>>> wrote:
> > > > > >>>>> >>>>
> > > > > >>>>> >>>> Hi all,
> > > > > >>>>> >>>>
> > > > > >>>>> >>>> We would like to start a discussion thread on FLIP-246:
> > > Multi
> > > > > >>>>> Cluster
> > > > > >>>>> >>> Kafka
> > > > > >>>>> >>>> Source [1] where we propose to provide a source
> connector
> > > for
> > > > > >>>>> >>> dynamically
> > > > > > >>>>> >>>> reading from multiple Kafka clusters, which will not
> > require
> > > > > >>>>> Flink job
> > > > > >>>>> >>>> restart. This can greatly improve the Kafka migration
> > > > experience
> > > > > >>>>> for
> > > > > >>>>> >>>> clusters and topics, and it solves some existing
> problems
> > > with
> > > > > the
> > > > > >>>>> >>> current
> > > > > >>>>> >>>> KafkaSource. There was some interest from users [2]
> from a
> > > > > meetup
> > > > > >>>>> and
> > > > > >>>>> >>> the
> > > > > >>>>> >>>> mailing list. Looking forward to comments and feedback,
> > > > thanks!
> > > > > >>>>> >>>>
> > > > > >>>>> >>>> [1]
> > > > > >>>>> >>>>
> > > > > >>>>> >>>
> > > > > >>>>>
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
> > > > > >>>>> >>>> [2]
> > > > > >>>>>
> > https://lists.apache.org/thread/zmpnzx6jjsqc0oldvdm5y2n674xzc3jc
> > > > > >>>>> >>>>
> > > > > >>>>> >>>> Best,
> > > > > >>>>> >>>> Mason
> > > > > >>>>> >>>
> > > > > >>>>> >>>
> > > > > >>>>> >
> > > > > >>>>>
> > > > > >>>>
> > > > >
> > > >
> > >
> >
>
