5. At startup, GetMetadataUpdateEvent is also used to let the
MultiClusterKafkaSourceReader obtain the latest metadata from the
enumerator and filter out invalid splits. This is how the reader handles
"removed" splits/topics in the startup case.

Sorry for the late response, really appreciate you taking a look at the
FLIP!

Best,
Mason

On Thu, Aug 11, 2022 at 1:03 AM Mason Chen <mas.chen6...@gmail.com> wrote:

> Hi Qingsheng,
>
> Thanks for the feedback--these are great points to raise.
>
> 1. This is something I missed that is now added. More generally, it can
> locate multiple topics in multiple clusters (1 topic on 1 cluster is the
> simplest case).
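>
> As a rough illustration (a hypothetical shape, not the exact code in the
> FLIP), a KafkaStream could look like:
>
> import java.util.Set;
>
> // Illustrative sketch: a logical stream id mapped to the physical
> // (cluster, topic) pairs that currently back it.
> record ClusterTopic(String clusterId, String bootstrapServers, String topic) {}
>
> record KafkaStream(String streamId, Set<ClusterTopic> clusterTopics) {}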
>
> 2. The KafkaMetadataService doesn't interact with the KafkaAdminClients.
> This source merely composes the functionality of the KafkaSource, so
> KafkaAdminClient interaction is handled by the KafkaSubscriber.
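>
> To illustrate the separation of concerns, a hypothetical shape of the
> service (the FLIP has the authoritative definition), reusing the sketch
> types above, could be:
>
> // Illustrative only: the metadata service answers "which streams and
> // clusters exist right now", while topic/partition discovery against the
> // brokers stays with the KafkaSubscriber.
> interface KafkaMetadataService extends AutoCloseable {
>     // Current logical streams and the clusters/topics backing them.
>     Set<KafkaStream> getAllStreams();
>
>     // Whether a cluster is still part of the active metadata.
>     boolean isClusterActive(String kafkaClusterId);
> }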
>
> 3. There are no requirements for the two clusters--the KafkaStream
> abstraction should clarify this question. For example, you could move from
> topicName1 in cluster 1 with 11 partitions to topicName2 in cluster 2 with
> 22 partitions--only the KafkaStream id needs to remain the same. If there
> are no offsets in checkpoint, the offsets are handled by the offsets
> initializer from KafkaSource; currently the design only exposes one option
> for all Kafka clusters, although exposing more could be a valuable
> extension.
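>
> Using the hypothetical KafkaStream shape above, that migration would look
> like:
>
> // Only the stream id "orders" stays fixed; the topic name, cluster, and
> // partition count (11 -> 22) can all change across the swap.
> KafkaStream before = new KafkaStream("orders",
>         Set.of(new ClusterTopic("cluster1", "cluster1:9092", "topicName1")));
> KafkaStream after = new KafkaStream("orders",
>         Set.of(new ClusterTopic("cluster2", "cluster2:9092", "topicName2")));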
>
> 4. Regarding topic and cluster removal, metadata is checkpointed in state
> via the splits. Exactly-once semantics can be maintained under the
> assumption that required data from the dead cluster still lives in the
> live cluster. This can be achieved by not destroying the old Kafka cluster
> until consumers have fully drained it. During switchover, the consumer
> would consume from both the old and new clusters, and finally the metadata
> can be changed to point only to the new cluster once consumers are
> drained. With the regular KafkaSource, if a topic is deleted or a cluster
> is destroyed, exactly-once semantics are not preserved, since the
> semantics are tightly coupled with the storage. This design composes and
> delegates responsibilities to the KafkaSource components, so it is limited
> to whatever KafkaSource can do for exactly-once semantics.
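>
> Sketching the safe switchover with the same hypothetical types:
>
> // Phase 1: metadata points at both clusters; readers consume old + new.
> KafkaStream switchover = new KafkaStream("orders", Set.of(
>         new ClusterTopic("cluster1", "cluster1:9092", "topicName1"),
>         new ClusterTopic("cluster2", "cluster2:9092", "topicName2")));
>
> // Phase 2: wait until consumers have fully drained cluster1.
> // Phase 3: only then point the metadata solely at cluster2 and
> // decommission cluster1.
> KafkaStream drained = new KafkaStream("orders", Set.of(
>         new ClusterTopic("cluster2", "cluster2:9092", "topicName2")));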
>
> 5. Yes, I added more in the FLIP. GetMetadataUpdateEvent was added to make
> the order of steps in reader restart during split assignment
> deterministic. StoppableKafkaEnumContextProxy is used by the underlying
> KafkaSourceEnumerator to assign splits and do periodic topic partition
> discovery, so its scheduled thread pools need to be cleaned up properly
> and splits need to be wrapped with cluster information. These details are
> added to the FLIP; a rough sketch of the proxy idea follows.
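>
> Again, illustrative names and signatures, not the FLIP's exact interfaces:
>
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
>
> // Wraps the enumerator context per cluster: owns the scheduler used for
> // periodic partition discovery so it can be shut down when the cluster is
> // removed, and tags splits with the cluster id before they reach readers.
> class StoppableEnumContextProxySketch implements AutoCloseable {
>     private final String clusterId;
>     private final ScheduledExecutorService discoveryScheduler =
>             Executors.newSingleThreadScheduledExecutor();
>
>     StoppableEnumContextProxySketch(String clusterId) {
>         this.clusterId = clusterId;
>     }
>
>     // Periodic partition discovery runs on a pool this proxy owns.
>     void schedulePartitionDiscovery(Runnable discovery, long periodMs) {
>         discoveryScheduler.scheduleAtFixedRate(
>                 discovery, 0L, periodMs, TimeUnit.MILLISECONDS);
>     }
>
>     // A split id from the wrapped KafkaSourceEnumerator gets the cluster
>     // attached, e.g. "clusterId|topic-partition".
>     String wrapSplitId(String kafkaSplitId) {
>         return clusterId + "|" + kafkaSplitId;
>     }
>
>     @Override
>     public void close() {
>         discoveryScheduler.shutdownNow(); // proper thread pool cleanup
>     }
> }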
>
> Best,
> Mason
>
> On Fri, Jul 29, 2022 at 1:38 AM Qingsheng Ren <renqs...@gmail.com> wrote:
>
>> Hi Mason,
>>
>> Thank you for starting this FLIP!
>>
>> At first glance this FLIP looks like a collection of many new
>> interfaces, but I can’t stitch them together. It’ll be great to have some
>> brief descriptions about how the source works internally. Here are some
>> questions in my mind and please correct me if I misunderstand your design.
>>
>> 1. I can’t find the definition (in code) of KafkaStream. As a part of the
>> public interface KafkaMetadataService it has to be public too. If I
>> understand correctly it locates a topic on a specific cluster.
>>
>> 2. I think there should be a default implementation / example of
>> KafkaMetadataService for out-of-the-box usage, for example a wrapper
>> around multiple Kafka AdminClients that watch clusters periodically.
>>
>> 3. It looks like the source has the ability to handle Kafka cluster
>> failures, like switching connections to another cluster without restarting
>> the Flink job. Is there any requirement for the two clusters? For
>> example, do they have to be identical in topic names, number of
>> partitions, offsets, etc.?
>>
>> 4. Regarding topic and cluster removal, how are they handled when
>> recovering from a checkpoint? Let’s say a topic is removed or migrated to
>> another cluster after a successful checkpoint. If the job tries to roll
>> back to the checkpoint which still contains the deleted topic or info of
>> a dead cluster, then how do we keep the exactly-once semantics in this
>> case?
>>
>> 5. I don’t quite get the design of StoppableKafkaEnumContextProxy and the
>> GetMetadataUpdateEvent. Could you elaborate more in the FLIP?
>>
>> In a nutshell I think the idea of this FLIP is good, as it extends the
>> usage of the Kafka source. However, as a design doc, some details need to
>> be enriched for other users and developers to better understand how this
>> source works.
>>
>> Best,
>> Qingsheng
>>
>> > On Jul 21, 2022, at 01:35, Mason Chen <mas.chen6...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > We would like to start a discussion thread on FLIP-246: Multi Cluster
>> > Kafka Source [1] where we propose to provide a source connector for
>> > dynamically reading from multiple Kafka clusters, which will not
>> > require a Flink job restart. This can greatly improve the Kafka
>> > migration experience for clusters and topics, and it solves some
>> > existing problems with the current KafkaSource. There was some interest
>> > from users [2] from a meetup and the mailing list. Looking forward to
>> > comments and feedback, thanks!
>> >
>> > [1]
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
>> > [2] https://lists.apache.org/thread/zmpnzx6jjsqc0oldvdm5y2n674xzc3jc
>> >
>> > Best,
>> > Mason
>>
>>
