[ 
https://issues.apache.org/jira/browse/AMQ-4576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pedro Marques updated AMQ-4576:
-------------------------------

    Description: 
When more than one topic is supplied to BlockingConnection.subscribe the 
BlockingConnection.receive fails and the following exception is thrown:
{code}
java.io.IOException: Could not connect: CONNECTION_REFUSED_SERVER_UNAVAILABLE
        at 
org.fusesource.mqtt.client.CallbackConnection$LoginHandler$1.onTransportCommand(CallbackConnection.java:331)
        at 
org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
        at 
org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
        at 
org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
        at 
org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
        at 
org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
        at 
org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}
On the server shows the following messages:
{code}
2013-06-06 15:06:00,125 WARN  
[org.apache.activemq.transport.mqtt.MQTTProtocolConverter] (ActiveMQ 
BrokerService[localhost] Task-1) Exception occurred processing: 
null: javax.jms.JMSException: Durable consumer is in use for client: 
6056@3232261834SOC and subscriptionName: 6056@3232261834SOC
2013-06-06 15:06:00,130 WARN  [org.apache.activemq.broker.TransportConnection] 
(ActiveMQ Transport: tcp:///127.0.0.1:53389@1883) Failed to add Connection 
ID:LTD-SFW004-53303-1370527418664-2:14, reason: 
javax.jms.InvalidClientIDException: Broker: localhost - Client: 
6056@3232261834SOC already connected from tcp://127.0.0.1:53388
2013-06-06 15:06:00,130 WARN  
[org.apache.activemq.broker.TransportConnection.Transport] (ActiveMQ Transport: 
tcp:///127.0.0.1:53389@1883) Transport Connection to: tcp://127.0.0.1:53389 
failed: java.io.IOException: Broker: localhost - Client: 6056@3232261834SOC 
already connected from tcp://127.0.0.1:53388
2013-06-06 15:06:00,130 ERROR 
[pt.intellicare.onecare.mqtt.OneCareFuseMqttClient] 
(DefaultQuartzScheduler_Worker-8) Problem receiving mqtt messages: 
java.io.IOException: Could not connect: CONNECTION_REFUSED_SERVER_UNAVAILABLE
        at 
org.fusesource.mqtt.client.CallbackConnection$LoginHandler$1.onTransportCommand(CallbackConnection.java:331)
 [:1.5-SNAPSHOT]
        at 
org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
 [:1.17]
        at 
org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538) 
[:1.17]
        at 
org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
 [:1.17]
        at 
org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
 [:1.17]
        at 
org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
 [:1.17]
{code}
Code example:
{code}
MQTT = new MQTT();
mqtt.setHost(url);
mqtt.setClientId(clientId);
mqtt.setUserName(user);
mqtt.setPassword(password);
mqtt.setCleanSession(false);

BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE), new Topic("TopicB", 
QoS.EXACTLY_ONCE)};
byte[] qoses = connection.subscribe(topics);
while (true) {
    Message message = connection.receive();
    byte[] payload = message.getPayload();
    String messageContent = new String(payload);
    System.out.println("Received message from topic: " + message.getTopic() + " 
Message content: " + messageContent);
    message.ack();
}
{code}
The test failed when using the current fusesource client (1.5) on ActiveMQ 5.9, 
on Mosquitto mqtt the code works correctly.


  was:
When more than one topic is supplied to BlockingConnection.subscribe the 
BlockingConnection.receive fails and the following exception is thrown:
{code}
java.io.IOException: Could not connect: CONNECTION_REFUSED_SERVER_UNAVAILABLE
        at 
org.fusesource.mqtt.client.CallbackConnection$LoginHandler$1.onTransportCommand(CallbackConnection.java:331)
        at 
org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
        at 
org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
        at 
org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
        at 
org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
        at 
org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
        at 
org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}
Code example:
{code}
MQTT = new MQTT();
mqtt.setHost(url);
mqtt.setClientId(clientId);
mqtt.setUserName(user);
mqtt.setPassword(password);
mqtt.setCleanSession(false);

