[ 
https://issues.apache.org/jira/browse/KAFKA-1574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14087882#comment-14087882
 ] 

Jun Rao commented on KAFKA-1574:
--------------------------------

This issue appears to be a race condition in AbstractServerThread. When a 
Processor thread starts, it first calls startupComplete(), which sets alive to 
true. However, if Processor.shutdown() is called before startupComplete(), the 
latter sets alive back to true, so the thread never leaves its run loop and 
shutdown() waits forever.

As a fix, since an AbstractServerThread is not expected to be restarted, we can 
simply set alive to true at initialization instead of in startupComplete().
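
To make the interleaving concrete, here is a minimal Java sketch of the race 
and of the proposed fix. It is a simplified model, not the actual 
SocketServer.scala code: the class names RacyServerThread and 
FixedServerThread, and the methods initiateShutdown() and isRunning(), are 
hypothetical, and the real shutdown() also blocks on a latch (as the stack 
trace below shows), which is left out here so the bad ordering can be replayed 
on a single thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ServerThreadSketch {

    // Hypothetical model of the behavior described above: alive starts out
    // false and is only set to true inside startupComplete().
    static class RacyServerThread {
        private final AtomicBoolean alive = new AtomicBoolean(false);

        void startupComplete() { alive.set(true); }    // called by the worker thread
        void initiateShutdown() { alive.set(false); }  // called by the shutting-down thread
        boolean isRunning() { return alive.get(); }    // the run loop's condition
    }

    // Hypothetical model of the proposed fix: alive is true from construction,
    // and startupComplete() no longer writes to it.
    static class FixedServerThread {
        private final AtomicBoolean alive = new AtomicBoolean(true);

        void startupComplete() { /* would only signal startup; alive is untouched */ }
        void initiateShutdown() { alive.set(false); }
        boolean isRunning() { return alive.get(); }
    }

    public static void main(String[] args) {
        // Replay the problematic ordering: shutdown first, then startupComplete().
        RacyServerThread racy = new RacyServerThread();
        racy.initiateShutdown();
        racy.startupComplete();
        System.out.println("racy still running: " + racy.isRunning());   // true

        FixedServerThread fixed = new FixedServerThread();
        fixed.initiateShutdown();
        fixed.startupComplete();
        System.out.println("fixed still running: " + fixed.isRunning()); // false
    }
}
```

In the racy model the shutdown request is overwritten, so a run loop guarded 
by isRunning() would keep going. In the fixed model the only write to alive 
after construction is the one made by shutdown, so the ordering of the two 
calls no longer matters.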


> unit tests can hang on socketserver shutdown
> --------------------------------------------
>
>                 Key: KAFKA-1574
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1574
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8.2
>            Reporter: Jun Rao
>              Labels: newbie++
>
> Saw the following stacktrace.
> "kafka-network-thread-59843-2" prio=5 tid=7fc7e5943800 nid=0x11eefa000 runnable [11eef9000]
>    java.lang.Thread.State: RUNNABLE
>         at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
>         at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:136)
>         at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:69)
>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:69)
>         - locked <7f4a80328> (a sun.nio.ch.Util$2)
>         - locked <7f4a80310> (a java.util.Collections$UnmodifiableSet)
>         - locked <7f4a71968> (a sun.nio.ch.KQueueSelectorImpl)
>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:80)
>         at kafka.network.Processor.run(SocketServer.scala:296)
>         at java.lang.Thread.run(Thread.java:695)
> "Test worker" prio=5 tid=7fc7e50d4800 nid=0x11534c000 waiting on condition [115349000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <7f4a69d50> (a java.util.concurrent.CountDownLatch$Sync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>         at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
>         at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:113)
>         at kafka.network.SocketServer$$anonfun$shutdown$2.apply(SocketServer.scala:92)
>         at kafka.network.SocketServer$$anonfun$shutdown$2.apply(SocketServer.scala:91)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>         at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>         at kafka.network.SocketServer.shutdown(SocketServer.scala:91)
>         at kafka.server.KafkaServer$$anonfun$shutdown$3.apply$mcV$sp(KafkaServer.scala:246)
>         at kafka.utils.Utils$.swallow(Utils.scala:172)
>         at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
>         at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
>         at kafka.utils.Logging$class.swallow(Logging.scala:94)
>         at kafka.utils.Utils$.swallow(Utils.scala:45)
>         at kafka.server.KafkaServer.shutdown(KafkaServer.scala:246)
>         at kafka.admin.AdminTest$$anonfun$testPartitionReassignmentNonOverlappingReplicas$3.apply(AdminTest.scala:232)
>         at kafka.admin.AdminTest$$anonfun$testPartitionReassignmentNonOverlappingReplicas$3.apply(AdminTest.scala:232)



--
This message was sent by Atlassian JIRA
(v6.2#6252)
