Author: rajdavies
Date: Sun Mar 16 09:30:55 2008
New Revision: 637609

URL: http://svn.apache.org/viewvc?rev=637609&view=rev
Log:
Ensure we detect Connection splits for any type of network

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java?rev=637609&r1=637608&r2=637609&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
 Sun Mar 16 09:30:55 2008
@@ -44,39 +44,39 @@
     }
 
         
-    public Subscription addConsumer(ConnectionContext context, ConsumerInfo 
info) throws Exception{
+    public Subscription addConsumer(ConnectionContext context, ConsumerInfo 
info)
+            throws Exception {
         ActiveMQDestination dest = info.getDestination();
-        boolean validDestination = dest != null && !dest.isTemporary();
-        if (validDestination) {
-            synchronized (networkConsumerList) {
-                if (info.isNetworkSubscription()) {
-                    networkConsumerList.add(info);
-                } else {
-                    if(!networkConsumerList.isEmpty()) {
-                        List<ConsumerInfo> gcList = new 
ArrayList<ConsumerInfo>();
-                        for (ConsumerInfo nc : networkConsumerList) {
-                            if (!nc.isNetworkConsumersEmpty()) {
-                                for (ConsumerId id : 
nc.getNetworkConsumerIds()) {
-                                    if (id.equals(info.getConsumerId())) {
-                                        nc.removeNetworkConsumerId(id);
-                                        if (nc.isNetworkConsumersEmpty()) {
-                                            gcList.add(nc);
-                                        }
+
+        synchronized (networkConsumerList) {
+            if (info.isNetworkSubscription()) {
+                networkConsumerList.add(info);
+            } else {
+                if (!networkConsumerList.isEmpty()) {
+                    List<ConsumerInfo> gcList = new ArrayList<ConsumerInfo>();
+                    for (ConsumerInfo nc : networkConsumerList) {
+                        if (!nc.isNetworkConsumersEmpty()) {
+                            
+                            for (ConsumerId id : nc.getNetworkConsumerIds()) {
+                                
+                                if (id.equals(info.getConsumerId())) {
+                                    nc.removeNetworkConsumerId(id);
+                                    if (nc.isNetworkConsumersEmpty()) {
+                                        gcList.add(nc);
                                     }
                                 }
-                            } else {
-                                gcList.add(nc);
                             }
                         }
-                        for (ConsumerInfo nc : gcList) {
-                            networkConsumerList.remove(nc);
-                            super.removeConsumer(context, nc);
-                            LOG.warn("Removed stale network consumer " + nc);
-                        }
+                    }
+                    for (ConsumerInfo nc : gcList) {
+                        networkConsumerList.remove(nc);
+                        super.removeConsumer(context, nc);
+                        LOG.warn("Removed stale network consumer " + nc);
                     }
                 }
             }
         }
+
         return super.addConsumer(context, info);
     }
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=637609&r1=637608&r2=637609&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
 Sun Mar 16 09:30:55 2008
@@ -51,6 +51,8 @@
         if (addToAlreadyInterestedConsumers(info)) {
             return null; // don't want this subscription added
         }
+        //add our original id to ourselves
+        info.addNetworkConsumerId(info.getConsumerId());
         return doCreateDemandSubscription(info);
     }
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=637609&r1=637608&r2=637609&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 Sun Mar 16 09:30:55 2008
@@ -820,11 +820,14 @@
         }
         return result;
     }
-
+    
     protected DemandSubscription createDemandSubscription(ConsumerInfo info) 
throws IOException {
+        //add our original id to ourselves
+        info.addNetworkConsumerId(info.getConsumerId());
         return doCreateDemandSubscription(info);
     }
 
+    
     protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) 
throws IOException {
         DemandSubscription result = new DemandSubscription(info);
         result.getLocalInfo().setConsumerId(new 
ConsumerId(localSessionInfo.getSessionId(), 
consumerIdGenerator.getNextSequenceId()));
@@ -905,6 +908,7 @@
     protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info) 
throws IOException;
 
     protected abstract void serviceRemoteBrokerInfo(Command command) throws 
IOException;
+    
 
     protected abstract BrokerId[] getRemoteBrokerPath();
 


Reply via email to