BlockingConnection connection = mqtt.blockingConnection();
connection.connect();
Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE), new Topic("TopicB", 
QoS.EXACTLY_ONCE)};
byte[] qoses = connection.subscribe(topics);
while (true) {
    Message message = connection.receive();
    byte[] payload = message.getPayload();
    String messageContent = new String(payload);
    System.out.println("Received message from topic: " + message.getTopic() + " 
Message content: " + messageContent);
    message.ack();
}
{code}
The test failed when using the current fusesource client (1.5) on ActiveMQ 5.9, 
on Mosquitto mqtt the code works correctly.


    
> MQTT BlockingConnection.receive fails when subscribing multiple topics
> ----------------------------------------------------------------------
>
>                 Key: AMQ-4576
>                 URL: https://issues.apache.org/jira/browse/AMQ-4576
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.9.0
>            Reporter: Pedro Marques
>
> When more than one topic is supplied to BlockingConnection.subscribe the 
> BlockingConnection.receive fails and the following exception is thrown:
> {code}
> java.io.IOException: Could not connect: CONNECTION_REFUSED_SERVER_UNAVAILABLE
>       at 
> org.fusesource.mqtt.client.CallbackConnection$LoginHandler$1.onTransportCommand(CallbackConnection.java:331)
>       at 
> org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
>       at 
> org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
>       at 
> org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
>       at 
> org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
>       at 
> org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
>       at 
> org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> On the server shows the following messages:
> {code}
> 2013-06-06 15:06:00,125 WARN  
> [org.apache.activemq.transport.mqtt.MQTTProtocolConverter] (ActiveMQ 
> BrokerService[localhost] Task-1) Exception occurred processing: 
> null: javax.jms.JMSException: Durable consumer is in use for client: 
> 6056@3232261834SOC and subscriptionName: 6056@3232261834SOC
> 2013-06-06 15:06:00,130 WARN  
> [org.apache.activemq.broker.TransportConnection] (ActiveMQ Transport: 
> tcp:///127.0.0.1:53389@1883) Failed to add Connection 
> ID:LTD-SFW004-53303-1370527418664-2:14, reason: 
> javax.jms.InvalidClientIDException: Broker: localhost - Client: 
> 6056@3232261834SOC already connected from tcp://127.0.0.1:53388
> 2013-06-06 15:06:00,130 WARN  
> [org.apache.activemq.broker.TransportConnection.Transport] (ActiveMQ 
> Transport: tcp:///127.0.0.1:53389@1883) Transport Connection to: 
> tcp://127.0.0.1:53389 failed: java.io.IOException: Broker: localhost - 
> Client: 6056@3232261834SOC already connected from tcp://127.0.0.1:53388
> 2013-06-06 15:06:00,130 ERROR 
> [pt.intellicare.onecare.mqtt.OneCareFuseMqttClient] 
> (DefaultQuartzScheduler_Worker-8) Problem receiving mqtt messages: 
> java.io.IOException: Could not connect: CONNECTION_REFUSED_SERVER_UNAVAILABLE
>       at 
> org.fusesource.mqtt.client.CallbackConnection$LoginHandler$1.onTransportCommand(CallbackConnection.java:331)
>  [:1.5-SNAPSHOT]
>       at 
> org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
>  [:1.17]
>       at 
> org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
>  [:1.17]
>       at 
> org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
>  [:1.17]
>       at 
> org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
>  [:1.17]
>       at 
> org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
>  [:1.17]
> {code}
> Code example:
> {code}
> MQTT = new MQTT();
> mqtt.setHost(url);
> mqtt.setClientId(clientId);
> mqtt.setUserName(user);
> mqtt.setPassword(password);
> mqtt.setCleanSession(false);
> BlockingConnection connection = mqtt.blockingConnection();
> connection.connect();
> Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE), new Topic("TopicB", 
> QoS.EXACTLY_ONCE)};
> byte[] qoses = connection.subscribe(topics);
> while (true) {
>     Message message = connection.receive();
>     byte[] payload = message.getPayload();
>     String messageContent = new String(payload);
>     System.out.println("Received message from topic: " + message.getTopic() + 
> " Message content: " + messageContent);
>     message.ack();
> }
> {code}
> The test failed when using the current fusesource client (1.5) on ActiveMQ 
> 5.9, on Mosquitto mqtt the code works correctly.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to