[ 
https://issues.apache.org/jira/browse/CAMEL-6717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claus Ibsen resolved CAMEL-6717.
--------------------------------

       Resolution: Fixed
    Fix Version/s:     (was: Future)
                   2.13.0
                   2.12.3
         Assignee: Claus Ibsen

The word on the street is that this should be fixed in MQTT 1.7 client.

> camel-mqtt - dead lock when processing fetching/sending messages at high 
> frequency
> ----------------------------------------------------------------------------------
>
>                 Key: CAMEL-6717
>                 URL: https://issues.apache.org/jira/browse/CAMEL-6717
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-mqtt
>    Affects Versions: 2.11.0
>         Environment: jdk 1.6.32
>            Reporter: Ralf Kornberger
>            Assignee: Claus Ibsen
>             Fix For: 2.12.3, 2.13.0
>
>
> I'm using Apache Camel with MQTT to fetch data from a Mosquitto broker. Data 
> are published there at high
> frequency (< 10s) by serveral devices. After receiving the data, I send an 
> acknowlege message back. This is done by publishing a message
> to a topic for each device. I'm using the Fusesource MQTT Client (version 
> 2.5) for this.
> I encountered the following problem: after some time (can be 15 minutes up to 
> 1 day) some thing "weird" happens.
> The application stops receiving or sending any data via MQTT. Looking at it 
> with jstack reveals the following:
> "hawtdispatch-DEFAULT-2" daemon prio=10 tid=0x00007facc1a2f000 nid=0x782d 
> waiting on condition [0x00007fac42bcf000]
>    java.lang.Thread.State: WAITING (parking)
>       at sun.misc.Unsafe.park(Native Method)
>       - parking to wait for  <0x000000078e792b88> (a 
> java.util.concurrent.CountDownLatch$Sync)
>       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
>       at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
>       at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
>       at org.fusesource.mqtt.client.Promise.await(Promise.java:88)
>       at 
> org.fusesource.mqtt.client.BlockingConnection.publish(BlockingConnection.java:73)
>       at 
> org.fusesource.mqtt.client.BlockingConnection.publish(BlockingConnection.java:82)
>       at 
> net.centersight.plugins.agent.protomqtt.comm.MQTTManager.sendACKMessage(MQTTManager.java:92)
>       at 
> net.centersight.plugins.agent.protomqtt.comm.MQTTCommunication.sendACKMessage(MQTTCommunication.java:116)
>       at 
> net.centersight.plugins.agent.protomqtt.camel.AgentMQTTbatchACKer.process(AgentMQTTbatchACKer.java:47)
>       at sun.reflect.GeneratedMethodAccessor101.invoke(Unknown Source)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>       at java.lang.reflect.Method.invoke(Method.java:597)
>       at 
> org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:341)
>       at 
> org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:238)
>       at 
> org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:166)
>       at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
>       at 
> org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
>       at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>       at 
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:73)
>       at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
>       at 
> org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
>       at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>       at 
> org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
>       at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
>       at 
> org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:334)
>       at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:220)
>       at 
> org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)
>       at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>       at 
> org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303)
>       at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
>       at org.apache.camel.processor.Pipeline.process(Pipeline.java:117)
>       at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
>       at 
> org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)
>       at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>       at 
> org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
>       at 
> org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
>       at 
> org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)
>       at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>       at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
>       at 
> org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
>       at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
>       at 
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:73)
>       at 
> org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)
>       at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)
>       at 
> org.apache.camel.component.mqtt.MQTTConsumer.processExchange(MQTTConsumer.java:46)
>       at 
> org.apache.camel.component.mqtt.MQTTEndpoint$1.onPublish(MQTTEndpoint.java:88)
>       at 
> org.fusesource.mqtt.client.CallbackConnection.toReceiver(CallbackConnection.java:815)
>       at 
> org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:732)
>       at 
> org.fusesource.mqtt.client.CallbackConnection.access$17(CallbackConnection.java:727)
>       at 
> org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
>       at 
> org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:660)
>       at 
> org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
>       at 
> org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:226)
>       at 
> org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:96)
>       at 
> org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
>    Locked ownable synchronizers:
>       - None
>       
> Appearently, both the Camel receiving thread and the Fusesource client thread 
> are hanging at
> at org.fusesource.mqtt.client.Promise.await(Promise.java:88)
> Since I use BlockingConnection in my sending client, I took a look at the 
> Fusesource MQTT client.
> In BlockingConnection.java, function 
> public void publish(final UTF8Buffer topic, final Buffer payload, final QoS 
> qos, final boolean retain) throws Exception 
> in line 80, a Future is received on publishing. And there is an await() 
> afterwards.
> When I change this await() to await(30L, TimeUnit.SECONDS), the problem still 
> occurs, but the application keeps working.
> I've put in debug printouts at the trace class which show me that at the time 
> the problem occurs the MQTT client seems to loose the connection to the 
> broker and tries to reestablish it. Debug logs also show that the timeout 
> exception throw by the timeouted await comes every minute for ca. 20 minutes. 
> Then the problem "vanishes" and comes again after serval hours.
> Ps.: I also posted this at github, in the Fusesource MQTT issue tracker:
> https://github.com/fusesource/mqtt-client/issues/21#issuecomment-23861700



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)

Reply via email to