Updated Branches: refs/heads/trunk 497fbfc04 -> 61fadba43
Adding Paho over NIO tests. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/61fadba4 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/61fadba4 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/61fadba4 Branch: refs/heads/trunk Commit: 61fadba43a0af97f99c58f25e33899ad3a7fd2df Parents: 497fbfc Author: Hiram Chirino <[email protected]> Authored: Tue Nov 12 09:55:28 2013 -0500 Committer: Hiram Chirino <[email protected]> Committed: Tue Nov 12 09:55:37 2013 -0500 ---------------------------------------------------------------------- .../transport/mqtt/AbstractMQTTTest.java | 27 +++++ .../transport/mqtt/PahoMQTNioTTest.java | 113 +++++++++++++++++++ .../activemq/transport/mqtt/PahoMQTTTest.java | 3 +- 3 files changed, 142 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/61fadba4/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java index b976056..13a0e75 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/AbstractMQTTTest.java @@ -97,4 +97,31 @@ public abstract class AbstractMQTTTest extends AutoFailTestSupport { provider.connect("tcp://localhost:"+mqttConnector.getConnectUri().getPort()); } + protected static interface Task { + public void run() throws Exception; + } + + protected void within(int time, TimeUnit unit, Task task) throws InterruptedException { + long timeMS = unit.toMillis(time); + long deadline = System.currentTimeMillis() + timeMS; + while (true) { + try { + task.run(); + return; + } catch (Throwable e) { + long remaining = deadline - System.currentTimeMillis(); + if( remaining <=0 ) { + if( e instanceof RuntimeException ) { + throw (RuntimeException)e; + } + if( e instanceof Error ) { + throw (Error)e; + } + throw new RuntimeException(e); + } + Thread.sleep(Math.min(timeMS/10, remaining)); + } + } + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/61fadba4/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java new file mode 100644 index 0000000..b657c0a --- /dev/null +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTNioTTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.mqtt; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.TransportConnector; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class PahoMQTNioTTest extends PahoMQTTTest { + + private static final Logger LOG = LoggerFactory.getLogger(PahoMQTNioTTest.class); + + @Override + protected String getProtocolScheme() { + return "mqtt+nio"; + } + + @Test(timeout=300000) + public void testLotsOfClients() throws Exception { + + final int CLIENTS = 100; + addMQTTConnector(); + TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0"); + brokerService.start(); + + ActiveMQConnection activeMQConnection = (ActiveMQConnection) new ActiveMQConnectionFactory(openwireTransport.getConnectUri()).createConnection(); + activeMQConnection.start(); + Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = s.createConsumer(s.createTopic("test")); + + final AtomicInteger receiveCounter = new AtomicInteger(); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + receiveCounter.incrementAndGet(); + } + }); + + final AtomicReference<Throwable> asyncError = new AtomicReference<Throwable>(); + final CountDownLatch connectedDoneLatch = new CountDownLatch(CLIENTS); + final CountDownLatch disconnectDoneLatch = new CountDownLatch(CLIENTS); + final CountDownLatch sendBarrier = new CountDownLatch(1); + for( int i=0; i < CLIENTS; i++ ) { + Thread.sleep(10); + new Thread(null, null, "client:"+i) { + @Override + public void run() { + try { + MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), Thread.currentThread().getName(), new MemoryPersistence()); + client.connect(); + connectedDoneLatch.countDown(); + sendBarrier.await(); + client.publish("test", "hello".getBytes(), 1, false); + client.disconnect(); + client.close(); + } catch (Throwable e) { + e.printStackTrace(); + asyncError.set(e); + } finally { + disconnectDoneLatch.countDown(); + } + } + }.start(); + } + + connectedDoneLatch.await(); + assertNull("Async error: "+asyncError.get(),asyncError.get()); + System.out.println("All clients connected..."); + sendBarrier.countDown(); + + // We should eventually get all the messages. + within(30, TimeUnit.SECONDS, new Task() { + @Override + public void run() throws Exception { + assertTrue(receiveCounter.get() == CLIENTS); + } + }); + + disconnectDoneLatch.await(); + assertNull("Async error: "+asyncError.get(),asyncError.get()); + + } + + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/61fadba4/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java index 5251567..a8dfd03 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java @@ -20,6 +20,7 @@ import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.TransportConnector; import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +45,7 @@ public class PahoMQTTTest extends AbstractMQTTTest { Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = s.createConsumer(s.createTopic("test")); - MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), "clientid"); + MqttClient client = new MqttClient("tcp://localhost:" + mqttConnector.getConnectUri().getPort(), "clientid", new MemoryPersistence()); client.connect(); client.publish("test", "hello".getBytes(), 1, false);
