Author: dejanb
Date: Wed Feb 18 16:24:56 2009
New Revision: 745558
URL: http://svn.apache.org/viewvc?rev=745558&view=rev
Log:
additional fix for https://issues.apache.org/activemq/browse/AMQ-2104 and
https://issues.apache.org/activemq/browse/AMQ-1509
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempQueueNetworkTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
Wed Feb 18 16:24:56 2009
@@ -47,12 +47,12 @@
}
protected DemandSubscription createDemandSubscription(ConsumerInfo info)
throws IOException {
-
if (addToAlreadyInterestedConsumers(info)) {
return null; // don't want this subscription added
}
//add our original id to ourselves
info.addNetworkConsumerId(info.getConsumerId());
+ info.setSelector(null);
return doCreateDemandSubscription(info);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
Wed Feb 18 16:24:56 2009
@@ -39,7 +39,7 @@
localInfo = info.copy();
localInfo.setNetworkSubscription(true);
remoteSubsIds.add(info.getConsumerId());
- }
+ }
/**
* Increment the consumers associated with this subscription
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
Wed Feb 18 16:24:56 2009
@@ -95,6 +95,7 @@
info.setSubscriptionName(getSubscriberName(info.getDestination()));
}
+ info.setSelector(null);
return doCreateDemandSubscription(info);
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
Wed Feb 18 16:24:56 2009
@@ -75,27 +75,27 @@
protected boolean verbose;
protected NetworkConnector bridgeBrokers(String localBrokerName, String
remoteBrokerName) throws Exception {
- return bridgeBrokers(localBrokerName, remoteBrokerName, false, 1);
+ return bridgeBrokers(localBrokerName, remoteBrokerName, false, 1,
true);
}
protected void bridgeBrokers(String localBrokerName, String
remoteBrokerName, boolean dynamicOnly) throws Exception {
BrokerService localBroker = brokers.get(localBrokerName).broker;
BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
- bridgeBrokers(localBroker, remoteBroker, dynamicOnly, 1);
+ bridgeBrokers(localBroker, remoteBroker, dynamicOnly, 1, true);
}
- protected NetworkConnector bridgeBrokers(String localBrokerName, String
remoteBrokerName, boolean dynamicOnly, int networkTTL) throws Exception {
+ protected NetworkConnector bridgeBrokers(String localBrokerName, String
remoteBrokerName, boolean dynamicOnly, int networkTTL, boolean conduit) throws
Exception {
BrokerService localBroker = brokers.get(localBrokerName).broker;
BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
- return bridgeBrokers(localBroker, remoteBroker, dynamicOnly,
networkTTL);
+ return bridgeBrokers(localBroker, remoteBroker, dynamicOnly,
networkTTL, conduit);
}
// Overwrite this method to specify how you want to bridge the two brokers
// By default, bridge them using add network connector of the local broker
// and the first connector of the remote broker
- protected NetworkConnector bridgeBrokers(BrokerService localBroker,
BrokerService remoteBroker, boolean dynamicOnly, int networkTTL) throws
Exception {
+ protected NetworkConnector bridgeBrokers(BrokerService localBroker,
BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean
conduit) throws Exception {
List transportConnectors = remoteBroker.getTransportConnectors();
URI remoteURI;
if (!transportConnectors.isEmpty()) {
@@ -103,6 +103,7 @@
NetworkConnector connector = new DiscoveryNetworkConnector(new
URI("static:" + remoteURI));
connector.setDynamicOnly(dynamicOnly);
connector.setNetworkTTL(networkTTL);
+ connector.setConduitSubscriptions(conduit);
localBroker.addNetworkConnector(connector);
maxSetupTime = 2000;
return connector;
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
Wed Feb 18 16:24:56 2009
@@ -136,7 +136,7 @@
@Override
- protected NetworkConnector bridgeBrokers(BrokerService localBroker,
BrokerService remoteBroker, boolean dynamicOnly, int networkTTL) throws
Exception {
+ protected NetworkConnector bridgeBrokers(BrokerService localBroker,
BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean
conduit) throws Exception {
List<TransportConnector> transportConnectors =
remoteBroker.getTransportConnectors();
URI remoteURI;
if (!transportConnectors.isEmpty()) {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
Wed Feb 18 16:24:56 2009
@@ -94,8 +94,8 @@
*/
public void testBAandBCbrokerNetworkWithSelectorsSendFirst() throws
Exception {
// Setup broker networks
- bridgeBrokers("BrokerB", "BrokerA");
- bridgeBrokers("BrokerB", "BrokerC");
+ bridgeBrokers("BrokerB", "BrokerA", true, 1, false);
+ bridgeBrokers("BrokerB", "BrokerC", true, 1, false);
startAllBrokers();
@@ -135,8 +135,8 @@
*/
public void testBAandBCbrokerNetworkWithSelectorsSubscribeFirst() throws
Exception {
// Setup broker networks
- bridgeBrokers("BrokerB", "BrokerA");
- bridgeBrokers("BrokerB", "BrokerC");
+ bridgeBrokers("BrokerB", "BrokerA", true, 1, false);
+ bridgeBrokers("BrokerB", "BrokerC", true, 1, false);
startAllBrokers();
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempQueueNetworkTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempQueueNetworkTest.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempQueueNetworkTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempQueueNetworkTest.java
Wed Feb 18 16:24:56 2009
@@ -109,7 +109,7 @@
}
protected NetworkConnector bridgeBrokers(String localBrokerName, String
remoteBrokerName, boolean dynamicOnly, int networkTTL) throws Exception {
- NetworkConnector connector = super.bridgeBrokers(localBrokerName,
remoteBrokerName, dynamicOnly, networkTTL);
+ NetworkConnector connector = super.bridgeBrokers(localBrokerName,
remoteBrokerName, dynamicOnly, networkTTL, true);
connector.setBridgeTempDestinations(enableTempDestinationBridging);
return connector;
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
Wed Feb 18 16:24:56 2009
@@ -17,11 +17,15 @@
package org.apache.activemq.usecases;
import java.net.URI;
+import java.util.HashMap;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
+import junit.framework.Test;
+
import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.transport.failover.FailoverUriTest;
import org.apache.activemq.util.MessageIdList;
/**
@@ -29,6 +33,7 @@
*/
public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport
{
protected static final int MESSAGE_COUNT = 100;
+ public boolean dynamicOnly;
/**
* BrokerA -> BrokerB -> BrokerC
@@ -68,6 +73,52 @@
assertEquals(MESSAGE_COUNT * 2, msgsB.getMessageCount());
assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount());
}
+
+ public void initCombosForTestABandBCbrokerNetworkWithSelectors() {
+ addCombinationValues("dynamicOnly", new Object[] {true, false});
+ }
+
+ /**
+ * BrokerA -> BrokerB -> BrokerC
+ */
+ public void testABandBCbrokerNetworkWithSelectors() throws Exception {
+ // Setup broker networks
+ bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, 2, true);
+ bridgeBrokers("BrokerB", "BrokerC", dynamicOnly, 2, true);
+
+ startAllBrokers();
+
+ // Setup destination
+ Destination dest = createDestination("TEST.FOO", true);
+
+ // Setup consumers
+ MessageConsumer clientA = createConsumer("BrokerC", dest, "dummy =
33");
+ MessageConsumer clientB = createConsumer("BrokerC", dest, "dummy >
30");
+ MessageConsumer clientC = createConsumer("BrokerC", dest, "dummy =
34");
+
+ // let consumers propogate around the network
+ Thread.sleep(2000);
+ // Send messages
+ // Send messages for broker A
+ HashMap<String, Object> props = new HashMap<String, Object>();
+ props.put("dummy", 33);
+ sendMessages("BrokerA", dest, MESSAGE_COUNT, props);
+ props.put("dummy", 34);
+ sendMessages("BrokerA", dest, MESSAGE_COUNT * 2, props);
+
+ // Get message count
+ MessageIdList msgsA = getConsumerMessages("BrokerC", clientA);
+ MessageIdList msgsB = getConsumerMessages("BrokerC", clientB);
+ MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+ msgsA.waitForMessagesToArrive(MESSAGE_COUNT);
+ msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 3);
+ msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 2) ;
+
+ assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
+ assertEquals(MESSAGE_COUNT * 3, msgsB.getMessageCount());
+ assertEquals(MESSAGE_COUNT *2, msgsC.getMessageCount());
+ }
/**
* BrokerA <- BrokerB -> BrokerC
@@ -237,4 +288,8 @@
createBroker(new
URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
createBroker(new
URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false"));
}
+
+ public static Test suite() {
+ return suite(ThreeBrokerTopicNetworkTest.class);
+ }
}