[ https://issues.apache.org/jira/browse/AMQ-5358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
AR updated AMQ-5358: -------------------- Attachment: MqttDurableSubTest.java JUnit test case for MQTT Durable subscription test using MQTT Paho client library. > MQTT Durable subscription broken in 5.10 and 5.11 > ------------------------------------------------- > > Key: AMQ-5358 > URL: https://issues.apache.org/jira/browse/AMQ-5358 > Project: ActiveMQ > Issue Type: Bug > Components: MQTT > Affects Versions: 5.10.0, 5.11.0 > Environment: Mac OS X, MQTT Paho client library version 0.4.0 > Reporter: AR > Attachments: MqttDurableSubTest.java > > > Durable subscriptions do not work in 5.10 and 5.11-SNAPSHOT. > Test case: > Run default broker. > . Connect "client1" with clean_session=true. > . Subscribe to topic "paho/test" > . Disconnect client1 > . Connect "client1" with clean_session=false > . Subscribe to topic "paho/test" > . Disconnect "client1" > . Connect "client2" > . Publish message "hello world" to topic "paho/test" > . Disconnect "client2" > . Connect "client1" with clean_session=false > . Subscribe to topic "paho/test" > . Verify that the message is received. > Here is the Junit test code: > ---------------------------------- > package com.mytests.activemqtests; > import java.util.Date; > import java.util.concurrent.CountDownLatch; > import java.util.concurrent.TimeUnit; > import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; > import org.eclipse.paho.client.mqttv3.MqttCallback; > import org.eclipse.paho.client.mqttv3.MqttClient; > import org.eclipse.paho.client.mqttv3.MqttConnectOptions; > import org.eclipse.paho.client.mqttv3.MqttException; > import org.eclipse.paho.client.mqttv3.MqttMessage; > import org.junit.Test; > import junit.framework.TestCase; > public class MqttDurableSubTest extends TestCase implements MqttCallback { > final static String BROKER1_URL = "tcp://localhost:1883"; > > final static int MQTT_QOS_ATMOSTONCE = 0; > final static int MQTT_QOS_ATLEASTONCE = 1; > final static int MQTT_QOS_EXACTLYONCE = 2; > > private boolean mMessageReceived = false; > private CountDownLatch mLatch; > public MqttDurableSubTest(String name) { > super(name); > } > protected void setUp() throws Exception { > super.setUp(); > } > protected void tearDown() throws Exception { > super.tearDown(); > } > @Test > public void testDuplicateMsg() throws Exception { > MqttConnectOptions client1ConnOpt = new MqttConnectOptions(); > MqttClient client1 = new MqttClient(BROKER1_URL, "client1"); > > client1ConnOpt.setCleanSession(true); > client1.connect(); > System.out.println("Client1 Connected to " + BROKER1_URL); > client1.subscribe("paho/test", MQTT_QOS_ATMOSTONCE); > Thread.sleep(2000); > client1.disconnect(); > System.out.println("Client1 completed cleansession=1 > subscription."); > > > client1ConnOpt.setCleanSession(false); > client1.setCallback(this); > client1.connect(client1ConnOpt); > System.out.println("Client1 Connected to " + BROKER1_URL); > client1.subscribe("paho/test", MQTT_QOS_ATMOSTONCE); > Thread.sleep(2000); > client1.disconnect(); > System.out.println("Client1 completed durable subscription."); > > MqttConnectOptions client2ConnOpt = new MqttConnectOptions(); > client1ConnOpt.setCleanSession(true); > MqttClient client2 = new MqttClient(BROKER1_URL, "client2"); > client2.setCallback(this); > client2.connect(client2ConnOpt); > client2.publish("paho/test", "hello world".getBytes(), > MQTT_QOS_ATMOSTONCE, false); > System.out.println("Client2 published"); > client2.disconnect(); > > > client1ConnOpt.setCleanSession(false); > client1.setCallback(this); > client1.connect(client1ConnOpt); > System.out.println("Client1 Connected to " + BROKER1_URL); > client1.subscribe("paho/test", MQTT_QOS_ATMOSTONCE); > > mLatch = new CountDownLatch(1); > mLatch.await(10, TimeUnit.SECONDS); > assertEquals("Success!", true, mMessageReceived); > System.out.println("Done"); > } > @Override > public void connectionLost(Throwable arg0) { > // TODO Auto-generated method stub > } > @Override > public void deliveryComplete(IMqttDeliveryToken arg0) { > // TODO Auto-generated method stub > } > @Override > public void messageArrived(String topic, MqttMessage message) throws > Exception { > System.out.println("MQTT - messageArrived "+topic+" , Message: > ["+message+"] , ReceviedTime: ["+ new Date() +"] , QoS: > ["+message.getQos()+"] , isDup: ["+message.isDuplicate()+"]" ); > if (message.toString().equals("hello world")) { > mMessageReceived = true; > mLatch.countDown(); > } > } > } -- This message was sent by Atlassian JIRA (v6.3.4#6332)