it makes the cluster unable to provide normal service, which causes some
produce or fetch requests to fail for a long time until I restart the current broker.
This error may come from some earlier fetch operation that contained this
partition, which leads to many fetch response errors.

DelayedFetch's tryComplete() function is implemented as below:
 override def tryComplete() : Boolean = {
 var accumulatedSize = 0
 fetchMetadata.fetchPartitionStatus.foreach {
   case (topicAndPartition, fetchStatus) =>
     val fetchOffset = fetchStatus.startOffsetMetadata
     try {
       if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
         val replica = 
replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, 
topicAndPartition.partition)
         /*ignore some codes*/
       }
     } catch {
       /*ignore some code*/
       case nle: NotLeaderForPartitionException =>  // Case A
         debug("Broker is no longer the leader of %s, satisfy %s 
immediately".format(topicAndPartition, fetchMetadata))
         return forceComplete()
     }
 }
 /* ignore some codes */
}

When it meets a NotLeaderForPartitionException, it invokes the forceComplete()
function, which in turn invokes the onComplete() function, implemented as below:
override def onComplete() {
 val logReadResults = 
replicaManager.readFromLocalLog(fetchMetadata.fetchOnlyLeader,
   fetchMetadata.fetchOnlyCommitted,
   fetchMetadata.fetchPartitionStatus.mapValues(status => status.fetchInfo))

 val fetchPartitionData = logReadResults.mapValues(result =>
   FetchResponsePartitionData(result.errorCode, result.hw, 
result.info.messageSet))

 responseCallback(fetchPartitionData)
}

So I think it exits the tryComplete function early because of this
partition, which means the partitions later in this request may not be
completely satisfied and returned to the fetching broker.
This leads some producers and consumers to fail for a long time. I don’t know
whether this is correct.

