Hi Gordon,

Thanks for taking a look!

Regarding (1), there is a need from the readers to send this event at
startup because the reader state may reflect outdated metadata. Thus, the
reader should not start without fresh metadata. With fresh metadata, the
reader can filter splits from state--this filtering capability is
ultimately how we solve the common issue of "I re-configured my Kafka
source and removed some topic, but it refers to the old topic due to state
*[1]*". I did not mention this because I thought this is more of a detail
but I'll make a brief note of it.

Regarding (2), definitely. This is something we planned to add later on but
so far keeping things common has been working well. In that regard, yes the
metadata service should expose these configurations but the source should
not check it into state unlike the other metadata. I'm going to add it to a
section called "future enhancements". This is also feedback that Ryan, an
interested user, gave earlier in this thread.

Regarding (3), that's definitely a good point and there are some real use
cases, in addition to what you mentioned, to use this in single cluster
mode (see *[1] *above). DynamicKafkaSource may be confusing because it is
really similar to the KafkaDynamicSource/Sink (table connectors).

Best,
Mason

On Wed, Jun 7, 2023 at 10:40 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Mason,
>
> Thanks for updating the FLIP. In principle, I believe this would be a
> useful addition. Some comments so far:
>
> 1. In this sequence diagram [1], why is there a need for a
> GetMetadataUpdateEvent from the MultiClusterSourceReader going to the
> MultiClusterSourceEnumerator? Shouldn't the enumerator simply start sending
> metadata update events to the reader once it is registered at the
> enumerator?
>
> 2. Looking at the new builder API, there's a few configurations that are
> common across *all *discovered Kafka clusters / topics, specifically the
> deserialization schema, offset initialization strategy, Kafka client
> properties, and consumer group ID. Is there any use case that users would
> want to have these configurations differ across different Kafka clusters?
> If that's the case, would it make more sense to encapsulate these
> configurations to be owned by the metadata service?
>
> 3. Is MultiClusterKafkaSource the best name for this connector? I find that
> the dynamic aspect of Kafka connectivity to be a more defining
> characteristic, and that is the main advantage it has compared to the
> static KafkaSource. A user may want to use this new connector over
> KafkaSource even if they're just consuming from a single Kafka cluster; for
> example, one immediate use case I can think of is Kafka repartitioning with
> zero Flink job downtime. They create a new topic with higher parallelism
> and repartition their Kafka records from the old topic to the new topic,
> and they want the consuming Flink job to be able to move from the old topic
> to the new topic with zero-downtime while retaining exactly-once
> guarantees. So, perhaps DynamicKafkaSource is a better name for this
> connector?
>
> Thanks,
> Gordon
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source?preview=/217389320/255072018/image-2023-6-7_2-29-13.png
>
> On Wed, Jun 7, 2023 at 3:07 AM Mason Chen <mas.chen6...@gmail.com> wrote:
>
> > Hi Jing,
> >
> > Thanks for the prompt feedback! I had some confusion with how to resize
> > images in confluence--anyways, I have made the font bigger, added white
> > background, and also made the diagrams themselves bigger.
> >
> > Regarding the exactly once semantics, that's definitely good to point out
> > in the doc. Thus, I have broken out my "Basic Idea" section into:
> > 1. an intro
> > 2. details about KafkaMetadataService
> > 3. details about KafkaStream and KafkaClusterId (the metadata)
> > 4. details about exactly once semantics and consistency guarantees
> >
> > This should give readers enough context about the design goals and
> > interactions before deep diving into the class interfaces.
> >
> > Best,
> > Mason
> >
> > On Tue, Jun 6, 2023 at 1:25 PM Jing Ge <j...@ververica.com.invalid>
> wrote:
> >
> > > Hi Mason,
> > >
> > > It is a very practical feature that many users are keen to use. Thanks
> to
> > > the previous discussion, the FLIP now looks informative. Thanks for
> your
> > > proposal. One small suggestion is that the attached images are quite
> > small
> > > to read if we don't click and enlarge them. Besides that, It is
> difficult
> > > to read the text on the current sequence diagram because it has a
> > > transparent background. Would you like to replace it with a white
> > > background?
> > >
> > > Exactly-one is one of the key features of Kafka connector. I have the
> > same
> > > concern as Qingsheng. Since you have answered questions about it
> > > previously, would you like to create an extra section in your FLIP to
> > > explicitly describe scenarios when exactly-one is supported and when it
> > is
> > > not?
> > >
> > > Best regards,
> > > Jing
> > >
> > > On Mon, Jun 5, 2023 at 11:41 PM Mason Chen <mas.chen6...@gmail.com>
> > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'm working on FLIP-246 again, for the Multi Cluster Kafka Source
> > > > contribution. The document has been updated with some more context
> > about
> > > > how it can solve the Kafka topic removal scenario and a sequence
> > diagram
> > > to
> > > > illustrate how the components interact.
> > > >
> > > > Looking forward to any feedback!
> > > >
> > > > Best,
> > > > Mason
> > > >
> > > > On Wed, Oct 12, 2022 at 11:12 PM Mason Chen <mas.chen6...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Ryan,
> > > > >
> > > > > Thanks for the additional context! Yes, the offset initializer
> would
> > > need
> > > > > to take a cluster as a parameter and the
> MultiClusterKafkaSourceSplit
> > > can
> > > > > be exposed in an initializer.
> > > > >
> > > > > Best,
> > > > > Mason
> > > > >
> > > > > On Thu, Oct 6, 2022 at 11:00 AM Ryan van Huuksloot <
> > > > > ryan.vanhuuksl...@shopify.com> wrote:
> > > > >
> > > > >> Hi Mason,
> > > > >>
> > > > >> Thanks for the clarification! In regards to the addition to the
> > > > >> OffsetInitializer of this API - this would be an awesome addition
> > and
> > > I
> > > > >> think this entire FLIP would be a great addition to the Flink.
> > > > >>
> > > > >> To provide more context as to why we need particular offsets, we
> use
> > > > >> Hybrid Source to currently backfill from buckets prior to reading
> > from
> > > > >> Kafka. We have a service that will tell us what offset has last
> been
> > > > loaded
> > > > >> into said bucket which we will use to initialize the KafkaSource
> > > > >> OffsetsInitializer. We couldn't use a timestamp here and the
> offset
> > > > would
> > > > >> be different for each Cluster.
> > > > >>
> > > > >> In pseudocode, we'd want the ability to do something like this
> with
> > > > >> HybridSources - if this is possible.
> > > > >>
> > > > >> ```scala
> > > > >> val offsetsMetadata: Map[TopicPartition, Long] = // Get current
> > > offsets
> > > > >> from OffsetReaderService
> > > > >> val multiClusterArchiveSource: MultiBucketFileSource[T] = // Data
> is
> > > > read
> > > > >> from different buckets (multiple topics)
> > > > >> val multiClusterKafkaSource: MultiClusterKafkaSource[T] =
> > > > >> MultiClusterKafkaSource.builder()
> > > > >>   .setKafkaMetadataService(new KafkaMetadataServiceImpl())
> > > > >>   .setStreamIds(List.of("my-stream-1", "my-stream-2"))
> > > > >>   .setGroupId("myConsumerGroup")
> > > > >>
> > > > >>
> > > >
> > >
> >
> .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
> > > > >>   .setStartingOffsets(offsetsMetadata)
> > > > >>   .setProperties(properties)
> > > > >>   .build()
> > > > >> val source =
> > > > >>
> > > >
> > >
> >
> HybridSource.builder(multiClusterArchiveSource).addSource(multiClusterKafkaSource).build()
> > > > >> ```
> > > > >>
> > > > >> Few notes:
> > > > >> - TopicPartition won't work because the topic may be the same name
> > as
> > > > >> this is something that is supported IIRC
> > > > >> - I chose to pass a map into starting offsets just for
> demonstrative
> > > > >> purposes, I would be fine with whatever data structure would work
> > best
> > > > >>
> > > > >> Ryan van Huuksloot
> > > > >> Data Developer | Production Engineering | Streaming Capabilities
> > > > >> [image: Shopify]
> > > > >> <
> > > >
> > https://www.shopify.com/?utm_medium=salessignatures&utm_source=hs_email>
> > > > >>
> > > > >>
> > > > >> On Mon, Oct 3, 2022 at 11:29 PM Mason Chen <
> mas.chen6...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >>> Hi Ryan,
> > > > >>>
> > > > >>> Just copying your message over to the email chain.
> > > > >>>
> > > > >>> Hi Mason,
> > > > >>>> First off, thanks for putting this FLIP together! Sorry for the
> > > delay.
> > > > >>>> Full disclosure Mason and I chatted a little bit at Flink
> Forward
> > > > 2022 but
> > > > >>>> I have tried to capture the questions I had for him then.
> > > > >>>> I'll start the conversation with a few questions:
> > > > >>>> 1. The concept of streamIds is not clear to me in the proposal
> and
> > > > >>>> could use some more information. If I understand correctly, they
> > > will
> > > > be
> > > > >>>> used in the MetadataService to link KafkaClusters to ones you
> want
> > > to
> > > > use?
> > > > >>>> If you assign stream ids using `setStreamIds`, how can you
> > > dynamically
> > > > >>>> increase the number of clusters you consume if the list of
> > StreamIds
> > > > is
> > > > >>>> static? I am basing this off of your example
> > .setStreamIds(List.of(
> > > > >>>> "my-stream-1", "my-stream-2")) so I could be off base with my
> > > > >>>> assumption. If you don't mind clearing up the intention, that
> > would
> > > be
> > > > >>>> great!
> > > > >>>> 2. How would offsets work if you wanted to use this
> > > > >>>> MultiClusterKafkaSource with a file based backfill? In the case
> I
> > am
> > > > >>>> thinking of, you have a bucket backed archive of Kafka data per
> > > > cluster.
> > > > >>>> and you want to pick up from the last offset in the archived
> > system,
> > > > how
> > > > >>>> would you set OffsetInitializers "per cluster" potentially as a
> > > > function or
> > > > >>>> are you limited to setting an OffsetInitializer for the entire
> > > Source?
> > > > >>>> 3. Just to make sure - because this system will layer on top of
> > > > >>>> Flink-27 and use KafkaSource for some aspects under the hood,
> the
> > > > watermark
> > > > >>>> alignment that was introduced in FLIP-182 / Flink 1.15 would be
> > > > possible
> > > > >>>> across multiple clusters if you assign them to the same
> alignment
> > > > group?
> > > > >>>> Thanks!
> > > > >>>> Ryan
> > > > >>>
> > > > >>>
> > > > >>> 1. The stream ids are static--however, what the physical clusters
> > and
> > > > >>> topics that they map to can mutate. Let's say my-stream-1 maps to
> > > > cluster-1
> > > > >>> and topic-1. The KafkaMetadataService can return a different
> > mapping
> > > > when
> > > > >>> metadata is fetched the next time e.g. my-stream-1 mapping to
> > > > cluster-1 and
> > > > >>> topic-1, and cluster-2 and topic-2. Let me add more details on
> how
> > > the
> > > > >>> KafkaMetadataService is used.
> > > > >>> 2. The current design limits itself to a single configured
> > > > >>> OffsetInitializer that is used for every underlying KafkaSource.
> > > > >>> 3. Yes, it is in our plan to integrate this source with watermark
> > > > >>> alignment in which the user can align watermarks from all
> clusters
> > > > within
> > > > >>> the single. It will leverage the Kafka Source implementation to
> > > achieve
> > > > >>> this.
> > > > >>>
> > > > >>> With regards to 2, it's an interesting idea. I think we can
> extend
> > > the
> > > > >>> design to support a map of offset initializers to clusters, which
> > > would
> > > > >>> solve your file based backfill. If you initialize the source
> with a
> > > > single
> > > > >>> timestamp, the current design may work for your usecase, but I
> > can't
> > > > tell
> > > > >>> without more details. Thanks for your interest and sorry for the
> > > delay!
> > > > >>>
> > > > >>> Best,
> > > > >>> Mason
> > > > >>>
> > > > >>> On Mon, Aug 29, 2022 at 10:02 AM Mason Chen <
> > mas.chen6...@gmail.com>
> > > > >>> wrote:
> > > > >>>
> > > > >>>> Hi Max,
> > > > >>>>
> > > > >>>> Thanks for taking a look!
> > > > >>>>
> > > > >>>>
> > > > >>>>> I'm wondering whether we can share some of the code of the
> > existing
> > > > >>>>> KafkaSource.
> > > > >>>>>
> > > > >>>>
> > > > >>>> That is the intention--let me call it out more explicitly.
> > > > >>>>
> > > > >>>> Regarding your questions:
> > > > >>>>
> > > > >>>> 1. Indeed, the KafkaMetadataService has the describe stream
> method
> > > to
> > > > >>>> get a particular stream id. We decided to support getting all
> the
> > > > streams
> > > > >>>> for subscribing via a regex pattern (similar to the Kafka Source
> > > > >>>> implementation).
> > > > >>>>
> > > > >>>> 2. The idea was that if metadata is removed that it is no longer
> > > > active.
> > > > >>>>
> > > > >>>> 3. The MetadataUpdateEvent's format is specifically for
> > > communicating
> > > > >>>> to the reader what the clusters and topics it should read from.
> It
> > > > doesn't
> > > > >>>> need stream information since it doesn't interact with the
> > > > >>>> KafkaMetadataService (only the enumerator interacts with it).
> > > > >>>>
> > > > >>>> 4. Metrics will be reported per cluster. For example,
> KafkaSource
> > > > >>>> already reports pendingRecords and the corresponding metric, for
> > > > example
> > > > >>>> for cluster0, would be a metric called
> > > > >>>> `MultiClusterKafkaSource.kafkaCluster.cluster0.pendingRecords`.
> In
> > > > cluster
> > > > >>>> removal, these metrics wouldn't be valid so the implementation
> can
> > > > close
> > > > >>>> them.
> > > > >>>>
> > > > >>>> 5. I'm fine with that name; however, I got some feedback
> > internally
> > > > >>>> since the bulk of the logic is in stopping the scheduled tasks
> of
> > > the
> > > > >>>> underlying enumerators and handling cluster unavailability edge
> > > > cases. I'm
> > > > >>>> open to changing the name if the design changes (it is an
> internal
> > > > class
> > > > >>>> anyways, so we can make these name changes without breaking
> > users).
> > > > >>>>
> > > > >>>> 6. Yes, there are some limitations but I have not
> > > > >>>> considered implementing that in the basic ConfigMap
> > > > >>>> implementation--currently users are allowed to do any changes.
> For
> > > > example,
> > > > >>>> a user should not delete and recreate a topic, on the same
> > cluster.
> > > > >>>> Regarding the logic to properly remove a cluster, a user could
> > > > certainly
> > > > >>>> support it with a custom KafkaMetadataService--I intended to
> keep
> > > the
> > > > >>>> ConfigMap implementation basic for simple use cases (so users
> here
> > > > would
> > > > >>>> rely on manual monitoring or something built externally).
> However,
> > > > I'm open
> > > > >>>> to the idea if the usage changes and maybe there could be
> > > > improvements to
> > > > >>>> the Flink metric API to achieve more seamless integration. And
> > > > finally,
> > > > >>>> yes, the semantics are as such and gives reason to my response
> for
> > > > question
> > > > >>>> 2.
> > > > >>>>
> > > > >>>> I've updated the doc with more context from my question
> responses,
> > > let
> > > > >>>> me know if there are more questions!
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Mason
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> On Wed, Aug 17, 2022 at 8:40 AM Maximilian Michels <
> > m...@apache.org>
> > > > >>>> wrote:
> > > > >>>>
> > > > >>>>> Hey Mason,
> > > > >>>>>
> > > > >>>>> I just had a look at the FLIP. If I understand correctly, you
> are
> > > > >>>>> proposing a very sophisticated way to read from multiple Kafka
> > > > >>>>> clusters
> > > > >>>>> / topics.
> > > > >>>>>
> > > > >>>>> I'm wondering whether we can share some of the code of the
> > existing
> > > > >>>>> KafkaSource. I suppose you don't want to modify KafkaSource
> > itself
> > > to
> > > > >>>>> avoid any breakage. But it would be good not to duplicate too
> > much
> > > > >>>>> code,
> > > > >>>>>   such that functionality that can be shared between the two
> > > > >>>>> implementations (e.g. the reader implementation).
> > > > >>>>>
> > > > >>>>> Some questions that I had when browsing the current version:
> > > > >>>>>
> > > > >>>>>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320
> > > > >>>>>
> > > > >>>>> 1. Why does KafkaMetadataService have the `describeStream`
> method
> > > in
> > > > >>>>> addition to `getAllStreams`? This may be redundant. Is the idea
> > to
> > > > get
> > > > >>>>> the updated metadata for a particular StreamId?
> > > > >>>>>
> > > > >>>>> 2. KafkaMetaDataService#isClusterActive serves the purpose to
> > check
> > > > >>>>> for
> > > > >>>>> activeness, couldn't this be included in the metadata of
> > > KafkaStream?
> > > > >>>>>
> > > > >>>>> 3. Shouldn't MetadataUpdateEvent contain a full list of
> > KafkaStream
> > > > >>>>> instead of `Map<KafkaClusterIdentifier, Set<String>>`?
> > > > >>>>>
> > > > >>>>> 4. "In addition, restarting enumerators involve clearing
> outdated
> > > > >>>>> metrics" What metrics are we talking about here?
> > > > >>>>>
> > > > >>>>> 5. `StoppableKafkaEnumContextProxy` doesn't ring with me. How
> > about
> > > > >>>>> `MultiKafkaSplitEnumeratorContext`?
> > > > >>>>>
> > > > >>>>> 6. What about the ConfigMap implementation? Are there any
> > > limitations
> > > > >>>>> on
> > > > >>>>> the type of configuration changes that we want to allow? For
> > > example,
> > > > >>>>> is
> > > > >>>>> it allowed to remove a cluster before it has been drained /
> > > > >>>>> deactivated?
> > > > >>>>> Is "not active" semantically identical to having the cluster /
> > > stream
> > > > >>>>> removed?
> > > > >>>>>
> > > > >>>>> This is an exciting new addition!
> > > > >>>>>
> > > > >>>>> Cheers,
> > > > >>>>> Max
> > > > >>>>>
> > > > >>>>> On 11.08.22 10:10, Mason Chen wrote:
> > > > >>>>> > 5. At startup, GetMetadataUpdateEvent is also used to allow
> the
> > > > >>>>> > MultiClusterKafkaSourceReader to get the latest metadata from
> > the
> > > > >>>>> > enumerator to filter out invalid splits This is how the
> reader
> > > can
> > > > >>>>> solve
> > > > >>>>> > "removing" splits/topics in the startup case.
> > > > >>>>> >
> > > > >>>>> > Sorry for the late response, really appreciate you taking a
> > look
> > > at
> > > > >>>>> the
> > > > >>>>> > FLIP!
> > > > >>>>> >
> > > > >>>>> > Best,
> > > > >>>>> > Mason
> > > > >>>>> >
> > > > >>>>> > On Thu, Aug 11, 2022 at 1:03 AM Mason Chen <
> > > mas.chen6...@gmail.com
> > > > >
> > > > >>>>> wrote:
> > > > >>>>> >
> > > > >>>>> >> Hi Qingsheng,
> > > > >>>>> >>
> > > > >>>>> >> Thanks for the feedback--these are great points to raise.
> > > > >>>>> >>
> > > > >>>>> >> 1. This is something I missed that is now added. More
> > generally,
> > > > it
> > > > >>>>> can
> > > > >>>>> >> locate multiple topics in multiple clusters (1 topic on 1
> > > cluster
> > > > >>>>> is the
> > > > >>>>> >> simplest case).
> > > > >>>>> >>
> > > > >>>>> >> 2. The KafkaMetadataService doesn't interact with the
> > > > >>>>> KafkaAdminClients.
> > > > >>>>> >> This source merely composes the functionality of the
> > KafkaSource
> > > > so
> > > > >>>>> >> KafkaAdminClient interaction is handled by the
> > KafkaSubscriber.
> > > > >>>>> >>
> > > > >>>>> >> 3. There are no requirements for the two
> clusters--KafkaStream
> > > > >>>>> should
> > > > >>>>> >> clarify this question. For example, you could move from
> > > topicName1
> > > > >>>>> in
> > > > >>>>> >> cluster 1 with 11 partitions to topicName2 in cluster 2 with
> > 22
> > > > >>>>> >> partitions--only the KafkaStream id needs to remain the
> same.
> > If
> > > > >>>>> there are
> > > > >>>>> >> no offsets in checkpoint, the offsets are handled by the
> > offsets
> > > > >>>>> >> initializer from KafkaSource and currently the design only
> > > exposes
> > > > >>>>> 1 option
> > > > >>>>> >> for all Kafka clusters, although this could be a valuable
> > > > extension.
> > > > >>>>> >>
> > > > >>>>> >> 4. Regarding topic and cluster removal, metadata is
> checkpoint
> > > in
> > > > >>>>> state
> > > > >>>>> >> via the splits. Exactly once can be maintained with the
> > > assumption
> > > > >>>>> that
> > > > >>>>> >> required data from the dead cluster lives in the live
> cluster.
> > > > This
> > > > >>>>> can be
> > > > >>>>> >> solved by not destroying the old Kafka cluster until
> consumers
> > > are
> > > > >>>>> already
> > > > >>>>> >> drained. In switchover, the consumer would consume from both
> > old
> > > > >>>>> and new
> > > > >>>>> >> clusters. And finally, the metadata can be changed to point
> > only
> > > > to
> > > > >>>>> the new
> > > > >>>>> >> cluster when consumers are drained. With the regular
> > > KafkaSource,
> > > > >>>>> if Kafka
> > > > >>>>> >> deletes topic or a cluster is destroyed, the exactly once
> > > > semantics
> > > > >>>>> are not
> > > > >>>>> >> preserved and the semantic is tightly coupled with storage.
> > The
> > > > >>>>> design
> > > > >>>>> >> composes and delegates the responsibilities to KafkaSource
> > > > >>>>> components so it
> > > > >>>>> >> is limited to whatever KafkaSource can do for exactly once
> > > > >>>>> semantics.
> > > > >>>>> >>
> > > > >>>>> >> 5. Yes, I added more in the FLIP. GetMetadataUpdateEvent was
> > > added
> > > > >>>>> to make
> > > > >>>>> >> the order of steps in reader restart during split assignment
> > > > >>>>> deterministic.
> > > > >>>>> >> StoppableKafkaEnumContextProxy are used by the underlying
> > > > >>>>> >> KafkaSourceEnumerator to assign splits and do topic periodic
> > > > >>>>> partition
> > > > >>>>> >> discovery. So, these scheduled thread pools need to be
> cleaned
> > > up
> > > > >>>>> properly
> > > > >>>>> >> and splits need to be wrapped with cluster information.
> These
> > > > >>>>> details are
> > > > >>>>> >> added to the FLIP.
> > > > >>>>> >>
> > > > >>>>> >> Best,
> > > > >>>>> >> Mason
> > > > >>>>> >>
> > > > >>>>> >> On Fri, Jul 29, 2022 at 1:38 AM Qingsheng Ren <
> > > renqs...@gmail.com
> > > > >
> > > > >>>>> wrote:
> > > > >>>>> >>
> > > > >>>>> >>> Hi Mason,
> > > > >>>>> >>>
> > > > >>>>> >>> Thank you for starting this FLIP!
> > > > >>>>> >>>
> > > > >>>>> >>>  From my first glance this FLIP looks like a collection of
> > many
> > > > new
> > > > >>>>> >>> interfaces, but I can’t stitch them together. It’ll be
> great
> > to
> > > > >>>>> have some
> > > > >>>>> >>> brief descriptions about how the source works internally.
> > Here
> > > > are
> > > > >>>>> some
> > > > >>>>> >>> questions in my mind and please correct me if I
> misunderstand
> > > > your
> > > > >>>>> design.
> > > > >>>>> >>>
> > > > >>>>> >>> 1. I can’t find the definition (in code) of KafkaStream.
> As a
> > > > part
> > > > >>>>> of the
> > > > >>>>> >>> public interface KafkaMetadataService it has to be public
> > too.
> > > > If I
> > > > >>>>> >>> understand correctly it locates a topic on a specific
> > cluster.
> > > > >>>>> >>>
> > > > >>>>> >>> 2. I think there should be a default implementation /
> example
> > > for
> > > > >>>>> >>> KafkaMetadataService for out-of-box usage, for example a
> > > wrapper
> > > > of
> > > > >>>>> >>> multiple Kafka AdminClients that watching clusters
> > > periodically.
> > > > >>>>> >>>
> > > > >>>>> >>> 3. It looks like the source has the ability to handle Kafka
> > > > cluster
> > > > >>>>> >>> failures, like switching connections to another cluster
> > without
> > > > >>>>> restarting
> > > > >>>>> >>> the Flink job. Is there any requirement for the two
> clusters?
> > > For
> > > > >>>>> example
> > > > >>>>> >>> they have to be identical in topic names, number of
> > partitions
> > > > and
> > > > >>>>> offsets
> > > > >>>>> >>> etc.
> > > > >>>>> >>>
> > > > >>>>> >>> 4. Regarding topic and cluster removal, how to handle and
> > > recover
> > > > >>>>> from
> > > > >>>>> >>> checkpoint? Let’s say a topic is removed or migrated to
> > another
> > > > >>>>> cluster
> > > > >>>>> >>> after a successful checkpoint. If the job tries to roll
> back
> > to
> > > > the
> > > > >>>>> >>> checkpoint which still contains the deleted topic or info
> of
> > a
> > > > dead
> > > > >>>>> >>> cluster, then how to keep the exactly-once semantic under
> > this
> > > > >>>>> case?
> > > > >>>>> >>>
> > > > >>>>> >>> 5. I don’t quite get the design of
> > > StoppableKafkaEnumContextProxy
> > > > >>>>> and the
> > > > >>>>> >>> GetMeradataUpdateEvent. Could you elaborate more in the
> FLIP?
> > > > >>>>> >>>
> > > > >>>>> >>> In a nutshell I think the idea of this FLIP is good, which
> > > > extends
> > > > >>>>> the
> > > > >>>>> >>> usage of Kafka source. However as a design doc, some
> details
> > > need
> > > > >>>>> to be
> > > > >>>>> >>> enriched for other users and developers to better
> understand
> > > how
> > > > >>>>> this
> > > > >>>>> >>> source works.
> > > > >>>>> >>>
> > > > >>>>> >>> Best,
> > > > >>>>> >>> Qingsheng
> > > > >>>>> >>>
> > > > >>>>> >>>> On Jul 21, 2022, at 01:35, Mason Chen <
> > mas.chen6...@gmail.com
> > > >
> > > > >>>>> wrote:
> > > > >>>>> >>>>
> > > > >>>>> >>>> Hi all,
> > > > >>>>> >>>>
> > > > >>>>> >>>> We would like to start a discussion thread on FLIP-246:
> > Multi
> > > > >>>>> Cluster
> > > > >>>>> >>> Kafka
> > > > >>>>> >>>> Source [1] where we propose to provide a source connector
> > for
> > > > >>>>> >>> dynamically
> > > > >>>>> >>>> reading from Kafka multiple clusters, which will not
> require
> > > > >>>>> Flink job
> > > > >>>>> >>>> restart. This can greatly improve the Kafka migration
> > > experience
> > > > >>>>> for
> > > > >>>>> >>>> clusters and topics, and it solves some existing problems
> > with
> > > > the
> > > > >>>>> >>> current
> > > > >>>>> >>>> KafkaSource. There was some interest from users [2] from a
> > > > meetup
> > > > >>>>> and
> > > > >>>>> >>> the
> > > > >>>>> >>>> mailing list. Looking forward to comments and feedback,
> > > thanks!
> > > > >>>>> >>>>
> > > > >>>>> >>>> [1]
> > > > >>>>> >>>>
> > > > >>>>> >>>
> > > > >>>>>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
> > > > >>>>> >>>> [2]
> > > > >>>>>
> https://lists.apache.org/thread/zmpnzx6jjsqc0oldvdm5y2n674xzc3jc
> > > > >>>>> >>>>
> > > > >>>>> >>>> Best,
> > > > >>>>> >>>> Mason
> > > > >>>>> >>>
> > > > >>>>> >>>
> > > > >>>>> >
> > > > >>>>>
> > > > >>>>
> > > >
> > >
> >
>

Reply via email to