Hi Jose,

A few comments/questions below:

1. There is a comment in the proposal which suggests that we will maintain
multiple snapshots:

> Having multiple snapshots is useful for minimizing re-fetching of the
snapshot when a new snapshot is generated.

However, the document later says that snapshots get deleted as the LBO
advances. Just wanted to clarify the intent. Will we generally only have
one snapshot?

2. The proposal says the following:

> During leader election, followers with incomplete or missing snapshot
will send a vote request and response as if they had an empty log.

Maybe you can help me understand the scenario we're talking about since I'm
not sure I understand the point of this. If the intent is to not allow such
a follower to become leader, why would it ever become a candidate? On the
other hand, if the intent is to still allow it to become leader in some
disaster scenario, then why would it not use its latest log state? For
inbound Vote requests, I think it should definitely still consider its
latest log state when deciding whether to grant a vote.
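
To make the inbound Vote case concrete, here is a minimal sketch of the usual
Raft-style "at least as up to date" check. The names (`LogState`,
`should_grant_vote`) are hypothetical and are not taken from the KIP or from
the Kafka code; the sketch only assumes that a vote compares the last epoch
first and the end offset second.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LogState:
    """Hypothetical summary of a replica's log, used only for illustration."""
    last_epoch: int   # epoch of the last record in the log
    end_offset: int   # log end offset


def should_grant_vote(candidate: LogState, voter: LogState) -> bool:
    """Grant the vote only if the candidate's log is at least as up to date
    as the voter's log: compare the last epoch first, then the end offset."""
    return (candidate.last_epoch, candidate.end_offset) >= (
        voter.last_epoch,
        voter.end_offset,
    )


EMPTY = LogState(last_epoch=0, end_offset=0)

# A voter that is missing a snapshot but still has a log up to epoch 3.
voter_actual = LogState(last_epoch=3, end_offset=120)
# A candidate whose log ends in an older epoch.
stale_candidate = LogState(last_epoch=2, end_offset=500)

# With its real log state, the voter rejects the stale candidate.
granted_with_actual_log = should_grant_vote(stale_candidate, voter_actual)
# If the voter answers as if its log were empty, the same candidate wins the vote.
granted_if_pretending_empty = should_grant_vote(stale_candidate, EMPTY)
```

In this sketch, answering "as if the log were empty" makes the voter grant a
vote it would otherwise refuse, which is the behavior I would like to avoid.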

3. Are we overloading `replica.fetch.max.bytes` for snapshot fetches as
well? It looks like we are specifying this at the partition level, but it
might be more useful to track the maximum bytes at the request level. On a
related note, it might be useful to think through heuristics for balancing
between the partitions in a request. Unlike fetches, it seems like we'd
want to complete snapshot loading partition by partition. I wonder if it
would be simpler for FetchSnapshot to handle just one partition.
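
As a rough illustration of a request-level limit, here is a small sketch that
spends one byte budget on partitions in order, so that earlier partitions
finish loading before later ones receive anything. The function name, the
`max_bytes` parameter, and the partition names are all hypothetical; nothing
here is taken from the FetchSnapshot schema.

```python
def plan_snapshot_fetch(remaining_bytes, max_bytes):
    """Split a single request-level byte budget across partitions.

    remaining_bytes: dict mapping partition name -> snapshot bytes still to
        fetch, in the order the partitions should be completed.
    max_bytes: hypothetical request-level limit on the total response size.

    Returns a dict mapping partition name -> bytes to request. The budget is
    spent on the first unfinished partition before moving to the next one.
    """
    plan = {}
    budget = max_bytes
    for partition, remaining in remaining_bytes.items():
        if budget <= 0:
            break  # request is full; later partitions wait for the next request
        if remaining <= 0:
            continue  # this partition's snapshot is already fully fetched
        take = min(remaining, budget)
        plan[partition] = take
        budget -= take
    return plan


# The first partition is completed, and the leftover budget goes to the second.
small_then_large = plan_snapshot_fetch({"a-0": 300, "b-0": 1000}, 500)
# The first partition alone consumes the whole budget.
large_first = plan_snapshot_fetch({"a-0": 1000, "b-0": 1000}, 500)
```

With a policy like this, most requests end up carrying a single partition
anyway, which is part of why a one-partition FetchSnapshot seems attractive.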

4. It would help if the document motivated the need to track the snapshot
epoch. Since we are only snapshotting below the high watermark, are you
thinking about recovering from data loss scenarios?

5. Might need to fix the following:

> Otherwise, the leader will respond with the offset and epoch of the
latest snapshot (y, c) and with the next fetch offset and epoch (y + 1, d)

We ended up renaming the next fetch offset and epoch. I think we should
just leave it empty in this case. The snapshot offset and epoch seem
sufficient.


Thanks,
Jason

On Fri, Aug 7, 2020 at 11:33 AM Jose Garcia Sancio <jsan...@confluent.io>
wrote:

> Thanks for your feedback Jun.
>
> Here are my changes to the KIP:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158864763&selectedPageVersions=21&selectedPageVersions=20
>
> My comments are below...
>
> On Wed, Aug 5, 2020 at 1:59 PM Jun Rao <j...@confluent.io> wrote:
> >
> > Hi, Jose,
> >
> > Thanks for the KIP. A few comments below.
> >
> > 10. I agree with Jason that it's useful to document the motivation a bit
> > more clearly. Regarding semantics/performance, one benefit of snapshotting
> > is that it allows changes to be encoded incrementally instead of using the
> > full post image. For example, in KIP-631, each partition has multiple
> > fields like assigned replicas, leader, epoch, isr, etc. If only isr is
> > changed, the snapshotting approach allows the change to be represented
> > with just the new value in isr. Compaction will require all existing
> > fields to be included in order to represent just an isr change. This is
> > just because we can customize the combining logic with snapshotting.
> >
>
> Yes. Right now the IsrChange record from KIP-631 has both the ISR and
> the leader and epoch. I think we can split this record into two
> records:
> 1. ISR change that includes the topic, partition, and isr.
> 2. Leader change that includes the topic, partition, leader and leader
> epoch.
> I'll bring this up in the discussion thread for that KIP.
>
> > As for the
> > performance benefit, I guess in theory snapshotting allows the snapshot
> > to be updated in-place incrementally without having to read the full
> > state in the snapshot. BTW, during compaction, we only read the cleaned
> > data once instead of 3 times.
> >
>
> Doesn't compaction need to read the cleaned records to check whether
> each key is in the map of keys to offsets? I made the following changes
> to the KIP:
>
> 2. With log compaction the broker needs to
>   a. read 1MB/s from the head of the log to update the in-memory state
>   b. read 1MB/s to update the map of keys to offsets
>   c. read 3MB/s (100MB from the already compacted log, 50MB from the
> new key-value records) from the older segments. The log will
> accumulate 50MB of changes in 50 seconds before compacting
> because the default configuration has a minimum clean ratio of 50%.
>
> The "100MB in the already compacted log" are these cleaned records.
> Let me know what you think and if I am missing something.
>
> > 11. The KIP mentions topic id. Currently there is no topic id. Does this
> > KIP depend on KIP-516?
> >
>
> For the purpose of measuring the impact, I was using the records
> proposed by KIP-631. This KIP doesn't depend on KIP-516 or KIP-631 for
> its design and implementation. I was just referencing that KIP in the
> motivation and analysis. The KIP only assumes the changes in KIP-595,
> which has been approved but is not part of trunk yet.
>
> In the overview section the KIP mentions: "This KIP assumes that
> KIP-595 has been approved and implemented."
>
> > 12. Is there a need to keep more than 1 snapshot? It seems we always
> > expose the latest snapshot to clients.
> >
>
> The KIP proposes keeping more than one snapshot to not invalidate any
> pending/concurrent `FetchSnapshot` requests that are attempting to fetch
> a snapshot that can be deleted. I'll remove this wording as the first
> version of this implementation probably won't have this feature
> as it requires extra coordination. The implementation will still allow
> for multiple snapshots because generating a snapshot is not atomic
> with respect to increasing the LBO.
>
>
> > 13. "During leader election, followers with incomplete or missing
> > snapshot will send a vote request and response as if they had an empty
> > log." Hmm, a follower may not have a snapshot created, but that doesn't
> > imply its log is empty.
> >
>
> Yes. I fixed the "Validation of Snapshot and Log" section and that
> sentence. I basically added an additional condition where a snapshot is
> not required if the LBO is 0.
>
> > 14. "LBO is max.replication.lag.ms old." Not sure that I follow. How do
> > we compare an offset to a time?
> >
>
> Yeah. This may be hard to implement. I am trying to avoid invalidating
> followers and observers by aggressively deleting an offset/record
> which they are trying to fetch. It is possible that
> `controller.snapshot.minimum.records` is good enough to throttle
> increasing LBO.
>
> > 15. "Followers and observers will increase their log begin offset to the
> > value sent on the fetch response as long as the local state machine has
> > generated a snapshot that includes to the log begin offset minus one."
> > Does the observer store the log? I thought it only needed to maintain a
> > snapshot. If so, the observer doesn't need to maintain LBO.
> >
>
> In KIP-595 observers are similar to voters/followers in that they
> Fetch the log and the snapshot. Two of the distinctions are that they
> don't participate in the leader election and they are not included
> when computing the high-watermark. Regarding storing: it is possible
> that observers never need to store the log since they don't vote or
> become leaders. I think in the future we would like to implement
> "KIP-642: Dynamic quorum reassignment" which would add the capability
> to upgrade an observer to a voter. In that case we do want to store
> the log.
>
>
> > 16. "There are two cases when the Kafka Controller needs to load the
> > snapshot: When it is booting. When the follower and observer replicas
> > finishes fetching a new snapshot from the leader." For faster failover,
> > it seems it's useful for a non-controller voter to maintain the in-memory
> > metadata state. In order to do that, it seems that every voter needs to
> > load the snapshot on booting?
> >
>
> Yeah. I think there is some confusion on terminology. This document
> refers to Kafka Controller as the component in every voter that reads
> the snapshot and log. The Active Controller is the leader from that
> set of voters (Kafka Controller). I added a terminology section in the
> Overview section.
>
> > 17. "There is an invariant that every log must have at least one snapshot
> > where the offset of the snapshot is between LogBeginOffset - 1 and
> > High-Watermark. If this is not the case then the replica should assume
> > that the log is empty." Does that invariant hold when there is no
> > snapshot initially?
> >
>
> You are correct that this invariant doesn't hold for a replica that
> has an LBO of 0. I fixed this section to make this additional case
> clear.
>
> > 18. "The __cluster_metadata topic will have a cleanup.policy value of
> > snapshot" Is there a need to make this configurable if it's read-only?
> >
>
> Hmm. Every topic has a default cleanup.policy of delete. Looking at
> the code, we also require that at least one of them is set. We need a
> way to disable the existing delete and compaction cleanup policies. In
> LogConfig.scala we have:
>
> ```
> ...
> .define(CleanupPolicyProp, LIST, Defaults.CleanupPolicy,
> ValidList.in(LogConfig.Compact, LogConfig.Delete), MEDIUM, CompactDoc,
> KafkaConfig.LogCleanupPolicyProp)
> ...
> ```
>
> > 19. OFFSET_OUT_OF_RANGE in FetchSnapshotResponse: It seems that
> > POSITION_OUT_OF_RANGE is more appropriate?
> >
>
> Are you suggesting we introduce a new error type? For
> OFFSET_OUT_OF_RANGE we have
>
> OFFSET_OUT_OF_RANGE(1, "The requested offset is not within the range
> of offsets maintained by the server.",
> OffsetOutOfRangeException::new),
>
> Are you arguing that this is not entirely correct and instead the
> request offset is encoded/included in a snapshot?
>
> --
> -Jose
>
