[
https://issues.apache.org/jira/browse/CAMEL-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Davy De Waele updated CAMEL-7922:
---------------------------------
Description:
When the MQTT Endpoint is started the MQTT connection is immediately
established, causing an immediate influx of persisted messages (put on the
topic when the client was not available).
Issue is that at this point, most likely no consumers are available yet to
process these messages.
*Receiving a PUBLISH message*
Publish message are received without any consumers. Result : msg with QoS > 0
that were put on the topic while the client was not connected are never
processed.
{noformat}
Daemon Thread [hawtdispatch-DEFAULT-3] (Suspended (breakpoint at line 815 in
CallbackConnection))
CallbackConnection.toReceiver(PUBLISH) line: 815
CallbackConnection.processFrame(MQTTFrame) line: 732
CallbackConnection.access$1500(CallbackConnection, MQTTFrame) line: 51
CallbackConnection$6.onTransportCommand(Object) line: 392
TcpTransport.drainInbound() line: 709
TcpTransport$6.run() line: 588
NioDispatchSource$3.run() line: 209
SerialDispatchQueue.run() line: 100
SimpleThread.run() line: 77
{noformat}
*No consumers registered yet*
Only when this finishes will Camel be able to process the messages.
{noformat}
Daemon Thread [localhost-startStop-1] (Suspended (breakpoint at line 164 in
MQTTEndpoint))
owns: SpringCamelContext (id=92)
owns: Object (id=143)
owns: StandardContext (id=144)
MQTTEndpoint.addConsumer(MQTTConsumer) line: 164
MQTTConsumer.doStart() line: 35
MQTTConsumer(ServiceSupport).start() line: 61
SpringCamelContext(DefaultCamelContext).startService(Service) line:
2158
SpringCamelContext(DefaultCamelContext).doStartOrResumeRouteConsumers(Map<Integer,DefaultRouteStartupOrder>,
boolean, boolean) line: 2452
SpringCamelContext(DefaultCamelContext).doStartRouteConsumers(Map<Integer,DefaultRouteStartupOrder>,
boolean) line: 2388
SpringCamelContext(DefaultCamelContext).safelyStartRouteServices(boolean,
boolean, boolean, boolean, Collection<RouteService>) line: 2318
SpringCamelContext(DefaultCamelContext).doStartOrResumeRoutes(Map<String,RouteService>,
boolean, boolean, boolean, boolean) line: 2091
SpringCamelContext(DefaultCamelContext).doStartCamel() line: 1951
SpringCamelContext(DefaultCamelContext).doStart() line: 1777
{noformat}
These messages will never be picked up.
Perhaps it's more the responsibility of the consumer / producer to start a
connection when they get attached to the endpoint ?
was:
The FuseSource client can start receiving messages as a result of a topic
subscription BEFORE any consumers have been registered on the MQTT endpoint.
When the MQTT Endpoint is started the MQTT connection is established, causing
an immediate influx of persisted messages. Issue is that at this point, no
consumers are available yet to process these messages.
*Receiving a PUBLISH message*
Publish message are received without any consumers. Result : msg with QoS > 0
are never processed.
{noformat}
Daemon Thread [hawtdispatch-DEFAULT-3] (Suspended (breakpoint at line 815 in
CallbackConnection))
CallbackConnection.toReceiver(PUBLISH) line: 815
CallbackConnection.processFrame(MQTTFrame) line: 732
CallbackConnection.access$1500(CallbackConnection, MQTTFrame) line: 51
CallbackConnection$6.onTransportCommand(Object) line: 392
TcpTransport.drainInbound() line: 709
TcpTransport$6.run() line: 588
NioDispatchSource$3.run() line: 209
SerialDispatchQueue.run() line: 100
SimpleThread.run() line: 77
{noformat}
*No consumers registered yet*
Only when this finishes will Camel be able to process the messages.
{noformat}
Daemon Thread [localhost-startStop-1] (Suspended (breakpoint at line 164 in
MQTTEndpoint))
owns: SpringCamelContext (id=92)
owns: Object (id=143)
owns: StandardContext (id=144)
MQTTEndpoint.addConsumer(MQTTConsumer) line: 164
MQTTConsumer.doStart() line: 35
MQTTConsumer(ServiceSupport).start() line: 61
SpringCamelContext(DefaultCamelContext).startService(Service) line:
2158
SpringCamelContext(DefaultCamelContext).doStartOrResumeRouteConsumers(Map<Integer,DefaultRouteStartupOrder>,
boolean, boolean) line: 2452
SpringCamelContext(DefaultCamelContext).doStartRouteConsumers(Map<Integer,DefaultRouteStartupOrder>,
boolean) line: 2388
SpringCamelContext(DefaultCamelContext).safelyStartRouteServices(boolean,
boolean, boolean, boolean, Collection<RouteService>) line: 2318
SpringCamelContext(DefaultCamelContext).doStartOrResumeRoutes(Map<String,RouteService>,
boolean, boolean, boolean, boolean) line: 2091
SpringCamelContext(DefaultCamelContext).doStartCamel() line: 1951
SpringCamelContext(DefaultCamelContext).doStart() line: 1777
{noformat}
These messages will never be picked up untill camel is restarted.
> MQTT endpoint misses QoS > 0 messages due to startup timing issue
> -----------------------------------------------------------------
>
> Key: CAMEL-7922
> URL: https://issues.apache.org/jira/browse/CAMEL-7922
> Project: Camel
> Issue Type: Bug
> Components: camel-mqtt
> Affects Versions: 2.14.0
> Reporter: Davy De Waele
> Assignee: Willem Jiang
>
> When the MQTT Endpoint is started the MQTT connection is immediately
> established, causing an immediate influx of persisted messages (put on the
> topic when the client was not available).
> Issue is that at this point, most likely no consumers are available yet to
> process these messages.
> *Receiving a PUBLISH message*
> Publish message are received without any consumers. Result : msg with QoS > 0
> that were put on the topic while the client was not connected are never
> processed.
> {noformat}
> Daemon Thread [hawtdispatch-DEFAULT-3] (Suspended (breakpoint at line 815 in
> CallbackConnection))
> CallbackConnection.toReceiver(PUBLISH) line: 815
> CallbackConnection.processFrame(MQTTFrame) line: 732
> CallbackConnection.access$1500(CallbackConnection, MQTTFrame) line: 51
> CallbackConnection$6.onTransportCommand(Object) line: 392
> TcpTransport.drainInbound() line: 709
> TcpTransport$6.run() line: 588
> NioDispatchSource$3.run() line: 209
> SerialDispatchQueue.run() line: 100
> SimpleThread.run() line: 77
> {noformat}
> *No consumers registered yet*
> Only when this finishes will Camel be able to process the messages.
> {noformat}
> Daemon Thread [localhost-startStop-1] (Suspended (breakpoint at line 164 in
> MQTTEndpoint))
> owns: SpringCamelContext (id=92)
> owns: Object (id=143)
> owns: StandardContext (id=144)
> MQTTEndpoint.addConsumer(MQTTConsumer) line: 164
> MQTTConsumer.doStart() line: 35
> MQTTConsumer(ServiceSupport).start() line: 61
> SpringCamelContext(DefaultCamelContext).startService(Service) line:
> 2158
>
> SpringCamelContext(DefaultCamelContext).doStartOrResumeRouteConsumers(Map<Integer,DefaultRouteStartupOrder>,
> boolean, boolean) line: 2452
>
> SpringCamelContext(DefaultCamelContext).doStartRouteConsumers(Map<Integer,DefaultRouteStartupOrder>,
> boolean) line: 2388
>
> SpringCamelContext(DefaultCamelContext).safelyStartRouteServices(boolean,
> boolean, boolean, boolean, Collection<RouteService>) line: 2318
>
> SpringCamelContext(DefaultCamelContext).doStartOrResumeRoutes(Map<String,RouteService>,
> boolean, boolean, boolean, boolean) line: 2091
> SpringCamelContext(DefaultCamelContext).doStartCamel() line: 1951
> SpringCamelContext(DefaultCamelContext).doStart() line: 1777
> {noformat}
> These messages will never be picked up.
> Perhaps it's more the responsibility of the consumer / producer to start a
> connection when they get attached to the endpoint ?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)