Author: dejanb
Date: Wed Feb 18 16:24:56 2009
New Revision: 745558

URL: http://svn.apache.org/viewvc?rev=745558&view=rev
Log:
additional fix for https://issues.apache.org/activemq/browse/AMQ-2104 and 
https://issues.apache.org/activemq/browse/AMQ-1509

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempQueueNetworkTest.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
 Wed Feb 18 16:24:56 2009
@@ -47,12 +47,12 @@
     }
 
     protected DemandSubscription createDemandSubscription(ConsumerInfo info) 
throws IOException {
-
         if (addToAlreadyInterestedConsumers(info)) {
             return null; // don't want this subscription added
         }
         //add our original id to ourselves
         info.addNetworkConsumerId(info.getConsumerId());
+        info.setSelector(null);
         return doCreateDemandSubscription(info);
     }
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
 Wed Feb 18 16:24:56 2009
@@ -39,7 +39,7 @@
         localInfo = info.copy();
         localInfo.setNetworkSubscription(true);
         remoteSubsIds.add(info.getConsumerId());    
-     }
+    }
 
     /**
      * Increment the consumers associated with this subscription

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
 Wed Feb 18 16:24:56 2009
@@ -95,6 +95,7 @@
 
             info.setSubscriptionName(getSubscriberName(info.getDestination()));
         }
+        info.setSelector(null);
         return doCreateDemandSubscription(info);
     }
 

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
 Wed Feb 18 16:24:56 2009
@@ -75,27 +75,27 @@
     protected boolean verbose;
 
     protected NetworkConnector bridgeBrokers(String localBrokerName, String 
remoteBrokerName) throws Exception {
-        return bridgeBrokers(localBrokerName, remoteBrokerName, false, 1);
+        return bridgeBrokers(localBrokerName, remoteBrokerName, false, 1, 
true);
     }
 
     protected void bridgeBrokers(String localBrokerName, String 
remoteBrokerName, boolean dynamicOnly) throws Exception {
         BrokerService localBroker = brokers.get(localBrokerName).broker;
         BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
 
-        bridgeBrokers(localBroker, remoteBroker, dynamicOnly, 1);
+        bridgeBrokers(localBroker, remoteBroker, dynamicOnly, 1, true);
     }
 
-    protected NetworkConnector bridgeBrokers(String localBrokerName, String 
remoteBrokerName, boolean dynamicOnly, int networkTTL) throws Exception {
+    protected NetworkConnector bridgeBrokers(String localBrokerName, String 
remoteBrokerName, boolean dynamicOnly, int networkTTL, boolean conduit) throws 
Exception {
         BrokerService localBroker = brokers.get(localBrokerName).broker;
         BrokerService remoteBroker = brokers.get(remoteBrokerName).broker;
 
-        return bridgeBrokers(localBroker, remoteBroker, dynamicOnly, 
networkTTL);
+        return bridgeBrokers(localBroker, remoteBroker, dynamicOnly, 
networkTTL, conduit);
     }
 
     // Overwrite this method to specify how you want to bridge the two brokers
     // By default, bridge them using add network connector of the local broker
     // and the first connector of the remote broker
-    protected NetworkConnector bridgeBrokers(BrokerService localBroker, 
BrokerService remoteBroker, boolean dynamicOnly, int networkTTL) throws 
Exception {
+    protected NetworkConnector bridgeBrokers(BrokerService localBroker, 
BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean 
conduit) throws Exception {
         List transportConnectors = remoteBroker.getTransportConnectors();
         URI remoteURI;
         if (!transportConnectors.isEmpty()) {
@@ -103,6 +103,7 @@
             NetworkConnector connector = new DiscoveryNetworkConnector(new 
URI("static:" + remoteURI));
             connector.setDynamicOnly(dynamicOnly);
             connector.setNetworkTTL(networkTTL);
+            connector.setConduitSubscriptions(conduit);
             localBroker.addNetworkConnector(connector);
             maxSetupTime = 2000;
             return connector;

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/BrokerQueueNetworkWithDisconnectTest.java
 Wed Feb 18 16:24:56 2009
@@ -136,7 +136,7 @@
 
 
     @Override
-    protected NetworkConnector bridgeBrokers(BrokerService localBroker, 
BrokerService remoteBroker, boolean dynamicOnly, int networkTTL) throws 
Exception {
+    protected NetworkConnector bridgeBrokers(BrokerService localBroker, 
BrokerService remoteBroker, boolean dynamicOnly, int networkTTL, boolean 
conduit) throws Exception {
         List<TransportConnector> transportConnectors = 
remoteBroker.getTransportConnectors();
         URI remoteURI;
         if (!transportConnectors.isEmpty()) {

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerQueueNetworkTest.java
 Wed Feb 18 16:24:56 2009
@@ -94,8 +94,8 @@
      */
     public void testBAandBCbrokerNetworkWithSelectorsSendFirst() throws 
