Author: jstrachan
Date: Wed Feb 28 06:21:29 2007
New Revision: 512764

URL: http://svn.apache.org/viewvc?view=rev&rev=512764
Log:
added a little helper method to make it easy to wait on a broker being shut 
down in Java code

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?view=diff&rev=512764&r1=512763&r2=512764
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
 Wed Feb 28 06:21:29 2007
@@ -30,6 +30,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
@@ -145,6 +146,7 @@
     private URI vmConnectorURI;
     private PolicyMap destinationPolicy;
     private AtomicBoolean started = new AtomicBoolean(false);
+    private AtomicBoolean stopped = new AtomicBoolean(false);
     private BrokerPlugin[] plugins;
     private boolean keepDurableSubsActive=true;
     private boolean useVirtualTopics=true;
@@ -154,9 +156,8 @@
     private Store tempDataStore;
     private int persistenceThreadPriority = Thread.MAX_PRIORITY;
     private boolean useLocalHostBrokerName = false;
-    
+    private CountDownLatch stoppedLatch = new CountDownLatch(1);
 
-   
     /**
      * Adds a new transport connector for the given bind address
      *
@@ -471,33 +472,27 @@
         // to avoid timimg issue with discovery (spinning up a new instance)
         BrokerRegistry.getInstance().unbind(getBrokerName());
         VMTransportFactory.stopped(getBrokerName());
+        stopped.set(true);
+        stoppedLatch.countDown();
+
         log.info("ActiveMQ JMS Message Broker ("+getBrokerName()+", 
"+brokerId+") stopped");
         stopper.throwFirstException();
     }
 
-       protected void stopAllConnectors(ServiceStopper stopper) {
-               
-               for (Iterator iter = getNetworkConnectors().iterator(); 
iter.hasNext();) {
-            NetworkConnector connector = (NetworkConnector) iter.next();
-            unregisterNetworkConnectorMBean(connector);
-            stopper.stop(connector);
+    /**
+     * A helper method to block the caller thread until the broker has been 
stopped
+     */
+    public void waitUntilStopped() {
+        while (!stopped.get()) {
+            try {
+                stoppedLatch.await();
+            }
+            catch (InterruptedException e) {
+                // ignore
+            }
         }
+    }
 
-        for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) 
{
-            ProxyConnector connector = (ProxyConnector) iter.next();
-            stopper.stop(connector);
-        }
-        
-        for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) {
-            JmsConnector connector = (JmsConnector) iter.next();
-            stopper.stop(connector);
-        }
-        
-        for (Iterator iter = getTransportConnectors().iterator(); 
iter.hasNext();) {
-            TransportConnector connector = (TransportConnector) iter.next();
-            stopper.stop(connector);
-        }
-       }
 
     // Properties
     // 
-------------------------------------------------------------------------
@@ -1122,6 +1117,30 @@
             }
         }
     }
+
+    protected void stopAllConnectors(ServiceStopper stopper) {
+
+               for (Iterator iter = getNetworkConnectors().iterator(); 
iter.hasNext();) {
+            NetworkConnector connector = (NetworkConnector) iter.next();
+            unregisterNetworkConnectorMBean(connector);
+            stopper.stop(connector);
+        }
+
+        for (Iterator iter = getProxyConnectors().iterator(); iter.hasNext();) 
{
+            ProxyConnector connector = (ProxyConnector) iter.next();
+            stopper.stop(connector);
+        }
+
+        for (Iterator iter = jmsConnectors.iterator(); iter.hasNext();) {
+            JmsConnector connector = (JmsConnector) iter.next();
+            stopper.stop(connector);
+        }
+
+        for (Iterator iter = getTransportConnectors().iterator(); 
iter.hasNext();) {
+            TransportConnector connector = (TransportConnector) iter.next();
+            stopper.stop(connector);
+        }
+       }
 
     protected TransportConnector registerConnectorMBean(TransportConnector 
connector) throws IOException  {
         MBeanServer mbeanServer = getManagementContext().getMBeanServer();

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java?view=diff&rev=512764&r1=512763&r2=512764
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java 
(original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/Main.java 
Wed Feb 28 06:21:29 2007
@@ -18,12 +18,7 @@
 package org.apache.activemq.broker;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.jmx.ManagementContext;
-import org.apache.activemq.broker.util.UDPTraceBrokerPlugin;
-import org.apache.activemq.broker.view.ConnectionDotFilePlugin;
-import org.apache.activemq.broker.view.DestinationDotFilePlugin;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.demo.DefaultQueueSender;
 
@@ -34,10 +29,11 @@
 /**
  * A helper class which can be handy for running a broker in your IDE from the
  * activemq-core module.
- * 
+ *
  * @version $Revision$
  */
 public class Main {
+    protected static boolean createConsumers = false;
 
     /**
      * @param args
@@ -66,23 +62,29 @@
             broker.addConnector("stomp://localhost:61613");
             broker.start();
 
-            // lets create a dummy couple of consumers
-            Connection connection = new 
ActiveMQConnectionFactory().createConnection();
-            connection.start();
-            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer consumer1 = session.createConsumer(new 
ActiveMQQueue("Orders.IBM"));
-            MessageConsumer consumer2 = session.createConsumer(new 
ActiveMQQueue("Orders.MSFT"), "price > 100");
-            Session session2 = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer consumer3 = session2.createConsumer(new 
ActiveMQQueue("Orders.MSFT"), "price > 200");
 
             // lets publish some messages so that there is some stuff to browse
-            DefaultQueueSender.main(new String[] { "Prices.Equity.IBM" });
-            DefaultQueueSender.main(new String[] { "Prices.Equity.MSFT" });
+            DefaultQueueSender.main(new String[]{"Prices.Equity.IBM"});
+            DefaultQueueSender.main(new String[]{"Prices.Equity.MSFT"});
+
+            // lets create a dummy couple of consumers
+            if (createConsumers) {
+                Connection connection = new 
ActiveMQConnectionFactory().createConnection();
+                connection.start();
+                Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer consumer1 = session.createConsumer(new 
ActiveMQQueue("Orders.IBM"));
+                MessageConsumer consumer2 = session.createConsumer(new 
ActiveMQQueue("Orders.MSFT"), "price > 100");
+                Session session2 = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer consumer3 = session2.createConsumer(new 
ActiveMQQueue("Orders.MSFT"), "price > 200");
+            }
+            else {
+                // Lets wait for the broker
+                broker.waitUntilStopped();
+            }
         }
         catch (Exception e) {
             System.out.println("Failed: " + e);
             e.printStackTrace();
         }
     }
-
 }


Reply via email to