Hey Mason,

I just had a look at the FLIP. If I understand correctly, you are proposing a very sophisticated way to read from multiple Kafka clusters / topics.

I'm wondering whether we can share some of the code of the existing KafkaSource. I suppose you don't want to modify KafkaSource itself to avoid any breakage. But it would be good not to duplicate too much code, such that functionality that can be shared between the two implementations (e.g. the reader implementation).

Some questions that I had when browsing the current version: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320

1. Why does KafkaMetadataService have the `describeStream` method in addition to `getAllStreams`? This may be redundant. Is the idea to get the updated metadata for a particular StreamId?

2. KafkaMetaDataService#isClusterActive serves the purpose to check for activeness, couldn't this be included in the metadata of KafkaStream?

3. Shouldn't MetadataUpdateEvent contain a full list of KafkaStream instead of `Map<KafkaClusterIdentifier, Set<String>>`?

4. "In addition, restarting enumerators involve clearing outdated metrics" What metrics are we talking about here?

5. `StoppableKafkaEnumContextProxy` doesn't ring with me. How about `MultiKafkaSplitEnumeratorContext`?

6. What about the ConfigMap implementation? Are there any limitations on the type of configuration changes that we want to allow? For example, is it allowed to remove a cluster before it has been drained / deactivated? Is "not active" semantically identical to having the cluster / stream removed?

This is an exciting new addition!

Cheers,
Max

On 11.08.22 10:10, Mason Chen wrote:
5. At startup, GetMetadataUpdateEvent is also used to allow the
MultiClusterKafkaSourceReader to get the latest metadata from the
enumerator to filter out invalid splits This is how the reader can solve
"removing" splits/topics in the startup case.

Sorry for the late response, really appreciate you taking a look at the
FLIP!

Best,
Mason

On Thu, Aug 11, 2022 at 1:03 AM Mason Chen <mas.chen6...@gmail.com> wrote:

Hi Qingsheng,

Thanks for the feedback--these are great points to raise.

1. This is something I missed that is now added. More generally, it can
locate multiple topics in multiple clusters (1 topic on 1 cluster is the
simplest case).

2. The KafkaMetadataService doesn't interact with the KafkaAdminClients.
This source merely composes the functionality of the KafkaSource so
KafkaAdminClient interaction is handled by the KafkaSubscriber.

3. There are no requirements for the two clusters--KafkaStream should
clarify this question. For example, you could move from topicName1 in
cluster 1 with 11 partitions to topicName2 in cluster 2 with 22
partitions--only the KafkaStream id needs to remain the same. If there are
no offsets in checkpoint, the offsets are handled by the offsets
initializer from KafkaSource and currently the design only exposes 1 option
for all Kafka clusters, although this could be a valuable extension.

4. Regarding topic and cluster removal, metadata is checkpoint in state
via the splits. Exactly once can be maintained with the assumption that
required data from the dead cluster lives in the live cluster. This can be
solved by not destroying the old Kafka cluster until consumers are already
drained. In switchover, the consumer would consume from both old and new
clusters. And finally, the metadata can be changed to point only to the new
cluster when consumers are drained. With the regular KafkaSource, if Kafka
deletes topic or a cluster is destroyed, the exactly once semantics are not
preserved and the semantic is tightly coupled with storage. The design
composes and delegates the responsibilities to KafkaSource components so it
is limited to whatever KafkaSource can do for exactly once semantics.

5. Yes, I added more in the FLIP. GetMetadataUpdateEvent was added to make
the order of steps in reader restart during split assignment deterministic.
StoppableKafkaEnumContextProxy are used by the underlying
KafkaSourceEnumerator to assign splits and do topic periodic partition
discovery. So, these scheduled thread pools need to be cleaned up properly
and splits need to be wrapped with cluster information. These details are
added to the FLIP.

Best,
Mason

On Fri, Jul 29, 2022 at 1:38 AM Qingsheng Ren <renqs...@gmail.com> wrote:

Hi Mason,

Thank you for starting this FLIP!

 From my first glance this FLIP looks like a collection of many new
interfaces, but I can’t stitch them together. It’ll be great to have some
brief descriptions about how the source works internally. Here are some
questions in my mind and please correct me if I misunderstand your design.

1. I can’t find the definition (in code) of KafkaStream. As a part of the
public interface KafkaMetadataService it has to be public too. If I
understand correctly it locates a topic on a specific cluster.

2. I think there should be a default implementation / example for
KafkaMetadataService for out-of-box usage, for example a wrapper of
multiple Kafka AdminClients that watching clusters periodically.

3. It looks like the source has the ability to handle Kafka cluster
failures, like switching connections to another cluster without restarting
the Flink job. Is there any requirement for the two clusters? For example
they have to be identical in topic names, number of partitions and offsets
etc.

4. Regarding topic and cluster removal, how to handle and recover from
checkpoint? Let’s say a topic is removed or migrated to another cluster
after a successful checkpoint. If the job tries to roll back to the
checkpoint which still contains the deleted topic or info of a dead
cluster, then how to keep the exactly-once semantic under this case?

5. I don’t quite get the design of StoppableKafkaEnumContextProxy and the
GetMeradataUpdateEvent. Could you elaborate more in the FLIP?

In a nutshell I think the idea of this FLIP is good, which extends the
usage of Kafka source. However as a design doc, some details need to be
enriched for other users and developers to better understand how this
source works.

Best,
Qingsheng

On Jul 21, 2022, at 01:35, Mason Chen <mas.chen6...@gmail.com> wrote:

Hi all,

We would like to start a discussion thread on FLIP-246: Multi Cluster
Kafka
Source [1] where we propose to provide a source connector for
dynamically
reading from Kafka multiple clusters, which will not require Flink job
restart. This can greatly improve the Kafka migration experience for
clusters and topics, and it solves some existing problems with the
current
KafkaSource. There was some interest from users [2] from a meetup and
the
mailing list. Looking forward to comments and feedback, thanks!

[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
[2] https://lists.apache.org/thread/zmpnzx6jjsqc0oldvdm5y2n674xzc3jc

Best,
Mason



Reply via email to