Author: chirino
Date: Sun May 19 17:20:30 2013
New Revision: 1484318
URL: http://svn.apache.org/r1484318
Log:
No need to send MQTT debugging to console anymore.
Added:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingLevelDBTest.java
Modified:
activemq/trunk/activemq-mqtt/pom.xml
Modified: activemq/trunk/activemq-mqtt/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/pom.xml?rev=1484318&r1=1484317&r2=1484318&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/pom.xml (original)
+++ activemq/trunk/activemq-mqtt/pom.xml Sun May 19 17:20:30 2013
@@ -207,7 +207,7 @@
<argLine>${surefire.argLine}</argLine>
<runOrder>alphabetical</runOrder>
<!-- Note: if you want to see log messages on the console window
remove the following comment -->
- <redirectTestOutputToFile>false</redirectTestOutputToFile>
+ <!-- <redirectTestOutputToFile>false</redirectTestOutputToFile> -->
<systemProperties>
<property>
<name>org.apache.activemq.default.directory.prefix</name>
Added:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingLevelDBTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingLevelDBTest.java?rev=1484318&view=auto
==============================================================================
---
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingLevelDBTest.java
(added)
+++
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingLevelDBTest.java
Sun May 19 17:20:30 2013
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+import java.util.Enumeration;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Exercises queue browsing against an embedded broker: a plain browse, a
+ * browse racing a consumer, and a browse under a tight broker memory limit.
+ *
+ * NOTE(review): despite the class name, nothing in this class configures a
+ * LevelDB store; {@link #createBroker()} only instantiates a plain
+ * {@link BrokerService}. Confirm whether a store was meant to be set here.
+ */
+public class QueueBrowsingLevelDBTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(QueueBrowsingLevelDBTest.class);
+
+    /** Number of messages every test puts on the queue. */
+    private static final int MESSAGE_COUNT = 370;
+
+    /** Length, in characters, of every message body. */
+    private static final int PAYLOAD_LENGTH = 1024 * 2;
+
+    /** Name of the queue shared by all tests. */
+    private static final String QUEUE_NAME = "TEST";
+
+    /** How long, in milliseconds, the consumer waits before treating the queue as drained. */
+    private static final long RECEIVE_TIMEOUT_MS = 1000;
+
+    private BrokerService broker;
+    private URI connectUri;
+    private ActiveMQConnectionFactory factory;
+
+    /**
+     * Creates and starts the broker, then builds a connection factory that
+     * points at the connector's actual address.
+     *
+     * @throws Exception if the broker cannot be configured or started
+     */
+    @Before
+    public void startBroker() throws Exception {
+        createBroker();
+        // Port 0 in the bind URI: the real address is read back from the
+        // connector once the broker is up.
+        TransportConnector connector = broker.addConnector("tcp://0.0.0.0:0");
+        broker.deleteAllMessages();
+        broker.start();
+        broker.waitUntilStarted();
+        connectUri = connector.getConnectUri();
+        factory = new ActiveMQConnectionFactory(connectUri);
+    }
+
+    /** Instantiates the broker under test with its default configuration. */
+    private void createBroker() {
+        broker = new BrokerService();
+    }
+
+    /**
+     * Stops the broker and waits until it has fully shut down.
+     *
+     * @throws Exception if the broker fails to stop
+     */
+    @After
+    public void stopBroker() throws Exception {
+        // startBroker() may have failed before the broker was created.
+        if (broker == null) {
+            return;
+        }
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    /**
+     * Sends {@link #MESSAGE_COUNT} messages and checks that a browser sees
+     * every one of them.
+     *
+     * @throws JMSException on any JMS failure
+     */
+    @Test
+    public void testBrowsing() throws JMSException {
+        ActiveMQQueue queue = new ActiveMQQueue(QUEUE_NAME);
+        Connection connection = factory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            sendMessages(session, queue, MESSAGE_COUNT);
+
+            assertEquals(MESSAGE_COUNT, browseAll(session, queue));
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Sends {@link #MESSAGE_COUNT} messages, then browses and consumes the
+     * queue from two threads at the same time. A failure raised in either
+     * thread is rethrown here so that it actually fails the test.
+     *
+     * NOTE(review): the browse races the consumer, so the "Browsed all
+     * messages" expectation may be timing dependent - confirm it is intended.
+     * NOTE(review): both threads share one Session, as in the original code;
+     * confirm that concurrent use of a single Session is intended here.
+     *
+     * @throws Exception on any JMS failure or failure raised by a worker thread
+     */
+    @Test
+    public void testBrowseConcurrent() throws Exception {
+        final ActiveMQQueue queue = new ActiveMQQueue(QUEUE_NAME);
+        Connection connection = factory.createConnection();
+        try {
+            connection.start();
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            sendMessages(session, queue, MESSAGE_COUNT);
+
+            // Slot 0: browser thread, slot 1: consumer thread. Each slot is
+            // written by exactly one thread and read only after join().
+            final Throwable[] failures = new Throwable[2];
+
+            Thread browserThread = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        int browsed = browseAll(session, queue);
+                        assertEquals("Browsed all messages", MESSAGE_COUNT, browsed);
+                    } catch (Throwable t) {
+                        // Throwable, not Exception: AssertionError is an Error.
+                        failures[0] = t;
+                    }
+                }
+            };
+            browserThread.start();
+
+            Thread consumerThread = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        int consumed = consumeAll(session, queue);
+                        assertEquals("Consumed all messages", MESSAGE_COUNT, consumed);
+                    } catch (Throwable t) {
+                        failures[1] = t;
+                    }
+                }
+            };
+            consumerThread.start();
+
+            browserThread.join();
+            consumerThread.join();
+
+            // Log every failure, since only the first one can be rethrown.
+            for (Throwable failure : failures) {
+                if (failure != null) {
+                    LOG.error("Worker thread failed", failure);
+                }
+            }
+            rethrowIfFailed(failures[0]);
+            rethrowIfFailed(failures[1]);
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Lowers the broker memory limit to 10 KB, sends {@link #MESSAGE_COUNT}
+     * messages and checks how many of them a browser can see.
+     *
+     * @throws Exception on any broker or JMS failure
+     */
+    @Test
+    public void testMemoryLimit() throws Exception {
+        broker.getSystemUsage().getMemoryUsage().setLimit(10 * 1024);
+
+        ActiveMQQueue queue = new ActiveMQQueue(QUEUE_NAME);
+        Connection connection = factory.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            sendMessages(session, queue, MESSAGE_COUNT);
+
+            int browsed = browseAll(session, queue);
+            // NOTE(review): 3 is presumably how many messages are paged in
+            // under the 10 KB limit - confirm where this number comes from.
+            assertEquals(3, browsed);
+        } finally {
+            connection.close();
+        }
+    }
+
+    /** Builds a message body of {@link #PAYLOAD_LENGTH} 'x' characters. */
+    private static String createPayload() {
+        StringBuilder payload = new StringBuilder(PAYLOAD_LENGTH);
+        for (int i = 0; i < PAYLOAD_LENGTH; i++) {
+            payload.append('x');
+        }
+        return payload.toString();
+    }
+
+    /** Sends {@code count} text messages carrying the standard payload to {@code queue}. */
+    private static void sendMessages(Session session, ActiveMQQueue queue, int count) throws JMSException {
+        String payload = createPayload();
+        MessageProducer producer = session.createProducer(queue);
+        try {
+            for (int i = 0; i < count; i++) {
+                producer.send(session.createTextMessage(payload));
+            }
+        } finally {
+            producer.close();
+        }
+    }
+
+    /** Browses {@code queue} to the end, logging each message, and returns how many were seen. */
+    private static int browseAll(Session session, ActiveMQQueue queue) throws JMSException {
+        QueueBrowser browser = session.createBrowser(queue);
+        try {
+            Enumeration<?> enumeration = browser.getEnumeration();
+            int received = 0;
+            while (enumeration.hasMoreElements()) {
+                Message m = (Message) enumeration.nextElement();
+                received++;
+                LOG.info("Browsed message {}: {}", received, m.getJMSMessageID());
+            }
+            return received;
+        } finally {
+            browser.close();
+        }
+    }
+
+    /** Receives from {@code queue} until a receive times out and returns how many messages arrived. */
+    private static int consumeAll(Session session, ActiveMQQueue queue) throws JMSException {
+        MessageConsumer consumer = session.createConsumer(queue);
+        int received = 0;
+        while (consumer.receive(RECEIVE_TIMEOUT_MS) != null) {
+            received++;
+        }
+        return received;
+    }
+
+    /** Rethrows a failure captured in a worker thread; does nothing when {@code failure} is null. */
+    private static void rethrowIfFailed(Throwable failure) throws Exception {
+        if (failure == null) {
+            return;
+        }
+        if (failure instanceof Error) {
+            throw (Error) failure;
+        }
+        if (failure instanceof Exception) {
+            throw (Exception) failure;
+        }
+        throw new RuntimeException(failure);
+    }
+}