[ https://issues.apache.org/jira/browse/KAFKA-9796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rajini Sivaram resolved KAFKA-9796.
-----------------------------------
Fix Version/s: 2.6.0
Reviewer: Rajini Sivaram
Resolution: Fixed
> Broker shutdown could be stuck forever under certain conditions
> ---------------------------------------------------------------
>
> Key: KAFKA-9796
> URL: https://issues.apache.org/jira/browse/KAFKA-9796
> Project: Kafka
> Issue Type: Bug
> Reporter: David Jacot
> Assignee: David Jacot
> Priority: Major
> Fix For: 2.6.0
>
>
> During broker initialisation, the Acceptor threads are started early so that
> the bound port is known, while the start of the processors is delayed until
> the end of the initialisation sequence. We have found that the shutdown of a
> broker can get stuck forever under the following conditions:
> - the shutdown procedure is started before the processors are started;
> - the `newConnections` queues of the processors are full; and
> - an extra new connection has been accepted but can't be queued up in a
> processor.
> For instance, this could happen if a `NodeExistsException` is raised when the
> broker tries to register itself in ZK.
> When the above conditions are met, the shutdown procedure triggers the
> shutdown of the acceptor threads and waits until they have stopped (first
> thread dump below). If an acceptor has a pending connection which can't be
> queued up in a processor, it waits until space is made in the processor's
> queue to accept the new connection (second thread dump below). As the
> processors are not started, the new connection queues are never drained, so
> the acceptor thread is never released.
> *Shutdown waits for the acceptor to shut down*
> {noformat}
> "main" #1 prio=5 os_prio=0 cpu=3626.89ms elapsed=106360.56s
> tid=0x00007f625001c800 nid=0x272 waiting on condition [0x00007f6257ca4000]
> java.lang.Thread.State: WAITING (parking)
> at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
> - parking to wait for <0x0000000689a61800> (a
> java.util.concurrent.CountDownLatch$Sync)
> at
> java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt([email protected]/AbstractQueuedSynchronizer.java:885)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly([email protected]/AbstractQueuedSynchronizer.java:1039)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly([email protected]/AbstractQueuedSynchronizer.java:1345)
> at
> java.util.concurrent.CountDownLatch.await([email protected]/CountDownLatch.java:232)
> at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430)
> at kafka.network.Acceptor.shutdown(SocketServer.scala:521)
> at
> kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267)
> at
> kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267)
> at
> kafka.network.SocketServer$$Lambda$604/0x0000000840540840.apply(Unknown
> Source)
> at scala.collection.Iterator.foreach(Iterator.scala:941)
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
> at
> scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:213)
> at
> kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:267)
> - locked <0x0000000689a61ac0> (a kafka.network.SocketServer)
> at kafka.server.KafkaServer.$anonfun$shutdown$5(KafkaServer.scala:806)
> at
> kafka.server.KafkaServer$$Lambda$602/0x000000084052b040.apply$mcV$sp(Unknown
> Source)
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
> at kafka.server.KafkaServer.shutdown(KafkaServer.scala:806)
> at kafka.server.KafkaServer.startup(KafkaServer.scala:522)
> at
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
> at kafka.Kafka$.main(Kafka.scala:82)
> at kafka.Kafka.main(Kafka.scala)
> {noformat}
> *Acceptor waits on processor to accept the new connection*
> {noformat}
> "data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-9092" #54
> prio=5 os_prio=0 cpu=16.23ms elapsed=106346.62s tid=0x00007f62523b5000
> nid=0x2ca waiting on condition [0x00007f6157130000]
> java.lang.Thread.State: WAITING (parking)
> at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
> - parking to wait for <0x0000000689a7cad8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at
> java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await([email protected]/AbstractQueuedSynchronizer.java:2081)
> at
> java.util.concurrent.ArrayBlockingQueue.put([email protected]/ArrayBlockingQueue.java:367)
> at kafka.network.Processor.accept(SocketServer.scala:1020)
> at kafka.network.Acceptor.assignNewConnection(SocketServer.scala:639)
> at kafka.network.Acceptor.$anonfun$run$1(SocketServer.scala:566)
> at kafka.network.Acceptor.run(SocketServer.scala:550)
> at java.lang.Thread.run([email protected]/Thread.java:834)
> {noformat}
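
The two thread dumps above boil down to a thread parked in
ArrayBlockingQueue.put() on a full queue that nobody drains, while another
thread waits on a CountDownLatch that only the parked thread can release.
The following is a minimal, self-contained Java sketch of that interaction.
It is not Kafka code and not necessarily how the issue was fixed in 2.6.0:
the class name AcceptorShutdownSketch, the method shutdownCompletes, the
`running` flag and the timed-offer loop are all hypothetical, chosen only to
illustrate why a blocking put cannot observe a shutdown request whereas a
timed offer that re-checks a flag can.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class AcceptorShutdownSketch {

    /**
     * Simulates one acceptor handing a connection to a processor whose queue
     * is already full and is never drained (the processor was never started).
     * Returns true if the acceptor thread finished within one second of the
     * shutdown request, false if it was still stuck.
     */
    public static boolean shutdownCompletes(boolean useBlockingPut) {
        BlockingQueue<String> newConnections = new ArrayBlockingQueue<>(1);
        newConnections.add("conn-1"); // queue is now full
        AtomicBoolean running = new AtomicBoolean(true);
        CountDownLatch shutdownLatch = new CountDownLatch(1);

        Thread acceptor = new Thread(() -> {
            try {
                if (useBlockingPut) {
                    // Parks until space appears; never checks the running flag.
                    newConnections.put("conn-2");
                } else {
                    // Hypothetical alternative: bounded wait, then re-check the flag.
                    while (running.get()
                            && !newConnections.offer("conn-2", 50, TimeUnit.MILLISECONDS)) {
                        // queue still full and shutdown not requested: try again
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownLatch.countDown(); // signals that the acceptor has exited
            }
        }, "sketch-acceptor");
        acceptor.setDaemon(true); // lets the JVM exit even if the thread stays parked
        acceptor.start();

        try {
            Thread.sleep(100);  // give the acceptor time to reach the queue
            running.set(false); // request shutdown
            // The "main" thread in the dump uses an untimed await(); a timeout
            // is used here only so that the demonstration terminates.
            return shutdownLatch.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking put: " + shutdownCompletes(true));
        System.out.println("timed offer:  " + shutdownCompletes(false));
    }
}
```

With the blocking put, the latch is never counted down and the call reports
false, which mirrors the stuck "main" thread. With the timed offer, the
acceptor notices the cleared flag on its next retry and the call reports
true. Draining or closing the queues during shutdown would be another way to
release the acceptor; this sketch does not cover that option.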
--
This message was sent by Atlassian Jira
(v8.3.4#803005)