Hi all, I wanted to check the expected behaviour of this KIP in the following case. Assume you have a partition of a topic with minISR=2 and RF=3. The leader is receiving so much traffic on that partition that the followers can't keep up. One follower is booted out of the ISR as normal, but currently the second follower flaps in and out of the ISR. This is because ISR expand/shrink perform different checks:
- Shrink ISR, attempted on a schedule, triggers if the follower's LEO has not recently caught up with the leader's LEO (see `isCaughtUp(...)` in `Replica.scala`)
- Expand ISR, attempted when processing a follower fetch, triggers if the follower's LEO has caught up to the leader's HWM in that fetch (via `isFollowerInSync(...)` in `Partition.scala`)

In the scenario above both checks will always trigger: shrink, because the follower's LEO is far behind the leader's LEO (the follower simply has much less data), but also expand, because the leader's HWM is equal to the follower's LEO. (I've put a simplified sketch of the two checks, and of the alternative I mention below, at the end of this mail, below the quoted text.) Expand triggering here is new behaviour with KIP-966: the `isFollowerInSync(...)` code itself is unchanged, but it behaves differently because of the change in the HWM advancement logic.

This causes the flapping, which means load on the control plane: for each such partition we trigger two requests in every iteration of the "isr-expiration" schedule.

We noticed this with a producer using acks=1 on a minISR=2 topic (which, FWIW, we consider a misconfiguration). Because of acks=1 we are able to pile up a large amount of data on the leader. The HWM advances, but only slowly (at a pace dictated by the follower's speed), and arguably it should not be advancing at all because the follower is lagging significantly. With acks=all I believe the behaviour would be different: we would backpressure the producer (either via the replication speed or via the partition becoming read-only) and would not be able to accumulate so much unreplicated data on the leader.

An alternative would be to change the implementation of "expand" to also use the leader's LEO. This would be more in line with the replica.lag.max.time.ms documentation. I can imagine we don't want to over-index on the behaviour of acks=1/minISR=2 examples, but on the other hand I'm not immediately seeing why shrink and expand should perform different checks in general.

Thanks,
Martin

On 2023/08/10 22:46:52 Calvin Liu wrote:
> Hi everyone,
> I'd like to discuss a series of enhancement to the replication protocol.
>
> A partition replica can experience local data loss in unclean shutdown
> scenarios where unflushed data in the OS page cache is lost - such as an
> availability zone power outage or a server error. The Kafka replication
> protocol is designed to handle these situations by removing such replicas
> from the ISR and only re-adding them once they have caught up and therefore
> recovered any lost data. This prevents replicas that lost an arbitrary log
> suffix, which included committed data, from being elected leader.
> However, there is a "last replica standing" state which when combined with
> a data loss unclean shutdown event can turn a local data loss scenario into
> a global data loss scenario, i.e., committed data can be removed from all
> replicas. When the last replica in the ISR experiences an unclean shutdown
> and loses committed data, it will be reelected leader after starting up
> again, causing rejoining followers to truncate their logs and thereby
> removing the last copies of the committed records which the leader lost
> initially.
>
> The new KIP will maximize the protection and provides MinISR-1 tolerance to
> data loss unclean shutdown events.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-966%3A+Eligible+Leader+Replicas
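---

As promised above, here is a minimal, self-contained sketch of the two checks as I read them, plus the alternative I'm suggesting. This is deliberately simplified model code, not the actual implementation in `Replica.scala` / `Partition.scala`; the `ReplicaState` type, the function signatures, the offsets and the lag values are all made up for illustration.

```scala
// Simplified model of the shrink/expand checks -- illustrative only, not Kafka code.
object IsrCheckSketch {

  // Hypothetical, stripped-down view of a follower replica's state.
  final case class ReplicaState(logEndOffset: Long, lastCaughtUpTimeMs: Long)

  // Shrink side (cf. isCaughtUp in Replica.scala): the follower stays in the ISR
  // only if it has recently reached the *leader's LEO*.
  def isCaughtUp(follower: ReplicaState,
                 leaderLogEndOffset: Long,
                 currentTimeMs: Long,
                 maxLagMs: Long): Boolean =
    follower.logEndOffset >= leaderLogEndOffset ||
      currentTimeMs - follower.lastCaughtUpTimeMs <= maxLagMs

  // Expand side (cf. isFollowerInSync in Partition.scala): the follower is re-added
  // as soon as its LEO reaches the *leader's HWM*.
  def isFollowerInSync(follower: ReplicaState, leaderHighWatermark: Long): Boolean =
    follower.logEndOffset >= leaderHighWatermark

  // The alternative I mention above: make expand mirror shrink by comparing
  // against the leader's LEO (within the replica.lag.max.time.ms window).
  def isFollowerInSyncAlternative(follower: ReplicaState,
                                  leaderLogEndOffset: Long,
                                  currentTimeMs: Long,
                                  maxLagMs: Long): Boolean =
    isCaughtUp(follower, leaderLogEndOffset, currentTimeMs, maxLagMs)

  def main(args: Array[String]): Unit = {
    // The acks=1 scenario: lots of unreplicated data on the leader, while the
    // HWM only advances as fast as the slow follower fetches.
    val follower  = ReplicaState(logEndOffset = 1000L, lastCaughtUpTimeMs = 0L)
    val leaderLeo = 50000L  // far ahead of the follower
    val leaderHwm = 1000L   // trails the slow follower's LEO
    val now       = 60000L
    val maxLagMs  = 30000L  // stand-in for replica.lag.max.time.ms

    println(isCaughtUp(follower, leaderLeo, now, maxLagMs))                  // false -> shrink fires
    println(isFollowerInSync(follower, leaderHwm))                           // true  -> expand fires
    println(isFollowerInSyncAlternative(follower, leaderLeo, now, maxLagMs)) // false -> follower stays out
  }
}
```

With these numbers the current pair of checks fires shrink and expand for the same follower in the same iteration, which is exactly the flapping we observe; the alternative check would keep the lagging follower out of the ISR until it genuinely catches up to the leader's LEO.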

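For completeness, this is roughly how the producer in question is configured. Again just a sketch: the broker address and topic name are placeholders, and the acks=all line is there only to show the contrast I describe above.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object AcksOneProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    // acks=1: the leader acks as soon as it has appended locally, so a fast
    // producer can pile up a large unreplicated log suffix on the leader.
    props.put(ProducerConfig.ACKS_CONFIG, "1")
    // acks=all (with min.insync.replicas=2): the produce request only succeeds
    // once the ISR has the data, so replication speed backpressures the producer.
    // props.put(ProducerConfig.ACKS_CONFIG, "all")

    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord("some-topic", "key", "value")) // hypothetical topic
    producer.close()
  }
}
```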