Author: chirino
Date: Wed Mar 22 23:24:28 2006
New Revision: 388082
URL: http://svn.apache.org/viewcvs?rev=388082&view=rev
Log:
If multiple concurrent threads were creating vm://localhost connections, it was
possible that multiple "localhost" brokers would be started.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java?rev=388082&r1=388081&r2=388082&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerRegistry.java Wed Mar 22 23:24:28 2006
@@ -16,7 +16,7 @@
*/
package org.apache.activemq.broker;
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import java.util.HashMap;
/**
*
@@ -29,22 +29,30 @@
public static BrokerRegistry getInstance() {
return instance;
}
-
- ConcurrentHashMap brokers = new ConcurrentHashMap();
-
- private BrokerRegistry() {
- }
+ private final Object mutex = new Object();
+ private final HashMap brokers = new HashMap();
+
public BrokerService lookup(String brokerName) {
- return (BrokerService)brokers.get(brokerName);
+ synchronized(mutex) {
+ return (BrokerService)brokers.get(brokerName);
+ }
}
public void bind(String brokerName, BrokerService broker) {
- brokers.put(brokerName, broker);
+ synchronized(mutex) {
+ brokers.put(brokerName, broker);
+ }
}
public void unbind(String brokerName) {
- brokers.remove(brokerName);
+ synchronized(mutex) {
+ brokers.remove(brokerName);
+ }
+ }
+
+ public Object getRegistryMutext() {
+ return mutex;
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java?rev=388082&r1=388081&r2=388082&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java Wed Mar 22 23:24:28 2006
@@ -86,29 +86,38 @@
VMTransportServer server=(VMTransportServer) servers.get(host);
// validate the broker is still active
if(!validateBroker(host)||server==null){
- BrokerService broker=BrokerRegistry.getInstance().lookup(host);
- if(broker==null){
- try{
- if(brokerFactoryHandler!=null){
- broker=brokerFactoryHandler.createBroker(brokerURI);
- }else{
- broker=BrokerFactory.createBroker(brokerURI);
+ BrokerService broker=null;
+ // Synchronize on the registry so that multiple concurrent threads
+ // doing this do not think that the broker has not been created and cause multiple
+ // brokers to be started.
+ synchronized( BrokerRegistry.getInstance().getRegistryMutext() ) {
+ broker=BrokerRegistry.getInstance().lookup(host);
+ if(broker==null){
+ try{
+ if(brokerFactoryHandler!=null){
+ broker=brokerFactoryHandler.createBroker(brokerURI);
+ }else{
+ broker=BrokerFactory.createBroker(brokerURI);
+ }
+ broker.start();
+ }catch(URISyntaxException e){
+ throw IOExceptionSupport.create(e);
}
- broker.start();
- }catch(URISyntaxException e){
- throw IOExceptionSupport.create(e);
+ brokers.put(host,broker);
}
- brokers.put(host,broker);
- }
- server=(VMTransportServer) servers.get(host);
- if(server==null){
- server=(VMTransportServer) bind(location,true);
- TransportConnector connector=new TransportConnector(broker.getBroker(),server);
- connector.setTaskRunnerFactory( broker.getTaskRunnerFactory() );
- connector.start();
- connectors.put(host,connector);
+
+ server=(VMTransportServer) servers.get(host);
+ if(server==null){
+ server=(VMTransportServer) bind(location,true);
+ TransportConnector connector=new TransportConnector(broker.getBroker(),server);
+ connector.setTaskRunnerFactory( broker.getTaskRunnerFactory() );
+ connector.start();
+ connectors.put(host,connector);
+ }
+
}
- }else{}
+ }
+
VMTransport vmtransport=server.connect();
IntrospectionSupport.setProperties(vmtransport,options);
Transport transport=vmtransport;