[ 
https://issues.apache.org/jira/browse/KAFKA-4595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15823594#comment-15823594
 ] 

huxi commented on KAFKA-4595:
-----------------------------

[~pengwei] I don't think that's doable, since it would leave the involved 
partitions in inconsistent states and thus pollute the controller's cache. In 
the current design, topic deletion is completely asynchronous: users nearly 
always see the topic marked as deleted successfully, even though the controller 
still has several steps to finish in the background. If we time out 
deleteTopicStopReplicaCallback, completeReplicaDeletion will never be invoked.
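
For reference, the cycle in the quoted stack traces can be reproduced in 
isolation. This is only a rough sketch in plain Java, not Kafka code: the class 
and method names are made up, the ReentrantLock stands in for the controller 
lock, and the CountDownLatch stands in for the shutdown latch in 
ShutdownableThread. The waiting side uses a bounded wait purely so the demo 
terminates; the real awaitShutdown in the trace waits without a timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-alone model of the two threads in the dump.
class ControllerDeadlockSketch {

    // Returns true if the "send thread" finished while the caller still held
    // the lock, false if the caller's bounded wait expired first.
    static boolean shutdownCompletesWhileLockHeld(long waitMillis) {
        ReentrantLock controllerLock = new ReentrantLock();   // stand-in for the controller lock
        CountDownLatch lockHeld = new CountDownLatch(1);      // orders the two threads for the demo
        CountDownLatch shutdownLatch = new CountDownLatch(1); // stand-in for the shutdown latch

        // Plays the send thread: its callback needs the controller lock
        // before the thread can finish and signal shutdown.
        Thread sendThread = new Thread(() -> {
            try {
                lockHeld.await();
                controllerLock.lock();
                try {
                    // a completeReplicaDeletion-like step would run here
                } finally {
                    controllerLock.unlock();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownLatch.countDown();
            }
        });
        sendThread.start();

        // Plays the ZkClient event thread: holds the controller lock while
        // waiting for the send thread to shut down.
        boolean completed;
        controllerLock.lock();
        try {
            lockHeld.countDown();
            completed = shutdownLatch.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            controllerLock.unlock(); // only now can the send thread proceed
        }
        try {
            sendThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed;
    }

    public static void main(String[] args) {
        System.out.println("shutdown completed while lock held: "
                + shutdownCompletesWhileLockHeld(200));
    }
}
```

The wait always expires here, because the latch can only be counted down after 
the lock is released. Bounding the wait unblocks the waiting thread, but in 
this model the callback step still runs afterwards, which is a simplification 
compared with timing out the callback itself.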

Does it make sense?

> Controller send thread can't stop when broker change listener event trigger 
> for  dead brokers
> ---------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4595
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4595
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.0, 0.10.1.1
>            Reporter: Pengwei
>            Priority: Critical
>              Labels: reliability
>             Fix For: 0.10.2.0
>
>
> In our test env, we found the controller stops working after a delete topic 
> operation combined with a network issue. The stacks are below:
> "ZkClient-EventThread-15-192.168.1.3:2184,192.168.1.4:2184,192.168.1.5:2184" 
> #15 daemon prio=5 os_prio=0 tid=0x00007fb76416e000 nid=0x3019 waiting on 
> condition [0x00007fb76b7c8000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c05497b8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>       at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>       at 
> kafka.utils.ShutdownableThread.awaitShutdown(ShutdownableThread.scala:50)
>       at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:32)
>       at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$removeExistingBroker(ControllerChannelManager.scala:128)
>       at 
> kafka.controller.ControllerChannelManager.removeBroker(ControllerChannelManager.scala:81)
>       - locked <0x00000000c0258760> (a java.lang.Object)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply$mcVI$sp(ReplicaStateMachine.scala:369)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcV$sp$2.apply(ReplicaStateMachine.scala:369)
>       at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:369)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>       at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>       at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>       at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>       at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
>    Locked ownable synchronizers:
>       - <0x00000000c02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
> "Controller-1001-to-broker-1003-send-thread" #88 prio=5 os_prio=0 
> tid=0x00007fb778342000 nid=0x5a4c waiting on condition [0x00007fb761de0000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x00000000c02587f8> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
>       at 
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
>       at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
>       at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:260)
>       at 
> kafka.controller.TopicDeletionManager.kafka$controller$TopicDeletionManager$$deleteTopicStopReplicaCallback(TopicDeletionManager.scala:378)
>       at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:345)
>       at 
> kafka.controller.TopicDeletionManager$$anonfun$startReplicaDeletion$2$$anonfun$apply$3.apply(TopicDeletionManager.scala:345)
>       at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$addStopReplicaRequestForBrokers$2$$anonfun$apply$mcVI$sp$2.apply(ControllerChannelManager.scala:294)
>       at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$addStopReplicaRequestForBrokers$2$$anonfun$apply$mcVI$sp$2.apply(ControllerChannelManager.scala:294)
>       at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:206)
>       - locked <0x00000000c04af988> (a java.lang.Object)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>    Locked ownable synchronizers:
>       - None



