Anuj,

If you look at Partition.makeLeader(), leaderReplicaOpt gets updated to the latest leader.
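
To make that concrete, here is a minimal standalone sketch, not the actual Kafka source. PartitionSketch, Replica, makeFollower, the Seq[String] message type, the offset counter and the Either return value are all simplifications assumed for illustration; only the names makeLeader, leaderReplicaIfLocal, appendMessagesToLeader and leaderIsrUpdateLock come from the code discussed in this thread. It shows why an append is rejected until the broker has been made leader for the partition:

```scala
// Simplified model for illustration only; NOT the real kafka.cluster.Partition.
final case class Replica(brokerId: Int)

class PartitionSketch(localBrokerId: Int) {
  private val leaderIsrUpdateLock = new Object

  // None until this broker has been told who leads the partition.
  private var leaderReplicaIdOpt: Option[Int] = None

  // Hypothetical stand-in for the log end offset.
  private var nextOffset: Long = 0L

  // Stand-in for makeLeader(): record the local broker as the leader.
  def makeLeader(): Unit = leaderIsrUpdateLock.synchronized {
    leaderReplicaIdOpt = Some(localBrokerId)
  }

  // Hypothetical stand-in for becoming a follower of another broker.
  def makeFollower(leaderId: Int): Unit = leaderIsrUpdateLock.synchronized {
    leaderReplicaIdOpt = Some(leaderId)
  }

  // Returns the leader replica only when the leader is this broker.
  def leaderReplicaIfLocal(): Option[Replica] = leaderIsrUpdateLock.synchronized {
    leaderReplicaIdOpt.filter(_ == localBrokerId).map(id => Replica(id))
  }

  // Left(error) when the leader is unknown or remote,
  // Right((firstOffset, lastOffset)) when the append is accepted.
  def appendMessagesToLeader(messages: Seq[String]): Either[String, (Long, Long)] =
    leaderIsrUpdateLock.synchronized {
      leaderReplicaIfLocal() match {
        case Some(_) =>
          val start = nextOffset
          nextOffset += messages.size
          Right((start, nextOffset - 1))
        case None =>
          Left(s"Leader not local for partition on broker $localBrokerId")
      }
    }
}
```

In this sketch a fresh PartitionSketch(0) rejects every append, and starts accepting them only after makeLeader() has been called. So if leaderReplicaOpt stays None on your broker, the thing to check is whether the broker ever processed the request that makes it leader for [test,0].
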
As Guozhang pointed out, this error should be transient.

Thanks,
Neha

On Sep 16, 2013 5:22 AM, "Anuj Mehta" <[email protected]> wrote:

> Hi Guozhang
>
> Thanks for your reply. The problem is not transient in my case. I tried
> debugging the issue and find problem in following code snippet (please
> correct me if I am wrong)
>
> As part of code for handling ProducerRequest (from KafkaApis) following
> code is executed
>
> val partitionOpt = replicaManager.getPartition(topicAndPartition.topic,
>     topicAndPartition.partition)
> val (start, end) =
>   partitionOpt match {
>     case Some(partition) => *partition.appendMessagesToLeader*
>       (messages.asInstanceOf[ByteBufferMessageSet])
>
> Now problem lies with "appendMessagesToLeader" method of Partition.scala
> where in the below code snippet the value of *"leaderReplicaOpt"* is always
> *None* leading to "leader not local partition"
>
> def appendMessagesToLeader(messages: ByteBufferMessageSet): (Long, Long) = {
>   leaderIsrUpdateLock synchronized {
>     val leaderReplicaOpt = leaderReplicaIfLocal()
>
>
> Cheers!!
> Anuj
> http://anuj-mehta.blogspot.com/
>
>
> On Sat, Sep 14, 2013 at 1:09 AM, Guozhang Wang <[email protected]> wrote:
>
> > Hi Anuj,
> >
> > The LeaderNotLocalException could be transient:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-WhydoIseelotsofLeadernotlocalexceptionsonthebrokerduringcontrolledshutdown%3F
> >
> > Guozhang
> >
> >
> > On Fri, Sep 13, 2013 at 12:21 AM, Anuj Mehta <[email protected]> wrote:
> >
> > > Hi
> > >
> > > I am a newbie "Kafka" user. I had 0.8 branch code and trying to run a
> > > simple producer (currently no consumer) from kafka-java-examples on
> > > *Window XP*
> > >
> > > When the producer tries to send a message I get following
> > >
> > > [2013-09-13 12:39:28,244] WARN [KafkaApi-0] Produce request with correlation id 2 from client on partition [test,0] *failed due to Leader not local for partition* [test,0] on broker 0 (kafka.server.KafkaApis)
> > > [2013-09-13 12:39:28,431] INFO [KafkaApi-0] Send the close connection response due to error handling produce request [clientId = , correlationId = 2, topicAndPartition = [test,0]] with Ack=0 (kafka.server.KafkaApis)
> > >
> > >
> > > Looks like there is a similar issue (
> > > https://issues.apache.org/jira/browse/KAFKA-876) but it's resolved.
> > > I am attaching server and zookeeper logs for reference.
> > > Please guide.
> > >
> > > Cheers!!
> > > Anuj
> > > http://anuj-mehta.blogspot.com/
> >
> > --
> > -- Guozhang
