Fixed AMQ-5160, polished MQTT tests
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0a39782b Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0a39782b Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0a39782b Branch: refs/heads/trunk Commit: 0a39782bf5d95fc0ae6d54a7fa1469b230621358 Parents: 88c6ee9 Author: Dhiraj Bokde <[email protected]> Authored: Tue May 13 00:30:11 2014 -0700 Committer: Dejan Bosanac <[email protected]> Committed: Mon May 26 11:07:19 2014 +0200 ---------------------------------------------------------------------- .../activemq/transport/mqtt/MQTTTest.java | 31 ++++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/0a39782b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index e11f6e9..3c0701e 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -379,7 +379,7 @@ public class MQTTTest extends AbstractMQTTTest { connection.publish(topic, (RETAINED + topic).getBytes(), QoS.AT_LEAST_ONCE, true); connection.subscribe(new Topic[] { new Topic(topic, QoS.AT_LEAST_ONCE) }); - Message msg = connection.receive(1000, TimeUnit.MILLISECONDS); + Message msg = connection.receive(5, TimeUnit.SECONDS); assertNotNull("No message for " + topic, msg); assertEquals(RETAINED + topic, new String(msg.getPayload())); msg.ack(); @@ -406,7 +406,7 @@ public class MQTTTest extends AbstractMQTTTest { assertNotEquals("Subscribe failed " + wildcard, (byte)0x80, qos[0]); // test retained messages - Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); + Message msg = connection.receive(5, TimeUnit.SECONDS); do { assertNotNull("RETAINED null " + wildcard, msg); assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED)); @@ -459,7 +459,7 @@ public class MQTTTest extends AbstractMQTTTest { final BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); connection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true); - connection.subscribe(new Topic[] { new Topic(topic, QoS.valueOf(topic)) }); + connection.subscribe(new Topic[]{new Topic(topic, QoS.valueOf(topic))}); final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS); assertNotNull(msg); @@ -472,7 +472,7 @@ public class MQTTTest extends AbstractMQTTTest { assertEquals(i, actualQoS[0]); msg.ack(); - connection.unsubscribe(new String[] { topic }); + connection.unsubscribe(new String[]{topic}); connection.disconnect(); } @@ -1341,10 +1341,9 @@ public class MQTTTest extends AbstractMQTTTest { BlockingConnection connectionSub = mqttSub.blockingConnection(); connectionSub.connect(); connectionSub.subscribe(topics); - connectionSub.unsubscribe(new String[] { "TopicA" }); connectionSub.disconnect(); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 5; i++) { String payload = "Message " + i; connectionPub.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false); } @@ -1353,14 +1352,28 @@ public class MQTTTest extends AbstractMQTTTest { connectionSub.connect(); int received = 0; - for (int i = 0; i < 10; ++i) { + for (int i = 0; i < 5; ++i) { Message message = connectionSub.receive(5, TimeUnit.SECONDS); - assertNotNull(message); + assertNotNull("Missing message " + i, message); LOG.info("Message is " + new String(message.getPayload())); received++; message.ack(); } - assertEquals(10, received); + assertEquals(5, received); + + // unsubscribe from topic + connectionSub.unsubscribe(new String[]{"TopicA"}); + + // send more messages + for (int i = 0; i < 5; i++) { + String payload = "Message " + i; + connectionPub.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false); + } + + // these should not be received + connectionSub = mqttSub.blockingConnection(); + connectionSub.connect(); + assertNull(connectionSub.receive(5, TimeUnit.SECONDS)); connectionSub.disconnect(); connectionPub.disconnect();
