Author: chirino
Date: Sat Dec 10 21:37:23 2005
New Revision: 355920
URL: http://svn.apache.org/viewcvs?rev=355920&view=rev
Log:
- Revamped discovery a bit.
- We now use DiscoveryAgentFactory implementations to create the
DiscoveryAgent (so it can be more flexible in how it uses the discovery URI to
configure the agent).
- Using the discovery:tcp://localhost syntax when binding a transport seemed
fishy. A transport connector now supports a discoveryUri property that can be
configured like "multicast://groupname".
- You can now programmatically configure the embedded broker that the vm
connection factory starts up.
- The peer transport now uses explicit configuration of the embedded broker.
Added:
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgentFactory.java
(with props)
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgentFactory.java
(with props)
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgentFactory.java
(with props)
Removed:
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryTransportServer.java
Modified:
incubator/activemq/activemq-core/src/main/java/org/activemq/broker/TransportConnector.java
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgentFactory.java
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryTransportFactory.java
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportFactory.java
incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/multicast
incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/rendezvous
incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/simple
incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/static
incubator/activemq/activemq-core/src/test/java/org/activemq/network/NetworkTestSupport.java
incubator/activemq/activemq-core/src/test/java/org/activemq/transport/discovery/DiscoveryTransportBrokerTest.java
incubator/activemq/activemq-core/src/test/resources/activemq.xml
Modified:
incubator/activemq/activemq-core/src/main/java/org/activemq/broker/TransportConnector.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/java/org/activemq/broker/TransportConnector.java?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
---
incubator/activemq/activemq-core/src/main/java/org/activemq/broker/TransportConnector.java
(original)
+++
incubator/activemq/activemq-core/src/main/java/org/activemq/broker/TransportConnector.java
Sat Dec 10 21:37:23 2005
@@ -34,6 +34,9 @@
import org.activemq.transport.TransportAcceptListener;
import org.activemq.transport.TransportFactory;
import org.activemq.transport.TransportServer;
+import org.activemq.transport.discovery.DiscoveryAgent;
+import org.activemq.transport.discovery.DiscoveryAgentFactory;
+import org.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -56,6 +59,10 @@
private TaskRunnerFactory taskRunnerFactory = null;
protected CopyOnWriteArrayList connections = new CopyOnWriteArrayList();
protected TransportStatusDetector statusDector;
+ private DiscoveryAgent discoveryAgent;
+ private URI discoveryUri;
+
+ private URI connectUri;
/**
* @return Returns the connections.
@@ -160,18 +167,30 @@
public void start() throws Exception {
getServer().start();
log.info("Accepting connection on: "+getServer().getConnectURI());
+
+ DiscoveryAgent da = getDiscoveryAgent();
+ if( da!=null ) {
+ da.registerService(getConnectUri().toString());
+ da.start();
+ }
+
this.statusDector.start();
}
public void stop() throws Exception {
+ ServiceStopper ss = new ServiceStopper();
+ if( discoveryAgent!=null ) {
+ ss.stop(discoveryAgent);
+ }
if (server != null) {
- server.stop();
+ ss.stop(server);
}
this.statusDector.stop();
for (Iterator iter = connections.iterator(); iter.hasNext();) {
ConnectionContext context = (ConnectionContext) iter.next();
- context.getConnection().stop();
+ ss.stop(context.getConnection());
}
+ ss.throwFirstException();
}
// Implementation methods
@@ -209,6 +228,46 @@
throw new IllegalArgumentException("You must specify the broker
property. Maybe this connector should be added to a broker?");
}
return TransportFactory.bind(broker.getBrokerId().getBrokerId(),uri);
+ }
+
+ public DiscoveryAgent getDiscoveryAgent() throws IOException {
+ if( discoveryAgent==null ) {
+ discoveryAgent = createDiscoveryAgent();
+ }
+ return discoveryAgent;
+ }
+
+ protected DiscoveryAgent createDiscoveryAgent() throws IOException {
+ if( discoveryUri!=null ) {
+ return DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri);
+ }
+ return null;
+ }
+
+ public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
+ this.discoveryAgent = discoveryAgent;
+ }
+
+ public URI getDiscoveryUri() {
+ return discoveryUri;
+ }
+
+ public void setDiscoveryUri(URI discoveryUri) {
+ this.discoveryUri = discoveryUri;
+ }
+
+ public URI getConnectUri() throws IOException, URISyntaxException {
+ if( connectUri==null ) {
+ if( getServer().getConnectURI()==null ) {
+ throw new IllegalStateException("The transportConnector has
not been started.");
+ }
+ connectUri = getServer().getConnectURI();
+ }
+ return connectUri;
+ }
+
+ public void setConnectUri(URI transportUri) {
+ this.connectUri = transportUri;
}
}
Modified:
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgentFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgentFactory.java?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
---
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgentFactory.java
(original)
+++
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryAgentFactory.java
Sat Dec 10 21:37:23 2005
@@ -20,40 +20,60 @@
import java.io.IOException;
import java.net.URI;
-import java.util.Map;
import org.activeio.FactoryFinder;
-import org.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
import org.activemq.util.IOExceptionSupport;
-import org.activemq.util.IntrospectionSupport;
-import org.activemq.util.URISupport;
-import org.activemq.util.URISupport.CompositeData;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
public abstract class DiscoveryAgentFactory {
static final private FactoryFinder discoveryAgentFinder = new
FactoryFinder("META-INF/services/org/activemq/transport/discoveryagent/");
-
- public static DiscoveryAgent createDiscoveryAgent(String type) throws
IOException {
- try {
- return (DiscoveryAgent)discoveryAgentFinder.newInstance(type);
- } catch (Throwable e) {
- throw IOExceptionSupport.create("Could not create discovery agent:
"+type, e);
+ static final private ConcurrentHashMap discoveryAgentFactorys = new
ConcurrentHashMap();
+
+ /**
+ * @param uri
+ * @return
+ * @throws IOException
+ */
+ private static DiscoveryAgentFactory findDiscoveryAgentFactory(URI uri)
throws IOException {
+ String scheme = uri.getScheme();
+ if( scheme == null )
+ throw new IOException("DiscoveryAgent scheme not specified: [" +
uri + "]");
+ DiscoveryAgentFactory daf = (DiscoveryAgentFactory)
discoveryAgentFactorys.get(scheme);
+ if (daf == null) {
+ // Try to load if from a META-INF property.
+ try {
+ daf = (DiscoveryAgentFactory)
discoveryAgentFinder.newInstance(scheme);
+ discoveryAgentFactorys.put(scheme, daf);
+ }
+ catch (Throwable e) {
+ throw IOExceptionSupport.create("DiscoveryAgent scheme NOT
recognized: [" + scheme + "]", e);
+ }
}
+ return daf;
}
public static DiscoveryAgent createDiscoveryAgent(URI uri) throws
IOException {
- try {
- String type = ( uri.getScheme() == null ) ? uri.getPath() :
uri.getScheme();
- DiscoveryAgent rc = (DiscoveryAgent)
discoveryAgentFinder.newInstance(type);
- Map options = URISupport.parseParamters(uri);
- IntrospectionSupport.setProperties(rc, options);
- if( rc.getClass() == SimpleDiscoveryAgent.class ) {
- CompositeData data = URISupport.parseComposite(uri);
- ((SimpleDiscoveryAgent)rc).setServices(data.getComponents());
- }
- return rc;
- } catch (Throwable e) {
- throw IOExceptionSupport.create("Could not create discovery agent:
"+uri, e);
- }
- }
+ DiscoveryAgentFactory tf = findDiscoveryAgentFactory(uri);
+ return tf.doCreateDiscoveryAgent(uri);
+
+ }
+
+ abstract protected DiscoveryAgent doCreateDiscoveryAgent(URI uri) throws
IOException;
+// {
+// try {
+// String type = ( uri.getScheme() == null ) ? uri.getPath() :
uri.getScheme();
+// DiscoveryAgent rc = (DiscoveryAgent)
discoveryAgentFinder.newInstance(type);
+// Map options = URISupport.parseParamters(uri);
+// IntrospectionSupport.setProperties(rc, options);
+// if( rc.getClass() == SimpleDiscoveryAgent.class ) {
+// CompositeData data = URISupport.parseComposite(uri);
+// ((SimpleDiscoveryAgent)rc).setServices(data.getComponents());
+// }
+// return rc;
+// } catch (Throwable e) {
+// throw IOExceptionSupport.create("Could not create discovery
agent: "+uri, e);
+// }
+// }
}
Modified:
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryTransportFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryTransportFactory.java?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
---
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryTransportFactory.java
(original)
+++
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/DiscoveryTransportFactory.java
Sat Dec 10 21:37:23 2005
@@ -20,15 +20,13 @@
import java.io.IOException;
import java.net.URI;
-import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
+
import org.activemq.transport.Transport;
-import org.activemq.transport.TransportFactory;
import org.activemq.transport.TransportServer;
import org.activemq.transport.failover.FailoverTransportFactory;
import org.activemq.util.IntrospectionSupport;
-import org.activemq.util.URISupport;
import org.activemq.util.URISupport.CompositeData;
/**
@@ -48,33 +46,34 @@
}
public TransportServer doBind(String brokerId,URI location) throws
IOException{
- try{
- CompositeData compositData=URISupport.parseComposite(location);
- URI[] components=compositData.getComponents();
- if(components.length!=1){
- throw new IOException("Invalid location: "+location
- +", the location must have 1 and only 1
composite URI in it - components = "
- +components.length);
- }
- Map parameters=new HashMap(compositData.getParameters());
- DiscoveryTransportServer server=new
DiscoveryTransportServer(TransportFactory.bind(brokerId,components[0]));
- IntrospectionSupport.setProperties(server,parameters,"discovery");
- DiscoveryAgent
discoveryAgent=DiscoveryAgentFactory.createDiscoveryAgent(server.getDiscovery());
- // Use the host name to configure the group of the discovery agent.
- if(!parameters.containsKey("discovery.group")){
- if(compositData.getHost()!=null){
- parameters.put("discovery.group",compositData.getHost());
- }
- }
- if(!parameters.containsKey("discovery.brokerName")){
- parameters.put("discovery.brokerName",brokerId);
- }
-
IntrospectionSupport.setProperties(discoveryAgent,parameters,"discovery.");
- server.setDiscoveryAgent(discoveryAgent);
- return server;
- }catch(URISyntaxException e){
- throw new IOException("Invalid location: "+location);
- }
+ throw new IOException("Invalid server URI: "+location);
+// try{
+// CompositeData compositData=URISupport.parseComposite(location);
+// URI[] components=compositData.getComponents();
+// if(components.length!=1){
+// throw new IOException("Invalid location: "+location
+// +", the location must have 1 and only 1
composite URI in it - components = "
+// +components.length);
+// }
+// Map parameters=new HashMap(compositData.getParameters());
+// DiscoveryTransportServer server=new
DiscoveryTransportServer(TransportFactory.bind(brokerId,components[0]));
+//
IntrospectionSupport.setProperties(server,parameters,"discovery");
+// DiscoveryAgent
discoveryAgent=DiscoveryAgentFactory.createDiscoveryAgent(server.getDiscovery());
+// // Use the host name to configure the group of the discovery
agent.
+// if(!parameters.containsKey("discovery.group")){
+// if(compositData.getHost()!=null){
+// parameters.put("discovery.group",compositData.getHost());
+// }
+// }
+// if(!parameters.containsKey("discovery.brokerName")){
+// parameters.put("discovery.brokerName",brokerId);
+// }
+//
IntrospectionSupport.setProperties(discoveryAgent,parameters,"discovery.");
+// server.setDiscoveryAgent(discoveryAgent);
+// return server;
+// }catch(URISyntaxException e){
+// throw new IOException("Invalid location: "+location);
+// }
}
}
Added:
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgentFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgentFactory.java?rev=355920&view=auto
==============================================================================
---
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgentFactory.java
(added)
+++
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgentFactory.java
Sat Dec 10 21:37:23 2005
@@ -0,0 +1,46 @@
+/**
+ * <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
+ *
+ * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.activemq.transport.discovery.multicast;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.activemq.transport.discovery.DiscoveryAgent;
+import org.activemq.transport.discovery.DiscoveryAgentFactory;
+import org.activemq.util.IOExceptionSupport;
+import org.activemq.util.IntrospectionSupport;
+import org.activemq.util.URISupport;
+
+public class MulticastDiscoveryAgentFactory extends DiscoveryAgentFactory {
+
+ protected DiscoveryAgent doCreateDiscoveryAgent(URI uri) throws
IOException {
+ try {
+
+ Map options = URISupport.parseParamters(uri);
+ MulticastDiscoveryAgent rc = new MulticastDiscoveryAgent();
+ rc.setGroup(uri.getHost());
+ IntrospectionSupport.setProperties(rc, options);
+ return rc;
+
+ } catch (Throwable e) {
+ throw IOExceptionSupport.create("Could not create discovery agent:
" + uri, e);
+ }
+ }
+}
Propchange:
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/multicast/MulticastDiscoveryAgentFactory.java
------------------------------------------------------------------------------
svn:executable = *
Added:
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgentFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgentFactory.java?rev=355920&view=auto
==============================================================================
---
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgentFactory.java
(added)
+++
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgentFactory.java
Sat Dec 10 21:37:23 2005
@@ -0,0 +1,45 @@
+/**
+ * <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
+ *
+ * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.activemq.transport.discovery.rendezvous;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.activemq.transport.discovery.DiscoveryAgent;
+import org.activemq.transport.discovery.DiscoveryAgentFactory;
+import org.activemq.util.IOExceptionSupport;
+import org.activemq.util.IntrospectionSupport;
+import org.activemq.util.URISupport;
+
+public class RendezvousDiscoveryAgentFactory extends DiscoveryAgentFactory {
+
+ protected DiscoveryAgent doCreateDiscoveryAgent(URI uri) throws
IOException {
+ try {
+ Map options = URISupport.parseParamters(uri);
+ RendezvousDiscoveryAgent rc = new RendezvousDiscoveryAgent();
+ rc.setGroup(uri.getHost());
+ IntrospectionSupport.setProperties(rc, options);
+ return rc;
+
+ } catch (Throwable e) {
+ throw IOExceptionSupport.create("Could not create discovery agent:
" + uri, e);
+ }
+ }
+}
Propchange:
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/rendezvous/RendezvousDiscoveryAgentFactory.java
------------------------------------------------------------------------------
svn:executable = *
Added:
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgentFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgentFactory.java?rev=355920&view=auto
==============================================================================
---
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgentFactory.java
(added)
+++
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgentFactory.java
Sat Dec 10 21:37:23 2005
@@ -0,0 +1,51 @@
+/**
+ * <a href="http://activemq.org">ActiveMQ: The Open Source Message Fabric</a>
+ *
+ * Copyright 2005 (C) LogicBlaze, Inc. http://www.logicblaze.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ **/
+package org.activemq.transport.discovery.simple;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.activemq.transport.discovery.DiscoveryAgent;
+import org.activemq.transport.discovery.DiscoveryAgentFactory;
+import org.activemq.util.IOExceptionSupport;
+import org.activemq.util.IntrospectionSupport;
+import org.activemq.util.URISupport;
+import org.activemq.util.URISupport.CompositeData;
+
+public class SimpleDiscoveryAgentFactory extends DiscoveryAgentFactory {
+
+ protected DiscoveryAgent doCreateDiscoveryAgent(URI uri) throws
IOException {
+ try {
+
+ CompositeData data = URISupport.parseComposite(uri);
+ Map options = URISupport.parseParamters(uri);
+
+ SimpleDiscoveryAgent rc = new SimpleDiscoveryAgent();
+ rc.setGroup(uri.getHost());
+ IntrospectionSupport.setProperties(rc, options);
+ rc.setServices(data.getComponents());
+
+ return rc;
+
+ } catch (Throwable e) {
+ throw IOExceptionSupport.create("Could not create discovery agent:
" + uri, e);
+ }
+ }
+}
Propchange:
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/discovery/simple/SimpleDiscoveryAgentFactory.java
------------------------------------------------------------------------------
svn:executable = *
Modified:
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
---
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java
(original)
+++
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/peer/PeerTransportFactory.java
Sat Dec 10 21:37:23 2005
@@ -23,20 +23,23 @@
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
+
+import org.activemq.broker.BrokerService;
+import org.activemq.broker.TransportConnector;
+import org.activemq.broker.BrokerFactory.BrokerFactoryHandler;
import org.activemq.transport.Transport;
import org.activemq.transport.TransportFactory;
import org.activemq.transport.TransportServer;
import org.activemq.transport.vm.VMTransportFactory;
import org.activemq.util.IOExceptionSupport;
import org.activemq.util.IdGenerator;
+import org.activemq.util.IntrospectionSupport;
import org.activemq.util.URISupport;
+
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
public class PeerTransportFactory extends TransportFactory {
- VMTransportFactory vmTransportFactory = new VMTransportFactory();
-
final public static ConcurrentHashMap brokers = new ConcurrentHashMap();
final public static ConcurrentHashMap connectors = new ConcurrentHashMap();
@@ -45,13 +48,14 @@
private IdGenerator idGenerator = new IdGenerator("peer-");
+
public Transport doConnect(URI location) throws Exception {
- location = convertURI(location);
+ VMTransportFactory vmTransportFactory =
createTransportFactory(location);
return vmTransportFactory.doConnect(location);
}
public Transport doCompositeConnect(URI location) throws Exception {
- location = convertURI(location);
+ VMTransportFactory vmTransportFactory =
createTransportFactory(location);
return vmTransportFactory.doCompositeConnect(location);
}
@@ -60,7 +64,7 @@
* @return the converted URI
* @throws URISyntaxException
*/
- private URI convertURI(URI location) throws IOException {
+ private VMTransportFactory createTransportFactory(URI location) throws
IOException {
try {
String group = location.getHost();
String broker = location.getPath();
@@ -72,23 +76,34 @@
broker = idGenerator.generateSanitizedId();
}
- Map brokerOptions = new
HashMap(URISupport.parseParamters(location));
- if (!brokerOptions.containsKey("brokerName")){
- brokerOptions.put("brokerName", broker);
- }
+ final Map brokerOptions = new
HashMap(URISupport.parseParamters(location));
if (!brokerOptions.containsKey("persistent")){
brokerOptions.put("persistent", "false");
}
-
- Map serverNetworkOptions = new HashMap();
- serverNetworkOptions.put("discovery.group", group);
- String serverDiscoveryOptions =
URISupport.createQueryString(serverNetworkOptions);
-
- Map networkOptions = new HashMap();
- networkOptions.put("group", group);
- String discoveryOptions =
URISupport.createQueryString(networkOptions);
- location = new URI("vm:broker:(discovery:(tcp://localhost:0)?" +
serverDiscoveryOptions+",network:multicast?"+discoveryOptions+")?"+URISupport.createQueryString(brokerOptions));
- return location;
+
+ final URI finalLocation = new URI("vm://"+broker);
+ final String finalBroker = broker;
+ final String finalGroup = group;
+ VMTransportFactory rc = new VMTransportFactory() {
+ public Transport doConnect(URI ignore) throws Exception {
+ return super.doConnect(finalLocation);
+ };
+ public Transport doCompositeConnect(URI ignore) throws
Exception {
+ return super.doCompositeConnect(finalLocation);
+ };
+ };
+ rc.setBrokerFactoryHandler(new BrokerFactoryHandler(){
+ public BrokerService createBroker(URI brokerURI) throws
Exception {
+ BrokerService service = new BrokerService();
+ IntrospectionSupport.setProperties(service, brokerOptions);
+ service.setBrokerName(finalBroker);
+ TransportConnector c =
service.addConnector("tcp://localhost:0");
+ c.setDiscoveryUri(new URI("multicast://"+finalGroup));
+ service.addNetworkConnector("multicast://"+finalGroup);
+ return service;
+ }
+ });
+ return rc;
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
Modified:
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportFactory.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportFactory.java?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
---
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportFactory.java
(original)
+++
incubator/activemq/activemq-core/src/main/java/org/activemq/transport/vm/VMTransportFactory.java
Sat Dec 10 21:37:23 2005
@@ -28,6 +28,7 @@
import org.activemq.broker.BrokerRegistry;
import org.activemq.broker.BrokerService;
import org.activemq.broker.TransportConnector;
+import org.activemq.broker.BrokerFactory.BrokerFactoryHandler;
import org.activemq.transport.MarshallingTransportFilter;
import org.activemq.transport.Transport;
import org.activemq.transport.TransportFactory;
@@ -46,6 +47,8 @@
final public static ConcurrentHashMap connectors = new ConcurrentHashMap();
final public static ConcurrentHashMap servers = new ConcurrentHashMap();
+ BrokerFactoryHandler brokerFactoryHandler;
+
public Transport doConnect(URI location) throws Exception {
return VMTransportServer.configure(doCompositeConnect(location));
}
@@ -87,7 +90,11 @@
BrokerService broker = BrokerRegistry.getInstance().lookup(host);
if (broker == null) {
try {
- broker = BrokerFactory.createBroker(brokerURI);
+ if( brokerFactoryHandler !=null ) {
+ broker = brokerFactoryHandler.createBroker(brokerURI);
+ } else {
+ broker = BrokerFactory.createBroker(brokerURI);
+ }
broker.start();
}
catch (URISyntaxException e) {
@@ -151,6 +158,14 @@
ServiceSupport.dispose(broker);
}
}
+ }
+
+ public BrokerFactoryHandler getBrokerFactoryHandler() {
+ return brokerFactoryHandler;
+ }
+
+ public void setBrokerFactoryHandler(BrokerFactoryHandler
brokerFactoryHandler) {
+ this.brokerFactoryHandler = brokerFactoryHandler;
}
}
Modified:
incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/multicast
URL:
http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/multicast?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
---
incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/multicast
(original)
+++
incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/multicast
Sat Dec 10 21:37:23 2005
@@ -1 +1 @@
-class=org.activemq.transport.discovery.multicast.MulticastDiscoveryAgent
+class=org.activemq.transport.discovery.multicast.MulticastDiscoveryAgentFactory
Modified:
incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/rendezvous
URL:
http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/rendezvous?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
---
incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/rendezvous
(original)
+++
incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/rendezvous
Sat Dec 10 21:37:23 2005
@@ -1 +1 @@
-class=org.activemq.transport.discovery.rendezvous.RendezvousDiscoveryAgent
+class=org.activemq.transport.discovery.rendezvous.RendezvousDiscoveryAgentFactory
Modified:
incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/simple
URL:
http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/simple?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
---
incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/simple
(original)
+++
incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/simple
Sat Dec 10 21:37:23 2005
@@ -1 +1 @@
-class=org.activemq.transport.discovery.simple.SimpleDiscoveryAgent
+class=org.activemq.transport.discovery.simple.SimpleDiscoveryAgentFactory
Modified:
incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/static
URL:
http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/static?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
---
incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/static
(original)
+++
incubator/activemq/activemq-core/src/main/resources/META-INF/services/org/activemq/transport/discoveryagent/static
Sat Dec 10 21:37:23 2005
@@ -1 +1 @@
-class=org.activemq.transport.discovery.simple.SimpleDiscoveryAgent
+class=org.activemq.transport.discovery.simple.SimpleDiscoveryAgentFactory
Modified:
incubator/activemq/activemq-core/src/test/java/org/activemq/network/NetworkTestSupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/test/java/org/activemq/network/NetworkTestSupport.java?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
---
incubator/activemq/activemq-core/src/test/java/org/activemq/network/NetworkTestSupport.java
(original)
+++
incubator/activemq/activemq-core/src/test/java/org/activemq/network/NetworkTestSupport.java
Sat Dec 10 21:37:23 2005
@@ -18,7 +18,9 @@
**/
package org.activemq.network;
+import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
@@ -48,8 +50,7 @@
protected void setUp() throws Exception {
super.setUp();
- String brokerId = broker.getBrokerName();
- connector = new
TransportConnector(broker.getBroker(),TransportFactory.bind(brokerId,new
URI(getLocalURI())));
+ connector = createConnector();
connector.start();
remotePersistenceAdapter = createRemotePersistenceAdapter(true);
@@ -57,9 +58,32 @@
remoteBroker = createRemoteBroker(remotePersistenceAdapter);
remoteBroker.start();
BrokerRegistry.getInstance().bind("remotehost", remoteBroker);
- brokerId = remoteBroker.getBrokerName();
- remoteConnector = new
TransportConnector(remoteBroker.getBroker(),TransportFactory.bind(brokerId,new
URI(getRemoteURI())));
+ remoteConnector = createRemoteConnector();
remoteConnector.start();
+ }
+
+ /**
+ * Creates the connector for the remote broker, bound to getRemoteURI().
+ * The connector is returned unstarted; setUp() starts it.
+ *
+ * @return a new TransportConnector attached to remoteBroker
+ * @throws Exception declared so overriding subclasses may throw freely
+ * @throws IOException presumably when the transport cannot be bound - confirm against TransportFactory.bind
+ * @throws URISyntaxException if getRemoteURI() is not a valid URI
+ */
+ protected TransportConnector createRemoteConnector() throws Exception,
IOException, URISyntaxException {
+ // Bind under the remote broker's own name, as the pre-refactoring
+ // setUp() did; using broker.getBrokerName() here was a copy-paste slip.
+ return new TransportConnector(remoteBroker.getBroker(),
+ TransportFactory.bind(remoteBroker.getBrokerName(),
+ new URI(getRemoteURI())));
+ }
+
+ /**
+ * Creates the unstarted connector for the local broker, bound to getLocalURI().
+ * @return a new TransportConnector attached to broker; setUp() starts it
+ * @throws Exception declared so overriding subclasses may throw freely
+ * @throws IOException presumably when the transport cannot be bound - confirm against TransportFactory.bind
+ * @throws URISyntaxException if getLocalURI() is not a valid URI
+ */
+ protected TransportConnector createConnector() throws Exception,
IOException, URISyntaxException {
+ return new TransportConnector(broker.getBroker(),
+ TransportFactory.bind(broker.getBrokerName(), new
URI(getLocalURI())));
}
protected String getRemoteURI() {
Modified:
incubator/activemq/activemq-core/src/test/java/org/activemq/transport/discovery/DiscoveryTransportBrokerTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/test/java/org/activemq/transport/discovery/DiscoveryTransportBrokerTest.java?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
---
incubator/activemq/activemq-core/src/test/java/org/activemq/transport/discovery/DiscoveryTransportBrokerTest.java
(original)
+++
incubator/activemq/activemq-core/src/test/java/org/activemq/transport/discovery/DiscoveryTransportBrokerTest.java
Sat Dec 10 21:37:23 2005
@@ -18,7 +18,9 @@
**/
package org.activemq.transport.discovery;
+import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
import javax.jms.DeliveryMode;
@@ -110,15 +112,27 @@
}
protected String getLocalURI() {
- return
"discovery://tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
+ return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
}
protected String getRemoteURI() {
- return
"discovery://tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
+ return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
}
+ protected TransportConnector createConnector() throws Exception,
IOException, URISyntaxException {
+ TransportConnector x = super.createConnector();
+ x.setDiscoveryUri(new URI("multicast://default")); // same multicast://default URI that createFailoverConnection() wraps in its discovery: URI
+ return x;
+ }
+
+ protected TransportConnector createRemoteConnector() throws Exception,
IOException, URISyntaxException {
+ TransportConnector x = super.createRemoteConnector();
+ x.setDiscoveryUri(new URI("multicast://default")); // same discovery URI as the local connector, so both brokers share one multicast group name
+ return x;
+ }
+
protected StubConnection createFailoverConnection() throws Exception {
- URI failoverURI = new URI("discovery://multicast");
+ URI failoverURI = new URI("discovery:multicast://default");
Transport transport = TransportFactory.connect(failoverURI);
StubConnection connection = new StubConnection(transport);
connections.add(connection);
Modified: incubator/activemq/activemq-core/src/test/resources/activemq.xml
URL:
http://svn.apache.org/viewcvs/incubator/activemq/activemq-core/src/test/resources/activemq.xml?rev=355920&r1=355919&r2=355920&view=diff
==============================================================================
--- incubator/activemq/activemq-core/src/test/resources/activemq.xml (original)
+++ incubator/activemq/activemq-core/src/test/resources/activemq.xml Sat Dec 10
21:37:23 2005
@@ -1,21 +1,18 @@
<!-- START SNIPPET: xbean -->
<beans xmlns="http://activemq.org/config/1.0">
- <broker useJmx="true">
+ <broker useJmx="false">
<persistenceAdapter>
<journaledJDBC journalLogFiles="5" dataDirectory="foo"/>
</persistenceAdapter>
<transportConnectors>
- <transportConnector uri="discovery:tcp://localhost:0"/>
+ <transportConnector uri="tcp://localhost:0"
discoveryUri="rendezvous://default"/>
</transportConnectors>
<networkConnectors>
- <networkConnector uri="rendezvous"/>
- <!--
- <networkConnector uri="static://(tcp://host1:61616,tcp://host2:61616)"/>
- -->
+ <networkConnector uri="rendezvous://default"/>
</networkConnectors>
</broker>