Thanks, Jose.  It's looking good.  Here is one minor correction:

<<< If the Kafka topic partition leader receives a fetch request with an
offset and epoch greater than or equal to the LBO (x + 1, a)
>>> If the Kafka topic partition leader receives a fetch request with an
offset and epoch greater than or equal to the LBO (x + 1, b)

Here is one more question.  Is there an ability to evolve the snapshot
format over time, and if so, how is that managed for upgrades? It would be
both Controllers and Brokers that would depend on the format, correct?
Those could be the same thing if the controller was running inside the
broker JVM, but that is an option rather than a requirement, I think.
Might the Controller upgrade have to be coordinated with the broker upgrade
in the separate-JVM case?  Perhaps a section discussing this would be
appropriate?

Ron


On Tue, Jul 28, 2020 at 11:14 PM Jose Garcia Sancio <jsan...@confluent.io>
wrote:

> Thanks Ron. Your comments and suggestions were helpful. You can see my
> changes to the KIP here:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763&selectedPageVersions=15&selectedPageVersions=14
>
> My comments are below...
>
> On Mon, Jul 27, 2020 at 11:29 AM Ron Dagostino <rndg...@gmail.com> wrote:
> >
> > Hi Jose.  Thanks for the KIP.  Here are some questions and some nit
> corrections.
> >
> > <<< In KIP-500 the Kafka Controller, which is the quorum leader from
> > KIP-595, will materialize the entries in the metadata log into memory.
> > Technically I think the quorum leader is referred to as the Active
> > Controller in KIP-500.  Maybe replace "Kafka Controller" with "Active
> > Controller"?  I think the term "Kafka Controller" is fine as used
> > throughout the rest of the KIP to refer to the entire thing, but when
> > referring specifically to the leader I think "Active Controller" is
> > the term that is defined in KIP-500.
> >
>
> Made those changes.
>
> >
> > <<< Each broker in KIP-500, which will be a replica of the metadata
> > log, will materialize the entries in the log into a metadata cache
> > This wording confused me because I assumed that "replica" was a formal
> > term and only (non-Active) Controllers are formally "replicas" of the
> > metadata log -- Kafka brokers would be clients that read the log and
> > then use the data for their own purpose as opposed to formally being
> > replicas with this understanding of the term "replica".  Is that
> > correct, and if so, maybe replace "replica" with "client"?
> >
>
> In KIP-595 we have two types of replicas: voters and observers. Voter
> replicas are Kafka Controllers and one one of them will become the
> Active controller. Observer replicas fetch from the log and attempt to
> keep up with the LEO of the Active Controller. I think you can
> consider all of them as "client" of the replicated log.
>
> >
> > <<< The type of in-memory state machines what we plan to implement
> > >>> The type of in-memory state machines that we plan to implement
> > nit
> >
>
> Done.
>
> >
> > <<< doesn't map very well to an key and offset based clean up policy.
> > >>> doesn't map very well to a key and offset based clean up policy.
> > nit
> >
>
> Done.
>
> >
> > <<< When starting a broker either because it is a new broker, a broker
> > was upgraded or a failed broker is restarting. Loading the state
> > represented by the __cluster_metadata topic partition is required
> > before the broker is available
> > >>> When starting a broker either because it is a new broker or it is
> restarting, loading the state represented by the __cluster_metadata topic
> partition is required before the broker is available.
> > Reword for simplicity and clarity?
> >
>
> Done.
>
> >
> > <<< With snapshot based of the in-memory state Kafka can be much more
> aggressive
> > >>> By taking and transmitting a snapshot of the in-memory state as
> described below Kafka can be much more aggressive
> > Tough to refer to the concept of snapshot here without having
> > described what it is, so refer to "as described below" to help orient
> > the reader?
> >
>
> Made some changes to these sentences. I agree that fully understanding
> parts of the motivated section requires reading the rest of the
> document. I wanted to make sure that we had this in the motivation
> section.
>
> >
> > <<< In the future this mechanism will also be useful for
> > high-throughput topic partitions like the Group Coordinator and
> > Transaction Coordinator.
> > >>> In the future this mechanism may also be useful for high-throughput
> topic partitions like the Group Coordinator and Transaction Coordinator.
> > Tough to say "will" when that is an assumption that would depend on a
> KIP?
> >
>
> Yeah. Changed it.
>
> >
> > <<<If events/deltas are not allowed in the replicated log for the
> > __cluster_metadata topic partition then the ... Kafka Controller will
> > need to replicate 3.81 MB to each broker in the cluster (10) or 38.14
> > MB.
> > It might be good to append a sentence that explicitly states how much
> > data is replicated for the delta/event -- right now it is implied to
> > be very small, but that's kind of like leaving the punch line to a
> > joke implied :-)
> >
>
> Thanks. I updated the example and added numbers for the events/deltas case.
>
>
> >
> > <<< Follower and observer replicas fetch the snapshots from the leader
> > they attempt to fetch an offset from the leader and the leader doesn’t
> > have that offset in the log
> > >>> Follower and observer replicas fetch a snapshot from the leader when
> they attempt to fetch an offset from the leader and the leader doesn’t have
> that offset in the log
> > nit
> >
>
> Done.
>
> >
> > >>> Generating and loading the snapshot will be delegated to the Kafka
> Controller.
> > >>> The Kafka Controller will notify the Kafka Raft client when it has
> generated a snapshot and up to which offset is included in the snapshot.
> > >>> The Kafka Raft client will notify the Kafka Controller when a new
> snapshot has been fetched from the leader.
> > This paragraph confuses me.  What is the "Kafka Raft client" -- is
> > this the broker? Or is it some other subsystem (or all other
> > subsystems aside from log replication) within the Controller?  Has
> > this been defined somewhere?  If so it would be good to refer to that
> > definition.  (Actually, now that I've read further down, I think you
> > refer to this as "Kafka Raft" later in the KIP; a reference to these
> > later sections or naming it Kafka Raft Client later on would have
> > helped me avoid confusion -- I searched the doc for raft client rather
> > than kafka raft, so I missed this when I searched.)
> >
>
> I cleaned up this paragraph a bit. Kafka Raft client refers to the
> implementation of KIP-595. I changed this phrasing and added
> references. Let me know what you think?
>
> >
> > <<< The Kafka Controller will notify the Kafka Raft client when it has
> > finished generating a new snapshots.
> > Same comment about "Kafka Raft client".
> >
>
> Yes. I updated this wording. Let me know what you think.
>
> >
> > <<< It is safe for the log to truncate a prefix of the log up to the
> > latest snapshot.
> > "log to truncate a prefix of the log" -- That first mention of "log"
> > needs to be something else I assume -- LogManager maybe?
> >
>
> I fixed the sentence. I don't think it is necessary to mention the
> component that is doing the truncation in this section of the
> document. We can discuss this in the implementation/PR instead.
>
> >
> > <<< In the example above, if the Kafka topic partition leader receives
> > a fetch request with an offset and epoch greater than or equal to the
> > log begin offset (x, a)
> > <<< In the example above, offset=x, epoch=a does not appear in the
> > diagram because it is before the log begin offset (x+1, b)   If the
> > Kafka topic partition leader receives a fetch request with an offset
> > and epoch greater than or equal to the LBO
> > Maybe add an explicit comment that offset=x, epoch=a does not appear
> > in the diagram because it is before the LBO of (x+1, b)?  Also need to
> > fix LBO reference (currently incorrectly stated as (x, a).
> >
>
> Thanks for the suggestion. I incorporated these changes in the KIP.
>
> >
> > <<< LBO can be increase/set to an offset X if the following is true:
> > <<< 2. One of the following is true:
> > Do the pair of conditions in (2) only apply to the leader/Active
> Controller?
> >
>
> Yes. For the followers we have "Followers and observers will increase
> their log begin offset to the value sent on the fetch response as long
> as the local state machine has generated a snapshot that includes to
> the log begin offset minus one."
>
> I re-organized the section to first talk about the leader in full and
> later talk about the followers/observers in full.
>
> >
> > <<< The broker will delete any snapshot with a latest offset and epoch
> > (SnapshotOffsetAndEpoch) less than the LBO - 1 (log begin offset).
> > Is it also a condition that there must be a snapshot in existence
> > "greater than" (in terms of offset, epoch) the one to be deleted?
> >
>
> Yes. I think that follows from bullet 1. in section "When to Increase
> the Log Begin Offset".
>
> >
> > <<< There are two cases when the Kafka Controller needs to load the
> snapshot:
> > <<< 1. When the broker is booting.
> > >>> 1. When the Controller is booting.
> > (Or "When it is booting")
> >
>
> Fixed.
>
> >
> > <<< 2. When the follower and observer replicas finishes fetches a new
> > snapshot from the leader
> > <<< 2. When a follower or observer replica finishes fetching a new
> > snapshot from the leader
> > grammar
> >
>
> Fixed.
>
> >
> > <<< The replica’s handling of fetch request will be extended such that
> > if FetchOffset is less than LogBeginOffset then the leader will
> > respond with
> > >>> The leader's handling of fetch request will be extended such that if
> FetchOffset is less than LogBeginOffset then it will respond with
> > clarifying corrections
> >
>
> Fixed.
>
> >
> > <<< The replica's handling of fetch response will be extended such
> > that if SnapshotOffsetAndEpoch is set
> > <<< then the follower will truncate its entire log and start fetching
> > from both the log and the snapshot.
> > Does it logically truncate or physically truncate, and does it make a
> > difference which one it is?  Logical deletion is "softer" than a hard,
> > physical delete and may be less risky if there were to be a
> > catastrophe (i.e. we never want to lose any data, but if we ever get
> > into the situation then might it be best to lose less data?)
> >
>
> It will be a "hard" truncate. We need to make sure that we don't go to
> the old state if the broker/replica happens to restart. I changed the
> wording in this paragraph.
>
> >
> > <<< If Continue is true, the follower is expected to send another
> > FetchSnapshotRequest with a Position set to the response's Position
> > plus the response's len(Bytes).
> > Is there a cleanup mechanism necessary for the case where a partial
> > snapshot is downloaded but the download never completes -- perhaps if
> > the leader dies and doesn't come back for a long time?  What would a
> > follower do in that case, and how does cleanup work?
> >
>
> I don't mention this in the KIP but in the implementation we will most
> likely write to a file named "<offset>-<epoch>.checkpoint.part. Once
> the download is complete then the file is renamed to
> <offset>-<epoch>.checkpoint.
>
> >
> > <<<Change leader when generating snapshots: Instead of supporting
> > concurrent snapshot generation and writing to the in-memory state, we
> > could require that only non-leaders generate snapshots.
> > Did the KIP ever explicitly state that leadership would or would not
> > change?  This is mentioned in KIP-631; maybe this belongs in that KIP
> > rather than here?
> >
>
> We discussed it as part of this KIP also. I am adding it here for
> completeness.
>
> --
> -Jose
>

Reply via email to