Author: gtully
Date: Fri Sep 4 12:47:00 2009
New Revision: 811371
URL: http://svn.apache.org/viewvc?rev=811371&view=rev
Log:
resolve some timing issues that caused intermittent failure of the test for:
https://issues.apache.org/activemq/browse/AMQ-1855 - also wait a little longer
for a close to complete before restarting
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=811371&r1=811370&r2=811371&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Fri Sep 4 12:47:00 2009
@@ -377,8 +377,8 @@
}
});
- if( !sendShutdown.await(5, TimeUnit.SECONDS) ) {
- LOG.debug("Network Could not shutdown in a timely
manner");
+ if( !sendShutdown.await(10, TimeUnit.SECONDS) ) {
+ LOG.info("Network Could not shutdown in a timely
manner");
}
} finally {
ServiceStopper ss = new ServiceStopper();
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java?rev=811371&r1=811370&r2=811371&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.java
Fri Sep 4 12:47:00 2009
@@ -16,10 +16,13 @@
*/
package org.apache.activemq.transport.discovery;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
import java.net.URI;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import javax.management.ObjectInstance;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
@@ -31,8 +34,10 @@
import org.apache.commons.logging.LogFactory;
import org.jmock.Expectations;
import org.jmock.Mockery;
+import org.jmock.api.Invocation;
import org.jmock.integration.junit4.JMock;
import org.jmock.integration.junit4.JUnit4Mockery;
+import org.jmock.lib.action.CustomAction;
import org.jmock.lib.legacy.ClassImposteriser;
import org.junit.After;
import org.junit.Before;
@@ -51,7 +56,10 @@
final String groupName = "GroupID-" + "DiscoveryNetworkReconnectTest";
final String discoveryAddress = "multicast://default?group=" + groupName +
"&initialReconnectDelay=1000";
+ final Semaphore mbeanRegistered = new Semaphore(0);
+ final Semaphore mbeanUnregistered = new Semaphore(0);
+
private DiscoveryAgent agent;
SocketProxy proxy;
@@ -68,15 +76,12 @@
brokerA.start();
proxy = new
SocketProxy(brokerA.getTransportConnectors().get(0).getConnectUri());
- //new SocketProxy(new URI("tcp://localhost:61617"));
-
managementContext = context.mock(ManagementContext.class);
context.checking(new Expectations(){{
allowing (managementContext).getJmxDomainName(); will
(returnValue("Test"));
allowing (managementContext).start();
allowing (managementContext).stop();
- allowing
(managementContext).unregisterMBean(with(any(ObjectName.class)));
// expected MBeans
allowing
(managementContext).registerMBean(with(any(Object.class)), with(equal(
@@ -86,10 +91,31 @@
allowing
(managementContext).registerMBean(with(any(Object.class)), with(equal(
new
ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection"))));
- // due to reconnect we get two registrations
atLeast(maxReconnects - 1).of
(managementContext).registerMBean(with(any(Object.class)), with(equal(
+ new
ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=localhost,Name=localhost/127.0.0.1_"
+ + proxy.getUrl().getPort())))); will(new
CustomAction("signal register network mbean") {
+ public Object invoke(Invocation invocation)
throws Throwable {
+ LOG.info("Mbean Registered: " +
invocation.getParameter(0));
+ mbeanRegistered.release();
+ return new
ObjectInstance((ObjectName)invocation.getParameter(0), "dscription");
+ }
+ });
+ atLeast(maxReconnects - 1).of
(managementContext).unregisterMBean(with(equal(
new
ObjectName("Test:BrokerName=BrokerNC,Type=NetworkBridge,NetworkConnectorName=localhost,Name=localhost/127.0.0.1_"
- + proxy.getUrl().getPort()))));
+ + proxy.getUrl().getPort())))); will(new
CustomAction("signal unregister network mbean") {
+ public Object invoke(Invocation invocation)
throws Throwable {
+ LOG.info("Mbean Unregistered: " +
invocation.getParameter(0));
+ mbeanUnregistered.release();
+ return null;
+ }
+ });
+
+ allowing (managementContext).unregisterMBean(with(equal(
+ new ObjectName("Test:BrokerName=BrokerNC,Type=Broker"))));
+ allowing (managementContext).unregisterMBean(with(equal(
+ new
ObjectName("Test:BrokerName=BrokerNC,Type=NetworkConnector,NetworkConnectorName=localhost"))));
+ allowing (managementContext).unregisterMBean(with(equal(
+ new
ObjectName("Test:BrokerName=BrokerNC,Type=Topic,Destination=ActiveMQ.Advisory.Connection"))));
}});
brokerB = new BrokerService();
@@ -102,6 +128,7 @@
public void tearDown() throws Exception {
brokerA.stop();
brokerB.stop();
+ proxy.close();
}
private void configure(BrokerService broker) {
@@ -138,17 +165,19 @@
// Wait for connection
assertTrue("we got a network connection in a timely manner",
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
- return proxy.connections.size() == 1;
+ return proxy.connections.size() >= 1;
}
}));
- Thread.sleep(3000);
- // force an inactivity timeout timeout
+ // wait for network connector
+ assertTrue("network connector mbean registered within 1 minute",
mbeanRegistered.tryAcquire(60, TimeUnit.SECONDS));
+
+ // force an inactivity timeout via the proxy
proxy.pause();
- // wait for the inactivity timeout
- Thread.sleep(6000);
-
+ // wait for the inactivity timeout and network shutdown
+ assertTrue("network connector mbean unregistered within 1 minute",
mbeanUnregistered.tryAcquire(60, TimeUnit.SECONDS));
+
// let a reconnect succeed
proxy.goOn();
}