Hi, Jose,

Thanks for the reply. A few more comments below.

20. Good point on the metadata cache. I think we need to make this decision
consistently. For example, if we decide that dedicated voter nodes don't
serve metadata requests, then we don't need to expose the voters' host/port
to the client. Which KIP should make this decision?

31. controller.snapshot.minimum.records: For a compacted topic, we use a
ratio instead of the number of records to determine when to compact. This
has some advantages. For example, if we use
controller.snapshot.minimum.records and set it to 1000, then it will
trigger the generation of a new snapshot after 1000 new records regardless
of whether the existing snapshot is 10MB or 1GB. Intuitively, the larger
the snapshot, the more expensive it is to write to disk. So, we want to
wait for more data to be accumulated before generating the next snapshot.
The ratio-based setting achieves this. For instance, a 50% ratio requires
10MB/1GB more data to be accumulated to regenerate a 10MB/1GB snapshot
respectively.
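
To make the comparison concrete, here is a rough sketch of the two triggers.
It assumes the ratio is defined the way the log cleaner's dirty ratio is,
i.e. new bytes / (snapshot bytes + new bytes), which is what makes 50% mean
"as much new data as the existing snapshot". The class and method names are
made up for illustration and are not part of the KIP.

```java
/**
 * Hypothetical helper contrasting a record-count trigger with a
 * ratio-based trigger for generating the next snapshot.
 */
public final class SnapshotTrigger {
    private SnapshotTrigger() {}

    /** Fires after a fixed number of new records, whatever the snapshot size. */
    public static boolean byRecordCount(long newRecords, long minimumRecords) {
        return newRecords >= minimumRecords;
    }

    /**
     * Fires once the new data makes up at least `ratio` of the total
     * (existing snapshot + new data). Assumed definition, modeled on the
     * dirty ratio used for compacted topics.
     */
    public static boolean byRatio(long snapshotBytes, long newBytes, double ratio) {
        long totalBytes = snapshotBytes + newBytes;
        if (totalBytes == 0) {
            return false; // nothing to snapshot yet
        }
        return (double) newBytes / totalBytes >= ratio;
    }
}
```

With a 0.5 ratio, a 10MB snapshot is regenerated after 10MB of new data,
while a 1GB snapshot waits for 1GB; the record-count trigger fires at the
same point in both cases.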

32. max.replication.lag.ms: It seems this is specific to the metadata
topic. Could we make that clear in the name?

Thanks,

Jun

On Fri, Sep 25, 2020 at 12:43 PM Jose Garcia Sancio <jsan...@confluent.io>
wrote:

> Thanks for the detailed feedback Jun.
>
> The changes are here:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763&selectedPageVersions=25&selectedPageVersions=24
>
> Here is a summary of the change to the KIP:
> 1. Use end offset for snapshot and snapshot id.
> 2. Include default for all of the new configuration options.
> 3. Provide more detail in the response handling for FetchSnapshot
>
> > 20. "Metadata Cache: The component that generates snapshots, reads
> > snapshots and reads logs for observer replicas of the topic partition
> > __cluster_metadata." It seems this is needed on every broker, not just
> > observers?
>
> Yes. I think we need some clarification and consensus here. Some
> people are advocating for Kafka brokers to only be observers that
> contain just a Metadata Cache, with the Kafka Controllers being
> separate nodes that are voters (follower, candidate or leader) and not
> observers. Others are advocating for Kafka Brokers to be able to host
> both the Kafka Controller and the Metadata Cache. In this case, if the
> Controller and Metadata Cache are sharing the same underlying topic
> partition, then we need to make sure that we unify the snapshotting
> logic.
>
> I would like to be able to unify the in-memory state for both the
> Kafka Controller and the Metadata Cache so that we can share the same
> replicated log and snapshot.
>
> > 21. Our current convention is to use exclusive offset for naming
> > checkpoint files. For example, a producer snapshot file of 1234.snapshot
> > means that the file includes the producer state up to, but not including
> > offset 1234. So, we probably want to follow the same convention for the
> > new checkpoint file.
>
> Thanks for pointing this out. This sounds good to me. This was a
> detail that I was struggling with when reading the replication code.
> Updated the KIP. Wherever the offset is exclusive, I renamed it to
> "end offset" (EndOffset).
>
> > 22. Snapshot Format: KIP-631 only defines the format for individual
> > records. It seems that we need to define the container format here. For
> > example, we need to store the length of each record. Also, does the
> > snapshot file need a CRC field?
>
> Yes. I have added more information on this. In summary, we are going
> to use Kafka's log format version 2. This will give us support for
> compression and CRC at the record batch level. The Kafka Controller
> and Metadata Cache can control how big they want the batches to be.
>
> > 23. Could we provide the default value for the new
> > configs controller.snapshot.minimum.records and max.replication.lag.ms?
> > Also, max.replication.lag.ms seems to just control the snapshot frequency
> > by time and not directly relate to replication. So, maybe it should be
> > called something like controller.snapshot.minimum.interval.ms?
>
> "max.replication.lag.ms" is very similar to "replica.lag.time.max.ms".
> Kafka uses "replica.lag.time.max.ms" to make progress on the
> high-watermark when replicas are slow or offline. We want to use
> "max.replication.lag.ms" to make progress on the LBO when replicas are
> slow or offline. These very similar names are confusing. How about
> "replica.lbo.lag.time.max.ms"?
>
> How often snapshotting will happen is determined by
> "controller.snapshot.minimum.records".
>
> > 24. "Kafka allows the clients to delete records that are less than a given
> > offset by using the DeleteRecords RPC. Those requests will be validated
> > using the same logic enumerated above." Hmm, should we allow DeleteRecords
> > on the metadata topic? If we do, does it trim the snapshot accordingly?
>
> Yeah. After thinking about it some more, I don't think we should
> allow DeleteRecords to succeed on the __cluster_metadata partition.
> For the error that we return, it looks like our options are the
> existing "POLICY_VIOLATION" (the description for this error is
> "Request parameters do not satisfy the configured policy.") or
> introducing a new error. I think we should just return
> POLICY_VIOLATION. What do you think?
>
> > 25. "The followers of the __cluster_metadata topic partition will
> > concurrently fetch the snapshot and replicated log. This means that
> > candidates with incomplete snapshots will send a vote request with a
> > LastOffsetEpoch of -1 and a LastOffset of -1 no matter the LEO of the
> > replicated log." My understanding is that a follower will either fetch from
> > the snapshot or the log, but not both at the same time. Could you explain
> > how the concurrent part works? Also, what's an incomplete snapshot?
>
> Yes. I rewrote this section based on your comment and Jason's
> comments. Let me know if this addresses your concerns.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-630:+Kafka+Raft+Snapshot#KIP630:KafkaRaftSnapshot-ChangestoLeaderElection
>
> >
> > 26. FetchRequest:
> > 26.1 Handling Fetch Request: I agree with Jason that SnapshotOffsetAndEpoch
> > already tells us the next offset to fetch. So, we don't need to
> > set NextOffsetAndEpoch in the response.
>
> Agreed. The response will set one or the other. If SnapshotId (field
> renamed in the latest version of the KIP) is set then the follower
> will fetch the snapshot and follow the logic set in this KIP. If
> DivergingEpoch (field renamed in the latest version of KIP-595) is set
> then the follower will follow the truncation logic set in KIP-595.
>
> > 26.2 Is there a reason to rename LogStartOffset to LogBeginOffset? I am not
> > sure if they are truly identical semantically. For example, currently, the
> > follower moves its logStartOffset based on the leader's. Will we do the
> > same thing with LogBeginOffset?
>
> They are identical. I renamed it so that we can use the acronym LBO
> and to match the concept of log end offset (LEO). We can't use the
> acronym LSO because we use that for last stable offset. What do you
> think?
>
> In this KIP the follower will also use the leader's LogBeginOffset
> (currently known as log start offset) to advance its LBO. The
> difference is that followers can only advance it if they also have a
> snapshot with an end offset greater than or equal to the new LBO. This
> is documented in the "When to Increase the Log Begin Offset" section:
>
> Followers and observers will increase their log begin offset to the
> value sent on the fetch response as long as the local Kafka Controller
> and Metadata Cache has generated a snapshot with an end offset greater
> than or equal to the new log begin offset.
>
> >
> > 27. FetchSnapshotRequest: It seems that SnapshotOffsetAndEpoch shouldn't be
> > optional. Also, its version number 12 is incorrect.
>
> Fixed.
>
> > 28. FetchSnapshotResponse: Do we need the position field? It seems it's the
> > same as in the request.
>
> I decided to return the position field so that the follower doesn't
> need to remember the request sent.
>
> > 29. "OFFSET_OUT_OF_RANGE - when the fetch snapshot request’s offset is
> > greater than the size of the snapshot." By offset, do you mean position?
>
> Yes. Fixed.
>
> > 30. It's possible for a broker to die while copying the snapshot file from
> > the leader or saving its locally generated snapshot. On restart, how does
> > the broker know whether a local snapshot file is complete or not?
>
> When fetching a snapshot, the current implementation writes the content
> to a "temporary" file in the "checkpoints" directory (FYI, I think
> this checkpoints directory was added after your review) of the topic
> partition. The name of this file is
> "<SnapshotId.EndOffset>-<SnapshotId.Epoch>.checkpoint.part". After the
> follower has finished downloading the entire snapshot it does an
> atomic move/rename to
> "<SnapshotId.EndOffset>-<SnapshotId.Epoch>.checkpoint".
>
> We can safely ignore the file named *.part when determining the state
> of the replica with respect to leader election. When a replica starts
> up it can resume fetching the snapshot based on the state of the local
> *.part file.
>
> I updated the section "Response Handler" for FetchSnapshot with this
> information.
>
> --
> -Jose
>

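For reference on 30, here is a minimal sketch of the *.part handling
described in the quoted reply: append fetched bytes to the temporary file,
resume from its current size after a restart, and atomically rename it once
the download is complete. The class and method names are hypothetical and
this is not the actual implementation; only the file naming scheme comes
from the reply above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper for "<EndOffset>-<Epoch>.checkpoint[.part]" files. */
public final class SnapshotFiles {
    static final String SUFFIX = ".checkpoint";
    static final String PART = ".part";

    private SnapshotFiles() {}

    /** Path of the in-progress download for the given snapshot id. */
    public static Path partialPath(Path checkpointsDir, long endOffset, int epoch) {
        return checkpointsDir.resolve(endOffset + "-" + epoch + SUFFIX + PART);
    }

    /** Position to request next: the number of bytes already on disk. */
    public static long resumePosition(Path partial) {
        try {
            return Files.exists(partial) ? Files.size(partial) : 0L;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Appends one fetched chunk to the temporary file, creating it if needed. */
    public static void appendChunk(Path partial, byte[] chunk) {
        try {
            Files.write(partial, chunk, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Atomically renames "X.checkpoint.part" to "X.checkpoint". */
    public static Path finish(Path partial) {
        String name = partial.getFileName().toString();
        if (!name.endsWith(SUFFIX + PART)) {
            throw new IllegalArgumentException("Not a partial snapshot: " + name);
        }
        Path complete = partial.resolveSibling(name.substring(0, name.length() - PART.length()));
        try {
            return Files.move(partial, complete, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Only fully downloaded snapshots; *.part files are ignored. */
    public static List<Path> completeSnapshots(Path checkpointsDir) {
        try (Stream<Path> files = Files.list(checkpointsDir)) {
            return files
                .filter(p -> p.getFileName().toString().endsWith(SUFFIX))
                .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the rename is atomic, a crash leaves either a *.part file (ignored
for leader election, usable for resuming) or a complete *.checkpoint file,
never a half-written file under the final name.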