[
https://issues.apache.org/jira/browse/KAFKA-1716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14386019#comment-14386019
]
Jiangjie Qin commented on KAFKA-1716:
-------------------------------------
[~dchu] Do you mean that the fetchers have never been created? That's a good
point, but I still do not totally understand the cause.
The first rebalance of ZookeeperConsumeConnector occurs when KafkaStreams are
created. That means you need to specify a topic count map and create streams.
So leader finder thread will send TopicMetadataRequest to brokers to get back
the topic metadata for the topic. By default auto topic creation is enabled on
Kafka brokers. That means when broker saw a TopicMetadataRequest asking for a
topic that does not exist yet, it will created it and return the topic
metadata. So the consumer fetcher thread will be created for the topic on
ZookeeperConsumerConnector. However, if auto topic creation is turned off, your
description looks possible.
About the shutdown issue. You are right, that is an issue that has been fixed
in KAFKA-1848, but seems not included in 0.8.2. I just changed the fix version
from 0.9.0 to 0.8.3 instead.
> hang during shutdown of ZookeeperConsumerConnector
> --------------------------------------------------
>
> Key: KAFKA-1716
> URL: https://issues.apache.org/jira/browse/KAFKA-1716
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.8.1.1
> Reporter: Sean Fay
> Assignee: Neha Narkhede
> Attachments: after-shutdown.log, before-shutdown.log,
> kafka-shutdown-stuck.log
>
>
> It appears to be possible for {{ZookeeperConsumerConnector.shutdown()}} to
> wedge in the case that some consumer fetcher threads receive messages during
> the shutdown process.
> Shutdown thread:
> {code} -- Parking to wait for:
> java/util/concurrent/CountDownLatch$Sync@0x2aaaf3ef06d0
> at jrockit/vm/Locks.park0(J)V(Native Method)
> at jrockit/vm/Locks.park(Locks.java:2230)
> at sun/misc/Unsafe.park(ZJ)V(Native Method)
> at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> at
> java/util/concurrent/locks/AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> at
> java/util/concurrent/locks/AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> at
> java/util/concurrent/locks/AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> at java/util/concurrent/CountDownLatch.await(CountDownLatch.java:207)
> at kafka/utils/ShutdownableThread.shutdown(ShutdownableThread.scala:36)
> at
> kafka/server/AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:71)
> at
> kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:121)
> at
> kafka/server/AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:120)
> at
> scala/collection/TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at
> scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at
> scala/collection/mutable/HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at
> scala/collection/mutable/HashTable$class.foreachEntry(HashTable.scala:226)
> at scala/collection/mutable/HashMap.foreachEntry(HashMap.scala:39)
> at scala/collection/mutable/HashMap.foreach(HashMap.scala:98)
> at
> scala/collection/TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at
> kafka/server/AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:120)
> ^-- Holding lock: java/lang/Object@0x2aaaebcc7318[thin lock]
> at
> kafka/consumer/ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:148)
> at
> kafka/consumer/ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:171)
> at
> kafka/consumer/ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:167){code}
> ConsumerFetcherThread:
> {code} -- Parking to wait for:
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x2aaaebcc7568
> at jrockit/vm/Locks.park0(J)V(Native Method)
> at jrockit/vm/Locks.park(Locks.java:2230)
> at sun/misc/Unsafe.park(ZJ)V(Native Method)
> at java/util/concurrent/locks/LockSupport.park(LockSupport.java:156)
> at
> java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> at
> java/util/concurrent/LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> at kafka/consumer/PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> at
> kafka/consumer/ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> at
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130)
> at
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111)
> at scala/collection/immutable/HashMap$HashMap1.foreach(HashMap.scala:224)
> at
> scala/collection/immutable/HashMap$HashTrieMap.foreach(HashMap.scala:403)
> at
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111)
> at
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> at
> kafka/server/AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111)
> at kafka/utils/Utils$.inLock(Utils.scala:538)
> at
> kafka/server/AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110)
> at
> kafka/server/AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
> at kafka/utils/ShutdownableThread.run(ShutdownableThread.scala:51)
> at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method){code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)