Repository: activemq Updated Branches: refs/heads/trunk 77713d9d1 -> 5016c4d4f
https://issues.apache.org/jira/browse/AMQ-5086 Ensure that wait for started on vm transport factory actually waits for start, currently it doesn't really check started or wait properly. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5016c4d4 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5016c4d4 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5016c4d4 Branch: refs/heads/trunk Commit: 5016c4d4f2fb2a62d8f38732be0f14d96426d921 Parents: 77713d9 Author: Timothy Bish <[email protected]> Authored: Mon Jun 9 15:40:03 2014 -0400 Committer: Timothy Bish <[email protected]> Committed: Mon Jun 9 15:40:03 2014 -0400 ---------------------------------------------------------------------- .../apache/activemq/broker/BrokerService.java | 15 +++++- .../transport/vm/VMTransportFactory.java | 38 +++++++++---- .../transport/vm/VMTransportWaitForTest.java | 57 +++++++++++++++++++- 3 files changed, 99 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/5016c4d4/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 6ecd427..235ad5d 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -921,8 +921,21 @@ public class BrokerService implements Service { * @return boolean true if wait succeeded false if broker was not started or was stopped */ public boolean waitUntilStarted() { + return waitUntilStarted(Long.MAX_VALUE); + } + + /** + * A helper method to block the caller thread until the broker has fully started + 
* + * @param timeout + * the amount of time to wait before giving up and returning false. + * + * @return boolean true if wait succeeded false if broker was not started or was stopped + */ + public boolean waitUntilStarted(long timeout) { boolean waitSucceeded = isStarted(); - while (!isStarted() && !stopped.get() && !waitSucceeded) { + long expiration = Math.max(0, timeout + System.currentTimeMillis()); + while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) { try { if (startException != null) { return waitSucceeded; http://git-wip-us.apache.org/repos/asf/activemq/blob/5016c4d4/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java index e88faaf..7bd24cf 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportFactory.java @@ -42,18 +42,20 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; public class VMTransportFactory extends TransportFactory { - + public static final ConcurrentHashMap<String, BrokerService> BROKERS = new ConcurrentHashMap<String, BrokerService>(); public static final ConcurrentHashMap<String, TransportConnector> CONNECTORS = new ConcurrentHashMap<String, TransportConnector>(); public static final ConcurrentHashMap<String, VMTransportServer> SERVERS = new ConcurrentHashMap<String, VMTransportServer>(); private static final Logger LOG = LoggerFactory.getLogger(VMTransportFactory.class); - + BrokerFactoryHandler brokerFactoryHandler; + @Override public Transport doConnect(URI location) throws Exception { return VMTransportServer.configure(doCompositeConnect(location)); } + @Override 
public Transport doCompositeConnect(URI location) throws Exception { URI brokerURI; String host; @@ -64,7 +66,7 @@ public class VMTransportFactory extends TransportFactory { if (data.getComponents().length == 1 && "broker".equals(data.getComponents()[0].getScheme())) { brokerURI = data.getComponents()[0]; CompositeData brokerData = URISupport.parseComposite(brokerURI); - host = (String)brokerData.getParameters().get("brokerName"); + host = brokerData.getParameters().get("brokerName"); if (host == null) { host = "localhost"; } @@ -79,7 +81,7 @@ public class VMTransportFactory extends TransportFactory { try { host = extractHost(location); options = URISupport.parseParameters(location); - String config = (String)options.remove("brokerConfig"); + String config = options.remove("brokerConfig"); if (config != null) { brokerURI = new URI(config); } else { @@ -170,32 +172,50 @@ public class VMTransportFactory extends TransportFactory { return host; } -/** + /** + * Attempt to find a Broker instance. + * * @param registry + * the registry in which to search for the BrokerService instance. * @param brokerName - * @param waitForStart - time in milliseconds to wait for a broker to appear - * @return + * the name of the Broker that should be located. + * @param waitForStart + * time in milliseconds to wait for a broker to appear and be started. + * + * @return a BrokerService instance if one is found, or null. 
*/ private BrokerService lookupBroker(final BrokerRegistry registry, final String brokerName, int waitForStart) { BrokerService broker = null; synchronized(registry.getRegistryMutext()) { broker = registry.lookup(brokerName); - if (broker == null && waitForStart > 0) { + if (broker == null || waitForStart > 0) { final long expiry = System.currentTimeMillis() + waitForStart; while ((broker == null || !broker.isStarted()) && expiry > System.currentTimeMillis()) { long timeout = Math.max(0, expiry - System.currentTimeMillis()); try { - LOG.debug("waiting for broker named: " + brokerName + " to start"); + LOG.debug("waiting for broker named: " + brokerName + " to enter registry"); registry.getRegistryMutext().wait(timeout); } catch (InterruptedException ignored) { } broker = registry.lookup(brokerName); + if (broker != null && !broker.isStarted()) { + LOG.debug("waiting for broker named: " + brokerName + " to start"); + timeout = Math.max(0, expiry - System.currentTimeMillis()); + // Wait for however long we have left for broker to be started, if + // it doesn't get started we need to clear broker so it doesn't get + // returned. A null return should throw an exception. 
+ if (!broker.waitUntilStarted(timeout)) { + broker = null; + break; + } + } } } } return broker; } + @Override public TransportServer doBind(URI location) throws IOException { return bind(location, false); } http://git-wip-us.apache.org/repos/asf/activemq/blob/5016c4d4/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java index faa93e4..e498936 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportWaitForTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.IOException; import java.net.URI; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -27,20 +28,34 @@ import java.util.concurrent.TimeUnit; import javax.jms.JMSException; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerRegistry; import org.apache.activemq.broker.BrokerService; +import org.junit.After; import org.junit.Test; +import org.mortbay.log.Log; public class VMTransportWaitForTest { + private static final int WAIT_TIME = 20000; + private static final int SHORT_WAIT_TIME = 5000; + private static final String VM_BROKER_URI_NO_WAIT = "vm://localhost?broker.persistent=false&create=false"; private static final String VM_BROKER_URI_WAIT_FOR_START = - VM_BROKER_URI_NO_WAIT + "&waitForStart=20000"; + VM_BROKER_URI_NO_WAIT + "&waitForStart=" + WAIT_TIME; + + private static final String VM_BROKER_URI_SHORT_WAIT_FOR_START = + VM_BROKER_URI_NO_WAIT + "&waitForStart=" + 
SHORT_WAIT_TIME; CountDownLatch started = new CountDownLatch(1); CountDownLatch gotConnection = new CountDownLatch(1); + @After + public void after() throws IOException { + BrokerRegistry.getInstance().unbind("localhost"); + } + @Test(timeout=90000) public void testWaitFor() throws Exception { try { @@ -77,4 +92,44 @@ public class VMTransportWaitForTest { assertTrue("has got connection", gotConnection.await(400, TimeUnit.MILLISECONDS)); broker.stop(); } + + @Test(timeout=90000) + public void testWaitForNoBrokerInRegistry() throws Exception { + + long startTime = System.currentTimeMillis(); + + try { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_SHORT_WAIT_FOR_START)); + cf.createConnection(); + fail("expect broker not exist exception"); + } catch (JMSException expectedOnNoBrokerAndNoCreate) { + } + + long endTime = System.currentTimeMillis(); + + Log.info("Total wait time was: {}", endTime - startTime); + assertTrue(endTime - startTime >= SHORT_WAIT_TIME - 100); + } + + @Test(timeout=90000) + public void testWaitForNotStartedButInRegistry() throws Exception { + + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + BrokerRegistry.getInstance().bind("localhost", broker); + + long startTime = System.currentTimeMillis(); + + try { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(VM_BROKER_URI_SHORT_WAIT_FOR_START)); + cf.createConnection(); + fail("expect broker not exist exception"); + } catch (JMSException expectedOnNoBrokerAndNoCreate) { + } + + long endTime = System.currentTimeMillis(); + + Log.info("Total wait time was: {}", endTime - startTime); + assertTrue(endTime - startTime >= SHORT_WAIT_TIME - 100); + } }
