Hi all,

I'm working on FLIP-246 again, for the Multi Cluster Kafka Source
contribution. The document has been updated with some more context about
how it can solve the Kafka topic removal scenario and a sequence diagram to
illustrate how the components interact.

Looking forward to any feedback!

Best,
Mason

On Wed, Oct 12, 2022 at 11:12 PM Mason Chen <mas.chen6...@gmail.com> wrote:

> Hi Ryan,
>
> Thanks for the additional context! Yes, the offset initializer would need
> to take a cluster as a parameter and the MultiClusterKafkaSourceSplit can
> be exposed in an initializer.
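>
> To make that concrete, a cluster-aware initializer could look roughly like the
> sketch below (the trait name and signature are purely illustrative and not the
> FLIP's final API):
>
> ```scala
> import java.{lang, util}
> import org.apache.kafka.common.TopicPartition
>
> // Hypothetical sketch: an offsets initializer that is also told which cluster it
> // is resolving offsets for, so each cluster can start from different offsets.
> trait ClusterAwareOffsetsInitializer extends Serializable {
>   def getPartitionOffsets(
>       clusterId: String, // illustrative; the real design could pass a KafkaClusterIdentifier
>       partitions: util.Collection[TopicPartition]): util.Map[TopicPartition, lang.Long]
> }
> ```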
>
> Best,
> Mason
>
> On Thu, Oct 6, 2022 at 11:00 AM Ryan van Huuksloot <
> ryan.vanhuuksl...@shopify.com> wrote:
>
>> Hi Mason,
>>
>> Thanks for the clarification! Regarding the addition to this API's
>> OffsetsInitializer - this would be an awesome addition, and I think this
>> entire FLIP would be a great addition to Flink.
>>
>> To provide more context as to why we need particular offsets: we currently
>> use HybridSource to backfill from buckets prior to reading from Kafka. We
>> have a service that tells us the last offset that has been loaded into said
>> bucket, which we use to initialize the KafkaSource OffsetsInitializer. We
>> couldn't use a timestamp here, and the offset would be different for each
>> cluster.
>>
>> In pseudocode, we'd want the ability to do something like this with
>> HybridSources - if this is possible.
>>
>> ```scala
>> val offsetsMetadata: Map[TopicPartition, Long] = ??? // Get current offsets from OffsetReaderService
>> val multiClusterArchiveSource: MultiBucketFileSource[T] = ??? // Data is read from different buckets (multiple topics)
>> val multiClusterKafkaSource: MultiClusterKafkaSource[T] =
>>   MultiClusterKafkaSource.builder()
>>     .setKafkaMetadataService(new KafkaMetadataServiceImpl())
>>     .setStreamIds(List.of("my-stream-1", "my-stream-2"))
>>     .setGroupId("myConsumerGroup")
>>     .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(classOf[StringDeserializer]))
>>     .setStartingOffsets(offsetsMetadata)
>>     .setProperties(properties)
>>     .build()
>> val source =
>>   HybridSource.builder(multiClusterArchiveSource).addSource(multiClusterKafkaSource).build()
>> ```
>>
>> A few notes:
>> - TopicPartition won't work as the key because the same topic name may exist
>> on multiple clusters, which is something that is supported IIRC (see the
>> sketch below)
>> - I chose to pass a map into starting offsets just for demonstrative
>> purposes; I would be fine with whatever data structure works best
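>>
>> For illustration only (the key type below is made up), the starting-offsets
>> map could be keyed by a cluster-qualified partition instead of a plain
>> TopicPartition:
>>
>> ```scala
>> import org.apache.kafka.common.TopicPartition
>>
>> // Hypothetical key type that disambiguates identical topic names across clusters.
>> case class ClusterTopicPartition(clusterId: String, topicPartition: TopicPartition)
>>
>> val offsetsMetadata: Map[ClusterTopicPartition, Long] = Map(
>>   ClusterTopicPartition("cluster-1", new TopicPartition("events", 0)) -> 4213L,
>>   ClusterTopicPartition("cluster-2", new TopicPartition("events", 0)) -> 118L
>> )
>> ```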
>>
>> Ryan van Huuksloot
>> Data Developer | Production Engineering | Streaming Capabilities
>>
>>
>> On Mon, Oct 3, 2022 at 11:29 PM Mason Chen <mas.chen6...@gmail.com>
>> wrote:
>>
>>> Hi Ryan,
>>>
>>> Just copying your message over to the email chain.
>>>
>>> Hi Mason,
>>>> First off, thanks for putting this FLIP together! Sorry for the delay.
>>>> Full disclosure Mason and I chatted a little bit at Flink Forward 2022 but
>>>> I have tried to capture the questions I had for him then.
>>>> I'll start the conversation with a few questions:
>>>> 1. The concept of streamIds is not clear to me in the proposal and
>>>> could use some more information. If I understand correctly, they will be
>>>> used in the MetadataService to link KafkaClusters to ones you want to use?
>>>> If you assign stream ids using `setStreamIds`, how can you dynamically
>>>> increase the number of clusters you consume if the list of StreamIds is
>>>> static? I am basing this off of your example .setStreamIds(List.of(
>>>> "my-stream-1", "my-stream-2")) so I could be off base with my
>>>> assumption. If you don't mind clearing up the intention, that would be
>>>> great!
>>>> 2. How would offsets work if you wanted to use this
>>>> MultiClusterKafkaSource with a file based backfill? In the case I am
>>>> thinking of, you have a bucket-backed archive of Kafka data per cluster,
>>>> and you want to pick up from the last offset in the archived system, how
>>>> would you set OffsetInitializers "per cluster" potentially as a function or
>>>> are you limited to setting an OffsetInitializer for the entire Source?
>>>> 3. Just to make sure - because this system will layer on top of
>>>> FLIP-27 and use KafkaSource for some aspects under the hood, the watermark
>>>> alignment that was introduced in FLIP-182 / Flink 1.15 would be possible
>>>> across multiple clusters if you assign them to the same alignment group?
>>>> Thanks!
>>>> Ryan
>>>
>>>
>>> 1. The stream ids are static--however, the physical clusters and topics that
>>> they map to can mutate. Let's say my-stream-1 maps to cluster-1 and topic-1.
>>> The KafkaMetadataService can return a different mapping the next time
>>> metadata is fetched, e.g. my-stream-1 mapping to both cluster-1 and topic-1,
>>> and cluster-2 and topic-2. Let me add more details on how the
>>> KafkaMetadataService is used.
>>> 2. The current design limits itself to a single configured
>>> OffsetInitializer that is used for every underlying KafkaSource.
>>> 3. Yes, it is in our plan to integrate this source with watermark alignment,
>>> in which the user can align watermarks from all clusters within the single
>>> source. It will leverage the KafkaSource implementation to achieve this.
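>>>
>>> As a rough usage sketch (reusing the `multiClusterKafkaSource` value from the
>>> pseudocode earlier in this thread; the source itself is of course still
>>> proposed), alignment across clusters would use the FLIP-182 API from Flink
>>> 1.15:
>>>
>>> ```scala
>>> import java.time.Duration
>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>>
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> val stream = env.fromSource(
>>>   multiClusterKafkaSource, // the proposed source built earlier in this thread
>>>   WatermarkStrategy
>>>     .forMonotonousTimestamps[String]()
>>>     // FLIP-182: all readers in the same group stay within the allowed drift
>>>     .withWatermarkAlignment("kafka-alignment-group", Duration.ofSeconds(20), Duration.ofSeconds(1)),
>>>   "multi-cluster-kafka-source")
>>> ```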
>>>
>>> With regards to 2, it's an interesting idea. I think we can extend the
>>> design to support a map of offset initializers to clusters, which would
>>> solve your file-based backfill (rough sketch below). If you initialize the
>>> source with a single timestamp, the current design may work for your use
>>> case, but I can't tell without more details. Thanks for your interest and
>>> sorry for the delay!
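>>>
>>> Purely as a sketch of that extension (the builder overload is hypothetical;
>>> only `OffsetsInitializer.offsets(...)` is existing KafkaSource API), it
>>> could look like:
>>>
>>> ```scala
>>> import java.util.{Map => JMap}
>>> import org.apache.kafka.common.TopicPartition
>>> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
>>>
>>> // Offsets recovered from the external offset-reader service, per cluster (example values).
>>> val cluster1Offsets: JMap[TopicPartition, java.lang.Long] =
>>>   JMap.of(new TopicPartition("events", 0), java.lang.Long.valueOf(4213L))
>>> val cluster2Offsets: JMap[TopicPartition, java.lang.Long] =
>>>   JMap.of(new TopicPartition("events", 0), java.lang.Long.valueOf(118L))
>>>
>>> // One OffsetsInitializer per cluster id...
>>> val perClusterOffsets: JMap[String, OffsetsInitializer] = JMap.of(
>>>   "cluster-1", OffsetsInitializer.offsets(cluster1Offsets),
>>>   "cluster-2", OffsetsInitializer.offsets(cluster2Offsets))
>>>
>>> // ...which a (hypothetical) builder overload could accept:
>>> //   MultiClusterKafkaSource.builder()...setStartingOffsets(perClusterOffsets)
>>> ```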
>>>
>>> Best,
>>> Mason
>>>
>>> On Mon, Aug 29, 2022 at 10:02 AM Mason Chen <mas.chen6...@gmail.com>
>>> wrote:
>>>
>>>> Hi Max,
>>>>
>>>> Thanks for taking a look!
>>>>
>>>>
>>>>> I'm wondering whether we can share some of the code of the existing
>>>>> KafkaSource.
>>>>>
>>>>
>>>> That is the intention--let me call it out more explicitly.
>>>>
>>>> Regarding your questions:
>>>>
>>>> 1. Indeed, the KafkaMetadataService has the describeStream method to get
>>>> the metadata for a particular stream id. We decided to also support getting
>>>> all the streams, for subscribing via a regex pattern (similar to the
>>>> KafkaSource implementation).
>>>>
>>>> 2. The idea was that if the metadata is removed, it is no longer active.
>>>>
>>>> 3. The MetadataUpdateEvent's format is specifically for communicating
>>>> to the reader what the clusters and topics it should read from. It doesn't
>>>> need stream information since it doesn't interact with the
>>>> KafkaMetadataService (only the enumerator interacts with it).
>>>>
>>>> 4. Metrics will be reported per cluster. For example, KafkaSource already
>>>> reports pendingRecords; the corresponding metric for, say, cluster0 would
>>>> be called `MultiClusterKafkaSource.kafkaCluster.cluster0.pendingRecords`.
>>>> On cluster removal, these metrics would no longer be valid, so the
>>>> implementation can close them.
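>>>>
>>>> For illustration, the per-cluster scoping could be done with Flink's metric
>>>> groups, roughly like this (a sketch only, not the actual implementation):
>>>>
>>>> ```scala
>>>> import org.apache.flink.metrics.MetricGroup
>>>>
>>>> // Registers metrics under MultiClusterKafkaSource.kafkaCluster.<clusterId>.*,
>>>> // so they can be dropped as a unit when that cluster is removed from metadata.
>>>> def clusterMetricGroup(sourceGroup: MetricGroup, clusterId: String): MetricGroup =
>>>>   sourceGroup
>>>>     .addGroup("MultiClusterKafkaSource")
>>>>     .addGroup("kafkaCluster", clusterId)
>>>> ```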
>>>>
>>>> 5. I'm fine with that name; however, I got some feedback internally
>>>> since the bulk of the logic is in stopping the scheduled tasks of the
>>>> underlying enumerators and handling cluster unavailability edge cases. I'm
>>>> open to changing the name if the design changes (it is an internal class
>>>> anyways, so we can make these name changes without breaking users).
>>>>
>>>> 6. Yes, there are some limitations, but I have not considered implementing
>>>> that in the basic ConfigMap implementation--currently users are allowed to
>>>> make any changes. For example, a user should not delete and recreate a
>>>> topic on the same cluster. Regarding the logic to properly remove a
>>>> cluster, a user could certainly support it with a custom
>>>> KafkaMetadataService--I intended to keep the ConfigMap implementation basic
>>>> for simple use cases (so users here would rely on manual monitoring or
>>>> something built externally). However, I'm open to the idea if the usage
>>>> changes, and maybe there could be improvements to the Flink metric API to
>>>> achieve more seamless integration. And finally, yes, the semantics are as
>>>> such, which is the reasoning behind my response to question 2.
>>>>
>>>> I've updated the doc with more context from my question responses, let
>>>> me know if there are more questions!
>>>>
>>>> Best,
>>>> Mason
>>>>
>>>>
>>>> On Wed, Aug 17, 2022 at 8:40 AM Maximilian Michels <m...@apache.org>
>>>> wrote:
>>>>
>>>>> Hey Mason,
>>>>>
>>>>> I just had a look at the FLIP. If I understand correctly, you are
>>>>> proposing a very sophisticated way to read from multiple Kafka
>>>>> clusters
>>>>> / topics.
>>>>>
>>>>> I'm wondering whether we can share some of the code of the existing
>>>>> KafkaSource. I suppose you don't want to modify KafkaSource itself to
>>>>> avoid any breakage. But it would be good not to duplicate too much code,
>>>>> so that functionality can be shared between the two implementations
>>>>> (e.g. the reader implementation).
>>>>>
>>>>> Some questions that I had when browsing the current version:
>>>>>
>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320
>>>>>
>>>>> 1. Why does KafkaMetadataService have the `describeStream` method in
>>>>> addition to `getAllStreams`? This may be redundant. Is the idea to get
>>>>> the updated metadata for a particular StreamId?
>>>>>
>>>>> 2. KafkaMetadataService#isClusterActive serves the purpose of checking
>>>>> for activeness; couldn't this be included in the metadata of KafkaStream?
>>>>>
>>>>> 3. Shouldn't MetadataUpdateEvent contain a full list of KafkaStream
>>>>> instead of `Map<KafkaClusterIdentifier, Set<String>>`?
>>>>>
>>>>> 4. "In addition, restarting enumerators involve clearing outdated
>>>>> metrics" What metrics are we talking about here?
>>>>>
>>>>> 5. `StoppableKafkaEnumContextProxy` doesn't ring with me. How about
>>>>> `MultiKafkaSplitEnumeratorContext`?
>>>>>
>>>>> 6. What about the ConfigMap implementation? Are there any limitations
>>>>> on
>>>>> the type of configuration changes that we want to allow? For example,
>>>>> is
>>>>> it allowed to remove a cluster before it has been drained /
>>>>> deactivated?
>>>>> Is "not active" semantically identical to having the cluster / stream
>>>>> removed?
>>>>>
>>>>> This is an exciting new addition!
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On 11.08.22 10:10, Mason Chen wrote:
>>>>> > 5. At startup, GetMetadataUpdateEvent is also used to allow the
>>>>> > MultiClusterKafkaSourceReader to get the latest metadata from the
>>>>> > enumerator to filter out invalid splits. This is how the reader can
>>>>> > solve "removing" splits/topics in the startup case.
>>>>> >
>>>>> > Sorry for the late response, really appreciate you taking a look at
>>>>> > the FLIP!
>>>>> >
>>>>> > Best,
>>>>> > Mason
>>>>> >
>>>>> > On Thu, Aug 11, 2022 at 1:03 AM Mason Chen <mas.chen6...@gmail.com>
>>>>> wrote:
>>>>> >
>>>>> >> Hi Qingsheng,
>>>>> >>
>>>>> >> Thanks for the feedback--these are great points to raise.
>>>>> >>
>>>>> >> 1. This is something I missed that is now added. More generally, it
>>>>> >> can locate multiple topics in multiple clusters (1 topic on 1 cluster
>>>>> >> is the simplest case).
>>>>> >>
>>>>> >> 2. The KafkaMetadataService doesn't interact with the
>>>>> >> KafkaAdminClients. This source merely composes the functionality of the
>>>>> >> KafkaSource so KafkaAdminClient interaction is handled by the
>>>>> >> KafkaSubscriber.
>>>>> >>
>>>>> >> 3. There are no requirements for the two clusters--the KafkaStream
>>>>> >> abstraction should clarify this question. For example, you could move
>>>>> >> from topicName1 in cluster 1 with 11 partitions to topicName2 in
>>>>> >> cluster 2 with 22 partitions--only the KafkaStream id needs to remain
>>>>> >> the same. If there are no offsets in checkpoint, the offsets are
>>>>> >> handled by the offsets initializer from KafkaSource, and currently the
>>>>> >> design only exposes 1 option for all Kafka clusters, although this
>>>>> >> could be a valuable extension.
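>>>>> >>
>>>>> >> As a simplified illustration (not the FLIP's exact types), the stream id
>>>>> >> is the only stable handle; the physical mapping behind it can change
>>>>> >> between metadata fetches:
>>>>> >>
>>>>> >> ```scala
>>>>> >> // Mapping returned by one metadata fetch: stream id -> (cluster -> topics).
>>>>> >> val before = Map("my-stream-1" -> Map("cluster-1" -> Set("topicName1"))) // 11 partitions
>>>>> >> // A later fetch during/after a migration can point the same stream id elsewhere.
>>>>> >> val after  = Map("my-stream-1" -> Map("cluster-2" -> Set("topicName2"))) // 22 partitions
>>>>> >> ```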
>>>>> >>
>>>>> >> 4. Regarding topic and cluster removal, metadata is checkpointed in
>>>>> >> state via the splits. Exactly once can be maintained with the
>>>>> >> assumption that the required data from the dead cluster lives in the
>>>>> >> live cluster. This can be achieved by not destroying the old Kafka
>>>>> >> cluster until consumers are already drained. During switchover, the
>>>>> >> consumer would consume from both old and new clusters. And finally, the
>>>>> >> metadata can be changed to point only to the new cluster when consumers
>>>>> >> are drained. With the regular KafkaSource, if Kafka deletes a topic or
>>>>> >> a cluster is destroyed, the exactly-once semantics are not preserved
>>>>> >> and the semantics are tightly coupled with storage. The design composes
>>>>> >> and delegates the responsibilities to the KafkaSource components, so it
>>>>> >> is limited to whatever KafkaSource can do for exactly-once semantics.
>>>>> >>
>>>>> >> 5. Yes, I added more in the FLIP. GetMetadataUpdateEvent was added to
>>>>> >> make the order of steps deterministic when a reader restarts during
>>>>> >> split assignment. StoppableKafkaEnumContextProxy is used by the
>>>>> >> underlying KafkaSourceEnumerator to assign splits and do periodic topic
>>>>> >> partition discovery. So, these scheduled thread pools need to be
>>>>> >> cleaned up properly, and splits need to be wrapped with cluster
>>>>> >> information. These details have been added to the FLIP.
>>>>> >>
>>>>> >> Best,
>>>>> >> Mason
>>>>> >>
>>>>> >> On Fri, Jul 29, 2022 at 1:38 AM Qingsheng Ren <renqs...@gmail.com>
>>>>> wrote:
>>>>> >>
>>>>> >>> Hi Mason,
>>>>> >>>
>>>>> >>> Thank you for starting this FLIP!
>>>>> >>>
>>>>> >>> From my first glance this FLIP looks like a collection of many new
>>>>> >>> interfaces, but I can’t stitch them together. It’ll be great to have
>>>>> >>> some brief descriptions about how the source works internally. Here
>>>>> >>> are some questions in my mind and please correct me if I misunderstand
>>>>> >>> your design.
>>>>> >>>
>>>>> >>> 1. I can’t find the definition (in code) of KafkaStream. As a part of
>>>>> >>> the public interface KafkaMetadataService it has to be public too. If
>>>>> >>> I understand correctly it locates a topic on a specific cluster.
>>>>> >>>
>>>>> >>> 2. I think there should be a default implementation / example for
>>>>> >>> KafkaMetadataService for out-of-box usage, for example a wrapper of
>>>>> >>> multiple Kafka AdminClients that watch the clusters periodically.
>>>>> >>>
>>>>> >>> 3. It looks like the source has the ability to handle Kafka cluster
>>>>> >>> failures, like switching connections to another cluster without
>>>>> >>> restarting the Flink job. Is there any requirement for the two
>>>>> >>> clusters? For example they have to be identical in topic names, number
>>>>> >>> of partitions and offsets etc.
>>>>> >>>
>>>>> >>> 4. Regarding topic and cluster removal, how to handle and recover from
>>>>> >>> checkpoint? Let’s say a topic is removed or migrated to another
>>>>> >>> cluster after a successful checkpoint. If the job tries to roll back
>>>>> >>> to the checkpoint which still contains the deleted topic or info of a
>>>>> >>> dead cluster, then how to keep the exactly-once semantic under this
>>>>> >>> case?
>>>>> >>>
>>>>> >>> 5. I don’t quite get the design of StoppableKafkaEnumContextProxy and
>>>>> >>> the GetMetadataUpdateEvent. Could you elaborate more in the FLIP?
>>>>> >>>
>>>>> >>> In a nutshell I think the idea of this FLIP is good, which extends the
>>>>> >>> usage of Kafka source. However as a design doc, some details need to
>>>>> >>> be enriched for other users and developers to better understand how
>>>>> >>> this source works.
>>>>> >>>
>>>>> >>> Best,
>>>>> >>> Qingsheng
>>>>> >>>
>>>>> >>>> On Jul 21, 2022, at 01:35, Mason Chen <mas.chen6...@gmail.com>
>>>>> wrote:
>>>>> >>>>
>>>>> >>>> Hi all,
>>>>> >>>>
>>>>> >>>> We would like to start a discussion thread on FLIP-246: Multi Cluster
>>>>> >>>> Kafka Source [1] where we propose to provide a source connector for
>>>>> >>>> dynamically reading from multiple Kafka clusters, which will not
>>>>> >>>> require a Flink job restart. This can greatly improve the Kafka
>>>>> >>>> migration experience for clusters and topics, and it solves some
>>>>> >>>> existing problems with the current KafkaSource. There was some
>>>>> >>>> interest from users [2] from a meetup and the mailing list. Looking
>>>>> >>>> forward to comments and feedback, thanks!
>>>>> >>>>
>>>>> >>>> [1]
>>>>> >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
>>>>> >>>> [2] https://lists.apache.org/thread/zmpnzx6jjsqc0oldvdm5y2n674xzc3jc
>>>>> >>>>
>>>>> >>>> Best,
>>>>> >>>> Mason
>>>>> >>>
>>>>> >>>
>>>>> >
>>>>>
>>>>
