Hi Dong,

Yes, in terms of fault tolerance "quorum" does no better than "all": as I
said, with {min.isr} set to X+1, Kafka can tolerate only X failures. So if
A and B are partitioned off at the same time, that is two concurrent
failures, and we do not guarantee that all acked messages will be
retained.

The goal of my approach is to maintain the behavior of acks="all", which
happens to do better than what Kafka actually guarantees: when both A and
B are partitioned off, produced records will not be acked, since "all"
requires acks from all replicas (not only the ISR; my previous email used
an incorrect term). This does better than tolerating X failures, and I was
proposing to keep it so that we would not introduce any regression
"surprises" for users who are already using "all". In other words, "quorum"
trades a bit of the failure tolerance strictly defined by min.isr for
better tail latency.
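
To make the comparison concrete, here is a minimal Python sketch (my own
model for this thread, not Kafka code; the function name and shape are
hypothetical) of when the two ack modes could acknowledge a produce, given
the current ISR size:

```python
# Toy model of the ack rules discussed in this thread -- NOT Kafka's
# actual implementation. It only answers whether an ack is possible at
# all for a given ISR size, not how many responses are waited for.

def would_ack(acks: str, isr_size: int, num_replicas: int, min_isr: int) -> bool:
    """Return True if a produce request could be acknowledged."""
    if acks == "all":
        # acks from ALL current ISR replicas, AND |ISR| >= min.isr
        return isr_size >= min_isr
    if acks == "quorum":
        # needs max(majority of replicas, min.isr) ISR replicas to respond,
        # which is impossible if the ISR itself is smaller than that
        needed = max(num_replicas // 2 + 1, min_isr)
        return isr_size >= needed
    raise ValueError("only 'all' and 'quorum' are modeled here")

# 3 replicas, min.isr = 2: both modes behave the same as |ISR| shrinks
assert would_ack("all", 3, 3, 2) and would_ack("quorum", 3, 3, 2)
assert would_ack("all", 2, 3, 2) and would_ack("quorum", 2, 3, 2)
assert not would_ack("all", 1, 3, 2) and not would_ack("quorum", 1, 3, 2)

# 3 replicas, min.isr = 1: "quorum" is stricter than "all" when |ISR| == 1
assert would_ack("all", 1, 3, 1)
assert not would_ack("quorum", 1, 3, 1)
```

This matches the scenario lists in the quoted email below: the two modes
differ only in how many responses they wait for, except in the
min.isr=1, |ISR|=1 corner where "quorum" refuses to ack at all.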


Guozhang


On Fri, Feb 2, 2018 at 6:25 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Guozhang,
>
> According to the new proposal, with 3 replicas, min.isr=2 and
> acks="quorum", it seems that acknowledged messages can still be truncated
> in the network partition scenario you mentioned, right? So I guess the goal
> is for some user to achieve better tail latency at the cost of potential
> message loss?
>
> If this is the case, then I think it may be better to adopt an approach
> where the controller dynamically turns this optimization on and off. This
> provides the user with peace of mind (i.e. no message loss) while still
> reducing tail latency. What do you think?
>
> Thanks,
> Dong
>
>
> On Fri, Feb 2, 2018 at 11:11 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hello Litao,
>>
>> Just double checking on the leader election details, do you have time to
>> complete the proposal on that part?
>>
>> Also, Jun mentioned one caveat related to KIP-250 on the KIP-232
>> discussion thread that Dong is working on; I figured it is worth pointing
>> it out here with a tentative solution:
>>
>>
>> ```
>> Currently, if the producer uses acks=-1, a write will only succeed if
>> the write is received by all in-sync replicas (i.e., committed). This is
>> true even when min.isr is set since we first wait for a message to be
>> committed and then check the min.isr requirement. KIP-250 may change
>> that, but we can discuss the implication there.
>> ```
>>
>> The caveat is that, if we change the acking semantics in KIP-250 so that
>> we only require {min.isr} replicas to acknowledge a produce, then the
>> following scenario becomes possible: imagine you have {A, B, C} replicas
>> of a partition with A as the leader, all in the isr list, and min.isr is 2.
>>
>> 1. Say there is a network partition and both A and B are fenced off. C is
>> elected as the new leader and shrinks its isr list to only {C}; from A's
>> point of view, it does not know it has become a "ghost" and is no longer
>> the leader; all it does is shrink its isr list to {A, B}.
>>
>> 2. At this time, any new writes with ack=-1 to C will not be acked, since
>> from C's pov there is only one replica. This is correct.
>>
>> 3. However, any writes that are sent to A can still be acked (NOTE this
>> is totally possible, since producers only refresh metadata periodically,
>> and if they happen to ask A or B they will get the stale metadata saying
>> A is still the leader): A thinks the isr list is {A, B}, so as long as B
>> has replicated the message, A can ack the produce.
>>
>>     This is not correct behavior: when the network heals, A will realize
>> it is not the leader and will truncate its log. As a result, the acked
>> records are lost, violating Kafka's guarantees. And KIP-232 would not
>> help prevent this scenario.
>>
>>
>> Although one can argue that, with 3 replicas and min.isr set to 2, Kafka
>> guarantees to tolerate only one failure, while the above scenario involves
>> two concurrent failures (both A and B are considered wedged), this is
>> still a regression from the current version.
>>
>> So to resolve this issue, I'd propose we can change the semantics in the
>> following way (this is only slightly different from your proposal):
>>
>>
>> 1. Add one more value to client-side acks config:
>>
>>    0: no acks needed at all.
>>    1: ack from the leader.
>>    all: acks from ALL the ISR replicas, AND the current number of isr
>> replicas has to be no smaller than {min.isr} (i.e. this semantic is
>> unchanged).
>>    quorum: this is the new value; it requires acks from a number of ISR
>> replicas that is no smaller than a majority of the replicas AND no
>> smaller than {min.isr}.
>>
>> 2. Clarify in the docs that if a user wants to tolerate X failures, she
>> needs to set client acks=all or acks=quorum (better tail latency than
>> "all") with broker {min.isr} set to X+1; however, "all" is not necessarily
>> stronger than "quorum":
>>
>> For example, with 3 replicas and {min.isr} set to 2, here is the list of
>> scenarios:
>>
>> a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
>> b. ISR list has 2: "all" and "quorum" both wait for the 2 of them.
>> c. ISR list has 1: neither "all" nor "quorum" would ack.
>>
>> If {min.isr} is set to 1, interestingly, here would be the list of
>> scenarios:
>>
>> a. ISR list has 3: "all" waits for all 3, "quorum" waits for 2 of them.
>> b. ISR list has 2: "all" and "quorum" both wait for the 2 of them.
>> c. ISR list has 1: "all" waits only for the leader to ack, while "quorum"
>> would not ack (because it requires the number of acks to be >= {min.isr}
>> AND >= {majority of num.replicas}, so it is actually stronger than "all"
>> here).
>>
>>
>> WDYT?
>>
>>
>> Guozhang
>>
>>
>> On Thu, Jan 25, 2018 at 8:13 PM, Dong Lin <lindon...@gmail.com> wrote:
>>
>>> Hey Litao,
>>>
>>> Not sure there will be an easy way to select the broker with the highest
>>> LEO without losing acknowledged messages. In case it is useful, here is
>>> another idea: maybe we can have a mechanism to switch between the min.isr
>>> count and the full isr set for determining when to acknowledge a message.
>>> The controller can probably use an RPC to request that the current leader
>>> use the full isr set before it sends a LeaderAndIsrRequest for a
>>> leadership change.
>>>
>>> Regards,
>>> Dong
>>>
>>>
>>> On Wed, Jan 24, 2018 at 7:29 PM, Litao Deng
>>> <litao.d...@airbnb.com.invalid>
>>> wrote:
>>>
>>> > Thanks Jun for the detailed feedback.
>>> >
>>> > Yes, for #1, I mean the live replicas from the ISR.
>>> >
>>> > Actually, I believe for all of the 4 new leader election strategies
>>> > (offline, reassign, preferred replica and controlled shutdown), we
>>> need to
>>> > make corresponding changes. Will document the details in the KIP.
>>> >
>>> > On Wed, Jan 24, 2018 at 3:59 PM, Jun Rao <j...@confluent.io> wrote:
>>> >
>>> > > Hi, Litao,
>>> > >
>>> > > Thanks for the KIP. Good proposal. A few comments below.
>>> > >
>>> > > 1. The KIP says "select the live replica with the largest LEO".  I
>>> guess
>>> > > what you meant is selecting the live replicas in ISR with the largest
>>> > LEO?
>>> > >
>>> > > 2. I agree that we can probably just reuse the current min.isr
>>> > > configuration, but with a slightly different semantics. Currently, if
>>> > > min.isr is set, a user expects the record to be in at least min.isr
>>> > > replicas on successful ack. This KIP guarantees this too. Most
>>> people are
>>> > > probably surprised that currently the ack is only sent back after all
>>> > > replicas in ISR receive the record. This KIP will change the ack to
>>> only
>>> > > wait on min.isr replicas, which matches the user's expectation and
>>> gives
>>> > > better latency. Currently, we guarantee no data loss if there are
>>> fewer
>>> > > than replication factor failures. The KIP changes that to fewer than
>>> > > min.isr failures. The latter probably matches the user expectation.
>>> > >
>>> > > 3. I agree that the new leader election process is a bit more
>>> > complicated.
>>> > > The controller now needs to contact all replicas in ISR to determine
>>> who
>>> > > has the longest log. However, this happens infrequently. So, it's
>>> > probably
>>> > > worth doing for the better latency in #2.
>>> > >
>>> > > 4. We have to think through the preferred leader election process.
>>> > > Currently, the first assigned replica is preferred for load
>>> balancing.
>>> > > There is a process to automatically move the leader to the preferred
>>> > > replica when it's in sync. The issue is that the preferred replica
>>> > > may not be the replica with the longest log. Naively switching to the
>>> preferred
>>> > > replica may cause data loss when there are actually fewer failures
>>> than
>>> > > configured min.isr. One way to address this issue is to do the
>>> following
>>> > > steps during preferred leader election: (a) controller sends an RPC
>>> > request
>>> > > to the current leader; (b) the current leader stops taking new writes
>>> > > (sending a new error code to the clients) and returns its LEO (call
>>> it L)
>>> > > to the controller; (c) the controller issues an RPC request to the
>>> > > preferred replica and waits its LEO to reach L; (d) the controller
>>> > changes
>>> > > the leader to the preferred replica.
>>> > >
>>> > > Jun
>>> > >
>>> > > On Wed, Jan 24, 2018 at 2:51 PM, Litao Deng
>>> > <litao.d...@airbnb.com.invalid
>>> > > >
>>> > > wrote:
>>> > >
>>> > > > Sorry folks, just realized I didn't use the correct thread format
>>> for
>>> > the
>>> > > > discussion. I started this new one and copied all of the responses
>>> from
>>> > > the
>>> > > > old one.
>>> > > >
>>> > > > @Dong
>>> > > > It makes sense to just use the min.insync.replicas instead of
>>> > > introducing a
>>> > > > new config, and we must make this change together with the
>>> LEO-based
>>> > new
>>> > > > leader election.
>>> > > >
>>> > > > @Xi
>>> > > > I thought about embedding the LEO information into the
>>> > > > ControllerContext, but didn't find a way. Using RPCs will make the
>>> > > > leader election period longer, and this should happen only in very
>>> > > > rare cases (broker failure, controlled shutdown, preferred leader
>>> > > > election and partition reassignment).
>>> > > >
>>> > > > @Jeff
>>> > > > The current leader election is to pick the first replica from AR
>>> which
>>> > > > exists both in the live brokers and ISR sets. I agree with you that
>>> > > > changing the current/default behavior would cause much confusion,
>>> > > > and that's the reason the title is "Add Support ...". In this case,
>>> > > > we wouldn't break any current promises and would provide a separate
>>> > > > option for our users.
>>> > > > In terms of KIP-250, I feel it is more like the "Semisynchronous
>>> > > > Replication" in the MySQL world, and yes it is something between
>>> acks=1
>>> > > and
>>> > > > acks=insync.replicas. Additionally, I feel KIP-250 and KIP-227 are
>>> > > > two orthogonal improvements. KIP-227 is to improve the replication
>>> > > protocol
>>> > > > (like the introduction of parallel replication in MySQL), and
>>> KIP-250
>>> > is
>>> > > an
>>> > > > enhancement for the replication architecture (sync, semi-sync, and
>>> > > async).
>>> > > >
>>> > > >
>>> > > > Dong Lin
>>> > > >
>>> > > > > Thanks for the KIP. I have one quick comment before you provide
>>> more
>>> > > > detail
>>> > > > > on how to select the leader with the largest LEO.
>>> > > > > Do you think it would make sense to change the default behavior
>>> of
>>> > > > acks=-1,
>>> > > > > such that broker will acknowledge the message once the message
>>> has
>>> > been
>>> > > > > replicated to min.insync.replicas brokers? This would allow us to
>>> > keep
>>> > > > the
>>> > > > > same durability guarantee, improve produce request latency
>>> without
>>> > > > having a
>>> > > > > new config.
>>> > > >
>>> > > >
>>> > > > Hu Xi
>>> > > >
>>> > > > > Currently,  with holding the assigned replicas(AR) for all
>>> > partitions,
>>> > > > > controller is now able to elect new leaders by selecting the
>>> first
>>> > > > replica
>>> > > > > of AR which occurs in both live replica set and ISR. If
>>> switching to
>>> > > the
>>> > > > > LEO-based strategy, controller context might need to be enriched
>>> or
>>> > > > > augmented to store those values.  If retrieving those LEOs
>>> real-time,
>>> > > > > several rounds of RPCs are unavoidable which seems to violate the
>>> > > > original
>>> > > > intention of this KIP.
>>> > > >
>>> > > >
>>> > > > Jeff Widman
>>> > > >
>>> > > > > I agree with Dong, we should see if it's possible to change the
>>> > > > > default behavior so that as soon as min.insync.replicas brokers
>>> > > > > respond, the broker acknowledges the message back to the client
>>> > > > > without waiting for additional brokers in the in-sync replica
>>> > > > > list to respond. (I actually thought it already worked this way.)
>>> > > > > As you implied in the KIP though, changing this default
>>> introduces a
>>> > > > weird
>>> > > > > state where an in-sync follower broker is not guaranteed to have
>>> a
>>> > > > > message...
>>> > > > > So at a minimum, the leadership failover algorithm would need to
>>> be
>>> > > sure
>>> > > > to
>>> > > > > pick the most up-to-date follower... I thought it already did
>>> this?
>>> > > > > But if multiple brokers fail in quick succession, then a broker
>>> > > > > that was in the ISR could become a leader without ever receiving
>>> > > > > the message... violating the current promises of
>>> > > > > unclean.leader.election.enable=False... so changing the default
>>> > > > > might not be a tenable solution.
>>> > > > > What also jumped out at me in the KIP was the goal of reducing
>>> p999
>>> > > when
>>> > > > > setting replica lag time at 10 seconds(!!)... I understand the
>>> desire
>>> > > to
>>> > > > > minimize frequent ISR shrink/expansion, as I face this same
>>> issue at
>>> > my
>>> > > > day
>>> > > > > job. But what you're essentially trying to do here is create an
>>> > > > additional
>>> > > > > replication state that is in-between acks=1 and acks = ISR to
>>> paper
>>> > > over
>>> > > > a
>>> > > > > root problem of ISR shrink/expansion...
>>> > > > > I'm just wary of shipping more features (and more operational
>>> > > confusion)
>>> > > > if
>>> > > > > it's only addressing the symptom rather than the root cause. For
>>> > > example,
>>> > > > > my day job's problem is we run a very high number of low-traffic
>>> > > > > partitions-per-broker, so the fetch requests hit many partitions
>>> > before
>>> > > > > they fill. Solving that requires changing our architecture +
>>> making
>>> > the
>>> > > > > replication protocol more efficient (KIP-227).
>>> > > >
>>> > > >
>>> > > > On Tue, Jan 23, 2018 at 10:02 PM, Litao Deng <
>>> litao.d...@airbnb.com>
>>> > > > wrote:
>>> > > >
>>> > > > > Hey folks. I would like to add a feature to support the
>>> quorum-based
>>> > > > > acknowledgment for the producer request. We have been running a
>>> > > modified
>>> > > > > version of Kafka on our testing cluster for weeks, the
>>> improvement of
>>> > > > P999
>>> > > > > is significant with very stable latency. Additionally, I have a
>>> > > proposal
>>> > > > to
>>> > > > > achieve a similar data durability as with the
>>> insync.replicas-based
>>> > > > > acknowledgment through LEO-based leader election.
>>> > > > >
>>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> > > > > 250+Add+Support+for+Quorum-based+Producer+Acknowledge
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>


-- 
-- Guozhang
