I updated the KIP.

On Thu, Apr 29, 2021 at 4:43 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
> Sure, this would make it easier, we can make these functions return the
> original behaviour (<clusterAlias>.checkpoints.internal,
> "mm2-offset-syncs.<clusterAlias>.internal", heartbeat) without any
> customisation using `replication.policy.separator` and use the separator in
> the DefaultReplicationPolicy
>
> On Wed, Apr 28, 2021 at 1:31 AM Ryanne Dolan <ryannedo...@gmail.com>
> wrote:
>
>> Thanks Omnia, makes sense to me.
>>
>> > Customers who have their customised ReplicationPolicy will need to add
>> > the definition of their internal topics naming convention
>>
>> I wonder should we include default impls in the interface to avoid that
>> requirement?
>>
>> Ryanne
>>
>> On Sun, Apr 25, 2021, 2:20 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>> wrote:
>>
>>> Hi Mickael and Ryanne,
>>> I updated the KIP to add these methods to the ReplicationPolicy instead
>>> of an extra interface to simplify the changes. Please have a look and let
>>> me know your thoughts.
>>>
>>> Thanks
>>>
>>> On Tue, Mar 30, 2021 at 7:19 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>> wrote:
>>>
>>>> (sorry forgot to Reply to All)
>>>> Hi Ryanne,
>>>> It's a valid concern, I was trying to separate the concerns of internal
>>>> and replicated policy away from each other and to make the code readable as
>>>> extending ReplicationPolicy to manage both internal and replicated topic is
>>>> a bit odd. Am not against simplifying things out to make ReplicationPolicy
>>>> handling both at the end of the day if an MM2 user has a special naming
>>>> convention for topics it will be affecting both replicated and MM2 internal
>>>> topics.
>>>>
>>>> For simplifying things we can extend `ReplicationPolicy` to the
>>>> following instead of adding an extra class
>>>>
>>>>> public interface ReplicationPolicy {
>>>>>     String topicSource(String topic);
>>>>>     String upstreamTopic(String topic);
>>>>>
>>>>>     /** Returns heartbeats topic name. */
>>>>>     String heartbeatsTopic();
>>>>>
>>>>>     /** Returns the offset-syncs topic for given cluster alias. */
>>>>>     String offsetSyncTopic(String targetAlias);
>>>>>
>>>>>     /** Returns the checkpoint topic name for given cluster alias. */
>>>>>     String checkpointTopic(String sourceAlias);
>>>>>
>>>>>     default String originalTopic(String topic) {
>>>>>         String upstream = upstreamTopic(topic);
>>>>>         if (upstream == null) {
>>>>>             return topic;
>>>>>         } else {
>>>>>             return originalTopic(upstream);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     /** Internal topics are never replicated. */
>>>>>     boolean isInternalTopic(String topic);
>>>>>     // the implementation will be moved to `DefaultReplicationPolicy`
>>>>>     // to handle both kafka topics and MM2 internal topics.
>>>>> }
>>>>>
>>>>
>>>> On Fri, Mar 26, 2021 at 3:05 PM Ryanne Dolan <ryannedo...@gmail.com>
>>>> wrote:
>>>>
>>>>> Omnia, have we considered just adding methods to ReplicationPolicy?
>>>>> I'm reluctant to add a new class because, as Mickael points out, we'd need
>>>>> to carry it around in client code.
>>>>>
>>>>> Ryanne
>>>>>
>>>>> On Fri, Feb 19, 2021 at 8:31 AM Mickael Maison <mickael.mai...@gmail.com> wrote:
>>>>>
>>>>>> Hi Omnia,
>>>>>>
>>>>>> Thanks for the clarifications.
>>>>>>
>>>>>> - I'm still a bit uneasy with the overlap between these 2 methods as
>>>>>> currently `ReplicationPolicy.isInternalTopic` already handles MM2
>>>>>> internal topics. Should we make it only handle Kafka internal topics
>>>>>> and `isMM2InternalTopic()` only handle MM2 topics?
>>>>>>
>>>>>> - I'm not sure I understand what this method is used for.
>>>>>> There are no such methods for the other 2 topics (offset-sync and
>>>>>> heartbeat). Also what happens if there are other MM2 instances using
>>>>>> different naming schemes in the same cluster. Do all instances have to
>>>>>> know about the other naming schemes? What are the expected issues if
>>>>>> they don't?
>>>>>>
>>>>>> - RemoteClusterUtils is a client-side utility so it does not have
>>>>>> access to the MM2 configuration. Since this new API can affect the
>>>>>> name of the checkpoint topic, it will need to be used client-side too
>>>>>> so users can find the checkpoint topic name. I had to realized this
>>>>>> was the case.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Mon, Feb 15, 2021 at 9:33 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>>> >
>>>>>> > Hi Mickael, did you have some time to check my answer?
>>>>>> >
>>>>>> > On Thu, Jan 21, 2021 at 10:10 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>>> >>
>>>>>> >> Hi Mickael,
>>>>>> >> Thanks for taking another look into the KIP, regards your questions
>>>>>> >>
>>>>>> >> - I believe we need both "isMM2InternalTopic" and
>>>>>> >> `ReplicationPolicy.isInternalTopic` as `ReplicationPolicy.isInternalTopic`
>>>>>> >> does check if a topic is Kafka internal topic, while `isMM2InternalTopic`
>>>>>> >> is just focusing if a topic is MM2 internal topic or not (which is
>>>>>> >> heartbeat/checkpoint/offset-sync). The fact that the default for MM2
>>>>>> >> internal topics matches "ReplicationPolicy.isInternalTopic" will not be
>>>>>> >> an accurate assumption anymore once we implement this KIP.
>>>>>> >>
>>>>>> >> - "isCheckpointTopic" will detect all checkpoint topics for all
>>>>>> >> MM2 instances this is needed for "MirrorClient.checkpointTopics" which
>>>>>> >> originally check if the topic name ends with CHECKPOINTS_TOPIC_SUFFIX. So
>>>>>> >> this method just to keep the same functionality that originally exists in
>>>>>> >> MM2
>>>>>> >>
>>>>>> >> - "checkpointTopic" is used in two places 1.
>>>>>> >> At topic creation in "MirrorCheckpointConnector.createInternalTopics"
>>>>>> >> which use "sourceClusterAlias() + CHECKPOINTS_TOPIC_SUFFIX" and 2. At
>>>>>> >> "MirrorClient.remoteConsumerOffsets" which is called by
>>>>>> >> "RemoteClusterUtils.translateOffsets" the cluster alias here referred to
>>>>>> >> as "remoteCluster" where the topic name is "remoteClusterAlias +
>>>>>> >> CHECKPOINTS_TOPIC_SUFFIX" (which is an argument in RemoteClusterUtils,
>>>>>> >> not a config). This is why I called the variable cluster instead of source
>>>>>> >> and instead of using the config to figure out the cluster aliases from
>>>>>> >> config as we use checkpoints to keep `RemoteClusterUtils` compatible for
>>>>>> >> existing users. I see a benefit of just reading the config to find out the
>>>>>> >> cluster aliases but on the other side, I'm not sure why "RemoteClusterUtils"
>>>>>> >> doesn't get the name of the cluster from the properties instead of an
>>>>>> >> argument, so I decided to keep it just for compatibility.
>>>>>> >>
>>>>>> >> Hope these answer some of your concerns.
>>>>>> >> Best
>>>>>> >> Omnia
>>>>>> >>
>>>>>> >> On Thu, Jan 21, 2021 at 3:37 PM Mickael Maison <mickael.mai...@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>> Hi Omnia,
>>>>>> >>>
>>>>>> >>> Thanks for the updates. Sorry for the delay but I have a few more
>>>>>> >>> small questions about the API:
>>>>>> >>> - Do we really need "isMM2InternalTopic()"? There's already
>>>>>> >>> "ReplicationPolicy.isInternalTopic()". If so, we need to explain the
>>>>>> >>> difference between these 2 methods.
>>>>>> >>>
>>>>>> >>> - Is "isCheckpointTopic()" expected to detect all checkpoint topics
>>>>>> >>> (for all MM2 instances) or only the ones for this connector instance?
>>>>>> >>> If it's the latter, I wonder if we could do without the method.
>>>>>> >>> As this interface is only called by MM2, we could first call
>>>>>> >>> "checkpointTopic()" and check if that's equal to the topic we're
>>>>>> >>> checking. If it's the former, we don't really know topic names other
>>>>>> >>> MM2 instances may be using!
>>>>>> >>>
>>>>>> >>> - The 3 methods returning topic names have different APIs:
>>>>>> >>> "heartbeatsTopic()" takes no arguments, "offsetSyncTopic()" takes the
>>>>>> >>> target cluster alias and "checkpointTopic()" takes "clusterAlias"
>>>>>> >>> (which one is it? source or target?). As the interface extends
>>>>>> >>> Configurable, maybe we could get rid of all the arguments and use the
>>>>>> >>> config to find the cluster aliases. WDYT?
>>>>>> >>>
>>>>>> >>> These are minor concerns, just making sure I fully understand how the
>>>>>> >>> API is expected to be used. Once these are cleared, I'll be happy to
>>>>>> >>> vote for this KIP.
>>>>>> >>>
>>>>>> >>> Thanks
>>>>>> >>>
>>>>>> >>> On Fri, Jan 8, 2021 at 12:06 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>>> >>> >
>>>>>> >>> > Hi Mickael,
>>>>>> >>> > Did you get time to review the changes to the KIP? If you're okay
>>>>>> >>> > with it could you vote for the KIP here
>>>>>> >>> > https://www.mail-archive.com/dev@kafka.apache.org/msg113575.html?
>>>>>> >>> > Thanks
>>>>>> >>> >
>>>>>> >>> > On Thu, Dec 10, 2020 at 2:19 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>>> >>> >>
>>>>>> >>> >> Hi Mickael,
>>>>>> >>> >> 1) That's right the interface and default implementation will be
>>>>>> >>> >> in mirror-connect
>>>>>> >>> >> 2) Renaming the interface should be fine too especially if you're
>>>>>> >>> >> planning to move other functionality related to the creation there,
>>>>>> >>> >> I can edit this
>>>>>> >>> >>
>>>>>> >>> >> if you are okay with that please vote for the KIP here
>>>>>> >>> >> https://www.mail-archive.com/dev@kafka.apache.org/msg113575.html
>>>>>> >>> >>
>>>>>> >>> >>
>>>>>> >>> >> Thanks
>>>>>> >>> >> Omnia
>>>>>> >>> >> On Thu, Dec 10, 2020 at 12:58 PM Mickael Maison <mickael.mai...@gmail.com> wrote:
>>>>>> >>> >>>
>>>>>> >>> >>> Hi Omnia,
>>>>>> >>> >>>
>>>>>> >>> >>> Thank you for the reply, it makes sense.
>>>>>> >>> >>>
>>>>>> >>> >>> A couple more comments:
>>>>>> >>> >>>
>>>>>> >>> >>> 1) I'm assuming the new interface and default implementation will be
>>>>>> >>> >>> in the mirror-client project? as the names of some of these topics are
>>>>>> >>> >>> needed by RemoteClusterUtils on the client-side.
>>>>>> >>> >>>
>>>>>> >>> >>> 2) I'm about to open a KIP to specify where the offset-syncs topic is
>>>>>> >>> >>> created by MM2. In restricted environments, we'd prefer MM2 to only
>>>>>> >>> >>> have read access to the source cluster and have the offset-syncs on
>>>>>> >>> >>> the target cluster. I think allowing to specify the cluster where to
>>>>>> >>> >>> create that topic would be a natural extension of the interface you
>>>>>> >>> >>> propose here.
>>>>>> >>> >>>
>>>>>> >>> >>> So I wonder if your interface could be named InternalTopicsPolicy?
>>>>>> >>> >>> That's a bit more generic than InternalTopicNamingPolicy.
>>>>>> >>> >>> That would also match the configuration setting,
>>>>>> >>> >>> internal.topics.policy.class, you're proposing.
>>>>>> >>> >>>
>>>>>> >>> >>> Thanks
>>>>>> >>> >>>
>>>>>> >>> >>> On Thu, Dec 3, 2020 at 10:15 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>>> >>> >>> >
>>>>>> >>> >>> > Hi Mickael,
>>>>>> >>> >>> > Thanks for your feedback!
>>>>>> >>> >>> > Regards your question about having more configurations, I considered adding
>>>>>> >>> >>> > configuration per each topic however this meant adding more configurations
>>>>>> >>> >>> > for MM2 which already have so many, also the more complicated and advanced
>>>>>> >>> >>> > replication pattern you have between clusters the more configuration lines
>>>>>> >>> >>> > will be added to your MM2 config which isn't going to be pretty if you
>>>>>> >>> >>> > don't have the same topics names across your clusters.
>>>>>> >>> >>> >
>>>>>> >>> >>> > Also, it added more complexity to the implementation as MM2 need to
>>>>>> >>> >>> > 1- identify if a topic is checkpoints so we could list the checkpoints
>>>>>> >>> >>> > topics in MirrorMaker 2 utils as one cluster could have X numbers
>>>>>> >>> >>> > checkpoints topics if it's connected to X clusters, this is done right now
>>>>>> >>> >>> > by listing any topic with suffix `.checkpoints.internal`. This could be
>>>>>> >>> >>> > done by add `checkpoints.topic.suffix` config but this would make an
>>>>>> >>> >>> > assumption that checkpoints will always have a suffix also having a suffix
>>>>>> >>> >>> > means that we may need a separator as well to concatenate this suffix with
>>>>>> >>> >>> > a prefix to identify source cluster name.
>>>>>> >>> >>> > 2- identify if a topic is internal, so it shouldn't be replicated or track
>>>>>> >>> >>> > checkpoints for it, right now this is relying on disallowing topics with
>>>>>> >>> >>> > `.internal` suffix so they are not replicated and not tracked in checkpoints
>>>>>> >>> >>> > but with making topics configurable we need a way to define what is an
>>>>>> >>> >>> > internal topic. This could be done by using a list of all internal topics
>>>>>> >>> >>> > that have been entered to the configuration.
>>>>>> >>> >>> >
>>>>>> >>> >>> > So having an interface seemed easier and also gives more flexibility for
>>>>>> >>> >>> > users to define their own topic names, define what an internal topic means,
>>>>>> >>> >>> > how to find checkpoints topics and it will be one line config for each
>>>>>> >>> >>> > herder, also it is more consistent with MM2 code as MM2 config has
>>>>>> >>> >>> > TopicFilter, ReplicationPolicy, GroupFilter, etc as interfaces and they can
>>>>>> >>> >>> > be overridden by providing a custom implementation for them or have some
>>>>>> >>> >>> > config that changes their default implementations.
>>>>>> >>> >>> >
>>>>>> >>> >>> > Hope this answers your question. I also updated the KIP to add this to the
>>>>>> >>> >>> > rejected solutions.
>>>>>> >>> >>> >
>>>>>> >>> >>> >
>>>>>> >>> >>> > On Thu, Dec 3, 2020 at 3:19 PM Mickael Maison <mickael.mai...@gmail.com>
>>>>>> >>> >>> > wrote:
>>>>>> >>> >>> >
>>>>>> >>> >>> > > Hi Omnia,
>>>>>> >>> >>> > >
>>>>>> >>> >>> > > Thanks for the KIP. Indeed being able to configure MM2's internal
>>>>>> >>> >>> > > topic names would be a nice improvement.
>>>>>> >>> >>> > >
>>>>>> >>> >>> > > Looking at the KIP, I was surprised you propose an interface to allow
>>>>>> >>> >>> > > users to specify names. Have you considered making names changeable
>>>>>> >>> >>> > > via configurations?
>>>>>> >>> >>> > > If so, we should definitely mention it in the
>>>>>> >>> >>> > > rejected alternatives as it's the first method that comes to mind.
>>>>>> >>> >>> > >
>>>>>> >>> >>> > > I understand an interface gives a lot of flexibility but I'd expect
>>>>>> >>> >>> > > topic names to be relatively simple and known in advance in most
>>>>>> >>> >>> > > cases.
>>>>>> >>> >>> > >
>>>>>> >>> >>> > > I've not checked all use cases but something like below felt appropriate:
>>>>>> >>> >>> > > clusters = primary,backup
>>>>>> >>> >>> > > primary->backup.offsets-sync.topic=backup.mytopic-offsets-sync
>>>>>> >>> >>> > >
>>>>>> >>> >>> > > On Tue, Dec 1, 2020 at 3:36 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>>>>> >>> >>> > > wrote:
>>>>>> >>> >>> > > >
>>>>>> >>> >>> > > > Hey everyone,
>>>>>> >>> >>> > > > Please take a look at KIP-690:
>>>>>> >>> >>> > > >
>>>>>> >>> >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-690%3A+Add+additional+configuration+to+control+MirrorMaker+2+internal+topics+naming+convention
>>>>>> >>> >>> > > >
>>>>>> >>> >>> > > > Thanks for your feedback and support.
>>>>>> >>> >>> > > >
>>>>>> >>> >>> > > > Omnia
>>>>>> >>> >>> > > >
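
For anyone following the thread, here is a rough sketch of where the discussion seems to land: the internal-topic naming methods live on the replication policy itself, with default implementations that keep the current names, so existing custom policies do not have to change. This is only an illustration under assumptions, not the KIP's final code. `SketchReplicationPolicy`, `DotPrefixPolicy` and `CustomCheckpointPolicy` are hypothetical names made up for this example, the method names follow the draft quoted earlier in the thread, and the `.` separator is hard-coded for brevity.

```java
/** Hypothetical stand-in for MM2's ReplicationPolicy; not the real Kafka interface. */
interface SketchReplicationPolicy {
    String topicSource(String topic);

    String upstreamTopic(String topic);

    /** Default keeps the existing heartbeats topic name. */
    default String heartbeatsTopic() {
        return "heartbeats";
    }

    /** Default keeps the existing "mm2-offset-syncs.<clusterAlias>.internal" name. */
    default String offsetSyncTopic(String clusterAlias) {
        return "mm2-offset-syncs." + clusterAlias + ".internal";
    }

    /** Default keeps the existing "<clusterAlias>.checkpoints.internal" name. */
    default String checkpointTopic(String clusterAlias) {
        return clusterAlias + ".checkpoints.internal";
    }

    /** Strips source prefixes until no upstream topic remains. */
    default String originalTopic(String topic) {
        String upstream = upstreamTopic(topic);
        return upstream == null ? topic : originalTopic(upstream);
    }
}

/** Hypothetical policy that prefixes remote topics with "<sourceAlias>.". */
class DotPrefixPolicy implements SketchReplicationPolicy {
    @Override
    public String topicSource(String topic) {
        int i = topic.indexOf('.');
        return i < 0 ? null : topic.substring(0, i);
    }

    @Override
    public String upstreamTopic(String topic) {
        int i = topic.indexOf('.');
        return i < 0 ? null : topic.substring(i + 1);
    }
}

/** Hypothetical custom policy that only changes the checkpoint topic name. */
class CustomCheckpointPolicy extends DotPrefixPolicy {
    @Override
    public String checkpointTopic(String clusterAlias) {
        return "mm2-checkpoints-" + clusterAlias;
    }
}

class PolicySketchDemo {
    public static void main(String[] args) {
        SketchReplicationPolicy defaults = new DotPrefixPolicy();
        SketchReplicationPolicy custom = new CustomCheckpointPolicy();
        System.out.println(defaults.checkpointTopic("primary"));
        System.out.println(custom.checkpointTopic("primary"));
        System.out.println(custom.offsetSyncTopic("backup"));
    }
}
```

With defaults on the interface, a policy that only cares about replicated topic names (like `DotPrefixPolicy` here) inherits the current internal topic names unchanged, while a user with a special naming convention overrides just the methods they need. Client-side code such as RemoteClusterUtils would then ask the configured policy for the checkpoint topic name instead of appending a fixed suffix.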