[
https://issues.apache.org/jira/browse/KAFKA-2770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guozhang Wang resolved KAFKA-2770.
----------------------------------
Resolution: Fixed
Issue resolved by pull request 470
[https://github.com/apache/kafka/pull/470]
> Race condition causes Mirror Maker to hang during shutdown (new consumer)
> -------------------------------------------------------------------------
>
> Key: KAFKA-2770
> URL: https://issues.apache.org/jira/browse/KAFKA-2770
> Project: Kafka
> Issue Type: Bug
> Reporter: Geoff Anderson
> Assignee: Guozhang Wang
> Priority: Blocker
> Fix For: 0.9.0.0
>
>
> I recently added a clean bounce test with the new consumer to the mirror maker tests
> (https://github.com/apache/kafka/pull/427), and noticed that in this case the
> mirror maker process (with the new consumer) sometimes hangs and fails to exit
> when stopped with kill -15:
> {code:title=mirror_maker.log|borderStyle=solid}
> [2015-11-06 22:06:04,213] INFO Start clean shutdown. (kafka.tools.MirrorMaker$)
> [2015-11-06 22:06:04,221] INFO Shutting down consumer threads. (kafka.tools.MirrorMaker$)
> [2015-11-06 22:06:04,239] INFO [mirrormaker-thread-0] mirrormaker-thread-0 shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
> [2015-11-06 22:06:04,253] INFO [mirrormaker-thread-0] Flushing producer. (kafka.tools.MirrorMaker$MirrorMakerThread)
> [2015-11-06 22:06:04,254] INFO [mirrormaker-thread-0] Committing consumer offsets. (kafka.tools.MirrorMaker$MirrorMakerThread)
> Exception in thread "mirrormaker-thread-0" org.apache.kafka.common.errors.WakeupException
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:304)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:194)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:154)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:347)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:895)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:869)
>     at kafka.tools.MirrorMaker$MirrorMakerNewConsumer.commit(MirrorMaker.scala:522)
>     at kafka.tools.MirrorMaker$.commitOffsets(MirrorMaker.scala:338)
>     at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:406)
> [2015-11-06 22:06:29,448] DEBUG Connection with worker4/192.168.50.104 disconnected (org.apache.kafka.common.network.Selector)
> java.io.EOFException
>     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
>     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>     at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:160)
>     at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:141)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:288)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> The current working hypothesis is this:
> A WakeupException is thrown by the call to commitOffsets in the finally block
> of the mirror maker thread, so the thread dies before it reaches the
> call to shutdownLatch.countDown(). As a result, the shutdownLatch.await() call
> in awaitShutdown() blocks forever and the process never exits.
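
The hypothesis above can be reproduced without Kafka. The following is a minimal sketch, not the MirrorMaker code and not necessarily the change made in pull request 470: `ShutdownLatchSketch`, `latchReleased`, and the stand-in `commitOffsets` are hypothetical names, and an `IllegalStateException` stands in for the `WakeupException`. It shows that a cleanup step throwing inside `finally` skips `countDown()`, and that one possible guard is to put `countDown()` in its own nested `finally`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class ShutdownLatchSketch {
    /** Hypothetical stand-in for commitOffsets(): throws when a wakeup is still pending. */
    static void commitOffsets(boolean wakeupPending) {
        if (wakeupPending) {
            throw new IllegalStateException("simulated WakeupException");
        }
    }

    /**
     * Runs a worker thread whose cleanup throws, waits for it to finish, and
     * reports whether the shutdown latch was released within 500 ms.
     */
    static boolean latchReleased(boolean guarded) {
        CountDownLatch shutdownLatch = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                // The consume/produce loop would run here.
            } finally {
                if (guarded) {
                    try {
                        commitOffsets(true);
                    } catch (IllegalStateException e) {
                        // A pending wakeup during shutdown is expected; ignore it.
                    } finally {
                        shutdownLatch.countDown(); // always reached
                    }
                } else {
                    commitOffsets(true);           // throws, so the thread dies here
                    shutdownLatch.countDown();     // never reached
                }
            }
        });
        // Keep the simulated failure from printing a stack trace.
        worker.setUncaughtExceptionHandler((t, e) -> { });
        worker.start();
        try {
            worker.join();
            // A bounded wait replaces the unbounded await() that hangs in the report.
            return shutdownLatch.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded released: " + latchReleased(false)); // false
        System.out.println("guarded released: " + latchReleased(true));    // true
    }
}
```

In the unguarded case an unbounded `await()` would block forever, which matches the observed hang.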
> Why can commitOffsets trigger a WakeupException?
> The shutdown hook is triggered in another thread, and does this:
>     shuttingDown = true
>     mirrorMakerConsumer.stop() # Calls consumer.wakeup()
> If the timing is right (wrong), the wakeup flag is set, but the mirrormaker
> produce/consume loop exits without triggering the WakeupException, and the
> WakeupException isn't thrown until commitOffsets() is called in the finally
> block.
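
The timing described above can be sketched with a toy model. Everything here is an assumption made for illustration: `ToyConsumer` and `WakeupSignal` are hypothetical and are not the Kafka classes, and the shutdown hook is inlined on the same thread so the interleaving is deterministic. The model assumes only what the report describes: `wakeup()` sets a flag, and the flag is turned into an exception by the next blocking consumer call, whichever call that is.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class DeferredWakeupSketch {
    /** Hypothetical stand-in for WakeupException. */
    static class WakeupSignal extends RuntimeException { }

    /** Toy consumer: a pending wakeup is raised by the next poll() or commitSync(). */
    static class ToyConsumer {
        private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);

        void wakeup() {
            wakeupRequested.set(true);
        }

        private void maybeThrowWakeup() {
            // Consume the flag so the signal is delivered exactly once.
            if (wakeupRequested.getAndSet(false)) {
                throw new WakeupSignal();
            }
        }

        void poll() { maybeThrowWakeup(); }

        void commitSync() { maybeThrowWakeup(); }
    }

    /** Replays the unlucky interleaving and reports where the wakeup surfaced. */
    static String shutdownSequence() {
        ToyConsumer consumer = new ToyConsumer();
        boolean shuttingDown = false;

        consumer.poll();          // last loop iteration: returns normally

        // Shutdown hook fires here (another thread in the real process).
        shuttingDown = true;
        consumer.wakeup();

        // The loop condition sees shuttingDown and exits without polling again,
        // so the pending wakeup is still unconsumed.
        if (!shuttingDown) {
            consumer.poll();
        }

        // finally block: the commit is the first blocking call after wakeup().
        try {
            consumer.commitSync();
            return "commit ok";
        } catch (WakeupSignal e) {
            return "wakeup surfaced in commit";
        }
    }

    public static void main(String[] args) {
        System.out.println(shutdownSequence());
    }
}
```

If the loop had called `poll()` once more before checking `shuttingDown`, the signal would have been consumed there and the commit would have succeeded, which is why the hang shows up only intermittently.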