> 在 2016年10月25日,下午8:32,Json Tu <kafka...@126.com> 写道:
> 
> Hi all,
>       I use Kafka 0.9.0.0, and we have a cluster with 6 nodes. When I restart
> a broker, we find many log entries like the one below:
> 
> [2016-10-24 15:29:00,914] ERROR [KafkaApi-2141642] error when handling 
> request Name: FetchRequest; Version: 1; CorrelationId: 4928; ClientId: 
> ReplicaFetcherThread-0-2141642; ReplicaId: 2141386; MaxWait: 500 ms; 
> MinBytes: 1 bytes; RequestInfo: [retail.c.order.logistics,0] -> 
> PartitionFetchInfo(215258,1048576),[waimai_c_ugc_msg,1] -> 
> PartitionFetchInfo(12426588,1048576),[waimai_c_ucenter_asyncrelationbind_delay,25]
>  -> PartitionFetchInfo(0,1048576),[waimai_c_order_databus_wmorder,44] -> 
> PartitionFetchInfo(49555913,1048576),[__consumer_offsets,23] -> 
> PartitionFetchInfo(11846051,1048576),[retail.m.sp.sku.update,3] -> 
> PartitionFetchInfo(21563,1048576),[waimai_c_monitor_orderlogisticsstatus,28] 
> -> PartitionFetchInfo(26926356,1048576),[waimai_c_ucenter_loadrelation,0] -> 
> PartitionFetchInfo(54583,1048576),[__consumer_offsets,29] -> 
> PartitionFetchInfo(23479045,1048576),[waimai_c_order_databus_wmorder,14] -> 
> PartitionFetchInfo(49568225,1048576),[waimai_c_ordertag_orderdealremark,31] 
> -> PartitionFetchInfo(1829838,1048576),[retail.d.ris.spider.request,0] -> 
> PartitionFetchInfo(709845,1048576),[__consumer_offsets,13] -> 
> PartitionFetchInfo(9376691,1048576),[waimai_c_ugc_msg_staging,2] -> 
> PartitionFetchInfo(38,1048576),[retail.b.openapi.push.retry.stage,0] -> 
> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_favoritepoi,15] -> 
> PartitionFetchInfo(390045,1048576),[retail.b.order.phonecall,0] -> 
> PartitionFetchInfo(1,1048576),[waimai_c_ucenter_loadrelation,45] -> 
> PartitionFetchInfo(53975,1048576),[waimai_c_ordertag_orderdealremark,1] -> 
> PartitionFetchInfo(1829848,1048576),[retail.d.ris.spider.jddj.request,0] -> 
> PartitionFetchInfo(5116337,1048576),[waimai_c_ucenter_asyncrelationbind_delay,13]
>  -> 
> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_asyncrelationbind_delay,55] 
> -> PartitionFetchInfo(0,1048576),[waimai_push_e_operate_prod,3] -> 
> PartitionFetchInfo(442564,1048576),[waimai_ordersa_topic_user_order_in_poi_count_diff,5]
>  -> PartitionFetchInfo(23791010,1048576),[retail.c.order.create,4] -> 
> PartitionFetchInfo(72902,1048576),[waimai_c_ucenter_asyncrelationbind_staging,2]
>  -> PartitionFetchInfo(66,1048576),[waimai_c_order_orderevent_topic,35] -> 
> PartitionFetchInfo(0,1048576),[waimai_c_ucenter_syncuserrelation,43] -> 
> PartitionFetchInfo(0,1048576),[waimai_c_order_databus_wmorder,48] -> 
> PartitionFetchInfo(49496018,1048576),[waimai_c_monitor_orderstatus,32] -> 
> PartitionFetchInfo(50623699,1048576),[waimai_c_ucenter_loadrelation,15] -> 
> PartitionFetchInfo(54360,1048576),[waimai_c_monitor_orderstatus,2] -> 
> PartitionFetchInfo(50624881,1048576),[waimai_c_order_databus_wmorder,24] -> 
> PartitionFetchInfo(49548334,1048576),[waimai_c_order_databus_wmorder,18] -> 
> PartitionFetchInfo(49489397,1048576),[waimai_c_ucenter_asyncrelationbind,36] 
> -> PartitionFetchInfo(53430,1048576) (kafka.server.KafkaApis)
> kafka.common.NotLeaderForPartitionException: Leader not local for partition 
> [retail.d.ris.spider.request,1] on broker 2141642
>        at 
> kafka.server.ReplicaManager.getLeaderReplicaIfLocal(ReplicaManager.scala:296)
>        at 
> kafka.server.DelayedFetch$$anonfun$tryComplete$1.apply(DelayedFetch.scala:77)
>        at 
> kafka.server.DelayedFetch$$anonfun$tryComplete$1.apply(DelayedFetch.scala:72)
>        at 
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>        at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>        at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>        at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:72)
>        at 
> kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:307)
>        at 
> kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:227)
>        at 
> kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:202)
>        at 
> kafka.cluster.Partition.tryCompleteDelayedRequests(Partition.scala:372)
>        at kafka.cluster.Partition.maybeExpandIsr(Partition.scala:294)
>        at 
> kafka.cluster.Partition.updateReplicaLogReadResult(Partition.scala:243)
>        at 
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:852)
>        at 
> kafka.server.ReplicaManager$$anonfun$updateFollowerLogReadResults$2.apply(ReplicaManager.scala:849)
>        at 
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
>        at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>        at 
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
>        at 
> kafka.server.ReplicaManager.updateFollowerLogReadResults(ReplicaManager.scala:849)
>        at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:467)
>        at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:434)
>        at kafka.server.KafkaApis.handle(KafkaApis.scala:69)
>        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
>        at java.lang.Thread.run(Thread.java:745)
> 
> 
> What confuses me is that retail.d.ris.spider.request is not contained in this
> request. Why is it logged in handleFetchRequest? How can this happen, and how
> can I resolve it?
> 
> 


Reply via email to