[ 
https://issues.apache.org/jira/browse/CAMEL-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davy De Waele updated CAMEL-7922:
---------------------------------
    Description: 
When the MQTT Endpoint is started the MQTT connection is immediately 
established, causing an immediate influx of persisted messages (put on the 
topic when the client was not available). 

Issue is that at this point, most likely no consumers are available yet to 
process these messages.


*Receiving a PUBLISH message*

Publish message are received without any consumers. Result : msg with QoS > 0 
that were put on the topic while the client was not connected are never 
processed.

{noformat}
Daemon Thread [hawtdispatch-DEFAULT-3] (Suspended (breakpoint at line 815 in 
CallbackConnection))       
        CallbackConnection.toReceiver(PUBLISH) line: 815        
        CallbackConnection.processFrame(MQTTFrame) line: 732    
        CallbackConnection.access$1500(CallbackConnection, MQTTFrame) line: 51  
        CallbackConnection$6.onTransportCommand(Object) line: 392       
        TcpTransport.drainInbound() line: 709   
        TcpTransport$6.run() line: 588  
        NioDispatchSource$3.run() line: 209     
        SerialDispatchQueue.run() line: 100     
        SimpleThread.run() line: 77     
{noformat}

*No consumers registered yet*

Only when this finishes will Camel be able to process the messages.

{noformat}
Daemon Thread [localhost-startStop-1] (Suspended (breakpoint at line 164 in 
MQTTEndpoint))      
        owns: SpringCamelContext  (id=92)       
        owns: Object  (id=143)  
        owns: StandardContext  (id=144) 
        MQTTEndpoint.addConsumer(MQTTConsumer) line: 164        
        MQTTConsumer.doStart() line: 35 
        MQTTConsumer(ServiceSupport).start() line: 61   
        SpringCamelContext(DefaultCamelContext).startService(Service) line: 
2158        
        
SpringCamelContext(DefaultCamelContext).doStartOrResumeRouteConsumers(Map<Integer,DefaultRouteStartupOrder>,
 boolean, boolean) line: 2452       
        
SpringCamelContext(DefaultCamelContext).doStartRouteConsumers(Map<Integer,DefaultRouteStartupOrder>,
 boolean) line: 2388        
        
SpringCamelContext(DefaultCamelContext).safelyStartRouteServices(boolean, 
boolean, boolean, boolean, Collection<RouteService>) line: 2318       
        
SpringCamelContext(DefaultCamelContext).doStartOrResumeRoutes(Map<String,RouteService>,
 boolean, boolean, boolean, boolean) line: 2091  
        SpringCamelContext(DefaultCamelContext).doStartCamel() line: 1951       
        SpringCamelContext(DefaultCamelContext).doStart() line: 1777    
{noformat}

These messages will never be picked up.

Perhaps it's more the responsibility of the consumer / producer to start a 
connection when they get attached to the endpoint ? 


  was:
The FuseSource client can start receiving messages as a result of a topic 
subscription BEFORE any consumers have been registered on the MQTT endpoint. 
When the MQTT Endpoint is started the MQTT connection is established, causing 
an immediate influx of persisted messages. Issue is that at this point, no 
consumers are available yet to process these messages.

*Receiving a PUBLISH message*

Publish message are received without any consumers. Result : msg with QoS > 0 
are never processed.

{noformat}
Daemon Thread [hawtdispatch-DEFAULT-3] (Suspended (breakpoint at line 815 in 
CallbackConnection))       
        CallbackConnection.toReceiver(PUBLISH) line: 815        
        CallbackConnection.processFrame(MQTTFrame) line: 732    
        CallbackConnection.access$1500(CallbackConnection, MQTTFrame) line: 51  
        CallbackConnection$6.onTransportCommand(Object) line: 392       
        TcpTransport.drainInbound() line: 709   
        TcpTransport$6.run() line: 588  
        NioDispatchSource$3.run() line: 209     
        SerialDispatchQueue.run() line: 100     
        SimpleThread.run() line: 77     
{noformat}

*No consumers registered yet*

Only when this finishes will Camel be able to process the messages.

{noformat}
Daemon Thread [localhost-startStop-1] (Suspended (breakpoint at line 164 in 
MQTTEndpoint))      
        owns: SpringCamelContext  (id=92)       
        owns: Object  (id=143)  
        owns: StandardContext  (id=144) 
        MQTTEndpoint.addConsumer(MQTTConsumer) line: 164        
        MQTTConsumer.doStart() line: 35 
        MQTTConsumer(ServiceSupport).start() line: 61   
        SpringCamelContext(DefaultCamelContext).startService(Service) line: 
2158        
        
SpringCamelContext(DefaultCamelContext).doStartOrResumeRouteConsumers(Map<Integer,DefaultRouteStartupOrder>,
 boolean, boolean) line: 2452       
        
SpringCamelContext(DefaultCamelContext).doStartRouteConsumers(Map<Integer,DefaultRouteStartupOrder>,
 boolean) line: 2388        
        
SpringCamelContext(DefaultCamelContext).safelyStartRouteServices(boolean, 
boolean, boolean, boolean, Collection<RouteService>) line: 2318       
        
SpringCamelContext(DefaultCamelContext).doStartOrResumeRoutes(Map<String,RouteService>,
 boolean, boolean, boolean, boolean) line: 2091  
        SpringCamelContext(DefaultCamelContext).doStartCamel() line: 1951       
        SpringCamelContext(DefaultCamelContext).doStart() line: 1777    
{noformat}

These messages will never be picked up untill camel is restarted.


> MQTT endpoint misses QoS > 0 messages due to startup timing issue
> -----------------------------------------------------------------
>
>                 Key: CAMEL-7922
>                 URL: https://issues.apache.org/jira/browse/CAMEL-7922
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-mqtt
>    Affects Versions: 2.14.0
>            Reporter: Davy De Waele
>            Assignee: Willem Jiang
>
> When the MQTT Endpoint is started the MQTT connection is immediately 
> established, causing an immediate influx of persisted messages (put on the 
> topic when the client was not available). 
> Issue is that at this point, most likely no consumers are available yet to 
> process these messages.
> *Receiving a PUBLISH message*
> Publish message are received without any consumers. Result : msg with QoS > 0 
> that were put on the topic while the client was not connected are never 
> processed.
> {noformat}
> Daemon Thread [hawtdispatch-DEFAULT-3] (Suspended (breakpoint at line 815 in 
> CallbackConnection))     
>       CallbackConnection.toReceiver(PUBLISH) line: 815        
>       CallbackConnection.processFrame(MQTTFrame) line: 732    
>       CallbackConnection.access$1500(CallbackConnection, MQTTFrame) line: 51  
>       CallbackConnection$6.onTransportCommand(Object) line: 392       
>       TcpTransport.drainInbound() line: 709   
>       TcpTransport$6.run() line: 588  
>       NioDispatchSource$3.run() line: 209     
>       SerialDispatchQueue.run() line: 100     
>       SimpleThread.run() line: 77     
> {noformat}
> *No consumers registered yet*
> Only when this finishes will Camel be able to process the messages.
> {noformat}
> Daemon Thread [localhost-startStop-1] (Suspended (breakpoint at line 164 in 
> MQTTEndpoint))    
>       owns: SpringCamelContext  (id=92)       
>       owns: Object  (id=143)  
>       owns: StandardContext  (id=144) 
>       MQTTEndpoint.addConsumer(MQTTConsumer) line: 164        
>       MQTTConsumer.doStart() line: 35 
>       MQTTConsumer(ServiceSupport).start() line: 61   
>       SpringCamelContext(DefaultCamelContext).startService(Service) line: 
> 2158        
>       
> SpringCamelContext(DefaultCamelContext).doStartOrResumeRouteConsumers(Map<Integer,DefaultRouteStartupOrder>,
>  boolean, boolean) line: 2452       
>       
> SpringCamelContext(DefaultCamelContext).doStartRouteConsumers(Map<Integer,DefaultRouteStartupOrder>,
>  boolean) line: 2388        
>       
> SpringCamelContext(DefaultCamelContext).safelyStartRouteServices(boolean, 
> boolean, boolean, boolean, Collection<RouteService>) line: 2318       
>       
> SpringCamelContext(DefaultCamelContext).doStartOrResumeRoutes(Map<String,RouteService>,
>  boolean, boolean, boolean, boolean) line: 2091  
>       SpringCamelContext(DefaultCamelContext).doStartCamel() line: 1951       
>       SpringCamelContext(DefaultCamelContext).doStart() line: 1777    
> {noformat}
> These messages will never be picked up.
> Perhaps it's more the responsibility of the consumer / producer to start a 
> connection when they get attached to the endpoint ? 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to