Exception {
        // Setup broker networks
-        bridgeBrokers("BrokerB", "BrokerA");
-        bridgeBrokers("BrokerB", "BrokerC");
+        bridgeBrokers("BrokerB", "BrokerA", true, 1, false);
+        bridgeBrokers("BrokerB", "BrokerC", true, 1, false);
 
         startAllBrokers();
 
@@ -135,8 +135,8 @@
      */
     public void testBAandBCbrokerNetworkWithSelectorsSubscribeFirst() throws 
Exception {
        // Setup broker networks
-        bridgeBrokers("BrokerB", "BrokerA");
-        bridgeBrokers("BrokerB", "BrokerC");
+        bridgeBrokers("BrokerB", "BrokerA", true, 1, false);
+        bridgeBrokers("BrokerB", "BrokerC", true, 1, false);
 
         startAllBrokers();
 

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempQueueNetworkTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempQueueNetworkTest.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempQueueNetworkTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTempQueueNetworkTest.java
 Wed Feb 18 16:24:56 2009
@@ -109,7 +109,7 @@
     }
 
     protected NetworkConnector bridgeBrokers(String localBrokerName, String 
remoteBrokerName, boolean dynamicOnly, int networkTTL) throws Exception {
-        NetworkConnector connector = super.bridgeBrokers(localBrokerName, 
remoteBrokerName, dynamicOnly, networkTTL);
+        NetworkConnector connector = super.bridgeBrokers(localBrokerName, 
remoteBrokerName, dynamicOnly, networkTTL, true);
         connector.setBridgeTempDestinations(enableTempDestinationBridging);
         return connector;
     }

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java?rev=745558&r1=745557&r2=745558&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
 Wed Feb 18 16:24:56 2009
@@ -17,11 +17,15 @@
 package org.apache.activemq.usecases;
 
 import java.net.URI;
+import java.util.HashMap;
 
 import javax.jms.Destination;
 import javax.jms.MessageConsumer;
 
+import junit.framework.Test;
+
 import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.transport.failover.FailoverUriTest;
 import org.apache.activemq.util.MessageIdList;
 
 /**
@@ -29,6 +33,7 @@
  */
 public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport 
{
     protected static final int MESSAGE_COUNT = 100;
+    public boolean dynamicOnly;
 
     /**
      * BrokerA -> BrokerB -> BrokerC
@@ -68,6 +73,52 @@
         assertEquals(MESSAGE_COUNT * 2, msgsB.getMessageCount());
         assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount());
     }
+    
+    public void initCombosForTestABandBCbrokerNetworkWithSelectors() {
+       addCombinationValues("dynamicOnly", new Object[] {true, false});
+    }
+    
+    /**
+     * BrokerA -> BrokerB -> BrokerC
+     */
+    public void testABandBCbrokerNetworkWithSelectors() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, 2, true);
+        bridgeBrokers("BrokerB", "BrokerC", dynamicOnly, 2, true);
+
+        startAllBrokers();
+
+        // Setup destination
+        Destination dest = createDestination("TEST.FOO", true);
+
+        // Setup consumers
+        MessageConsumer clientA = createConsumer("BrokerC", dest, "dummy = 
33");
+        MessageConsumer clientB = createConsumer("BrokerC", dest, "dummy > 
30");
+        MessageConsumer clientC = createConsumer("BrokerC", dest, "dummy = 
34");
+
+        // let consumers propagate around the network
+        Thread.sleep(2000);
+        // Send messages
+        // Send messages for broker A
+        HashMap<String, Object> props = new HashMap<String, Object>();
+        props.put("dummy", 33);
+        sendMessages("BrokerA", dest, MESSAGE_COUNT, props);
+        props.put("dummy", 34);
+        sendMessages("BrokerA", dest, MESSAGE_COUNT * 2, props);
+
+        // Get message count
+        MessageIdList msgsA = getConsumerMessages("BrokerC", clientA);
+        MessageIdList msgsB = getConsumerMessages("BrokerC", clientB);
+        MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+        msgsA.waitForMessagesToArrive(MESSAGE_COUNT);
+        msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 3);
+        msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 2) ;
+
+        assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
+        assertEquals(MESSAGE_COUNT * 3, msgsB.getMessageCount());
+        assertEquals(MESSAGE_COUNT *2, msgsC.getMessageCount());
+    }
 
     /**
      * BrokerA <- BrokerB -> BrokerC
@@ -237,4 +288,8 @@
         createBroker(new 
URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
         createBroker(new 
URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false"));
     }
+    
+    public static Test suite() {
+       return suite(ThreeBrokerTopicNetworkTest.class);
+    }
 }


Reply via email to