[ https://issues.apache.org/jira/browse/CAMEL-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Davy De Waele updated CAMEL-7922:
---------------------------------
    Description: 
The FuseSource client can start receiving messages as a result of a topic 
subscription BEFORE any consumers have been registered on the MQTT endpoint. 
When the MQTT endpoint is started, the MQTT connection is established, causing 
an immediate influx of persisted messages. The issue is that, at this point, no 
consumers are available yet to process these messages.
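
To make the ordering problem concrete, here is a minimal, single-threaded sketch in plain Java. It is not the camel-mqtt code: `RacyEndpoint`, `onPublish`, `droppedCount` and the payload names are hypothetical stand-ins, and the only assumption is that an endpoint hands each incoming message to whatever consumers are registered at that moment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for an endpoint; NOT the camel-mqtt MQTTEndpoint.
class RacyEndpoint {
    private final List<Consumer<String>> consumers = new ArrayList<>();
    private int dropped;

    // Called for every incoming PUBLISH. Returns false if nobody could take it.
    synchronized boolean onPublish(String payload) {
        if (consumers.isEmpty()) {
            dropped++; // no consumer registered yet: the message is lost
            return false;
        }
        for (Consumer<String> consumer : consumers) {
            consumer.accept(payload);
        }
        return true;
    }

    synchronized void addConsumer(Consumer<String> consumer) {
        consumers.add(consumer);
    }

    synchronized int droppedCount() {
        return dropped;
    }
}

class StartupRaceDemo {
    public static void main(String[] args) {
        RacyEndpoint endpoint = new RacyEndpoint();
        List<String> processed = new ArrayList<>();

        // 1. The connection is up, so persisted messages arrive immediately.
        endpoint.onPublish("persisted-1");
        endpoint.onPublish("persisted-2");

        // 2. Only now does route start-up register the consumer.
        endpoint.addConsumer(processed::add);

        // 3. Messages published from here on are processed normally.
        endpoint.onPublish("live-1");

        // prints: processed=[live-1] dropped=2
        System.out.println("processed=" + processed + " dropped=" + endpoint.droppedCount());
    }
}
```

In the real component the two steps run on different threads (the hawtdispatch thread and the context start-up thread), so the window is a matter of timing rather than a fixed order.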

*Receiving a PUBLISH message*

PUBLISH messages are received before any consumers exist. Result: messages 
with QoS > 0 are never processed.

{noformat}
Daemon Thread [hawtdispatch-DEFAULT-3] (Suspended (breakpoint at line 815 in CallbackConnection))
        CallbackConnection.toReceiver(PUBLISH) line: 815        
        CallbackConnection.processFrame(MQTTFrame) line: 732    
        CallbackConnection.access$1500(CallbackConnection, MQTTFrame) line: 51  
        CallbackConnection$6.onTransportCommand(Object) line: 392       
        TcpTransport.drainInbound() line: 709   
        TcpTransport$6.run() line: 588  
        NioDispatchSource$3.run() line: 209     
        SerialDispatchQueue.run() line: 100     
        SimpleThread.run() line: 77     
{noformat}

*No consumers registered yet*

Only when the consumer start-up shown below (MQTTEndpoint.addConsumer) finishes 
will Camel be able to process the messages.

{noformat}
Daemon Thread [localhost-startStop-1] (Suspended (breakpoint at line 164 in MQTTEndpoint))
        owns: SpringCamelContext  (id=92)
        owns: Object  (id=143)
        owns: StandardContext  (id=144)
        MQTTEndpoint.addConsumer(MQTTConsumer) line: 164
        MQTTConsumer.doStart() line: 35
        MQTTConsumer(ServiceSupport).start() line: 61
        SpringCamelContext(DefaultCamelContext).startService(Service) line: 2158
        SpringCamelContext(DefaultCamelContext).doStartOrResumeRouteConsumers(Map<Integer,DefaultRouteStartupOrder>, boolean, boolean) line: 2452
        SpringCamelContext(DefaultCamelContext).doStartRouteConsumers(Map<Integer,DefaultRouteStartupOrder>, boolean) line: 2388
        SpringCamelContext(DefaultCamelContext).safelyStartRouteServices(boolean, boolean, boolean, boolean, Collection<RouteService>) line: 2318
        SpringCamelContext(DefaultCamelContext).doStartOrResumeRoutes(Map<String,RouteService>, boolean, boolean, boolean, boolean) line: 2091
        SpringCamelContext(DefaultCamelContext).doStartCamel() line: 1951
        SpringCamelContext(DefaultCamelContext).doStart() line: 1777
{noformat}

These messages will never be picked up until Camel is restarted.
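
One possible way to close this window, sketched here purely as an illustration and not as the fix applied in camel-mqtt, is to park messages that arrive before the first consumer is registered and replay them on registration. `BufferingEndpoint`, `pendingCount` and the payload names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical endpoint that parks early messages instead of dropping them.
class BufferingEndpoint {
    private final List<Consumer<String>> consumers = new ArrayList<>();
    private final Queue<String> pending = new ArrayDeque<>();

    // Called for every incoming PUBLISH.
    synchronized void onPublish(String payload) {
        if (consumers.isEmpty()) {
            pending.add(payload); // keep it until someone can process it
            return;
        }
        dispatch(payload);
    }

    // Registers a consumer and replays, in arrival order, anything parked so far.
    synchronized void addConsumer(Consumer<String> consumer) {
        consumers.add(consumer);
        String payload;
        while ((payload = pending.poll()) != null) {
            dispatch(payload);
        }
    }

    synchronized int pendingCount() {
        return pending.size();
    }

    private void dispatch(String payload) {
        for (Consumer<String> consumer : consumers) {
            consumer.accept(payload);
        }
    }
}

class BufferingDemo {
    public static void main(String[] args) {
        BufferingEndpoint endpoint = new BufferingEndpoint();
        List<String> processed = new ArrayList<>();

        endpoint.onPublish("persisted-1");    // arrives before any consumer
        endpoint.onPublish("persisted-2");
        endpoint.addConsumer(processed::add); // replays the two parked messages
        endpoint.onPublish("live-1");

        // prints: processed=[persisted-1, persisted-2, live-1] pending=0
        System.out.println("processed=" + processed + " pending=" + endpoint.pendingCount());
    }
}
```

The buffer in this sketch is unbounded, which a real implementation would need to cap. An alternative with no buffer at all would be to delay connecting (or subscribing) until the first consumer has been added.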

  was:
The FuseSource client can start receiving messages as a result of a topic 
subscription BEFORE any consumers have been registered on the MQTT endpoint.

*Receiving a PUBLISH message*

PUBLISH messages are received before any consumers exist. Result: messages 
with QoS > 0 are never processed.

{noformat}
Daemon Thread [hawtdispatch-DEFAULT-3] (Suspended (breakpoint at line 815 in CallbackConnection))
        CallbackConnection.toReceiver(PUBLISH) line: 815        
        CallbackConnection.processFrame(MQTTFrame) line: 732    
        CallbackConnection.access$1500(CallbackConnection, MQTTFrame) line: 51  
        CallbackConnection$6.onTransportCommand(Object) line: 392       
        TcpTransport.drainInbound() line: 709   
        TcpTransport$6.run() line: 588  
        NioDispatchSource$3.run() line: 209     
        SerialDispatchQueue.run() line: 100     
        SimpleThread.run() line: 77     
{noformat}

*No consumers registered yet*

Only when this finishes will Camel be able to process the messages.

{noformat}
Daemon Thread [localhost-startStop-1] (Suspended (breakpoint at line 164 in MQTTEndpoint))
        owns: SpringCamelContext  (id=92)
        owns: Object  (id=143)
        owns: StandardContext  (id=144)
        MQTTEndpoint.addConsumer(MQTTConsumer) line: 164
        MQTTConsumer.doStart() line: 35
        MQTTConsumer(ServiceSupport).start() line: 61
        SpringCamelContext(DefaultCamelContext).startService(Service) line: 2158
        SpringCamelContext(DefaultCamelContext).doStartOrResumeRouteConsumers(Map<Integer,DefaultRouteStartupOrder>, boolean, boolean) line: 2452
        SpringCamelContext(DefaultCamelContext).doStartRouteConsumers(Map<Integer,DefaultRouteStartupOrder>, boolean) line: 2388
        SpringCamelContext(DefaultCamelContext).safelyStartRouteServices(boolean, boolean, boolean, boolean, Collection<RouteService>) line: 2318
        SpringCamelContext(DefaultCamelContext).doStartOrResumeRoutes(Map<String,RouteService>, boolean, boolean, boolean, boolean) line: 2091
        SpringCamelContext(DefaultCamelContext).doStartCamel() line: 1951
        SpringCamelContext(DefaultCamelContext).doStart() line: 1777
{noformat}

These messages will never be picked up until Camel is restarted.


> MQTT endpoint misses QoS > 0 messages due to startup timing issue
> -----------------------------------------------------------------
>
>                 Key: CAMEL-7922
>                 URL: https://issues.apache.org/jira/browse/CAMEL-7922
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-mqtt
>    Affects Versions: 2.14.0
>            Reporter: Davy De Waele



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
