[
https://issues.apache.org/jira/browse/CAMEL-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
André Stein updated CAMEL-8756:
-------------------------------
Summary: KafkaConsumer doesn't stop consuming when suspended, preventing
graceful route shutdown (was: KafkaConsumer doesn't stop consuming when
suspended, preventing graceful shutdown)
> KafkaConsumer doesn't stop consuming when suspended, preventing graceful
> route shutdown
> ---------------------------------------------------------------------------------------
>
> Key: CAMEL-8756
> URL: https://issues.apache.org/jira/browse/CAMEL-8756
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 2.15.2
> Reporter: André Stein
> Labels: consumer, kafka
> Attachments: CAMEL-8756.patch, SimpleKafkaToCamelExample.java
>
>
> Hi,
> we encountered a problem when consuming events from Kafka and stopping the
> route while there are still events in Kafka.
> Apparently, the {{run}} method doesn't stop reading events from the Kafka
> stream, so there are always events in flight, which prevents the route from
> being gracefully shut down.
> {noformat}
> 12:30:27.611 INFO o.a.c.impl.DefaultCamelContext - Apache Camel 2.15.2
> (CamelContext: camel-1) is shutting down
> 12:30:27.612 INFO o.a.c.i.DefaultShutdownStrategy - Starting to graceful
> shutdown 1 routes (timeout 300 seconds)
> 12:30:27.615 INFO o.a.c.i.DefaultShutdownStrategy - Waiting as there are
> still 1 inflight and pending exchanges to complete, timeout in 300 seconds.
> 12:30:28.615 INFO o.a.c.i.DefaultShutdownStrategy - Waiting as there are
> still 1 inflight and pending exchanges to complete, timeout in 299 seconds.
> 12:30:29.615 INFO o.a.c.i.DefaultShutdownStrategy - Waiting as there are
> still 1 inflight and pending exchanges to complete, timeout in 298 seconds.
> 12:30:30.616 INFO o.a.c.i.DefaultShutdownStrategy - Waiting as there are
> still 1 inflight and pending exchanges to complete, timeout in 297 seconds.
> 12:30:31.616 INFO o.a.c.i.DefaultShutdownStrategy - Waiting as there are
> still 1 inflight and pending exchanges to complete, timeout in 296 seconds.
> 12:30:32.616 INFO o.a.c.i.DefaultShutdownStrategy - Waiting as there are
> still 1 inflight and pending exchanges to complete, timeout in 295 seconds.
> 12:30:33.618 INFO o.a.c.c.kafka.KafkaConsumer - Stopping Kafka consumer
> 12:30:33.618 INFO k.c.ZookeeperConsumerConnector -
> [camelGroup1_localhorst-1431081021945-a3d9f455], ZKConsumerConnector shutting
> down
> 12:30:33.626 INFO k.c.ConsumerFetcherManager -
> [ConsumerFetcherManager-1431081022017] Stopping leader finder thread
> 12:30:33.626 INFO k.c.ConsumerFetcherManager$LeaderFinderThread -
> [camelGroup1_localhorst-1431081021945-a3d9f455-leader-finder-thread],
> Shutting down
> 12:30:33.626 INFO k.c.ConsumerFetcherManager$LeaderFinderThread -
> [camelGroup1_localhorst-1431081021945-a3d9f455-leader-finder-thread], Stopped
> 12:30:33.627 INFO k.c.ConsumerFetcherManager$LeaderFinderThread -
> [camelGroup1_localhorst-1431081021945-a3d9f455-leader-finder-thread],
> Shutdown completed
> 12:30:33.627 INFO k.c.ConsumerFetcherManager -
> [ConsumerFetcherManager-1431081022017] Stopping all fetchers
> 12:30:33.627 INFO k.consumer.ConsumerFetcherThread -
> [ConsumerFetcherThread-camelGroup1_localhorst-1431081021945-a3d9f455-0-0],
> Shutting down
> 12:30:33.627 INFO k.consumer.ConsumerFetcherThread -
> [ConsumerFetcherThread-camelGroup1_localhorst-1431081021945-a3d9f455-0-0],
> Stopped
> 12:30:33.627 INFO k.consumer.ConsumerFetcherThread -
> [ConsumerFetcherThread-camelGroup1_localhorst-1431081021945-a3d9f455-0-0],
> Shutdown completed
> 12:30:33.627 INFO k.c.ConsumerFetcherManager -
> [ConsumerFetcherManager-1431081022017] All connections stopped
> 12:30:33.662 INFO o.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event
> thread.
> 12:30:33.696 INFO org.apache.zookeeper.ZooKeeper - Session: 0x14d2d359d900022
> closed
> 12:30:33.696 INFO org.apache.zookeeper.ClientCnxn - EventThread shut down
> 12:30:33.696 INFO k.c.ZookeeperConsumerConnector -
> [camelGroup1_localhorst-1431081021945-a3d9f455], ZKConsumerConnector shutdown
> completed in 78 ms
> 12:30:33.698 INFO o.a.c.i.DefaultShutdownStrategy - Route: route1 shutdown
> complete, was consuming from:
> Endpoint[kafka://localhost:9092?groupId=camelGroup1&topic=mykafkatopic&zookeeperHost=localhost]
> 12:30:33.699 INFO o.a.c.i.DefaultShutdownStrategy - Graceful shutdown of 1
> routes completed in 6 seconds
> 12:30:33.703 INFO o.a.c.impl.DefaultCamelContext - Apache Camel 2.15.2
> (CamelContext: camel-1) uptime 12.510 seconds
> 12:30:33.703 INFO o.a.c.impl.DefaultCamelContext - Apache Camel 2.15.2
> (CamelContext: camel-1) is shutdown in 6.091 seconds
> 12:30:33.717 ERROR o.a.c.p.DefaultErrorHandler - Failed delivery for
> (MessageId: ID-localhorst-33557-1431081020733-0-1656461 on ExchangeId:
> ID-localhorst-33557-1431081020733-0-1656462). Exhausted after delivery
> attempt: 1 caught: java.util.concurrent.RejectedExecutionException
> Message History
> ---------------------------------------------------------------------------------------------------------------------------------------
> RouteId ProcessorId Processor
> Elapsed (ms)
> [route1 ] [route1 ] [
> ] [ 14]
> [route1 ] [filter1 ]
> [filter[{SimpleKafkaToCamelExample$1$$Lambda$1/1468303011@a38d7a3}]
> ] [ 1]
> [route1 ] [bean1 ]
> [bean[SimpleKafkaToCamelExample$1$$Lambda$2/1354011814@e4ce7ae]
> ] [ 0]
> Exchange
> ---------------------------------------------------------------------------------------------------------------------------------------
> Exchange[
> Id ID-localhorst-33557-1431081020733-0-1656462
> ExchangePattern InOnly
> Headers
> {breadcrumbId=ID-localhorst-33557-1431081020733-0-1656461,
> CamelRedelivered=false, CamelRedeliveryCounter=0, kafka.EXCHANGE_NAME=0,
> kafka.TOPIC=mykafkatopic}
> BodyType byte[]
> Body SOMESTRING
> ]
> Stacktrace
> ---------------------------------------------------------------------------------------------------------------------------------------
> java.util.concurrent.RejectedExecutionException: null
> at
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:349)
> ~[camel-core-2.15.2.jar:2.15.2]
> at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
> [camel-core-2.15.2.jar:2.15.2]
> at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
> ~[camel-core-2.15.2.jar:2.15.2]
> at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
> ~[camel-core-2.15.2.jar:2.15.2]
> at
> org.apache.camel.processor.FilterProcessor.process(FilterProcessor.java:58)
> ~[camel-core-2.15.2.jar:2.15.2]
> at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
> ~[camel-core-2.15.2.jar:2.15.2]
> at
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)
> ~[camel-core-2.15.2.jar:2.15.2]
> at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
> [camel-core-2.15.2.jar:2.15.2]
> at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
> [camel-core-2.15.2.jar:2.15.2]
> at
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)
> [camel-core-2.15.2.jar:2.15.2]
> at
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)
> [camel-core-2.15.2.jar:2.15.2]
> at
> org.apache.camel.component.kafka.KafkaConsumer$AutoCommitConsumerTask.run(KafkaConsumer.java:209)
> [camel-kafka-2.15.2.jar:2.15.2]
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [na:1.8.0_45]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [na:1.8.0_45]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [na:1.8.0_45]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [na:1.8.0_45]
> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45]
> 12:30:34.171 INFO k.c.ZookeeperConsumerConnector -
> [camelGroup1_localhorst-1431081021945-a3d9f455], stopping watcher executor
> thread for consumer camelGroup1_localhorst-1431081021945-a3d9f455
> {noformat}
> Regards,
> --
> André
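
The behaviour described in the report can be illustrated with a minimal sketch of a consumer loop that checks a stop flag before each read. This is not the camel-kafka implementation and not the attached CAMEL-8756.patch: `SuspendableConsumerSketch`, `ConsumerTask`, `stop()` and `unreadAfterStop` are hypothetical names, and a `BlockingQueue` stands in for the Kafka stream. The assumption is that a task which re-checks such a flag between events stops handing new exchanges to the route once it has been told to stop, so the in-flight count can drain.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical illustration only; it does not use any Camel or Kafka API.
public class SuspendableConsumerSketch {

    static final class ConsumerTask implements Runnable {
        private final BlockingQueue<String> stream;      // stands in for the Kafka stream
        private final Consumer<String> processor;        // stands in for the route
        private final AtomicBoolean running = new AtomicBoolean(true);

        ConsumerTask(BlockingQueue<String> stream, Consumer<String> processor) {
            this.stream = stream;
            this.processor = processor;
        }

        // May be called from any thread, e.g. the one driving the shutdown.
        void stop() {
            running.set(false);
        }

        @Override
        public void run() {
            try {
                // The flag is re-checked before every read, so no new event is
                // picked up after stop() has been called.
                while (running.get()) {
                    // A bounded wait keeps the loop responsive when the stream is idle.
                    String event = stream.poll(50, TimeUnit.MILLISECONDS);
                    if (event != null) {
                        processor.accept(event);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Queues `queued` events, stops the task after `stopAfter` of them have been
    // processed, and returns how many events were left unread in the stream.
    public static int unreadAfterStop(int queued, int stopAfter) {
        if (stopAfter < 1 || stopAfter > queued) {
            throw new IllegalArgumentException("stopAfter must be between 1 and queued");
        }
        BlockingQueue<String> stream = new LinkedBlockingQueue<>();
        for (int i = 0; i < queued; i++) {
            stream.add("event-" + i);
        }
        int[] seen = {0};
        ConsumerTask[] task = new ConsumerTask[1];
        task[0] = new ConsumerTask(stream, event -> {
            if (++seen[0] >= stopAfter) {
                task[0].stop();   // simulate the route being stopped mid-stream
            }
        });
        task[0].run();            // run in the calling thread to keep the demo deterministic
        return stream.size();
    }

    public static void main(String[] args) {
        System.out.println("unread events: " + unreadAfterStop(10, 3)); // prints "unread events: 7"
    }
}
```

In this sketch the remaining events simply stay in the queue once the flag is cleared. A loop without such a check would keep reading them, which matches the "still 1 inflight and pending exchanges" messages in the log above.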