[
https://issues.apache.org/jira/browse/CAMEL-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Davy De Waele updated CAMEL-7922:
---------------------------------
Description:
The FuseSource client can start receiving messages as a result of a topic
subscription BEFORE any consumers have been registered on the MQTT endpoint.
When the MQTT Endpoint is started the MQTT connection is established, causing
an immediate influx of persisted messages. Issue is that at this point, no
consumers are available yet to process these messages.
*Receiving a PUBLISH message*
Publish message are received without any consumers. Result : msg with QoS > 0
are never processed.
{noformat}
Daemon Thread [hawtdispatch-DEFAULT-3] (Suspended (breakpoint at line 815 in
CallbackConnection))
CallbackConnection.toReceiver(PUBLISH) line: 815
CallbackConnection.processFrame(MQTTFrame) line: 732
CallbackConnection.access$1500(CallbackConnection, MQTTFrame) line: 51
CallbackConnection$6.onTransportCommand(Object) line: 392
TcpTransport.drainInbound() line: 709
TcpTransport$6.run() line: 588
NioDispatchSource$3.run() line: 209
SerialDispatchQueue.run() line: 100
SimpleThread.run() line: 77
{noformat}
*No consumers registered yet*
Only when this finishes will Camel be able to process the messages.
{noformat}
Daemon Thread [localhost-startStop-1] (Suspended (breakpoint at line 164 in
MQTTEndpoint))
owns: SpringCamelContext (id=92)
owns: Object (id=143)
owns: StandardContext (id=144)
MQTTEndpoint.addConsumer(MQTTConsumer) line: 164
MQTTConsumer.doStart() line: 35
MQTTConsumer(ServiceSupport).start() line: 61
SpringCamelContext(DefaultCamelContext).startService(Service) line:
2158
SpringCamelContext(DefaultCamelContext).doStartOrResumeRouteConsumers(Map<Integer,DefaultRouteStartupOrder>,
boolean, boolean) line: 2452
SpringCamelContext(DefaultCamelContext).doStartRouteConsumers(Map<Integer,DefaultRouteStartupOrder>,
boolean) line: 2388
SpringCamelContext(DefaultCamelContext).safelyStartRouteServices(boolean,
boolean, boolean, boolean, Collection<RouteService>) line: 2318
SpringCamelContext(DefaultCamelContext).doStartOrResumeRoutes(Map<String,RouteService>,
boolean, boolean, boolean, boolean) line: 2091
SpringCamelContext(DefaultCamelContext).doStartCamel() line: 1951
SpringCamelContext(DefaultCamelContext).doStart() line: 1777
{noformat}
These messages will never be picked up untill camel is restarted.
was:
The FuseSource client can start receiving messages as a result of a topic
subscription BEFORE any consumers have been registered on the MQTT endpoint.
*Receiving a PUBLISH message*
Publish message are received without any consumers. Result : msg with QoS > 0
are never processed.
{noformat}
Daemon Thread [hawtdispatch-DEFAULT-3] (Suspended (breakpoint at line 815 in
CallbackConnection))
CallbackConnection.toReceiver(PUBLISH) line: 815
CallbackConnection.processFrame(MQTTFrame) line: 732
CallbackConnection.access$1500(CallbackConnection, MQTTFrame) line: 51
CallbackConnection$6.onTransportCommand(Object) line: 392
TcpTransport.drainInbound() line: 709
TcpTransport$6.run() line: 588
NioDispatchSource$3.run() line: 209
SerialDispatchQueue.run() line: 100
SimpleThread.run() line: 77
{noformat}
*No consumers registered yet*
Only when this finishes will Camel be able to process the messages.
{noformat}
Daemon Thread [localhost-startStop-1] (Suspended (breakpoint at line 164 in
MQTTEndpoint))
owns: SpringCamelContext (id=92)
owns: Object (id=143)
owns: StandardContext (id=144)
MQTTEndpoint.addConsumer(MQTTConsumer) line: 164
MQTTConsumer.doStart() line: 35
MQTTConsumer(ServiceSupport).start() line: 61
SpringCamelContext(DefaultCamelContext).startService(Service) line:
2158
SpringCamelContext(DefaultCamelContext).doStartOrResumeRouteConsumers(Map<Integer,DefaultRouteStartupOrder>,
boolean, boolean) line: 2452
SpringCamelContext(DefaultCamelContext).doStartRouteConsumers(Map<Integer,DefaultRouteStartupOrder>,
boolean) line: 2388
SpringCamelContext(DefaultCamelContext).safelyStartRouteServices(boolean,
boolean, boolean, boolean, Collection<RouteService>) line: 2318
SpringCamelContext(DefaultCamelContext).doStartOrResumeRoutes(Map<String,RouteService>,
boolean, boolean, boolean, boolean) line: 2091
SpringCamelContext(DefaultCamelContext).doStartCamel() line: 1951
SpringCamelContext(DefaultCamelContext).doStart() line: 1777
{noformat}
These messages will never be picked up untill camel is restarted.
> MQTT endpoint misses QoS > 0 messages due to startup timing issue
> -----------------------------------------------------------------
>
> Key: CAMEL-7922
> URL: https://issues.apache.org/jira/browse/CAMEL-7922
> Project: Camel
> Issue Type: Bug
> Components: camel-mqtt
> Affects Versions: 2.14.0
> Reporter: Davy De Waele
>
> The FuseSource client can start receiving messages as a result of a topic
> subscription BEFORE any consumers have been registered on the MQTT endpoint.
> When the MQTT Endpoint is started the MQTT connection is established, causing
> an immediate influx of persisted messages. Issue is that at this point, no
> consumers are available yet to process these messages.
> *Receiving a PUBLISH message*
> Publish message are received without any consumers. Result : msg with QoS > 0
> are never processed.
> {noformat}
> Daemon Thread [hawtdispatch-DEFAULT-3] (Suspended (breakpoint at line 815 in
> CallbackConnection))
> CallbackConnection.toReceiver(PUBLISH) line: 815
> CallbackConnection.processFrame(MQTTFrame) line: 732
> CallbackConnection.access$1500(CallbackConnection, MQTTFrame) line: 51
> CallbackConnection$6.onTransportCommand(Object) line: 392
> TcpTransport.drainInbound() line: 709
> TcpTransport$6.run() line: 588
> NioDispatchSource$3.run() line: 209
> SerialDispatchQueue.run() line: 100
> SimpleThread.run() line: 77
> {noformat}
> *No consumers registered yet*
> Only when this finishes will Camel be able to process the messages.
> {noformat}
> Daemon Thread [localhost-startStop-1] (Suspended (breakpoint at line 164 in
> MQTTEndpoint))
> owns: SpringCamelContext (id=92)
> owns: Object (id=143)
> owns: StandardContext (id=144)
> MQTTEndpoint.addConsumer(MQTTConsumer) line: 164
> MQTTConsumer.doStart() line: 35
> MQTTConsumer(ServiceSupport).start() line: 61
> SpringCamelContext(DefaultCamelContext).startService(Service) line:
> 2158
>
> SpringCamelContext(DefaultCamelContext).doStartOrResumeRouteConsumers(Map<Integer,DefaultRouteStartupOrder>,
> boolean, boolean) line: 2452
>
> SpringCamelContext(DefaultCamelContext).doStartRouteConsumers(Map<Integer,DefaultRouteStartupOrder>,
> boolean) line: 2388
>
> SpringCamelContext(DefaultCamelContext).safelyStartRouteServices(boolean,
> boolean, boolean, boolean, Collection<RouteService>) line: 2318
>
> SpringCamelContext(DefaultCamelContext).doStartOrResumeRoutes(Map<String,RouteService>,
> boolean, boolean, boolean, boolean) line: 2091
> SpringCamelContext(DefaultCamelContext).doStartCamel() line: 1951
> SpringCamelContext(DefaultCamelContext).doStart() line: 1777
> {noformat}
> These messages will never be picked up untill camel is restarted.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)