[ 
https://issues.apache.org/jira/browse/KAFKA-4485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-4485:
----------------------------
    Description: 
As of current implementation, we will exclude follower from ISR if the begin 
offset of FetchRequest from this follower is smaller than logEndOffset of 
leader for more than replicaLagTimeMaxMs. Also, we will add a follower to ISR 
if the beginOffset of FetchRequest from this follower is equal or larger than 
high watermark of this partition.

This is problematic for the following reasons:

1) The criteria for ISR is inconsistent between maybeExpandIsr() and 
maybeShrinkIsr(). Therefore a follower may be repeatedly remove and added to 
the ISR (e.g. in the scenario described below).

2) A follower may be removed from the ISR even if its fetch rate can keep up 
with produce rate. Suppose a produce keeps producing a lot of small requests at 
high request rate but low byte rate (e.g. many mirror makers), and the follower 
is always able to read all the available data at the time leader receives it. 
However, the begin offset of fetch request will always be smaller than 
logEndOffset of leader. Thus the follower will be removed from ISR after 
replicaLagTimeMaxMs.

In the following we describe the solution to this problem.

Terminology:
- Definition of replica lag: we say a replica lags behind leader by X ms if its 
current log end offset if equivalent to the log end offset of leader X ms ago.
- Definition of pseudo-ISR set: pseudo-ISR set of a partition = { replica | 
replica belongs to the given partition AND replica's lag <= replicaLagTimeMaxMs}
- Definition of high-watermark of a partition: high-watermark of a partition is 
the max(current high-watermark of the partition, min(offset of replicas in the 
pseudo-ISR set of this partition))
- Definition of ISR set: ISR set of a partition = {replica | replica is in 
pseudo-ISR set of the given partition AND log end offset of replica >= 
high-watermark of the given partition}

Guarantee:
1) If a follower is close enough to the replica in the sense that its replica 
lag <= replicaLagTimeMaxMs, then this follower will be in the pseudo-ISR set. 
Thus the high-watermark will stop to increase until this follower's log end 
offset >= high-watermark, at which moment this follower will be added to the 
ISR set. This allows us the solve the 2nd problem described above.
2) If a follower lags behind leader for more than X ms, it will be removed out 
of ISR set.
3) High watermark of a partition will never decrease.
4) For any replica in ISR set, its log end offset >= high-watermark. 

Implementation:
1) For each follower, the leader keeps track of the time of the last fetch 
request from this follower. Let's call it lastFetchTime. In addition, the 
leader also maintains the log end offset of the leader at the lastFetchTime for 
each follower. Let's call it lastFetchLeaderLEO. Both variables will be updated 
after leader has processed a FetchRequest from a follower.
2) When leader receives FetchRequest from a follower, it will set the 
follower's lag to lastFetchTime if begin offset of the FetchRequest >= 
lastFetchLeaderLEO.
3) The leader can update pseudo-ISR set, high-watermark and ISR set of the 
partition based on the lag of replicas of this partition, according to the 
definition described above.





  was:
As of current implementation, we will exclude follower from ISR if the begin 
offset of FetchRequest from this follower is always smaller than logEndOffset 
of leader for more than replicaLagTimeMaxMs.

Also, we will add a follower to ISR if the beginOffset of FetchRequest from 
this follower is equal or larger than high watermark of this partition.

This is problematic for the following reasons:

1) The criteria for ISR is inconsistent between maybeExpandIsr() and 
maybeShrinkIsr(). A follower may be repeatedly remove and added to the ISR 
(e.g. in the scenario described below).

2) A follower may be removed from the ISR even if its fetch rate can keep up 
with produce rate. Suppose a produce keeps producing a lot of small requests at 
high request rate but low byte rate (e.g. many mirror makers), and the follower 
is always able to read all the available data at the time leader receives it. 
However, the begin offset of fetch request will always be smaller than 
logEndOffset of leader. Thus the follower will be removed from ISR after 
replicaLagTimeMaxMs.

The solution to the problem is the following:

