[ 
https://issues.apache.org/jira/browse/KAFKA-5745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16129725#comment-16129725
 ] 

Jun Rao commented on KAFKA-5745:
--------------------------------

The HW in the follower only includes the offset, but not the log segment and 
segment position. When a follower changes to a leader, in 
Partition.makeLeader(), we first mark the replica as the leader and then call 
leaderReplica.convertHWToLocalOffsetMetadata(), which adds the log segment info 
to the HW. So, if a consumer fetches data between these two steps,  
DelayedFetch.tryComplete() will hit the exception in the description since HW 
is missing log segment info.

We can fix this issue by first calling 
leaderReplica.convertHWToLocalOffsetMetadata() and then marking the replica as 
the leader.

> Partition.makeLeader() should convert HW to OffsetMetadata before becoming 
> the leader
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5745
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5745
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.11.0.0
>            Reporter: Jun Rao
>
> Saw the following uncaught exception in the log.
> ERROR [KafkaApi-411] Error when handling request Name: FetchRequest; Version: 
> 3; CorrelationId: 708572; ClientId: client-0; ReplicaId: -1; MaxWait: 500 ms; 
> MinBytes: 1 bytes; MaxBytes:52428800 bytes; RequestInfo: 
> ([topic1,3],PartitionFetchInfo(953794,1048576)) (kafka.server.KafkaApis) 
> org.apache.kafka.common.KafkaException: 953793 [-1 : -1] cannot compare its 
> segment info with 953794 [829749 : 564535961] since it only has message 
> offset info 
> at kafka.server.LogOffsetMetadata.onOlderSegment(LogOffsetMetadata.scala:48) 
> at 
> kafka.server.DelayedFetch$$anonfun$tryComplete$1.apply(DelayedFetch.scala:93) 
> at 
> kafka.server.DelayedFetch$$anonfun$tryComplete$1.apply(DelayedFetch.scala:77) 
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
> at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:77) 
> at kafka.server.DelayedOperation.safeTryComplete(DelayedOperation.scala:104)
> at 
> kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:196)
>  
> at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:516) 
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:534) 
> at kafka.server.KafkaApis.handle(KafkaApis.scala:79) 
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60) 
> at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to