Pedro Marques created AMQ-4392:
----------------------------------

             Summary: MQTT BlockingConnection disconnect doesn't disconnects 
the client connection
                 Key: AMQ-4392
                 URL: https://issues.apache.org/jira/browse/AMQ-4392
             Project: ActiveMQ
          Issue Type: Bug
    Affects Versions: 5.8.0
            Reporter: Pedro Marques


The disconnect method doesn't work (especially when a client id is supplied).

If the connection object is reused, the client throws the following exception:
java.lang.IllegalStateException: Already connected
        at 
org.fusesource.mqtt.client.CallbackConnection.connect(CallbackConnection.java:109)
        at 
org.fusesource.mqtt.client.FutureConnection$2.run(FutureConnection.java:94)
        at 
org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:96)
        at 
org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)

If the connection object is not reused and the client id is, the client throws 
the following exception:
java.io.IOException: Could not connect: CONNECTION_REFUSED_SERVER_UNAVAILABLE
        at 
org.fusesource.mqtt.client.CallbackConnection$LoginHandler$1.onTransportCommand(CallbackConnection.java:313)
        at 
org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:660)
        at 
org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
        at 
org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:226)
        at 
org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:96)
        at 
org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)

and, the server logs the following messages:

2013-03-20 11:36:04,893 WARN  [org.apache.activemq.broker.TransportConnection] 
(ActiveMQ Transport: tcp:///192.168.205.2:59401@1883) Failed to add Connection 
ID:TestServer-58505-1363685123521-2:34, reason: 
javax.jms.InvalidClientIDException: Broker: localhost - Client: test_id already 
connected from tcp://192.168.205.2:59398
2013-03-20 11:36:04,893 WARN  
[org.apache.activemq.broker.TransportConnection.Transport] (ActiveMQ Transport: 
tcp:///192.168.205.2:59401@1883) Transport Connection to: 
tcp://192.168.205.2:59401 failed: java.io.IOException: Broker: localhost - 
Client: teste_prm already connected from tcp://192.168.205.2:59398
2013-03-20 11:37:59,867 WARN  
[org.apache.activemq.broker.TransportConnection.Transport] 
(MQTTInactivityMonitor Async Task: 
java.util.concurrent.ThreadPoolExecutor$Worker@c7892e[State = 0, empty queue]) 
Transport Connection to: tcp://192.168.205.2:59398 failed: 
org.apache.activemq.transport.InactivityIOException: Channel was inactive for 
too (>45000) long: tcp://192.168.205.2:59398

Code example (reusing connection):

MQTT mqtt = new MQTT();
mqtt.setHost(url);
mqtt.setUserName(user);
mqtt.setPassword(password);
BlockingConnection connection = mqtt.blockingConnection();

int i = 0;
while (true) {
        connection.connect();           
        String message = "TestMessage: " + i;
        connection.publish("VendorOrderTopic", message.getBytes(), 
QoS.AT_LEAST_ONCE, false);
        System.out.println("Vendor: Sent message.");

        Thread.sleep(2500);
        connection.disconnect();
        Thread.sleep(2500);
        i++;
}

Code example (multiple connections):

MQTT mqtt = new MQTT();
mqtt.setHost(url);
mqtt.setUserName(user);
mqtt.setPassword(password);
mqtt.setClientId("test_id");
        
int i = 0;
while (true) {
        BlockingConnection connection = mqtt.blockingConnection();
        connection.connect();           
        String message = "TestMessage: " + i;
        connection.publish("VendorOrderTopic", message.getBytes(), 
QoS.AT_LEAST_ONCE, false);
        System.out.println("Vendor: Sent message.");

        Thread.sleep(2500);
        connection.disconnect();
        Thread.sleep(2500);
        i++;
}

This problem also occurs when using the eclipse paho client API.



--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to