Repository: activemq
Updated Branches:
  refs/heads/trunk d9d9d5b66 -> 74d2c2425
https://issues.apache.org/jira/browse/AMQ-5390

Adds a test case to show that things work as expected.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/74d2c242
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/74d2c242
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/74d2c242

Branch: refs/heads/trunk
Commit: 74d2c2425fbcbdf38f910f3c321d89780ba2ab6c
Parents: d9d9d5b
Author: Timothy Bish <[email protected]>
Authored: Tue Oct 21 17:59:32 2014 -0400
Committer: Timothy Bish <[email protected]>
Committed: Tue Oct 21 17:59:32 2014 -0400

----------------------------------------------------------------------
 .../activemq/transport/mqtt/MQTTTest.java | 51 ++++++++++++++++++++
 1 file changed, 51 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq/blob/74d2c242/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 32f8167..c9f106d 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -1408,6 +1408,57 @@ public class MQTTTest extends MQTTTestSupport {
 
         assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received);
     }
+    @Test(timeout = 60 * 1000)
+    public void testReceiveMessageSentWhileOfflineAndBrokerRestart() throws Exception {
+        stopBroker();
+        this.persistent = true;
+        startBroker();
+
+        final byte[] payload = new byte[1024 * 32];
+        for (int i = 0; i < payload.length; i++) {
+            payload[i] = '2';
+        }
+
+        int messagesPerRun = 10;
+
+        Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) };
+
+        {
+            // Establish a durable subscription.
+            MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
+            BlockingConnection connectionSub = mqttSub.blockingConnection();
+            connectionSub.connect();
+            connectionSub.subscribe(topics);
+            connectionSub.disconnect();
+        }
+
+        MQTT mqttPubLoop = createMQTTConnection("MQTT-Pub-Client", true);
+        BlockingConnection connectionPub = mqttPubLoop.blockingConnection();
+        connectionPub.connect();
+
+        for (int i = 0; i < messagesPerRun; ++i) {
+            connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
+        }
+
+        connectionPub.disconnect();
+
+        stopBroker();
+        startBroker();
+
+        MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
+        BlockingConnection connectionSub = mqttSub.blockingConnection();
+        connectionSub.connect();
+        connectionSub.subscribe(topics);
+
+        for (int i = 0; i < messagesPerRun; ++i) {
+            Message message = connectionSub.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            assertTrue(Arrays.equals(payload, message.getPayload()));
+            message.ack();
+        }
+        connectionSub.disconnect();
+    }
+
     @Test(timeout = 30 * 1000)
     public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
         stopBroker();
