[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13984416#comment-13984416 ]

James Blackburn commented on KAFKA-1382:
----------------------------------------

The other thing about this is that it can quickly churn through log space: lines
are written every few milliseconds at INFO level, producing GBs of log in a very
short time, e.g.:

{code}
[2014-04-28 00:01:37,010] INFO Partition [RSF_OPTIONS,10] on broker 1: Cached zkVersion [21] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2014-04-28 00:01:37,017] INFO Partition [RSF_OPTIONS,10] on broker 1: Expanding ISR for partition [RSF_OPTIONS,10] from 1 to 1,0 (kafka.cluster.Partition)
[2014-04-28 00:01:37,019] ERROR Conditional update of path /brokers/topics/RSF_OPTIONS/partitions/10/state with data {"controller_epoch":19,"leader":1,"version":1,"leader_epoch":6,"isr":[1,0]} and expected version 21 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/RSF_OPTIONS/partitions/10/state (kafka.utils.ZkUtils$)
[2014-04-28 00:01:37,019] INFO Partition [RSF_OPTIONS,10] on broker 1: Cached zkVersion [21] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2014-04-28 00:01:37,019] INFO Partition [RSF,14] on broker 1: Expanding ISR for partition [RSF,14] from 1 to 1,0 (kafka.cluster.Partition)
[2014-04-28 00:01:37,020] ERROR Conditional update of path /brokers/topics/RSF/partitions/14/state with data {"controller_epoch":19,"leader":1,"version":1,"leader_epoch":6,"isr":[1,0]} and expected version 21 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/RSF/partitions/14/state (kafka.utils.ZkUtils$)
[2014-04-28 00:01:37,020] INFO Partition [RSF,14] on broker 1: Cached zkVersion [21] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2014-04-28 00:01:37,035] INFO Partition [RSF_OPTIONS,10] on broker 1: Expanding ISR for partition [RSF_OPTIONS,10] from 1 to 1,0 (kafka.cluster.Partition)
[2014-04-28 00:01:37,037] ERROR Conditional update of path /brokers/topics/RSF_OPTIONS/partitions/10/state with data {"controller_epoch":19,"leader":1,"version":1,"leader_epoch":6,"isr":[1,0]} and expected version 21 failed due to org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /brokers/topics/RSF_OPTIONS/partitions/10/state (kafka.utils.ZkUtils$)
[2014-04-28 00:01:37,037] INFO Partition [RSF_OPTIONS,10] on broker 1: Cached zkVersion [21] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
[2014-04-28 00:01:37,037] INFO Partition [RSF,14] on broker 1: Expanding ISR for partition [RSF,14] from 1 to 1,0 (kafka.cluster.Partition)
{code}

> Update zkVersion on partition state update failures
> ---------------------------------------------------
>
>                 Key: KAFKA-1382
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1382
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Joel Koshy
>             Fix For: 0.8.2
>
>
> Our updateIsr code is currently:
>   private def updateIsr(newIsr: Set[Replica]) {
>     debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newIsr.mkString(",")))
>     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
>     // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
>     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
>       ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
>       ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
>     if (updateSucceeded) {
>       inSyncReplicas = newIsr
>       zkVersion = newVersion
>       trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
>     } else {
>       info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
>     }
>   }
> We encountered an interesting scenario recently when a large producer fully
> saturated the broker's NIC for over an hour. The large volume of data led to
> a number of ISR shrinks (and subsequent expands). The NIC saturation
> affected the zookeeper client heartbeats and led to a session timeout. The
> timeline was roughly as follows:
> - Attempt to expand ISR
> - Expansion written to zookeeper (confirmed in zookeeper transaction logs)
> - Session timeout after around 13 seconds (the configured timeout is 20
>   seconds) so that lines up.
> - zkclient reconnects to zookeeper (with the same session ID) and retries
>   the write - but uses the old zkVersion. This fails because the zkVersion
>   has already been updated (above).
> - The ISR expand keeps failing after that and the only way to get out of it
>   is to bounce the broker.
> In the above code, if the zkVersion is different we should probably update
> the cached version, and possibly retry the expansion until it succeeds.
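The failure mode above, and the proposed fix, amount to a compare-and-set loop that refreshes its cached version after a BadVersion conflict instead of giving up forever. A minimal sketch of that idea, in Python for brevity; `FakeZk`, `Partition`, and all names here are illustrative stand-ins, not the real kafka.utils.ZkUtils or ZooKeeper API (in particular, the stand-in reports the znode's actual version on conflict, whereas a real fix would re-read the znode to learn it):

```python
class FakeZk:
    """Stand-in for a single ZooKeeper znode with versioned data."""

    def __init__(self, data, version=0):
        self.data, self.version = data, version

    def conditional_update(self, data, expected_version):
        # Mirrors ZooKeeper's setData(path, data, version) semantics:
        # succeed only if the caller's expected version matches the znode's.
        if expected_version != self.version:
            return False, self.version            # conflict: report real version
        self.data, self.version = data, self.version + 1
        return True, self.version


class Partition:
    """Stand-in for the broker-side partition state with a cached zkVersion."""

    def __init__(self, zk):
        self.zk = zk
        self.cached_zk_version = zk.version

    def update_isr(self, new_isr, max_retries=3):
        for _ in range(max_retries):
            ok, version = self.zk.conditional_update(new_isr, self.cached_zk_version)
            if ok:
                self.cached_zk_version = version
                return True
            # The fix: instead of only logging "Cached zkVersion [..] not equal
            # to that in zookeeper, skip updating ISR", adopt the version the
            # store reports and retry the expansion.
            self.cached_zk_version = version
        return False


zk = FakeZk(data=[1])
p = Partition(zk)
zk.conditional_update([1, 0], 0)    # another writer bumps the version,
assert p.cached_zk_version == 0     # so the broker's cached version is now stale
assert p.update_isr([1, 0, 2])      # first attempt fails, refresh + retry succeeds
assert zk.data == [1, 0, 2]
assert p.cached_zk_version == zk.version == 2
```

With the current code the first failed attempt would leave `cached_zk_version` stale forever, reproducing the endless expand/fail loop in the log excerpt above.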



--
This message was sent by Atlassian JIRA
(v6.2#6252)
