Updated Branches: refs/heads/trunk b0b3a169c -> bc9751ac2
Adding an mqtt test that uses the eclipse paho client. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bc9751ac Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bc9751ac Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bc9751ac Branch: refs/heads/trunk Commit: bc9751ac234d6db138b5b4f4dec786952f0b23ff Parents: b0b3a16 Author: Hiram Chirino <[email protected]> Authored: Mon Nov 11 16:06:59 2013 -0500 Committer: Hiram Chirino <[email protected]> Committed: Mon Nov 11 16:06:59 2013 -0500 ---------------------------------------------------------------------- activemq-mqtt/pom.xml | 6 +- .../transport/mqtt/AbstractMQTTTest.java | 295 ------------------ .../activemq/transport/mqtt/MQTTTest.java | 302 ++++++++++++++++++- .../activemq/transport/mqtt/PahoMQTTTest.java | 59 ++++ pom.xml | 15 +- 5 files changed, 378 insertions(+), 299 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/bc9751ac/activemq-mqtt/pom.xml ---------------------------------------------------------------------- diff --git a/activemq-mqtt/pom.xml b/activemq-mqtt/pom.xml index 60b1e39..a1205e2 100755 --- a/activemq-mqtt/pom.xml +++ b/activemq-mqtt/pom.xml @@ -134,7 +134,11 @@ <type>test-jar</type> <scope>test</scope> </dependency> - + <dependency> + <groupId>org.eclipse.paho</groupId> + <artifactId>mqtt-client</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> http://git-wip-us.apache.org/repos/asf/activemq/blob/bc9751ac/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java index ac6cfc5..b976056 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java @@ -77,300 +77,6 @@ public abstract class AbstractMQTTTest extends AutoFailTestSupport { super.tearDown(); } - - @Test(timeout=300000) - public void testSendAndReceiveMQTT() throws Exception { - addMQTTConnector(); - brokerService.start(); - final MQTTClientProvider subscriptionProvider = getMQTTClientProvider(); - initializeConnection(subscriptionProvider); - - - subscriptionProvider.subscribe("foo/bah",AT_MOST_ONCE); - - final CountDownLatch latch = new CountDownLatch(numberOfMessages); - - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - for (int i = 0; i < numberOfMessages; i++){ - try { - byte[] payload = subscriptionProvider.receive(10000); - assertNotNull("Should get a message", payload); - latch.countDown(); - } catch (Exception e) { - e.printStackTrace(); - break; - } - - } - } - }); - thread.start(); - - final MQTTClientProvider publishProvider = getMQTTClientProvider(); - initializeConnection(publishProvider); - - for (int i = 0; i < numberOfMessages; i++){ - String payload = "Message " + i; - publishProvider.publish("foo/bah",payload.getBytes(),AT_LEAST_ONCE); - } - - latch.await(10, TimeUnit.SECONDS); - assertEquals(0, latch.getCount()); - subscriptionProvider.disconnect(); - publishProvider.disconnect(); - } - - @Test(timeout=300000) - public void testUnsubscribeMQTT() throws Exception { - addMQTTConnector(); - brokerService.start(); - final MQTTClientProvider subscriptionProvider = getMQTTClientProvider(); - initializeConnection(subscriptionProvider); - - String topic = "foo/bah"; - - subscriptionProvider.subscribe(topic,AT_MOST_ONCE); - - final CountDownLatch latch = new CountDownLatch(numberOfMessages/2); - - Thread thread = new Thread(new Runnable() { - @Override - public void run() { - for (int i = 0; i < numberOfMessages; i++){ - try { - byte[] payload = subscriptionProvider.receive(10000); - assertNotNull("Should get a message", payload); - latch.countDown(); - } catch (Exception e) { - e.printStackTrace(); - break; - } - - } - } - }); - thread.start(); - - final MQTTClientProvider publishProvider = getMQTTClientProvider(); - initializeConnection(publishProvider); - - for (int i = 0; i < numberOfMessages; i++){ - String payload = "Message " + i; - if (i == numberOfMessages/2){ - subscriptionProvider.unsubscribe(topic); - } - publishProvider.publish(topic,payload.getBytes(),AT_LEAST_ONCE); - } - - latch.await(10, TimeUnit.SECONDS); - assertEquals(0, latch.getCount()); - subscriptionProvider.disconnect(); - publishProvider.disconnect(); - } - - @Test(timeout=300000) - public void testSendAtMostOnceReceiveExactlyOnce() throws Exception { - /** - * Although subscribing with EXACTLY ONCE, the message gets published - * with AT_MOST_ONCE - in MQTT the QoS is always determined by the message - * as published - not the wish of the subscriber - */ - addMQTTConnector(); - brokerService.start(); - - final MQTTClientProvider provider = getMQTTClientProvider(); - initializeConnection(provider); - provider.subscribe("foo",EXACTLY_ONCE); - for (int i = 0; i < numberOfMessages; i++) { - String payload = "Test Message: " + i; - provider.publish("foo", payload.getBytes(), AT_MOST_ONCE); - byte[] message = provider.receive(5000); - assertNotNull("Should get a message", message); - assertEquals(payload, new String(message)); - } - provider.disconnect(); - } - - @Test(timeout=300000) - public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception { - addMQTTConnector(); - brokerService.start(); - - final MQTTClientProvider provider = getMQTTClientProvider(); - initializeConnection(provider); - provider.subscribe("foo",EXACTLY_ONCE); - for (int i = 0; i < numberOfMessages; i++) { - String payload = "Test Message: " + i; - provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE); - byte[] message = provider.receive(5000); - assertNotNull("Should get a message", message); - assertEquals(payload, new String(message)); - } - provider.disconnect(); - } - - @Test(timeout=300000) - public void testSendAtLeastOnceReceiveAtMostOnce() throws Exception { - addMQTTConnector(); - brokerService.start(); - - final MQTTClientProvider provider = getMQTTClientProvider(); - initializeConnection(provider); - provider.subscribe("foo",AT_MOST_ONCE); - for (int i = 0; i < numberOfMessages; i++) { - String payload = "Test Message: " + i; - provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE); - byte[] message = provider.receive(5000); - assertNotNull("Should get a message", message); - assertEquals(payload, new String(message)); - } - provider.disconnect(); - } - - @Test(timeout=300000) - public void testSendAndReceiveAtMostOnce() throws Exception { - addMQTTConnector(); - brokerService.start(); - - final MQTTClientProvider provider = getMQTTClientProvider(); - initializeConnection(provider); - provider.subscribe("foo",AT_MOST_ONCE); - for (int i = 0; i < numberOfMessages; i++) { - String payload = "Test Message: " + i; - provider.publish("foo", payload.getBytes(), AT_MOST_ONCE); - byte[] message = provider.receive(5000); - assertNotNull("Should get a message", message); - assertEquals(payload, new String(message)); - } - provider.disconnect(); - } - - @Test(timeout=300000) - public void testSendAndReceiveAtLeastOnce() throws Exception { - addMQTTConnector(); - brokerService.start(); - - final MQTTClientProvider provider = getMQTTClientProvider(); - initializeConnection(provider); - provider.subscribe("foo",AT_LEAST_ONCE); - for (int i = 0; i < numberOfMessages; i++) { - String payload = "Test Message: " + i; - provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE); - byte[] message = provider.receive(5000); - assertNotNull("Should get a message", message); - assertEquals(payload, new String(message)); - } - provider.disconnect(); - } - - @Test(timeout=300000) - public void testSendAndReceiveExactlyOnce() throws Exception { - addMQTTConnector(); - brokerService.start(); - final MQTTClientProvider publisher = getMQTTClientProvider(); - initializeConnection(publisher); - - final MQTTClientProvider subscriber = getMQTTClientProvider(); - initializeConnection(subscriber); - - subscriber.subscribe("foo",EXACTLY_ONCE); - for (int i = 0; i < numberOfMessages; i++) { - String payload = "Test Message: " + i; - publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE); - byte[] message = subscriber.receive(5000); - assertNotNull("Should get a message + ["+ i + "]", message); - assertEquals(payload, new String(message)); - } - subscriber.disconnect(); - publisher.disconnect(); - } - - @Test(timeout=300000) - public void testSendAndReceiveLargeMessages() throws Exception { - byte[] payload = new byte[1024 * 32]; - for (int i = 0; i < payload.length; i++){ - payload[i] = '2'; - } - addMQTTConnector(); - brokerService.start(); - - final MQTTClientProvider publisher = getMQTTClientProvider(); - initializeConnection(publisher); - - final MQTTClientProvider subscriber = getMQTTClientProvider(); - initializeConnection(subscriber); - - subscriber.subscribe("foo",AT_LEAST_ONCE); - for (int i = 0; i < 10; i++) { - publisher.publish("foo", payload, AT_LEAST_ONCE); - byte[] message = subscriber.receive(5000); - assertNotNull("Should get a message", message); - - assertArrayEquals(payload, message); - } - subscriber.disconnect(); - publisher.disconnect(); - } - - @Test(timeout=300000) - public void testSendMQTTReceiveJMS() throws Exception { - addMQTTConnector(); - TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0"); - brokerService.start(); - - final MQTTClientProvider provider = getMQTTClientProvider(); - initializeConnection(provider); - final String DESTINATION_NAME = "foo.*"; - - ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection(); - activeMQConnection.start(); - Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME); - MessageConsumer consumer = s.createConsumer(jmsTopic); - - for (int i = 0; i < numberOfMessages; i++) { - String payload = "Test Message: " + i; - provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE); - ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000); - assertNotNull("Should get a message", message); - ByteSequence bs = message.getContent(); - assertEquals(payload, new String(bs.data, bs.offset, bs.length)); - } - - activeMQConnection.close(); - provider.disconnect(); - } - - @Test(timeout=300000) - public void testSendJMSReceiveMQTT() throws Exception { - addMQTTConnector(); - TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0"); - brokerService.start(); - final MQTTClientProvider provider = getMQTTClientProvider(); - initializeConnection(provider); - - ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection(); - activeMQConnection.start(); - Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - javax.jms.Topic jmsTopic = s.createTopic("foo.far"); - MessageProducer producer = s.createProducer(jmsTopic); - - provider.subscribe("foo/+",AT_MOST_ONCE); - for (int i = 0; i < numberOfMessages; i++) { - String payload = "This is Test Message: " + i; - TextMessage sendMessage = s.createTextMessage(payload); - producer.send(sendMessage); - byte[] message = provider.receive(5000); - assertNotNull("Should get a message", message); - - assertEquals(payload, new String(message)); - } - provider.disconnect(); - activeMQConnection.close(); - } - protected String getProtocolScheme() { return "mqtt"; } @@ -391,5 +97,4 @@ public abstract class AbstractMQTTTest extends AutoFailTestSupport { provider.connect("tcp://localhost:"+mqttConnector.getConnectUri().getPort()); } - protected abstract MQTTClientProvider getMQTTClientProvider(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/bc9751ac/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 221abf3..61934b6 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -16,9 +16,14 @@ */ package org.apache.activemq.transport.mqtt; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.Wait; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; @@ -33,11 +38,306 @@ import org.slf4j.LoggerFactory; import javax.jms.*; +import static org.junit.Assert.assertArrayEquals; + public class MQTTTest extends AbstractMQTTTest { private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class); @Test(timeout=300000) + public void testSendAndReceiveMQTT() throws Exception { + addMQTTConnector(); + brokerService.start(); + final MQTTClientProvider subscriptionProvider = getMQTTClientProvider(); + initializeConnection(subscriptionProvider); + + + subscriptionProvider.subscribe("foo/bah",AT_MOST_ONCE); + + final CountDownLatch latch = new CountDownLatch(numberOfMessages); + + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + for (int i = 0; i < numberOfMessages; i++){ + try { + byte[] payload = subscriptionProvider.receive(10000); + assertNotNull("Should get a message", payload); + latch.countDown(); + } catch (Exception e) { + e.printStackTrace(); + break; + } + + } + } + }); + thread.start(); + + final MQTTClientProvider publishProvider = getMQTTClientProvider(); + initializeConnection(publishProvider); + + for (int i = 0; i < numberOfMessages; i++){ + String payload = "Message " + i; + publishProvider.publish("foo/bah",payload.getBytes(),AT_LEAST_ONCE); + } + + latch.await(10, TimeUnit.SECONDS); + assertEquals(0, latch.getCount()); + subscriptionProvider.disconnect(); + publishProvider.disconnect(); + } + + @Test(timeout=300000) + public void testUnsubscribeMQTT() throws Exception { + addMQTTConnector(); + brokerService.start(); + final MQTTClientProvider subscriptionProvider = getMQTTClientProvider(); + initializeConnection(subscriptionProvider); + + String topic = "foo/bah"; + + subscriptionProvider.subscribe(topic,AT_MOST_ONCE); + + final CountDownLatch latch = new CountDownLatch(numberOfMessages/2); + + Thread thread = new Thread(new Runnable() { + @Override + public void run() { + for (int i = 0; i < numberOfMessages; i++){ + try { + byte[] payload = subscriptionProvider.receive(10000); + assertNotNull("Should get a message", payload); + latch.countDown(); + } catch (Exception e) { + e.printStackTrace(); + break; + } + + } + } + }); + thread.start(); + + final MQTTClientProvider publishProvider = getMQTTClientProvider(); + initializeConnection(publishProvider); + + for (int i = 0; i < numberOfMessages; i++){ + String payload = "Message " + i; + if (i == numberOfMessages/2){ + subscriptionProvider.unsubscribe(topic); + } + publishProvider.publish(topic,payload.getBytes(),AT_LEAST_ONCE); + } + + latch.await(10, TimeUnit.SECONDS); + assertEquals(0, latch.getCount()); + subscriptionProvider.disconnect(); + publishProvider.disconnect(); + } + + @Test(timeout=300000) + public void testSendAtMostOnceReceiveExactlyOnce() throws Exception { + /** + * Although subscribing with EXACTLY ONCE, the message gets published + * with AT_MOST_ONCE - in MQTT the QoS is always determined by the message + * as published - not the wish of the subscriber + */ + addMQTTConnector(); + brokerService.start(); + + final MQTTClientProvider provider = getMQTTClientProvider(); + initializeConnection(provider); + provider.subscribe("foo",EXACTLY_ONCE); + for (int i = 0; i < numberOfMessages; i++) { + String payload = "Test Message: " + i; + provider.publish("foo", payload.getBytes(), AT_MOST_ONCE); + byte[] message = provider.receive(5000); + assertNotNull("Should get a message", message); + assertEquals(payload, new String(message)); + } + provider.disconnect(); + } + + @Test(timeout=300000) + public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception { + addMQTTConnector(); + brokerService.start(); + + final MQTTClientProvider provider = getMQTTClientProvider(); + initializeConnection(provider); + provider.subscribe("foo",EXACTLY_ONCE); + for (int i = 0; i < numberOfMessages; i++) { + String payload = "Test Message: " + i; + provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE); + byte[] message = provider.receive(5000); + assertNotNull("Should get a message", message); + assertEquals(payload, new String(message)); + } + provider.disconnect(); + } + + @Test(timeout=300000) + public void testSendAtLeastOnceReceiveAtMostOnce() throws Exception { + addMQTTConnector(); + brokerService.start(); + + final MQTTClientProvider provider = getMQTTClientProvider(); + initializeConnection(provider); + provider.subscribe("foo",AT_MOST_ONCE); + for (int i = 0; i < numberOfMessages; i++) { + String payload = "Test Message: " + i; + provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE); + byte[] message = provider.receive(5000); + assertNotNull("Should get a message", message); + assertEquals(payload, new String(message)); + } + provider.disconnect(); + } + + @Test(timeout=300000) + public void testSendAndReceiveAtMostOnce() throws Exception { + addMQTTConnector(); + brokerService.start(); + + final MQTTClientProvider provider = getMQTTClientProvider(); + initializeConnection(provider); + provider.subscribe("foo",AT_MOST_ONCE); + for (int i = 0; i < numberOfMessages; i++) { + String payload = "Test Message: " + i; + provider.publish("foo", payload.getBytes(), AT_MOST_ONCE); + byte[] message = provider.receive(5000); + assertNotNull("Should get a message", message); + assertEquals(payload, new String(message)); + } + provider.disconnect(); + } + + @Test(timeout=300000) + public void testSendAndReceiveAtLeastOnce() throws Exception { + addMQTTConnector(); + brokerService.start(); + + final MQTTClientProvider provider = getMQTTClientProvider(); + initializeConnection(provider); + provider.subscribe("foo",AT_LEAST_ONCE); + for (int i = 0; i < numberOfMessages; i++) { + String payload = "Test Message: " + i; + provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE); + byte[] message = provider.receive(5000); + assertNotNull("Should get a message", message); + assertEquals(payload, new String(message)); + } + provider.disconnect(); + } + + @Test(timeout=300000) + public void testSendAndReceiveExactlyOnce() throws Exception { + addMQTTConnector(); + brokerService.start(); + final MQTTClientProvider publisher = getMQTTClientProvider(); + initializeConnection(publisher); + + final MQTTClientProvider subscriber = getMQTTClientProvider(); + initializeConnection(subscriber); + + subscriber.subscribe("foo",EXACTLY_ONCE); + for (int i = 0; i < numberOfMessages; i++) { + String payload = "Test Message: " + i; + publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE); + byte[] message = subscriber.receive(5000); + assertNotNull("Should get a message + ["+ i + "]", message); + assertEquals(payload, new String(message)); + } + subscriber.disconnect(); + publisher.disconnect(); + } + + @Test(timeout=300000) + public void testSendAndReceiveLargeMessages() throws Exception { + byte[] payload = new byte[1024 * 32]; + for (int i = 0; i < payload.length; i++){ + payload[i] = '2'; + } + addMQTTConnector(); + brokerService.start(); + + final MQTTClientProvider publisher = getMQTTClientProvider(); + initializeConnection(publisher); + + final MQTTClientProvider subscriber = getMQTTClientProvider(); + initializeConnection(subscriber); + + subscriber.subscribe("foo",AT_LEAST_ONCE); + for (int i = 0; i < 10; i++) { + publisher.publish("foo", payload, AT_LEAST_ONCE); + byte[] message = subscriber.receive(5000); + assertNotNull("Should get a message", message); + + assertArrayEquals(payload, message); + } + subscriber.disconnect(); + publisher.disconnect(); + } + + @Test(timeout=300000) + public void testSendMQTTReceiveJMS() throws Exception { + addMQTTConnector(); + TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0"); + brokerService.start(); + + final MQTTClientProvider provider = getMQTTClientProvider(); + initializeConnection(provider); + final String DESTINATION_NAME = "foo.*"; + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection(); + activeMQConnection.start(); + Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Topic jmsTopic = s.createTopic(DESTINATION_NAME); + MessageConsumer consumer = s.createConsumer(jmsTopic); + + for (int i = 0; i < numberOfMessages; i++) { + String payload = "Test Message: " + i; + provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE); + ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000); + assertNotNull("Should get a message", message); + ByteSequence bs = message.getContent(); + assertEquals(payload, new String(bs.data, bs.offset, bs.length)); + } + + activeMQConnection.close(); + provider.disconnect(); + } + + @Test(timeout=300000) + public void testSendJMSReceiveMQTT() throws Exception { + addMQTTConnector(); + TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0"); + brokerService.start(); + final MQTTClientProvider provider = getMQTTClientProvider(); + initializeConnection(provider); + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection(); + activeMQConnection.start(); + Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Topic jmsTopic = s.createTopic("foo.far"); + MessageProducer producer = s.createProducer(jmsTopic); + + provider.subscribe("foo/+",AT_MOST_ONCE); + for (int i = 0; i < numberOfMessages; i++) { + String payload = "This is Test Message: " + i; + TextMessage sendMessage = s.createTextMessage(payload); + producer.send(sendMessage); + byte[] message = provider.receive(5000); + assertNotNull("Should get a message", message); + + assertEquals(payload, new String(message)); + } + provider.disconnect(); + activeMQConnection.close(); + } + + @Test(timeout=300000) public void testPingKeepsInactivityMonitorAlive() throws Exception { addMQTTConnector(); brokerService.start(); @@ -287,8 +587,6 @@ public class MQTTTest extends AbstractMQTTTest { return "mqtt"; } - - @Override protected MQTTClientProvider getMQTTClientProvider() { return new FuseMQQTTClientProvider(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/bc9751ac/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java new file mode 100644 index 0000000..5251567 --- /dev/null +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.TransportConnector; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +public class PahoMQTTTest extends AbstractMQTTTest { + + private static final Logger LOG = LoggerFactory.getLogger(PahoMQTTTest.class); + + + @Test(timeout=300000) + public void testSendAndReceiveMQTT() throws Exception { + addMQTTConnector(); + TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0"); + brokerService.start(); + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection(); + activeMQConnection.start(); + Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = s.createConsumer(s.createTopic("test")); + + MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), "clientid"); + client.connect(); + client.publish("test", "hello".getBytes(), 1, false); + + Message msg = consumer.receive(100 * 5); + assertNotNull(msg); + + client.disconnect(); + client.close(); + } + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/bc9751ac/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b4cc16d..bd095f5 100755 --- a/pom.xml +++ b/pom.xml @@ -97,6 +97,7 @@ <org-apache-derby-version>10.10.1.1</org-apache-derby-version> <org.osgi.core-version>4.3.1</org.osgi.core-version> <p2psockets-version>1.1.2</p2psockets-version> + <paho-version>0.4.0</paho-version> <linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version> <zookeeper-version>3.4.5</zookeeper-version> <qpid-proton-version>0.5</qpid-proton-version> @@ -366,6 +367,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.eclipse.paho</groupId> + <artifactId>mqtt-client</artifactId> + <version>${paho-version}</version> + </dependency> + <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-jaas</artifactId> <version>${project.version}</version> @@ -1591,12 +1597,19 @@ <releases><enabled>true</enabled></releases> <snapshots><enabled>false</enabled></snapshots> </repository> + <!-- for the paho dependency --> + <repository> + <id>eclipse.m2</id> + <url>https://repo.eclipse.org/content/groups/releases/</url> + <releases><enabled>true</enabled></releases> + <snapshots><enabled>false</enabled></snapshots> + </repository> <repository> <id>com.fusesource.m2.snapshot</id> <url>http://repo.fusesource.com/nexus/content/repositories/snapshots/</url> <releases><enabled>false</enabled></releases> <snapshots><enabled>true</enabled></snapshots> - </repository> + </repository> </repositories>
