Author: gtully
Date: Tue Aug 18 11:13:08 2009
New Revision: 805361
URL: http://svn.apache.org/viewvc?rev=805361&view=rev
Log:
apply parameters from discoveryURI to subsequent network connections so that
options like inactivityTimeout can be configured on all discovered uris
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
(with props)
Modified:
activemq/trunk/activemq-core/pom.xml
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java
Modified: activemq/trunk/activemq-core/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?rev=805361&r1=805360&r2=805361&view=diff
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Tue Aug 18 11:13:08 2009
@@ -246,8 +246,12 @@
</dependency>
<dependency>
<groupId>org.jmock</groupId>
- <artifactId>jmock</artifactId>
- <version>${jmock-version}</version>
+ <artifactId>jmock-junit4</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jmock</groupId>
+ <artifactId>jmock-legacy</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=805361&r1=805360&r2=805361&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
Tue Aug 18 11:13:08 2009
@@ -20,6 +20,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.SslContext;
@@ -29,8 +30,10 @@
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
import org.apache.activemq.transport.discovery.DiscoveryListener;
+import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,7 +49,8 @@
private DiscoveryAgent discoveryAgent;
private ConcurrentHashMap<URI, NetworkBridge> bridges = new
ConcurrentHashMap<URI, NetworkBridge>();
-
+ private Map<String, String> parameters;
+
public DiscoveryNetworkConnector() {
}
@@ -56,6 +60,14 @@
public void setUri(URI discoveryURI) throws IOException {
setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI));
+ try {
+ parameters = URISupport.parseParamters(discoveryURI);
+ // allow discovery agent to grab it's parameters
+ IntrospectionSupport.setProperties(getDiscoveryAgent(),
parameters);
+ } catch (URISyntaxException e) {
+ LOG.warn("failed to parse query parameters from discoveryURI: " +
discoveryURI, e);
+ }
+
}
public void onServiceAdd(DiscoveryEvent event) {
@@ -83,6 +95,11 @@
return;
}
URI connectUri = uri;
+ try {
+ connectUri = URISupport.applyParameters(connectUri,
parameters);
+ } catch (URISyntaxException e) {
+ LOG.warn("could not apply query parameters: " + parameters + " to: " + connectUri, e);
+ }
LOG.info("Establishing network connection from " + localURIName +
" to " + connectUri);
Transport remoteTransport;
@@ -93,7 +110,7 @@
try {
remoteTransport = TransportFactory.connect(connectUri);
} catch (Exception e) {
- LOG.warn("Could not connect to remote URI: " +
localURIName + ": " + e.getMessage());
+ LOG.warn("Could not connect to remote URI: " + connectUri
+ ": " + e.getMessage());
LOG.debug("Connection failure exception: " + e, e);
return;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java?rev=805361&r1=805360&r2=805361&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
Tue Aug 18 11:13:08 2009
@@ -25,6 +25,7 @@
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -74,27 +75,13 @@
URI uri = new URI(url);
serviceURIs.put(event.getServiceName(), uri);
LOG.info("Adding new broker connection URL: " + uri);
- next.add(new URI[] {applyParameters(uri)});
+ next.add(new URI[] {URISupport.applyParameters(uri,
parameters)});
} catch (URISyntaxException e) {
LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
}
}
}
- private URI applyParameters(URI uri) throws URISyntaxException {
- if (parameters != null && !parameters.isEmpty()) {
- StringBuffer newQuery = uri.getRawQuery() != null ? new
StringBuffer(uri.getRawQuery()) : new StringBuffer() ;
- for ( Map.Entry<String, String> param: parameters.entrySet()) {
- if (newQuery.length()!=0) {
- newQuery.append(';');
- }
- newQuery.append(param.getKey()).append('=').append(param.getValue());
- }
- uri = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(),
newQuery.toString(), uri.getFragment());
- }
- return uri;
-}
-
public void onServiceRemove(DiscoveryEvent event) {
URI uri = serviceURIs.get(event.getServiceName());
if (uri != null) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java?rev=805361&r1=805360&r2=805361&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
Tue Aug 18 11:13:08 2009
@@ -27,7 +27,6 @@
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java?rev=805361&r1=805360&r2=805361&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/URISupport.java
Tue Aug 18 11:13:08 2009
@@ -127,6 +127,20 @@
return uri.getQuery() == null ? emptyMap() :
parseQuery(stripPrefix(uri.getQuery(), "?"));
}
+ public static URI applyParameters(URI uri, Map<String, String>
queryParameters) throws URISyntaxException {
+ if (queryParameters != null && !queryParameters.isEmpty()) {
+ StringBuffer newQuery = uri.getRawQuery() != null ? new
StringBuffer(uri.getRawQuery()) : new StringBuffer() ;
+ for ( Map.Entry<String, String> param: queryParameters.entrySet())
{
+ if (newQuery.length()!=0) {
+ newQuery.append('&');
+ }
+ newQuery.append(param.getKey()).append('=').append(param.getValue());
+ }
+ uri = createURIWithQuery(uri, newQuery.toString());
+ }
+ return uri;
+ }
+
@SuppressWarnings("unchecked")
private static Map<String, String> emptyMap() {
return Collections.EMPTY_MAP;
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java?rev=805361&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
Tue Aug 18 11:13:08 2009
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.discovery;
+
+import static org.junit.Assert.*;
+
+import java.net.URI;
+
+import javax.management.ObjectName;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgentFactory;
+import org.apache.activemq.util.SocketProxy;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.integration.junit4.JMock;
+import org.jmock.integration.junit4.JUnit4Mockery;
+import org.jmock.lib.legacy.ClassImposteriser;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+
+@RunWith(JMock.class)
+public class DiscoveryNetworkReconnectTest {
+
+ private static final Log LOG =
LogFactory.getLog(DiscoveryNetworkReconnectTest.class);
+
+ BrokerService brokerA, brokerB;
+ Mockery context;
+ ManagementContext managementContext;
+
+ final String groupName = "GroupID-" + "DiscoveryNetworkReconnectTest";
+ final String discoveryAddress = "multicast://default?group=" + groupName +
"&initialReconnectDelay=600";
+
+ private DiscoveryAgent agent;
+
+ @Before
+ public void setUp() throws Exception {
+ context = new JUnit4Mockery() {{
+ setImposteriser(ClassImposteriser.INSTANCE);
+ }};
+
+ brokerA = new BrokerService();
+ brokerA.setBrokerName("BrokerA");
+ configure(brokerA);
+ brokerA.addConnector("tcp://localhost:0");
+ brokerA.start();
+ }
+
+ private void configure(BrokerService broker) {
+ broker.setPersistent(false);
+ broker.setUseJmx(true);
+ }
+
+ @Test
+ public void testReconnect() throws Exception {
+ final SocketProxy proxy = new
SocketProxy(brokerA.getTransportConnectors().get(0).getConnectUri());
+
+ // control multicast publish advertise agent to inject proxy
+ agent = MulticastDiscoveryAgentFactory.createDiscoveryAgent(new
URI(discoveryAddress));
+ agent.registerService(proxy.getUrl().toString());
+ agent.start();
+
+ managementContext = context.mock(ManagementContext.class);
+
+ context.checking(new Expectations(){{
+ allowing (managementContext).getJmxDomainName(); will
(returnValue("Test"));
+ allowing (managementContext).start();
+ allowing (managementContext).stop();
+ allowing
(managementContext).unregisterMBean(with(any(ObjectName.class)));
+
+ // expected MBeans
+ allowing
(managementContext).registerMBean(with(any(Object.class)), with(equal(
+ new ObjectName("Test:BrokerName=BrokerNC,Type=Broker"))));
+ allowing
(managementContext).registerMBean(with(any(Object.class)), with(equal(
+ new
ObjectName("Test:BrokerName=BrokerNC,Type=NetworkConnector,NetworkConnectorName=localhost"))));
+ allowing
(managementContext).registerMBean(with(any(Object.class)), with(equal(
+ new
ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection"))));
+
+ // due to reconnect we get two registrations
+ atLeast(2).of
(managementContext).registerMBean(with(any(Object.class)), with(equal(
+ new
ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=localhost,Name=localhost/127.0.0.1_"
+ + proxy.getUrl().getPort()))));
+ }});
+
+ brokerB = new BrokerService();
+ brokerB.setManagementContext(managementContext);
+ brokerB.setBrokerName("BrokerNC");
+ configure(brokerB);
+ brokerB.addNetworkConnector(discoveryAddress +
"&wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000&trace=true");
+ brokerB.start();
+
+ Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ return proxy.connections.size() == 1;
+ }
+ });
+
+ // force an inactivity timeout timeout
+ proxy.pause();
+
+ // wait for the inactivity timeout
+ Thread.sleep(2000);
+
+ // let a reconnect succeed
+ proxy.goOn();
+
+ assertTrue("got a reconnect", Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ return proxy.connections.size() == 1;
+ }
+ }));
+
+ brokerB.stop();
+ // let mockery validate minimal duplicate mbean registrations
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
------------------------------------------------------------------------------
svn:executable = *
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date