[
https://issues.apache.org/jira/browse/CAMEL-6717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Claus Ibsen resolved CAMEL-6717.
--------------------------------
Resolution: Fixed
Fix Version/s: (was: Future)
2.13.0
2.12.3
Assignee: Claus Ibsen
The word on the street is that this should be fixed in the MQTT 1.7 client.
> camel-mqtt - dead lock when processing fetching/sending messages at high
> frequency
> ----------------------------------------------------------------------------------
>
> Key: CAMEL-6717
> URL: https://issues.apache.org/jira/browse/CAMEL-6717
> Project: Camel
> Issue Type: Bug
> Components: camel-mqtt
> Affects Versions: 2.11.0
> Environment: jdk 1.6.32
> Reporter: Ralf Kornberger
> Assignee: Claus Ibsen
> Fix For: 2.12.3, 2.13.0
>
>
> I'm using Apache Camel with MQTT to fetch data from a Mosquitto broker. Data
> are published there at high frequency (< 10s) by several devices. After
> receiving the data, I send an acknowledge message back. This is done by
> publishing a message to a topic for each device. I'm using the Fusesource
> MQTT Client (version 2.5) for this.
> I encountered the following problem: after some time (anywhere from 15
> minutes up to 1 day) something "weird" happens.
> The application stops receiving or sending any data via MQTT. Looking at it
> with jstack reveals the following:
> "hawtdispatch-DEFAULT-2" daemon prio=10 tid=0x00007facc1a2f000 nid=0x782d waiting on condition [0x00007fac42bcf000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x000000078e792b88> (a java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> at org.fusesource.mqtt.client.Promise.await(Promise.java:88)
> at org.fusesource.mqtt.client.BlockingConnection.publish(BlockingConnection.java:73)
> at org.fusesource.mqtt.client.BlockingConnection.publish(BlockingConnection.java:82)
> at net.centersight.plugins.agent.protomqtt.comm.MQTTManager.sendACKMessage(MQTTManager.java:92)
> at net.centersight.plugins.agent.protomqtt.comm.MQTTCommunication.sendACKMessage(MQTTCommunication.java:116)
> at net.centersight.plugins.agent.protomqtt.camel.AgentMQTTbatchACKer.process(AgentMQTTbatchACKer.java:47)
> at sun.reflect.GeneratedMethodAccessor101.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:341)
> at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:238)
> at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:166)
> at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
> at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
> at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
> at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:73)
> at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
> at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
> at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
> at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
> at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
> at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:334)
> at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:220)
> at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)
> at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
> at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303)
> at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
> at org.apache.camel.processor.Pipeline.process(Pipeline.java:117)
> at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
> at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)
> at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
> at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
> at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
> at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)
> at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
> at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
> at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
> at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
> at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:73)
> at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)
> at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)
> at org.apache.camel.component.mqtt.MQTTConsumer.processExchange(MQTTConsumer.java:46)
> at org.apache.camel.component.mqtt.MQTTEndpoint$1.onPublish(MQTTEndpoint.java:88)
> at org.fusesource.mqtt.client.CallbackConnection.toReceiver(CallbackConnection.java:815)
> at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:732)
> at org.fusesource.mqtt.client.CallbackConnection.access$17(CallbackConnection.java:727)
> at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
> at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:660)
> at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
> at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:226)
> at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:96)
> at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> Locked ownable synchronizers:
> - None
>
> Apparently, both the Camel receiving thread and the Fusesource client thread
> are hanging at
> org.fusesource.mqtt.client.Promise.await(Promise.java:88)
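One plausible reading of the trace (an assumption, the report does not confirm it) is that the blocking publish is called from inside the dispatch thread's own onPublish callback, so whatever would complete the Promise is queued behind the very task that is waiting for it. The sketch below reproduces that pattern with JDK classes only. A single-threaded executor stands in for the serial dispatch queue, and the class and method names are hypothetical. A timed wait is used so that the demo terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo; not Camel or Fusesource code. */
final class SerialQueueSelfBlockDemo {

    /**
     * Runs a "callback" on a single-threaded queue. The callback schedules its
     * own completion on the same queue and then waits for it.
     * Returns true if the wait completed within the timeout.
     */
    static boolean callbackWaitsOnOwnQueue(long timeoutMillis) throws Exception {
        ExecutorService serialQueue = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch ackSent = new CountDownLatch(1);
            Future<Boolean> callback = serialQueue.submit(() -> {
                // The completion is queued behind the task that is running right now.
                serialQueue.execute(ackSent::countDown);
                // An untimed await() here would park the only queue thread for good.
                return ackSent.await(timeoutMillis, TimeUnit.MILLISECONDS);
            });
            return callback.get();
        } finally {
            serialQueue.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("wait completed: " + callbackWaitsOnOwnQueue(200));
    }
}
```

In this sketch the wait can never succeed, because the only thread able to run the completion is the one that is waiting. Whether the real client fails in the same way would need to be checked against its source.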
> Since I use BlockingConnection in my sending client, I took a look at the
> Fusesource MQTT client.
> In BlockingConnection.java, in the method
> public void publish(final UTF8Buffer topic, final Buffer payload, final QoS
> qos, final boolean retain) throws Exception
> at line 80, publishing returns a Future, and await() is then called on it.
> When I change this await() to await(30L, TimeUnit.SECONDS), the problem still
> occurs, but the application keeps working.
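To illustrate why the timed variant keeps the application alive, here is a minimal sketch of a latch-backed promise whose await gives up after a deadline. TimedPromise is a hypothetical stand-in written for this example. It is not the Fusesource Promise class, and it only mirrors the await(30L, TimeUnit.SECONDS) call described above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for a promise; not the Fusesource Promise class. */
final class TimedPromise<T> {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile T value;

    /** Called by whoever completes the operation. */
    void onSuccess(T result) {
        value = result;
        done.countDown();
    }

    /** Waits at most the given time, then throws instead of parking forever. */
    T await(long amount, TimeUnit unit) throws InterruptedException, TimeoutException {
        if (!done.await(amount, unit)) {
            throw new TimeoutException("no completion within " + amount + " " + unit);
        }
        return value;
    }

    public static void main(String[] args) throws Exception {
        TimedPromise<String> ack = new TimedPromise<>();
        try {
            ack.await(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // The caller gets control back and can log, retry, or drop the ACK.
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

The timeout does not remove the underlying stall. It only bounds how long the calling thread stays parked, which matches the observation that the problem still occurs but the application keeps working.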
> I've put debug printouts into the trace class, which show that at the time
> the problem occurs the MQTT client seems to lose the connection to the
> broker and tries to reestablish it. The debug logs also show that the timeout
> exception thrown by the timed await occurs every minute for about 20 minutes.
> Then the problem "vanishes" and comes back after several hours.
> PS: I also posted this on GitHub, in the Fusesource MQTT issue tracker:
> https://github.com/fusesource/mqtt-client/issues/21#issuecomment-23861700
--
This message was sent by Atlassian JIRA
(v6.1.5#6160)