A follower should be in ISR if begin offset of its FetchRequest >= max(high 
watermark of partition, log end offset of leader at the time the leader 
receives the previous FetchRequest). The follower should be removed from ISR if 
this criteria is not met for more than replicaLagTimeMaxMs. Note that we are 
comparing begin offset of FetchRequest with log end offset of leader at the 
time the leader receives the previous FetchRequest as an approximate way to 
compare the end offset of fetched data with log end offset of leader. This is 
because we can not easily know the end offset of fetched data at the time 
broker receives fetch request.

This solution makes the following guarantee:

1) If a follower is in ISR, then its log end offset >= high watermark of 
partition at least sometime in the last replicaLagTimeMaxMs.

2) If a follower is not in ISR, then the end offset of its FetchRequest can not 
catch up with log end offset of leader for more than replicaLagTimeMaxMs. 
Either follower is in bootstrap phase, or the follower's average fetch rate is 
smaller than average produce rate into the partition for the last 
replicaLagTimeMaxMs.









> Follower should be in the isr if its FetchRequest has fetched up to the 
> logEndOffset of leader
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4485
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4485
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.1.0
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>             Fix For: 0.10.2.0
>
>
> As of current implementation, we will exclude follower from ISR if the begin 
> offset of FetchRequest from this follower is smaller than logEndOffset of 
> leader for more than replicaLagTimeMaxMs. Also, we will add a follower to ISR 
> if the beginOffset of FetchRequest from this follower is equal or larger than 
> high watermark of this partition.
> This is problematic for the following reasons:
> 1) The criteria for ISR is inconsistent between maybeExpandIsr() and 
> maybeShrinkIsr(). Therefore a follower may be repeatedly remove and added to 
> the ISR (e.g. in the scenario described below).
> 2) A follower may be removed from the ISR even if its fetch rate can keep up 
> with produce rate. Suppose a produce keeps producing a lot of small requests 
> at high request rate but low byte rate (e.g. many mirror makers), and the 
> follower is always able to read all the available data at the time leader 
> receives it. However, the begin offset of fetch request will always be 
> smaller than logEndOffset of leader. Thus the follower will be removed from 
> ISR after replicaLagTimeMaxMs.
> In the following we describe the solution to this problem.
> Terminology:
> - Definition of replica lag: we say a replica lags behind leader by X ms if 
> its current log end offset if equivalent to the log end offset of leader X ms 
> ago.
> - Definition of pseudo-ISR set: pseudo-ISR set of a partition = { replica | 
> replica belongs to the given partition AND replica's lag <= 
> replicaLagTimeMaxMs}
> - Definition of high-watermark of a partition: high-watermark of a partition 
> is the max(current high-watermark of the partition, min(offset of replicas in 
> the pseudo-ISR set of this partition))
> - Definition of ISR set: ISR set of a partition = {replica | replica is in 
> pseudo-ISR set of the given partition AND log end offset of replica >= 
> high-watermark of the given partition}
> Guarantee:
> 1) If a follower is close enough to the replica in the sense that its replica 
> lag <= replicaLagTimeMaxMs, then this follower will be in the pseudo-ISR set. 
> Thus the high-watermark will stop to increase until this follower's log end 
> offset >= high-watermark, at which moment this follower will be added to the 
> ISR set. This allows us the solve the 2nd problem described above.
> 2) If a follower lags behind leader for more than X ms, it will be removed 
> out of ISR set.
> 3) High watermark of a partition will never decrease.
> 4) For any replica in ISR set, its log end offset >= high-watermark. 
> Implementation:
> 1) For each follower, the leader keeps track of the time of the last fetch 
> request from this follower. Let's call it lastFetchTime. In addition, the 
> leader also maintains the log end offset of the leader at the lastFetchTime 
> for each follower. Let's call it lastFetchLeaderLEO. Both variables will be 
> updated after leader has processed a FetchRequest from a follower.
> 2) When leader receives FetchRequest from a follower, it will set the 
> follower's lag to lastFetchTime if begin offset of the FetchRequest >= 
> lastFetchLeaderLEO.
> 3) The leader can update pseudo-ISR set, high-watermark and ISR set of the 
> partition based on the lag of replicas of this partition, according to the 
> definition described above.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to