[ 
https://issues.apache.org/jira/browse/KAFKA-2770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14997287#comment-14997287
 ] 

ASF GitHub Bot commented on KAFKA-2770:
---------------------------------------

GitHub user guozhangwang opened a pull request:

    https://github.com/apache/kafka/pull/470

    KAFKA-2770: Catch and ignore WakeupException for commit upon closing

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/guozhangwang/kafka K2770

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/470.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #470
    

> Race condition causes Mirror Maker to hang during shutdown (new consumer)
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-2770
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2770
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Geoff Anderson
>            Assignee: Guozhang Wang
>             Fix For: 0.9.0.1
>
>
> I recently added a clean bounce test with the new consumer to the mirror
> maker tests (https://github.com/apache/kafka/pull/427), and noticed that in
> this case the mirror maker process (with the new consumer) sometimes hangs
> and fails to exit when stopped with kill -15 (SIGTERM):
> {code:title=mirror_maker.log|borderStyle=solid}
> [2015-11-06 22:06:04,213] INFO Start clean shutdown. (kafka.tools.MirrorMaker$)
> [2015-11-06 22:06:04,221] INFO Shutting down consumer threads. (kafka.tools.MirrorMaker$)
> [2015-11-06 22:06:04,239] INFO [mirrormaker-thread-0] mirrormaker-thread-0 shutting down (kafka.tools.MirrorMaker$MirrorMakerThread)
> [2015-11-06 22:06:04,253] INFO [mirrormaker-thread-0] Flushing producer. (kafka.tools.MirrorMaker$MirrorMakerThread)
> [2015-11-06 22:06:04,254] INFO [mirrormaker-thread-0] Committing consumer offsets. (kafka.tools.MirrorMaker$MirrorMakerThread)
> Exception in thread "mirrormaker-thread-0" org.apache.kafka.common.errors.WakeupException
>       at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:304)
>       at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:194)
>       at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
>       at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:154)
>       at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:347)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:895)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:869)
>       at kafka.tools.MirrorMaker$MirrorMakerNewConsumer.commit(MirrorMaker.scala:522)
>       at kafka.tools.MirrorMaker$.commitOffsets(MirrorMaker.scala:338)
>       at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:406)
> [2015-11-06 22:06:29,448] DEBUG Connection with worker4/192.168.50.104 disconnected (org.apache.kafka.common.network.Selector)
> java.io.EOFException
>       at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
>       at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>       at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:160)
>       at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:141)
>       at org.apache.kafka.common.network.Selector.poll(Selector.java:288)
>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
>       at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>       at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> The current working hypothesis is this:
> a WakeupException is thrown by the call to commitOffsets in the finally
> block of the mirror maker thread, so the thread dies before it reaches the
> call to shutdownLatch.countDown(). As a result, the shutdownLatch.await()
> call in awaitShutdown() blocks forever and the process never exits.
> Why can commitOffsets trigger a WakeupException?
> The shutdown hook is triggered in another thread, and does this:
>         shuttingDown = true
>         mirrorMakerConsumer.stop()  # Calls consumer.wakeup()
> If the timing is unlucky, the wakeup flag is set after the mirror maker
> produce/consume loop has made its last blocking call, so the loop exits
> without triggering the WakeupException. The pending WakeupException is then
> thrown by the next blocking call, which is commitOffsets() in the finally
> block.
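
The hypothesis above can be reproduced without a broker. The following is
only a minimal sketch under stated assumptions: FakeConsumer and the local
WakeupException are hypothetical stand-ins for KafkaConsumer and
org.apache.kafka.common.errors.WakeupException, and runShutdown is an
illustrative helper, not code from MirrorMaker.scala. The guarded branch
shows one possible way to catch and ignore the exception, in the spirit of
the pull request title; it is not the actual patch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownLatchSketch {

    // Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException.
    static class WakeupException extends RuntimeException {}

    // Hypothetical stand-in for the consumer: wakeup() only sets a flag, and
    // the next blocking call (here commitSync) throws and clears it.
    static class FakeConsumer {
        private final AtomicBoolean wakeupPending = new AtomicBoolean(false);

        void wakeup() {
            wakeupPending.set(true);
        }

        void commitSync() {
            if (wakeupPending.getAndSet(false)) {
                throw new WakeupException();
            }
        }
    }

    // Runs a simulated mirror maker thread and returns true if the shutdown
    // latch was counted down, i.e. if awaitShutdown() would have returned.
    static boolean runShutdown(boolean guardCommit) {
        FakeConsumer consumer = new FakeConsumer();
        CountDownLatch shutdownLatch = new CountDownLatch(1);

        // Simulate the unlucky timing: the shutdown hook calls wakeup() after
        // the produce/consume loop has already made its last blocking call.
        consumer.wakeup();

        Thread worker = new Thread(() -> {
            try {
                // The produce/consume loop has already exited at this point.
            } finally {
                if (guardCommit) {
                    try {
                        consumer.commitSync();
                    } catch (WakeupException e) {
                        // Ignore: we are already shutting down.
                    } finally {
                        shutdownLatch.countDown();
                    }
                } else {
                    consumer.commitSync();      // throws the pending wakeup
                    shutdownLatch.countDown();  // never reached
                }
            }
        });
        // Suppress the uncaught-exception stack trace of the unguarded run.
        worker.setUncaughtExceptionHandler((t, e) -> { });
        worker.start();
        try {
            worker.join();
            return shutdownLatch.await(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded: latch released = " + runShutdown(false));
        System.out.println("guarded:   latch released = " + runShutdown(true));
    }
}
```

In the unguarded run the worker dies inside the finally block and the latch
is never released, which matches the hang described above. In the guarded
run the exception is swallowed and countDown() sits in its own finally, so
the latch is released regardless of how the commit ends.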



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
