Author: dejanb
Date: Fri Nov 5 16:27:58 2010
New Revision: 1031656
URL: http://svn.apache.org/viewvc?rev=1031656&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-3020 - intial tests
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java
activemq/trunk/activemq-core/src/test/resources/log4j.properties
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1031656&r1=1031655&r2=1031656&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
Fri Nov 5 16:27:58 2010
@@ -216,6 +216,18 @@ public class JmsMultipleBrokersTestSuppo
return null;
}
+ protected MessageConsumer createSyncConsumer(String brokerName,
Destination dest) throws Exception {
+ BrokerItem brokerItem = brokers.get(brokerName);
+ if (brokerItem != null) {
+ Connection con = brokerItem.createConnection();
+ con.start();
+ Session sess = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = sess.createConsumer(dest);
+ return consumer;
+ }
+ return null;
+ }
+
protected MessageConsumer createConsumer(String brokerName, Destination
dest) throws Exception {
return createConsumer(brokerName, dest, null, null);
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java?rev=1031656&r1=1031655&r2=1031656&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrowseOverNetworkTest.java
Fri Nov 5 16:27:58 2010
@@ -17,14 +17,19 @@
package org.apache.activemq.usecases;
import java.net.URI;
+import java.util.Arrays;
import java.util.Enumeration;
import javax.jms.Destination;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.QueueBrowser;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.region.QueueSubscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.MessageIdList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -66,7 +71,7 @@ public class BrowseOverNetworkTest exten
+ msgsB.getMessageCount());
}
- public void testconsumerInfo() throws Exception {
+ public void testConsumerInfo() throws Exception {
createBroker(new
ClassPathResource("org/apache/activemq/usecases/browse-broker1.xml"));
createBroker(new
ClassPathResource("org/apache/activemq/usecases/browse-broker2.xml"));
@@ -74,7 +79,7 @@ public class BrowseOverNetworkTest exten
brokers.get("broker1").broker.waitUntilStarted();
-
+
Destination dest = createDestination("QUEUE.A,QUEUE.B", false);
@@ -86,17 +91,138 @@ public class BrowseOverNetworkTest exten
}
- protected int browseMessages(String broker, Destination dest) throws
Exception {
- QueueBrowser browser = createBrowser(broker, dest);
+ public class Browser extends Thread {
+
+ String broker;
+ Destination dest;
+ int totalCount;
+ QueueBrowser browser = null;
+ MessageConsumer consumer = null;
+ boolean consume = false;
+
+ public Browser(String broker, Destination dest) {
+ this.broker = broker;
+ this.dest = dest;
+ }
+
+ public void run() {
+ int retries = 0;
+ while (retries++ < 5) {
+ try {
+ QueueBrowser browser = createBrowser(broker, dest);
+ int count = browseMessages(browser, broker);
+ LOG.info("browser '" + broker + "' browsed " + totalCount);
+ if (consume) {
+ if (count != 0) {
+ MessageConsumer consumer =
createSyncConsumer(broker, dest);
+ totalCount += count;
+ for (int i = 0; i < count; i++) {
+ ActiveMQTextMessage message =
(ActiveMQTextMessage)consumer.receive(1000);
+ LOG.info(broker + " consumer: " +
message.getText() + " " + message.getDestination() + " " +
message.getMessageId() + " " + Arrays.toString(message.getBrokerPath()));
+ if (message == null) break;
+ }
+ }
+ } else {
+ totalCount = count;
+ }
+
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ LOG.info("Exception browsing " + e, e);
+ } finally {
+ try {
+ if (browser != null) {
+ browser.close();
+ }
+ if (consumer != null) {
+ consumer.close();
+ }
+ } catch (Exception e) {
+ LOG.info("Exception closing browser " + e, e);
+ }
+ }
+ }
+ }
+
+ public int getTotalCount() {
+ return totalCount;
+ }
+ }
+
+ protected NetworkConnector bridgeBrokersWithIncludedDestination(String
localBrokerName, String remoteBrokerName, ActiveMQDestination included,
ActiveMQDestination excluded) throws Exception {
+ NetworkConnector nc = bridgeBrokers(localBrokerName, remoteBrokerName,
false, 4, true);
+ nc.addStaticallyIncludedDestination(included);
+ if (excluded != null) {
+ nc.addExcludedDestination(excluded);
+ }
+ nc.setPrefetchSize(1);
+ return nc;
+ }
+
+
+ public void testMultipleBrowsers() throws Exception {
+ createBroker(new
URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false&brokerId=BrokerA"));
+ createBroker(new
URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false&brokerId=BrokerB"));
+ createBroker(new
URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false&brokerId=BrokerC"));
+ createBroker(new
URI("broker:(tcp://localhost:61619)/BrokerD?persistent=false&useJmx=false&brokerId=BrokerD"));
+
+ Destination composite = createDestination("TEST.FOO,TEST.BAR", false);
+ Destination dest1 = createDestination("TEST.FOO", false);
+ Destination dest2 = createDestination("TEST.BAR", false);
+
+ bridgeBrokersWithIncludedDestination("BrokerA", "BrokerC",
(ActiveMQDestination)composite, null);
+ bridgeBrokersWithIncludedDestination("BrokerA", "BrokerB",
(ActiveMQDestination)composite, null);
+ bridgeBrokersWithIncludedDestination("BrokerA", "BrokerD",
(ActiveMQDestination)composite, null);
+ bridgeBrokersWithIncludedDestination("BrokerB", "BrokerA",
(ActiveMQDestination)composite, null);
+ bridgeBrokersWithIncludedDestination("BrokerB", "BrokerC",
(ActiveMQDestination)composite, null);
+ bridgeBrokersWithIncludedDestination("BrokerB", "BrokerD",
(ActiveMQDestination)composite, null);
+ bridgeBrokersWithIncludedDestination("BrokerC", "BrokerA",
(ActiveMQDestination)dest2, (ActiveMQDestination)dest1);
+ bridgeBrokersWithIncludedDestination("BrokerC", "BrokerB",
(ActiveMQDestination)dest2, (ActiveMQDestination)dest1);
+ bridgeBrokersWithIncludedDestination("BrokerC", "BrokerD",
(ActiveMQDestination)dest2, (ActiveMQDestination)dest1);
+ bridgeBrokersWithIncludedDestination("BrokerD", "BrokerA",
(ActiveMQDestination)dest1, (ActiveMQDestination)dest2);
+ bridgeBrokersWithIncludedDestination("BrokerD", "BrokerB",
(ActiveMQDestination)dest1, (ActiveMQDestination)dest2);
+ bridgeBrokersWithIncludedDestination("BrokerD", "BrokerC",
(ActiveMQDestination)dest1, (ActiveMQDestination)dest2);
+
+ startAllBrokers();
+
+ brokers.get("BrokerA").broker.waitUntilStarted();
+ brokers.get("BrokerC").broker.waitUntilStarted();
+ brokers.get("BrokerD").broker.waitUntilStarted();
+
+ Browser browser1 = new Browser("BrokerC", composite);
+ browser1.start();
+
+ Browser browser2 = new Browser("BrokerD", composite);
+ browser2.start();
+
+ sendMessages("BrokerA", composite, MESSAGE_COUNT);
+
+ browser1.join();
+ browser2.join();
+
+ assertEquals(MESSAGE_COUNT * 2, browser1.getTotalCount() +
browser2.getTotalCount() );
+
+ }
+
+ protected int browseMessages(QueueBrowser browser, String name) throws
Exception {
Enumeration msgs = browser.getEnumeration();
int browsedMessage = 0;
while (msgs.hasMoreElements()) {
browsedMessage++;
- msgs.nextElement();
+ ActiveMQTextMessage message =
(ActiveMQTextMessage)msgs.nextElement();
+ LOG.info(name + " browsed: " + message.getText() + " " +
message.getDestination() + " " + message.getMessageId() + " " +
Arrays.toString(message.getBrokerPath()));
}
return browsedMessage;
}
+
+ protected int browseMessages(String broker, Destination dest) throws
Exception {
+ QueueBrowser browser = createBrowser(broker, dest);
+ int browsedMessage = browseMessages(browser, "browser");
+ browser.close();
+ return browsedMessage;
+ }
+
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
Modified: activemq/trunk/activemq-core/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/log4j.properties?rev=1031656&r1=1031655&r2=1031656&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/resources/log4j.properties (original)
+++ activemq/trunk/activemq-core/src/test/resources/log4j.properties Fri Nov 5
16:27:58 2010
@@ -21,6 +21,7 @@
log4j.rootLogger=INFO, out, stdout
log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
+#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
#log4j.logger.org.apache.activemq=TRACE
#log4j.logger.org.apache.activemq.store.jdbc=TRACE
#log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
Modified:
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml?rev=1031656&r1=1031655&r2=1031656&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml
(original)
+++
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker1.xml
Fri Nov 5 16:27:58 2010
@@ -25,7 +25,7 @@
<!-- Broker1 ?useQueueForAccept=false -->
<amq:broker brokerName="broker1" id="broker1" useJmx="true"
- persistent="true" start="false"
advisorySupport="true">
+ persistent="true" start="false"
advisorySupport="true" deleteAllMessagesOnStartup="true">
<amq:destinationInterceptors>
Modified:
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml?rev=1031656&r1=1031655&r2=1031656&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml
(original)
+++
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/usecases/browse-broker2.xml
Fri Nov 5 16:27:58 2010
@@ -26,7 +26,7 @@
<!-- Broker2 (lonb) -->
<amq:broker brokerName="broker2" id="broker2" useJmx="true"
- persistent="true" start="false"
advisorySupport="true">
+ persistent="true" start="false"
advisorySupport="true" deleteAllMessagesOnStartup="true">
<!-- Network connectors -->
<amq:networkConnectors>