[ 
https://issues.apache.org/jira/browse/KAFKA-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16391863#comment-16391863
 ] 

Dong Lin commented on KAFKA-3978:
---------------------------------

[~junrao] [~hachikuji] [~ijuma] I have a good theory now. I will submit a patch 
to fix it.

 
- Broker A is the leader for partition P1 and broker B is a follower for P1.
 
- Broker C receives a LeaderAndIsrRequest to become a follower for P1. In broker 
C's memory, P1's highWatermarkMetadata is LogOffsetMetadata(hw=0, 
segmentBaseOffset=-1), and the local replica's LEO for P1 is 0.
 
- Broker C's replica fetcher sends a leader epoch request to broker A and then 
truncates its local replica's LEO to 100.
 
- Broker C receives a LeaderAndIsrRequest to become the leader for P1 with 
ISR=(A,B,C). In broker C's memory, P1's highWatermarkMetadata is still 
LogOffsetMetadata(hw=0, segmentBaseOffset=-1). Also in broker C's memory, 
according to Partition.makeLeader(), remote replica A's logEndOffsetMetadata is 
initialized to LogOffsetMetadata.UnknownOffsetMetadata, which has hw = -1 
and segmentBaseOffset = 0.
 
- Broker C receives a FetchRequest from broker B. In 
Partition.maybeIncrementLeaderHW(), the new high watermark is derived as 
min(logEndOffset metadata of all replicas), which is LogOffsetMetadata(hw=-1, 
segmentBaseOffset=0) because A's logEndOffset metadata is the smallest. Because 
the new HW's segmentBaseOffset (0) is greater than the old HW's 
segmentBaseOffset (-1), the high watermark is updated to be 
LogOffsetMetadata(hw=-1, segmentBaseOffset=100L). See the HW-update sketch 
after this list.
 
- Now we have a partition whose HW is negative, which causes problems for 
broker B when it fetches from broker C. See the truncation sketch after this 
list.
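
To make the failure mode concrete, here is a minimal, self-contained Scala 
sketch of the HW update described above. This is not the actual broker code: 
OffsetMetadata and NegativeHwSketch are simplified stand-ins for 
LogOffsetMetadata, Partition.makeLeader() and 
Partition.maybeIncrementLeaderHW(), and the LEO values for B and C are 
illustrative.

{code}
// Simplified stand-in for LogOffsetMetadata: only the two fields that matter here.
case class OffsetMetadata(messageOffset: Long, segmentBaseOffset: Long) {
  // The real onOlderSegment() also compares segment base offsets.
  def onOlderSegment(that: OffsetMetadata): Boolean =
    this.segmentBaseOffset < that.segmentBaseOffset
}

object NegativeHwSketch extends App {
  // Broker C's HW for P1 after the LeaderAndIsrRequests above: hw = 0, segmentBaseOffset = -1.
  var highWatermark = OffsetMetadata(messageOffset = 0L, segmentBaseOffset = -1L)

  // After makeLeader(), remote replica A's LEO metadata is the UnknownOffsetMetadata
  // sentinel: messageOffset = -1, segmentBaseOffset = 0.
  val leoA = OffsetMetadata(-1L, 0L)
  // B and C have real LEOs (values here are illustrative).
  val leoB = OffsetMetadata(100L, 0L)
  val leoC = OffsetMetadata(100L, 0L)

  // maybeIncrementLeaderHW()-style update: the candidate HW is the smallest LEO
  // across replicas, applied if the offset grew OR the old HW is on an older segment.
  val newHw = Seq(leoA, leoB, leoC).minBy(_.messageOffset)   // picks A's (-1, 0)
  if (highWatermark.messageOffset < newHw.messageOffset ||   // 0 < -1: false
      highWatermark.onOlderSegment(newHw))                   // -1 < 0: true
    highWatermark = newHw

  println(highWatermark.messageOffset)                       // -1: the HW has gone negative
}
{code}

The key point is that the freshly-initialized HW carries segmentBaseOffset = -1 
while UnknownOffsetMetadata carries messageOffset = -1, so the segment-based 
comparison fires and lets the unknown (-1) offset become the high watermark.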
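
Once that negative HW reaches broker B and B truncates to it, the guard at the 
top of Log.truncateTo() rejects the offset, which is the exception quoted in 
the description below. Roughly (heavily simplified; the real method also drops 
whole segments, truncates the active segment and updates the recovery point):

{code}
object TruncateSketch {
  // Rough sketch of the guard in kafka.log.Log.truncateTo that produces the
  // "Cannot truncate to a negative offset (-1)" error reported in this ticket.
  def truncateTo(targetOffset: Long): Unit = {
    if (targetOffset < 0)
      throw new IllegalArgumentException(
        s"Cannot truncate to a negative offset ($targetOffset).")
    // ... drop segments past targetOffset and truncate the active segment ...
  }
}
{code}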

> Cannot truncate to a negative offset (-1) exception at broker startup
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-3978
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3978
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.0.0
>         Environment: 3.13.0-87-generic 
>            Reporter: Juho Mäkinen
>            Priority: Critical
>              Labels: reliability, startup
>
> During broker startup sequence the broker server.log has this exception. 
> Problem persists after multiple restarts and also on another broker in the 
> cluster.
> {code}
> INFO [Socket Server on Broker 1002], Started 1 acceptor threads 
> (kafka.network.SocketServer)
> INFO [Socket Server on Broker 1002], Started 1 acceptor threads 
> (kafka.network.SocketServer)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [ExpirationReaper-1002], Starting  
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> INFO [GroupCoordinator 1002]: Starting up. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Starting up. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Startup complete. 
> (kafka.coordinator.GroupCoordinator)
> INFO [GroupCoordinator 1002]: Startup complete. 
> (kafka.coordinator.GroupCoordinator)
> INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 
> milliseconds. (kafka.coordinator.GroupMetadataManager)
> INFO [Group Metadata Manager on Broker 1002]: Removed 0 expired offsets in 9 
> milliseconds. (kafka.coordinator.GroupMetadataManager)
> INFO [ThrottledRequestReaper-Produce], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Produce], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Fetch], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO [ThrottledRequestReaper-Fetch], Starting  
> (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
> INFO Will not load MX4J, mx4j-tools.jar is not in the classpath 
> (kafka.utils.Mx4jLoader$)
> INFO Will not load MX4J, mx4j-tools.jar is not in the classpath 
> (kafka.utils.Mx4jLoader$)
> INFO Creating /brokers/ids/1002 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> INFO Creating /brokers/ids/1002 (is it secure? false) 
> (kafka.utils.ZKCheckedEphemeral)
> INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
> INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: 
> PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils)
> INFO Registered broker 1002 at path /brokers/ids/1002 with addresses: 
> PLAINTEXT -> EndPoint(172.16.2.22,9092,PLAINTEXT) (kafka.utils.ZkUtils)
> INFO Kafka version : 0.10.0.0 (org.apache.kafka.common.utils.AppInfoParser)
> INFO Kafka commitId : b8642491e78c5a13 
> (org.apache.kafka.common.utils.AppInfoParser)
> INFO [Kafka Server 1002], started (kafka.server.KafkaServer)
> INFO [Kafka Server 1002], started (kafka.server.KafkaServer)
> Error when handling request 
> {controller_id=1004,controller_epoch=1,partition_states=[..REALLY LONG OUTPUT 
> SNIPPED AWAY..], 
> live_leaders=[{id=1004,host=172.16.6.187,port=9092},{id=1003,host=172.16.2.21,port=9092}]}
>  (kafka.server.KafkaApis)
> ERROR java.lang.IllegalArgumentException: Cannot truncate to a negative 
> offset (-1).
>         at kafka.log.Log.truncateTo(Log.scala:731)
>         at 
> kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:288)
>         at 
> kafka.log.LogManager$$anonfun$truncateTo$2.apply(LogManager.scala:280)
>         at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>         at 
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
>         at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>         at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>         at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>         at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>         at kafka.log.LogManager.truncateTo(LogManager.scala:280)
>         at kafka.server.ReplicaManager.makeFollowers(ReplicaManager.scala:802)
>         at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:648)
>         at 
> kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:144)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:80)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>         at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
