5. At startup, GetMetadataUpdateEvent is also used to allow the
MultiClusterKafkaSourceReader to get the latest metadata from the
enumerator and filter out invalid splits. This is how the reader handles
"removed" splits/topics in the startup case; a rough sketch of that
filtering step follows below.
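To make this concrete, here is a minimal sketch of the reader-side
filtering at startup. MetadataUpdateEvent, KafkaStream, and
MultiClusterKafkaSourceSplit are the FLIP's types, but the accessors used
below (getKafkaStreams, getKafkaClusterId, getTopic, getTopicsByCluster)
are assumed names for illustration, not the final API:

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.flink.api.connector.source.SourceEvent;

// Sketch only: shows the intended startup flow, not the real implementation.
class ReaderStartupSketch {

    // Splits restored from checkpoint state, possibly pointing at topics or
    // clusters that no longer exist in the latest metadata.
    private List<MultiClusterKafkaSourceSplit> restoredSplits;

    // Standard SourceReader hook; invoked when the enumerator answers the
    // reader's GetMetadataUpdateEvent request.
    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof MetadataUpdateEvent) {
            Set<KafkaStream> activeStreams =
                    ((MetadataUpdateEvent) sourceEvent).getKafkaStreams();
            // Keep only splits whose (cluster, topic) still exists in the
            // latest metadata; splits of removed topics/clusters are dropped
            // before reading starts.
            restoredSplits =
                    restoredSplits.stream()
                            .filter(split -> isActive(split, activeStreams))
                            .collect(Collectors.toList());
        }
    }

    private boolean isActive(
            MultiClusterKafkaSourceSplit split, Set<KafkaStream> streams) {
        return streams.stream()
                .anyMatch(
                        s ->
                                s.getTopicsByCluster()
                                        .getOrDefault(split.getKafkaClusterId(), Set.of())
                                        .contains(split.getTopic()));
    }
}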
Sorry for the late response, really appreciate you taking a look at the
FLIP!

Best,
Mason

On Thu, Aug 11, 2022 at 1:03 AM Mason Chen <mas.chen6...@gmail.com> wrote:

> Hi Qingsheng,
>
> Thanks for the feedback--these are great points to raise.
>
> 1. This is something I missed that is now added. More generally, it can
> locate multiple topics in multiple clusters (1 topic on 1 cluster is the
> simplest case).
>
> 2. The KafkaMetadataService doesn't interact with the KafkaAdminClients.
> This source merely composes the functionality of the KafkaSource, so
> KafkaAdminClient interaction is handled by the KafkaSubscriber.
>
> 3. There are no requirements for the two clusters--KafkaStream should
> clarify this question. For example, you could move from topicName1 in
> cluster 1 with 11 partitions to topicName2 in cluster 2 with 22
> partitions--only the KafkaStream id needs to remain the same. If there
> are no offsets in checkpoint, the offsets are handled by the offsets
> initializer from KafkaSource, and currently the design only exposes 1
> option for all Kafka clusters, although this could be a valuable
> extension.
>
> 4. Regarding topic and cluster removal, metadata is checkpointed in
> state via the splits. Exactly-once semantics can be maintained under the
> assumption that the required data from the dead cluster also lives in
> the live cluster. This can be solved by not destroying the old Kafka
> cluster until consumers are already drained. During switchover, the
> consumer would consume from both old and new clusters, and finally the
> metadata can be changed to point only to the new cluster once consumers
> are drained. With the regular KafkaSource, if Kafka deletes a topic or a
> cluster is destroyed, exactly-once semantics are not preserved
> either--the semantics are tightly coupled with storage. This design
> composes and delegates the responsibilities to KafkaSource components,
> so it is limited to whatever KafkaSource can do for exactly-once
> semantics.
>
> 5. Yes, I added more in the FLIP. GetMetadataUpdateEvent was added to
> make the order of steps deterministic when a reader restarts during
> split assignment. StoppableKafkaEnumContextProxy is used by the
> underlying KafkaSourceEnumerator to assign splits and do periodic topic
> partition discovery, so these scheduled thread pools need to be cleaned
> up properly and splits need to be wrapped with cluster information.
> These details are added to the FLIP.
>
> Best,
> Mason
>
> On Fri, Jul 29, 2022 at 1:38 AM Qingsheng Ren <renqs...@gmail.com> wrote:
>
>> Hi Mason,
>>
>> Thank you for starting this FLIP!
>>
>> From my first glance this FLIP looks like a collection of many new
>> interfaces, but I can’t stitch them together. It’ll be great to have
>> some brief descriptions about how the source works internally. Here are
>> some questions in my mind, and please correct me if I misunderstand
>> your design.
>>
>> 1. I can’t find the definition (in code) of KafkaStream. As a part of
>> the public interface KafkaMetadataService it has to be public too. If I
>> understand correctly it locates a topic on a specific cluster.
>>
>> 2. I think there should be a default implementation / example of
>> KafkaMetadataService for out-of-box usage, for example a wrapper of
>> multiple Kafka AdminClients that watches clusters periodically.
>>
>> 3. It looks like the source has the ability to handle Kafka cluster
>> failures, like switching connections to another cluster without
>> restarting the Flink job. Is there any requirement for the two
>> clusters? For example, do they have to be identical in topic names,
>> number of partitions, offsets, etc.?
>>
>> 4. Regarding topic and cluster removal, how to handle and recover from
>> checkpoint? Let’s say a topic is removed or migrated to another cluster
>> after a successful checkpoint. If the job tries to roll back to the
>> checkpoint, which still contains the deleted topic or the info of a
>> dead cluster, then how to keep the exactly-once semantics in this case?
>>
>> 5. I don’t quite get the design of StoppableKafkaEnumContextProxy and
>> the GetMetadataUpdateEvent. Could you elaborate more in the FLIP?
>>
>> In a nutshell I think the idea of this FLIP is good, as it extends the
>> usage of the Kafka source. However, as a design doc, some details need
>> to be enriched for other users and developers to better understand how
>> this source works.
>>
>> Best,
>> Qingsheng
>>
>> > On Jul 21, 2022, at 01:35, Mason Chen <mas.chen6...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > We would like to start a discussion thread on FLIP-246: Multi Cluster
>> > Kafka Source [1], where we propose to provide a source connector for
>> > dynamically reading from multiple Kafka clusters, which will not
>> > require a Flink job restart. This can greatly improve the Kafka
>> > migration experience for clusters and topics, and it solves some
>> > existing problems with the current KafkaSource. There was some
>> > interest from users [2] from a meetup and the mailing list. Looking
>> > forward to comments and feedback, thanks!
>> >
>> > [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
>> > [2] https://lists.apache.org/thread/zmpnzx6jjsqc0oldvdm5y2n674xzc3jc
>> >
>> > Best,
>> > Mason
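P.S. Since KafkaStream and KafkaMetadataService came up in several of the
questions above, here is a minimal sketch of the shape I have in mind.
This is a simplified illustration under the assumption that a stream maps
cluster ids to topic sets (the FLIP can carry richer per-cluster metadata);
names and signatures are not final:

import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.Set;

/** Sketch: groups physical clusters/topics under a stable, logical stream id. */
public final class KafkaStream implements Serializable {
    private final String streamId;
    // clusterId -> topics backing this stream on that cluster
    private final Map<String, Set<String>> topicsByCluster;

    public KafkaStream(String streamId, Map<String, Set<String>> topicsByCluster) {
        this.streamId = streamId;
        this.topicsByCluster = topicsByCluster;
    }

    public String getStreamId() {
        return streamId;
    }

    public Map<String, Set<String>> getTopicsByCluster() {
        return topicsByCluster;
    }
}

/** Sketch: the source polls this service to discover metadata changes. */
public interface KafkaMetadataService extends AutoCloseable, Serializable {

    /** All streams the service knows about, e.g. for subscribing by pattern. */
    Set<KafkaStream> getAllStreams();

    /** Current metadata for specific stream ids, used for periodic refresh. */
    Map<String, KafkaStream> describeStreams(Collection<String> streamIds);

    /** Whether a cluster is still active, so stale splits can be filtered out. */
    boolean isClusterActive(String kafkaClusterId);
}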