Hi Mickael,

Thank you for your comments!

> IIUC it seems this targets active/active environments, so bidirectional
> mirroring, A -> B and B -> A running at the same time.

Yes, that is correct.

> clients expected to switch between both tend to mostly use local topics.

I don't really understand this point: why replicate between clusters if the replicated data is ignored, even on failover? Also, I've seen the exact opposite: depending on their message ordering guarantees, consumers do use both local and remote topics (either one after the other, or both at the same time).

> If cluster A goes offline, clients connect to cluster B and produce and
> consume again from the local topic T.

Going back to the previous point, it seems to me that a step is missing here - what happens with the data replicated from A to B? Consumers might not be fully caught up with those messages, so they should consume from the remote topic.

> Then once your consumers have reached the end of the local topic T on cluster
> B, you move them back to cluster A where they restart consuming from the
> local topic T which should have the correct last committed data.

This is the part where I've seen different asks/requirements from users. Assuming that the consumers do read from remote topics, it can happen that some messages were consumed from the remote topic in B instead of from the local topic in A. On failback, they will be processed again, which can be costly depending on the client application.

> Again what I've seen is that clients that use remote topics tend to stick
> to a single cluster. For example they are on cluster A and can consume both
> topics T and B.T.

What happens to these clients when cluster A goes down?

> Have you tried implementing it?

I have a draft PR: https://github.com/apache/kafka/pull/17593
I reused the existing code, and only needed to swap the upstream/downstream offsets in the offset sync store to be able to perform the reverse lookup.
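
To illustrate the idea of swapping the two sides of an offset sync, here is a minimal sketch. It is not the code from the PR: `SyncPoint` and `ReverseLookupSketch` are made-up names, the syncs are kept in a plain list, and the "add 1 when the offset is past the sync point" rule is a simplifying assumption about how a conservative translation could work.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical stand-in for one offset sync record (not the MirrorMaker class).
final class SyncPoint {
    final long upstreamOffset;
    final long downstreamOffset;

    SyncPoint(long upstreamOffset, long downstreamOffset) {
        this.upstreamOffset = upstreamOffset;
        this.downstreamOffset = downstreamOffset;
    }

    // Swapping the two sides lets the same lookup run in the opposite direction.
    SyncPoint swapped() {
        return new SyncPoint(downstreamOffset, upstreamOffset);
    }
}

final class ReverseLookupSketch {
    // Translates an offset using the latest sync at or before it.
    // Assumption: an offset past the sync point maps to "sync + 1", so the
    // result never points beyond data known to be replicated.
    static OptionalLong translate(List<SyncPoint> syncs, long offset) {
        SyncPoint best = null;
        for (SyncPoint s : syncs) {
            if (s.upstreamOffset <= offset
                    && (best == null || s.upstreamOffset > best.upstreamOffset)) {
                best = s;
            }
        }
        if (best == null) {
            return OptionalLong.empty(); // no sync old enough to translate from
        }
        long step = offset > best.upstreamOffset ? 1L : 0L;
        return OptionalLong.of(best.downstreamOffset + step);
    }

    // Reverse lookup: swap every sync, then reuse the forward logic unchanged.
    static OptionalLong reverseTranslate(List<SyncPoint> syncs, long downstreamOffset) {
        List<SyncPoint> swapped = new ArrayList<>();
        for (SyncPoint s : syncs) {
            swapped.add(s.swapped());
        }
        return translate(swapped, downstreamOffset);
    }

    public static void main(String[] args) {
        List<SyncPoint> syncs = Arrays.asList(new SyncPoint(100L, 40L), new SyncPoint(200L, 90L));
        System.out.println(translate(syncs, 150L));
        System.out.println(reverseTranslate(syncs, 95L));
    }
}
```

The point of the sketch is only that the reverse direction needs no second algorithm: once the pairs are swapped, the forward lookup applies as-is.
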
There is also a small corresponding section that maps the topic names back to their upstream names, and it also uses the topic filter (more on that below).

> I think filtering by group would be more natural.

We could introduce a group filter to limit which groups can get reverse checkpointed. My thinking was that if the feature is enabled (reverse.checkpointing.enabled=true), then all groups are eligible to be reverse checkpointed. If users need finer control over the reverse checkpointing, I'll gladly add it, but then it will be a more complex change, as the MirrorCheckpointConnector will need to distribute the "reverse checkpointable groups" among the tasks as a separate config.

> why did you choose topics for the filter

The topic filter has a different goal: it is specifically added to support replication flows where the policy is incapable of deciding whether a topic is a replica topic (i.e. IdentityReplicationPolicy). For most use-cases, with the DefaultReplicationPolicy, the topic name can be used to easily decide if the topic was replicated from the target cluster of the flow, and is thus eligible for reverse mapping. In case the topic names do not help, users should be able to configure which topics need to be considered for reverse checkpointing.
I've seen some use-cases where users basically implemented their own name prefixing policy without DefaultReplicationPolicy, by using IdentityReplicationPolicy + manual topic name prefixes + topic filters. The topic filter allows the use of the reverse checkpointing feature in such use-cases.

Thank you,
Daniel

On Fri, Mar 28, 2025 at 15:52 Mickael Maison <mickael.mai...@gmail.com> wrote:

> Hi Daniel,
>
> I think I roughly understand what you're trying to do, but I'm still a bit confused.
> IIUC it seems this targets active/active environments, so bidirectional mirroring, A -> B and B -> A running at the same time.
>
> In my experience with these environments, clients expected to switch between both tend to mostly use local topics. So while cluster A is up, clients produce and consume from the local topic T. If cluster A goes offline, clients connect to cluster B and produce and consume again from the local topic T. When cluster A is back and you want to fail back to it, you first switch your producers, so they start producing to the local topic T on cluster A. Then once your consumers have reached the end of the local topic T on cluster B, you move them back to cluster A where they restart consuming from the local topic T which should have the correct last committed data.
>
> Again what I've seen is that clients that use remote topics tend to stick to a single cluster. For example they are on cluster A and can consume both topics T and B.T.
>
> The offset translation logic is already pretty complicated and not without issues. I'm worried adding reverse checkpointing in the mix will make this even more complicated. Have you tried implementing it?
> The last section in Proposed Changes looks like it also won't be trivial for users as there are quite a few conditions that need to be satisfied for this to work.
> Another question I have is why did you choose topics for the filter. Since it's a MirrorCheckpoint feature, I think filtering by group would be more natural.
>
> Thanks,
> Mickael
>
> On Tue, Nov 26, 2024 at 9:12 AM Viktor Somogyi-Vass <viktor.somo...@cloudera.com.invalid> wrote:
> >
> > Hi Daniel,
> >
> > SVV3. I think we can leave it as it is for now. I can't really figure out the perfect solution for this either. Introducing yet another interface doesn't seem like what we should do here and since the filter implementations can be reused, it has some benefits.
> > I have no further questions. If we don't see any more responses in the next few days, I think you can start a vote.
> >
> > Best,
> > Viktor
> >
> > On Mon, Nov 25, 2024 at 11:26 AM Dániel Urbán <urb.dani...@gmail.com> wrote:
> >
> > > Hi Viktor,
> > >
> > > SVV3. I think that ReplicationPolicy is all about naming topics, and not about what should be included in the replication flow - the *Filter plugins have the responsibility to decide about including/excluding resources. Additionally, pushing this responsibility to the policy would lead to duplication, as the generic logic outlined in the KIP would instead be implemented by each ReplicationPolicy implementation.
> > > I think that reusing the TopicFilter has a lot of benefits (e.g. existing TopicFilter implementations can be easily reused in this feature), but if we worry about the clarity around the interface, we can introduce a dedicated interface. So instead of extending the existing TopicFilter, we can add a new interface called ReverseCheckpointedTopicFilter, and the DefaultTopicFilter can implement the new interface, too. The only drawback is that existing TopicFilter implementations will need an update to be usable in the reverse checkpointing context. Wdyt?
> > >
> > > Thanks,
> > > Daniel
> > >
> > > On Fri, Nov 22, 2024 at 15:23 Viktor Somogyi-Vass <viktor.somo...@cloudera.com.invalid> wrote:
> > >
> > > > Hey Daniel,
> > > >
> > > > SVV3. Thanks for the explanation. I don't think there is any preference in terms of null values vs defaults. Regarding the TopicFilter, I still have some concerns as "shouldReplicateTopic" could be a little bit misleading in terms of wording. Just modifying my SVV4 point, have you thought of just extending the ReplicationPolicy with a shouldReverseCheckpoint method? This would encapsulate the default behavior with DefaultReplicationPolicy.
> > > > Also, I think it would be nice to provide an implementation for Identity as well since we support that replication policy too. What do you think?
> > > >
> > > > Best,
> > > > Viktor
> > > >
> > > > On Wed, Nov 20, 2024 at 12:06 PM Dániel Urbán <urb.dani...@gmail.com> wrote:
> > > >
> > > > > Hi everyone,
> > > > > Bumping this - I think this gap affects the lives of many MM2 users, would be great to fill it. Any comments are welcome.
> > > > > TIA
> > > > > Daniel
> > > > >
> > > > > On Fri, Nov 15, 2024 at 13:05 Dániel Urbán <urb.dani...@gmail.com> wrote:
> > > > >
> > > > > > Hi Vidor,
> > > > > >
> > > > > > KV2 and KV3. I think it is a common setup, but it is usually described as separating the produce and the consume workload. Instead of "prod" and "backup" think of "produce cluster" and "consume cluster". Currently, even if a group does not exist in the target cluster, both checkpointing and group offset syncing still happen. As for the setup you mentioned, if the topic replication is unidirectional, there will be no checkpointing happening inside B->A. With the current proposal, reverse checkpointing could still occur if enabled. My issue with this example is that it sounds like the topic replication is unidirectional - if that is the case, checkpointing in general doesn't make sense in B->A, the groups filter will only apply to reverse checkpointing, and have no effect on checkpointing (as there are no topics replicated in B->A). I'm not strongly against adding a separate reverse groups filter, but I don't really see the use-case where it would make sense.
> > > > > >
> > > > > > Thanks,
> > > > > > Daniel
> > > > > >
> > > > > > On Fri, Nov 15, 2024 at 10:36 Vidor Kanalas <vidor.kana...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Daniel,
> > > > > > >
> > > > > > > KV1. Thanks for the rundown, good to know that the impact is not concerning. I agree that the optimization is not worth the effort.
> > > > > > >
> > > > > > > KV2 and KV3. Here’s the setup I was thinking about. Suppose we have a topic on a prod cluster A that is replicated to a backup cluster B. There is a CG that is working through the messages on the backup cluster, before it’s promoted to the prod cluster. In this case that CG does not exist on cluster A, and it won’t be checkpointed (obviously), but it’s not clear to me if it will be reverse checkpointed.
> > > > > > > I’m not certain if the above setup is an actual real-world use case, but if it is, we need to make sure that CGs can get reverse checkpointed even if they initially don’t exist on the cluster (it’s not a traditional failover + failback scenario). This is why I was thinking that a reverse checkpointing group filter could be useful, but I agree that the same can be achieved with the existing filter.
> > > > > > >
> > > > > > > Best,
> > > > > > > Vidor
> > > > > > >
> > > > > > > From: Dániel Urbán <urb.dani...@gmail.com>
> > > > > > > Date: Friday, 15 November 2024 at 09:35
> > > > > > > To: dev@kafka.apache.org <dev@kafka.apache.org>
> > > > > > > Subject: Re: [DISCUSS] KIP-1098: Reverse Checkpointing in MirrorMaker
> > > > > > >
> > > > > > > Hi Viktor,
> > > > > > >
> > > > > > > SVV3. In the current proposal, if the TopicFilter is not provided, it enables a different logic for reverse checkpointing - the task relies on the ReplicationPolicy to detect if a topic in the source cluster is a replica originating from the target cluster, e.g. in the A->B flow, the topic "B.T" in cluster A originates from B, and thus can be reverse checkpointed. This should work fine as long as the ReplicationPolicy can reliably tell the source cluster of a topic. For other ReplicationPolicy implementations (e.g. Identity), we cannot rely on the policy, and in that case, the TopicFilter takes over. If we have a default TopicFilter, then we need another flag to tell the task if it should rely on the policy or the filter for identifying the reverse checkpointable topics. That seems redundant, I think a null filter is cleaner, and also means fewer configs. Or do we have a generic preference in null vs default+enable flag in terms of configs?
> > > > > > >
> > > > > > > SVV4. I think the current TopicFilter is a minimal interface, which works well for choosing topics. Adding more methods would bloat the interface, and require new configs for the existing implementations. It would also be confusing in some cases to figure out if we need to configure the new properties - e.g. in MirrorSourceConnector, the new "reverse.checkpoint.topics" wouldn't make sense, but could still be configurable.
> > > > > > > In short, I think it introduces a few fuzzy situations, and I would rather just reuse the existing implementations with prefixed config overrides.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Daniel
> > > > > > >
> > > > > > > On Thu, Nov 14, 2024 at 16:56 Viktor Somogyi-Vass <viktor.somo...@cloudera.com.invalid> wrote:
> > > > > > >
> > > > > > > > Hi Daniel,
> > > > > > > >
> > > > > > > > SVV3. Kind of an implementation detail. So I think using TopicFilter is good, however I was wondering if we should provide a default implementation instead of null? We have to implement the pass-through behavior anyways, and it makes sense to me to do it in a filter.
> > > > > > > > SVV4. Also, an alternative to the previous one, instead of introducing another topic filter, we could extend the current TopicFilter with a shouldReverseCheckpointTopic() method with reverse.checkpoint.topics and reverse.checkpoints.topics.exclude configs in the DefaultTopicFilter where we configure it for pass-through. We wouldn't really have fewer configs with this option, but we'd use an existing filter that has similar functionality (filtering topics) and users could be more familiar with it.
> > > > > > > >
> > > > > > > > What do you think?
> > > > > > > >
> > > > > > > > Viktor
> > > > > > > >
> > > > > > > > On Thu, Nov 7, 2024 at 9:21 AM Dániel Urbán <urb.dani...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Gentle bump - any comments are welcome.
> > > > > > > > > This could fill an important gap in MM2, and would be nice to fix.
> > > > > > > > > TIA
> > > > > > > > > Daniel
> > > > > > > > >
> > > > > > > > > On Mon, Nov 4, 2024 at 11:00 Dániel Urbán <urb.dani...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Vidor,
> > > > > > > > > >
> > > > > > > > > > Thank you for your comments!
> > > > > > > > > >
> > > > > > > > > > 1. I think the optimization sounds nice, but would not work well with TopicFilter implementations which can be dynamically updated in the background (without touching the Connector configuration). If we actually dropped the offset sync records for a topic when the task is started, then the topic is added to the filter (without triggering a task restart), then we would not have the history of offset syncs for the "new" topic. I'm not saying that this makes the optimization you are suggesting impossible, but it's a larger chunk of work - we would need to start monitoring the topics in MirrorCheckpointConnector the same way we do in MirrorSourceConnector, and send config update requests to the Connect framework on topic changes. Just to clarify the "memory intensive" nature of the offset sync store, a full offset sync history of a single partition takes less than 2 KB (1 offset sync contains the topic partition and 2 longs, and the history has a max size of 64).
> > > > > > > > > > We need to multiply this by the number of replicated partitions and the number of checkpoint tasks to get the full memory usage increase when the feature is enabled. Even with thousands of partitions and hundreds of tasks, this will be below 1 GB, which is distributed across the Connect worker nodes. So I don't think that this would be an alarming increase in memory usage, and the optimization is not worth it with the extra complexity of the dynamic config updates.
> > > > > > > > > > 2. I don't really see the use-case for the reverse checkpointing group filter. If a group is already checkpointed in the opposite flow, it suggests that the intent is to be able to fail over. Why wouldn't the user want to also perform a failback for that same group?
> > > > > > > > > >
> > > > > > > > > > Not sure what you mean by replication being bidirectional, but topic replication not being bi-directional. With DefaultReplicationPolicy, we usually have the same topic on both clusters to be used by producers - the name is the same, but those are logically 2 different topics. With IdentityReplicationPolicy, we require users to avoid loops (i.e. replicating the same messages back and forth infinitely).
> > > > > > > > > >
> > > > > > > > > > As for being more flexible with failovers and failbacks, yes, I agree: since re-processing is minimized, it might enable more use-cases, or allow more frequent failovers and failbacks. I'd say that in most use-cases, users will still want to avoid doing this procedure frequently, since it requires a client restart/reconfiguration, which is not risk-free.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Daniel
> > > > > > > > > >
> > > > > > > > > > On Wed, Oct 30, 2024 at 22:21 Vidor Kanalas <vidor.kana...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Daniel,
> > > > > > > > > > >
> > > > > > > > > > > This would indeed greatly reduce the duplicate processing on failbacks.
> > > > > > > > > > >
> > > > > > > > > > > Few questions:
> > > > > > > > > > >
> > > > > > > > > > > 1. Since having a second offset-sync store can be memory intensive, would it make sense to filter the topics in it based on the reverseCheckpointingTopicFilter?
> > > > > > > > > > > 2. Would it make sense to add a reverseCheckpointingGroupFilter as well, so that one can control not just the topics for reverse checkpointing but also the groups?
> > > > > > > > > > >
> > > > > > > > > > > Do I understand this correctly, that the replication flow itself must be bidirectional, but the topic replication doesn’t?
> > > > > > > > > > > If so, this seems to unlock another use case. With this change, one can more confidently fail over the consumer group to the passive cluster and back (in the context of the topic itself), without much reprocessing; I see this useful when a cluster gets busy at times. Or even have a new consumer group consume messages from the passive cluster for a while, before “failing it over” to the active cluster. Is this something that you would recommend using the feature for?
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Vidor
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Oct 28, 2024 at 7:25 PM Dániel Urbán <urb.dani...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Viktor,
> > > > > > > > > > > >
> > > > > > > > > > > > SVV1. Not easy to provide a number, but yes, it does scale with the number of replicated topic partitions. Enabling this feature will add the overhead of an extra consumer, and allocates memory for an offset-sync index for each partition. The index is limited to 64 entries.
> > > > > > > > > > > > I could give an upper bound of the memory usage as a function of the number of replicated topic-partitions, but I'm not sure if it would be useful for users, or where exactly to document this. Wdyt?
> > > > > > > > > > > >
> > > > > > > > > > > > No worries, thanks for looking at the KIP!
> > > > > > > > > > > > Daniel
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Oct 28, 2024 at 17:07 Viktor Somogyi-Vass <viktor.somo...@cloudera.com.invalid> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Daniel,
> > > > > > > > > > > > >
> > > > > > > > > > > > > SVV1. Fair points about the performance impact. The next question is whether we can quantify it somehow, i.e. does it scale with the number of topics to reverse checkpoint, the offsets emitted, etc.?
> > > > > > > > > > > > >
> > > > > > > > > > > > > I'll do one more pass on the KIP in the following days but I wanted to reply to you with what I have so far to keep this going.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Best,
> > > > > > > > > > > > > Viktor
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Oct 25, 2024 at 5:32 PM Dániel Urbán <urb.dani...@gmail.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > One more update.
> > > > > > > > > > > > > > As I was working on the PR, I realized that the only way to support IdentityReplicationPolicy is to add an extra topic filter to the checkpointing. I updated the KIP accordingly.
> > > > > > > > > > > > > > I also opened a draft PR to demonstrate the proposed changes:
> > > > > > > > > > > > > > https://github.com/apache/kafka/pull/17593
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Daniel
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Thu, Oct 24, 2024 at 15:22 Dániel Urbán <urb.dani...@gmail.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > > > Just a bump/minor update here:
> > > > > > > > > > > > > > > As I've started working on a POC of the proposed solution, I've realised that the hard requirement related to the ReplicationPolicy implementation can be eliminated, and updated the KIP accordingly.
> > > > > > > > > > > > > > > Daniel
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Mon, Oct 21, 2024 at 16:18 Dániel Urbán <urb.dani...@gmail.com> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi Mickael,
> > > > > > > > > > > > > > > > Good point, I renamed the KIP and this thread:
> > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1098%3A+Reverse+Checkpointing+in+MirrorMaker
> > > > > > > > > > > > > > > > Thank you,
> > > > > > > > > > > > > > > > Daniel
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Mon, Oct 21, 2024 at 15:22 Mickael Maison <mickael.mai...@gmail.com> wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi Daniel,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I've not had time to take a close look at the KIP but my initial feedback would be to adjust the name to make it clear it's about MirrorMaker.
> > > > > > > > > > > > > > > > > The word "checkpoint" has several meanings in Kafka and from the current KIP name it's not clear if it's about KRaft, Streams or Connect.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > Mickael
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Mon, Oct 21, 2024 at 2:55 PM Dániel Urbán <urb.dani...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hi Viktor,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thank you for the comments!
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > SVV1: I think the feature has some performance implications. If reverse checkpointing is enabled, task startup will possibly be slower, since it will need to consume from a second offset-syncs topic; and it will also use more memory, to keep the second offset-sync history. Additionally, it is also possible to have an offset-syncs topic present without an actual, opposite flow being active - I think only users can tell if the reverse checkpointing should be active, and they should be the ones opting in for the higher resource usage.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > SVV2: I mention the DefaultReplicationPolicy to provide examples. I don't think it is required.
> > > > > > > > > > > > > > > > > > The actual requirement related to the ReplicationPolicy is that the policy should be able to correctly tell which topic was replicated from which cluster. Because of this, IdentityReplicationPolicy would not work, but DefaultReplicationPolicy, or any other ReplicationPolicy implementation with a correctly implemented "topicSource" method should work. I will make an explicit note of this in the KIP.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thank you,
> > > > > > > > > > > > > > > > > > Daniel
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Fri, Oct 18, 2024 at 17:28 Viktor Somogyi-Vass <viktor.somo...@cloudera.com.invalid> wrote:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Hey Dan,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I think this is a very useful idea. Two questions:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > SVV1: Do you think we need the feature flag at all?
> > > > > > > > > > > > > > > > > > > I know that not having this flag may technically render the KIP unnecessary (however it may still be useful to discuss this topic and create a consensus). As you wrote in the KIP, we may be able to look up the target and source topics and if we can do this, we can probably detect if the replication is one-way or prefixless (identity). So that means we don't need this flag to control when we want to use this. Then it is really just there to act as something that can turn the feature on and off if needed, but I'm not really sure if there is a great risk in just enabling this by default.
> If we really just turn back the B -> A checkpoints and save them in the
> A -> B flow, then maybe it's not too risky, and users would get this
> immediately by just upgrading.
>
> SVV2: You write that we need DefaultReplicationPolicy to use this feature,
> but most of the functionality is there at the interface level in
> ReplicationPolicy. Is there anything missing from there, and if so, what
> do you think about pulling it into the interface? If this improvement only
> works with the default replication policy, then it's somewhat limiting, as
> users may have their own policy for various reasons, but if we can make it
> work at the interface level, then we could provide this feature to
> everyone.
> Of course there can be replication policies like the identity one that by
> design disallow this feature, but for that, see my previous point.
>
> Best,
> Viktor
>
> On Fri, Oct 18, 2024 at 3:30 PM Dániel Urbán <urb.dani...@gmail.com> wrote:
>
> > Hi everyone,
> >
> > I'd like to start the discussion on KIP-1098: Reverse Checkpointing
> > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-1098%3A+Reverse+Checkpointing),
> > which aims to minimize message reprocessing for consumers in failbacks.
> >
> > TIA,
> > Daniel