Hey all,

I've experimented with a solution and opened a PR:
https://github.com/apache/kafka/pull/13429
I'd appreciate any comments you might have, and please ask questions about
the core algorithm. I've done my best to explain the index in code comments,
and the included log message shows how an arbitrary state is constructed, but
I feel it's still very opaque and I want to try to explain it better.
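
For anyone who wants a rough mental model before diving into the PR, here is a
toy sketch of the general idea (my simplification for this email, not the code
in the PR): keep a small, bounded set of offset syncs per partition, thinned so
that the retained syncs are roughly exponentially spaced behind the latest one,
and translate by rounding down to the nearest retained sync.

    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.OptionalLong;
    import java.util.TreeMap;

    // Toy sketch only -- class and method names are illustrative, not the PR's code.
    class BoundedSyncIndex {
        private static final int MAX_SYNCS = 64;
        // upstream offset of a sync -> downstream offset of that sync
        private final NavigableMap<Long, Long> syncs = new TreeMap<>();

        // Record a new offset sync, then thin older ones so memory stays bounded.
        void add(long upstreamOffset, long downstreamOffset) {
            syncs.put(upstreamOffset, downstreamOffset);
            long requiredGap = 1;
            long previousKept = upstreamOffset;
            for (Long upstream : syncs.headMap(upstreamOffset, false)
                                      .descendingKeySet().toArray(new Long[0])) {
                if (previousKept - upstream < requiredGap) {
                    syncs.remove(upstream);   // too close to a newer retained sync
                } else {
                    previousKept = upstream;
                    requiredGap *= 2;         // retained syncs spread out exponentially
                }
            }
            while (syncs.size() > MAX_SYNCS) {
                syncs.remove(syncs.firstKey());
            }
        }

        // Translate conservatively: round DOWN to the nearest retained sync, so the
        // checkpoint is never ahead of what the consumer actually read upstream.
        OptionalLong translate(long upstreamCommitted) {
            Map.Entry<Long, Long> floor = syncs.floorEntry(upstreamCommitted);
            return floor == null ? OptionalLong.empty()
                                 : OptionalLong.of(floor.getValue());
        }
    }

The real index makes different choices about exactly which syncs to keep, but
the tradeoff I'm aiming for has the same shape: a small, bounded number of
stored syncs per partition, in exchange for coarser (though still safe)
translation the further a consumer group is behind the latest sync.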

Thanks!

On Thu, Mar 16, 2023 at 11:57 AM Greg Harris <greg.har...@aiven.io> wrote:

> Thanks all for the discussion!
>
> Mickael,
>
> I agree. I think that while someone may appreciate the current high
> availability, everyone expects at-least-once delivery from this feature.
> Not meeting the expectation for at-least-once delivery may be more damaging
> than excessive re-delivery in a failure scenario, especially where systems
> are designed to cope with re-delivery.
>
> Chris,
>
> Thanks for pointing out the option to change the offset syncs topic
> structure as an alternative implementation. I think that may be necessary
> to support translating extremely old offsets that may be compacted away
> with the current topic structure.
>
> I think that if we relax KAFKA-14666 to only translate offsets since the
> latest restart (which is the current semantics), we can implement a bounded
> memory solution without re-reading the checkpoints topic. I will pursue
> this as a tactical fix for KAFKA-14666 that hopefully can be reviewed and
> merged prior to 3.5.0.
> The holistic solution to KAFKA-14666 (if/when designed) could then address
> translating offsets prior to the latest restart, and we can spend more time
> considering this problem in a KIP.
>
> I understand that there are significant risks in reverting the
> already-merged correctness fix, and I think we can leave it merged for the
> time being. I'll do everything I can to get this feature back to an
> unambiguously releasable state, but if we're not able to merge the
> KAFKA-14666 tactical fix before 3.5.0, we may need to have the discussion
> about reverting, possibly with some MM2 operator stakeholders.
>
> Ryanne,
>
> Thanks for providing context on the original intent of the offset syncs
> frequency and the behavior of the translation feature. Thanks also for
> pointing out that the existing translation only works since the latest
> restart, I think that is very helpful in choosing a tactical fix for
> KAFKA-14666.
>
> I think that your response didn't address the tradeoff we're currently
> faced with, however. Offset translation is most available when offset syncs
> are rare, and syncs are indeed rare in some (most?) MM2 use-cases today, but
> that isn't always the case, and that availability comes at the cost of
> correctness.
> The correctness bug KAFKA-12468 is focused on the "dead-reckoning" offset
> math in the MirrorCheckpointTask. That logic, while allowing for a very
> infrequent offset sync to translate offsets very precisely and arbitrarily
> far ahead of the sync, can sometimes produce inaccurate offsets which can
> cause certain records to be delivered to neither the source nor the target
> consumer group, sacrificing at-least-once delivery guarantees. See the
> description of https://github.com/apache/kafka/pull/13178 for a concrete
> example of a non-delivery/data loss scenario.
> I believe that many Kafka users, especially ones which are using MM2 in a
> disaster-recovery scenario, expect at-least-once semantics. Because this
> conflicts with the infrequent offset sync design, we're now having to find
> ways to cope with more frequent offset syncs.
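>
> To make the difference concrete, here is roughly how the two approaches
> behave (a simplified sketch in Java, not the exact MirrorCheckpointTask
> code; the names are mine):
>
>     import java.util.OptionalLong;
>
>     class TranslationSketch {
>         // "Dead-reckoning": assume every upstream offset past the sync maps 1:1
>         // to a downstream offset. This overshoots when the target topic has fewer
>         // offsets (transaction markers, filtered or compacted records), so the
>         // downstream group can skip records it never consumed.
>         static long deadReckon(long upstreamSync, long downstreamSync, long upstreamCommitted) {
>             return downstreamSync + (upstreamCommitted - upstreamSync);
>         }
>
>         // Conservative translation: never advance past the sync itself. This may
>         // cause re-delivery after a fail-over, but preserves at-least-once delivery.
>         static OptionalLong conservative(long upstreamSync, long downstreamSync, long upstreamCommitted) {
>             return upstreamCommitted >= upstreamSync
>                     ? OptionalLong.of(downstreamSync)
>                     : OptionalLong.empty();  // behind the sync: cannot translate safely
>         }
>     }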
>
> > It should be possible to read the last offset sync at startup instead of
> starting over with zero knowledge. That would mitigate this problem for
> clusters that frequently restart.
>
> I'm going to assume that you're referring to the MirrorSourceTask, and
> this suggestion is to work around the "emit an offset sync if we haven't
> yet" condition triggering after each restart. I think that this is
> consistent with the original design of the offset translation feature where
> syncs are rare, but as I've mentioned above, we cannot rely on their rarity
> any longer.
>
> Thanks!
> Greg
>
>
> On Thu, Mar 16, 2023 at 7:24 AM Ryanne Dolan <ryannedo...@gmail.com>
> wrote:
>
>> "it becomes impossible to ensure that MM2 is
>> able to sync offsets from consumer groups that are behind the last-synced
>> offset emitted by MirrorSourceTask."
>>
>> That's sorta true. The idea is that offset syncs are exceptionally rare.
>> They basically occur when the tasks start or restart and that should be
>> it.
>> If it's more than that, something is wrong. The model won't really work in
>> that case.
>>
>> It's true that consumer groups can lag behind the offset syncs, but since
>> offset syncs are rare (and checkpoints are not) any normal consumer group
>> will eventually catch up. Essentially, there is a window of time when you
>> first start MM2 where a lagging consumer group won't be able to
>> checkpoint.
>> But once it commits past the last offset sync, it will work from there.
>>
>> For example, a consumer lagging an hour behind real-time would need to
>> wait
>> around an hour after MM2 starts before it can checkpoint.
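>>
>> In toy terms (my numbers and names, just to illustrate the rule):
>>
>>     import java.util.OptionalLong;
>>
>>     class CheckpointWindow {
>>         // A group is only checkpointed once its committed upstream offset reaches
>>         // the latest offset sync; behind that point, no checkpoint is emitted.
>>         static OptionalLong checkpoint(long latestSyncUpstream, long latestSyncDownstream,
>>                                        long committedUpstream) {
>>             if (committedUpstream < latestSyncUpstream) {
>>                 return OptionalLong.empty();
>>             }
>>             return OptionalLong.of(latestSyncDownstream);
>>         }
>>     }
>>
>>     // e.g. with the startup sync at upstream offset 1000, a group committed at 950
>>     // gets no checkpoint until it commits past 1000; a group at 1050 is
>>     // checkpointed right away.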
>>
>> At steady-state this should not be a problem, but there are of course edge
>> cases. For example, an MM2 cluster that is auto-scaled will keep
>> restarting
>> this process, re-introducing this checkpoint lag for lagging consumer
>> groups. This doesn't mean the checkpoints are lost, but they'll be stale
>> for a while.
>>
>> It should be possible to read the last offset sync at startup instead of
>> starting over with zero knowledge. That would mitigate this problem for
>> clusters that frequently restart.
>>
>> Ryanne
>>
>> On Thu, Mar 16, 2023, 8:41 AM Chris Egerton <chr...@aiven.io.invalid>
>> wrote:
>>
>> > Hi Greg,
>> >
>> > I agree with others that a KIP isn't necessarily required as long as we
>> > don't alter the public interface.
>> >
>> > Reading between the lines (please correct me if I'm wrong), it seems like
>> > part of the concern is that the design of the offset syncs topic (parts of
>> > which are counted as part of the public interface for MM2 in KIP-382 [1])
>> > is inherently limiting, especially with regards to KAFKA-14666. By using a
>> > compacted topic to store offset syncs, and deriving keys for those syncs'
>> > records from the topic partitions they come from, we can never assume that
>> > there is more than one offset sync available at any time for a given topic
>> > partition. And, given that, it becomes impossible to ensure that MM2 is
>> > able to sync offsets from consumer groups that are behind the last-synced
>> > offset emitted by MirrorSourceTask.
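>> >
>> > Schematically (my paraphrase of KIP-382, with illustrative field names
>> > rather than the real serialization classes), an offset syncs record looks
>> > something like this, which is why compaction can leave us with only the
>> > newest sync per partition:
>> >
>> >     // Illustrative shape of an offset syncs record -- not the real classes.
>> >     record OffsetSyncKey(String topic, int partition) { }          // compaction key
>> >     record OffsetSyncValue(long upstreamOffset, long downstreamOffset) { }
>> >     // Because the key is just the replicated topic-partition, log compaction is
>> >     // free to drop every sync for that partition except the most recent one.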
>> >
>> > I think that your idea to implement two different solutions--a tactical,
>> > backport-eligible fix, and a holistic, comprehensive fix that may require
>> > a KIP--is fine. Storing a bounded number of offset syncs in memory as we
>> > read them from the offset syncs topic in MirrorCheckpointTask is a decent
>> > option for the tactical approach, although the above-mentioned limitations
>> > mean that this won't cover all cases. IMO it's still worth a shot as a
>> > stop-the-bleeding measure.
>> >
>> > I don't know if it's really a viable alternative to revert some of the
>> > recent changes we made to the offset sync emission logic, since these were
>> > all bug fixes and the underlying issue (KAFKA-14666) is present either
>> > way. Plus, as Mickael has noted, the correctness issues with how we were
>> > performing offset syncs make that feature difficult to rely on. But it is
>> > still an option if you or others believe that there's a case to be made on
>> > that front.
>> >
>> > TL;DR:
>> >
>> > Q1. Probably, if we want to cover all edge cases involving combinations of
>> > slow/fast production into replicated topics, transactional/compacted/filtered
>> > topics, aggressive compaction of the offset syncs topic, etc.
>> > Q2. Yes, but given that the offsets we were syncing were in many cases
>> > incorrect anyways, I'm currently not in favor of a revert.
>> > Q3. Right now, probably. If we can apply a decent tactical fix, then
>> > definitely.
>> > Q4. Depends on the quality of the tactical fix, but given the correctness
>> > issues we've already noticed with consumer group offset syncing, I think it
>> > should generally be eligible for backport.
>> > Q5. Yes please!
>> > Q6. The options outlined in KAFKA-14666 seem pretty good, except ones that
>> > involve storing the unbounded contents of the offset syncs topic in memory.
>> > I'll think about it some more but nothing significantly better has come to
>> > mind yet.
>> >
>> > [1] -
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP382:MirrorMaker2.0-PublicInterfaces
>> >
>> > Cheers,
>> >
>> > Chris
>> >
>> > On Thu, Mar 16, 2023 at 5:38 AM Mickael Maison <
>> mickael.mai...@gmail.com>
>> > wrote:
>> >
>> > > Hi Greg,
>> > >
>> > > Thanks for looking into this issue!
>> > >
>> > > To me the current behavior is just buggy. I don't see how anyone could
>> > > rely on incorrect offset translation; at this point it's pretty much
>> > > like picking a random offset when consumers start on the target
>> > > cluster.
>> > >
>> > > Unless we have to introduce new configurations, metrics, or public
>> > > APIs, making offset translation correct and available should not
>> > > require a KIP. For a few releases we've been making quick fixes
>> > > without really getting it to work in all cases. As you suggest, it's
>> > > maybe time to think about the pros and cons of the current
>> > > implementation and properly investigate our options.
>> > >
>> > > Thanks,
>> > > Mickael
>> > >
>> > > On Wed, Mar 15, 2023 at 7:52 PM Greg Harris
>> > > <greg.har...@aiven.io.invalid> wrote:
>> > > >
>> > > > Luke,
>> > > >
>> > > > Thanks for leaving your thoughts.
>> > > >
>> > > > I agree that this does not directly change an API, and doesn't fall
>> > > > within the typical KIP guidelines. These proposals would change the
>> > > > behavior of translation within the bounds of the current API, and I
>> > > > wasn't sure whether this fell under the KIP process.
>> > > > I was also concerned that the potential solutions to KAFKA-14666 were
>> > > > complex enough that we needed a formal discussion and approval process.
>> > > > I'm happy to pursue this further in a non-KIP discussion thread until
>> > > > someone suggests escalating.
>> > > >
>> > > > Thanks!
>> > > > Greg
>> > > >
>> > > > On Wed, Mar 15, 2023 at 3:47 AM Luke Chen <show...@gmail.com>
>> wrote:
>> > > >
>> > > > > Hi Greg,
>> > > > >
>> > > > > Thanks for your summary.
>> > > > > I'm not very familiar with MM2, actually, but I'd like to give my
>> > > > > answers to some of the questions below:
>> > > > >
>> > > > > Q1. Would an improvement to allow translation from earlier in the
>> > > > > topic be reasonable to propose in a KIP?
>> > > > > --> I checked the 5 proposed solutions in KAFKA-14666, and I think
>> > > > > all of them are trying to fix a bug, not change any public API.
>> > > > > So, IMO, no KIP is needed.
>> > > > >
>> > > > > Q2. Is anyone relying on the current poor correctness and high
>> > > > > availability translation, such that making the availability worse is
>> > > > > a backwards-incompatible regression?
>> > > > > Q3. Should we prioritize correctness, even if it hurts availability?
>> > > > > --> This is obviously hard to choose. But I hope we can find a way to
>> > > > > have both.
>> > > > >
>> > > > > Q4. Should we address correctness and availability of this feature in
>> > > > > a patch or only minor releases?
>> > > > > --> If it's a bug fix, it can be included in either a patch or a
>> > > > > minor release.
>> > > > >
>> > > > > Q5. Is there some tactical improvement to availability we can make
>> > > > > which does not count as backwards-incompatible, allowing us to land
>> > > > > the correctness fix without a KIP?
>> > > > > --> Again, if it's a bug fix or an improvement without affecting the
>> > > > > public API, I don't think a KIP is needed.
>> > > > >
>> > > > >
>> > > > > Q6. Do you have any suggestions on how to improve availability of
>> > > > > offset translation?
>> > > > >
>> > > > >
>> > > > >
>> > > > > Thank you.
>> > > > > Luke
>> > > > >
>> > > > > On Wed, Mar 15, 2023 at 4:39 AM Greg Harris
>> > > <greg.har...@aiven.io.invalid>
>> > > > > wrote:
>> > > > >
>> > > > > > Hey all!
>> > > > > >
>> > > > > > I realized that the information above is a bit in-the-weeds, and I
>> > > > > > think a re-framing of the situation might be necessary.
>> > > > > >
>> > > > > > Since the release of MM2, offset translation has been limited to
>> > > > > > only performing translation ahead of the most recent offset sync.
>> > > > > > This limitation appears to have worked for existing use-cases where
>> > > > > > offset syncs are infrequent.
>> > > > > > For topics which emit offset syncs frequently, the window for offset
>> > > > > > translation becomes shorter, and may become unusable. In those
>> > > > > > unusable cases, offset translation may stop completely for an
>> > > > > > otherwise fully-functional steady-state MM2 instance.
>> > > > > > Recently, we have been interested in improving the correctness of
>> > > > > > offset translation to address data loss, and those fixes end up
>> > > > > > causing more offset syncs to be emitted, making the translation
>> > > > > > window smaller than before, and often unusable.
>> > > > > >
>> > > > > > Q1. Would an improvement to allow translation from earlier in the
>> > > > > > topic be reasonable to propose in a KIP?
>> > > > > > Q2. Is anyone relying on the current poor correctness and high
>> > > > > > availability translation, such that making the availability worse
>> > > > > > is a backwards-incompatible regression?
>> > > > > > Q3. Should we prioritize correctness, even if it hurts availability?
>> > > > > > Q4. Should we address correctness and availability of this feature
>> > > > > > in a patch or only minor releases?
>> > > > > > Q5. Is there some tactical improvement to availability we can make
>> > > > > > which does not count as backwards-incompatible, allowing us to land
>> > > > > > the correctness fix without a KIP?
>> > > > > > Q6. Do you have any suggestions on how to improve availability of
>> > > > > > offset translation?
>> > > > > >
>> > > > > > I'm interested in finding a tactical solution that we can
>> > > > > > backport, and a holistic solution for more future use-cases.
>> > > > > > I hope that the above is more clear.
>> > > > > >
>> > > > > > Thanks!
>> > > > > > Greg
>> > > > > >
>> > > > > > On Fri, Mar 10, 2023 at 12:16 PM Greg Harris <
>> greg.har...@aiven.io
>> > >
>> > > > > wrote:
>> > > > > >
>> > > > > > > Hi all,
>> > > > > > >
>> > > > > > > Recently, we've been experimenting with using MM2 to mirror
>> > > > > > > topics that were populated by transactional producers. We've
>> > > > > > > noticed that MM2 replicates records but not transaction markers,
>> > > > > > > causing certain offsets to appear in the source topic but not
>> > > > > > > destination topic. These behaviors can also be seen when using
>> > > > > > > Filter SMTs, or when replicating topics which have undergone
>> > > > > > > compaction, which cause the same concentration of offsets in the
>> > > > > > > target topic.
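>> > > > > > >
>> > > > > > > As a toy illustration of the divergence (made-up numbers, just
>> > > > > > > for intuition, not a measurement):
>> > > > > > >
>> > > > > > >     class OffsetDivergenceToy {
>> > > > > > >         public static void main(String[] args) {
>> > > > > > >             int transactions = 100;                      // toy workload
>> > > > > > >             long upstreamLogEnd   = 11L * transactions;  // 1100: 10 records + 1 commit marker each
>> > > > > > >             long downstreamLogEnd = 10L * transactions;  // 1000: markers are not replicated
>> > > > > > >             System.out.println((upstreamLogEnd - downstreamLogEnd)
>> > > > > > >                     + " upstream offsets have no downstream counterpart");
>> > > > > > >             // A group fully caught up upstream (1100) would be translated past
>> > > > > > >             // the downstream log end -- the "negative lag" described below.
>> > > > > > >         }
>> > > > > > >     }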
>> > > > > > >
>> > > > > > > This has the following negative effects with offset translation:
>> > > > > > > P1. When starting replication on an existing topic with existing
>> > > > > > > consumer groups, offsets are translated beyond the end of the
>> > > > > > > topic, leading to "negative lag" for the downstream consumer group
>> > > > > > > P2. When in a "negative lag" situation, and a consumer fail-over
>> > > > > > > from source to target is triggered, downstream consumption will
>> > > > > > > stall until the downstream offsets exceed the "negative lag"
>> > > > > > > offsets.
>> > > > > > > P3. When failing over from source to target, certain records may
>> > > > > > > have been ahead of the upstream consumer group and behind the
>> > > > > > > downstream consumer group, leading to records not being delivered
>> > > > > > > at least once.
>> > > > > > >
>> > > > > > > We merged a solution to the above by making a change to the
>> > > > > > > translation logic in
>> > > > > > > https://issues.apache.org/jira/browse/KAFKA-12468 , and settled on
>> > > > > > > a strategy to make offset translation more conservative,
>> > > > > > > effectively making it such that the MirrorCheckpointTask only
>> > > > > > > emits offsets at or immediately after the latest offset sync. This
>> > > > > > > has the effect that offsets are more correct than previously, but
>> > > > > > > that did not come without costs:
>> > > > > > >
>> > > > > > > P4. More offset syncs must be emitted to the offset syncs topic
>> > > > > > > to enforce the `offset.lag.max` config property, once per
>> > > > > > > `offset.lag.max` records (regression in the original PR, addressed
>> > > > > > > by https://issues.apache.org/jira/browse/KAFKA-14797)
>> > > > > > > P5. More recent offset syncs narrow the window in which
>> > > > > > > translation can take place, leading to some translated offsets
>> > > > > > > becoming excessively stale. This limitation is captured in
>> > > > > > > https://issues.apache.org/jira/browse/KAFKA-14666 .
>> > > > > > > P6. Even with the above fixes, offset translation won't be able to
>> > > > > > > translate ahead of the latest offset sync, and offsets may not
>> > > > > > > converge exactly to the end of the topic.
>> > > > > > >
>> > > > > > > Fixing KAFKA-14797 appears possible without a KIP, but it is
>> > > > > > > unclear whether KAFKA-14666 requires a KIP to resolve.
>> > > > > > >
>> > > > > > > To summarize:
>> > > > > > > * Released versions of Kafka have reasonable behavior for normal
>> > > > > > > topics, and correctness problems for compacted, filtered, and
>> > > > > > > transactional topics.
>> > > > > > > * KAFKA-12468 fixes correctness for compacted, filtered, and
>> > > > > > > transactional topics, and regresses availability for all topics
>> > > > > > > * KAFKA-14797 makes availability better for normal topics, but
>> > > > > > > still worse than release.
>> > > > > > > * KAFKA-14666 makes availability better for all topics, but still
>> > > > > > > worse than release.
>> > > > > > >
>> > > > > > > Questions:
>> > > > > > > Q1. Does KAFKA-14666 require a KIP to resolve?
>> > > > > > > Q2. Is the increased likelihood of KAFKA-14666 caused by
>> > > > > > > KAFKA-14797 a regression in behavior?
>> > > > > > > Q3. Is the KAFKA-12468 correctness fix worth the general
>> > > > > > > availability loss (P6) that is bounded by offset.lag.max?
>> > > > > > > Q4. Is some or all of the above eligible for release in a patch
>> > > > > > > release, or should these fixes be contained to just a minor
>> > > > > > > release?
>> > > > > > > Q5. Can we make a tactical fix for KAFKA-14666 to enable users to
>> > > > > > > work around the issue?
>> > > > > > > Q6. Do you have any alternative solutions for KAFKA-14666 that we
>> > > > > > > should consider?
>> > > > > > >
>> > > > > > > I want to understand if we need to revert the correctness fix
>> > > > > > > already merged, or if we can address correctness now and
>> > > > > > > availability later.
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Greg
>> > > > > > >
>> > > > > >
>> > > > >
>> > >
>> >
>>
>
