> Regarding (2), definitely. This is something we planned to add later on but so far keeping things common has been working well.
My main worry for doing this as a later iteration is that this would probably be a breaking change for the public interface. If that can be avoided and planned ahead, I'm fine with moving forward with how it is right now.

> DynamicKafkaSource may be confusing because it is really similar to the KafkaDynamicSource/Sink (table connectors).

The table / sql Kafka connectors (KafkaDynamicTableFactory, KafkaDynamicTableSource / KafkaDynamicTableSink) are all internal classes not really meant to be exposed to the user though. It can cause some confusion internally for the code maintainers, but on the actual public surface I don't see this being an issue.

Thanks,
Gordon

On Wed, Jun 7, 2023 at 8:55 PM Mason Chen <mas.chen6...@gmail.com> wrote:

> Hi Gordon,
>
> Thanks for taking a look!
>
> Regarding (1), there is a need for the readers to send this event at startup because the reader state may reflect outdated metadata. Thus, the reader should not start without fresh metadata. With fresh metadata, the reader can filter splits from state--this filtering capability is ultimately how we solve the common issue of "I re-configured my Kafka source and removed some topic, but it refers to the old topic due to state *[1]*". I did not mention this because I thought it was more of a detail, but I'll make a brief note of it.
>
> Regarding (2), definitely. This is something we planned to add later on, but so far keeping things common has been working well. In that regard, yes, the metadata service should expose these configurations, but the source should not checkpoint them into state, unlike the other metadata. I'm going to add this to a section called "future enhancements". This is also feedback that Ryan, an interested user, gave earlier in this thread.
>
> Regarding (3), that's definitely a good point, and there are some real use cases, in addition to what you mentioned, for using this in single cluster mode (see *[1]* above). DynamicKafkaSource may be confusing because it is really similar to the KafkaDynamicSource/Sink (table connectors).
>
> Best,
> Mason
>
> On Wed, Jun 7, 2023 at 10:40 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
> > Hi Mason,
> >
> > Thanks for updating the FLIP. In principle, I believe this would be a useful addition. Some comments so far:
> >
> > 1. In this sequence diagram [1], why is there a need for a GetMetadataUpdateEvent from the MultiClusterSourceReader going to the MultiClusterSourceEnumerator? Shouldn't the enumerator simply start sending metadata update events to the reader once it is registered at the enumerator?
> >
> > 2. Looking at the new builder API, there are a few configurations that are common across *all* discovered Kafka clusters / topics, specifically the deserialization schema, offset initialization strategy, Kafka client properties, and consumer group ID. Is there any use case where users would want to have these configurations differ across different Kafka clusters? If that's the case, would it make more sense to encapsulate these configurations to be owned by the metadata service?
> >
> > 3. Is MultiClusterKafkaSource the best name for this connector? I find the dynamic aspect of Kafka connectivity to be a more defining characteristic, and that is the main advantage it has compared to the static KafkaSource.
> > A user may want to use this new connector over KafkaSource even if they're just consuming from a single Kafka cluster; for example, one immediate use case I can think of is Kafka repartitioning with zero Flink job downtime. They create a new topic with higher parallelism and repartition their Kafka records from the old topic to the new topic, and they want the consuming Flink job to be able to move from the old topic to the new topic with zero downtime while retaining exactly-once guarantees. So, perhaps DynamicKafkaSource is a better name for this connector?
> >
> > Thanks,
> > Gordon
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source?preview=/217389320/255072018/image-2023-6-7_2-29-13.png
> >
> > On Wed, Jun 7, 2023 at 3:07 AM Mason Chen <mas.chen6...@gmail.com> wrote:
> >
> > > Hi Jing,
> > >
> > > Thanks for the prompt feedback! I had some confusion about how to resize images in confluence--anyways, I have made the font bigger, added a white background, and also made the diagrams themselves bigger.
> > >
> > > Regarding the exactly once semantics, that's definitely good to point out in the doc. Thus, I have broken out my "Basic Idea" section into:
> > > 1. an intro
> > > 2. details about KafkaMetadataService
> > > 3. details about KafkaStream and KafkaClusterId (the metadata)
> > > 4. details about exactly once semantics and consistency guarantees
> > >
> > > This should give readers enough context about the design goals and interactions before deep diving into the class interfaces.
> > >
> > > Best,
> > > Mason
> > >
> > > On Tue, Jun 6, 2023 at 1:25 PM Jing Ge <j...@ververica.com.invalid> wrote:
> > >
> > > > Hi Mason,
> > > >
> > > > It is a very practical feature that many users are keen to use. Thanks to the previous discussion, the FLIP now looks informative. Thanks for your proposal. One small suggestion is that the attached images are too small to read unless we click and enlarge them. Besides that, it is difficult to read the text on the current sequence diagram because it has a transparent background. Would you like to replace it with a white background?
> > > >
> > > > Exactly-once is one of the key features of the Kafka connector. I have the same concern as Qingsheng. Since you have answered questions about it previously, would you like to create an extra section in your FLIP to explicitly describe scenarios where exactly-once is supported and where it is not?
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Mon, Jun 5, 2023 at 11:41 PM Mason Chen <mas.chen6...@gmail.com> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I'm working on FLIP-246 again, for the Multi Cluster Kafka Source contribution. The document has been updated with some more context about how it can solve the Kafka topic removal scenario and a sequence diagram to illustrate how the components interact.
> > > > >
> > > > > Looking forward to any feedback!
> > > > >
> > > > > Best,
> > > > > Mason
> > > > >
> > > > > On Wed, Oct 12, 2022 at 11:12 PM Mason Chen <mas.chen6...@gmail.com> wrote:
> > > > >
> > > > > > Hi Ryan,
> > > > > >
> > > > > > Thanks for the additional context!
> > > > > > Yes, the offset initializer would need to take a cluster as a parameter, and the MultiClusterKafkaSourceSplit can be exposed in an initializer.
> > > > > >
> > > > > > Best,
> > > > > > Mason
> > > > > >
> > > > > > On Thu, Oct 6, 2022 at 11:00 AM Ryan van Huuksloot <ryan.vanhuuksl...@shopify.com> wrote:
> > > > > >
> > > > > >> Hi Mason,
> > > > > >>
> > > > > >> Thanks for the clarification! In regards to the addition to the OffsetInitializer of this API - this would be an awesome addition, and I think this entire FLIP would be a great addition to Flink.
> > > > > >>
> > > > > >> To provide more context as to why we need particular offsets: we currently use Hybrid Source to backfill from buckets prior to reading from Kafka. We have a service that will tell us what offset has last been loaded into said bucket, which we will use to initialize the KafkaSource OffsetsInitializer. We couldn't use a timestamp here, and the offset would be different for each cluster.
> > > > > >>
> > > > > >> In pseudocode, we'd want the ability to do something like this with HybridSources - if this is possible.
> > > > > >>
> > > > > >> ```scala
> > > > > >> val offsetsMetadata: Map[TopicPartition, Long] = // Get current offsets from OffsetReaderService
> > > > > >> val multiClusterArchiveSource: MultiBucketFileSource[T] = // Data is read from different buckets (multiple topics)
> > > > > >> val multiClusterKafkaSource: MultiClusterKafkaSource[T] =
> > > > > >>   MultiClusterKafkaSource.builder()
> > > > > >>     .setKafkaMetadataService(new KafkaMetadataServiceImpl())
> > > > > >>     .setStreamIds(List.of("my-stream-1", "my-stream-2"))
> > > > > >>     .setGroupId("myConsumerGroup")
> > > > > >>     .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
> > > > > >>     .setStartingOffsets(offsetsMetadata)
> > > > > >>     .setProperties(properties)
> > > > > >>     .build()
> > > > > >> val source = HybridSource.builder(multiClusterArchiveSource).addSource(multiClusterKafkaSource).build()
> > > > > >> ```
> > > > > >>
> > > > > >> Few notes:
> > > > > >> - TopicPartition alone won't work because the same topic name may exist on multiple clusters, as this is something that is supported IIRC
> > > > > >> - I chose to pass a map into starting offsets just for demonstrative purposes; I would be fine with whatever data structure would work best
> > > > > >>
> > > > > >> Ryan van Huuksloot
> > > > > >> Data Developer | Production Engineering | Streaming Capabilities
> > > > > >>
> > > > > >> On Mon, Oct 3, 2022 at 11:29 PM Mason Chen <mas.chen6...@gmail.com> wrote:
> > > > > >>
> > > > > >>> Hi Ryan,
> > > > > >>>
> > > > > >>> Just copying your message over to the email chain.
> > > > > >>>
> > > > > >>>> Hi Mason,
> > > > > >>>> First off, thanks for putting this FLIP together! Sorry for the delay.
> > > > > >>>> Full disclosure, Mason and I chatted a little bit at Flink Forward 2022, but I have tried to capture the questions I had for him then.
> > > > > >>>> I'll start the conversation with a few questions:
> > > > > >>>> 1. The concept of streamIds is not clear to me in the proposal and could use some more information. If I understand correctly, they will be used in the MetadataService to link KafkaClusters to ones you want to use? If you assign stream ids using `setStreamIds`, how can you dynamically increase the number of clusters you consume if the list of StreamIds is static? I am basing this off of your example .setStreamIds(List.of("my-stream-1", "my-stream-2")), so I could be off base with my assumption. If you don't mind clearing up the intention, that would be great!
> > > > > >>>> 2. How would offsets work if you wanted to use this MultiClusterKafkaSource with a file-based backfill? In the case I am thinking of, you have a bucket backed archive of Kafka data per cluster, and you want to pick up from the last offset in the archived system. How would you set OffsetInitializers "per cluster", potentially as a function, or are you limited to setting an OffsetInitializer for the entire Source?
> > > > > >>>> 3. Just to make sure - because this system will layer on top of FLIP-27 and use KafkaSource for some aspects under the hood, the watermark alignment that was introduced in FLIP-182 / Flink 1.15 would be possible across multiple clusters if you assign them to the same alignment group?
> > > > > >>>> Thanks!
> > > > > >>>> Ryan
> > > > > >>>
> > > > > >>> 1. The stream ids are static--however, the physical clusters and topics that they map to can mutate. Let's say my-stream-1 maps to cluster-1 and topic-1. The KafkaMetadataService can return a different mapping when metadata is fetched the next time, e.g. my-stream-1 mapping to cluster-1 and topic-1, and cluster-2 and topic-2. Let me add more details on how the KafkaMetadataService is used.
> > > > > >>> 2. The current design limits itself to a single configured OffsetInitializer that is used for every underlying KafkaSource.
> > > > > >>> 3. Yes, it is in our plan to integrate this source with watermark alignment, in which the user can align watermarks from all clusters within the single source. It will leverage the KafkaSource implementation to achieve this.
> > > > > >>>
> > > > > >>> With regards to 2, it's an interesting idea. I think we can extend the design to support a map of offset initializers to clusters, which would solve your file-based backfill. If you initialize the source with a single timestamp, the current design may work for your use case, but I can't tell without more details.
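To make the extension Mason floats here concrete, a minimal sketch of a per-cluster map of offset initializers. The map itself uses the real KafkaSource OffsetsInitializer factory methods; the cluster-id keys and the idea of handing the map to the builder (e.g. a map-taking setStartingOffsets overload) are hypothetical, since the design as discussed only exposes a single initializer:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.common.TopicPartition;

public final class PerClusterOffsets {

    /** Builds a hypothetical cluster-id -> OffsetsInitializer map for a file-based backfill. */
    public static Map<String, OffsetsInitializer> build(Map<TopicPartition, Long> archivedOffsets) {
        Map<String, OffsetsInitializer> perCluster = new HashMap<>();
        // cluster-1: resume exactly where the bucket archive left off
        // (offsets fetched from something like Ryan's OffsetReaderService).
        perCluster.put("cluster-1", OffsetsInitializer.offsets(archivedOffsets));
        // cluster-2: no archive exists, so fall back to the earliest offsets.
        perCluster.put("cluster-2", OffsetsInitializer.earliest());
        // Hypothetically: MultiClusterKafkaSource.builder().setStartingOffsets(perCluster)...
        return perCluster;
    }
}
```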
> > > > > >>> Thanks for your interest and sorry for the delay!
> > > > > >>>
> > > > > >>> Best,
> > > > > >>> Mason
> > > > > >>>
> > > > > >>> On Mon, Aug 29, 2022 at 10:02 AM Mason Chen <mas.chen6...@gmail.com> wrote:
> > > > > >>>
> > > > > >>>> Hi Max,
> > > > > >>>>
> > > > > >>>> Thanks for taking a look!
> > > > > >>>>
> > > > > >>>>> I'm wondering whether we can share some of the code of the existing KafkaSource.
> > > > > >>>>
> > > > > >>>> That is the intention--let me call it out more explicitly.
> > > > > >>>>
> > > > > >>>> Regarding your questions:
> > > > > >>>>
> > > > > >>>> 1. Indeed, the KafkaMetadataService has the describeStream method to get the metadata for a particular stream id. We decided to support getting all the streams for subscribing via a regex pattern (similar to the KafkaSource implementation).
> > > > > >>>>
> > > > > >>>> 2. The idea was that if metadata is removed, it is no longer active.
> > > > > >>>>
> > > > > >>>> 3. The MetadataUpdateEvent's format is specifically for communicating to the reader what clusters and topics it should read from. It doesn't need stream information since the reader doesn't interact with the KafkaMetadataService (only the enumerator interacts with it).
> > > > > >>>>
> > > > > >>>> 4. Metrics will be reported per cluster. For example, KafkaSource already reports pendingRecords, and the corresponding metric for, say, cluster0 would be a metric called `MultiClusterKafkaSource.kafkaCluster.cluster0.pendingRecords`. On cluster removal, these metrics wouldn't be valid, so the implementation can close them.
> > > > > >>>>
> > > > > >>>> 5. I'm fine with that name; however, I got some feedback internally since the bulk of the logic is in stopping the scheduled tasks of the underlying enumerators and handling cluster unavailability edge cases. I'm open to changing the name if the design changes (it is an internal class anyways, so we can make these name changes without breaking users).
> > > > > >>>>
> > > > > >>>> 6. Yes, there are some limitations, but I have not considered implementing that in the basic ConfigMap implementation--currently users are allowed to make any changes. For example, a user should not delete and recreate a topic on the same cluster. Regarding the logic to properly remove a cluster, a user could certainly support it with a custom KafkaMetadataService--I intended to keep the ConfigMap implementation basic for simple use cases (so users here would rely on manual monitoring or something built externally). However, I'm open to the idea if the usage changes, and maybe there could be improvements to the Flink metric API to achieve more seamless integration. And finally, yes, the semantics are as such, which gives reason to my response for question 2.
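The per-cluster metric scope described in (4) can be expressed with Flink's standard MetricGroup API. A minimal sketch, assuming the caller supplies the source's metric group and a pendingRecords gauge (the helper itself is illustrative, not the FLIP's code):

```java
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

public final class PerClusterMetrics {

    private PerClusterMetrics() {}

    /** Registers a metric scoped like MultiClusterKafkaSource.kafkaCluster.cluster0.pendingRecords. */
    public static void registerPendingRecords(
            MetricGroup sourceGroup, String clusterId, Gauge<Long> pendingRecords) {
        // addGroup(key, value) yields the "kafkaCluster.<clusterId>" sub-scope; on cluster
        // removal the group's metrics become invalid and should be closed/unregistered.
        sourceGroup.addGroup("kafkaCluster", clusterId).gauge("pendingRecords", pendingRecords);
    }
}
```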
> > > > > >>>> I've updated the doc with more context from my question responses, let me know if there are more questions!
> > > > > >>>>
> > > > > >>>> Best,
> > > > > >>>> Mason
> > > > > >>>>
> > > > > >>>> On Wed, Aug 17, 2022 at 8:40 AM Maximilian Michels <m...@apache.org> wrote:
> > > > > >>>>
> > > > > >>>>> Hey Mason,
> > > > > >>>>>
> > > > > >>>>> I just had a look at the FLIP. If I understand correctly, you are proposing a very sophisticated way to read from multiple Kafka clusters / topics.
> > > > > >>>>>
> > > > > >>>>> I'm wondering whether we can share some of the code of the existing KafkaSource. I suppose you don't want to modify KafkaSource itself to avoid any breakage. But it would be good not to duplicate too much code, such that functionality can be shared between the two implementations (e.g. the reader implementation).
> > > > > >>>>>
> > > > > >>>>> Some questions that I had when browsing the current version:
> > > > > >>>>>
> > > > > >>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320
> > > > > >>>>>
> > > > > >>>>> 1. Why does KafkaMetadataService have the `describeStream` method in addition to `getAllStreams`? This may be redundant. Is the idea to get the updated metadata for a particular StreamId?
> > > > > >>>>>
> > > > > >>>>> 2. KafkaMetadataService#isClusterActive serves the purpose of checking for activeness; couldn't this be included in the metadata of KafkaStream?
> > > > > >>>>>
> > > > > >>>>> 3. Shouldn't MetadataUpdateEvent contain a full list of KafkaStream instead of `Map<KafkaClusterIdentifier, Set<String>>`?
> > > > > >>>>>
> > > > > >>>>> 4. "In addition, restarting enumerators involve clearing outdated metrics" -- what metrics are we talking about here?
> > > > > >>>>>
> > > > > >>>>> 5. `StoppableKafkaEnumContextProxy` doesn't resonate with me. How about `MultiKafkaSplitEnumeratorContext`?
> > > > > >>>>>
> > > > > >>>>> 6. What about the ConfigMap implementation? Are there any limitations on the type of configuration changes that we want to allow? For example, is it allowed to remove a cluster before it has been drained / deactivated? Is "not active" semantically identical to having the cluster / stream removed?
> > > > > >>>>>
> > > > > >>>>> This is an exciting new addition!
> > > > > >>>>>
> > > > > >>>>> Cheers,
> > > > > >>>>> Max
> > > > > >>>>>
> > > > > >>>>> On 11.08.22 10:10, Mason Chen wrote:
> > > > > >>>>> > 5. At startup, GetMetadataUpdateEvent is also used to allow the MultiClusterKafkaSourceReader to get the latest metadata from the enumerator to filter out invalid splits. This is how the reader can solve "removing" splits/topics in the startup case.
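The startup behavior in that last quoted point can be pictured as a filter over restored splits once fresh metadata arrives. A rough reconstruction from the discussion -- the Split record is a stand-in for MultiClusterKafkaSourceSplit, and the metadata map mirrors the MetadataUpdateEvent shape; none of this is the FLIP's actual code:

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public final class StartupSplitFilter {

    /** Stand-in for MultiClusterKafkaSourceSplit: a KafkaSource split tagged with its cluster. */
    public record Split(String clusterId, String topic, int partition) {}

    /** Drops restored splits whose cluster or topic no longer appears in the fresh metadata. */
    public static List<Split> filter(List<Split> restored, Map<String, Set<String>> metadata) {
        return restored.stream()
                .filter(s -> metadata.getOrDefault(s.clusterId(), Set.of()).contains(s.topic()))
                .collect(Collectors.toList());
    }
}
```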
> > > > > >>>>> >
> > > > > >>>>> > Sorry for the late response, really appreciate you taking a look at the FLIP!
> > > > > >>>>> >
> > > > > >>>>> > Best,
> > > > > >>>>> > Mason
> > > > > >>>>> >
> > > > > >>>>> > On Thu, Aug 11, 2022 at 1:03 AM Mason Chen <mas.chen6...@gmail.com> wrote:
> > > > > >>>>> >
> > > > > >>>>> >> Hi Qingsheng,
> > > > > >>>>> >>
> > > > > >>>>> >> Thanks for the feedback--these are great points to raise.
> > > > > >>>>> >>
> > > > > >>>>> >> 1. This is something I missed that is now added. More generally, it can locate multiple topics in multiple clusters (1 topic on 1 cluster is the simplest case).
> > > > > >>>>> >>
> > > > > >>>>> >> 2. The KafkaMetadataService doesn't interact with the KafkaAdminClients. This source merely composes the functionality of the KafkaSource, so KafkaAdminClient interaction is handled by the KafkaSubscriber.
> > > > > >>>>> >>
> > > > > >>>>> >> 3. There are no requirements for the two clusters--KafkaStream should clarify this question. For example, you could move from topicName1 in cluster 1 with 11 partitions to topicName2 in cluster 2 with 22 partitions--only the KafkaStream id needs to remain the same. If there are no offsets in checkpoint, the offsets are handled by the offsets initializer from KafkaSource, and currently the design only exposes 1 option for all Kafka clusters, although this could be a valuable extension.
> > > > > >>>>> >>
> > > > > >>>>> >> 4. Regarding topic and cluster removal, metadata is checkpointed in state via the splits. Exactly once can be maintained with the assumption that required data from the dead cluster lives in the live cluster. This can be solved by not destroying the old Kafka cluster until consumers are already drained. In switchover, the consumer would consume from both old and new clusters. And finally, the metadata can be changed to point only to the new cluster when consumers are drained. With the regular KafkaSource, if Kafka deletes a topic or a cluster is destroyed, the exactly-once semantics are not preserved, and the semantics are tightly coupled with storage. The design composes and delegates the responsibilities to KafkaSource components, so it is limited to whatever KafkaSource can do for exactly-once semantics.
> > > > > >>>>> >>
> > > > > >>>>> >> 5. Yes, I added more in the FLIP. GetMetadataUpdateEvent was added to make the order of steps in reader restart during split assignment deterministic.
> > > > > >>>>> >> StoppableKafkaEnumContextProxy instances are used by the underlying KafkaSourceEnumerators to assign splits and do periodic topic partition discovery. So, these scheduled thread pools need to be cleaned up properly, and splits need to be wrapped with cluster information. These details are added to the FLIP.
> > > > > >>>>> >>
> > > > > >>>>> >> Best,
> > > > > >>>>> >> Mason
> > > > > >>>>> >>
> > > > > >>>>> >> On Fri, Jul 29, 2022 at 1:38 AM Qingsheng Ren <renqs...@gmail.com> wrote:
> > > > > >>>>> >>
> > > > > >>>>> >>> Hi Mason,
> > > > > >>>>> >>>
> > > > > >>>>> >>> Thank you for starting this FLIP!
> > > > > >>>>> >>>
> > > > > >>>>> >>> From my first glance this FLIP looks like a collection of many new interfaces, but I can't stitch them together. It'll be great to have some brief descriptions about how the source works internally. Here are some questions in my mind, and please correct me if I misunderstand your design.
> > > > > >>>>> >>>
> > > > > >>>>> >>> 1. I can't find the definition (in code) of KafkaStream. As a part of the public interface KafkaMetadataService, it has to be public too. If I understand correctly, it locates a topic on a specific cluster.
> > > > > >>>>> >>>
> > > > > >>>>> >>> 2. I think there should be a default implementation / example of KafkaMetadataService for out-of-the-box usage, for example a wrapper of multiple Kafka AdminClients that watches clusters periodically.
> > > > > >>>>> >>>
> > > > > >>>>> >>> 3. It looks like the source has the ability to handle Kafka cluster failures, like switching connections to another cluster without restarting the Flink job. Is there any requirement for the two clusters? For example, do they have to be identical in topic names, number of partitions, offsets, etc.?
> > > > > >>>>> >>>
> > > > > >>>>> >>> 4. Regarding topic and cluster removal, how to handle and recover from checkpoint? Let's say a topic is removed or migrated to another cluster after a successful checkpoint. If the job tries to roll back to the checkpoint which still contains the deleted topic or info of a dead cluster, then how to keep the exactly-once semantics in this case?
> > > > > >>>>> >>>
> > > > > >>>>> >>> 5. I don't quite get the design of StoppableKafkaEnumContextProxy and the GetMetadataUpdateEvent. Could you elaborate more in the FLIP?
> > > > > >>>>> >>>
> > > > > >>>>> >>> In a nutshell I think the idea of this FLIP is good, as it extends the usage of the Kafka source. However, as a design doc, some details need to be enriched for other users and developers to better understand how this source works.
> > > > > >>>>> >>>
> > > > > >>>>> >>> Best,
> > > > > >>>>> >>> Qingsheng
> > > > > >>>>> >>>
> > > > > >>>>> >>>> On Jul 21, 2022, at 01:35, Mason Chen <mas.chen6...@gmail.com> wrote:
> > > > > >>>>> >>>>
> > > > > >>>>> >>>> Hi all,
> > > > > >>>>> >>>>
> > > > > >>>>> >>>> We would like to start a discussion thread on FLIP-246: Multi Cluster Kafka Source [1], where we propose to provide a source connector for dynamically reading from multiple Kafka clusters, which will not require a Flink job restart. This can greatly improve the Kafka migration experience for clusters and topics, and it solves some existing problems with the current KafkaSource. There was some interest from users [2] at a meetup and on the mailing list. Looking forward to comments and feedback, thanks!
> > > > > >>>>> >>>>
> > > > > >>>>> >>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
> > > > > >>>>> >>>> [2] https://lists.apache.org/thread/zmpnzx6jjsqc0oldvdm5y2n674xzc3jc
> > > > > >>>>> >>>>
> > > > > >>>>> >>>> Best,
> > > > > >>>>> >>>> Mason
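For readers stitching the thread together, a rough consolidation of the KafkaMetadataService surface as it is discussed above. The signatures are reconstructed from the discussion (describeStream, getAllStreams, isClusterActive, and the stream-to-clusters/topics mapping) for orientation only; the authoritative definitions live in the FLIP:

```java
import java.util.Map;
import java.util.Set;

/** Identifies one physical Kafka cluster (shape assumed for illustration). */
record KafkaClusterIdentifier(String name, String bootstrapServers) {}

/** A logical stream: a stable id mapped onto physical clusters and their topics. */
record KafkaStream(String streamId, Map<KafkaClusterIdentifier, Set<String>> clusterTopics) {}

/** Approximate surface of the metadata service, per the discussion in this thread. */
interface KafkaMetadataService {

    /** All currently known streams, e.g. to support regex-based subscription. */
    Set<KafkaStream> getAllStreams();

    /** The current metadata for one logical stream id. */
    KafkaStream describeStream(String streamId);

    /** Whether a cluster is still active; removed metadata is treated as inactive. */
    boolean isClusterActive(KafkaClusterIdentifier clusterId);
}
```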