[
https://issues.apache.org/jira/browse/CAMEL-15748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17220325#comment-17220325
]
Marco Collovati commented on CAMEL-15748:
-----------------------------------------
[~davsclaus] I'm working on a PR for master, but I have a problem with the test.
As mentioned above, with paho 1.2.5 the call to MqttClient.connect hangs
indefinitely, regardless of the connection timeout settings.
Can this be handled somehow in test code?
The only workaround I found is to set *timeToWait* on MqttClient and reset it
to *-1* after the connection is established, but I don't think this is a good idea:
{code:java}
client.setTimeToWait(1000
        + getEndpoint().getConfiguration().getConnectionTimeout() * 1000);
client.connect(connectOptions);
client.setTimeToWait(-1);
{code}
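One possible way to keep a hanging connect from blocking a test is to run the blocking call on a separate thread and bound it with a timeout. The sketch below uses only java.util.concurrent; *BoundedCall* and *completesWithin* are hypothetical names, and a sleeping task stands in for the *MqttClient.connect* call. Whether Paho's connect actually reacts to interruption is not verified here, so the worker is a daemon thread that cannot keep the JVM alive.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical test helper, not part of Camel or Paho.
final class BoundedCall {

    /** Runs a blocking call on a daemon thread; returns true if it finished within the timeout. */
    static boolean completesWithin(Callable<Void> blockingCall, long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "bounded-call");
            thread.setDaemon(true); // a stuck call must not prevent JVM shutdown
            return thread;
        });
        Future<Void> future = executor.submit(blockingCall);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // best effort: interrupts the worker thread
            return false;
        } catch (ExecutionException e) {
            // The call itself failed (for example, connection refused).
            throw new IllegalStateException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a connect() call that never returns.
        Callable<Void> hangingConnect = () -> {
            Thread.sleep(60_000);
            return null;
        };
        System.out.println(completesWithin(hangingConnect, 200));
    }
}
```

In a test, the callable would wrap the real call, for example a lambda that invokes client.connect(connectOptions) and returns null, with the test failing when the helper returns false.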
> Paho consumer never connects if the broker is not reachable at startup
> ----------------------------------------------------------------------
>
> Key: CAMEL-15748
> URL: https://issues.apache.org/jira/browse/CAMEL-15748
> Project: Camel
> Issue Type: Bug
> Components: camel-paho
> Affects Versions: 3.5.0, 3.4.4, 3.6.0
> Reporter: Marco Collovati
> Priority: Major
> Fix For: 3.4.5, 3.7.0
>
> Attachments: PahoConsumerRestartTest.java
>
>
> When a route uses a paho consumer and the broker is not reachable at
> startup, the Camel context fails fast and shuts down.
> This can be avoided by configuring the *SupervisingRouteController*, but
> then, even though the Camel context does not fail, the consumer is never
> able to establish a connection.
> The reason is that when *PahoConsumer* starts for the first time, it creates
> a *MqttClient* and stores it in the *client* field; the call to
> *client.connect* then throws an exception because the broker is down:
>
> {code:java}
> @Override
> protected void doStart() throws Exception {
>     super.doStart();
>     connectOptions = PahoEndpoint.createMqttConnectOptions(getEndpoint().getConfiguration());
>     if (client == null) {
>         clientId = getEndpoint().getConfiguration().getClientId();
>         if (clientId == null) {
>             clientId = "camel-" + MqttClient.generateClientId();
>         }
>         stopClient = true;
>         client = new MqttClient(
>                 getEndpoint().getConfiguration().getBrokerUrl(),
>                 clientId,
>                 PahoEndpoint.createMqttClientPersistence(getEndpoint().getConfiguration()));
>         LOG.debug("Connecting client: {} to broker: {}", clientId,
>                 getEndpoint().getConfiguration().getBrokerUrl());
>         client.connect(connectOptions);
>     }
>
>     // other code omitted for brevity
>
>     client.subscribe(getEndpoint().getTopic(), getEndpoint().getConfiguration().getQos());
> }
> {code}
>
> After that, *doStop* is invoked, but the *client* field is not set to null
> because the client is not connected:
>
> {code:java}
> @Override
> protected void doStop() throws Exception {
>     super.doStop();
>     if (stopClient && client != null && client.isConnected()) {
>         String topic = getEndpoint().getTopic();
>         // only unsubscribe if we are not durable
>         if (getEndpoint().getConfiguration().isCleanSession()) {
>             LOG.debug("Unsubscribing client: {} from topic: {}", clientId, topic);
>             client.unsubscribe(topic);
>         } else {
>             LOG.debug("Client: {} is durable so will not unsubscribe from topic: {}", clientId, topic);
>         }
>         LOG.debug("Disconnecting client: {} from broker: {}", clientId,
>                 getEndpoint().getConfiguration().getBrokerUrl());
>         client.disconnect();
>         client = null;
>     }
> }
> {code}
>
> When the supervisor tries to restart the route, the *client* instance
> already exists, so no new connection is attempted, and the call to
> *client.subscribe* fails because the client is not connected.
> Always setting *client* to null in *doStop* should probably resolve the
> issue; however, I have no idea whether this solution would have other side
> effects.
> A better solution may be to handle automatic reconnection in the consumer,
> as *RabbitConsumer* does, for example.
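
The first suggestion in the description (always discarding the client on stop) can be illustrated with a small simulation. This is only a sketch under stated assumptions: *ConsumerLifecycleSketch*, *Client*, and *FakeClient* are hypothetical stand-ins, not Camel or Paho classes, and the broker is modelled by a boolean flag. It is not the actual fix applied to *PahoConsumer*.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the consumer lifecycle; not Camel or Paho code.
final class ConsumerLifecycleSketch {

    /** Stand-in for the few MqttClient operations used below. */
    interface Client {
        void connect();
        boolean isConnected();
        void disconnect();
    }

    /** Fake client whose connect() succeeds only while the shared flag is true. */
    static final class FakeClient implements Client {
        private final AtomicBoolean brokerUp;
        private boolean connected;

        FakeClient(AtomicBoolean brokerUp) {
            this.brokerUp = brokerUp;
        }

        @Override
        public void connect() {
            if (!brokerUp.get()) {
                throw new IllegalStateException("broker not reachable");
            }
            connected = true;
        }

        @Override
        public boolean isConnected() {
            return connected;
        }

        @Override
        public void disconnect() {
            connected = false;
        }
    }

    private final AtomicBoolean brokerUp;
    private Client client;

    ConsumerLifecycleSketch(AtomicBoolean brokerUp) {
        this.brokerUp = brokerUp;
    }

    void doStart() {
        if (client == null) {
            client = new FakeClient(brokerUp);
            client.connect(); // throws while the broker is down
        }
        // Models subscribe(), which needs a connected client.
        if (!client.isConnected()) {
            throw new IllegalStateException("client is not connected");
        }
    }

    void doStop() {
        if (client == null) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
        } finally {
            client = null; // always discard, so the next doStart() creates a fresh client
        }
    }

    boolean isConnected() {
        return client != null && client.isConnected();
    }

    public static void main(String[] args) {
        AtomicBoolean brokerUp = new AtomicBoolean(false);
        ConsumerLifecycleSketch consumer = new ConsumerLifecycleSketch(brokerUp);
        try {
            consumer.doStart();
        } catch (IllegalStateException e) {
            consumer.doStop(); // the failed start is followed by a stop
        }
        brokerUp.set(true); // broker becomes reachable
        consumer.doStart(); // models the supervised restart
        System.out.println(consumer.isConnected());
    }
}
```

Because the stale client is dropped in the finally block, the restart takes the client == null branch again and reconnects. The sketch says nothing about side effects in the real component, such as a client supplied from outside the consumer.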
--
This message was sent by Atlassian Jira
(v8.3.4#803005)