[
https://issues.apache.org/jira/browse/KAFKA-914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13663146#comment-13663146
]
Joel Koshy commented on KAFKA-914:
----------------------------------
One more point: [td3] above does not need to originate from a watcher-triggered
rebalance. The initial rebalance can also run into the same deadlock; i.e., as
long as one or more watcher-triggered rebalances succeed and start fetchers
before the initial rebalance, we may end up in this wedged state. E.g., on
another instance I saw the same stack as [td3], but on the main thread:
2013-05-21_17:07:14.34308 "main" prio=10 tid=0x00007f5e34008000 nid=0x4e49
waiting on condition [0x00007f5e3b410000]
2013-05-21_17:07:14.34308 java.lang.Thread.State: WAITING (parking)
2013-05-21_17:07:14.34309 at sun.misc.Unsafe.park(Native Method)
2013-05-21_17:07:14.34309 - parking to wait for <0x00007f5d36d99fa0> (a
java.util.concurrent.CountDownLatch$Sync)
2013-05-21_17:07:14.34309 at
java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-21_17:07:14.34310 at
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-21_17:07:14.34310 at
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-21_17:07:14.34310 at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-21_17:07:14.34311 at
java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-21_17:07:14.34312 at
kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-21_17:07:14.34313 at
kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:125)
2013-05-21_17:07:14.34313 at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:486)
2013-05-21_17:07:14.34313 at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:523)
2013-05-21_17:07:14.34314 at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:420)
2013-05-21_17:07:14.34314 at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:373)
2013-05-21_17:07:14.34315 at
scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-21_17:07:14.34315 at
scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
2013-05-21_17:07:14.34316 at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
2013-05-21_17:07:14.34316 - locked <0x00007f5d36d4b2e0> (a
java.lang.Object)
2013-05-21_17:07:14.34317 at
kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:678)
2013-05-21_17:07:14.34317 at
kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:712)
2013-05-21_17:07:14.34318 at
kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
2013-05-21_17:07:14.34318 at
kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-21_17:07:14.34318 at
kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
2013-05-21_17:07:14.34319 at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-21_17:07:14.34319 at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
2013-05-21_17:07:14.34319 at
scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
2013-05-21_17:07:14.34320 at
scala.collection.immutable.List.foreach(List.scala:45)
2013-05-21_17:07:14.34320 at
scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
2013-05-21_17:07:14.34320 at
scala.collection.immutable.List.map(List.scala:45)
2013-05-21_17:07:14.34321 at
kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
2013-05-21_17:07:14.34322 at
kafka.tools.MirrorMaker.main(MirrorMaker.scala)
> Deadlock between initial rebalance and watcher-triggered rebalances
> -------------------------------------------------------------------
>
> Key: KAFKA-914
> URL: https://issues.apache.org/jira/browse/KAFKA-914
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Joel Koshy
> Fix For: 0.8
>
>
> Summary doesn't give the full picture and the fetcher-manager/fetcher-thread
> code is very complex, so it's a bit hard to articulate the following very
> clearly. I will try to describe the sequence that results in a deadlock
> when starting up a large number of consumers at around the same time:
>
> - When a consumer's createMessageStream method is called, it initiates an
>   initial inline rebalance.
> - However, before the above initial rebalance actually begins, a ZK watch
>   may trigger (due to some other consumers starting up) and initiate a
>   rebalance. This happens successfully, so fetchers start and begin filling
>   up the chunk queues.
> - Another watch triggers and initiates yet another rebalance. This rebalance
>   attempt tries to close the fetchers. Before the fetchers are stopped, we
>   shut down the leader-finder-thread to prevent new fetchers from being
>   started.
> - The shutdown is accomplished by interrupting the leader-finder-thread and
>   then awaiting its shutdown latch.
> - If the leader-finder-thread still has a partition without leader to
>   process and tries to add a fetcher for it, it will get an exception
>   (InterruptedException if acquiring the partitionMapLock, or
>   ClosedByInterruptException if performing an offset request). If we get an
>   InterruptedException, the thread's interrupted flag is cleared.
> - However, the leader-finder-thread may have multiple partitions without
>   leader that it is currently processing. So if the interrupted flag is
>   cleared and the leader-finder-thread tries to add a fetcher for another
>   partition, it does not receive an InterruptedException when it tries to
>   acquire the partitionMapLock. It can end up blocking indefinitely at that
>   point.
> - The problem is that until now, createMessageStream's initial inline
>   rebalance has not yet returned - it is blocked on the rebalance lock,
>   waiting for the second watch-triggered rebalance to complete. I.e., the
>   consumer iterators have not been created yet, and thus the fetcher queues
>   get filled up. [td1]
> - As a result, processPartitionData (which holds on to the partitionMapLock)
>   in one or more fetchers will be blocked trying to enqueue into a full
>   chunk queue. [td2]
> - So the leader-finder-thread cannot finish adding fetchers for the
>   remaining partitions without leader and thus cannot shut down.
>
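The lock/interrupt interaction in the list above can be boiled down to a small
standalone sketch (plain Scala with illustrative names only - these are not the
actual Kafka classes): one thread holds the partition-map lock while blocked on
a put into a full bounded queue, and a second thread, whose interrupt status was
already consumed by an earlier InterruptedException, then blocks forever in
lockInterruptibly().

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.locks.ReentrantLock

// Minimal sketch of the wedge; names are illustrative, not the real Kafka classes.
object DeadlockSketch {
  def main(args: Array[String]): Unit = {
    val partitionMapLock = new ReentrantLock()
    val chunkQueue = new LinkedBlockingQueue[Int](1) // bounded, like a consumer chunk queue
    chunkQueue.put(0)                                // already full; no iterator ever drains it

    // Plays processPartitionData: takes the lock, then blocks enqueuing into the full queue.
    val fetcher = new Thread(new Runnable {
      def run(): Unit = {
        partitionMapLock.lock()
        try chunkQueue.put(1)              // blocks forever while holding the lock
        finally partitionMapLock.unlock()
      }
    }, "fetcher")

    // Plays the leader-finder-thread after shutdown() has interrupted it.
    val leaderFinder = new Thread(new Runnable {
      def run(): Unit = {
        Thread.currentThread().interrupt()
        try partitionMapLock.lockInterruptibly()   // first leaderless partition: throws...
        catch { case _: InterruptedException => }  // ...and clears the interrupted flag
        partitionMapLock.lockInterruptibly()       // next partition: blocks indefinitely
      }
    }, "leader-finder")

    fetcher.setDaemon(true); leaderFinder.setDaemon(true)
    fetcher.start(); Thread.sleep(200); leaderFinder.start()
    leaderFinder.join(2000)
    // Prints true: the second thread is wedged like the leader-finder-thread in [td4],
    // so a shutdown latch awaiting it (as in [td3]) would never be counted down.
    println("leader-finder still alive: " + leaderFinder.isAlive)
  }
}
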
> One way to fix this would be to let the exception from the leader-finder-thread
> propagate outside if the leader-finder-thread is currently shutting down -
> and avoid the subsequent (unnecessary) attempt to add a fetcher and lock
> partitionMapLock. There may be more elegant fixes (such as rewriting the
> whole consumer manager logic), but obviously we want to avoid extensive
> changes at this point in 0.8.
>
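One hedged sketch of that direction (hypothetical names and shape, not an actual
patch against ConsumerFetcherManager): while walking the leaderless partitions,
restore the interrupt status that the exception cleared and bail out as soon as
the thread is shutting down, instead of continuing to the next partition and
re-entering partitionMapLock.

import java.nio.channels.ClosedByInterruptException

// Sketch only: isRunning and addFetcher stand in for the real leader-finder state
// and for the real addFetcher / partitionMapLock path.
object LeaderFinderFixSketch {
  def addFetchersForLeaderlessPartitions(partitions: Iterable[String],
                                         isRunning: () => Boolean,
                                         addFetcher: String => Unit): Unit = {
    val iter = partitions.iterator
    while (iter.hasNext) {
      val partition = iter.next()
      try {
        addFetcher(partition)  // may lock partitionMapLock or perform an offset request
      } catch {
        case e: InterruptedException =>
          Thread.currentThread().interrupt() // restore the flag the exception cleared
          if (!isRunning()) throw e          // shutting down: propagate, skip the rest
        case e: ClosedByInterruptException =>
          if (!isRunning()) throw e          // same for an interrupted offset request
      }
    }
  }
}

The idea is simply that once shutdown() has interrupted the leader-finder-thread,
it never touches partitionMapLock again for the remaining partitions, so its
shutdown latch can be counted down even while processPartitionData is stuck on a
full chunk queue.
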
> Relevant portions of the thread-dump are here:
>
>
> [td1] createMessageStream's initial inline rebalance (blocked on the ongoing
> watch-triggered rebalance)
>
>
>
>
>
> 2013-05-20_17:50:13.04848 "main" prio=10 tid=0x00007f5960008000 nid=0x3772
> waiting for monitor entry [0x00007f59666c3000]
>
> 2013-05-20_17:50:13.04848 java.lang.Thread.State: BLOCKED (on object
> monitor)
>
> 2013-05-20_17:50:13.04848 at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
>
> 2013-05-20_17:50:13.04849 - waiting to lock <0x00007f58637dfe10> (a
> java.lang.Object)
>
> 2013-05-20_17:50:13.04849 at
> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:678)
>
> 2013-05-20_17:50:13.04850 at
> kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:712)
>
> 2013-05-20_17:50:13.04850 at
> kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:140)
>
> 2013-05-20_17:50:13.04850 at
> kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
>
>
> 2013-05-20_17:50:13.04850 at
> kafka.tools.MirrorMaker$$anonfun$4.apply(MirrorMaker.scala:118)
>
>
> 2013-05-20_17:50:13.04850 at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>
>
> 2013-05-20_17:50:13.04851 at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>
>
> 2013-05-20_17:50:13.04851 at
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>
>
> 2013-05-20_17:50:13.04851 at
> scala.collection.immutable.List.foreach(List.scala:45)
>
>
> 2013-05-20_17:50:13.04851 at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>
>
> 2013-05-20_17:50:13.04852 at
> scala.collection.immutable.List.map(List.scala:45)
>
>
> 2013-05-20_17:50:13.04852 at
> kafka.tools.MirrorMaker$.main(MirrorMaker.scala:118)
>
>
> 2013-05-20_17:50:13.04852 at
> kafka.tools.MirrorMaker.main(MirrorMaker.scala)
>
>
>
>
>
> [td2] A consumer fetcher thread blocked on full queue.
>
>
>
>
>
> 2013-05-20_17:50:13.04703
> "ConsumerFetcherThread-xxxx-1368836182178-2009023c-0-3248" prio=10
> tid=0x00007f57a4010800 nid=0x3920 waiting on condition [0x00007f58316ae000]
>
>
> 2013-05-20_17:50:13.04703 java.lang.Thread.State: WAITING (parking)
>
>
> 2013-05-20_17:50:13.04703 at sun.misc.Unsafe.park(Native Method)
>
>
> 2013-05-20_17:50:13.04704 - parking to wait for <0x00007f586381d6c0>
> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>
> 2013-05-20_17:50:13.04704 at
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>
>
> 2013-05-20_17:50:13.04704 at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>
> 2013-05-20_17:50:13.04704 at
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>
>
> 2013-05-20_17:50:13.04704 at
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>
>
> 2013-05-20_17:50:13.04705 at
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:50)
>
>
> 2013-05-20_17:50:13.04706 at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:131)
>
> 2013-05-20_17:50:13.04707 at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$4.apply(AbstractFetcherThread.scala:112)
>
> 2013-05-20_17:50:13.04708 at
> scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
>
>
> 2013-05-20_17:50:13.04709 at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:112)
>
>
> 2013-05-20_17:50:13.04709 at
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>
>
> 2013-05-20_17:50:13.04709 at
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
>
>
>
>
>
>
> [td3] Second watch-triggered rebalance
>
>
>
>
>
> 2013-05-20_17:50:13.04725 "xxxx-1368836182178-2009023c_watcher_executor"
> prio=10 tid=0x00007f5960777800 nid=0x37af waiting on condition [0x00007f58318b0000]
>
>
> 2013-05-20_17:50:13.04725 java.lang.Thread.State: WAITING (parking)
>
>
> 2013-05-20_17:50:13.04726 at sun.misc.Unsafe.park(Native Method)
>
>
> 2013-05-20_17:50:13.04726 - parking to wait for <0x00007f5863728de8>
> (a java.util.concurrent.CountDownLatch$Sync)
>
> 2013-05-20_17:50:13.04726 at
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>
>
> 2013-05-20_17:50:13.04727 at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>
> 2013-05-20_17:50:13.04727 at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>
> 2013-05-20_17:50:13.04728 at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>
> 2013-05-20_17:50:13.04728 at
> java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
>
>
> 2013-05-20_17:50:13.04729 at
> kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
>
>
> 2013-05-20_17:50:13.04729 at
> kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:125)
>
>
> 2013-05-20_17:50:13.04730 at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:486)
>
>
> 2013-05-20_17:50:13.04730 at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:523)
>
> 2013-05-20_17:50:13.04731 at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:420)
>
>
> 2013-05-20_17:50:13.04731 at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:373)
>
> 2013-05-20_17:50:13.04732 at
> scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
>
>
> 2013-05-20_17:50:13.04733 at
> scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
>
>
> 2013-05-20_17:50:13.04733 at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:368)
>
> 2013-05-20_17:50:13.04733 - locked <0x00007f58637dfe10> (a
> java.lang.Object)
>
> 2013-05-20_17:50:13.04734 at
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:325)
>
>
>
>
> [td4] leader-finder-thread still trying to process partitions without leader,
> blocked on the partitionMapLock held by processPartitionData in td2.
>
>
>
>
> 2013-05-20_17:50:13.04712 "xxxx-1368836182178-2009023c-leader-finder-thread"
> prio=10 tid=0x00007f57b0027800 nid=0x38d8 waiting on condition [0x00007f58317af000]
>
>
> 2013-05-20_17:50:13.04712 java.lang.Thread.State: WAITING (parking)
>
>
> 2013-05-20_17:50:13.04713 at sun.misc.Unsafe.park(Native Method)
>
>
> 2013-05-20_17:50:13.04713 - parking to wait for <0x00007f586375e3d8>
> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
>
> 2013-05-20_17:50:13.04713 at
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
>
>
> 2013-05-20_17:50:13.04714 at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>
> 2013-05-20_17:50:13.04714 at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:867)
>
> 2013-05-20_17:50:13.04717 at
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1201)
>
> 2013-05-20_17:50:13.04718 at
> java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:312)
>
>
> 2013-05-20_17:50:13.04718 at
> kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:173)
>
>
> 2013-05-20_17:50:13.04719 at
> kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:48)
>
>
> 2013-05-20_17:50:13.04719 - locked <0x00007f586374b040> (a
> java.lang.Object)
>
> 2013-05-20_17:50:13.04719 at
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:83)
>
> 2013-05-20_17:50:13.04720 at
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread$$anonfun$doWork$4.apply(ConsumerFetcherManager.scala:79)
>
> 2013-05-20_17:50:13.04721 at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>
>
> 2013-05-20_17:50:13.04721 at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>
>
> 2013-05-20_17:50:13.04721 at
> scala.collection.Iterator$class.foreach(Iterator.scala:631)
>
>
> 2013-05-20_17:50:13.04722 at
> scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
>
>
> 2013-05-20_17:50:13.04723 at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
>
>
> 2013-05-20_17:50:13.04723 at
> scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>
>
> 2013-05-20_17:50:13.04723 at
> scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
>
>
> 2013-05-20_17:50:13.04724 at
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:79)
>
> 2013-05-20_17:50:13.04724 at
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
>
