Hi all,

I'm working on FLIP-246 again for the Multi Cluster Kafka Source contribution. The document has been updated with more context about how it can solve the Kafka topic removal scenario, and with a sequence diagram to illustrate how the components interact.
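To give a flavor of how the removal scenario is handled, here is a rough sketch (the types below are simplified, illustrative stand-ins for the FLIP's interfaces, not the proposed API): the KafkaMetadataService simply stops returning the removed topic, and the enumerator can diff consecutive metadata snapshots to decide which splits to stop.

```scala
// Simplified, illustrative stand-ins for the FLIP's KafkaStream / KafkaMetadataService.
case class KafkaClusterIdentifier(name: String, bootstrapServers: String)
case class KafkaStream(streamId: String, clusterTopics: Map[KafkaClusterIdentifier, Set[String]])

trait KafkaMetadataService {
  // Current metadata for all streams; a removed topic simply stops appearing here.
  def getAllStreams: Set[KafkaStream]
}

// Topics present in the previous snapshot but absent from the current one are
// considered removed; the enumerator can then signal readers to stop their splits.
def removedTopics(previous: KafkaStream,
                  current: KafkaStream): Map[KafkaClusterIdentifier, Set[String]] =
  previous.clusterTopics
    .map { case (cluster, topics) =>
      cluster -> (topics -- current.clusterTopics.getOrElse(cluster, Set.empty[String]))
    }
    .filter { case (_, topics) => topics.nonEmpty }
```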
Looking forward to any feedback!

Best,
Mason

On Wed, Oct 12, 2022 at 11:12 PM Mason Chen <mas.chen6...@gmail.com> wrote:

> Hi Ryan,
>
> Thanks for the additional context! Yes, the offset initializer would need to take a cluster as a parameter, and the MultiClusterKafkaSourceSplit can be exposed in an initializer.
>
> Best,
> Mason
>
> On Thu, Oct 6, 2022 at 11:00 AM Ryan van Huuksloot <ryan.vanhuuksl...@shopify.com> wrote:
>
>> Hi Mason,
>>
>> Thanks for the clarification! Regarding the addition to the OffsetsInitializer in this API - this would be an awesome addition, and I think this entire FLIP would be a great addition to Flink.
>>
>> To provide more context as to why we need particular offsets: we currently use HybridSource to backfill from buckets prior to reading from Kafka. We have a service that tells us the last offset loaded into a given bucket, which we use to initialize the KafkaSource OffsetsInitializer. We couldn't use a timestamp here, and the offset would be different for each cluster.
>>
>> In pseudocode, we'd want the ability to do something like this with HybridSource - if this is possible.
>>
>> ```scala
>> val offsetsMetadata: Map[TopicPartition, Long] = ??? // get current offsets from OffsetReaderService
>> val multiClusterArchiveSource: MultiBucketFileSource[T] = ??? // data is read from different buckets (multiple topics)
>> val multiClusterKafkaSource: MultiClusterKafkaSource[T] =
>>   MultiClusterKafkaSource.builder()
>>     .setKafkaMetadataService(new KafkaMetadataServiceImpl())
>>     .setStreamIds(List.of("my-stream-1", "my-stream-2"))
>>     .setGroupId("myConsumerGroup")
>>     .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(classOf[StringDeserializer]))
>>     .setStartingOffsets(offsetsMetadata)
>>     .setProperties(properties)
>>     .build()
>> val source = HybridSource.builder(multiClusterArchiveSource)
>>   .addSource(multiClusterKafkaSource)
>>   .build()
>> ```
>>
>> A few notes:
>> - TopicPartition won't work because the same topic name may exist on multiple clusters, as this is something that is supported IIRC.
>> - I chose to pass a map into the starting offsets just for demonstrative purposes; I would be fine with whatever data structure works best.
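>>
>> For illustration, one possible shape for that structure (purely hypothetical; whatever the FLIP settles on works for us):
>>
>> ```scala
>> // Hypothetical: qualify each partition with its cluster, so identical topic
>> // names on different clusters don't collide the way TopicPartition would.
>> case class ClusterTopicPartition(cluster: String, topic: String, partition: Int)
>>
>> val offsetsMetadata: Map[ClusterTopicPartition, Long] = Map(
>>   ClusterTopicPartition("cluster-1", "my-topic", 0) -> 42L,
>>   ClusterTopicPartition("cluster-2", "my-topic", 0) -> 9000L
>> )
>> ```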
>>
>> Ryan van Huuksloot
>> Data Developer | Production Engineering | Streaming Capabilities
>>
>> On Mon, Oct 3, 2022 at 11:29 PM Mason Chen <mas.chen6...@gmail.com> wrote:
>>
>>> Hi Ryan,
>>>
>>> Just copying your message over to the email chain.
>>>
>>>> Hi Mason,
>>>> First off, thanks for putting this FLIP together! Sorry for the delay. Full disclosure: Mason and I chatted a little bit at Flink Forward 2022, but I have tried to capture the questions I had for him then.
>>>> I'll start the conversation with a few questions:
>>>> 1. The concept of streamIds is not clear to me in the proposal and could use some more information. If I understand correctly, they will be used in the MetadataService to link to the KafkaClusters you want to use? If you assign stream ids using `setStreamIds`, how can you dynamically increase the number of clusters you consume from if the list of stream ids is static? I am basing this off of your example .setStreamIds(List.of("my-stream-1", "my-stream-2")), so I could be off base with my assumption. If you don't mind clearing up the intention, that would be great!
>>>> 2. How would offsets work if you wanted to use this MultiClusterKafkaSource with a file-based backfill? In the case I am thinking of, you have a bucket-backed archive of Kafka data per cluster, and you want to pick up from the last offset in the archived system. How would you set OffsetInitializers "per cluster", potentially as a function, or are you limited to setting an OffsetInitializer for the entire source?
>>>> 3. Just to make sure - because this system will layer on top of FLIP-27 and use KafkaSource for some aspects under the hood, the watermark alignment that was introduced in FLIP-182 / Flink 1.15 would be possible across multiple clusters if you assign them to the same alignment group?
>>>> Thanks!
>>>> Ryan
>>>
>>> 1. The stream ids are static; however, the physical clusters and topics that they map to can mutate. Let's say my-stream-1 maps to cluster-1 and topic-1. The KafkaMetadataService can return a different mapping the next time metadata is fetched, e.g. my-stream-1 mapping to cluster-1 and topic-1, and cluster-2 and topic-2. Let me add more details on how the KafkaMetadataService is used.
>>> 2. The current design limits itself to a single configured OffsetsInitializer that is used for every underlying KafkaSource.
>>> 3. Yes, it is in our plan to integrate this source with watermark alignment, in which the user can align watermarks from all clusters within the single source. It will leverage the KafkaSource implementation to achieve this.
>>>
>>> With regards to 2, it's an interesting idea. I think we can extend the design to support a map of offset initializers to clusters, which would solve your file-based backfill use case. If you initialize the source with a single timestamp, the current design may work for your use case, but I can't tell without more details. Thanks for your interest, and sorry for the delay!
>>>
>>> Best,
>>> Mason
>>>
>>> On Mon, Aug 29, 2022 at 10:02 AM Mason Chen <mas.chen6...@gmail.com> wrote:
>>>
>>>> Hi Max,
>>>>
>>>> Thanks for taking a look!
>>>>
>>>>> I'm wondering whether we can share some of the code of the existing KafkaSource.
>>>>
>>>> That is the intention--let me call it out more explicitly.
>>>>
>>>> Regarding your questions:
>>>>
>>>> 1. Indeed, the KafkaMetadataService has the describeStream method to get a particular stream id. We decided to also support getting all the streams for subscribing via a regex pattern (similar to the KafkaSource implementation).
>>>>
>>>> 2. The idea was that if metadata is removed, then it is no longer active.
>>>>
>>>> 3. The MetadataUpdateEvent's format is specifically for communicating to the reader which clusters and topics it should read from. It doesn't need stream information, since the reader doesn't interact with the KafkaMetadataService (only the enumerator interacts with it). See the sketch at the end of this message.
>>>>
>>>> 4. Metrics will be reported per cluster. For example, KafkaSource already reports pendingRecords, and the corresponding metric for, say, cluster0 would be called `MultiClusterKafkaSource.kafkaCluster.cluster0.pendingRecords`. When a cluster is removed, these metrics are no longer valid, so the implementation can close them.
>>>>
>>>> 5. I'm fine with that name; however, I got some feedback internally, since the bulk of the logic is in stopping the scheduled tasks of the underlying enumerators and handling cluster unavailability edge cases. I'm open to changing the name if the design changes (it is an internal class anyway, so we can make these name changes without breaking users).
>>>>
>>>> 6. Yes, there are some limitations, but I have not considered implementing that in the basic ConfigMap implementation--currently users are allowed to make any changes. For example, a user should not delete and recreate a topic on the same cluster. Regarding the logic to properly remove a cluster, a user could certainly support it with a custom KafkaMetadataService--I intended to keep the ConfigMap implementation basic for simple use cases (so users here would rely on manual monitoring or something built externally). However, I'm open to the idea if the usage changes, and maybe there could be improvements to the Flink metric API to achieve more seamless integration. And finally, yes, the semantics are as such, which is the reason for my response to question 2.
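>>>>
>>>> To make point 3 above concrete, a rough sketch of the enumerator-to-reader event (the map shape matches the FLIP draft's `Map<KafkaClusterIdentifier, Set<String>>`; the Scala types here are just illustrative stand-ins):
>>>>
>>>> ```scala
>>>> // Illustrative stand-ins, not the FLIP's final classes.
>>>> case class KafkaClusterIdentifier(name: String, bootstrapServers: String)
>>>>
>>>> // The reader only needs clusters and topics -- no stream ids -- because it
>>>> // never talks to the KafkaMetadataService; only the enumerator does.
>>>> case class MetadataUpdateEvent(clusterTopics: Map[KafkaClusterIdentifier, Set[String]])
>>>> ```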
>>>> I've updated the doc with more context from my question responses; let me know if there are more questions!
>>>>
>>>> Best,
>>>> Mason
>>>>
>>>> On Wed, Aug 17, 2022 at 8:40 AM Maximilian Michels <m...@apache.org> wrote:
>>>>
>>>>> Hey Mason,
>>>>>
>>>>> I just had a look at the FLIP. If I understand correctly, you are proposing a very sophisticated way to read from multiple Kafka clusters / topics.
>>>>>
>>>>> I'm wondering whether we can share some of the code of the existing KafkaSource. I suppose you don't want to modify KafkaSource itself to avoid any breakage, but it would be good not to duplicate too much code, so that functionality can be shared between the two implementations (e.g. the reader implementation).
>>>>>
>>>>> Some questions that I had when browsing the current version: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320
>>>>>
>>>>> 1. Why does KafkaMetadataService have the `describeStream` method in addition to `getAllStreams`? This may be redundant. Is the idea to get the updated metadata for a particular StreamId?
>>>>>
>>>>> 2. KafkaMetadataService#isClusterActive serves the purpose of checking for activeness; couldn't this be included in the metadata of KafkaStream?
>>>>>
>>>>> 3. Shouldn't MetadataUpdateEvent contain a full list of KafkaStream instead of `Map<KafkaClusterIdentifier, Set<String>>`?
>>>>>
>>>>> 4. "In addition, restarting enumerators involve clearing outdated metrics" - what metrics are we talking about here?
>>>>>
>>>>> 5. `StoppableKafkaEnumContextProxy` doesn't ring right with me. How about `MultiKafkaSplitEnumeratorContext`?
>>>>>
>>>>> 6. What about the ConfigMap implementation? Are there any limitations on the type of configuration changes that we want to allow? For example, is it allowed to remove a cluster before it has been drained / deactivated? Is "not active" semantically identical to having the cluster / stream removed?
>>>>>
>>>>> This is an exciting new addition!
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On 11.08.22 10:10, Mason Chen wrote:
>>>>> > 5. At startup, GetMetadataUpdateEvent is also used to allow the MultiClusterKafkaSourceReader to get the latest metadata from the enumerator to filter out invalid splits. This is how the reader can solve "removing" splits/topics in the startup case.
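>>>>> >
>>>>> > Roughly, that startup filtering could look like this (the split and event shapes are simplified stand-ins for illustration, not the proposed classes):
>>>>> >
>>>>> > ```scala
>>>>> > // Illustrative stand-ins, not the FLIP's final classes.
>>>>> > case class KafkaClusterIdentifier(name: String, bootstrapServers: String)
>>>>> > case class MetadataUpdateEvent(clusterTopics: Map[KafkaClusterIdentifier, Set[String]])
>>>>> > case class MultiClusterKafkaSourceSplit(cluster: KafkaClusterIdentifier, topic: String, partition: Int)
>>>>> >
>>>>> > // On startup, the reader keeps only restored splits whose cluster and topic
>>>>> > // still appear in the metadata received from the enumerator.
>>>>> > def filterValidSplits(restored: Seq[MultiClusterKafkaSourceSplit],
>>>>> >                       metadata: MetadataUpdateEvent): Seq[MultiClusterKafkaSourceSplit] =
>>>>> >   restored.filter { split =>
>>>>> >     metadata.clusterTopics.getOrElse(split.cluster, Set.empty[String]).contains(split.topic)
>>>>> >   }
>>>>> > ```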
>>>>> >
>>>>> > Sorry for the late response; really appreciate you taking a look at the FLIP!
>>>>> >
>>>>> > Best,
>>>>> > Mason
>>>>> >
>>>>> > On Thu, Aug 11, 2022 at 1:03 AM Mason Chen <mas.chen6...@gmail.com> wrote:
>>>>> >
>>>>> >> Hi Qingsheng,
>>>>> >>
>>>>> >> Thanks for the feedback--these are great points to raise.
>>>>> >>
>>>>> >> 1. This is something I missed that is now added. More generally, it can locate multiple topics in multiple clusters (1 topic on 1 cluster is the simplest case).
>>>>> >>
>>>>> >> 2. The KafkaMetadataService doesn't interact with the KafkaAdminClients. This source merely composes the functionality of the KafkaSource, so KafkaAdminClient interaction is handled by the KafkaSubscriber.
>>>>> >>
>>>>> >> 3. There are no requirements for the two clusters--KafkaStream should clarify this question (see the sketch at the end of this message). For example, you could move from topicName1 in cluster 1 with 11 partitions to topicName2 in cluster 2 with 22 partitions--only the KafkaStream id needs to remain the same. If there are no offsets in checkpoint, the offsets are handled by the offsets initializer from KafkaSource, and currently the design only exposes one option for all Kafka clusters, although per-cluster initializers could be a valuable extension.
>>>>> >>
>>>>> >> 4. Regarding topic and cluster removal, metadata is checkpointed in state via the splits. Exactly-once semantics can be maintained under the assumption that the required data from the dead cluster lives in the live cluster. This can be achieved by not destroying the old Kafka cluster until consumers have been drained. During switchover, the consumer would consume from both old and new clusters. Finally, the metadata can be changed to point only to the new cluster once consumers are drained. With the regular KafkaSource, if Kafka deletes a topic or a cluster is destroyed, the exactly-once semantics are not preserved, since the semantics are tightly coupled with the storage. This design composes and delegates the responsibilities to KafkaSource components, so it is limited to whatever KafkaSource can do for exactly-once semantics.
>>>>> >>
>>>>> >> 5. Yes, I added more in the FLIP. GetMetadataUpdateEvent was added to make the order of steps in reader restart during split assignment deterministic. StoppableKafkaEnumContextProxy is used by the underlying KafkaSourceEnumerator to assign splits and do periodic topic partition discovery. So, these scheduled thread pools need to be cleaned up properly, and splits need to be wrapped with cluster information. These details are added to the FLIP.
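>>>>> >>
>>>>> >> To illustrate point 3 above: the stream id is the stable unit, and everything beneath it can change between metadata fetches. A rough sketch with simplified stand-in types:
>>>>> >>
>>>>> >> ```scala
>>>>> >> // Simplified stand-ins for the FLIP's classes, for illustration only.
>>>>> >> case class KafkaClusterIdentifier(name: String, bootstrapServers: String)
>>>>> >> case class KafkaStream(streamId: String, clusterTopics: Map[KafkaClusterIdentifier, Set[String]])
>>>>> >>
>>>>> >> // Before migration: my-stream-1 -> topicName1 on cluster 1.
>>>>> >> val before = KafkaStream("my-stream-1",
>>>>> >>   Map(KafkaClusterIdentifier("cluster-1", "cluster-1:9092") -> Set("topicName1")))
>>>>> >>
>>>>> >> // After migration: the same stream id now points at topicName2 on cluster 2;
>>>>> >> // subscribers keyed on "my-stream-1" follow the move without a job restart.
>>>>> >> val after = KafkaStream("my-stream-1",
>>>>> >>   Map(KafkaClusterIdentifier("cluster-2", "cluster-2:9092") -> Set("topicName2")))
>>>>> >> ```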
>>>>> >>
>>>>> >> Best,
>>>>> >> Mason
>>>>> >>
>>>>> >> On Fri, Jul 29, 2022 at 1:38 AM Qingsheng Ren <renqs...@gmail.com> wrote:
>>>>> >>
>>>>> >>> Hi Mason,
>>>>> >>>
>>>>> >>> Thank you for starting this FLIP!
>>>>> >>>
>>>>> >>> At first glance this FLIP looks like a collection of many new interfaces, but I can’t stitch them together. It’ll be great to have some brief descriptions of how the source works internally. Here are some questions on my mind; please correct me if I misunderstand your design.
>>>>> >>>
>>>>> >>> 1. I can’t find the definition (in code) of KafkaStream. As a part of the public interface KafkaMetadataService, it has to be public too. If I understand correctly, it locates a topic on a specific cluster.
>>>>> >>>
>>>>> >>> 2. I think there should be a default implementation / example of KafkaMetadataService for out-of-the-box usage, for example a wrapper of multiple Kafka AdminClients that watches clusters periodically.
>>>>> >>>
>>>>> >>> 3. It looks like the source has the ability to handle Kafka cluster failures, like switching connections to another cluster without restarting the Flink job. Is there any requirement for the two clusters? For example, do they have to be identical in topic names, number of partitions, offsets, etc.?
>>>>> >>>
>>>>> >>> 4. Regarding topic and cluster removal, how do we handle them and recover from checkpoints? Let’s say a topic is removed or migrated to another cluster after a successful checkpoint. If the job tries to roll back to the checkpoint which still contains the deleted topic or the info of a dead cluster, then how do we keep the exactly-once semantics in this case?
>>>>> >>>
>>>>> >>> 5. I don’t quite get the design of StoppableKafkaEnumContextProxy and the GetMetadataUpdateEvent. Could you elaborate more in the FLIP?
>>>>> >>>
>>>>> >>> In a nutshell, I think the idea of this FLIP is good: it extends the usage of the Kafka source. However, as a design doc, some details need to be enriched for other users and developers to better understand how this source works.
>>>>> >>>
>>>>> >>> Best,
>>>>> >>> Qingsheng
>>>>> >>>
>>>>> >>>> On Jul 21, 2022, at 01:35, Mason Chen <mas.chen6...@gmail.com> wrote:
>>>>> >>>>
>>>>> >>>> Hi all,
>>>>> >>>>
>>>>> >>>> We would like to start a discussion thread on FLIP-246: Multi Cluster Kafka Source [1], where we propose to provide a source connector for dynamically reading from multiple Kafka clusters, without requiring a Flink job restart. This can greatly improve the Kafka migration experience for clusters and topics, and it solves some existing problems with the current KafkaSource. There has been some interest from users [2] at a meetup and on the mailing list. Looking forward to comments and feedback, thanks!
>>>>> >>>>
>>>>> >>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
>>>>> >>>> [2] https://lists.apache.org/thread/zmpnzx6jjsqc0oldvdm5y2n674xzc3jc
>>>>> >>>>
>>>>> >>>> Best,
>>>>> >>>> Mason