Hey all,

Thanks for the great discussions so far. I'm posting some KIP updates from
our working group discussion:

1. We will be changing the core RPCs from single-raft API to multi-raft.
This means all protocols will be "batch" in the first version, but the KIP
itself only illustrates the design for a single metadata topic partition.
The reason is to "keep the door open" for future extensions of this piece
of module such as a sharded controller or general quorum based topic
replication, beyond the current Kafka replication protocol.

2. We will piggy-back on the current Kafka Fetch API instead of inventing a
new FetchQuorumRecords RPC. The motivation is about the same as #1 as well
as making the integration work easier, instead of letting two similar RPCs
diverge.

3. In the EndQuorumEpoch protocol, instead of only sending the request to
the most caught-up voter, we shall broadcast the information to all voters,
with a sorted voter list in descending order of their corresponding
replicated offset. In this way, the top voter will become a candidate
immediately, while the other voters shall wait for an exponential back-off
to trigger elections, which helps ensure the top voter gets elected, and
the election eventually happens when the top voter is not responsive.

Please see the updated KIP and post any questions or concerns on the
mailing thread.

Boyang

On Fri, May 8, 2020 at 5:26 PM Jun Rao <j...@confluent.io> wrote:

> Hi, Guozhang and Jason,
>
> Thanks for the reply. A couple of more replies.
>
> 102. Still not sure about this. How is the tombstone issue addressed in the
> non-voter and the observer.  They can die at any point and restart at an
> arbitrary later time, and the advancing of the firstDirty offset and the
> removal of the tombstone can happen independently.
>
> 106. I agree that it would be less confusing if we used "epoch" instead of
> "leader epoch" consistently.
>
> Jun
>
> On Thu, May 7, 2020 at 4:04 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Thanks Jun. Further replies are in-lined.
> >
> > On Mon, May 4, 2020 at 11:58 AM Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Guozhang,
> > >
> > > Thanks for the reply. A few more replies inlined below.
> > >
> > > On Sun, May 3, 2020 at 6:33 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> > >
> > > > Hello Jun,
> > > >
> > > > Thanks for your comments! I'm replying inline below:
> > > >
> > > > On Fri, May 1, 2020 at 12:36 PM Jun Rao <j...@confluent.io> wrote:
> > > >
> > > > > 101. Bootstrapping related issues.
> > > > > 101.1 Currently, we support auto broker id generation. Is this
> > > supported
> > > > > for bootstrap brokers?
> > > > >
> > > >
> > > > The vote ids would just be the broker ids. "bootstrap.servers" would
> be
> > > > similar to what client configs have today, where "quorum.voters"
> would
> > be
> > > > pre-defined config values.
> > > >
> > > >
> > > My question was on the auto generated broker id. Currently, the broker
> > can
> > > choose to have its broker Id auto generated. The generation is done
> > through
> > > ZK to guarantee uniqueness. Without ZK, it's not clear how the broker
> id
> > is
> > > auto generated. "quorum.voters" also can't be set statically if broker
> > ids
> > > are auto generated.
> > >
> > > Jason has explained some ideas that we've discussed so far, the reason
> we
> > intentional did not include them so far is that we feel it is out-side
> the
> > scope of KIP-595. Under the umbrella of KIP-500 we should definitely
> > address them though.
> >
> > On the high-level, our belief is that "joining a quorum" and "joining (or
> > more specifically, registering brokers in) the cluster" would be
> > de-coupled a bit, where the former should be completed before we do the
> > latter. More specifically, assuming the quorum is already up and running,
> > after the newly started broker found the leader of the quorum it can
> send a
> > specific RegisterBroker request including its listener / protocol / etc,
> > and upon handling it the leader can send back the uniquely generated
> broker
> > id to the new broker, while also executing the "startNewBroker" callback
> as
> > the controller.
> >
> >
> > >
> > > > > 102. Log compaction. One weak spot of log compaction is for the
> > > consumer
> > > > to
> > > > > deal with deletes. When a key is deleted, it's retained as a
> > tombstone
> > > > > first and then physically removed. If a client misses the tombstone
> > > > > (because it's physically removed), it may not be able to update its
> > > > > metadata properly. The way we solve this in Kafka is based on a
> > > > > configuration (log.cleaner.delete.retention.ms) and we expect a
> > > consumer
> > > > > having seen an old key to finish reading the deletion tombstone
> > within
> > > > that
> > > > > time. There is no strong guarantee for that since a broker could be
> > > down
> > > > > for a long time. It would be better if we can have a more reliable
> > way
> > > of
> > > > > dealing with deletes.
> > > > >
> > > >
> > > > We propose to capture this in the "FirstDirtyOffset" field of the
> > quorum
> > > > record fetch response: the offset is the maximum offset that log
> > > compaction
> > > > has reached up to. If the follower has fetched beyond this offset it
> > > means
> > > > itself is safe hence it has seen all records up to that offset. On
> > > getting
> > > > the response, the follower can then decide if its end offset actually
> > > below
> > > > that dirty offset (and hence may miss some tombstones). If that's the
> > > case:
> > > >
> > > > 1) Naively, it could re-bootstrap metadata log from the very
> beginning
> > to
> > > > catch up.
> > > > 2) During that time, it would refrain itself from answering
> > > MetadataRequest
> > > > from any clients.
> > > >
> > > >
> > > I am not sure that the "FirstDirtyOffset" field fully addresses the
> > issue.
> > > Currently, the deletion tombstone is not removed immediately after a
> > round
> > > of cleaning. It's removed after a delay in a subsequent round of
> > cleaning.
> > > Consider an example where a key insertion is at offset 200 and a
> deletion
> > > tombstone of the key is at 400. Initially, FirstDirtyOffset is at 300.
> A
> > > follower/observer fetches from offset 0  and fetches the key at offset
> > 200.
> > > A few rounds of cleaning happen. FirstDirtyOffset is at 500 and the
> > > tombstone at 400 is physically removed. The follower/observer continues
> > the
> > > fetch, but misses offset 400. It catches all the way to
> FirstDirtyOffset
> > > and declares its metadata as ready. However, its metadata could be
> stale
> > > since it actually misses the deletion of the key.
> > >
> > > Yeah good question, I should have put more details in my explanation :)
> >
> > The idea is that we will adjust the log compaction for this raft based
> > metadata log: before more details to be explained, since we have two
> types
> > of "watermarks" here, whereas in Kafka the watermark indicates where
> every
> > replica have replicated up to and in Raft the watermark indicates where
> the
> > majority of replicas (here only indicating voters of the quorum, not
> > counting observers) have replicated up to, let's call them Kafka
> watermark
> > and Raft watermark. For this special log, we would maintain both
> > watermarks.
> >
> > When log compacting on the leader, we would only compact up to the Kafka
> > watermark, i.e. if there is at least one voter who have not replicated an
> > entry, it would not be compacted. The "dirty-offset" is the offset that
> > we've compacted up to and is communicated to other voters, and the other
> > voters would also compact up to this value --- i.e. the difference here
> is
> > that instead of letting each replica doing log compaction independently,
> > we'll have the leader to decide upon which offset to compact to, and
> > propagate this value to others to follow, in a more coordinated manner.
> > Also note when there are new voters joining the quorum who has not
> > replicated up to the dirty-offset, of because of other issues they
> > truncated their logs to below the dirty-offset, they'd have to
> re-bootstrap
> > from the beginning, and during this period of time the leader learned
> about
> > this lagging voter would not advance the watermark (also it would not
> > decrement it), and hence not compacting either, until the voter(s) has
> > caught up to that dirty-offset.
> >
> > So back to your example above, before the bootstrap voter gets to 300 no
> > log compaction would happen on the leader; and until later when the voter
> > have got to beyond 400 and hence replicated that tombstone, the log
> > compaction would possibly get to that tombstone and remove it. Say later
> it
> > the leader's log compaction reaches 500, it can send this back to the
> voter
> > who can then also compact locally up to 500.
> >
> >
> > > > > 105. Quorum State: In addition to VotedId, do we need the epoch
> > > > > corresponding to VotedId? Over time, the same broker Id could be
> > voted
> > > in
> > > > > different generations with different epoch.
> > > > >
> > > > >
> > > > Hmm, this is a good point. Originally I think the "LeaderEpoch" field
> > in
> > > > that file is corresponding to the "latest known leader epoch", not
> the
> > > > "current leader epoch". For example, if the current epoch is N, and
> > then
> > > a
> > > > vote-request with epoch N+1 is received and the voter granted the
> vote
> > > for
> > > > it, then it means for this voter it knows the "latest epoch" is N + 1
> > > > although it is unknown if that sending candidate will indeed become
> the
> > > new
> > > > leader (which would only be notified via begin-quorum request).
> > However,
> > > > when persisting the quorum state, we would encode leader-epoch to
> N+1,
> > > > while the leaderId to be the older leader.
> > > >
> > > > But now thinking about this a bit more, I feel we should use two
> > separate
> > > > epochs, one for the "lates known" and one for the "current" to pair
> > with
> > > > the leaderId. I will update the wiki page.
> > > >
> > > >
> > > Hmm, it's kind of weird to bump up the leader epoch before the new
> leader
> > > is actually elected, right.
> > >
> > >
> > > > > 106. "OFFSET_OUT_OF_RANGE: Used in the FetchQuorumRecords API to
> > > indicate
> > > > > that the follower has fetched from an invalid offset and should
> > > truncate
> > > > to
> > > > > the offset/epoch indicated in the response." Observers can't
> truncate
> > > > their
> > > > > logs. What should they do with OFFSET_OUT_OF_RANGE?
> > > > >
> > > > >
> > > > I'm not sure if I understand your question? Observers should still be
> > > able
> > > > to truncate their logs as well.
> > > >
> > > >
> > > Hmm, I thought only the quorum nodes have local logs and observers
> don't?
> > >
> > > > 107. "The leader will continue sending BeginQuorumEpoch to each known
> > > > voter
> > > > > until it has received its endorsement." If a voter is down for a
> long
> > > > time,
> > > > > sending BeginQuorumEpoch seems to add unnecessary overhead.
> > Similarly,
> > > > if a
> > > > > follower stops sending FetchQuorumRecords, does the leader keep
> > sending
> > > > > BeginQuorumEpoch?
> > > > >
> > > >
> > > > Regarding BeginQuorumEpoch: that is a good point. The
> > begin-quorum-epoch
> > > > request is for voters to quickly get the new leader information;
> > however
> > > > even if they do not get them they can still eventually learn about
> that
> > > > from others via gossiping FindQuorum. I think we can adjust the logic
> > to
> > > > e.g. exponential back-off or with a limited num.retries.
> > > >
> > > > Regarding FetchQuorumRecords: if the follower sends
> FetchQuorumRecords
> > > > already, it means that follower already knows that the broker is the
> > > > leader, and hence we can stop retrying BeginQuorumEpoch; however it
> is
> > > > possible that after a follower sends FetchQuorumRecords already,
> > suddenly
> > > > it stops send it (possibly because it learned about a higher epoch
> > > leader),
> > > > and hence this broker may be a "zombie" leader and we propose to use
> > the
> > > > fetch.timeout to let the leader to try to verify if it has already
> been
> > > > stale.
> > > >
> > > >
> > > It just seems that we should handle these two cases in a consistent
> way?
> > >
> > > Yes I agree, on the leader's side, the FetchQuorumRecords from a
> follower
> > could mean that we no longer needs to send BeginQuorumEpoch anymore ---
> and
> > it is already part of our current implementations in
> > https://github.com/confluentinc/kafka/commits/kafka-raft
> >
> >
> > > Thanks,
> > >
> > > Jun
> > >
> > > >
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Apr 29, 2020 at 8:56 PM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hello Leonard,
> > > > > >
> > > > > > Thanks for your comments, I'm relying in line below:
> > > > > >
> > > > > > On Wed, Apr 29, 2020 at 1:58 AM Wang (Leonard) Ge <
> > w...@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Kafka developers,
> > > > > > >
> > > > > > > It's great to see this proposal and it took me some time to
> > finish
> > > > > > reading
> > > > > > > it.
> > > > > > >
> > > > > > > And I have the following questions about the Proposal:
> > > > > > >
> > > > > > >    - How do we plan to test this design to ensure its
> > correctness?
> > > Or
> > > > > > more
> > > > > > >    broadly, how do we ensure that our new ‘pull’ based model is
> > > > > > functional
> > > > > > > and
> > > > > > >    correct given that it is different from the original RAFT
> > > > > > implementation
> > > > > > >    which has formal proof of correctness?
> > > > > > >
> > > > > >
> > > > > > We have two planned verifications on the correctness and liveness
> > of
> > > > the
> > > > > > design. One is via model verification (TLA+)
> > > > > > https://github.com/guozhangwang/kafka-specification
> > > > > >
> > > > > > Another is via the concurrent simulation tests
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/confluentinc/kafka/commit/5c0c054597d2d9f458cad0cad846b0671efa2d91
> > > > > >
> > > > > >    - Have we considered any sensible defaults for the
> > configuration,
> > > > i.e.
> > > > > > >    all the election timeout, fetch time out, etc.? Or we want
> to
> > > > leave
> > > > > > > this to
> > > > > > >    a later stage when we do the performance testing, etc.
> > > > > > >
> > > > > >
> > > > > > This is a good question, the reason we did not set any default
> > values
> > > > for
> > > > > > the timeout configurations is that we think it may take some
> > > > benchmarking
> > > > > > experiments to get these defaults right. Some high-level
> principles
> > > to
> > > > > > consider: 1) the fetch.timeout should be around the same scale
> with
> > > zk
> > > > > > session timeout, which is now 18 seconds by default -- in
> practice
> > > > we've
> > > > > > seen unstable networks having more than 10 secs of transient
> > > > > connectivity,
> > > > > > 2) the election.timeout, however, should be smaller than the
> fetch
> > > > > timeout
> > > > > > as is also suggested as a practical optimization in literature:
> > > > > > https://www.cl.cam.ac.uk/~ms705/pub/papers/2015-osr-raft.pdf
> > > > > >
> > > > > > Some more discussions can be found here:
> > > > > > https://github.com/confluentinc/kafka/pull/301/files#r415420081
> > > > > >
> > > > > >
> > > > > > >    - Have we considered piggybacking `BeginQuorumEpoch` with
> the
> > `
> > > > > > >    FetchQuorumRecords`? I might be missing something obvious
> but
> > I
> > > am
> > > > > > just
> > > > > > >    wondering why don’t we just use the `FindQuorum` and
> > > > > > > `FetchQuorumRecords`
> > > > > > >    APIs and remove the `BeginQuorumEpoch` API?
> > > > > > >
> > > > > >
> > > > > > Note that Begin/EndQuorumEpoch is sent from leader -> other voter
> > > > > > followers, while FindQuorum / Fetch are sent from follower to
> > leader.
> > > > > > Arguably one can eventually realize the new leader and epoch via
> > > > > gossiping
> > > > > > FindQuorum, but that could in practice require a long delay.
> > Having a
> > > > > > leader -> other voters request helps the new leader epoch to be
> > > > > propagated
> > > > > > faster under a pull model.
> > > > > >
> > > > > >
> > > > > > >    - And about the `FetchQuorumRecords` response schema, in the
> > > > > `Records`
> > > > > > >    field of the response, is it just one record or all the
> > records
> > > > > > starting
> > > > > > >    from the FetchOffset? It seems a lot more efficient if we
> sent
> > > all
> > > > > the
> > > > > > >    records during the bootstrapping of the brokers.
> > > > > > >
> > > > > >
> > > > > > Yes the fetching is batched: FetchOffset is just the starting
> > offset
> > > of
> > > > > the
> > > > > > batch of records.
> > > > > >
> > > > > >
> > > > > > >    - Regarding the disruptive broker issues, does our pull
> based
> > > > model
> > > > > > >    suffer from it? If so, have we considered the Pre-Vote
> stage?
> > If
> > > > > not,
> > > > > > > why?
> > > > > > >
> > > > > > >
> > > > > > The disruptive broker is stated in the original Raft paper which
> is
> > > the
> > > > > > result of the push model design. Our analysis showed that with
> the
> > > pull
> > > > > > model it is no longer an issue.
> > > > > >
> > > > > >
> > > > > > > Thanks a lot for putting this up, and I hope that my questions
> > can
> > > be
> > > > > of
> > > > > > > some value to make this KIP better.
> > > > > > >
> > > > > > > Hope to hear from you soon!
> > > > > > >
> > > > > > > Best wishes,
> > > > > > > Leonard
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Apr 29, 2020 at 1:46 AM Colin McCabe <
> cmcc...@apache.org
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi Jason,
> > > > > > > >
> > > > > > > > It's amazing to see this coming together :)
> > > > > > > >
> > > > > > > > I haven't had a chance to read in detail, but I read the
> > outline
> > > > and
> > > > > a
> > > > > > > few
> > > > > > > > things jumped out at me.
> > > > > > > >
> > > > > > > > First, for every epoch that is 32 bits rather than 64, I sort
> > of
> > > > > wonder
> > > > > > > if
> > > > > > > > that's a good long-term choice.  I keep reading about stuff
> > like
> > > > > this:
> > > > > > > > https://issues.apache.org/jira/browse/ZOOKEEPER-1277 .
> > > Obviously,
> > > > > > that
> > > > > > > > JIRA is about zxid, which increments much faster than we
> expect
> > > > these
> > > > > > > > leader epochs to, but it would still be good to see some
> rough
> > > > > > > calculations
> > > > > > > > about how long 32 bits (or really, 31 bits) will last us in
> the
> > > > cases
> > > > > > > where
> > > > > > > > we're using it, and what the space savings we're getting
> really
> > > is.
> > > > > It
> > > > > > > > seems like in most cases the tradeoff may not be worth it?
> > > > > > > >
> > > > > > > > Another thing I've been thinking about is how we do
> > > > bootstrapping.  I
> > > > > > > > would prefer to be in a world where formatting a new Kafka
> node
> > > > was a
> > > > > > > first
> > > > > > > > class operation explicitly initiated by the admin, rather
> than
> > > > > > something
> > > > > > > > that happened implicitly when you started up the broker and
> > > things
> > > > > > > "looked
> > > > > > > > blank."
> > > > > > > >
> > > > > > > > The first problem is that things can "look blank"
> accidentally
> > if
> > > > the
> > > > > > > > storage system is having a bad day.  Clearly in the non-Raft
> > > world,
> > > > > > this
> > > > > > > > leads to data loss if the broker that is (re)started this way
> > was
> > > > the
> > > > > > > > leader for some partitions.
> > > > > > > >
> > > > > > > > The second problem is that we have a bit of a chicken and egg
> > > > problem
> > > > > > > with
> > > > > > > > certain configuration keys.  For example, maybe you want to
> > > > configure
> > > > > > > some
> > > > > > > > connection security settings in your cluster, but you don't
> > want
> > > > them
> > > > > > to
> > > > > > > > ever be stored in a plaintext config file.  (For example,
> SCRAM
> > > > > > > passwords,
> > > > > > > > etc.)  You could use a broker API to set the configuration,
> but
> > > > that
> > > > > > > brings
> > > > > > > > up the chicken and egg problem.  The broker needs to be
> > > configured
> > > > to
> > > > > > > know
> > > > > > > > how to talk to you, but you need to configure it before you
> can
> > > > talk
> > > > > to
> > > > > > > > it.  Using an external secret manager like Vault is one way
> to
> > > > solve
> > > > > > > this,
> > > > > > > > but not everyone uses an external secret manager.
> > > > > > > >
> > > > > > > > quorum.voters seems like a similar configuration key.  In the
> > > > current
> > > > > > > KIP,
> > > > > > > > this is only read if there is no other configuration
> specifying
> > > the
> > > > > > > quorum
> > > > > > > > voter set.  If we had a kafka.mkfs command, we wouldn't need
> > this
> > > > key
> > > > > > > > because we could assume that there was always quorum
> > information
> > > > > stored
> > > > > > > > locally.
> > > > > > > >
> > > > > > > > best,
> > > > > > > > Colin
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Apr 16, 2020, at 16:44, Jason Gustafson wrote:
> > > > > > > > > Hi All,
> > > > > > > > >
> > > > > > > > > I'd like to start a discussion on KIP-595:
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum
> > > > > > > > .
> > > > > > > > > This proposal specifies a Raft protocol to ultimately
> replace
> > > > > > Zookeeper
> > > > > > > > > as
> > > > > > > > > documented in KIP-500. Please take a look and share your
> > > > thoughts.
> > > > > > > > >
> > > > > > > > > A few minor notes to set the stage a little bit:
> > > > > > > > >
> > > > > > > > > - This KIP does not specify the structure of the messages
> > used
> > > to
> > > > > > > > represent
> > > > > > > > > metadata in Kafka, nor does it specify the internal API
> that
> > > will
> > > > > be
> > > > > > > used
> > > > > > > > > by the controller. Expect these to come in later proposals.
> > > Here
> > > > we
> > > > > > are
> > > > > > > > > primarily concerned with the replication protocol and basic
> > > > > > operational
> > > > > > > > > mechanics.
> > > > > > > > > - We expect many details to change as we get closer to
> > > > integration
> > > > > > with
> > > > > > > > > the controller. Any changes we make will be made either as
> > > > > amendments
> > > > > > > to
> > > > > > > > > this KIP or, in the case of larger changes, as new
> proposals.
> > > > > > > > > - We have a prototype implementation which I will put
> online
> > > > within
> > > > > > the
> > > > > > > > > next week which may help in understanding some details. It
> > has
> > > > > > > diverged a
> > > > > > > > > little bit from our proposal, so I am taking a little time
> to
> > > > bring
> > > > > > it
> > > > > > > in
> > > > > > > > > line. I'll post an update to this thread when it is
> available
> > > for
> > > > > > > review.
> > > > > > > > >
> > > > > > > > > Finally, I want to mention that this proposal was drafted
> by
> > > > > myself,
> > > > > > > > Boyang
> > > > > > > > > Chen, and Guozhang Wang.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Jason
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Leonard Ge
> > > > > > > Software Engineer Intern - Confluent
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>

Reply via email to