Author: rajdavies
Date: Sun Mar 16 09:30:55 2008
New Revision: 637609
URL: http://svn.apache.org/viewvc?rev=637609&view=rev
Log:
Ensure we detect Connection splits for any type of network
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java?rev=637609&r1=637608&r2=637609&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java Sun Mar 16 09:30:55 2008
@@ -44,39 +44,39 @@
}
- public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception{
+ public Subscription addConsumer(ConnectionContext context, ConsumerInfo info)
+ throws Exception {
ActiveMQDestination dest = info.getDestination();
- boolean validDestination = dest != null && !dest.isTemporary();
- if (validDestination) {
- synchronized (networkConsumerList) {
- if (info.isNetworkSubscription()) {
- networkConsumerList.add(info);
- } else {
- if(!networkConsumerList.isEmpty()) {
- List<ConsumerInfo> gcList = new ArrayList<ConsumerInfo>();
- for (ConsumerInfo nc : networkConsumerList) {
- if (!nc.isNetworkConsumersEmpty()) {
- for (ConsumerId id : nc.getNetworkConsumerIds()) {
- if (id.equals(info.getConsumerId())) {
- nc.removeNetworkConsumerId(id);
- if (nc.isNetworkConsumersEmpty()) {
- gcList.add(nc);
- }
+
+ synchronized (networkConsumerList) {
+ if (info.isNetworkSubscription()) {
+ networkConsumerList.add(info);
+ } else {
+ if (!networkConsumerList.isEmpty()) {
+ List<ConsumerInfo> gcList = new ArrayList<ConsumerInfo>();
+ for (ConsumerInfo nc : networkConsumerList) {
+ if (!nc.isNetworkConsumersEmpty()) {
+
+ for (ConsumerId id : nc.getNetworkConsumerIds()) {
+
+ if (id.equals(info.getConsumerId())) {
+ nc.removeNetworkConsumerId(id);
+ if (nc.isNetworkConsumersEmpty()) {
+ gcList.add(nc);
}
}
- } else {
- gcList.add(nc);
}
}
- for (ConsumerInfo nc : gcList) {
- networkConsumerList.remove(nc);
- super.removeConsumer(context, nc);
- LOG.warn("Removed stale network consumer " + nc);
- }
+ }
+ for (ConsumerInfo nc : gcList) {
+ networkConsumerList.remove(nc);
+ super.removeConsumer(context, nc);
+ LOG.warn("Removed stale network consumer " + nc);
}
}
}
}
+
return super.addConsumer(context, info);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=637609&r1=637608&r2=637609&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java Sun Mar 16 09:30:55 2008
@@ -51,6 +51,8 @@
if (addToAlreadyInterestedConsumers(info)) {
return null; // don't want this subscription added
}
+ //add our original id to ourselves
+ info.addNetworkConsumerId(info.getConsumerId());
return doCreateDemandSubscription(info);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=637609&r1=637608&r2=637609&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Sun Mar 16 09:30:55 2008
@@ -820,11 +820,14 @@
}
return result;
}
-
+
protected DemandSubscription createDemandSubscription(ConsumerInfo info)
throws IOException {
+ //add our original id to ourselves
+ info.addNetworkConsumerId(info.getConsumerId());
return doCreateDemandSubscription(info);
}
+
protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info)
throws IOException {
DemandSubscription result = new DemandSubscription(info);
result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
@@ -905,6 +908,7 @@
protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info)
throws IOException;
protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException;
+
protected abstract BrokerId[] getRemoteBrokerPath();