[ https://issues.apache.org/jira/browse/KAFKA-3471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuto Kawamura updated KAFKA-3471:
---------------------------------
    Description: 
tl;dr:
Partition.checkEnoughReplicasReachOffset should count the number of followers 
that have already caught up to requiredOffset, instead of comparing against the 
high watermark, when deciding whether enough replicas have acknowledged a 
produce request.

# Description
Just recently I found an interesting metric on our Kafka cluster.
During peak time, the number of produce requests decreased significantly on a 
single broker only. Let's say this broker's id=1.
- broker-1 holds leadership for 3 partitions of topic T.
- Every producer is configured with acks=all.
- broker-1 hosts several topics, each configured with 3 replicas and 
min.insync.replicas=2 (see the config sketch after this list).
- Partition 1 of topic T (T-1) has 3 replicas: broker-1(leader), 
broker-2(follower-A), broker-3(follower-B).
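
For reference, a minimal sketch of the producer side of this setup (the broker 
addresses and record contents are placeholders, not our actual configuration):

{code:scala}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object AcksAllProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Placeholder broker list; our real cluster hosts T-1's replicas on 3 brokers.
    props.put("bootstrap.servers", "broker-1:9092,broker-2:9092,broker-3:9092")
    // acks=all: the leader responds only after its acknowledgement condition is
    // met, which is exactly the check discussed in this issue.
    props.put("acks", "all")
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Topic T itself is created with replication factor 3 and the topic-level
    // config min.insync.replicas=2.
    producer.send(new ProducerRecord[String, String]("T", "some-key", "some-value"))
    producer.close()
  }
}
{code}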

Looking at the logs of broker-1, there were lots of lines indicating that ISR 
expansion and shrinkage were happening frequently for T-1.
After investigating for a while, we restarted broker-1 and, unexpectedly, the 
continuous ISR expand/shrink went away.
Since this is very likely a state corruption issue (it was fixed by a simple 
restart) and it has never reproduced since the broker restart, we unfortunately 
lost the clue to understand what was actually happening, so to this day I don't 
know the cause of that phenomenon.

Still, we continued investigating why frequent ISR shrink/expand reduces the 
number of produce requests, and found that the Kafka broker does not appear to 
respect min.insync.replicas the way the documentation of this config describes.
Here's the scenario:

1. Everything is working well.
   ISR(=LogEndOffset): leader=3, follower-A=2, follower-B=2
   HW: 2
2. The producer client produces some records. For simplicity, say the request 
contains only one record, so the LogEndOffset is updated to 4 and the request 
is put into purgatory: it has requiredOffset=4 while the HW stays at 2.
   ISR(=LogEndOffset): leader=4, follower-A=2, follower-B=2
   HW: 2
3. follower-A performs a fetch and updates its LogEndOffset to 4. IIUC, the 
request received at step 2 could be considered successful at this moment, since 
it only requires 2 out of 3 replicas to be in sync (min.insync.replicas=2, 
acks=all), but it is not with the current implementation because the check is 
based on the HW (see the sketch after this list; ref: 
https://github.com/apache/kafka/blob/1fbe445dde71df0023a978c5e54dd229d3d23e1b/core/src/main/scala/kafka/cluster/Partition.scala#L325).
   ISR(=LogEndOffset): leader=4, follower-A=4, follower-B=2
   HW: 2
4. For some reason, follower-B cannot perform a fetch for a while. At this 
moment follower-B is still in the ISR because of replica.lag.time.max.ms, 
meaning it still holds back the HW.
   ISR(=LogEndOffset): leader=4, follower-A=4, follower-B=2
   HW: 2
5. The produce request received at step 2 times out and is considered failed, 
so the client retries. Any incoming requests for T-1 will never succeed during 
this period.
   ISR(=LogEndOffset): leader=4, follower-A=4, follower-B=2
   HW: 2
6. The leader decides to drop follower-B from the ISR because of 
replica.lag.time.max.ms. The HW increases to 4 and all produce requests can now 
be processed successfully.
   ISR(=LogEndOffset): leader=4, follower-A=4
   HW: 4
7. After a while follower-B comes back and catches up to the LogEndOffset, so 
the leader lets it back into the ISR. The situation goes back to step 1 and the 
cycle continues.
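
To make step 3 concrete, here is a small self-contained sketch (a simplified 
model, not actual Kafka broker code; it only paraphrases my reading of 
Partition.checkEnoughReplicasReachOffset) that applies the HW-based completion 
check to the offsets above. The request with requiredOffset=4 stays incomplete 
through steps 3-5 even though 2 of the 3 replicas already have the record, and 
only completes at step 6 once follower-B is dropped:

{code:scala}
// Simplified model of the scenario above; not actual Kafka broker code.
object HwBasedCheckDemo {
  // LogEndOffsets of the current ISR members, leader included.
  case class PartitionState(isrLogEndOffsets: Seq[Long]) {
    // The HW is the minimum LogEndOffset across the ISR.
    def highWatermark: Long = isrLogEndOffsets.min
  }

  // Current behavior as I read it: the delayed produce completes only once
  // the HW itself reaches requiredOffset.
  def hwBasedCheck(state: PartitionState, requiredOffset: Long): Boolean =
    state.highWatermark >= requiredOffset

  def main(args: Array[String]): Unit = {
    val requiredOffset = 4L
    // Steps 3-5: leader=4, follower-A=4, follower-B=2 -> HW stays at 2.
    val steps3to5 = PartitionState(Seq(4L, 4L, 2L))
    println(s"steps 3-5 complete? ${hwBasedCheck(steps3to5, requiredOffset)}") // false
    // Step 6: follower-B is dropped from the ISR -> HW jumps to 4.
    val step6 = PartitionState(Seq(4L, 4L))
    println(s"step 6 completes?   ${hwBasedCheck(step6, requiredOffset)}")     // true
  }
}
{code}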

So my understanding is that records on the producer were accumulated into a 
single batch while the produce requests for T-1 were blocked (and retried) 
during steps 4-6, and that's why the total number of requests decreased 
significantly on broker-1 while the total number of messages didn't change.

As I commented in step 3, the leader should consider a produce request 
successful once it has confirmed acks from min.insync.replicas replicas, so the 
current implementation, which makes produce requests dependent on the HW, 
doesn't make sense IMO.
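
Roughly, the check I have in mind counts the ISR members (leader included) 
whose LogEndOffset has reached requiredOffset and compares that count against 
min.insync.replicas. A minimal sketch of the idea only, not the actual patch:

{code:scala}
// Sketch of the proposed acknowledgement check; not the actual patch.
object MinIsrAckCheckDemo {
  case class Replica(brokerId: Int, logEndOffset: Long)

  // Proposed behavior: the produce request is satisfied once at least
  // min.insync.replicas ISR members (including the leader) have caught up
  // to requiredOffset, regardless of where the HW currently is.
  def enoughReplicasReachOffset(isr: Seq[Replica],
                                requiredOffset: Long,
                                minInsyncReplicas: Int): Boolean =
    isr.count(_.logEndOffset >= requiredOffset) >= minInsyncReplicas

  def main(args: Array[String]): Unit = {
    // Step 3 of the scenario: leader=4, follower-A=4, follower-B=2.
    val isr = Seq(Replica(1, 4L), Replica(2, 4L), Replica(3, 2L))
    // With min.insync.replicas=2 and acks=all, requiredOffset=4 is already on
    // 2 replicas, so the request could complete here instead of timing out.
    println(enoughReplicasReachOffset(isr, requiredOffset = 4L, minInsyncReplicas = 2)) // true
  }
}
{code}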

When I observed this scenario our Kafka cluster was running 0.8.2.1, but I 
confirmed that the scenario can still happen with a build from the latest 
trunk.

Actually, I still don't know whether this is intentional behavior or not, but 
either way it differs from what I understood when reading the doc of 
min.insync.replicas, so I would like to hear what Kafka folks think about this.
I will prepare a PR for this issue, so please review my patch if my scenario 
makes sense to you :)



> min.insync.replicas isn't respected when there's a lagging follower that is 
> still in the ISR
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3471
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3471
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.1
>            Reporter: Yuto Kawamura
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
