[ 
https://issues.apache.org/jira/browse/CAMEL-15748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17220993#comment-17220993
 ] 

Marco Collovati commented on CAMEL-15748:
-----------------------------------------

I think the hang is caused by this change in the `run` main loop of paho's 
*CommsReceiver*:

{code:java}
// paho 1.2.4
if (message != null) {
    // A new message has arrived
    clientState.notifyReceivedMsg(message);
}
else {
    // fix for bug 719
    if (!clientComms.isConnected()) {
        throw new IOException("Connection is lost.");
    }
}


// paho 1.2.5
if (message != null) {
    // A new message has arrived
    clientState.notifyReceivedMsg(message);
}
else {
    // fix for bug 719
    if (!clientComms.isConnected() && !clientComms.isConnecting()) {
        throw new IOException("Connection is lost.");
    }
}
{code}

During a connection attempt *clientComms* is in the connecting state. When the 
broker is down, a null message is received and, in paho 1.2.4, an exception is 
thrown immediately because *clientComms* is not connected. In 1.2.5, however, 
the exception is never thrown because *clientComms* is in the connecting state, 
and this leads to an infinite loop.
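
The difference between the two guards can be illustrated without paho on the classpath. The following is only a minimal sketch: `FakeComms` and `iterationsUntilExit` are hypothetical stand-ins, not paho classes, and the loop is capped so that the 1.2.5 case terminates for demonstration purposes.

```java
import java.io.IOException;

public class ReceiverLoopSketch {

    /** Hypothetical stand-in for the two state checks used in the paho snippet. */
    static final class FakeComms {
        private final boolean connected;
        private final boolean connecting;

        FakeComms(boolean connected, boolean connecting) {
            this.connected = connected;
            this.connecting = connecting;
        }

        boolean isConnected() { return connected; }
        boolean isConnecting() { return connecting; }
    }

    /**
     * Simulates a receiver loop in which every read yields a null message.
     * Returns how many iterations ran before the loop ended, either because
     * the "Connection is lost." exception was thrown or because the
     * artificial cap maxIterations was reached.
     */
    static int iterationsUntilExit(FakeComms comms, boolean checkConnecting, int maxIterations) {
        int iterations = 0;
        while (iterations < maxIterations) {
            iterations++;
            Object message = null; // assumption: broker is down, nothing is received
            try {
                if (message == null) {
                    boolean lost = checkConnecting
                            ? !comms.isConnected() && !comms.isConnecting() // 1.2.5 guard
                            : !comms.isConnected();                         // 1.2.4 guard
                    if (lost) {
                        throw new IOException("Connection is lost.");
                    }
                }
            } catch (IOException e) {
                break; // the exception is what ends the loop
            }
        }
        return iterations;
    }

    public static void main(String[] args) {
        // State during a connection attempt: not connected, but connecting.
        FakeComms connecting = new FakeComms(false, true);
        System.out.println("1.2.4 guard: " + iterationsUntilExit(connecting, false, 1000));
        System.out.println("1.2.5 guard: " + iterationsUntilExit(connecting, true, 1000));
    }
}
```

With the 1.2.4 guard the loop ends on the first null message. With the 1.2.5 guard it only stops at the artificial cap, which corresponds to the endless loop described above.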

Issue 719, mentioned in the code comment, is also referenced by other issues, 
but none of these are about infinite loops.
I can try to do some additional research on the paho issue tracker and get in 
touch with the project to understand whether this is a bug or expected 
behavior, but this may take some time because I can only do it in my spare time.


> Paho consumer never connects if the broker is not reachable at startup
> ----------------------------------------------------------------------
>
>                 Key: CAMEL-15748
>                 URL: https://issues.apache.org/jira/browse/CAMEL-15748
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-paho
>    Affects Versions: 3.5.0, 3.4.4, 3.6.0
>            Reporter: Marco Collovati
>            Priority: Major
>             Fix For: 3.4.5, 3.7.0
>
>         Attachments: PahoConsumerRestartTest.java
>
>
> With a route that has a paho consumer, if the broker is not 
> reachable at startup, the camel context fails fast and shuts down.
> This can be avoided by setting the *SupervisingRouteController*, but this 
> way, even if the camel context does not fail, the consumer is never able to 
> establish a connection.
> The reason is that when *PahoConsumer* starts for the first time, it creates 
> a *MqttClient* and stores it in the *client* field; the call to 
> *client.connect* then throws an exception because the broker is down:
>  
> {code:java}
>     @Override
>     protected void doStart() throws Exception {
>         super.doStart();
>
>         connectOptions = PahoEndpoint.createMqttConnectOptions(getEndpoint().getConfiguration());
>
>         if (client == null) {
>             clientId = getEndpoint().getConfiguration().getClientId();
>             if (clientId == null) {
>                 clientId = "camel-" + MqttClient.generateClientId();
>             }
>             stopClient = true;
>             client = new MqttClient(
>                     getEndpoint().getConfiguration().getBrokerUrl(),
>                     clientId,
>                     PahoEndpoint.createMqttClientPersistence(getEndpoint().getConfiguration()));
>             LOG.debug("Connecting client: {} to broker: {}", clientId,
>                     getEndpoint().getConfiguration().getBrokerUrl());
>             client.connect(connectOptions);
>         }
>
>         // other code omitted for brevity
>
>         client.subscribe(getEndpoint().getTopic(), getEndpoint().getConfiguration().getQos());
> {code}
>  
> After that, *doStop* is invoked, but the *client* instance is not set to 
> null because it is not connected:
>  
> {code:java}
>     @Override
>     protected void doStop() throws Exception {
>         super.doStop();
>
>         if (stopClient && client != null && client.isConnected()) {
>             String topic = getEndpoint().getTopic();
>             // only unsubscribe if we are not durable
>             if (getEndpoint().getConfiguration().isCleanSession()) {
>                 LOG.debug("Unsubscribing client: {} from topic: {}", clientId, topic);
>                 client.unsubscribe(topic);
>             } else {
>                 LOG.debug("Client: {} is durable so will not unsubscribe from topic: {}", clientId, topic);
>             }
>             LOG.debug("Disconnecting client: {} from broker: {}", clientId,
>                     getEndpoint().getConfiguration().getBrokerUrl());
>             client.disconnect();
>             client = null;
>         }
>     }
> {code}
>  
> When the supervisor tries to restart the route, the *client* instance already 
> exists, so no new connection is attempted, and the call to 
> *client.subscribe* fails because the client is not connected.
> Perhaps always setting *client* to null in *doStop* would resolve the issue; 
> however, I have no idea whether this solution would have other side effects.
> A better solution may be to handle automatic reconnection in the consumer, as 
> *RabbitConsumer* does, for example.
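
The restart failure described in the quoted issue, and the effect of always clearing *client* in *doStop*, can be reproduced with plain Java and no broker. This is only a rough sketch: `FakeBroker`, `FakeClient`, `FakeConsumer`, and the `alwaysNullify` flag are hypothetical names that mimic the shape of the quoted code. They are not Camel or paho classes, and the change that actually went into Camel may differ.

```java
public class ConsumerRestartSketch {

    /** Hypothetical broker whose availability can be toggled. */
    static final class FakeBroker {
        boolean up;
    }

    /** Hypothetical stand-in for MqttClient. */
    static final class FakeClient {
        private final FakeBroker broker;
        private boolean connected;

        FakeClient(FakeBroker broker) { this.broker = broker; }

        void connect() {
            if (!broker.up) {
                throw new IllegalStateException("broker unreachable");
            }
            connected = true;
        }

        void subscribe() {
            if (!connected) {
                throw new IllegalStateException("client is not connected");
            }
        }

        void disconnect() { connected = false; }

        boolean isConnected() { return connected; }
    }

    /** Simplified consumer mirroring the doStart/doStop shape from the issue. */
    static final class FakeConsumer {
        private final FakeBroker broker;
        private final boolean alwaysNullify;
        private FakeClient client;

        FakeConsumer(FakeBroker broker, boolean alwaysNullify) {
            this.broker = broker;
            this.alwaysNullify = alwaysNullify;
        }

        void doStart() {
            if (client == null) {
                client = new FakeClient(broker);
                client.connect(); // throws while the broker is down
            }
            client.subscribe();
        }

        void doStop() {
            if (client != null && client.isConnected()) {
                client.disconnect();
                client = null;
            } else if (alwaysNullify) {
                client = null; // proposed change: drop the stale instance too
            }
        }

        /** One start attempt as a supervisor might make it: stop on failure. */
        boolean tryStart() {
            try {
                doStart();
                return true;
            } catch (IllegalStateException e) {
                doStop();
                return false;
            }
        }
    }

    /** First attempt with the broker down, second attempt after it comes up. */
    static boolean restartSucceeds(boolean alwaysNullify) {
        FakeBroker broker = new FakeBroker();
        FakeConsumer consumer = new FakeConsumer(broker, alwaysNullify);
        consumer.tryStart(); // fails: broker is down
        broker.up = true;
        return consumer.tryStart();
    }

    public static void main(String[] args) {
        System.out.println("current doStop: " + restartSucceeds(false));
        System.out.println("always nullify: " + restartSucceeds(true));
    }
}
```

In this sketch the restart fails with the current *doStop* shape because the stale, never-connected client is reused, and it succeeds once the instance is always cleared.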



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
