>
> My main worry for doing this as a later iteration is that this would
> probably be a breaking change for the public interface. If that can be
> avoided and planned ahead, I'm fine with moving forward with how it is
> right now.


Makes sense. Considering the public interfaces, I think we still want to
provide clients the ability to pin certain configurations in the
builder--however, cluster-specific configurations may not be known upfront
or generalize to all clusters, so there would need to be changes in the
`KafkaMetadataService` interface. This could be achieved by exposing them via:

1. A separate API (e.g. `Map<KafkaClusterIdentifier, Properties>
getKafkaClusterProperties()`) in KafkaMetadataService
2. In `KafkaClusterIdentifier` as this already contains some configuration
(e.g. Bootstrap server) in which case we should rename the class to
something like `KafkaCluster` as it is no longer just an identifier
3. A reorganization of the metadata into a `Map<String, ClusterMetadata>` in
`KafkaStream`, where the String is the proposed
`KafkaClusterIdentifier.name` field.

I prefer option 3 since it simplifies equals() checks on
KafkaClusterIdentifier (e.g. is it the name, bootstrap, or both?).
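
To make option 3 concrete, here is a rough sketch of how the metadata could
be reorganized. It is only an illustration under assumptions: `ClusterMetadata`,
its accessors, and the `ClusterConfigs.effectiveProperties` helper are
hypothetical names, and the rule that builder-pinned values override
cluster-specific ones is my assumption rather than something settled in the FLIP.

```java
import java.util.Map;
import java.util.Properties;
import java.util.Set;

// Hypothetical sketch of option 3; ClusterMetadata and its accessors are
// illustrative names, not part of the FLIP.
final class ClusterMetadata {
    private final Set<String> topics;
    private final Properties properties;

    ClusterMetadata(Set<String> topics, Properties properties) {
        this.topics = Set.copyOf(topics);
        // Defensive copy so later changes by the caller do not leak in.
        this.properties = new Properties();
        this.properties.putAll(properties);
    }

    Set<String> getTopics() { return topics; }

    Properties getProperties() { return properties; }
}

final class KafkaStream {
    private final String streamId;
    // Keyed by the cluster name, i.e. the proposed KafkaClusterIdentifier.name.
    private final Map<String, ClusterMetadata> clusterMetadataMap;

    KafkaStream(String streamId, Map<String, ClusterMetadata> clusterMetadataMap) {
        this.streamId = streamId;
        this.clusterMetadataMap = Map.copyOf(clusterMetadataMap);
    }

    String getStreamId() { return streamId; }

    Map<String, ClusterMetadata> getClusterMetadataMap() { return clusterMetadataMap; }
}

final class ClusterConfigs {
    private ClusterConfigs() {}

    // Assumption: values pinned in the builder override cluster-specific ones.
    static Properties effectiveProperties(Properties pinned, ClusterMetadata cluster) {
        Properties merged = new Properties();
        merged.putAll(cluster.getProperties());
        merged.putAll(pinned);
        return merged;
    }
}
```

With this shape, cluster identity is just the map key, so equality never has
to decide between name and bootstrap servers.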

> Small correction for the MultiClusterKafkaSourceEnumerator section: "This
> reader is responsible for discovering and assigning splits from 1+ cluster"

Thanks for the catch!

> the defining characteristic is the dynamic discovery vs. the fact that
> multiple clusters [...]



> I think the "Table" in the name of those SQL connectors should avoid
> confusion. Perhaps we can also solicit other ideas? I would throw
> "DiscoveringKafkaSource" into the mix.

Agreed with Gordon's and your suggestions. Right, the only public-facing
name on the SQL side is `kafka`, the SQL connector identifier. Based on your
suggestions:

1. MultiClusterKafkaSource
2. DynamicKafkaSource
3. DiscoveringKafkaSource
4. MutableKafkaSource
5. AdaptiveKafkaSource

I added a few of my own. I do prefer 2. What do others think?

Best,
Mason

On Sun, Jun 11, 2023 at 1:12 PM Thomas Weise <t...@apache.org> wrote:

> Hi Mason,
>
> Thanks for the iterations on the FLIP, I think this is in a very good shape
> now.
>
> Small correction for the MultiClusterKafkaSourceEnumerator section: "This
> reader is responsible for discovering and assigning splits from 1+ cluster"
>
> Regarding the user facing name of the connector: I agree with Gordon that
> the defining characteristic is the dynamic discovery vs. the fact that
> multiple clusters may be consumed in parallel. (Although, as described in
> the FLIP, lossless consumer migration only works with a strategy that
> involves intermittent parallel consumption of old and new clusters to drain
> and switch.)
>
> I think the "Table" in the name of those SQL connectors should avoid
> confusion. Perhaps we can also solicit other ideas? I would throw
> "DiscoveringKafkaSource" into the mix.
>
> Cheers,
> Thomas
>
>
>
>
> On Fri, Jun 9, 2023 at 3:40 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
> > > Regarding (2), definitely. This is something we planned to add later on
> > but
> > so far keeping things common has been working well.
> >
> > My main worry for doing this as a later iteration is that this would
> > probably be a breaking change for the public interface. If that can be
> > avoided and planned ahead, I'm fine with moving forward with how it is
> > right now.
> >
> > > DynamicKafkaSource may be confusing because it is really similar to the
> > KafkaDynamicSource/Sink (table connectors).
> >
> > The table / sql Kafka connectors (KafkaDynamicTableFactory,
> > KafkaDynamicTableSource / KafkaDynamicTableSink) are all internal classes
> > not really meant to be exposed to the user though.
> > It can cause some confusion internally for the code maintainers, but on
> the
> > actual public surface I don't see this being an issue.
> >
> > Thanks,
> > Gordon
> >
> > On Wed, Jun 7, 2023 at 8:55 PM Mason Chen <mas.chen6...@gmail.com>
> wrote:
> >
> > > Hi Gordon,
> > >
> > > Thanks for taking a look!
> > >
> > > Regarding (1), there is a need from the readers to send this event at
> > > startup because the reader state may reflect outdated metadata. Thus,
> the
> > > reader should not start without fresh metadata. With fresh metadata,
> the
> > > reader can filter splits from state--this filtering capability is
> > > ultimately how we solve the common issue of "I re-configured my Kafka
> > > source and removed some topic, but it refers to the old topic due to
> > state
> > > *[1]*". I did not mention this because I thought this is more of a
> detail
> > > but I'll make a brief note of it.
> > >
> > > Regarding (2), definitely. This is something we planned to add later on
> > but
> > > so far keeping things common has been working well. In that regard, yes
> > the
> > > metadata service should expose these configurations but the source
> should
> > > not check it into state unlike the other metadata. I'm going to add it
> > to a
> > > section called "future enhancements". This is also feedback that Ryan,
> an
> > > interested user, gave earlier in this thread.
> > >
> > > Regarding (3), that's definitely a good point and there are some real
> use
> > > cases, in addition to what you mentioned, to use this in single cluster
> > > mode (see *[1] *above). DynamicKafkaSource may be confusing because it
> is
> > > really similar to the KafkaDynamicSource/Sink (table connectors).
> > >
> > > Best,
> > > Mason
> > >
> > > On Wed, Jun 7, 2023 at 10:40 AM Tzu-Li (Gordon) Tai <
> tzuli...@apache.org
> > >
> > > wrote:
> > >
> > > > Hi Mason,
> > > >
> > > > Thanks for updating the FLIP. In principle, I believe this would be a
> > > > useful addition. Some comments so far:
> > > >
> > > > 1. In this sequence diagram [1], why is there a need for a
> > > > GetMetadataUpdateEvent from the MultiClusterSourceReader going to the
> > > > MultiClusterSourceEnumerator? Shouldn't the enumerator simply start
> > > sending
> > > > metadata update events to the reader once it is registered at the
> > > > enumerator?
> > > >
> > > > 2. Looking at the new builder API, there's a few configurations that
> > are
> > > > common across *all *discovered Kafka clusters / topics, specifically
> > the
> > > > deserialization schema, offset initialization strategy, Kafka client
> > > > properties, and consumer group ID. Is there any use case that users
> > would
> > > > want to have these configurations differ across different Kafka
> > clusters?
> > > > If that's the case, would it make more sense to encapsulate these
> > > > configurations to be owned by the metadata service?
> > > >
> > > > 3. Is MultiClusterKafkaSource the best name for this connector? I
> find
> > > that
> > > > the dynamic aspect of Kafka connectivity to be a more defining
> > > > characteristic, and that is the main advantage it has compared to the
> > > > static KafkaSource. A user may want to use this new connector over
> > > > KafkaSource even if they're just consuming from a single Kafka
> cluster;
> > > for
> > > > example, one immediate use case I can think of is Kafka
> repartitioning
> > > with
> > > > zero Flink job downtime. They create a new topic with higher
> > parallelism
> > > > and repartition their Kafka records from the old topic to the new
> > topic,
> > > > and they want the consuming Flink job to be able to move from the old
> > > topic
> > > > to the new topic with zero-downtime while retaining exactly-once
> > > > guarantees. So, perhaps DynamicKafkaSource is a better name for this
> > > > connector?
> > > >
> > > > Thanks,
> > > > Gordon
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source?preview=/217389320/255072018/image-2023-6-7_2-29-13.png
> > > >
> > > > On Wed, Jun 7, 2023 at 3:07 AM Mason Chen <mas.chen6...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Jing,
> > > > >
> > > > > Thanks for the prompt feedback! I had some confusion with how to
> > resize
> > > > > images in confluence--anyways, I have made the font bigger, added
> > white
> > > > > background, and also made the diagrams themselves bigger.
> > > > >
> > > > > Regarding the exactly once semantics, that's definitely good to
> point
> > > out
> > > > > in the doc. Thus, I have broken out my "Basic Idea" section into:
> > > > > 1. an intro
> > > > > 2. details about KafkaMetadataService
> > > > > 3. details about KafkaStream and KafkaClusterId (the metadata)
> > > > > 4. details about exactly once semantics and consistency guarantees
> > > > >
> > > > > This should give readers enough context about the design goals and
> > > > > interactions before deep diving into the class interfaces.
> > > > >
> > > > > Best,
> > > > > Mason
> > > > >
> > > > > On Tue, Jun 6, 2023 at 1:25 PM Jing Ge <j...@ververica.com.invalid
> >
> > > > wrote:
> > > > >
> > > > > > Hi Mason,
> > > > > >
> > > > > > It is a very practical feature that many users are keen to use.
> > > Thanks
> > > > to
> > > > > > the previous discussion, the FLIP now looks informative. Thanks
> for
> > > > your
> > > > > > proposal. One small suggestion is that the attached images are
> > quite
> > > > > small
> > > > > > to read if we don't click and enlarge them. Besides that, It is
> > > > difficult
> > > > > > to read the text on the current sequence diagram because it has a
> > > > > > transparent background. Would you like to replace it with a white
> > > > > > background?
> > > > > >
> > > > > > > Exactly-once is one of the key features of the Kafka connector. I have
> > the
> > > > > same
> > > > > > concern as Qingsheng. Since you have answered questions about it
> > > > > > previously, would you like to create an extra section in your
> FLIP
> > to
> > > > > > > explicitly describe scenarios when exactly-once is supported and
> > when
> > > it
> > > > > is
> > > > > > not?
> > > > > >
> > > > > > Best regards,
> > > > > > Jing
> > > > > >
> > > > > > On Mon, Jun 5, 2023 at 11:41 PM Mason Chen <
> mas.chen6...@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I'm working on FLIP-246 again, for the Multi Cluster Kafka
> Source
> > > > > > > contribution. The document has been updated with some more
> > context
> > > > > about
> > > > > > > how it can solve the Kafka topic removal scenario and a
> sequence
> > > > > diagram
> > > > > > to
> > > > > > > illustrate how the components interact.
> > > > > > >
> > > > > > > Looking forward to any feedback!
> > > > > > >
> > > > > > > Best,
> > > > > > > Mason
> > > > > > >
> > > > > > > On Wed, Oct 12, 2022 at 11:12 PM Mason Chen <
> > > mas.chen6...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Ryan,
> > > > > > > >
> > > > > > > > Thanks for the additional context! Yes, the offset
> initializer
> > > > would
> > > > > > need
> > > > > > > > to take a cluster as a parameter and the
> > > > MultiClusterKafkaSourceSplit
> > > > > > can
> > > > > > > > be exposed in an initializer.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Mason
> > > > > > > >
> > > > > > > > On Thu, Oct 6, 2022 at 11:00 AM Ryan van Huuksloot <
> > > > > > > > ryan.vanhuuksl...@shopify.com> wrote:
> > > > > > > >
> > > > > > > >> Hi Mason,
> > > > > > > >>
> > > > > > > >> Thanks for the clarification! In regards to the addition to
> > the
> > > > > > > >> OffsetInitializer of this API - this would be an awesome
> > > addition
> > > > > and
> > > > > > I
> > > > > > > >> think this entire FLIP would be a great addition to the
> Flink.
> > > > > > > >>
> > > > > > > >> To provide more context as to why we need particular
> offsets,
> > we
> > > > use
> > > > > > > >> Hybrid Source to currently backfill from buckets prior to
> > > reading
> > > > > from
> > > > > > > >> Kafka. We have a service that will tell us what offset has
> > last
> > > > been
> > > > > > > loaded
> > > > > > > >> into said bucket which we will use to initialize the
> > KafkaSource
> > > > > > > >> OffsetsInitializer. We couldn't use a timestamp here and the
> > > > offset
> > > > > > > would
> > > > > > > >> be different for each Cluster.
> > > > > > > >>
> > > > > > > >> In pseudocode, we'd want the ability to do something like
> this
> > > > with
> > > > > > > >> HybridSources - if this is possible.
> > > > > > > >>
> > > > > > > > >> ```scala
> > > > > > > > >> val offsetsMetadata: Map[TopicPartition, Long] = // Get current offsets from OffsetReaderService
> > > > > > > > >> val multiClusterArchiveSource: MultiBucketFileSource[T] = // Data is read from different buckets (multiple topics)
> > > > > > > > >> val multiClusterKafkaSource: MultiClusterKafkaSource[T] =
> > > > > > > > >> MultiClusterKafkaSource.builder()
> > > > > > > > >>   .setKafkaMetadataService(new KafkaMetadataServiceImpl())
> > > > > > > > >>   .setStreamIds(List.of("my-stream-1", "my-stream-2"))
> > > > > > > > >>   .setGroupId("myConsumerGroup")
> > > > > > > > >>   .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
> > > > > > > > >>   .setStartingOffsets(offsetsMetadata)
> > > > > > > > >>   .setProperties(properties)
> > > > > > > > >>   .build()
> > > > > > > > >> val source =
> > > > > > > > >>   HybridSource.builder(multiClusterArchiveSource).addSource(multiClusterKafkaSource).build()
> > > > > > > > >> ```
> > > > > > > >>
> > > > > > > >> Few notes:
> > > > > > > >> - TopicPartition won't work because the topic may be the
> same
> > > name
> > > > > as
> > > > > > > >> this is something that is supported IIRC
> > > > > > > >> - I chose to pass a map into starting offsets just for
> > > > demonstrative
> > > > > > > >> purposes, I would be fine with whatever data structure would
> > > work
> > > > > best
> > > > > > > >>
> > > > > > > >> Ryan van Huuksloot
> > > > > > > >> Data Developer | Production Engineering | Streaming
> > Capabilities
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Mon, Oct 3, 2022 at 11:29 PM Mason Chen <
> > > > mas.chen6...@gmail.com>
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >>> Hi Ryan,
> > > > > > > >>>
> > > > > > > >>> Just copying your message over to the email chain.
> > > > > > > >>>
> > > > > > > >>> Hi Mason,
> > > > > > > >>>> First off, thanks for putting this FLIP together! Sorry
> for
> > > the
> > > > > > delay.
> > > > > > > >>>> Full disclosure Mason and I chatted a little bit at Flink
> > > > Forward
> > > > > > > 2022 but
> > > > > > > >>>> I have tried to capture the questions I had for him then.
> > > > > > > >>>> I'll start the conversation with a few questions:
> > > > > > > >>>> 1. The concept of streamIds is not clear to me in the
> > proposal
> > > > and
> > > > > > > >>>> could use some more information. If I understand
> correctly,
> > > they
> > > > > > will
> > > > > > > be
> > > > > > > >>>> used in the MetadataService to link KafkaClusters to ones
> > you
> > > > want
> > > > > > to
> > > > > > > use?
> > > > > > > >>>> If you assign stream ids using `setStreamIds`, how can you
> > > > > > dynamically
> > > > > > > >>>> increase the number of clusters you consume if the list of
> > > > > StreamIds
> > > > > > > is
> > > > > > > >>>> static? I am basing this off of your example
> > > > > .setStreamIds(List.of(
> > > > > > > >>>> "my-stream-1", "my-stream-2")) so I could be off base with
> > my
> > > > > > > >>>> assumption. If you don't mind clearing up the intention,
> > that
> > > > > would
> > > > > > be
> > > > > > > >>>> great!
> > > > > > > >>>> 2. How would offsets work if you wanted to use this
> > > > > > > >>>> MultiClusterKafkaSource with a file based backfill? In the
> > > case
> > > > I
> > > > > am
> > > > > > > >>>> thinking of, you have a bucket backed archive of Kafka
> data
> > > per
> > > > > > > cluster.
> > > > > > > >>>> and you want to pick up from the last offset in the
> archived
> > > > > system,
> > > > > > > how
> > > > > > > >>>> would you set OffsetInitializers "per cluster" potentially
> > as
> > > a
> > > > > > > function or
> > > > > > > >>>> are you limited to setting an OffsetInitializer for the
> > entire
> > > > > > Source?
> > > > > > > >>>> 3. Just to make sure - because this system will layer on
> top
> > > of
> > > > > > > > >>>> FLIP-27 and use KafkaSource for some aspects under the
> > hood,
> > > > the
> > > > > > > watermark
> > > > > > > >>>> alignment that was introduced in FLIP-182 / Flink 1.15
> would
> > > be
> > > > > > > possible
> > > > > > > >>>> across multiple clusters if you assign them to the same
> > > > alignment
> > > > > > > group?
> > > > > > > >>>> Thanks!
> > > > > > > >>>> Ryan
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> 1. The stream ids are static--however, what the physical
> > > clusters
> > > > > and
> > > > > > > >>> topics that they map to can mutate. Let's say my-stream-1
> > maps
> > > to
> > > > > > > cluster-1
> > > > > > > >>> and topic-1. The KafkaMetadataService can return a
> different
> > > > > mapping
> > > > > > > when
> > > > > > > >>> metadata is fetched the next time e.g. my-stream-1 mapping
> to
> > > > > > > cluster-1 and
> > > > > > > >>> topic-1, and cluster-2 and topic-2. Let me add more details
> > on
> > > > how
> > > > > > the
> > > > > > > >>> KafkaMetadataService is used.
> > > > > > > >>> 2. The current design limits itself to a single configured
> > > > > > > >>> OffsetInitializer that is used for every underlying
> > > KafkaSource.
> > > > > > > >>> 3. Yes, it is in our plan to integrate this source with
> > > watermark
> > > > > > > >>> alignment in which the user can align watermarks from all
> > > > clusters
> > > > > > > within
> > > > > > > >>> the single. It will leverage the Kafka Source
> implementation
> > to
> > > > > > achieve
> > > > > > > >>> this.
> > > > > > > >>>
> > > > > > > >>> With regards to 2, it's an interesting idea. I think we can
> > > > extend
> > > > > > the
> > > > > > > >>> design to support a map of offset initializers to clusters,
> > > which
> > > > > > would
> > > > > > > >>> solve your file based backfill. If you initialize the
> source
> > > > with a
> > > > > > > single
> > > > > > > >>> timestamp, the current design may work for your usecase,
> but
> > I
> > > > > can't
> > > > > > > tell
> > > > > > > >>> without more details. Thanks for your interest and sorry
> for
> > > the
> > > > > > delay!
> > > > > > > >>>
> > > > > > > >>> Best,
> > > > > > > >>> Mason
> > > > > > > >>>
> > > > > > > >>> On Mon, Aug 29, 2022 at 10:02 AM Mason Chen <
> > > > > mas.chen6...@gmail.com>
> > > > > > > >>> wrote:
> > > > > > > >>>
> > > > > > > >>>> Hi Max,
> > > > > > > >>>>
> > > > > > > >>>> Thanks for taking a look!
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>>> I'm wondering whether we can share some of the code of
> the
> > > > > existing
> > > > > > > >>>>> KafkaSource.
> > > > > > > >>>>>
> > > > > > > >>>>
> > > > > > > >>>> That is the intention--let me call it out more explicitly.
> > > > > > > >>>>
> > > > > > > >>>> Regarding your questions:
> > > > > > > >>>>
> > > > > > > >>>> 1. Indeed, the KafkaMetadataService has the describe
> stream
> > > > method
> > > > > > to
> > > > > > > >>>> get a particular stream id. We decided to support getting
> > all
> > > > the
> > > > > > > streams
> > > > > > > >>>> for subscribing via a regex pattern (similar to the Kafka
> > > Source
> > > > > > > >>>> implementation).
> > > > > > > >>>>
> > > > > > > >>>> 2. The idea was that if metadata is removed that it is no
> > > longer
> > > > > > > active.
> > > > > > > >>>>
> > > > > > > >>>> 3. The MetadataUpdateEvent's format is specifically for
> > > > > > communicating
> > > > > > > >>>> to the reader what the clusters and topics it should read
> > > from.
> > > > It
> > > > > > > doesn't
> > > > > > > >>>> need stream information since it doesn't interact with the
> > > > > > > >>>> KafkaMetadataService (only the enumerator interacts with
> > it).
> > > > > > > >>>>
> > > > > > > >>>> 4. Metrics will be reported per cluster. For example,
> > > > KafkaSource
> > > > > > > >>>> already reports pendingRecords and the corresponding
> metric,
> > > for
> > > > > > > example
> > > > > > > >>>> for cluster0, would be a metric called
> > > > > > > >>>>
> > > `MultiClusterKafkaSource.kafkaCluster.cluster0.pendingRecords`.
> > > > In
> > > > > > > cluster
> > > > > > > >>>> removal, these metrics wouldn't be valid so the
> > implementation
> > > > can
> > > > > > > close
> > > > > > > >>>> them.
> > > > > > > >>>>
> > > > > > > >>>> 5. I'm fine with that name; however, I got some feedback
> > > > > internally
> > > > > > > >>>> since the bulk of the logic is in stopping the scheduled
> > tasks
> > > > of
> > > > > > the
> > > > > > > >>>> underlying enumerators and handling cluster unavailability
> > > edge
> > > > > > > cases. I'm
> > > > > > > >>>> open to changing the name if the design changes (it is an
> > > > internal
> > > > > > > class
> > > > > > > >>>> anyways, so we can make these name changes without
> breaking
> > > > > users).
> > > > > > > >>>>
> > > > > > > >>>> 6. Yes, there are some limitations but I have not
> > > > > > > >>>> considered implementing that in the basic ConfigMap
> > > > > > > >>>> implementation--currently users are allowed to do any
> > changes.
> > > > For
> > > > > > > example,
> > > > > > > >>>> a user should not delete and recreate a topic, on the same
> > > > > cluster.
> > > > > > > >>>> Regarding the logic to properly remove a cluster, a user
> > could
> > > > > > > certainly
> > > > > > > >>>> support it with a custom KafkaMetadataService--I intended
> to
> > > > keep
> > > > > > the
> > > > > > > >>>> ConfigMap implementation basic for simple use cases (so
> > users
> > > > here
> > > > > > > would
> > > > > > > >>>> rely on manual monitoring or something built externally).
> > > > However,
> > > > > > > I'm open
> > > > > > > >>>> to the idea if the usage changes and maybe there could be
> > > > > > > improvements to
> > > > > > > >>>> the Flink metric API to achieve more seamless integration.
> > And
> > > > > > > finally,
> > > > > > > >>>> yes, the semantics are as such and gives reason to my
> > response
> > > > for
> > > > > > > question
> > > > > > > >>>> 2.
> > > > > > > >>>>
> > > > > > > >>>> I've updated the doc with more context from my question
> > > > responses,
> > > > > > let
> > > > > > > >>>> me know if there are more questions!
> > > > > > > >>>>
> > > > > > > >>>> Best,
> > > > > > > >>>> Mason
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>>
> > > > > > > >>>> On Wed, Aug 17, 2022 at 8:40 AM Maximilian Michels <
> > > > > m...@apache.org>
> > > > > > > >>>> wrote:
> > > > > > > >>>>
> > > > > > > >>>>> Hey Mason,
> > > > > > > >>>>>
> > > > > > > >>>>> I just had a look at the FLIP. If I understand correctly,
> > you
> > > > are
> > > > > > > >>>>> proposing a very sophisticated way to read from multiple
> > > Kafka
> > > > > > > >>>>> clusters
> > > > > > > >>>>> / topics.
> > > > > > > >>>>>
> > > > > > > >>>>> I'm wondering whether we can share some of the code of
> the
> > > > > existing
> > > > > > > >>>>> KafkaSource. I suppose you don't want to modify
> KafkaSource
> > > > > itself
> > > > > > to
> > > > > > > >>>>> avoid any breakage. But it would be good not to duplicate
> > too
> > > > > much
> > > > > > > >>>>> code,
> > > > > > > >>>>>   such that functionality that can be shared between the
> > two
> > > > > > > >>>>> implementations (e.g. the reader implementation).
> > > > > > > >>>>>
> > > > > > > >>>>> Some questions that I had when browsing the current
> > version:
> > > > > > > >>>>>
> > > > > > > >>>>>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320
> > > > > > > >>>>>
> > > > > > > >>>>> 1. Why does KafkaMetadataService have the
> `describeStream`
> > > > method
> > > > > > in
> > > > > > > >>>>> addition to `getAllStreams`? This may be redundant. Is
> the
> > > idea
> > > > > to
> > > > > > > get
> > > > > > > >>>>> the updated metadata for a particular StreamId?
> > > > > > > >>>>>
> > > > > > > >>>>> 2. KafkaMetaDataService#isClusterActive serves the
> purpose
> > to
> > > > > check
> > > > > > > >>>>> for
> > > > > > > >>>>> activeness, couldn't this be included in the metadata of
> > > > > > KafkaStream?
> > > > > > > >>>>>
> > > > > > > >>>>> 3. Shouldn't MetadataUpdateEvent contain a full list of
> > > > > KafkaStream
> > > > > > > >>>>> instead of `Map<KafkaClusterIdentifier, Set<String>>`?
> > > > > > > >>>>>
> > > > > > > >>>>> 4. "In addition, restarting enumerators involve clearing
> > > > outdated
> > > > > > > >>>>> metrics" What metrics are we talking about here?
> > > > > > > >>>>>
> > > > > > > >>>>> 5. `StoppableKafkaEnumContextProxy` doesn't ring with me.
> > How
> > > > > about
> > > > > > > >>>>> `MultiKafkaSplitEnumeratorContext`?
> > > > > > > >>>>>
> > > > > > > >>>>> 6. What about the ConfigMap implementation? Are there any
> > > > > > limitations
> > > > > > > >>>>> on
> > > > > > > >>>>> the type of configuration changes that we want to allow?
> > For
> > > > > > example,
> > > > > > > >>>>> is
> > > > > > > >>>>> it allowed to remove a cluster before it has been
> drained /
> > > > > > > >>>>> deactivated?
> > > > > > > >>>>> Is "not active" semantically identical to having the
> > cluster
> > > /
> > > > > > stream
> > > > > > > >>>>> removed?
> > > > > > > >>>>>
> > > > > > > >>>>> This is an exciting new addition!
> > > > > > > >>>>>
> > > > > > > >>>>> Cheers,
> > > > > > > >>>>> Max
> > > > > > > >>>>>
> > > > > > > >>>>> On 11.08.22 10:10, Mason Chen wrote:
> > > > > > > >>>>> > 5. At startup, GetMetadataUpdateEvent is also used to
> > allow
> > > > the
> > > > > > > >>>>> > MultiClusterKafkaSourceReader to get the latest
> metadata
> > > from
> > > > > the
> > > > > > > > >>>>> > enumerator to filter out invalid splits. This is how the
> > > > reader
> > > > > > can
> > > > > > > >>>>> solve
> > > > > > > >>>>> > "removing" splits/topics in the startup case.
> > > > > > > >>>>> >
> > > > > > > >>>>> > Sorry for the late response, really appreciate you
> > taking a
> > > > > look
> > > > > > at
> > > > > > > >>>>> the
> > > > > > > >>>>> > FLIP!
> > > > > > > >>>>> >
> > > > > > > >>>>> > Best,
> > > > > > > >>>>> > Mason
> > > > > > > >>>>> >
> > > > > > > >>>>> > On Thu, Aug 11, 2022 at 1:03 AM Mason Chen <
> > > > > > mas.chen6...@gmail.com
> > > > > > > >
> > > > > > > >>>>> wrote:
> > > > > > > >>>>> >
> > > > > > > >>>>> >> Hi Qingsheng,
> > > > > > > >>>>> >>
> > > > > > > >>>>> >> Thanks for the feedback--these are great points to
> > raise.
> > > > > > > >>>>> >>
> > > > > > > >>>>> >> 1. This is something I missed that is now added. More
> > > > > generally,
> > > > > > > it
> > > > > > > >>>>> can
> > > > > > > >>>>> >> locate multiple topics in multiple clusters (1 topic
> on
> > 1
> > > > > > cluster
> > > > > > > >>>>> is the
> > > > > > > >>>>> >> simplest case).
> > > > > > > >>>>> >>
> > > > > > > >>>>> >> 2. The KafkaMetadataService doesn't interact with the
> > > > > > > >>>>> KafkaAdminClients.
> > > > > > > >>>>> >> This source merely composes the functionality of the
> > > > > KafkaSource
> > > > > > > so
> > > > > > > >>>>> >> KafkaAdminClient interaction is handled by the
> > > > > KafkaSubscriber.
> > > > > > > >>>>> >>
> > > > > > > >>>>> >> 3. There are no requirements for the two
> > > > clusters--KafkaStream
> > > > > > > >>>>> should
> > > > > > > >>>>> >> clarify this question. For example, you could move
> from
> > > > > > topicName1
> > > > > > > >>>>> in
> > > > > > > >>>>> >> cluster 1 with 11 partitions to topicName2 in cluster
> 2
> > > with
> > > > > 22
> > > > > > > >>>>> >> partitions--only the KafkaStream id needs to remain
> the
> > > > same.
> > > > > If
> > > > > > > >>>>> there are
> > > > > > > >>>>> >> no offsets in checkpoint, the offsets are handled by
> the
> > > > > offsets
> > > > > > > >>>>> >> initializer from KafkaSource and currently the design
> > only
> > > > > > exposes
> > > > > > > >>>>> 1 option
> > > > > > > >>>>> >> for all Kafka clusters, although this could be a
> > valuable
> > > > > > > extension.
> > > > > > > >>>>> >>
> > > > > > > >>>>> >> 4. Regarding topic and cluster removal, metadata is
> > > > > > > > checkpointed
> > > > > > in
> > > > > > > >>>>> state
> > > > > > > >>>>> >> via the splits. Exactly once can be maintained with
> the
> > > > > > assumption
> > > > > > > >>>>> that
> > > > > > > >>>>> >> required data from the dead cluster lives in the live
> > > > cluster.
> > > > > > > This
> > > > > > > >>>>> can be
> > > > > > > >>>>> >> solved by not destroying the old Kafka cluster until
> > > > consumers
> > > > > > are
> > > > > > > >>>>> already
> > > > > > > >>>>> >> drained. In switchover, the consumer would consume
> from
> > > both
> > > > > old
> > > > > > > >>>>> and new
> > > > > > > >>>>> >> clusters. And finally, the metadata can be changed to
> > > point
> > > > > only
> > > > > > > to
> > > > > > > >>>>> the new
> > > > > > > >>>>> >> cluster when consumers are drained. With the regular
> > > > > > KafkaSource,
> > > > > > > >>>>> if Kafka
> > > > > > > >>>>> >> deletes topic or a cluster is destroyed, the exactly
> > once
> > > > > > > semantics
> > > > > > > >>>>> are not
> > > > > > > >>>>> >> preserved and the semantic is tightly coupled with
> > > storage.
> > > > > The
> > > > > > > >>>>> design
> > > > > > > >>>>> >> composes and delegates the responsibilities to
> > KafkaSource
> > > > > > > >>>>> components so it
> > > > > > > >>>>> >> is limited to whatever KafkaSource can do for exactly
> > once
> > > > > > > >>>>> semantics.
> > > > > > > >>>>> >>
> > > > > > > >>>>> >> 5. Yes, I added more in the FLIP.
> GetMetadataUpdateEvent
> > > was
> > > > > > added
> > > > > > > >>>>> to make
> > > > > > > >>>>> >> the order of steps in reader restart during split
> > > assignment
> > > > > > > >>>>> deterministic.
> > > > > > > >>>>> >> StoppableKafkaEnumContextProxy are used by the
> > underlying
> > > > > > > >>>>> >> KafkaSourceEnumerator to assign splits and do topic
> > > periodic
> > > > > > > >>>>> partition
> > > > > > > >>>>> >> discovery. So, these scheduled thread pools need to be
> > > > cleaned
> > > > > > up
> > > > > > > >>>>> properly
> > > > > > > >>>>> >> and splits need to be wrapped with cluster
> information.
> > > > These
> > > > > > > >>>>> details are
> > > > > > > >>>>> >> added to the FLIP.
> > > > > > > >>>>> >>
> > > > > > > >>>>> >> Best,
> > > > > > > >>>>> >> Mason
> > > > > > > >>>>> >>
> > > > > > > >>>>> >> On Fri, Jul 29, 2022 at 1:38 AM Qingsheng Ren <
> > > > > > renqs...@gmail.com
> > > > > > > >
> > > > > > > >>>>> wrote:
> > > > > > > >>>>> >>
> > > > > > > >>>>> >>> Hi Mason,
> > > > > > > >>>>> >>>
> > > > > > > >>>>> >>> Thank you for starting this FLIP!
> > > > > > > >>>>> >>>
> > > > > > > >>>>> >>>  From my first glance this FLIP looks like a
> collection
> > > of
> > > > > many
> > > > > > > new
> > > > > > > >>>>> >>> interfaces, but I can’t stitch them together. It’ll
> be
> > > > great
> > > > > to
> > > > > > > >>>>> have some
> > > > > > > >>>>> >>> brief descriptions about how the source works
> > internally.
> > > > > Here
> > > > > > > are
> > > > > > > >>>>> some
> > > > > > > >>>>> >>> questions in my mind and please correct me if I
> > > > misunderstand
> > > > > > > your
> > > > > > > >>>>> design.
> > > > > > > >>>>> >>>
> > > > > > > > >>>>> >>> 1. I can’t find the definition (in code) of KafkaStream. As a
> > > > > > > > >>>>> >>> part of the public interface KafkaMetadataService it has to
> > > > > > > > >>>>> >>> be public too. If I understand correctly it locates a topic
> > > > > > > > >>>>> >>> on a specific cluster.
> > > > > > > > >>>>> >>>
> > > > > > > > >>>>> >>> 2. I think there should be a default implementation / example
> > > > > > > > >>>>> >>> for KafkaMetadataService for out-of-box usage, for example a
> > > > > > > > >>>>> >>> wrapper of multiple Kafka AdminClients that watches clusters
> > > > > > > > >>>>> >>> periodically.
> > > > > > > > >>>>> >>>
> > > > > > > > >>>>> >>> 3. It looks like the source has the ability to handle Kafka
> > > > > > > > >>>>> >>> cluster failures, like switching connections to another
> > > > > > > > >>>>> >>> cluster without restarting the Flink job. Is there any
> > > > > > > > >>>>> >>> requirement for the two clusters? For example they have to be
> > > > > > > > >>>>> >>> identical in topic names, number of partitions and offsets
> > > > > > > > >>>>> >>> etc.
> > > > > > > >>>>> >>>
> > > > > > > > >>>>> >>> 4. Regarding topic and cluster removal, how do we handle and
> > > > > > > > >>>>> >>> recover from a checkpoint? Let’s say a topic is removed or
> > > > > > > > >>>>> >>> migrated to another cluster after a successful checkpoint. If
> > > > > > > > >>>>> >>> the job tries to roll back to the checkpoint, which still
> > > > > > > > >>>>> >>> contains the deleted topic or info of a dead cluster, then
> > > > > > > > >>>>> >>> how do we keep exactly-once semantics in this case?
> > > > > > > >>>>> >>>
> > > > > > > > >>>>> >>> 5. I don’t quite get the design of
> > > > > > > > >>>>> >>> StoppableKafkaEnumContextProxy and the GetMetadataUpdateEvent.
> > > > > > > > >>>>> >>> Could you elaborate more in the FLIP?
> > > > > > > >>>>> >>>
> > > > > > > > >>>>> >>> In a nutshell I think the idea of this FLIP is good, which
> > > > > > > > >>>>> >>> extends the usage of Kafka source. However as a design doc,
> > > > > > > > >>>>> >>> some details need to be enriched for other users and
> > > > > > > > >>>>> >>> developers to better understand how this source works.
> > > > > > > >>>>> >>>
> > > > > > > >>>>> >>> Best,
> > > > > > > >>>>> >>> Qingsheng
> > > > > > > >>>>> >>>
> > > > > > > > >>>>> >>>> On Jul 21, 2022, at 01:35, Mason Chen
> > > > > > > > >>>>> >>>> <mas.chen6...@gmail.com> wrote:
> > > > > > > >>>>> >>>>
> > > > > > > >>>>> >>>> Hi all,
> > > > > > > >>>>> >>>>
> > > > > > > > >>>>> >>>> We would like to start a discussion thread on FLIP-246: Multi
> > > > > > > > >>>>> >>>> Cluster Kafka Source [1] where we propose to provide a source
> > > > > > > > >>>>> >>>> connector for dynamically reading from multiple Kafka
> > > > > > > > >>>>> >>>> clusters, which will not require a Flink job restart. This
> > > > > > > > >>>>> >>>> can greatly improve the Kafka migration experience for
> > > > > > > > >>>>> >>>> clusters and topics, and it solves some existing problems
> > > > > > > > >>>>> >>>> with the current KafkaSource. There was some interest from
> > > > > > > > >>>>> >>>> users [2] from a meetup and the mailing list. Looking forward
> > > > > > > > >>>>> >>>> to comments and feedback, thanks!
> > > > > > > >>>>> >>>>
> > > > > > > >>>>> >>>> [1]
> > > > > > > >>>>> >>>>
> > > > > > > >>>>> >>>
> > > > > > > >>>>>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
> > > > > > > >>>>> >>>> [2]
> > > > > > > >>>>>
> > > > https://lists.apache.org/thread/zmpnzx6jjsqc0oldvdm5y2n674xzc3jc
> > > > > > > >>>>> >>>>
> > > > > > > >>>>> >>>> Best,
> > > > > > > >>>>> >>>> Mason
> > > > > > > >>>>> >>>
> > > > > > > >>>>> >>>
> > > > > > > >>>>> >
> > > > > > > >>>>>
> > > > > > > >>>>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
