Repository: activemq Updated Branches: refs/heads/master 15405af2e -> ab434ee77
Refactor test which has some flawed assumptions about the incoming messages based on subscriptions that it makes. Adds better thread safety as well. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ab434ee7 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ab434ee7 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ab434ee7 Branch: refs/heads/master Commit: ab434ee776bda7afeb6c1ae26f66559940afbf6d Parents: 15405af Author: Timothy Bish <[email protected]> Authored: Fri May 13 18:30:44 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri May 13 18:30:44 2016 -0400 ---------------------------------------------------------------------- .../activemq/transport/mqtt/PahoMQTTTest.java | 223 ++++++++----------- 1 file changed, 99 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/ab434ee7/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java index c105b5f..343c0c8 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -16,13 +16,14 @@ */ package org.apache.activemq.transport.mqtt; - import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -41,6 +42,7 @@ import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +55,7 @@ public class PahoMQTTTest extends MQTTTestSupport { return s.createConsumer(s.createTopic(topic)); } - @Test(timeout = 300000) + @Test(timeout = 90000) public void testLotsOfClients() throws Exception { final int CLIENTS = Integer.getInteger("PahoMQTTTest.CLIENTS", 100); @@ -88,7 +90,7 @@ public class PahoMQTTTest extends MQTTTestSupport { sendBarrier.await(); for (int i = 0; i < 10; i++) { Thread.sleep(1000); - client.publish("test", "hello".getBytes(), 1, false); + client.publish("test", "hello".getBytes(StandardCharsets.UTF_8), 1, false); } client.disconnect(); client.close(); @@ -122,7 +124,7 @@ public class PahoMQTTTest extends MQTTTestSupport { assertNull("Async error: " + asyncError.get(), asyncError.get()); } - @Test(timeout = 300000) + @Test(timeout = 90000) public void testSendAndReceiveMQTT() throws Exception { ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection(); @@ -132,15 +134,16 @@ public class PahoMQTTTest extends MQTTTestSupport { MqttClient client = new MqttClient("tcp://localhost:" + getPort(), "clientid", new MemoryPersistence()); client.connect(); - client.publish("test", "hello".getBytes(), 1, false); + client.publish("test", "hello".getBytes(StandardCharsets.UTF_8), 1, false); Message msg = consumer.receive(100 * 5); assertNotNull(msg); client.disconnect(); + client.close(); } - @Test(timeout = 300000) + @Test(timeout = 90000) public void testSubs() throws Exception { final DefaultListener listener = new DefaultListener(); @@ -155,29 +158,22 @@ public class PahoMQTTTest extends MQTTTestSupport { assertTrue(client.getPendingDeliveryTokens().length == 0); String expectedResult = "should get everything"; - client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(), 0, false); - - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return listener.result != null; - } - }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)); - + client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); + // One delivery for topic ACCOUNT_PREFIX + "#" + String result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS); assertTrue(client.getPendingDeliveryTokens().length == 0); - assertEquals(expectedResult, listener.result); + assertEquals(expectedResult, result); expectedResult = "should get everything"; - listener.result = null; - client.publish(ACCOUNT_PREFIX + "a/1/2", expectedResult.getBytes(), 0, false); - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return listener.result != null; - } - }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)); - assertEquals(expectedResult, listener.result); + client.publish(ACCOUNT_PREFIX + "a/1/2", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); + + // One delivery for topic ACCOUNT_PREFIX + "a/1/2" + result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS); + assertEquals(expectedResult, result); + // One delivery for topic ACCOUNT_PREFIX + "#" + result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS); + assertEquals(expectedResult, result); assertTrue(client.getPendingDeliveryTokens().length == 0); client.unsubscribe(ACCOUNT_PREFIX + "a/+/#"); @@ -185,19 +181,27 @@ public class PahoMQTTTest extends MQTTTestSupport { assertTrue(client.getPendingDeliveryTokens().length == 0); expectedResult = "should still get 1/2/3"; - listener.result = null; - client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(), 0, false); - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return listener.result != null; - } - }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200)); - assertEquals(expectedResult, listener.result); + client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); + + // One delivery for topic ACCOUNT_PREFIX + "1/2/3" + result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS); + assertEquals(expectedResult, result); assertTrue(client.getPendingDeliveryTokens().length == 0); + + client.disconnect(); + client.close(); } + @Ignore @Test(timeout = 300000) + public void testOverlappingTopicsLooped() throws Exception { + for (int i = 0; i < 100; ++i) { + LOG.info("Running test iteration: {}", i); + testOverlappingTopics(); + } + } + + @Test(timeout = 90000) public void testOverlappingTopics() throws Exception { final DefaultListener listener = new DefaultListener(); @@ -212,26 +216,16 @@ public class PahoMQTTTest extends MQTTTestSupport { client.subscribe(ACCOUNT_PREFIX + "#"); assertTrue(client.getPendingDeliveryTokens().length == 0); String expectedResult = "hello mqtt broker on hash"; - client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(), 0, false); - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return listener.result != null; - } - }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200))); - assertEquals(expectedResult, listener.result); + client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); + + String result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS); + assertEquals(expectedResult, result); assertTrue(client.getPendingDeliveryTokens().length == 0); expectedResult = "hello mqtt broker on a different topic"; - listener.result = null; - client.publish(ACCOUNT_PREFIX + "1/2/3/4/5/6", expectedResult.getBytes(), 0, false); - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return listener.result != null; - } - }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200))); - assertEquals(expectedResult, listener.result); + client.publish(ACCOUNT_PREFIX + "1/2/3/4/5/6", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); + result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS); + assertEquals(expectedResult, result); assertTrue(client.getPendingDeliveryTokens().length == 0); // ***************************************** @@ -242,27 +236,22 @@ public class PahoMQTTTest extends MQTTTestSupport { assertTrue(client.getPendingDeliveryTokens().length == 0); expectedResult = "hello mqtt broker on explicit topic"; - listener.result = null; - client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(), 0, false); - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return listener.result != null; - } - }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200))); - assertEquals(expectedResult, listener.result); + client.publish(ACCOUNT_PREFIX + "1/2/3", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); + + // One message from topic subscription on ACCOUNT_PREFIX + "#" + result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS); + assertEquals(expectedResult, result); + + // One message from topic subscription on ACCOUNT_PREFIX + "1/2/3" + result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS); + assertEquals(expectedResult, result); + assertTrue(client.getPendingDeliveryTokens().length == 0); expectedResult = "hello mqtt broker on some other topic"; - listener.result = null; - client.publish(ACCOUNT_PREFIX + "a/b/c/d/e", expectedResult.getBytes(), 0, false); - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return listener.result != null; - } - }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200))); - assertEquals(expectedResult, listener.result); + client.publish(ACCOUNT_PREFIX + "a/b/c/d/e", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); + result = listener.messageQ.poll(45, TimeUnit.MILLISECONDS); + assertEquals(expectedResult, result); assertTrue(client.getPendingDeliveryTokens().length == 0); // ***************************************** @@ -272,31 +261,22 @@ public class PahoMQTTTest extends MQTTTestSupport { assertTrue(client.getPendingDeliveryTokens().length == 0); expectedResult = "this should not come back..."; - listener.result = null; - client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(), 0, false); - assertFalse(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return listener.result != null; - } - }, TimeUnit.SECONDS.toMillis(5))); - assertNull(listener.result); + client.publish(ACCOUNT_PREFIX + "1/2/3/4", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); + result = listener.messageQ.poll(3, TimeUnit.SECONDS); + assertNull(result); assertTrue(client.getPendingDeliveryTokens().length == 0); expectedResult = "this should not come back either..."; - listener.result = null; - client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(), 0, false); - assertFalse(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return listener.result != null; - } - }, TimeUnit.SECONDS.toMillis(5))); - assertNull(listener.result); + client.publish(ACCOUNT_PREFIX + "a/b/c", expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); + result = listener.messageQ.poll(3, TimeUnit.SECONDS); + assertNull(result); assertTrue(client.getPendingDeliveryTokens().length == 0); + + client.disconnect(); + client.close(); } - @Test(timeout = 300000) + @Test(timeout = 90000) public void testCleanSession() throws Exception { String topic = "test"; final DefaultListener listener = new DefaultListener(); @@ -316,13 +296,13 @@ public class PahoMQTTTest extends MQTTTestSupport { LOG.info("Publish message with QoS 1..."); String expectedResult = "QOS 1 message"; - client2.publish(topic, expectedResult.getBytes(), 1, false); + client2.publish(topic, expectedResult.getBytes(StandardCharsets.UTF_8), 1, false); waitForDelivery(client2); // Publish message with QoS 0 LOG.info("Publish message with QoS 0..."); expectedResult = "QOS 0 message"; - client2.publish(topic, expectedResult.getBytes(), 0, false); + client2.publish(topic, expectedResult.getBytes(StandardCharsets.UTF_8), 0, false); waitForDelivery(client2); // subscriber reconnects @@ -335,26 +315,31 @@ public class PahoMQTTTest extends MQTTTestSupport { assertTrue(Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { - return listener.received == 2; + return listener.received.get() == 2; } }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(100))); - assertEquals(2, listener.received); + assertEquals(2, listener.received.get()); disconnect(client3); LOG.info("Disconnected durable subscriber."); // make sure we consumed everything - listener.received = 0; + assertTrue(listener.received.compareAndSet(2, 0)); LOG.info("Reconnecting durable subscriber..."); MqttClient client4 = createClient(false, "receive", listener); LOG.info("Subscribing durable subscriber..."); client4.subscribe(topic, 1); - Thread.sleep(3 * 1000); - assertEquals(0, listener.received); + TimeUnit.SECONDS.sleep(3); + assertEquals(0, listener.received.get()); + + client2.disconnect(); + client2.close(); + client4.disconnect(); + client4.close(); } - @Test(timeout = 300000) + @Test(timeout = 90000) public void testClientIdSpecialChars() throws Exception { testClientIdSpecialChars(MqttConnectOptions.MQTT_VERSION_3_1); testClientIdSpecialChars(MqttConnectOptions.MQTT_VERSION_3_1_1); @@ -374,28 +359,18 @@ public class PahoMQTTTest extends MQTTTestSupport { client1.subscribe(topic, 1); String message = "Message from client: " + clientId; - client1.publish(topic, message.getBytes(), 1, false); + client1.publish(topic, message.getBytes(StandardCharsets.UTF_8), 1, false); + String result = client1MqttCallback.messageQ.poll(45, TimeUnit.MILLISECONDS); + assertEquals(message, result); + assertEquals(1, client1MqttCallback.received.get()); - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return client1MqttCallback.result != null; - } - }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200))); - assertEquals(message, client1MqttCallback.result); - assertEquals(1, client1MqttCallback.received); - - assertTrue(Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return clientAdminMqttCallback.result != null; - } - }, TimeUnit.SECONDS.toMillis(45), TimeUnit.MILLISECONDS.toMillis(200))); - assertEquals(message, clientAdminMqttCallback.result); + result = clientAdminMqttCallback.messageQ.poll(45, TimeUnit.MILLISECONDS); + assertEquals(message, result); assertTrue(client1.isConnected()); client1.disconnect(); + client1.close(); } protected void testClientIdSpecialChars(int mqttVersion) throws Exception { @@ -414,8 +389,10 @@ public class PahoMQTTTest extends MQTTTestSupport { testClientId("Consumer:id:AT_LEAST_ONCE", mqttVersion, clientAdminMqttCallback); testClientId("Consumer:qid:EXACTLY_ONCE:VirtualTopic", mqttVersion, clientAdminMqttCallback); testClientId("Consumertestmin:testst:AT_LEAST_ONCE.VirtualTopic::AT_LEAST_ONCE", mqttVersion, clientAdminMqttCallback); - } + clientAdmin.disconnect(); + clientAdmin.close(); + } protected MqttClient createClient(boolean cleanSession, String clientId, MqttCallback listener) throws Exception { MqttConnectOptions options = new MqttConnectOptions(); @@ -453,30 +430,28 @@ public class PahoMQTTTest extends MQTTTestSupport { public boolean isSatisified() throws Exception { return client.getPendingDeliveryTokens().length == 0; } - }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(200)); + }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100)); assertTrue(client.getPendingDeliveryTokens().length == 0); } static class DefaultListener implements MqttCallback { - int received = 0; - volatile String result; + final AtomicInteger received = new AtomicInteger(); + final BlockingQueue<String> messageQ = new ArrayBlockingQueue<String>(10); @Override public void connectionLost(Throwable cause) { - } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { - LOG.debug("Received: " + message); - received++; - result = new String(message.getPayload()); + LOG.info("Received: {}", message); + received.incrementAndGet(); + messageQ.put(new String(message.getPayload(), StandardCharsets.UTF_8)); } @Override public void deliveryComplete(IMqttDeliveryToken token) { - } } } \ No newline at end of file
