[ 
https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13685840#comment-13685840
 ] 

Pedro Marques commented on AMQ-4585:
------------------------------------

Yes, well the code is not simple because I used two classes running 
simultaneously... The publisher always sending messages, the subscriber 
receiving 10 messages on each "run". The problem occurs when restarting the 
subscriber after the first run (leaving some time between runs in order to 
allow the publisher to publish some messages). Theoretically it would be simple 
to use only one class publish some messages, receive them on the subscriber, 
close the subscriber, publish more messages, and attempt to reconnect and 
receive those messages but I haven't tested this method. I should stress that 
the problem, although it fails frequently, doesn't occur every time, I had 
multiple "runs" of the subscriber that processed the pending messages correctly.
                
> MQTT BlockingConnection.receive fails when receiving pending messages after 
> reconnect without cleaning session
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4585
>                 URL: https://issues.apache.org/jira/browse/AMQ-4585
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.8.0
>            Reporter: Pedro Marques
>
> The system throws at least three different types of exceptions when a 
> subscriber receives the first pending message without cleaning the session. 
> The test case corresponds to receiving several messages from a publisher then 
> closing the subscriber connection and finally reconnecting with 
> setCleanSession(false) and attempt to read the messages published while the 
> subscriber was disconnected.
> The exceptions thrown:
> {code}
> java.net.ProtocolException: Command from server contained an invalid message 
> id: 1
>       at 
> org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723)
>       at 
> org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762)
>       at 
> org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
>       at 
> org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
>       at 
> org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
>       at 
> org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
>       at 
> org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
>       at 
> org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
>       at 
> org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
>       at 
> org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> {code}
> java.lang.ArrayIndexOutOfBoundsException: 0
>       at 
> org.fusesource.mqtt.codec.MessageSupport$AckBase.decode(MessageSupport.java:81)
>       at org.fusesource.mqtt.codec.PUBREC.decode(PUBREC.java:40)
>       at 
> org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:749)
>       at 
> org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
>       at 
> org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
>       at 
> org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
>       at 
> org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
>       at 
> org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
>       at 
> org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
>       at 
> org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
>       at 
> org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> {code}
> java.net.ProtocolException: Unexpected MQTT command type: 0
>       at 
> org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:775)
>       at 
> org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
>       at 
> org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
>       at 
> org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
>       at 
> org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
>       at 
> org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
>       at 
> org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
>       at 
> org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
>       at 
> org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> No message is shown in the server. The problem doesn't occur always but most 
> of the times the first reconnection attempt is made. With 
> setCleanSession(true) the system works fine.
> Code sample (publisher, permanently running):
> {code}
> MQTT mqtt = new MQTT();
> mqtt.setHost(url);
> mqtt.setUserName(user);
> mqtt.setPassword(password);
> mqtt.setClientId("test_id");
> int i = 0;
> while (true) {
>       BlockingConnection connection = mqtt.blockingConnection();
>       connection.connect();
>       String message = "TestMessage: " + i;
>       connection.publish("TopicA", message.getBytes(), QoS.AT_LEAST_ONCE, 
> false);
>       System.out.println("Vendor: Sent message.");
>       Thread.sleep(2500);
>       connection.disconnect();
>       Thread.sleep(2500);
>       i++;
> }
> {code}
> Code sample (subscriber, fails multiple times when restarting after the 
> connection is closed):
> {code}
> BlockingConnection connection = null;
> try {
>     MQTT = new MQTT();
>     mqtt.setHost(url);
>     mqtt.setClientId(clientId);
>     mqtt.setUserName(user);
>     mqtt.setPassword(password);
>     mqtt.setCleanSession(false);
>     connection = mqtt.blockingConnection();
>     connection.connect();
>     Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
>     byte[] qoses = connection.subscribe(topics);
>     int numMessages = 1;
>     while (numMessages % 10 != 0) {
>         Message message = connection.receive();
>         byte[] payload = message.getPayload();
>         String messageContent = new String(payload);
>         System.out.println("Received message from topic: " + 
> message.getTopic() + " Message content: " + messageContent);
>         message.ack();
>         numMessages++;
>     }
> } finally {
>     if(connection != null) {
>         try {
>             connection.disconnect();
>         } catch (Exception e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
>     }
> }
> {code}
> The test failed when using the current fusesource client (1.5) on ActiveMQ 
> 5.9, on Mosquitto mqtt the code works correctly

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to