[ https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356289#comment-14356289 ]
Ashwin Jayaprakash commented on KAFKA-1716:
-------------------------------------------
We upgraded to Kafka 0.8.2 last week and now we can reproduce this issue every
time on our Kafka consumer JVMs.
Our setup is like this. We start {{ConsumerConnector}} instances dynamically
based on a configurable property. Each of those {{ConsumerConnector}} instances
creates a {{ConsumerIterator}}. Right now we have 4 such instances in each JVM.
Naturally we have 4 separate threads consuming from those 4 iterators in
parallel.
All this worked OK until recently, when we ran into some issues with consumer
rebalancing and an overloaded ZK subtree; see
http://markmail.org/thread/gnodacjjya6r573m. While addressing that, we
changed the defaults to {{rebalance.max.retries=16}} and
{{rebalance.backoff.ms=10000}}. Note that we also upgraded to 0.8.2.
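As a rough back-of-the-envelope check, the snippet below computes how long a single consumer could spend in backoff sleeps alone with these two settings (16 retries, 10000 ms backoff). It assumes one sleep of {{rebalance.backoff.ms}} per failed rebalance attempt; that is an assumption about the retry loop, not something verified against the 0.8.2 source. The class and method names are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper: rough upper bound on backoff sleeping per consumer,
// ASSUMING one rebalance.backoff.ms sleep after each failed rebalance attempt.
class RebalanceBackoffBudget {
    static long worstCaseBackoffMs(Properties props) {
        int retries = Integer.parseInt(props.getProperty("rebalance.max.retries"));
        long backoffMs = Long.parseLong(props.getProperty("rebalance.backoff.ms"));
        return retries * backoffMs;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Only the two overrides discussed above; all other consumer settings omitted.
        props.setProperty("rebalance.max.retries", "16");
        props.setProperty("rebalance.backoff.ms", "10000");
        System.out.println(worstCaseBackoffMs(props) / 1000 + " s"); // prints "160 s"
    }
}
```

That is up to roughly 160 seconds per consumer spent sleeping, which is consistent with the shutdown taking "a long time" when several consumers are stuck retrying.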
Every time we shut down the JVM, we first try to shut down the consumers one by
one before exiting. With these recent changes, the JVM exit gets stuck because:
# The shutdown thread is separate from the 4 consumer threads (which run in
addition to the background threads that ZK and Kafka create)
# The shutdown thread shuts down the first consumer, which exits quickly and
gracefully
# Meanwhile, the second, third and fourth consumers are trying to rebalance the
partitions
# The shutdown thread proceeds to call shutdown on the second consumer
## It appears to make some progress shutting down the second consumer but then
gets stuck on a monitor held by the {{xx_watcher_executor}} thread
## This looks like a deadlock, because the {{xx_watcher_executor}} thread has
acquired the monitor lock and gone to sleep while holding it
# The shutdown then takes a long time because all 3 remaining consumers retry
{{16}} times and then give up
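The steps above boil down to one locking shape: the rebalance retry loop sleeps while still holding the monitor that {{shutdown()}} needs. Below is a minimal, self-contained Java sketch of that shape. {{SleepyConnector}}, its lock and its methods are hypothetical stand-ins, not Kafka's actual {{ZookeeperConsumerConnector}} code, and the retry count and backoff are shrunk so the demo finishes quickly.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for one consumer connector (NOT Kafka's code): it only
// reproduces the locking shape in the dumps, i.e. the rebalance loop sleeps
// while holding the same monitor that shutdown() must acquire.
class SleepyConnector {
    private final Object rebalanceLock = new Object();
    private final int maxRetries;
    private final long backoffMs;

    SleepyConnector(int maxRetries, long backoffMs) {
        this.maxRetries = maxRetries;
        this.backoffMs = backoffMs;
    }

    // Runs on a "watcher executor" thread; in this sketch every attempt fails.
    void syncedRebalance() throws InterruptedException {
        synchronized (rebalanceLock) {
            for (int i = 0; i < maxRetries; i++) {
                // ... try to claim partitions; assume the attempt fails ...
                Thread.sleep(backoffMs); // sleeps while still holding rebalanceLock
            }
        }
    }

    // Called from the shutdown thread; cannot enter until syncedRebalance() gives up.
    void shutdown() {
        synchronized (rebalanceLock) {
            // ... stop fetchers, close the ZK client ...
        }
    }

    public static void main(String[] args) throws Exception {
        SleepyConnector c = new SleepyConnector(3, 200);
        Thread watcher = new Thread(() -> {
            try { c.syncedRebalance(); } catch (InterruptedException ignored) { }
        }, "watcher_executor");
        watcher.start();
        Thread.sleep(50); // give the watcher time to take the lock first

        long start = System.nanoTime();
        Thread shutdownThread = new Thread(c::shutdown, "shutdown");
        shutdownThread.start();
        Thread.sleep(50);
        System.out.println("shutdown thread state: " + shutdownThread.getState());
        shutdownThread.join();
        long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("shutdown waited ~" + waitedMs + " ms");
    }
}
```

When run, the shutdown thread should normally be reported as BLOCKED and then wait for roughly the remaining retries times the backoff, much like Thread-13 in the first dump.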
The thread dumps below should make this clear.
{code}
"Thread-13@8222" prio=5 tid=0x53 nid=NA waiting for monitor entry
  java.lang.Thread.State: BLOCKED
  waiting for indexer-group_on-localhost-pid-58357-kafka-message-source-id-820_watcher_executor@8160 to release lock on <0x2b5e> (a java.lang.Object)
    at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:191)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:119)
    at xx.yy.zz.processor.kafka.consumer.KafkaMessageSource.close(KafkaMessageSource.java:239)
    at xx.yy.zz.pipeline.source.MessageSourceStage.stop(MessageSourceStage.java:162)
    at xx.yy.common.util.lifecycle.LifecycleHelper.stopAll(LifecycleHelper.java:53)
    at xx.yy.zz.pipeline.framework.Pipeline.stop(Pipeline.java:205)
    at xx.yy.common.util.lifecycle.LifecycleHelper.stopAll(LifecycleHelper.java:53)
    at xx.yy.zz.pipeline.framework.Pipelines.stop(Pipelines.java:225)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-1)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at xx.yy.AppLifecycle.callAnnotatedMethods(AppLifecycle.java:163)
    at xx.yy.AppLifecycle.stop(AppLifecycle.java:144)
    - locked <0x2b31> (a xx.yy.AppLifecycle)
    at xx.yy.AppLifecycle$6.stop(AppLifecycle.java:247)
    at io.dropwizard.lifecycle.JettyManaged.doStop(JettyManaged.java:32)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:90)
    - locked <0x2b83> (a java.lang.Object)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.stop(ContainerLifeCycle.java:129)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.doStop(ContainerLifeCycle.java:148)
    at org.eclipse.jetty.server.handler.AbstractHandler.doStop(AbstractHandler.java:71)
    at org.eclipse.jetty.server.Server.doStop(Server.java:410)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:90)
    - locked <0x2b84> (a java.lang.Object)
    at org.eclipse.jetty.util.thread.ShutdownThread.run(ShutdownThread.java:133)

"indexer-group_on-localhost-pid-58357-kafka-message-source-id-820_watcher_executor@8160" daemon prio=5 tid=0x74 nid=NA sleeping
  java.lang.Thread.State: TIMED_WAITING
  blocks Thread-13@8222
    at java.lang.Thread.sleep(Thread.java:-1)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:627)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:602)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:599)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:598)
    - locked <0x2b5e> (a java.lang.Object)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:551)
{code}
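To spot this pattern without reading full dumps by hand, a thread can be flagged when it is BLOCKED on a monitor whose owner is in TIMED_WAITING, as with Thread-13 and the watcher executor above. The sketch below uses the standard {{java.lang.management.ThreadMXBean}} API; the class name {{LockHeldWhileSleeping}} is hypothetical. Note that TIMED_WAITING also covers timed waits and timed parks, not only {{Thread.sleep}}, so a hit is a hint rather than proof.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic: report threads BLOCKED on a monitor whose owner is
// TIMED_WAITING (e.g. sleeping), the shape seen in the dump above.
class LockHeldWhileSleeping {
    static List<String> find() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> hits = new ArrayList<>();
        for (ThreadInfo blocked : mx.dumpAllThreads(false, false)) {
            if (blocked.getThreadState() != Thread.State.BLOCKED) continue;
            long ownerId = blocked.getLockOwnerId();
            if (ownerId == -1) continue; // no known owner
            ThreadInfo owner = mx.getThreadInfo(ownerId); // null if the owner has exited
            if (owner != null && owner.getThreadState() == Thread.State.TIMED_WAITING) {
                hits.add(blocked.getThreadName() + " waits for " + owner.getThreadName()
                        + " on " + blocked.getLockName());
            }
        }
        return hits;
    }

    public static void main(String[] args) throws Exception {
        Object lock = new Object();
        // Plays the watcher executor: holds the monitor while sleeping.
        Thread sleeper = new Thread(() -> {
            synchronized (lock) {
                try { Thread.sleep(500); } catch (InterruptedException ignored) { }
            }
        }, "watcher_executor");
        // Plays the shutdown thread: needs the same monitor.
        Thread waiter = new Thread(() -> { synchronized (lock) { } }, "Thread-13");
        sleeper.start();
        Thread.sleep(50);
        waiter.start();
        Thread.sleep(50);
        find().forEach(System.out::println);
        sleeper.join();
        waiter.join();
    }
}
```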
Log snippet attached.
> hang during shutdown of ZookeeperConsumerConnector
> --------------------------------------------------
>
> Key: KAFKA-1716
> URL: https://issues.apache.org/jira/browse/KAFKA-1716
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.8.1.1
> Reporter: Sean Fay
> Assignee: Neha Narkhede
>
> It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to
> wedge in the case that some consumer fetcher threads receive messages during
> the shutdown process.
> Shutdown thread:
> {code}
> -- Parking to wait for: java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>     at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
>     at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
>     at kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
>     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
>     at kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
>     at scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>     at scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
>     at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
>     at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
>     at scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
>         ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
>     at kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
>     at kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
>     at kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167)
> {code}
> ConsumerFetcherThread:
> {code}
> -- Parking to wait for: java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
>     at jrockit/vm/Locks.park0(J)V(Native Method)
>     at jrockit/vm/Locks.park(Locks.java:2230)
>     at sun/misc/Unsafe.park(ZJ)V(Native Method)
>     at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
>     at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>     at java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>     at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
>     at kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
>     at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
>     at scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
>     at kafka/utils/Utils$.inLock(Utils.scala:538)
>     at kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
>     at kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
>     at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
>     at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method)
> {code}
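The two dumps in the original report show the shutdown thread parked in {{CountDownLatch.await}} under {{ShutdownableThread.shutdown}}, while the fetcher thread is parked in {{LinkedBlockingQueue.put}} on a queue that nobody is draining. The generic Java sketch below reproduces that shape with a hypothetical {{FetcherLike}} thread and a bounded queue. It is not Kafka's code and, unlike Kafka, it never interrupts the worker; the timed-{{offer}} variant is only one illustrative way for a blocked producer to notice a shutdown request, not necessarily the fix adopted for this issue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, generic sketch (NOT Kafka's code): a fetcher-like worker
// enqueues into a bounded queue nobody drains, while shutdown() waits on a
// latch that the worker only counts down when its run() loop exits.
class FetcherLike extends Thread {
    private final BlockingQueue<Integer> queue;
    private final boolean timedEnqueue; // true: enqueue in short slices, re-checking the flag
    private volatile boolean running = true;
    private final CountDownLatch stopped = new CountDownLatch(1);

    FetcherLike(BlockingQueue<Integer> queue, boolean timedEnqueue) {
        this.queue = queue;
        this.timedEnqueue = timedEnqueue;
        setDaemon(true); // a wedged demo thread must not keep the JVM alive
    }

    @Override
    public void run() {
        try {
            int next = 0;
            while (running) {
                if (timedEnqueue) {
                    // Give up each attempt after 50 ms so a shutdown request is noticed.
                    while (running && !queue.offer(next, 50, TimeUnit.MILLISECONDS)) { }
                } else {
                    queue.put(next); // parks indefinitely once the queue is full
                }
                next++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            stopped.countDown();
        }
    }

    // Clear the flag, then await the latch (no interrupt in this sketch).
    // The timeout exists only so the demo returns instead of hanging forever.
    boolean shutdown(long timeoutMs) throws InterruptedException {
        running = false;
        return stopped.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        for (boolean timed : new boolean[] {false, true}) {
            FetcherLike f = new FetcherLike(new ArrayBlockingQueue<>(2), timed);
            f.start();
            Thread.sleep(100); // the queue fills up; no consumer drains it
            System.out.println("timedEnqueue=" + timed + " -> stopped cleanly: " + f.shutdown(500));
        }
    }
}
```

With the plain {{put}}, the latch is never released and {{shutdown}} only returns because of the demo timeout; with the timed {{offer}}, the worker notices the cleared flag within about one slice and exits.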
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)