Hi, Guozhang and Jason,

Thanks for the reply. A couple of more replies.

102. Still not sure about this. How is the tombstone issue addressed in the
non-voter and the observer.  They can die at any point and restart at an
arbitrary later time, and the advancing of the firstDirty offset and the
removal of the tombstone can happen independently.

106. I agree that it would be less confusing if we used "epoch" instead of
"leader epoch" consistently.

Jun

On Thu, May 7, 2020 at 4:04 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Thanks Jun. Further replies are in-lined.
>
> On Mon, May 4, 2020 at 11:58 AM Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Guozhang,
> >
> > Thanks for the reply. A few more replies inlined below.
> >
> > On Sun, May 3, 2020 at 6:33 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Jun,
> > >
> > > Thanks for your comments! I'm replying inline below:
> > >
> > > On Fri, May 1, 2020 at 12:36 PM Jun Rao <j...@confluent.io> wrote:
> > >
> > > > 101. Bootstrapping related issues.
> > > > 101.1 Currently, we support auto broker id generation. Is this
> > supported
> > > > for bootstrap brokers?
> > > >
> > >
> > > The vote ids would just be the broker ids. "bootstrap.servers" would be
> > > similar to what client configs have today, where "quorum.voters" would
> be
> > > pre-defined config values.
> > >
> > >
> > My question was on the auto generated broker id. Currently, the broker
> can
> > choose to have its broker Id auto generated. The generation is done
> through
> > ZK to guarantee uniqueness. Without ZK, it's not clear how the broker id
> is
> > auto generated. "quorum.voters" also can't be set statically if broker
> ids
> > are auto generated.
> >
> > Jason has explained some ideas that we've discussed so far, the reason we
> intentional did not include them so far is that we feel it is out-side the
> scope of KIP-595. Under the umbrella of KIP-500 we should definitely
> address them though.
>
> On the high-level, our belief is that "joining a quorum" and "joining (or
> more specifically, registering brokers in) the cluster" would be
> de-coupled a bit, where the former should be completed before we do the
> latter. More specifically, assuming the quorum is already up and running,
> after the newly started broker found the leader of the quorum it can send a
> specific RegisterBroker request including its listener / protocol / etc,
> and upon handling it the leader can send back the uniquely generated broker
> id to the new broker, while also executing the "startNewBroker" callback as
> the controller.
>
>
> >
> > > > 102. Log compaction. One weak spot of log compaction is for the
> > consumer
> > > to
> > > > deal with deletes. When a key is deleted, it's retained as a
> tombstone
> > > > first and then physically removed. If a client misses the tombstone
> > > > (because it's physically removed), it may not be able to update its
> > > > metadata properly. The way we solve this in Kafka is based on a
> > > > configuration (log.cleaner.delete.retention.ms) and we expect a
> > consumer
> > > > having seen an old key to finish reading the deletion tombstone
> within
> > > that
> > > > time. There is no strong guarantee for that since a broker could be
> > down
> > > > for a long time. It would be better if we can have a more reliable
> way
> > of
> > > > dealing with deletes.
> > > >
> > >
> > > We propose to capture this in the "FirstDirtyOffset" field of the
> quorum
> > > record fetch response: the offset is the maximum offset that log
> > compaction
> > > has reached up to. If the follower has fetched beyond this offset it
> > means
> > > itself is safe hence it has seen all records up to that offset. On
> > getting
> > > the response, the follower can then decide if its end offset actually
> > below
> > > that dirty offset (and hence may miss some tombstones). If that's the
> > case:
> > >
> > > 1) Naively, it could re-bootstrap metadata log from the very beginning
> to
> > > catch up.
> > > 2) During that time, it would refrain itself from answering
> > MetadataRequest
> > > from any clients.
> > >
> > >
> > I am not sure that the "FirstDirtyOffset" field fully addresses the
> issue.
> > Currently, the deletion tombstone is not removed immediately after a
> round
> > of cleaning. It's removed after a delay in a subsequent round of
> cleaning.
> > Consider an example where a key insertion is at offset 200 and a deletion
> > tombstone of the key is at 400. Initially, FirstDirtyOffset is at 300. A
> > follower/observer fetches from offset 0  and fetches the key at offset
> 200.
> > A few rounds of cleaning happen. FirstDirtyOffset is at 500 and the
> > tombstone at 400 is physically removed. The follower/observer continues
> the
> > fetch, but misses offset 400. It catches all the way to FirstDirtyOffset
> > and declares its metadata as ready. However, its metadata could be stale
> > since it actually misses the deletion of the key.
> >
> > Yeah good question, I should have put more details in my explanation :)
>
> The idea is that we will adjust the log compaction for this raft based
> metadata log: before more details to be explained, since we have two types
> of "watermarks" here, whereas in Kafka the watermark indicates where every
> replica have replicated up to and in Raft the watermark indicates where the
> majority of replicas (here only indicating voters of the quorum, not
> counting observers) have replicated up to, let's call them Kafka watermark
> and Raft watermark. For this special log, we would maintain both
> watermarks.
>
> When log compacting on the leader, we would only compact up to the Kafka
> watermark, i.e. if there is at least one voter who have not replicated an
> entry, it would not be compacted. The "dirty-offset" is the offset that
> we've compacted up to and is communicated to other voters, and the other
> voters would also compact up to this value --- i.e. the difference here is
> that instead of letting each replica doing log compaction independently,
> we'll have the leader to decide upon which offset to compact to, and
> propagate this value to others to follow, in a more coordinated manner.
> Also note when there are new voters joining the quorum who has not
> replicated up to the dirty-offset, of because of other issues they
> truncated their logs to below the dirty-offset, they'd have to re-bootstrap
> from the beginning, and during this period of time the leader learned about
> this lagging voter would not advance the watermark (also it would not
> decrement it), and hence not compacting either, until the voter(s) has
> caught up to that dirty-offset.
>
> So back to your example above, before the bootstrap voter gets to 300 no
> log compaction would happen on the leader; and until later when the voter
> have got to beyond 400 and hence replicated that tombstone, the log
> compaction would possibly get to that tombstone and remove it. Say later it
> the leader's log compaction reaches 500, it can send this back to the voter
> who can then also compact locally up to 500.
>
>
> > > > 105. Quorum State: In addition to VotedId, do we need the epoch
> > > > corresponding to VotedId? Over time, the same broker Id could be
> voted
> > in
> > > > different generations with different epoch.
> > > >
> > > >
> > > Hmm, this is a good point. Originally I think the "LeaderEpoch" field
> in
> > > that file is corresponding to the "latest known leader epoch", not the
> > > "current leader epoch". For example, if the current epoch is N, and
> then
> > a
> > > vote-request with epoch N+1 is received and the voter granted the vote
> > for
> > > it, then it means for this voter it knows the "latest epoch" is N + 1
> > > although it is unknown if that sending candidate will indeed become the
> > new
> > > leader (which would only be notified via begin-quorum request).
> However,
> > > when persisting the quorum state, we would encode leader-epoch to N+1,
> > > while the leaderId to be the older leader.
> > >
> > > But now thinking about this a bit more, I feel we should use two
> separate
> > > epochs, one for the "lates known" and one for the "current" to pair
> with
> > > the leaderId. I will update the wiki page.
> > >
> > >
> > Hmm, it's kind of weird to bump up the leader epoch before the new leader
> > is actually elected, right.
> >
> >
> > > > 106. "OFFSET_OUT_OF_RANGE: Used in the FetchQuorumRecords API to
> > indicate
> > > > that the follower has fetched from an invalid offset and should
> > truncate
> > > to
> > > > the offset/epoch indicated in the response." Observers can't truncate
> > > their
> > > > logs. What should they do with OFFSET_OUT_OF_RANGE?
> > > >
> > > >
> > > I'm not sure if I understand your question? Observers should still be
> > able
> > > to truncate their logs as well.
> > >
> > >
> > Hmm, I thought only the quorum nodes have local logs and observers don't?
> >
> > > 107. "The leader will continue sending BeginQuorumEpoch to each known
> > > voter
> > > > until it has received its endorsement." If a voter is down for a long
> > > time,
> > > > sending BeginQuorumEpoch seems to add unnecessary overhead.
> Similarly,
> > > if a
> > > > follower stops sending FetchQuorumRecords, does the leader keep
> sending
> > > > BeginQuorumEpoch?
> > > >
> > >
> > > Regarding BeginQuorumEpoch: that is a good point. The
> begin-quorum-epoch
> > > request is for voters to quickly get the new leader information;
> however
> > > even if they do not get them they can still eventually learn about that
> > > from others via gossiping FindQuorum. I think we can adjust the logic
> to
> > > e.g. exponential back-off or with a limited num.retries.
> > >
> > > Regarding FetchQuorumRecords: if the follower sends FetchQuorumRecords
> > > already, it means that follower already knows that the broker is the
> > > leader, and hence we can stop retrying BeginQuorumEpoch; however it is
> > > possible that after a follower sends FetchQuorumRecords already,
> suddenly
> > > it stops send it (possibly because it learned about a higher epoch
> > leader),
> > > and hence this broker may be a "zombie" leader and we propose to use
> the
> > > fetch.timeout to let the leader to try to verify if it has already been
> > > stale.
> > >
> > >
> > It just seems that we should handle these two cases in a consistent way?
> >
> > Yes I agree, on the leader's side, the FetchQuorumRecords from a follower
> could mean that we no longer needs to send BeginQuorumEpoch anymore --- and
> it is already part of our current implementations in
> https://github.com/confluentinc/kafka/commits/kafka-raft
>
>
> > Thanks,
> >
> > Jun
> >
> > >
> > > >
> > > > Jun
> > > >
> > > > On Wed, Apr 29, 2020 at 8:56 PM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Hello Leonard,
> > > > >
> > > > > Thanks for your comments, I'm relying in line below:
> > > > >
> > > > > On Wed, Apr 29, 2020 at 1:58 AM Wang (Leonard) Ge <
> w...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Hi Kafka developers,
> > > > > >
> > > > > > It's great to see this proposal and it took me some time to
> finish
> > > > > reading
> > > > > > it.
> > > > > >
> > > > > > And I have the following questions about the Proposal:
> > > > > >
> > > > > >    - How do we plan to test this design to ensure its
> correctness?
> > Or
> > > > > more
> > > > > >    broadly, how do we ensure that our new ‘pull’ based model is
> > > > > functional
> > > > > > and
> > > > > >    correct given that it is different from the original RAFT
> > > > > implementation
> > > > > >    which has formal proof of correctness?
> > > > > >
> > > > >
> > > > > We have two planned verifications on the correctness and liveness
> of
> > > the
> > > > > design. One is via model verification (TLA+)
> > > > > https://github.com/guozhangwang/kafka-specification
> > > > >
> > > > > Another is via the concurrent simulation tests
> > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/confluentinc/kafka/commit/5c0c054597d2d9f458cad0cad846b0671efa2d91
> > > > >
> > > > >    - Have we considered any sensible defaults for the
> configuration,
> > > i.e.
> > > > > >    all the election timeout, fetch time out, etc.? Or we want to
> > > leave
> > > > > > this to
> > > > > >    a later stage when we do the performance testing, etc.
> > > > > >
> > > > >
> > > > > This is a good question, the reason we did not set any default
> values
> > > for
> > > > > the timeout configurations is that we think it may take some
> > > benchmarking
> > > > > experiments to get these defaults right. Some high-level principles
> > to
> > > > > consider: 1) the fetch.timeout should be around the same scale with
> > zk
> > > > > session timeout, which is now 18 seconds by default -- in practice
> > > we've
> > > > > seen unstable networks having more than 10 secs of transient
> > > > connectivity,
> > > > > 2) the election.timeout, however, should be smaller than the fetch
> > > > timeout
> > > > > as is also suggested as a practical optimization in literature:
> > > > > https://www.cl.cam.ac.uk/~ms705/pub/papers/2015-osr-raft.pdf
> > > > >
> > > > > Some more discussions can be found here:
> > > > > https://github.com/confluentinc/kafka/pull/301/files#r415420081
> > > > >
> > > > >
> > > > > >    - Have we considered piggybacking `BeginQuorumEpoch` with the
> `
> > > > > >    FetchQuorumRecords`? I might be missing something obvious but
> I
> > am
> > > > > just
> > > > > >    wondering why don’t we just use the `FindQuorum` and
> > > > > > `FetchQuorumRecords`
> > > > > >    APIs and remove the `BeginQuorumEpoch` API?
> > > > > >
> > > > >
> > > > > Note that Begin/EndQuorumEpoch is sent from leader -> other voter
> > > > > followers, while FindQuorum / Fetch are sent from follower to
> leader.
> > > > > Arguably one can eventually realize the new leader and epoch via
> > > > gossiping
> > > > > FindQuorum, but that could in practice require a long delay.
> Having a
> > > > > leader -> other voters request helps the new leader epoch to be
> > > > propagated
> > > > > faster under a pull model.
> > > > >
> > > > >
> > > > > >    - And about the `FetchQuorumRecords` response schema, in the
> > > > `Records`
> > > > > >    field of the response, is it just one record or all the
> records
> > > > > starting
> > > > > >    from the FetchOffset? It seems a lot more efficient if we sent
> > all
> > > > the
> > > > > >    records during the bootstrapping of the brokers.
> > > > > >
> > > > >
> > > > > Yes the fetching is batched: FetchOffset is just the starting
> offset
> > of
> > > > the
> > > > > batch of records.
> > > > >
> > > > >
> > > > > >    - Regarding the disruptive broker issues, does our pull based
> > > model
> > > > > >    suffer from it? If so, have we considered the Pre-Vote stage?
> If
> > > > not,
> > > > > > why?
> > > > > >
> > > > > >
> > > > > The disruptive broker is stated in the original Raft paper which is
> > the
> > > > > result of the push model design. Our analysis showed that with the
> > pull
> > > > > model it is no longer an issue.
> > > > >
> > > > >
> > > > > > Thanks a lot for putting this up, and I hope that my questions
> can
> > be
> > > > of
> > > > > > some value to make this KIP better.
> > > > > >
> > > > > > Hope to hear from you soon!
> > > > > >
> > > > > > Best wishes,
> > > > > > Leonard
> > > > > >
> > > > > >
> > > > > > On Wed, Apr 29, 2020 at 1:46 AM Colin McCabe <cmcc...@apache.org
> >
> > > > wrote:
> > > > > >
> > > > > > > Hi Jason,
> > > > > > >
> > > > > > > It's amazing to see this coming together :)
> > > > > > >
> > > > > > > I haven't had a chance to read in detail, but I read the
> outline
> > > and
> > > > a
> > > > > > few
> > > > > > > things jumped out at me.
> > > > > > >
> > > > > > > First, for every epoch that is 32 bits rather than 64, I sort
> of
> > > > wonder
> > > > > > if
> > > > > > > that's a good long-term choice.  I keep reading about stuff
> like
> > > > this:
> > > > > > > https://issues.apache.org/jira/browse/ZOOKEEPER-1277 .
> > Obviously,
> > > > > that
> > > > > > > JIRA is about zxid, which increments much faster than we expect
> > > these
> > > > > > > leader epochs to, but it would still be good to see some rough
> > > > > > calculations
> > > > > > > about how long 32 bits (or really, 31 bits) will last us in the
> > > cases
> > > > > > where
> > > > > > > we're using it, and what the space savings we're getting really
> > is.
> > > > It
> > > > > > > seems like in most cases the tradeoff may not be worth it?
> > > > > > >
> > > > > > > Another thing I've been thinking about is how we do
> > > bootstrapping.  I
> > > > > > > would prefer to be in a world where formatting a new Kafka node
> > > was a
> > > > > > first
> > > > > > > class operation explicitly initiated by the admin, rather than
> > > > > something
> > > > > > > that happened implicitly when you started up the broker and
> > things
> > > > > > "looked
> > > > > > > blank."
> > > > > > >
> > > > > > > The first problem is that things can "look blank" accidentally
> if
> > > the
> > > > > > > storage system is having a bad day.  Clearly in the non-Raft
> > world,
> > > > > this
> > > > > > > leads to data loss if the broker that is (re)started this way
> was
> > > the
> > > > > > > leader for some partitions.
> > > > > > >
> > > > > > > The second problem is that we have a bit of a chicken and egg
> > > problem
> > > > > > with
> > > > > > > certain configuration keys.  For example, maybe you want to
> > > configure
> > > > > > some
> > > > > > > connection security settings in your cluster, but you don't
> want
> > > them
> > > > > to
> > > > > > > ever be stored in a plaintext config file.  (For example, SCRAM
> > > > > > passwords,
> > > > > > > etc.)  You could use a broker API to set the configuration, but
> > > that
> > > > > > brings
> > > > > > > up the chicken and egg problem.  The broker needs to be
> > configured
> > > to
> > > > > > know
> > > > > > > how to talk to you, but you need to configure it before you can
> > > talk
> > > > to
> > > > > > > it.  Using an external secret manager like Vault is one way to
> > > solve
> > > > > > this,
> > > > > > > but not everyone uses an external secret manager.
> > > > > > >
> > > > > > > quorum.voters seems like a similar configuration key.  In the
> > > current
> > > > > > KIP,
> > > > > > > this is only read if there is no other configuration specifying
> > the
> > > > > > quorum
> > > > > > > voter set.  If we had a kafka.mkfs command, we wouldn't need
> this
> > > key
> > > > > > > because we could assume that there was always quorum
> information
> > > > stored
> > > > > > > locally.
> > > > > > >
> > > > > > > best,
> > > > > > > Colin
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Apr 16, 2020, at 16:44, Jason Gustafson wrote:
> > > > > > > > Hi All,
> > > > > > > >
> > > > > > > > I'd like to start a discussion on KIP-595:
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum
> > > > > > > .
> > > > > > > > This proposal specifies a Raft protocol to ultimately replace
> > > > > Zookeeper
> > > > > > > > as
> > > > > > > > documented in KIP-500. Please take a look and share your
> > > thoughts.
> > > > > > > >
> > > > > > > > A few minor notes to set the stage a little bit:
> > > > > > > >
> > > > > > > > - This KIP does not specify the structure of the messages
> used
> > to
> > > > > > > represent
> > > > > > > > metadata in Kafka, nor does it specify the internal API that
> > will
> > > > be
> > > > > > used
> > > > > > > > by the controller. Expect these to come in later proposals.
> > Here
> > > we
> > > > > are
> > > > > > > > primarily concerned with the replication protocol and basic
> > > > > operational
> > > > > > > > mechanics.
> > > > > > > > - We expect many details to change as we get closer to
> > > integration
> > > > > with
> > > > > > > > the controller. Any changes we make will be made either as
> > > > amendments
> > > > > > to
> > > > > > > > this KIP or, in the case of larger changes, as new proposals.
> > > > > > > > - We have a prototype implementation which I will put online
> > > within
> > > > > the
> > > > > > > > next week which may help in understanding some details. It
> has
> > > > > > diverged a
> > > > > > > > little bit from our proposal, so I am taking a little time to
> > > bring
> > > > > it
> > > > > > in
> > > > > > > > line. I'll post an update to this thread when it is available
> > for
> > > > > > review.
> > > > > > > >
> > > > > > > > Finally, I want to mention that this proposal was drafted by
> > > > myself,
> > > > > > > Boyang
> > > > > > > > Chen, and Guozhang Wang.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Jason
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Leonard Ge
> > > > > > Software Engineer Intern - Confluent
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>

Reply via email to