David Jacot created KAFKA-9796:
----------------------------------
Summary: Broker shutdown could be stuck forever under certain
conditions
Key: KAFKA-9796
URL: https://issues.apache.org/jira/browse/KAFKA-9796
Project: Kafka
Issue Type: Bug
Reporter: David Jacot
Assignee: David Jacot
During broker initialisation, the Acceptor threads are started early so that
the bound port is known, while the processors are only started at the end of
the initialisation sequence. We have found that the shutdown of a broker can
be stuck forever under the following conditions:
- the shutdown procedure is started before the processors are started;
- the `newConnections` queues of the processors are full; and
- an extra new connection has been accepted but can't be queued up in a
processor.
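For instance, this could happen if a `NodeExistsException` is raised when the
broker tries to register itself in ZK, because the failed startup then calls
the shutdown procedure before the processors have been started.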
When the above conditions hold, the shutdown procedure triggers the shutdown
of the acceptor threads and waits until they have terminated (first thread
dump below). If an acceptor has a pending connection which can't be queued up
in a processor, it ends up waiting until space is made in the queue to accept
the new connection (second thread dump below). As the processors are not
started, the new connection queues are never drained, so the acceptor thread
is never released.
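
The blocked state described above can be reproduced outside Kafka with a small sketch. This is not Kafka code: the class name, the queue capacity of 1, and the 500 ms bound on the wait are assumptions made for the demo. It only illustrates how an unbounded `ArrayBlockingQueue.put` on a full queue that nobody drains keeps a latch-based shutdown from completing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AcceptorHangSketch {

    /** Returns true if the simulated shutdown gave up waiting for the acceptor. */
    static boolean shutdownGetsStuck() {
        // Hypothetical stand-in for a processor's newConnections queue.
        BlockingQueue<String> newConnections = new ArrayBlockingQueue<>(1);
        newConnections.add("connection-1"); // queue is full; no processor drains it

        CountDownLatch shutdownLatch = new CountDownLatch(1);
        Thread acceptor = new Thread(() -> {
            try {
                // Blocks indefinitely because nothing ever takes from the queue.
                newConnections.put("connection-2");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                shutdownLatch.countDown();
            }
        }, "acceptor-sketch");
        acceptor.setDaemon(true);
        acceptor.start();

        try {
            // The thread dump shows an untimed await(); a bounded wait is used
            // here only so that the demo terminates.
            boolean completed = shutdownLatch.await(500, TimeUnit.MILLISECONDS);
            return !completed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            acceptor.interrupt(); // release the demo thread
        }
    }

    public static void main(String[] args) {
        System.out.println("shutdown stuck: " + shutdownGetsStuck());
    }
}
```

With an untimed `await()`, as in the first thread dump, the caller would stay parked for as long as the acceptor stays blocked in `put`.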
*Shutdown waits for the acceptor to shut down*
{noformat}
"main" #1 prio=5 os_prio=0 cpu=3626.89ms elapsed=106360.56s
tid=0x00007f625001c800 nid=0x272 waiting on condition [0x00007f6257ca4000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
- parking to wait for <0x0000000689a61800> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt([email protected]/AbstractQueuedSynchronizer.java:885)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly([email protected]/AbstractQueuedSynchronizer.java:1039)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly([email protected]/AbstractQueuedSynchronizer.java:1345)
at java.util.concurrent.CountDownLatch.await([email protected]/CountDownLatch.java:232)
at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430)
at kafka.network.Acceptor.shutdown(SocketServer.scala:521)
at kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267)
at kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267)
at kafka.network.SocketServer$$Lambda$604/0x0000000840540840.apply(Unknown Source)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:213)
at kafka.network.SocketServer.stopProcessingRequests(SocketServer.scala:267)
- locked <0x0000000689a61ac0> (a kafka.network.SocketServer)
at kafka.server.KafkaServer.$anonfun$shutdown$5(KafkaServer.scala:806)
at kafka.server.KafkaServer$$Lambda$602/0x000000084052b040.apply$mcV$sp(Unknown Source)
at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:68)
at kafka.server.KafkaServer.shutdown(KafkaServer.scala:806)
at kafka.server.KafkaServer.startup(KafkaServer.scala:522)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44)
at kafka.Kafka$.main(Kafka.scala:82)
at kafka.Kafka.main(Kafka.scala)
{noformat}
*Acceptor waits on processor to accept the new connection*
{noformat}
"data-plane-kafka-socket-acceptor-ListenerName(EXTERNAL)-SASL_SSL-9092" #54
prio=5 os_prio=0 cpu=16.23ms elapsed=106346.62s tid=0x00007f62523b5000
nid=0x2ca waiting on condition [0x00007f6157130000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
- parking to wait for <0x0000000689a7cad8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:194)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await([email protected]/AbstractQueuedSynchronizer.java:2081)
at java.util.concurrent.ArrayBlockingQueue.put([email protected]/ArrayBlockingQueue.java:367)
at kafka.network.Processor.accept(SocketServer.scala:1020)
at kafka.network.Acceptor.assignNewConnection(SocketServer.scala:639)
at kafka.network.Acceptor.$anonfun$run$1(SocketServer.scala:566)
at kafka.network.Acceptor.run(SocketServer.scala:550)
at java.lang.Thread.run([email protected]/Thread.java:834)
{noformat}
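
One way to avoid parking forever in `ArrayBlockingQueue.put` is to use the timed `offer` and re-check a running flag between attempts. The sketch below is only an illustration of that idea and is not the fix applied in Kafka; `offerWhileRunning`, the `running` flag, and the 50 ms poll interval are hypothetical names and values chosen for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class TimedOfferSketch {

    /**
     * Tries to enqueue a connection, re-checking the running flag between
     * attempts. Returns true if the connection was queued, false if shutdown
     * was requested (or the thread was interrupted) before space became free.
     */
    static <T> boolean offerWhileRunning(BlockingQueue<T> queue, T connection,
                                         AtomicBoolean running) {
        try {
            while (running.get()) {
                if (queue.offer(connection, 50, TimeUnit.MILLISECONDS)) {
                    return true;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

    /** Returns true if the acceptor-like thread exits once shutdown is requested. */
    static boolean acceptorExitsOnShutdown() {
        BlockingQueue<String> newConnections = new ArrayBlockingQueue<>(1);
        newConnections.add("connection-1"); // full queue, never drained
        AtomicBoolean running = new AtomicBoolean(true);

        Thread acceptor = new Thread(
            () -> offerWhileRunning(newConnections, "connection-2", running),
            "acceptor-sketch");
        acceptor.setDaemon(true);
        acceptor.start();

        running.set(false); // request shutdown while the queue is still full
        try {
            acceptor.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !acceptor.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("acceptor exited: " + acceptorExitsOnShutdown());
    }
}
```

The trade-off in this sketch is that a connection which cannot be queued before shutdown is reported as not queued, so the caller would have to close it itself.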