> My main worry for doing this as a later iteration is that this would
> probably be a breaking change for the public interface. If that can be
> avoided and planned ahead, I'm fine with moving forward with how it is
> right now.

Makes sense. Considering the public interfaces, I think we still want to
provide clients the ability to pin certain configurations in the
builder--however, cluster specific configurations may not be known upfront
or generalize to all clusters, so there would need to be changes in the
`KafkaMetadataService` interface. This could be achieved by exposing:

1. A separate API (e.g. `Map<KafkaClusterIdentifier, Properties>
getKafkaClusterProperties()`) in KafkaMetadataService
2. The configuration in `KafkaClusterIdentifier`, as this already contains
some configuration (e.g. bootstrap servers)--in which case we should rename
the class to something like `KafkaCluster`, as it is no longer just an
identifier
3. Reorganized metadata in a `Map<String, ClusterMetadata>` in
`KafkaStream`, where the String is the proposed
`KafkaClusterIdentifier.name` field

I prefer option 3 since it simplifies equals() checks on
KafkaClusterIdentifier (e.g. is it the name, the bootstrap servers, or
both?).

> Small correction for the MultiClusterKafkaSourceEnumerator section: "This
> reader is responsible for discovering and assigning splits from 1+ cluster"

Thanks for the catch!

> the defining characteristic is the dynamic discovery vs. the fact that
> multiple clusters [...] I think the "Table" in the name of those SQL
> connectors should avoid confusion. Perhaps we can also solicit other
> ideas? I would throw "DiscoveringKafkaSource" into the mix.

Agreed with Gordon's and your suggestions. Right, the only public-facing
name for SQL is `kafka`, the SQL connector identifier. Based on your
suggestions:

1. MultiClusterKafkaSource
2. DynamicKafkaSource
3. DiscoveringKafkaSource
4. MutableKafkaSource
5. AdaptiveKafkaSource

I added a few of my own. I prefer 2. What do others think?

Best,
Mason

On Sun, Jun 11, 2023 at 1:12 PM Thomas Weise <t...@apache.org> wrote:

Hi Mason,

Thanks for the iterations on the FLIP, I think this is in a very good shape
now.
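For illustration, option 3 above could look roughly like the sketch below.
This is only a shape sketch under assumed names (the `ClusterMetadata`
fields in particular are hypothetical), not the FLIP's final interfaces:

```java
import java.util.Map;
import java.util.Properties;
import java.util.Set;

// Hypothetical sketch of option 3: KafkaStream owns a map keyed by the
// proposed KafkaClusterIdentifier.name field, and all cluster-specific
// configuration (bootstrap servers, properties) lives in ClusterMetadata.
// Equality on the key is then plain string equality on the cluster name.
final class ClusterMetadata {
    final String bootstrapServers;
    final Set<String> topics;
    final Properties properties; // cluster-specific overrides

    ClusterMetadata(String bootstrapServers, Set<String> topics, Properties properties) {
        this.bootstrapServers = bootstrapServers;
        this.topics = topics;
        this.properties = properties;
    }
}

final class KafkaStream {
    final String streamId;
    // cluster name -> metadata for that cluster
    final Map<String, ClusterMetadata> clusterMetadataMap;

    KafkaStream(String streamId, Map<String, ClusterMetadata> clusterMetadataMap) {
        this.streamId = streamId;
        this.clusterMetadataMap = clusterMetadataMap;
    }
}
```

With this shape, a migration is expressed purely as a change in
`clusterMetadataMap` between metadata fetches; the identifier itself no
longer has to carry configuration.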
Small correction for the MultiClusterKafkaSourceEnumerator section: "This
reader is responsible for discovering and assigning splits from 1+ cluster"

Regarding the user facing name of the connector: I agree with Gordon that
the defining characteristic is the dynamic discovery vs. the fact that
multiple clusters may be consumed in parallel. (Although, as described in
the FLIP, lossless consumer migration only works with a strategy that
involves intermittent parallel consumption of old and new clusters to drain
and switch.)

I think the "Table" in the name of those SQL connectors should avoid
confusion. Perhaps we can also solicit other ideas? I would throw
"DiscoveringKafkaSource" into the mix.

Cheers,
Thomas

On Fri, Jun 9, 2023 at 3:40 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Regarding (2), definitely. This is something we planned to add later on
> but so far keeping things common has been working well.

My main worry for doing this as a later iteration is that this would
probably be a breaking change for the public interface. If that can be
avoided and planned ahead, I'm fine with moving forward with how it is
right now.

> DynamicKafkaSource may be confusing because it is really similar to the
> KafkaDynamicSource/Sink (table connectors).

The table / SQL Kafka connectors (KafkaDynamicTableFactory,
KafkaDynamicTableSource / KafkaDynamicTableSink) are all internal classes
not really meant to be exposed to the user though. It can cause some
confusion internally for the code maintainers, but on the actual public
surface I don't see this being an issue.

Thanks,
Gordon

On Wed, Jun 7, 2023 at 8:55 PM Mason Chen <mas.chen6...@gmail.com> wrote:

Hi Gordon,

Thanks for taking a look!
Regarding (1), there is a need for the readers to send this event at
startup because the reader state may reflect outdated metadata. Thus, the
reader should not start without fresh metadata. With fresh metadata, the
reader can filter splits from state--this filtering capability is
ultimately how we solve the common issue of "I re-configured my Kafka
source and removed some topic, but it refers to the old topic due to state
*[1]*". I did not mention this because I thought this is more of a detail,
but I'll make a brief note of it.

Regarding (2), definitely. This is something we planned to add later on,
but so far keeping things common has been working well. In that regard, yes
the metadata service should expose these configurations, but the source
should not check them into state, unlike the other metadata. I'm going to
add it to a section called "future enhancements". This is also feedback
that Ryan, an interested user, gave earlier in this thread.

Regarding (3), that's definitely a good point and there are some real use
cases, in addition to what you mentioned, to use this in single cluster
mode (see *[1]* above). DynamicKafkaSource may be confusing because it is
really similar to the KafkaDynamicSource/Sink (table connectors).

Best,
Mason

On Wed, Jun 7, 2023 at 10:40 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

Hi Mason,

Thanks for updating the FLIP. In principle, I believe this would be a
useful addition. Some comments so far:

1. In this sequence diagram [1], why is there a need for a
GetMetadataUpdateEvent from the MultiClusterSourceReader going to the
MultiClusterSourceEnumerator? Shouldn't the enumerator simply start sending
metadata update events to the reader once it is registered at the
enumerator?

2. Looking at the new builder API, there are a few configurations that are
common across *all* discovered Kafka clusters / topics, specifically the
deserialization schema, offset initialization strategy, Kafka client
properties, and consumer group ID. Is there any use case where users would
want to have these configurations differ across different Kafka clusters?
If that's the case, would it make more sense to encapsulate these
configurations to be owned by the metadata service?

3. Is MultiClusterKafkaSource the best name for this connector? I find the
dynamic aspect of Kafka connectivity to be a more defining characteristic,
and that is the main advantage it has compared to the static KafkaSource. A
user may want to use this new connector over KafkaSource even if they're
just consuming from a single Kafka cluster; for example, one immediate use
case I can think of is Kafka repartitioning with zero Flink job downtime.
They create a new topic with higher parallelism and repartition their Kafka
records from the old topic to the new topic, and they want the consuming
Flink job to be able to move from the old topic to the new topic with
zero downtime while retaining exactly-once guarantees. So, perhaps
DynamicKafkaSource is a better name for this connector?
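To make the startup behavior discussed above concrete: the reader asks for
fresh metadata first, then uses it to drop restored splits whose cluster or
topic no longer exists. A minimal sketch, with a deliberately simplified
and hypothetical split type (the real split carries more state than this):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical split: just (cluster, topic, partition).
record Split(String cluster, String topic, int partition) {}

final class StartupSplitFilter {
    // freshMetadata: cluster name -> topics currently part of the stream.
    // Splits restored from checkpoint state that reference a removed topic
    // or cluster are dropped instead of being read forever from stale
    // configuration.
    static List<Split> filter(List<Split> restored, Map<String, Set<String>> freshMetadata) {
        List<Split> valid = new ArrayList<>();
        for (Split s : restored) {
            Set<String> topics = freshMetadata.get(s.cluster());
            if (topics != null && topics.contains(s.topic())) {
                valid.add(s);
            }
        }
        return valid;
    }
}
```

This is the filtering that addresses the "removed topic still read from
state" problem mentioned earlier in the thread.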
Thanks,
Gordon

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source?preview=/217389320/255072018/image-2023-6-7_2-29-13.png

On Wed, Jun 7, 2023 at 3:07 AM Mason Chen <mas.chen6...@gmail.com> wrote:

Hi Jing,

Thanks for the prompt feedback! I had some confusion with how to resize
images in Confluence--anyways, I have made the font bigger, added a white
background, and also made the diagrams themselves bigger.

Regarding the exactly once semantics, that's definitely good to point out
in the doc. Thus, I have broken out my "Basic Idea" section into:

1. an intro
2. details about KafkaMetadataService
3. details about KafkaStream and KafkaClusterId (the metadata)
4. details about exactly once semantics and consistency guarantees

This should give readers enough context about the design goals and
interactions before deep diving into the class interfaces.

Best,
Mason

On Tue, Jun 6, 2023 at 1:25 PM Jing Ge <j...@ververica.com.invalid> wrote:

Hi Mason,

It is a very practical feature that many users are keen to use. Thanks to
the previous discussion, the FLIP now looks informative. Thanks for your
proposal. One small suggestion is that the attached images are quite small
to read if we don't click and enlarge them. Besides that, it is difficult
to read the text on the current sequence diagram because it has a
transparent background. Would you like to replace it with a white
background?
Exactly-once is one of the key features of the Kafka connector. I have the
same concern as Qingsheng. Since you have answered questions about it
previously, would you like to create an extra section in your FLIP to
explicitly describe scenarios when exactly-once is supported and when it is
not?

Best regards,
Jing

On Mon, Jun 5, 2023 at 11:41 PM Mason Chen <mas.chen6...@gmail.com> wrote:

Hi all,

I'm working on FLIP-246 again, for the Multi Cluster Kafka Source
contribution. The document has been updated with some more context about
how it can solve the Kafka topic removal scenario and a sequence diagram to
illustrate how the components interact.

Looking forward to any feedback!

Best,
Mason

On Wed, Oct 12, 2022 at 11:12 PM Mason Chen <mas.chen6...@gmail.com> wrote:

Hi Ryan,

Thanks for the additional context! Yes, the offset initializer would need
to take a cluster as a parameter, and the MultiClusterKafkaSourceSplit can
be exposed in an initializer.

Best,
Mason

On Thu, Oct 6, 2022 at 11:00 AM Ryan van Huuksloot <ryan.vanhuuksl...@shopify.com> wrote:

Hi Mason,

Thanks for the clarification!
In regards to the addition to the OffsetInitializer of this API--this would
be an awesome addition, and I think this entire FLIP would be a great
addition to Flink.

To provide more context as to why we need particular offsets: we use Hybrid
Source to currently backfill from buckets prior to reading from Kafka. We
have a service that will tell us what offset has last been loaded into said
bucket, which we use to initialize the KafkaSource OffsetsInitializer. We
couldn't use a timestamp here, and the offset would be different for each
cluster.

In pseudocode, we'd want the ability to do something like this with
HybridSources--if this is possible:

```scala
val offsetsMetadata: Map[TopicPartition, Long] = // Get current offsets from OffsetReaderService
val multiClusterArchiveSource: MultiBucketFileSource[T] = // Data is read from different buckets (multiple topics)
val multiClusterKafkaSource: MultiClusterKafkaSource[T] =
  MultiClusterKafkaSource.builder()
    .setKafkaMetadataService(new KafkaMetadataServiceImpl())
    .setStreamIds(List.of("my-stream-1", "my-stream-2"))
    .setGroupId("myConsumerGroup")
    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
    .setStartingOffsets(offsetsMetadata)
    .setProperties(properties)
    .build()
val source =
  HybridSource.builder(multiClusterArchiveSource).addSource(multiClusterKafkaSource).build()
```

Few notes:
- TopicPartition won't work because the topic may be the same name, as this
is something that is supported IIRC
- I chose to pass a map into starting offsets just for demonstrative
purposes; I would be fine with whatever data structure would work best

Ryan van Huuksloot
Data Developer | Production Engineering | Streaming Capabilities

On Mon, Oct 3, 2022 at 11:29 PM Mason Chen <mas.chen6...@gmail.com> wrote:

Hi Ryan,

Just copying your message over to the email chain.

> Hi Mason,
> First off, thanks for putting this FLIP together! Sorry for the delay.
> Full disclosure Mason and I chatted a little bit at Flink Forward 2022,
> but I have tried to capture the questions I had for him then.
> I'll start the conversation with a few questions:
> 1. The concept of streamIds is not clear to me in the proposal and could
> use some more information. If I understand correctly, they will be used
> in the MetadataService to link KafkaClusters to ones you want to use?
> If you assign stream ids using `setStreamIds`, how can you dynamically
> increase the number of clusters you consume if the list of StreamIds is
> static? I am basing this off of your example .setStreamIds(List.of(
> "my-stream-1", "my-stream-2")) so I could be off base with my assumption.
> If you don't mind clearing up the intention, that would be great!
> 2. How would offsets work if you wanted to use this
> MultiClusterKafkaSource with a file based backfill? In the case I am
> thinking of, you have a bucket backed archive of Kafka data per cluster,
> and you want to pick up from the last offset in the archived system. How
> would you set OffsetInitializers "per cluster", potentially as a
> function, or are you limited to setting an OffsetInitializer for the
> entire Source?
> 3. Just to make sure - because this system will layer on top of FLIP-27
> and use KafkaSource for some aspects under the hood, the watermark
> alignment that was introduced in FLIP-182 / Flink 1.15 would be possible
> across multiple clusters if you assign them to the same alignment group?
> Thanks!
> Ryan

1. The stream ids are static--however, the physical clusters and topics
that they map to can mutate. Let's say my-stream-1 maps to cluster-1 and
topic-1.
The KafkaMetadataService can return a different mapping when metadata is
fetched the next time, e.g. my-stream-1 mapping to cluster-1 and topic-1,
and cluster-2 and topic-2. Let me add more details on how the
KafkaMetadataService is used.

2. The current design limits itself to a single configured
OffsetInitializer that is used for every underlying KafkaSource.

3. Yes, it is in our plan to integrate this source with watermark
alignment, in which the user can align watermarks from all clusters within
the single source. It will leverage the Kafka Source implementation to
achieve this.

With regards to 2, it's an interesting idea. I think we can extend the
design to support a map of offset initializers to clusters, which would
solve your file based backfill. If you initialize the source with a single
timestamp, the current design may work for your usecase, but I can't tell
without more details. Thanks for your interest and sorry for the delay!

Best,
Mason

On Mon, Aug 29, 2022 at 10:02 AM Mason Chen <mas.chen6...@gmail.com> wrote:

Hi Max,

Thanks for taking a look!

> I'm wondering whether we can share some of the code of the existing
> KafkaSource.
That is the intention--let me call it out more explicitly.

Regarding your questions:

1. Indeed, the KafkaMetadataService has the describeStream method to get a
particular stream id. We decided to support getting all the streams for
subscribing via a regex pattern (similar to the Kafka Source
implementation).

2. The idea was that if metadata is removed, it is no longer active.

3. The MetadataUpdateEvent's format is specifically for communicating to
the reader what clusters and topics it should read from. It doesn't need
stream information since the reader doesn't interact with the
KafkaMetadataService (only the enumerator interacts with it).

4. Metrics will be reported per cluster. For example, KafkaSource already
reports pendingRecords, and the corresponding metric for cluster0 would be
a metric called
`MultiClusterKafkaSource.kafkaCluster.cluster0.pendingRecords`. In cluster
removal, these metrics wouldn't be valid, so the implementation can close
them.

5. I'm fine with that name; however, I got some feedback internally, since
the bulk of the logic is in stopping the scheduled tasks of the underlying
enumerators and handling cluster unavailability edge cases.
I'm open to changing the name if the design changes (it is an internal
class anyways, so we can make these name changes without breaking users).

6. Yes, there are some limitations, but I have not considered implementing
that in the basic ConfigMap implementation--currently users are allowed to
make any changes. For example, a user should not delete and recreate a
topic on the same cluster. Regarding the logic to properly remove a
cluster, a user could certainly support it with a custom
KafkaMetadataService--I intended to keep the ConfigMap implementation basic
for simple use cases (so users here would rely on manual monitoring or
something built externally). However, I'm open to the idea if the usage
changes, and maybe there could be improvements to the Flink metric API to
achieve more seamless integration. And finally, yes, the semantics are as
such, which gives reason to my response for question 2.

I've updated the doc with more context from my question responses--let me
know if there are more questions!
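As a concrete reference for the KafkaMetadataService contract discussed in
this thread (describeStream, getAllStreams, isClusterActive, and the
"removed means no longer active" semantics), here is a toy in-memory
stand-in for a ConfigMap-backed implementation. Types are heavily
simplified (a stream is just cluster name -> topics) and the method shapes
are assumptions for illustration, not the FLIP's actual signatures:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Simplified sketch of the metadata service contract.
interface KafkaMetadataService {
    Map<String, Set<String>> describeStream(String streamId);      // cluster -> topics
    Map<String, Map<String, Set<String>>> getAllStreams();         // streamId -> mapping
    boolean isClusterActive(String clusterName);
}

final class InMemoryKafkaMetadataService implements KafkaMetadataService {
    private final Map<String, Map<String, Set<String>>> streams = new HashMap<>();

    // Mutating the mapping models a migration: the next metadata fetch by
    // the enumerator sees the new cluster/topic set; a cluster absent from
    // every stream is treated as "not active".
    void setStream(String streamId, Map<String, Set<String>> clustersToTopics) {
        streams.put(streamId, clustersToTopics);
    }

    @Override
    public Map<String, Set<String>> describeStream(String streamId) {
        return streams.getOrDefault(streamId, Map.of());
    }

    @Override
    public Map<String, Map<String, Set<String>>> getAllStreams() {
        return Map.copyOf(streams);
    }

    @Override
    public boolean isClusterActive(String clusterName) {
        return streams.values().stream().anyMatch(m -> m.containsKey(clusterName));
    }
}
```

The stream id stays fixed across calls; only the returned mapping changes,
which is what lets the source follow a migration without a job restart.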
Best,
Mason

On Wed, Aug 17, 2022 at 8:40 AM Maximilian Michels <m...@apache.org> wrote:

Hey Mason,

I just had a look at the FLIP. If I understand correctly, you are proposing
a very sophisticated way to read from multiple Kafka clusters / topics.

I'm wondering whether we can share some of the code of the existing
KafkaSource. I suppose you don't want to modify KafkaSource itself to avoid
any breakage. But it would be good not to duplicate too much code, so that
functionality can be shared between the two implementations (e.g. the
reader implementation).

Some questions that I had when browsing the current version:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320

1. Why does KafkaMetadataService have the `describeStream` method in
addition to `getAllStreams`? This may be redundant. Is the idea to get the
updated metadata for a particular StreamId?

2. KafkaMetaDataService#isClusterActive serves the purpose to check for
activeness, couldn't this be included in the metadata of KafkaStream?
3. Shouldn't MetadataUpdateEvent contain a full list of KafkaStream instead
of `Map<KafkaClusterIdentifier, Set<String>>`?

4. "In addition, restarting enumerators involve clearing outdated metrics"
What metrics are we talking about here?

5. `StoppableKafkaEnumContextProxy` doesn't ring with me. How about
`MultiKafkaSplitEnumeratorContext`?

6. What about the ConfigMap implementation? Are there any limitations on
the type of configuration changes that we want to allow? For example, is it
allowed to remove a cluster before it has been drained / deactivated? Is
"not active" semantically identical to having the cluster / stream removed?

This is an exciting new addition!

Cheers,
Max

On 11.08.22 10:10, Mason Chen wrote:

> 5. At startup, GetMetadataUpdateEvent is also used to allow the
> MultiClusterKafkaSourceReader to get the latest metadata from the
> enumerator to filter out invalid splits. This is how the reader can solve
> "removing" splits/topics in the startup case.
>
> Sorry for the late response, really appreciate you taking a look at the
> FLIP!
Best,
Mason

On Thu, Aug 11, 2022 at 1:03 AM Mason Chen <mas.chen6...@gmail.com> wrote:

Hi Qingsheng,

Thanks for the feedback--these are great points to raise.

1. This is something I missed that is now added. More generally, it can
locate multiple topics in multiple clusters (1 topic on 1 cluster is the
simplest case).

2. The KafkaMetadataService doesn't interact with the KafkaAdminClients.
This source merely composes the functionality of the KafkaSource, so
KafkaAdminClient interaction is handled by the KafkaSubscriber.

3. There are no requirements for the two clusters--KafkaStream should
clarify this question. For example, you could move from topicName1 in
cluster 1 with 11 partitions to topicName2 in cluster 2 with 22
partitions--only the KafkaStream id needs to remain the same. If there are
no offsets in checkpoint, the offsets are handled by the offsets
initializer from KafkaSource, and currently the design only exposes 1
option for all Kafka clusters, although this could be a valuable extension.
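The migration just described--the stream id stays fixed while successive
metadata fetches return different cluster/topic mappings, with an
intermediate phase where old and new clusters are consumed in parallel to
drain the old one--can be pictured as three metadata snapshots. Cluster and
topic names below are illustrative only:

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Successive metadata snapshots for the same stream id during a lossless
// migration. Each map is cluster name -> topics.
final class MigrationTimeline {
    static final List<Map<String, Set<String>>> SNAPSHOTS = List.of(
        // 1. Before: only the old cluster serves the stream.
        Map.of("old-cluster", Set.of("topicName1")),
        // 2. Switchover: both clusters are consumed in parallel so the old
        //    one can be drained without losing records.
        Map.of("old-cluster", Set.of("topicName1"),
               "new-cluster", Set.of("topicName2")),
        // 3. After draining: metadata points only at the new cluster; the
        //    old cluster can now be decommissioned safely.
        Map.of("new-cluster", Set.of("topicName2")));
}
```

Exactly-once holds across this sequence only under the assumption stated in
the thread: the old cluster is not destroyed until its consumers are
drained.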
4. Regarding topic and cluster removal, metadata is checkpointed in state
via the splits. Exactly once can be maintained with the assumption that
required data from the dead cluster lives in the live cluster. This can be
solved by not destroying the old Kafka cluster until consumers are already
drained. In switchover, the consumer would consume from both old and new
clusters. And finally, the metadata can be changed to point only to the new
cluster when consumers are drained. With the regular KafkaSource, if Kafka
deletes a topic or a cluster is destroyed, the exactly-once semantics are
not preserved, and the semantic is tightly coupled with storage. The design
composes and delegates the responsibilities to KafkaSource components, so
it is limited to whatever KafkaSource can do for exactly-once semantics.

5. Yes, I added more in the FLIP. GetMetadataUpdateEvent was added to make
the order of steps in reader restart during split assignment deterministic.
StoppableKafkaEnumContextProxy is used by the underlying
KafkaSourceEnumerator to assign splits and do periodic topic partition
discovery. So, these scheduled thread pools need to be cleaned up properly
and splits need to be wrapped with cluster information. These details are
added to the FLIP.

Best,
Mason

On Fri, Jul 29, 2022 at 1:38 AM Qingsheng Ren <renqs...@gmail.com> wrote:

Hi Mason,

Thank you for starting this FLIP!

From my first glance this FLIP looks like a collection of many new
interfaces, but I can't stitch them together. It'll be great to have some
brief descriptions about how the source works internally. Here are some
questions in my mind and please correct me if I misunderstand your design.

1. I can't find the definition (in code) of KafkaStream. As a part of the
public interface KafkaMetadataService it has to be public too. If I
understand correctly it locates a topic on a specific cluster.

2.
I think there should be a default implementation / example of KafkaMetadataService for out-of-the-box usage, for example a wrapper of multiple Kafka AdminClients that watch clusters periodically.

3. It looks like the source has the ability to handle Kafka cluster failures, like switching connections to another cluster without restarting the Flink job. Is there any requirement on the two clusters? For example, do they have to be identical in topic names, number of partitions, offsets, etc.?

4. Regarding topic and cluster removal, how to handle and recover from a checkpoint? Let's say a topic is removed or migrated to another cluster after a successful checkpoint. If the job tries to roll back to the checkpoint which still contains the deleted topic or info of a dead cluster, then how to keep the exactly-once semantic in this case?

5. I don't quite get the design of StoppableKafkaEnumContextProxy and the GetMetadataUpdateEvent. Could you elaborate more in the FLIP?
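A default metadata service of the kind suggested in point 2 might look roughly like the sketch below: a cached stream-to-cluster view refreshed on a schedule. This is a hypothetical, self-contained illustration; a real implementation would wrap one Kafka AdminClient per cluster, but here a `Supplier` stands in for those clients, and `PollingMetadataService` and its method names are invented for this example:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical out-of-the-box metadata service: poll on a fixed schedule
// and cache the latest stream-name -> clusters mapping. The Supplier stands
// in for per-cluster AdminClients so the sketch has no external dependencies.
public class PollingMetadataService implements AutoCloseable {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final Map<String, Set<String>> currentMetadata = new ConcurrentHashMap<>();

    public PollingMetadataService(Supplier<Map<String, Set<String>>> fetcher,
                                  long periodMillis) {
        // Refresh the cached view periodically, as watching clusters with
        // AdminClients would. (A real implementation would swap the view
        // atomically instead of clear-then-put.)
        scheduler.scheduleAtFixedRate(() -> {
            currentMetadata.clear();
            currentMetadata.putAll(fetcher.get());
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Latest known mapping of stream name -> clusters hosting it. */
    public Map<String, Set<String>> getAllStreams() {
        return Map.copyOf(currentMetadata);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        PollingMetadataService svc = new PollingMetadataService(
                () -> Map.of("orders-stream", Set.of("cluster-a")), 50);
        Thread.sleep(200); // let at least one poll run
        System.out.println(svc.getAllStreams());
        svc.close();
    }
}
```

The shutdown in `close()` matters for the same reason Mason mentions for StoppableKafkaEnumContextProxy: scheduled thread pools must be cleaned up properly when the source stops.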
In a nutshell I think the idea of this FLIP is good, which extends the usage of the Kafka source. However, as a design doc, some details need to be enriched for other users and developers to better understand how this source works.

Best,
Qingsheng

On Jul 21, 2022, at 01:35, Mason Chen <mas.chen6...@gmail.com> wrote:

Hi all,

We would like to start a discussion thread on FLIP-246: Multi Cluster Kafka Source [1] where we propose to provide a source connector for dynamically reading from multiple Kafka clusters, which will not require a Flink job restart. This can greatly improve the Kafka migration experience for clusters and topics, and it solves some existing problems with the current KafkaSource. There was some interest from users [2] from a meetup and the mailing list. Looking forward to comments and feedback, thanks!
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
[2] https://lists.apache.org/thread/zmpnzx6jjsqc0oldvdm5y2n674xzc3jc

Best,
Mason