Author: jstrachan
Date: Wed Feb 28 06:21:29 2007
New Revision: 512764
URL: http://svn.apache.org/viewvc?view=rev&rev=512764
Log:
added a little helper method to make it easy to wait on a broker being shut
down in Java code
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=512764&r1=512763&r2=512764
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Feb 28 06:21:29 2007
@@ -30,6 +30,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
@@ -145,6 +146,7 @@
private URI vmConnectorURI;
private PolicyMap destinationPolicy;
private AtomicBoolean started = new AtomicBoolean(false);
+ private AtomicBoolean stopped = new AtomicBoolean(false);
private BrokerPlugin[] plugins;
private boolean keepDurableSubsActive=true;
private boolean useVirtualTopics=true;
@@ -154,9 +156,8 @@
private Store tempDataStore;
private int persistenceThreadPriority = Thread.MAX_PRIORITY;
private boolean useLocalHostBrokerName = false;
-
+ private CountDownLatch stoppedLatch = new CountDownLatch(1);
-
/**
* Adds a new transport connector for the given bind address
*
@@ -471,33 +472,27 @@
// to avoid timimg issue with discovery (spinning up a new instance)
BrokerRegistry.getInstance().unbind(getBrokerName());
VMTransportFactory.stopped(getBrokerName());
+ stopped.set(true);
+ stoppedLatch.countDown();
+
log.info("ActiveMQ JMS Message Broker ("+getBrokerName()+", "+brokerId+") stopped");
stopper.throwFirstException();
}
- protected void stopAllConnectors(ServiceStopper stopper) {
-
- for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) {
- NetworkConnector connector = (NetworkConnector) iter.next();
- unregisterNetworkConnectorMBean(connector);
- stopper.stop(connector);
+ /**
+ * A helper method to block the caller thread until the broker has been stopped
+ */
+ public void waitUntilStopped() {
+ while (!stopped.get()) {
+ try {
+ stoppedLatch.await();
+ }
+ catch (InterruptedException e) {
+ // ignore
+ }
}
+ }
- for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) {
- ProxyConnector connector = (ProxyConnector) iter.next();
- stopper.stop(connector);
- }
-
- for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) {
- JmsConnector connector = (JmsConnector) iter.next();
- stopper.stop(connector);
- }
-
- for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) {
- TransportConnector connector = (TransportConnector) iter.next();
- stopper.stop(connector);
- }
- }
// Properties
// -------------------------------------------------------------------------
@@ -1122,6 +1117,30 @@
}
}
}
+
+ protected void stopAllConnectors(ServiceStopper stopper) {
+
+ for (Iterator iter = getNetworkConnectors().iterator(); iter.hasNext();) {
+ NetworkConnector connector = (NetworkConnector) iter.next();
+ unregisterNetworkConnectorMBean(connector);
+ stopper.stop(connector);
+ }
+
+ for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) {
+ ProxyConnector connector = (ProxyConnector) iter.next();
+ stopper.stop(connector);
+ }
+
+ for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) {
+ JmsConnector connector = (JmsConnector) iter.next();
+ stopper.stop(connector);
+ }
+
+ for (Iterator iter = getTransportConnectors().iterator(); iter.hasNext();) {
+ TransportConnector connector = (TransportConnector) iter.next();
+ stopper.stop(connector);
+ }
+ }
protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java?view=diff&rev=512764&r1=512763&r2=512764
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java Wed Feb 28 06:21:29 2007
@@ -18,12 +18,7 @@
package org.apache.activemq.broker;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
-import org.apache.activemq.broker.util.UDPTraceBrokerPlugin;
-import org.apache.activemq.broker.view.ConnectionDotFilePlugin;
-import org.apache.activemq.broker.view.DestinationDotFilePlugin;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.demo.DefaultQueueSender;
@@ -34,10 +29,11 @@
/**
* A helper class which can be handy for running a broker in your IDE from the
* activemq-core module.
- *
+ *
* @version $Revision$
*/
public class Main {
+ protected static boolean createConsumers = false;
/**
* @param args
@@ -66,23 +62,29 @@
broker.addConnector("stomp://localhost:61613");
broker.start();
- // lets create a dummy couple of consumers
- Connection connection = new ActiveMQConnectionFactory().createConnection();
- connection.start();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer1 = session.createConsumer(new ActiveMQQueue("Orders.IBM"));
- MessageConsumer consumer2 = session.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 100");
- Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer3 = session2.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 200");
// lets publish some messages so that there is some stuff to browse
- DefaultQueueSender.main(new String[] { "Prices.Equity.IBM" });
- DefaultQueueSender.main(new String[] { "Prices.Equity.MSFT" });
+ DefaultQueueSender.main(new String[]{"Prices.Equity.IBM"});
+ DefaultQueueSender.main(new String[]{"Prices.Equity.MSFT"});
+
+ // lets create a dummy couple of consumers
+ if (createConsumers) {
+ Connection connection = new ActiveMQConnectionFactory().createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer1 = session.createConsumer(new ActiveMQQueue("Orders.IBM"));
+ MessageConsumer consumer2 = session.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 100");
+ Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer3 = session2.createConsumer(new ActiveMQQueue("Orders.MSFT"), "price > 200");
+ }
+ else {
+ // Lets wait for the broker
+ broker.waitUntilStopped();
+ }
}
catch (Exception e) {
System.out.println("Failed: " + e);
e.printStackTrace();
}
}
-
}