[ 
https://issues.apache.org/jira/browse/KAFKA-1029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13985617#comment-13985617
 ] 

Michael Noll edited comment on KAFKA-1029 at 4/30/14 3:33 PM:
--------------------------------------------------------------

I could finally reproduce the bug again, though not yet consistently.

And I think I have an idea what the root cause might be:  I was test-driving 
Kafka in several VMs locally on a MacBook Pro laptop, and the energy saver 
settings (or something similar) might be responsible for triggering the problem 
when the laptop was idle.  (In the test run below I intentionally let the laptop 
idle for 2-3 hours after creating a "test" topic and sending/consuming a few 
test messages.)  The time at which the problem was triggered (11:01) correlates 
with me unlocking the laptop (user screen).  What's strange is that the laptop 
is configured not to go to sleep when idle, and the hard disk is not asked to 
sleep either (even if you did enable that setting in Mac OS X, it has no effect 
on SSDs, and an SSD is the only disk installed in the laptop I was using); only 
the display is allowed to turn off after 10 minutes.  So even though the laptop 
is configured to be "always on", there apparently is something that throws off 
Kafka/ZooKeeper.  Also, as I said in my earlier comment, I still cannot 
reproduce the issue consistently, i.e. sometimes Kafka/ZK work correctly after 
idling/unlocking;  still, I think the root cause ultimately has to do with 
idling on Mac OS X.

Lastly, I don't know what the expected failure handling of Kafka/ZK is in such 
a scenario.  From what I can read from the logs below my setup seems to have 
simulated what could happen during a network partitioning (Kafka broker could 
not talk to ZooKeeper for a long time, hence ZK session expired, then Kafka 
could talk again to ZK, but couldn't fully recover).

FWIW I still list further log messages and details just in case that 
information may be useful in the future.


@Jun Rao:  Yes, I did see ZK expiration in {{controller.log}}:

{code}
INFO [SessionExpirationListener on 0], ZK expired; shut down all controller 
components and try to re-elect 
(kafka.controller.KafkaController$SessionExpirationListener)
{code}

Below are some further details.  Things turn bad at 11:01.

*kafka1 (Kafka broker)*

{{server.log}}

{code}
[2014-04-30 07:18:56,481] INFO Completed load of log test-0 with log end offset 
0 (kafka.log.Log)
[2014-04-30 07:18:56,485] INFO Created log for partition [test,0] in 
/app/kafka/log with properties {segment.index.bytes -> 10485760, 
file.delete.delay.ms -> 60000, segment.bytes -> 1073741824, flush.ms -> 
9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 
4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 172800000, 
max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, 
min.cleanable.dirty.ratio -> 0.5, retention.ms -> 172800000}. 
(kafka.log.LogManager)
[2014-04-30 07:18:56,486] WARN Partition [test,0] on broker 0: No checkpointed 
highwatermark is found for partition [test,0] (kafka.cluster.Partition)
[2014-04-30 07:19:32,637] INFO Closing socket connection to /127.0.1.1. 
(kafka.network.Processor)
[2014-04-30 07:19:37,328] INFO Closing socket connection to /127.0.0.1. 
(kafka.network.Processor)
[2014-04-30 07:19:56,356] INFO Closing socket connection to /127.0.1.1. 
(kafka.network.Processor)
[2014-04-30 07:20:52,090] ERROR Closing socket for /127.0.1.1 because of error 
(kafka.network.Processor)
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcher.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)
        at sun.nio.ch.IOUtil.read(IOUtil.java:224)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)
        at kafka.utils.Utils$.read(Utils.scala:375)
        at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Processor.read(SocketServer.scala:347)
        at kafka.network.Processor.run(SocketServer.scala:245)
        at java.lang.Thread.run(Thread.java:701)
[2014-04-30 07:21:02,805] INFO Closing socket connection to /127.0.1.1. 
(kafka.network.Processor)
[2014-04-30 07:21:05,803] INFO Closing socket connection to /127.0.0.1. 
(kafka.network.Processor)
[2014-04-30 11:01:57,138] INFO Closing socket connection to /127.0.1.1. 
(kafka.network.Processor)
[2014-04-30 11:01:59,561] INFO Closing socket connection to /127.0.1.1. 
(kafka.network.Processor)
[2014-04-30 11:01:59,648] INFO 0 successfully elected as leader 
(kafka.server.ZookeeperLeaderElector)
[2014-04-30 11:01:59,692] INFO conflict in /controller data: 
{"version":1,"brokerid":0,"timestamp":"1398855719690"} stored data: 
{"version":1,"brokerid":0,"timestamp":"1398855719646"} (kafka.utils.ZkUtils$)
[2014-04-30 11:01:59,699] INFO I wrote this conflicted ephemeral node 
[{"version":1,"brokerid":0,"timestamp":"1398855719690"}] at /controller a while 
back in a different session, hence I will backoff for this node to be deleted 
by Zookeeper and retry (kafka.utils.ZkUtils$)
[2014-04-30 11:02:05,704] INFO conflict in /controller data: 
{"version":1,"brokerid":0,"timestamp":"1398855719690"} stored data: 
{"version":1,"brokerid":0,"timestamp":"1398855719646"} (kafka.utils.ZkUtils$)
[2014-04-30 11:02:05,711] INFO I wrote this conflicted ephemeral node 
[{"version":1,"brokerid":0,"timestamp":"1398855719690"}] at /controller a while 
back in a different session, hence I will backoff for this node to be deleted 
by Zookeeper and retry (kafka.utils.ZkUtils$)
{code}
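
The two {{timestamp}} values in the conflict message are epoch milliseconds. As a rough aid for lining them up with the log times (which appear to be UTC, judging by the {{date}} output further down), here is a small Python sketch; the helper name {{decode_ms}} is made up for illustration and is not part of Kafka:

```python
from datetime import datetime, timezone


def decode_ms(epoch_ms: int) -> str:
    """Render epoch milliseconds as an HH:MM:SS,mmm string in UTC (log style)."""
    # Split with integer arithmetic so the millisecond part is exact.
    seconds, millis = divmod(epoch_ms, 1000)
    dt = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return f"{dt:%H:%M:%S},{millis:03d}"


# Values copied from the "conflict in /controller data" log line above.
attempted = 1398855719690  # data the broker tried to write
stored = 1398855719646     # data already stored at /controller

print(decode_ms(stored), decode_ms(attempted), attempted - stored)
```

The stored node decodes to 11:01:59,646 and the attempted write to 11:01:59,690, i.e. 44 ms apart. This suggests (but does not prove) that the broker is colliding with a /controller node that it created itself only moments earlier, rather than with one left over from the long-expired session.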


{{state-change.log}}

{code}
[2014-04-30 07:18:56,433] TRACE Controller 0 epoch 1 changed state of replica 0 
for partition [test,0] from NewReplica to OnlineReplica (state.change.logger)
[2014-04-30 07:18:56,443] TRACE Broker 0 received LeaderAndIsr request 
(LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0)
 correlation id 7 from controller 0 epoch 1 for partition [test,0] 
(state.change.logger)
[2014-04-30 07:18:56,448] TRACE Broker 0 handling LeaderAndIsr request 
correlationId 7 from controller 0 epoch 1 starting the become-leader transition 
for partition [test,0] (state.change.logger)
[2014-04-30 07:18:56,455] TRACE Broker 0 stopped fetchers as part of 
become-leader request from controller 0 epoch 1 with correlation id 7 for 
partition [test,0] (state.change.logger)
[2014-04-30 07:18:56,495] TRACE Broker 0 completed LeaderAndIsr request 
correlationId 7 from controller 0 epoch 1 for the become-leader transition for 
partition [test,0] (state.change.logger)
[2014-04-30 07:18:56,506] TRACE Controller 0 epoch 1 received response 
correlationId 7 for a request sent to broker id:0,host:kafka1,port:9092 
(state.change.logger)
[2014-04-30 07:18:56,525] TRACE Broker 0 cached leader info 
(LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0)
 for partition [test,0] in response to UpdateMetadata request sent by 
controller 0 epoch 1 with correlation id 7 (state.change.logger)
[2014-04-30 07:18:56,526] TRACE Controller 0 epoch 1 received response 
correlationId 7 for a request sent to broker id:0,host:kafka1,port:9092 
(state.change.logger)
[2014-04-30 11:01:59,564] TRACE Controller 0 epoch 1 changed partition [test,0] 
state from OnlinePartition to OfflinePartition (state.change.logger)
[2014-04-30 11:01:59,569] TRACE Controller 0 epoch 1 started leader election 
for partition [test,0] (state.change.logger)
[2014-04-30 11:01:59,589] ERROR Controller 0 epoch 1 initiated state change for 
partition [test,0] from OfflinePartition to OnlinePartition failed 
(state.change.logger)
kafka.common.NoReplicaOnlineException: No replica for partition [test,0] is 
alive. Live brokers are: [Set()], Assigned replicas are: [List(0)]
        at 
kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
        at 
kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:336)
        at 
kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:185)
        at 
kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:99)
        at 
kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:96)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at 
kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:96)
        at 
kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:433)
        at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:344)
        at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330)
        at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:329)
        at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328)
        at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328)
        at kafka.utils.Utils$.inLock(Utils.scala:538)
        at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:327)
        at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2014-04-30 11:01:59,625] TRACE Controller 0 epoch 1 changed state of replica 0 
for partition [test,0] from OnlineReplica to OfflineReplica 
(state.change.logger)
[2014-04-30 11:01:59,676] TRACE Controller 0 epoch 2 started leader election 
for partition [test,0] (state.change.logger)
[2014-04-30 11:01:59,686] ERROR Controller 0 epoch 2 initiated state change for 
partition [test,0] from OfflinePartition to OnlinePartition failed 
(state.change.logger)
kafka.common.NoReplicaOnlineException: No replica for partition [test,0] is 
alive. Live brokers are: [Set()], Assigned replicas are: [List(0)]
        at 
kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
{code}


{{controller.log}}

{code}
[2014-04-30 07:18:56,354] DEBUG [TopicChangeListener on Controller 0]: Topic 
change listener fired for path /brokers/topics with children test 
(kafka.controller.PartitionStateMachine$TopicChangeListener)
[2014-04-30 07:18:56,373] INFO [TopicChangeListener on Controller 0]: New 
topics: [Set(test)], deleted topics: [Set()], new partition replica assignment 
[Map([test,0] -> List(0))] 
(kafka.controller.PartitionStateMachine$TopicChangeListener)
[2014-04-30 07:18:56,373] INFO [Controller 0]: New topic creation callback for 
[test,0] (kafka.controller.KafkaController)
[2014-04-30 07:18:56,376] INFO [Controller 0]: New partition creation callback 
for [test,0] (kafka.controller.KafkaController)
[2014-04-30 07:18:56,376] INFO [Partition state machine on Controller 0]: 
Invoking state change to NewPartition for partitions [test,0] 
(kafka.controller.PartitionStateMachine)
[2014-04-30 07:18:56,391] INFO [Replica state machine on controller 0]: 
Invoking state change to NewReplica for replicas 
[Topic=test,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-04-30 07:18:56,394] INFO [Partition state machine on Controller 0]: 
Invoking state change to OnlinePartition for partitions [test,0] 
(kafka.controller.PartitionStateMachine)
[2014-04-30 07:18:56,395] DEBUG [Partition state machine on Controller 0]: Live 
assigned replicas for partition [test,0] are: [List(0)] 
(kafka.controller.PartitionStateMachine)
[2014-04-30 07:18:56,398] DEBUG [Partition state machine on Controller 0]: 
Initializing leader and isr for partition [test,0] to 
(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1) 
(kafka.controller.PartitionStateMachine)
[2014-04-30 07:18:56,431] INFO [Replica state machine on controller 0]: 
Invoking state change to OnlineReplica for replicas 
[Topic=test,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-04-30 11:01:59,560] INFO [BrokerChangeListener on Controller 0]: Broker 
change listener fired for path /brokers/ids with children  
(kafka.controller.ReplicaStateMachine$BrokerChangeListener)
[2014-04-30 11:01:59,560] INFO [BrokerChangeListener on Controller 0]: Newly 
added brokers: , deleted brokers: 0, all live brokers:  
(kafka.controller.ReplicaStateMachine$BrokerChangeListener)
[2014-04-30 11:01:59,561] INFO [Controller-0-to-broker-0-send-thread], Shutting 
down (kafka.controller.RequestSendThread)
[2014-04-30 11:01:59,562] INFO [Controller-0-to-broker-0-send-thread], Stopped  
(kafka.controller.RequestSendThread)
[2014-04-30 11:01:59,562] INFO [Controller-0-to-broker-0-send-thread], Shutdown 
completed (kafka.controller.RequestSendThread)
[2014-04-30 11:01:59,563] INFO [Controller 0]: Broker failure callback for 0 
(kafka.controller.KafkaController)
[2014-04-30 11:01:59,564] INFO [Controller 0]: Removed ArrayBuffer() from list 
of shutting down brokers. (kafka.controller.KafkaController)
[2014-04-30 11:01:59,564] INFO [Partition state machine on Controller 0]: 
Invoking state change to OfflinePartition for partitions [test,0] 
(kafka.controller.PartitionStateMachine)
[2014-04-30 11:01:59,588] DEBUG [OfflinePartitionLeaderSelector]: No broker in 
ISR is alive for [test,0]. Pick the leader from the alive assigned replicas:  
(kafka.controller.OfflinePartitionLeaderSelector)
[2014-04-30 11:01:59,595] INFO [Replica state machine on controller 0]: 
Invoking state change to OfflineReplica for replicas 
[Topic=test,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-04-30 11:01:59,597] DEBUG [Controller 0]: Removing replica 0 from ISR 0 
for partition [test,0]. (kafka.controller.KafkaController)
[2014-04-30 11:01:59,625] INFO [Controller 0]: New leader and ISR for partition 
[test,0] is {"leader":-1,"leader_epoch":1,"isr":[]} 
(kafka.controller.KafkaController)
[2014-04-30 11:01:59,628] DEBUG The stop replica request (delete = true) sent 
to broker 0 is  (kafka.controller.ControllerBrokerRequestBatch)
[2014-04-30 11:01:59,629] DEBUG The stop replica request (delete = false) sent 
to broker 0 is [Topic=test,Partition=0,Replica=0] 
(kafka.controller.ControllerBrokerRequestBatch)
[2014-04-30 11:01:59,635] WARN [Channel manager on controller 0]: Not sending 
request Name: StopReplicaRequest; Version: 0; CorrelationId: 11; ClientId: ; 
DeletePartitions: false; ControllerId: 0; ControllerEpoch: 1; Partitions: 
[test,0] to broker 0, since it is offline. 
(kafka.controller.ControllerChannelManager)
[2014-04-30 11:01:59,644] INFO [Controller 0]: Controller shutdown complete 
(kafka.controller.KafkaController)
[2014-04-30 11:01:59,649] INFO [Controller 0]: Broker 0 starting become 
controller state transition (kafka.controller.KafkaController)
[2014-04-30 11:01:59,651] INFO [Controller 0]: Controller 0 incremented epoch 
to 2 (kafka.controller.KafkaController)
[2014-04-30 11:01:59,672] INFO [Controller 0]: Partitions undergoing preferred 
replica election:  (kafka.controller.KafkaController)
[2014-04-30 11:01:59,672] INFO [Controller 0]: Partitions that completed 
preferred replica election:  (kafka.controller.KafkaController)
[2014-04-30 11:01:59,672] INFO [Controller 0]: Resuming preferred replica 
election for partitions:  (kafka.controller.KafkaController)
[2014-04-30 11:01:59,673] INFO [Controller 0]: Partitions being reassigned: 
Map() (kafka.controller.KafkaController)
[2014-04-30 11:01:59,673] INFO [Controller 0]: Partitions already reassigned: 
List() (kafka.controller.KafkaController)
[2014-04-30 11:01:59,673] INFO [Controller 0]: Resuming reassignment of 
partitions: Map() (kafka.controller.KafkaController)
[2014-04-30 11:01:59,675] INFO [Controller 0]: List of topics to be deleted:  
(kafka.controller.KafkaController)
[2014-04-30 11:01:59,675] INFO [Controller 0]: List of topics ineligible for 
deletion: test (kafka.controller.KafkaController)
[2014-04-30 11:01:59,675] INFO [Controller 0]: Currently active brokers in the 
cluster: Set() (kafka.controller.KafkaController)
[2014-04-30 11:01:59,675] INFO [Controller 0]: Currently shutting brokers in 
the cluster: Set() (kafka.controller.KafkaController)
[2014-04-30 11:01:59,675] INFO [Controller 0]: Current list of topics in the 
cluster: Set(test) (kafka.controller.KafkaController)
[2014-04-30 11:01:59,676] INFO [Replica state machine on controller 0]: Started 
replica state machine with initial state -> 
Map([Topic=test,Partition=0,Replica=0] -> ReplicaDeletionIneligible) 
(kafka.controller.ReplicaStateMachine)
[2014-04-30 11:01:59,685] DEBUG [OfflinePartitionLeaderSelector]: No broker in 
ISR is alive for [test,0]. Pick the leader from the alive assigned replicas:  
(kafka.controller.OfflinePartitionLeaderSelector)
[2014-04-30 11:01:59,686] INFO [Partition state machine on Controller 0]: 
Started partition state machine with initial state -> Map([test,0] -> 
OfflinePartition) (kafka.controller.PartitionStateMachine)
[2014-04-30 11:01:59,687] INFO [Controller 0]: Broker 0 is ready to serve as 
the new controller with epoch 2 (kafka.controller.KafkaController)
[2014-04-30 11:01:59,688] INFO [Controller 0]: Starting preferred replica 
leader election for partitions  (kafka.controller.KafkaController)
[2014-04-30 11:01:59,688] INFO [Partition state machine on Controller 0]: 
Invoking state change to OnlinePartition for partitions  
(kafka.controller.PartitionStateMachine)
[2014-04-30 11:01:59,690] INFO [SessionExpirationListener on 0], ZK expired; 
shut down all controller components and try to re-elect 
(kafka.controller.KafkaController$SessionExpirationListener)
[2014-04-30 11:01:59,690] INFO [Controller 0]: Controller shutdown complete 
(kafka.controller.KafkaController)
{code}


*zookeeper1 (ZooKeeper server)*

{{zookeeper.log}}

{code}
2014-04-30 07:32:37,460 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted 
socket connection from /10.0.0.21:43468
2014-04-30 07:32:37,462 [myid:] - WARN  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@793] - Connection 
request from old client /10.0.0.21:43468; will be dropped if server is in r-o 
mode
2014-04-30 07:32:37,462 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@832] - Client 
attempting to renew session 0x145b15f015d0000 at /10.0.0.21:43468
2014-04-30 07:32:37,463 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@595] - Established 
session 0x145b15f015d0000 with negotiated timeout 6000 for client 
/10.0.0.21:43468
2014-04-30 11:01:57,832 [myid:] - INFO  [SessionTracker:ZooKeeperServer@325] - 
Expiring session 0x145b15f015d0000, timeout of 6000ms exceeded
2014-04-30 11:01:57,833 [myid:] - INFO  [SessionTracker:ZooKeeperServer@325] - 
Expiring session 0x145b15f015d0009, timeout of 6000ms exceeded
2014-04-30 11:01:57,837 [myid:] - INFO  [ProcessThread(sid:0 
cport:-1)::PrepRequestProcessor@476] - Processed session termination for 
sessionid: 0x145b15f015d0000
2014-04-30 11:01:57,837 [myid:] - INFO  [ProcessThread(sid:0 
cport:-1)::PrepRequestProcessor@476] - Processed session termination for 
sessionid: 0x145b15f015d0009
2014-04-30 11:01:57,842 [myid:] - INFO  [SyncThread:0:NIOServerCnxn@1001] - 
Closed socket connection for client /10.0.0.21:43468 which had sessionid 
0x145b15f015d0000
2014-04-30 11:01:57,845 [myid:] - INFO  [SyncThread:0:NIOServerCnxn@1001] - 
Closed socket connection for client /10.0.0.21:43467 which had sessionid 
0x145b15f015d0009
2014-04-30 11:01:59,001 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted 
socket connection from /10.0.0.21:43469
2014-04-30 11:01:59,002 [myid:] - WARN  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@793] - Connection 
request from old client /10.0.0.21:43469; will be dropped if server is in r-o 
mode
2014-04-30 11:01:59,002 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@832] - Client 
attempting to renew session 0x145b15f015d0009 at /10.0.0.21:43469
2014-04-30 11:01:59,003 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@588] - Invalid 
session 0x145b15f015d0009 for client /10.0.0.21:43469, probably expired
2014-04-30 11:01:59,004 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1001] - Closed socket 
connection for client /10.0.0.21:43469 which had sessionid 0x145b15f015d0009
2014-04-30 11:01:59,005 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted 
socket connection from /10.0.0.21:43470
2014-04-30 11:01:59,008 [myid:] - WARN  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@793] - Connection 
request from old client /10.0.0.21:43470; will be dropped if server is in r-o 
mode
2014-04-30 11:01:59,008 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@839] - Client 
attempting to establish new session at /10.0.0.21:43470
2014-04-30 11:01:59,012 [myid:] - INFO  [SyncThread:0:ZooKeeperServer@595] - 
Established session 0x145b15f015d000b with negotiated timeout 6000 for client 
/10.0.0.21:43470
2014-04-30 11:01:59,545 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted 
socket connection from /10.0.0.21:43471
2014-04-30 11:01:59,545 [myid:] - WARN  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@793] - Connection 
request from old client /10.0.0.21:43471; will be dropped if server is in r-o 
mode
2014-04-30 11:01:59,545 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@832] - Client 
attempting to renew session 0x145b15f015d0000 at /10.0.0.21:43471
2014-04-30 11:01:59,546 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@588] - Invalid 
session 0x145b15f015d0000 for client /10.0.0.21:43471, probably expired
2014-04-30 11:01:59,546 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1001] - Closed socket 
connection for client /10.0.0.21:43471 which had sessionid 0x145b15f015d0000
2014-04-30 11:01:59,553 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted 
socket connection from /10.0.0.21:43472
2014-04-30 11:01:59,555 [myid:] - WARN  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@793] - Connection 
request from old client /10.0.0.21:43472; will be dropped if server is in r-o 
mode
2014-04-30 11:01:59,556 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@839] - Client 
attempting to establish new session at /10.0.0.21:43472
2014-04-30 11:01:59,557 [myid:] - INFO  [SyncThread:0:ZooKeeperServer@595] - 
Established session 0x145b15f015d000c with negotiated timeout 6000 for client 
/10.0.0.21:43472
2014-04-30 11:01:59,687 [myid:] - INFO  [ProcessThread(sid:0 
cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when 
processing sessionid:0x145b15f015d000c type:delete cxid:0x19 zxid:0x5c 
txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election 
Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
2014-04-30 11:01:59,689 [myid:] - INFO  [ProcessThread(sid:0 
cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when 
processing sessionid:0x145b15f015d000c type:create cxid:0x1a zxid:0x5d 
txntype:-1 reqpath:n/a Error Path:/controller Error:KeeperErrorCode = 
NodeExists for /controller
2014-04-30 11:02:05,700 [myid:] - INFO  [ProcessThread(sid:0 
cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when 
processing sessionid:0x145b15f015d000c type:create cxid:0x1d zxid:0x5e 
txntype:-1 reqpath:n/a Error Path:/controller Error:KeeperErrorCode = 
NodeExists for /controller
{code}
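
For reference, the ZooKeeper log is silent between the session being established at 07:32:37 and its expiry at 11:01:57. The following Python sketch (an illustration only; {{gap_seconds}} is a hypothetical helper) compares that silent interval with the negotiated 6000 ms session timeout:

```python
from datetime import datetime

# Timestamp layout used by zookeeper.log, e.g. "2014-04-30 07:32:37,463".
ZK_TIME_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def gap_seconds(earlier: str, later: str) -> float:
    """Return the number of seconds between two zookeeper.log timestamps."""
    start = datetime.strptime(earlier, ZK_TIME_FORMAT)
    end = datetime.strptime(later, ZK_TIME_FORMAT)
    return (end - start).total_seconds()


# Timestamps copied from the log excerpt above.
established = "2014-04-30 07:32:37,463"  # "Established session 0x145b15f015d0000"
expired = "2014-04-30 11:01:57,832"      # "Expiring session 0x145b15f015d0000"
session_timeout_s = 6.0                  # "negotiated timeout 6000"

gap = gap_seconds(established, expired)
print(f"silent for {gap:.3f} s, i.e. about {gap / session_timeout_s:.0f} session timeouts")
```

A gap of roughly three and a half hours with no log output at all would fit the VMs having been paused while the laptop idled, although the logs alone cannot confirm that.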


*State of topic after the issue was triggered*

{code}
$ date;bin/kafka-topics.sh --zookeeper zookeeper1 --describe --topic test
Wed Apr 30 11:02:25 UTC 2014
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: -1      Replicas: 0     Isr:
{code}
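
In case it helps anyone trying to detect this state automatically, here is a minimal Python sketch that scans {{kafka-topics.sh --describe}} output for partitions without a leader. It assumes the output layout shown above; the function name {{leaderless_partitions}} is hypothetical.

```python
import re
from typing import List, Tuple

# Matches per-partition lines such as
# "Topic: test  Partition: 0  Leader: -1  Replicas: 0  Isr:"
PARTITION_LINE = re.compile(
    r"Topic:\s*(\S+)\s+Partition:\s*(\d+)\s+Leader:\s*(-?\d+)"
)


def leaderless_partitions(describe_output: str) -> List[Tuple[str, int]]:
    """Return (topic, partition) pairs whose reported leader is -1."""
    result = []
    for line in describe_output.splitlines():
        match = PARTITION_LINE.search(line)
        if match and int(match.group(3)) == -1:
            result.append((match.group(1), int(match.group(2))))
    return result


# Output copied from the describe command above.
sample = """Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: -1      Replicas: 0     Isr:
"""

print(leaderless_partitions(sample))
```

The summary line ({{PartitionCount}}) is skipped because it carries no {{Leader}} field.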


*Notes*

Kafka kinda recovered at some point.

What indicated a recovery:

- {{state-change.log}} returned to normal messages (partition went from 
OfflinePartition to OnlinePartition with leader 0; leader -1 was replaced with 
leader 0; etc.)
- {{kafka-topics.sh --describe --topic test}} showed normal operations, too, 
i.e. one partition with one replica with one leader and with one ISR.

What speaks against a full recovery:

- {{server.log}} was still showing an indefinite loop of messages {{I wrote 
this conflicted ephemeral node 
[{"version":1,"brokerid":0,"timestamp":"1398860644679"}] at /controller a while 
back in a different session}}.
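
One possible explanation for the endless loop, sketched as a toy model in Python. This is NOT Kafka's actual code: {{FakeZk}} and {{elect_with_backoff}} are invented for illustration, and the only behaviour taken from the logs is "if the conflicting node carries my own broker id, back off and retry". The assumption being illustrated is that the node belongs to the broker's current, live session, in which case ZooKeeper never deletes it.

```python
from typing import Callable, Dict, Optional, Tuple


class FakeZk:
    """In-memory stand-in for ZooKeeper ephemeral nodes (illustration only)."""

    def __init__(self) -> None:
        # path -> (broker id stored in the node, session that owns the node)
        self.nodes: Dict[str, Tuple[int, str]] = {}

    def create_ephemeral(self, path: str, broker_id: int, session: str) -> bool:
        """Create the node; return False if it already exists (NodeExists)."""
        if path in self.nodes:
            return False
        self.nodes[path] = (broker_id, session)
        return True

    def expire_session(self, session: str) -> None:
        """Drop every ephemeral node owned by the expired session."""
        self.nodes = {p: v for p, v in self.nodes.items() if v[1] != session}


def elect_with_backoff(
    zk: FakeZk,
    path: str,
    broker_id: int,
    session: str,
    max_retries: int,
    on_backoff: Optional[Callable[[], None]] = None,
) -> Tuple[str, int]:
    """Try to create the node, backing off while it carries our own broker id."""
    for attempt in range(1, max_retries + 1):
        if zk.create_ephemeral(path, broker_id, session):
            return ("elected", attempt)
        stored_broker, _owner = zk.nodes[path]
        if stored_broker != broker_id:
            return ("other-controller", attempt)
        # Same broker id: assume a stale node from an old session and wait
        # for it to be deleted (a real implementation would sleep here).
        if on_backoff is not None:
            on_backoff()
    return ("stuck", max_retries)


# Case 1: the node really is stale; the old session expires during the backoff.
zk = FakeZk()
zk.create_ephemeral("/controller", 0, "old-session")
print(elect_with_backoff(zk, "/controller", 0, "new-session", 5,
                         on_backoff=lambda: zk.expire_session("old-session")))

# Case 2: the node is owned by the live session, so it is never deleted.
zk = FakeZk()
zk.create_ephemeral("/controller", 0, "new-session")
print(elect_with_backoff(zk, "/controller", 0, "new-session", 5))
```

In the second case the retry budget runs out without progress; with no retry limit, that would look like the repeating "I wrote this conflicted ephemeral node" messages above.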


was (Author: miguno):
I could finally reproduce the bug again, though not yet consistently.

And I think I have an idea what the root cause might be:  I was test-driving 
Kafka in several VMs locally on a MacBook Pro laptop, and the energy saver 
settings (or something similar) might be responsible for triggering the problem 
when the laptop was idle.  (In the test run below I intentionally let the laptop 
idle for 2-3 hours after creating a "test" topic and sending/consuming a few 
test messages.)  The time at which the problem was triggered (11:01) correlates 
with me unlocking the laptop (user screen).  What's strange is that the laptop 
is configured not to go to sleep when idle, and the hard disk is not asked to 
sleep either (even if you did enable that setting in Mac OS X, it has no effect 
on SSDs, and an SSD is the only disk installed in the laptop I was using); only 
the display is allowed to turn off after 10 minutes.  So even though the laptop 
is configured to be "always on", there apparently is something that throws off 
Kafka/ZooKeeper.  Also, as I said in my earlier comment, I still cannot 
reproduce the issue consistently, i.e. sometimes Kafka/ZK work correctly after 
idling/unlocking;  still, I think the root cause ultimately has to do with 
idling on Mac OS X.

Lastly, I don't know what the expected failure handling of Kafka/ZK is in such 
a scenario.  From what I can read from the logs below my setup seems to have 
simulated what could happen during a network partitioning (Kafka broker could 
not talk to ZooKeeper for a long time, hence ZK session expired, then things 
turned bad).

FWIW I still list further log messages and details just in case that 
information may be useful in the future.


@Jun Rao:  Yes, I did see ZK expiration in {{controller.log}}:

{code}
INFO [SessionExpirationListener on 0], ZK expired; shut down all controller 
components and try to re-elect 
(kafka.controller.KafkaController$SessionExpirationListener)
{code}

Below are some further details.  Things turn bad at 11:01.

*kafka1 (Kafka broker)*

{{server.log}}

{code}
[2014-04-30 07:18:56,481] INFO Completed load of log test-0 with log end offset 
0 (kafka.log.Log)
[2014-04-30 07:18:56,485] INFO Created log for partition [test,0] in 
/app/kafka/log with properties {segment.index.bytes -> 10485760, 
file.delete.delay.ms -> 60000, segment.bytes -> 1073741824, flush.ms -> 
9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 
4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 172800000, 
max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, 
min.cleanable.dirty.ratio -> 0.5, retention.ms -> 172800000}. 
(kafka.log.LogManager)
[2014-04-30 07:18:56,486] WARN Partition [test,0] on broker 0: No checkpointed 
highwatermark is found for partition [test,0] (kafka.cluster.Partition)
[2014-04-30 07:19:32,637] INFO Closing socket connection to /127.0.1.1. 
(kafka.network.Processor)
[2014-04-30 07:19:37,328] INFO Closing socket connection to /127.0.0.1. 
(kafka.network.Processor)
[2014-04-30 07:19:56,356] INFO Closing socket connection to /127.0.1.1. 
(kafka.network.Processor)
[2014-04-30 07:20:52,090] ERROR Closing socket for /127.0.1.1 because of error 
(kafka.network.Processor)
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcher.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)
        at sun.nio.ch.IOUtil.read(IOUtil.java:224)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)
        at kafka.utils.Utils$.read(Utils.scala:375)
        at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Processor.read(SocketServer.scala:347)
        at kafka.network.Processor.run(SocketServer.scala:245)
        at java.lang.Thread.run(Thread.java:701)
[2014-04-30 07:21:02,805] INFO Closing socket connection to /127.0.1.1. 
(kafka.network.Processor)
[2014-04-30 07:21:05,803] INFO Closing socket connection to /127.0.0.1. 
(kafka.network.Processor)
[2014-04-30 11:01:57,138] INFO Closing socket connection to /127.0.1.1. 
(kafka.network.Processor)
[2014-04-30 11:01:59,561] INFO Closing socket connection to /127.0.1.1. 
(kafka.network.Processor)
[2014-04-30 11:01:59,648] INFO 0 successfully elected as leader 
(kafka.server.ZookeeperLeaderElector)
[2014-04-30 11:01:59,692] INFO conflict in /controller data: 
{"version":1,"brokerid":0,"timestamp":"1398855719690"} stored data: 
{"version":1,"brokerid":0,"timestamp":"1398855719646"} (kafka.utils.ZkUtils$)
[2014-04-30 11:01:59,699] INFO I wrote this conflicted ephemeral node 
[{"version":1,"brokerid":0,"timestamp":"1398855719690"}] at /controller a while 
back in a different session, hence I will backoff for this node to be deleted 
by Zookeeper and retry (kafka.utils.ZkUtils$)
[2014-04-30 11:02:05,704] INFO conflict in /controller data: 
{"version":1,"brokerid":0,"timestamp":"1398855719690"} stored data: 
{"version":1,"brokerid":0,"timestamp":"1398855719646"} (kafka.utils.ZkUtils$)
[2014-04-30 11:02:05,711] INFO I wrote this conflicted ephemeral node 
[{"version":1,"brokerid":0,"timestamp":"1398855719690"}] at /controller a while 
back in a different session, hence I will backoff for this node to be deleted 
by Zookeeper and retry (kafka.utils.ZkUtils$)
{code}


{{state-change.log}}

{code}
[2014-04-30 07:18:56,433] TRACE Controller 0 epoch 1 changed state of replica 0 
for partition [test,0] from NewReplica to OnlineReplica (state.change.logger)
[2014-04-30 07:18:56,443] TRACE Broker 0 received LeaderAndIsr request 
(LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0)
 correlation id 7 from controller 0 epoch 1 for partition [test,0] 
(state.change.logger)
[2014-04-30 07:18:56,448] TRACE Broker 0 handling LeaderAndIsr request 
correlationId 7 from controller 0 epoch 1 starting the become-leader transition 
for partition [test,0] (state.change.logger)
[2014-04-30 07:18:56,455] TRACE Broker 0 stopped fetchers as part of 
become-leader request from controller 0 epoch 1 with correlation id 7 for 
partition [test,0] (state.change.logger)
[2014-04-30 07:18:56,495] TRACE Broker 0 completed LeaderAndIsr request 
correlationId 7 from controller 0 epoch 1 for the become-leader transition for 
partition [test,0] (state.change.logger)
[2014-04-30 07:18:56,506] TRACE Controller 0 epoch 1 received response 
correlationId 7 for a request sent to broker id:0,host:kafka1,port:9092 
(state.change.logger)
[2014-04-30 07:18:56,525] TRACE Broker 0 cached leader info 
(LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0)
 for partition [test,0] in response to UpdateMetadata request sent by 
controller 0 epoch 1 with correlation id 7 (state.change.logger)
[2014-04-30 07:18:56,526] TRACE Controller 0 epoch 1 received response 
correlationId 7 for a request sent to broker id:0,host:kafka1,port:9092 
(state.change.logger)
[2014-04-30 11:01:59,564] TRACE Controller 0 epoch 1 changed partition [test,0] 
state from OnlinePartition to OfflinePartition (state.change.logger)
[2014-04-30 11:01:59,569] TRACE Controller 0 epoch 1 started leader election 
for partition [test,0] (state.change.logger)
[2014-04-30 11:01:59,589] ERROR Controller 0 epoch 1 initiated state change for 
partition [test,0] from OfflinePartition to OnlinePartition failed 
(state.change.logger)
kafka.common.NoReplicaOnlineException: No replica for partition [test,0] is 
alive. Live brokers are: [Set()], Assigned replicas are: [List(0)]
        at 
kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
        at 
kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:336)
        at 
kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:185)
        at 
kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:99)
        at 
kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:96)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at 
kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:96)
        at 
kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:433)
        at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:344)
        at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330)
        at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:330)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:329)
        at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328)
        at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:328)
        at kafka.utils.Utils$.inLock(Utils.scala:538)
        at 
kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:327)
        at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2014-04-30 11:01:59,625] TRACE Controller 0 epoch 1 changed state of replica 0 
for partition [test,0] from OnlineReplica to OfflineReplica 
(state.change.logger)
[2014-04-30 11:01:59,676] TRACE Controller 0 epoch 2 started leader election 
for partition [test,0] (state.change.logger)
[2014-04-30 11:01:59,686] ERROR Controller 0 epoch 2 initiated state change for 
partition [test,0] from OfflinePartition to OnlinePartition failed 
(state.change.logger)
kafka.common.NoReplicaOnlineException: No replica for partition [test,0] is 
alive. Live brokers are: [Set()], Assigned replicas are: [List(0)]
        at 
kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:61)
{code}


{{controller.log}}

{code}
[2014-04-30 07:18:56,354] DEBUG [TopicChangeListener on Controller 0]: Topic 
change listener fired for path /brokers/topics with children test 
(kafka.controller.PartitionStateMachine$TopicChangeListener)
[2014-04-30 07:18:56,373] INFO [TopicChangeListener on Controller 0]: New 
topics: [Set(test)], deleted topics: [Set()], new partition replica assignment 
[Map([test,0] -> List(0))] 
(kafka.controller.PartitionStateMachine$TopicChangeListener)
[2014-04-30 07:18:56,373] INFO [Controller 0]: New topic creation callback for 
[test,0] (kafka.controller.KafkaController)
[2014-04-30 07:18:56,376] INFO [Controller 0]: New partition creation callback 
for [test,0] (kafka.controller.KafkaController)
[2014-04-30 07:18:56,376] INFO [Partition state machine on Controller 0]: 
Invoking state change to NewPartition for partitions [test,0] 
(kafka.controller.PartitionStateMachine)
[2014-04-30 07:18:56,391] INFO [Replica state machine on controller 0]: 
Invoking state change to NewReplica for replicas 
[Topic=test,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-04-30 07:18:56,394] INFO [Partition state machine on Controller 0]: 
Invoking state change to OnlinePartition for partitions [test,0] 
(kafka.controller.PartitionStateMachine)
[2014-04-30 07:18:56,395] DEBUG [Partition state machine on Controller 0]: Live 
assigned replicas for partition [test,0] are: [List(0)] 
(kafka.controller.PartitionStateMachine)
[2014-04-30 07:18:56,398] DEBUG [Partition state machine on Controller 0]: 
Initializing leader and isr for partition [test,0] to 
(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1) 
(kafka.controller.PartitionStateMachine)
[2014-04-30 07:18:56,431] INFO [Replica state machine on controller 0]: 
Invoking state change to OnlineReplica for replicas 
[Topic=test,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-04-30 11:01:59,560] INFO [BrokerChangeListener on Controller 0]: Broker 
change listener fired for path /brokers/ids with children  
(kafka.controller.ReplicaStateMachine$BrokerChangeListener)
[2014-04-30 11:01:59,560] INFO [BrokerChangeListener on Controller 0]: Newly 
added brokers: , deleted brokers: 0, all live brokers:  
(kafka.controller.ReplicaStateMachine$BrokerChangeListener)
[2014-04-30 11:01:59,561] INFO [Controller-0-to-broker-0-send-thread], Shutting 
down (kafka.controller.RequestSendThread)
[2014-04-30 11:01:59,562] INFO [Controller-0-to-broker-0-send-thread], Stopped  
(kafka.controller.RequestSendThread)
[2014-04-30 11:01:59,562] INFO [Controller-0-to-broker-0-send-thread], Shutdown 
completed (kafka.controller.RequestSendThread)
[2014-04-30 11:01:59,563] INFO [Controller 0]: Broker failure callback for 0 
(kafka.controller.KafkaController)
[2014-04-30 11:01:59,564] INFO [Controller 0]: Removed ArrayBuffer() from list 
of shutting down brokers. (kafka.controller.KafkaController)
[2014-04-30 11:01:59,564] INFO [Partition state machine on Controller 0]: 
Invoking state change to OfflinePartition for partitions [test,0] 
(kafka.controller.PartitionStateMachine)
[2014-04-30 11:01:59,588] DEBUG [OfflinePartitionLeaderSelector]: No broker in 
ISR is alive for [test,0]. Pick the leader from the alive assigned replicas:  
(kafka.controller.OfflinePartitionLeaderSelector)
[2014-04-30 11:01:59,595] INFO [Replica state machine on controller 0]: 
Invoking state change to OfflineReplica for replicas 
[Topic=test,Partition=0,Replica=0] (kafka.controller.ReplicaStateMachine)
[2014-04-30 11:01:59,597] DEBUG [Controller 0]: Removing replica 0 from ISR 0 
for partition [test,0]. (kafka.controller.KafkaController)
[2014-04-30 11:01:59,625] INFO [Controller 0]: New leader and ISR for partition 
[test,0] is {"leader":-1,"leader_epoch":1,"isr":[]} 
(kafka.controller.KafkaController)
[2014-04-30 11:01:59,628] DEBUG The stop replica request (delete = true) sent 
to broker 0 is  (kafka.controller.ControllerBrokerRequestBatch)
[2014-04-30 11:01:59,629] DEBUG The stop replica request (delete = false) sent 
to broker 0 is [Topic=test,Partition=0,Replica=0] 
(kafka.controller.ControllerBrokerRequestBatch)
[2014-04-30 11:01:59,635] WARN [Channel manager on controller 0]: Not sending 
request Name: StopReplicaRequest; Version: 0; CorrelationId: 11; ClientId: ; 
DeletePartitions: false; ControllerId: 0; ControllerEpoch: 1; Partitions: 
[test,0] to broker 0, since it is offline. 
(kafka.controller.ControllerChannelManager)
[2014-04-30 11:01:59,644] INFO [Controller 0]: Controller shutdown complete 
(kafka.controller.KafkaController)
[2014-04-30 11:01:59,649] INFO [Controller 0]: Broker 0 starting become 
controller state transition (kafka.controller.KafkaController)
[2014-04-30 11:01:59,651] INFO [Controller 0]: Controller 0 incremented epoch 
to 2 (kafka.controller.KafkaController)
[2014-04-30 11:01:59,672] INFO [Controller 0]: Partitions undergoing preferred 
replica election:  (kafka.controller.KafkaController)
[2014-04-30 11:01:59,672] INFO [Controller 0]: Partitions that completed 
preferred replica election:  (kafka.controller.KafkaController)
[2014-04-30 11:01:59,672] INFO [Controller 0]: Resuming preferred replica 
election for partitions:  (kafka.controller.KafkaController)
[2014-04-30 11:01:59,673] INFO [Controller 0]: Partitions being reassigned: 
Map() (kafka.controller.KafkaController)
[2014-04-30 11:01:59,673] INFO [Controller 0]: Partitions already reassigned: 
List() (kafka.controller.KafkaController)
[2014-04-30 11:01:59,673] INFO [Controller 0]: Resuming reassignment of 
partitions: Map() (kafka.controller.KafkaController)
[2014-04-30 11:01:59,675] INFO [Controller 0]: List of topics to be deleted:  
(kafka.controller.KafkaController)
[2014-04-30 11:01:59,675] INFO [Controller 0]: List of topics ineligible for 
deletion: test (kafka.controller.KafkaController)
[2014-04-30 11:01:59,675] INFO [Controller 0]: Currently active brokers in the 
cluster: Set() (kafka.controller.KafkaController)
[2014-04-30 11:01:59,675] INFO [Controller 0]: Currently shutting brokers in 
the cluster: Set() (kafka.controller.KafkaController)
[2014-04-30 11:01:59,675] INFO [Controller 0]: Current list of topics in the 
cluster: Set(test) (kafka.controller.KafkaController)
[2014-04-30 11:01:59,676] INFO [Replica state machine on controller 0]: Started 
replica state machine with initial state -> 
Map([Topic=test,Partition=0,Replica=0] -> ReplicaDeletionIneligible) 
(kafka.controller.ReplicaStateMachine)
[2014-04-30 11:01:59,685] DEBUG [OfflinePartitionLeaderSelector]: No broker in 
ISR is alive for [test,0]. Pick the leader from the alive assigned replicas:  
(kafka.controller.OfflinePartitionLeaderSelector)
[2014-04-30 11:01:59,686] INFO [Partition state machine on Controller 0]: 
Started partition state machine with initial state -> Map([test,0] -> 
OfflinePartition) (kafka.controller.PartitionStateMachine)
[2014-04-30 11:01:59,687] INFO [Controller 0]: Broker 0 is ready to serve as 
the new controller with epoch 2 (kafka.controller.KafkaController)
[2014-04-30 11:01:59,688] INFO [Controller 0]: Starting preferred replica 
leader election for partitions  (kafka.controller.KafkaController)
[2014-04-30 11:01:59,688] INFO [Partition state machine on Controller 0]: 
Invoking state change to OnlinePartition for partitions  
(kafka.controller.PartitionStateMachine)
[2014-04-30 11:01:59,690] INFO [SessionExpirationListener on 0], ZK expired; 
shut down all controller components and try to re-elect 
(kafka.controller.KafkaController$SessionExpirationListener)
[2014-04-30 11:01:59,690] INFO [Controller 0]: Controller shutdown complete 
(kafka.controller.KafkaController)
{code}


*zookeeper1 (ZooKeeper server)*

{{zookeeper.log}}

{code}
2014-04-30 07:32:37,460 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted 
socket connection from /10.0.0.21:43468
2014-04-30 07:32:37,462 [myid:] - WARN  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@793] - Connection 
request from old client /10.0.0.21:43468; will be dropped if server is in r-o 
mode
2014-04-30 07:32:37,462 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@832] - Client 
attempting to renew session 0x145b15f015d0000 at /10.0.0.21:43468
2014-04-30 07:32:37,463 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@595] - Established 
session 0x145b15f015d0000 with negotiated timeout 6000 for client 
/10.0.0.21:43468
2014-04-30 11:01:57,832 [myid:] - INFO  [SessionTracker:ZooKeeperServer@325] - 
Expiring session 0x145b15f015d0000, timeout of 6000ms exceeded
2014-04-30 11:01:57,833 [myid:] - INFO  [SessionTracker:ZooKeeperServer@325] - 
Expiring session 0x145b15f015d0009, timeout of 6000ms exceeded
2014-04-30 11:01:57,837 [myid:] - INFO  [ProcessThread(sid:0 
cport:-1)::PrepRequestProcessor@476] - Processed session termination for 
sessionid: 0x145b15f015d0000
2014-04-30 11:01:57,837 [myid:] - INFO  [ProcessThread(sid:0 
cport:-1)::PrepRequestProcessor@476] - Processed session termination for 
sessionid: 0x145b15f015d0009
2014-04-30 11:01:57,842 [myid:] - INFO  [SyncThread:0:NIOServerCnxn@1001] - 
Closed socket connection for client /10.0.0.21:43468 which had sessionid 
0x145b15f015d0000
2014-04-30 11:01:57,845 [myid:] - INFO  [SyncThread:0:NIOServerCnxn@1001] - 
Closed socket connection for client /10.0.0.21:43467 which had sessionid 
0x145b15f015d0009
2014-04-30 11:01:59,001 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted 
socket connection from /10.0.0.21:43469
2014-04-30 11:01:59,002 [myid:] - WARN  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@793] - Connection 
request from old client /10.0.0.21:43469; will be dropped if server is in r-o 
mode
2014-04-30 11:01:59,002 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@832] - Client 
attempting to renew session 0x145b15f015d0009 at /10.0.0.21:43469
2014-04-30 11:01:59,003 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@588] - Invalid 
session 0x145b15f015d0009 for client /10.0.0.21:43469, probably expired
2014-04-30 11:01:59,004 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1001] - Closed socket 
connection for client /10.0.0.21:43469 which had sessionid 0x145b15f015d0009
2014-04-30 11:01:59,005 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted 
socket connection from /10.0.0.21:43470
2014-04-30 11:01:59,008 [myid:] - WARN  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@793] - Connection 
request from old client /10.0.0.21:43470; will be dropped if server is in r-o 
mode
2014-04-30 11:01:59,008 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@839] - Client 
attempting to establish new session at /10.0.0.21:43470
2014-04-30 11:01:59,012 [myid:] - INFO  [SyncThread:0:ZooKeeperServer@595] - 
Established session 0x145b15f015d000b with negotiated timeout 6000 for client 
/10.0.0.21:43470
2014-04-30 11:01:59,545 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted 
socket connection from /10.0.0.21:43471
2014-04-30 11:01:59,545 [myid:] - WARN  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@793] - Connection 
request from old client /10.0.0.21:43471; will be dropped if server is in r-o 
mode
2014-04-30 11:01:59,545 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@832] - Client 
attempting to renew session 0x145b15f015d0000 at /10.0.0.21:43471
2014-04-30 11:01:59,546 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@588] - Invalid 
session 0x145b15f015d0000 for client /10.0.0.21:43471, probably expired
2014-04-30 11:01:59,546 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1001] - Closed socket 
connection for client /10.0.0.21:43471 which had sessionid 0x145b15f015d0000
2014-04-30 11:01:59,553 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@197] - Accepted 
socket connection from /10.0.0.21:43472
2014-04-30 11:01:59,555 [myid:] - WARN  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@793] - Connection 
request from old client /10.0.0.21:43472; will be dropped if server is in r-o 
mode
2014-04-30 11:01:59,556 [myid:] - INFO  
[NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@839] - Client 
attempting to establish new session at /10.0.0.21:43472
2014-04-30 11:01:59,557 [myid:] - INFO  [SyncThread:0:ZooKeeperServer@595] - 
Established session 0x145b15f015d000c with negotiated timeout 6000 for client 
/10.0.0.21:43472
2014-04-30 11:01:59,687 [myid:] - INFO  [ProcessThread(sid:0 
cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when 
processing sessionid:0x145b15f015d000c type:delete cxid:0x19 zxid:0x5c 
txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election 
Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
2014-04-30 11:01:59,689 [myid:] - INFO  [ProcessThread(sid:0 
cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when 
processing sessionid:0x145b15f015d000c type:create cxid:0x1a zxid:0x5d 
txntype:-1 reqpath:n/a Error Path:/controller Error:KeeperErrorCode = 
NodeExists for /controller
2014-04-30 11:02:05,700 [myid:] - INFO  [ProcessThread(sid:0 
cport:-1)::PrepRequestProcessor@627] - Got user-level KeeperException when 
processing sessionid:0x145b15f015d000c type:create cxid:0x1d zxid:0x5e 
txntype:-1 reqpath:n/a Error Path:/controller Error:KeeperErrorCode = 
NodeExists for /controller
{code}


*State of topic after the issue was triggered*

{code}
$ date;bin/kafka-topics.sh --zookeeper zookeeper1 --describe --topic test
Wed Apr 30 11:02:25 UTC 2014
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: -1      Replicas: 0     Isr:
{code}


*Notes*

Kafka partially recovered at some point.

What indicated a recovery:

- {{state-change.log}} reverted back to normal messages (partition went from 
OfflinePartition to OnlinePartition with leader 0; leader -1 was replaced with 
leader 0; etc.)
- {{kafka-topics.sh --describe --topic test}} showed normal operations, too, 
i.e. one partition with one replica with one leader and with one ISR.

What speaks against a full recovery:

- {{server.log}} was still showing an indefinite loop of messages {{I wrote 
this conflicted ephemeral node 
[{"version":1,"brokerid":0,"timestamp":"1398860644679"}] at /controller a while 
back in a different session}}.
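
To line the {{/controller}} payloads up against the log lines, it can help to decode their {{timestamp}} fields. The sketch below assumes these are epoch milliseconds (the quoted election code builds them from {{SystemTime.milliseconds}}) and that the broker logs are in UTC. The object and method names are made up for illustration. It uses the two values from the first {{conflict in /controller}} message in {{server.log}}.

```scala
import java.time.Instant

// Illustrative helper, not part of Kafka: decodes the "timestamp" field
// of the /controller payload, assuming it holds epoch milliseconds.
object ControllerTimestampSketch {
  // Render an epoch-millisecond value as an ISO-8601 UTC instant.
  def toUtc(epochMillis: Long): String =
    Instant.ofEpochMilli(epochMillis).toString

  // Absolute distance between two epoch-millisecond values.
  def gapMillis(a: Long, b: Long): Long = math.abs(a - b)

  def main(args: Array[String]): Unit = {
    val attempted = 1398855719690L // "data" in the conflict message
    val stored    = 1398855719646L // "stored data" in the conflict message
    println(toUtc(attempted))             // 2014-04-30T11:01:59.690Z
    println(toUtc(stored))                // 2014-04-30T11:01:59.646Z
    println(gapMillis(attempted, stored)) // 44
  }
}
```

Under those assumptions both payloads were written about 44 ms apart, right around the 11:01:59 re-election seen in {{controller.log}}. This is only a reading of the timestamps, not a confirmed diagnosis.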

> Zookeeper leader election stuck in ephemeral node retry loop
> ------------------------------------------------------------
>
>                 Key: KAFKA-1029
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1029
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 0.8.0
>            Reporter: Sam Meder
>            Assignee: Sam Meder
>            Priority: Blocker
>             Fix For: 0.8.0
>
>         Attachments: 
> 0002-KAFKA-1029-Use-brokerId-instead-of-leaderId-when-tri.patch
>
>
> We're seeing the following log statements (over and over):
> [2013-08-27 07:21:49,538] INFO conflict in /controller data: { "brokerid":3, 
> "timestamp":"1377587945206", "version":1 } stored data: { "brokerid":2, 
> "timestamp":"1377587460904", "version":1 } (kafka.utils.ZkUtils$)
> [2013-08-27 07:21:49,559] INFO I wrote this conflicted ephemeral node [{ 
> "brokerid":3, "timestamp":"1377587945206", "version":1 }] at /controller a 
> while back in a different session, hence I will backoff for this node to be 
> deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
> where the broker is essentially stuck in the loop that is trying to deal with 
> left-over ephemeral nodes. The code looks a bit racy to me. In particular:
> ZookeeperLeaderElector:
>   def elect: Boolean = {
>     controllerContext.zkClient.subscribeDataChanges(electionPath, 
> leaderChangeListener)
>     val timestamp = SystemTime.milliseconds.toString
>     val electString = ...
>     try {
>       
> createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, 
> electionPath, electString, leaderId,
>         (controllerString : String, leaderId : Any) => 
> KafkaController.parseControllerId(controllerString) == 
> leaderId.asInstanceOf[Int],
>         controllerContext.zkSessionTimeout)
> leaderChangeListener is registered before the create call (by the way, it 
> looks like a new registration will be added every elect call - shouldn't it 
> register in startup()?) so can update leaderId to the current leader before 
> the call to create. If that happens then we will continuously get node exists 
> exceptions and the checker function will always return true, i.e. we will 
> never get out of the while(true) loop.
> I think the right fix here is to pass brokerId instead of leaderId when 
> calling create, i.e.
> createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, 
> electionPath, electString, brokerId,
>         (controllerString : String, leaderId : Any) => 
> KafkaController.parseControllerId(controllerString) == 
> leaderId.asInstanceOf[Int],
>         controllerContext.zkSessionTimeout)
> The loop dealing with the ephemeral node bug is now only triggered for the 
> broker that owned the node previously, although I am still not 100% sure if 
> that is sufficient.
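
The race described in the issue can be illustrated without ZooKeeper. The following is a minimal sketch, not the actual {{ZkUtils}} code: it models only the decision taken after a "node exists" error, assuming (as the description implies) that the loop backs off and retries when the checker returns true and gives up otherwise. {{ElectionCheckSketch}}, {{onNodeExists}} and the {{Outcome}} values are hypothetical names. The broker ids 3 and 2 are taken from the log statements quoted in the issue.

```scala
// Hypothetical model of the checker decision; no ZooKeeper involved.
object ElectionCheckSketch {
  sealed trait Outcome
  case object BackoffAndRetry extends Outcome // stay in the while(true) loop
  case object GiveUp extends Outcome          // rethrow and leave the loop

  // Mirrors the checker in the issue: the id parsed from the stored
  // /controller data is compared with the id the caller passed in.
  def onNodeExists(storedBrokerId: Int, expectedCallerId: Int): Outcome =
    if (storedBrokerId == expectedCallerId) BackoffAndRetry else GiveUp

  def main(args: Array[String]): Unit = {
    val brokerId = 3        // this broker, trying to become controller
    val storedBrokerId = 2  // broker recorded in the existing /controller node
    // The listener may already have set leaderId to the current leader.
    val leaderId = storedBrokerId

    println(onNodeExists(storedBrokerId, leaderId)) // BackoffAndRetry
    println(onNodeExists(storedBrokerId, brokerId)) // GiveUp
  }
}
```

With {{leaderId}} the comparison can succeed for a node that this broker never wrote, so the loop keeps backing off. With {{brokerId}} the retry path is taken only when the stored node really names this broker.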



--
This message was sent by Atlassian JIRA
(v6.2#6252)
