[ 
https://issues.apache.org/jira/browse/AMQ-4576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13681640#comment-13681640
 ] 

Timothy Bish commented on AMQ-4576:
-----------------------------------

This will probably need a bit of thought.  The broker is doing what it was 
designed to do here, allowing only one durable subscriber with the same client 
Id and subscription name.  In the case of the MQTT code as it currently stands 
when the subscribe occurs the code uses the client Id as the subscription name 
as well. 
                
> MQTT BlockingConnection.receive fails when subscribing multiple topics
> ----------------------------------------------------------------------
>
>                 Key: AMQ-4576
>                 URL: https://issues.apache.org/jira/browse/AMQ-4576
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.8.0
>            Reporter: Pedro Marques
>
> When more than one topic is supplied to BlockingConnection.subscribe the 
> BlockingConnection.receive fails and the following exception is thrown:
> {code}
> java.io.IOException: Could not connect: CONNECTION_REFUSED_SERVER_UNAVAILABLE
>       at 
> org.fusesource.mqtt.client.CallbackConnection$LoginHandler$1.onTransportCommand(CallbackConnection.java:331)
>       at 
> org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
>       at 
> org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
>       at 
> org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
>       at 
> org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
>       at 
> org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
>       at 
> org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> On the server shows the following messages:
> {code}
> 2013-06-06 15:06:00,125 WARN  
> [org.apache.activemq.transport.mqtt.MQTTProtocolConverter] (ActiveMQ 
> BrokerService[localhost] Task-1) Exception occurred processing: 
> null: javax.jms.JMSException: Durable consumer is in use for client: 
> 6056@3232261834SOC and subscriptionName: 6056@3232261834SOC
> 2013-06-06 15:06:00,130 WARN  
> [org.apache.activemq.broker.TransportConnection] (ActiveMQ Transport: 
> tcp:///127.0.0.1:53389@1883) Failed to add Connection 
> ID:LTD-SFW004-53303-1370527418664-2:14, reason: 
> javax.jms.InvalidClientIDException: Broker: localhost - Client: 
> 6056@3232261834SOC already connected from tcp://127.0.0.1:53388
> 2013-06-06 15:06:00,130 WARN  
> [org.apache.activemq.broker.TransportConnection.Transport] (ActiveMQ 
> Transport: tcp:///127.0.0.1:53389@1883) Transport Connection to: 
> tcp://127.0.0.1:53389 failed: java.io.IOException: Broker: localhost - 
> Client: 6056@3232261834SOC already connected from tcp://127.0.0.1:53388
> 2013-06-06 15:06:00,130 ERROR 
> [pt.intellicare.onecare.mqtt.OneCareFuseMqttClient] 
> (DefaultQuartzScheduler_Worker-8) Problem receiving mqtt messages: 
> java.io.IOException: Could not connect: CONNECTION_REFUSED_SERVER_UNAVAILABLE
>       at 
> org.fusesource.mqtt.client.CallbackConnection$LoginHandler$1.onTransportCommand(CallbackConnection.java:331)
>  [:1.5-SNAPSHOT]
>       at 
> org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
>  [:1.17]
>       at 
> org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
>  [:1.17]
>       at 
> org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
>  [:1.17]
>       at 
> org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
>  [:1.17]
>       at 
> org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
>  [:1.17]
> {code}
> Code example:
> {code}
> MQTT = new MQTT();
> mqtt.setHost(url);
> mqtt.setClientId(clientId);
> mqtt.setUserName(user);
> mqtt.setPassword(password);
> mqtt.setCleanSession(false);
> BlockingConnection connection = mqtt.blockingConnection();
> connection.connect();
> Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE), new Topic("TopicB", 
> QoS.EXACTLY_ONCE)};
> byte[] qoses = connection.subscribe(topics);
> while (true) {
>     Message message = connection.receive();
>     byte[] payload = message.getPayload();
>     String messageContent = new String(payload);
>     System.out.println("Received message from topic: " + message.getTopic() + 
> " Message content: " + messageContent);
>     message.ack();
> }
> {code}
> The test failed when using the current fusesource client (1.5) on ActiveMQ 
> 5.9, on Mosquitto mqtt the code works correctly.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to