Author: rajdavies
Date: Fri Jan 27 09:00:26 2006
New Revision: 372910
URL: http://svn.apache.org/viewcvs?rev=372910&view=rev
Log:
check if a broker held by the TransportConnector is stopped before assigning it
for re-use (i.e. VMTransportServer)
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=372910&r1=372909&r2=372910&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
Fri Jan 27 09:00:26 2006
@@ -197,9 +197,13 @@
public void processDispatchNotification(MessageDispatchNotification
messageDispatchNotification) throws Throwable;
/**
- *
* @return true if the broker is running as a slave
*/
public boolean isSlaveBroker();
+
+ /**
+ * @return true if the broker has stopped
+ */
+ public boolean isStopped();
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=372910&r1=372909&r2=372910&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
Fri Jan 27 09:00:26 2006
@@ -183,5 +183,9 @@
public boolean isSlaveBroker(){
return next.isSlaveBroker();
}
+
+ public boolean isStopped(){
+ return next.isStopped();
+ }
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=372910&r1=372909&r2=372910&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
Fri Jan 27 09:00:26 2006
@@ -181,5 +181,9 @@
public boolean isSlaveBroker(){
return false;
}
+
+ public boolean isStopped(){
+ return false;
+ }
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=372910&r1=372909&r2=372910&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
Fri Jan 27 09:00:26 2006
@@ -180,5 +180,9 @@
throw new IllegalStateException(this.message);
}
+ public boolean isStopped(){
+ return true;
+ }
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=372910&r1=372909&r2=372910&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
Fri Jan 27 09:00:26 2006
@@ -193,5 +193,9 @@
public boolean isSlaveBroker(){
return getNext().isSlaveBroker();
}
+
+ public boolean isStopped(){
+ return getNext().isStopped();
+ }
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=372910&r1=372909&r2=372910&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Fri Jan 27 09:00:26 2006
@@ -67,6 +67,7 @@
private final Region tempQueueRegion;
private final Region tempTopicRegion;
private BrokerService brokerService;
+ private boolean stopped = false;
protected final DestinationStatistics destinationStatistics = new
DestinationStatistics();
@@ -130,6 +131,7 @@
}
public void stop() throws Exception {
+ stopped = true;
ServiceStopper ss = new ServiceStopper();
ss.stop(queueRegion);
ss.stop(topicRegion);
@@ -441,6 +443,10 @@
public boolean isSlaveBroker(){
return brokerService.isSlave();
+ }
+
+ public boolean isStopped(){
+ return stopped;
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java?rev=372910&r1=372909&r2=372910&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
Fri Jan 27 09:00:26 2006
@@ -88,8 +88,9 @@
location = new URI("vm://"+host);
}
- VMTransportServer server = (VMTransportServer) servers.get(host);
- if( server == null ) {
+ VMTransportServer server = (VMTransportServer) servers.get(host);
+ //validate the broker is still active
+ if( !validateBroker(host) || server == null ) {
BrokerService broker = BrokerRegistry.getInstance().lookup(host);
if (broker == null) {
try {
@@ -112,6 +113,8 @@
connector.start();
connectors.put(host, connector);
}
+ }else {
+
}
VMTransport vmtransport = server.connect();
@@ -171,4 +174,25 @@
this.brokerFactoryHandler = brokerFactoryHandler;
}
+
+ private boolean validateBroker(String host){
+ boolean result=true;
+
if(brokers.containsKey(host)||servers.containsKey(host)||connectors.containsKey(host)){
+ //check the broker is still in the BrokerRegistry
+ TransportConnector connector=(TransportConnector)
connectors.get(host);
+
if(BrokerRegistry.getInstance().lookup(host)==null||(connector!=null&&connector.getBroker().isStopped())){
+ result=false;
+ //clean-up
+ brokers.remove(host);
+ servers.remove(host);
+ if(connector!=null){
+ connectors.remove(host);
+ if(connector!=null){
+ ServiceSupport.dispose(connector);
+ }
+ }
+ }
+ }
+ return result;
+ }
}