Repository: activemq
Updated Branches:
  refs/heads/master 93092f7ea -> da076f4a6


https://issues.apache.org/jira/browse/AMQ-6086 - add some determinism to 
interleaved stop and start calls on broker service


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/da076f4a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/da076f4a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/da076f4a

Branch: refs/heads/master
Commit: da076f4a632af6ad1d66382523f4c50e9de9e62e
Parents: 93092f7
Author: gtully <[email protected]>
Authored: Fri Dec 11 16:22:31 2015 +0000
Committer: gtully <[email protected]>
Committed: Fri Dec 11 16:22:31 2015 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/BrokerService.java   |  34 ++-
 .../java/org/apache/activemq/util/LockFile.java |   2 +-
 .../network/DuplexNetworkMBeanTest.java         |   7 +-
 .../StartAndConcurrentStopBrokerTest.java       | 302 +++++++++++++++++++
 4 files changed, 333 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/da076f4a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index a899490..62af182 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -587,6 +587,7 @@ public class BrokerService implements Service {
             return;
         }
 
+        setStartException(null);
         stopping.set(false);
         startDate = new Date();
         MDC.put("activemq.broker", brokerName);
@@ -642,7 +643,7 @@ public class BrokerService implements Service {
                     try {
                         doStartPersistenceAdapter();
                     } catch (Throwable e) {
-                        startException = e;
+                        setStartException(e);
                     } finally {
                         synchronized (persistenceAdapterStarted) {
                             persistenceAdapterStarted.set(true);
@@ -704,7 +705,7 @@ public class BrokerService implements Service {
                         }
                         doStartBroker();
                     } catch (Throwable t) {
-                        startException = t;
+                        setStartException(t);
                     }
                 }
             }.start();
@@ -714,9 +715,7 @@ public class BrokerService implements Service {
     }
 
     private void doStartBroker() throws Exception {
-        if (startException != null) {
-            return;
-        }
+        checkStartException();
         startDestinations();
         addShutdownHook();
 
@@ -786,6 +785,9 @@ public class BrokerService implements Service {
             return;
         }
 
+        if (started.get()) {
+            setStartException(new BrokerStoppedException("Stop invoked"));
+        }
         MDC.put("activemq.broker", brokerName);
 
         if (systemExitOnShutdown) {
@@ -831,7 +833,7 @@ public class BrokerService implements Service {
             tempDataStore = null;
         }
         try {
-            stopper.stop(persistenceAdapter);
+            stopper.stop(getPersistenceAdapter());
             persistenceAdapter = null;
             if (isUseJmx()) {
                 stopper.stop(getManagementContext());
@@ -989,7 +991,7 @@ public class BrokerService implements Service {
         long expiration = Math.max(0, timeout + System.currentTimeMillis());
         while (!isStarted() && !stopped.get() && !waitSucceeded && expiration 
> System.currentTimeMillis()) {
             try {
-                if (startException != null) {
+                if (getStartException() != null) {
                     return waitSucceeded;
                 }
                 waitSucceeded = startedLatch.await(100L, 
TimeUnit.MILLISECONDS);
@@ -1006,6 +1008,7 @@ public class BrokerService implements Service {
      */
     public Broker getBroker() throws Exception {
         if (broker == null) {
+            checkStartException();
             broker = createBroker();
         }
         return broker;
@@ -1225,8 +1228,9 @@ public class BrokerService implements Service {
         addService(this.producerSystemUsage);
     }
 
-    public PersistenceAdapter getPersistenceAdapter() throws IOException {
+    public synchronized PersistenceAdapter getPersistenceAdapter() throws 
IOException {
         if (persistenceAdapter == null) {
+            checkStartException();
             persistenceAdapter = createPersistenceAdapter();
             configureService(persistenceAdapter);
             this.persistenceAdapter = 
registerPersistenceAdapterMBean(persistenceAdapter);
@@ -1314,11 +1318,22 @@ public class BrokerService implements Service {
 
     public ManagementContext getManagementContext() {
         if (managementContext == null) {
+            checkStartException();
             managementContext = new ManagementContext();
         }
         return managementContext;
     }
 
+    synchronized private void checkStartException() {
+        if (startException != null) {
+            throw new BrokerStoppedException(startException);
+        }
+    }
+
+    synchronized private void setStartException(Throwable t) {
+        startException = t;
+    }
+
     public void setManagementContext(ManagementContext managementContext) {
         this.managementContext = managementContext;
     }
@@ -2688,6 +2703,7 @@ public class BrokerService implements Service {
     }
 
     protected void startVirtualConsumerDestinations() throws Exception {
+        checkStartException();
         ConnectionContext adminConnectionContext = getAdminConnectionContext();
         Set<ActiveMQDestination> destinations = 
destinationFactory.getDestinations();
         DestinationFilter filter = getVirtualTopicConsumerDestinationFilter();
@@ -3063,7 +3079,7 @@ public class BrokerService implements Service {
                getVirtualTopicConsumerDestinationFilter().matches(destination);
     }
 
-    public Throwable getStartException() {
+    synchronized public Throwable getStartException() {
         return startException;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/da076f4a/activemq-broker/src/main/java/org/apache/activemq/util/LockFile.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/LockFile.java b/activemq-broker/src/main/java/org/apache/activemq/util/LockFile.java
index b454c41..2f89bc5 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/util/LockFile.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/util/LockFile.java
@@ -111,7 +111,7 @@ public class LockFile {
 
     /**
      */
-    public void unlock() {
+    synchronized public void unlock() {
         if (DISABLE_FILE_LOCK) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/da076f4a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
index 13a94cb..bac271f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
@@ -23,10 +23,12 @@ import java.net.MalformedURLException;
 import java.util.List;
 import java.util.Set;
 
+import javax.management.MBeanServer;
 import javax.management.ObjectInstance;
 import javax.management.ObjectName;
 
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.ManagementContext;
 import org.apache.activemq.util.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,6 +42,7 @@ public class DuplexNetworkMBeanTest {
 
     private int primaryBrokerPort;
     private int secondaryBrokerPort;
+    private MBeanServer mBeanServer = new ManagementContext().getMBeanServer();
 
     @Before
     public void setUp() throws Exception {
@@ -155,7 +158,7 @@ public class DuplexNetworkMBeanTest {
             }
 
             LOG.info("Query name: " + beanName);
-            mbeans = broker.getManagementContext().queryNames(beanName, null);
+            mbeans = mBeanServer.queryNames(beanName, null);
             if (mbeans != null) {
                 count = mbeans.size();
             } else {
@@ -175,7 +178,7 @@ public class DuplexNetworkMBeanTest {
     private void logAllMbeans(BrokerService broker) throws 
MalformedURLException {
         try {
             // trace all existing MBeans
-            Set<?> all = broker.getManagementContext().queryNames(null, null);
+            Set<?> all = mBeanServer.queryNames(null, null);
             LOG.info("Total MBean count=" + all.size());
             for (Object o : all) {
                 ObjectInstance bean = (ObjectInstance)o;

http://git-wip-us.apache.org/repos/asf/activemq/blob/da076f4a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java
new file mode 100755
index 0000000..b2ad1cd
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/StartAndConcurrentStopBrokerTest.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.io.ObjectInputStream;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.IntrospectionException;
+import javax.management.InvalidAttributeValueException;
+import javax.management.ListenerNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.NotCompliantMBeanException;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.OperationsException;
+import javax.management.QueryExp;
+import javax.management.ReflectionException;
+import javax.management.loading.ClassLoaderRepository;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerStoppedException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertTrue;
+
+
+public class StartAndConcurrentStopBrokerTest {
+    private static final Logger LOG = 
LoggerFactory.getLogger(StartAndConcurrentStopBrokerTest.class);
+
+
+    @Test(timeout = 30000)
+    public void testConcurrentStop() throws Exception {
+
+        final CountDownLatch gotBrokerMbean = new CountDownLatch(1);
+        final HashMap mbeans = new HashMap();
+        final MBeanServer mBeanServer = new MBeanServer() {
+            @Override
+            public ObjectInstance createMBean(String className, ObjectName 
name) throws ReflectionException, InstanceAlreadyExistsException, 
MBeanRegistrationException, MBeanException, NotCompliantMBeanException {
+                return null;
+            }
+
+            @Override
+            public ObjectInstance createMBean(String className, ObjectName 
name, ObjectName loaderName) throws ReflectionException, 
InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, 
NotCompliantMBeanException, InstanceNotFoundException {
+                return null;
+            }
+
+            @Override
+            public ObjectInstance createMBean(String className, ObjectName 
name, Object[] params, String[] signature) throws ReflectionException, 
InstanceAlreadyExistsException, MBeanRegistrationException, MBeanException, 
NotCompliantMBeanException {
+                return null;
+            }
+
+            @Override
+            public ObjectInstance createMBean(String className, ObjectName 
name, ObjectName loaderName, Object[] params, String[] signature) throws 
ReflectionException, InstanceAlreadyExistsException, 
MBeanRegistrationException, MBeanException, NotCompliantMBeanException, 
InstanceNotFoundException {
+                return null;
+            }
+
+            @Override
+            public ObjectInstance registerMBean(Object object, ObjectName 
name) throws InstanceAlreadyExistsException, MBeanRegistrationException, 
NotCompliantMBeanException {
+                if (mbeans.containsKey(name)) {
+                    throw new InstanceAlreadyExistsException("Got one 
already");
+                }
+                LOG.info("register:" + name);
+
+                try {
+                    if (name.compareTo(new 
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost")) == 0) {
+                        gotBrokerMbean.countDown();
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+                mbeans.put(name, object);
+                return new ObjectInstance(name, object.getClass().getName());
+
+            }
+
+            @Override
+            public void unregisterMBean(ObjectName name) throws 
InstanceNotFoundException, MBeanRegistrationException {
+                mbeans.remove(name);
+            }
+
+            @Override
+            public ObjectInstance getObjectInstance(ObjectName name) throws 
InstanceNotFoundException {
+                return null;
+            }
+
+            @Override
+            public Set<ObjectInstance> queryMBeans(ObjectName name, QueryExp 
query) {
+                return null;
+            }
+
+            @Override
+            public Set<ObjectName> queryNames(ObjectName name, QueryExp query) 
{
+                return null;
+            }
+
+            @Override
+            public boolean isRegistered(ObjectName name) {
+                return mbeans.containsKey(name);
+            }
+
+            @Override
+            public Integer getMBeanCount() {
+                return null;
+            }
+
+            @Override
+            public Object getAttribute(ObjectName name, String attribute) 
throws MBeanException, AttributeNotFoundException, InstanceNotFoundException, 
ReflectionException {
+                return null;
+            }
+
+            @Override
+            public AttributeList getAttributes(ObjectName name, String[] 
attributes) throws InstanceNotFoundException, ReflectionException {
+                return null;
+            }
+
+            @Override
+            public void setAttribute(ObjectName name, Attribute attribute) 
throws InstanceNotFoundException, AttributeNotFoundException, 
InvalidAttributeValueException, MBeanException, ReflectionException {
+
+            }
+
+            @Override
+            public AttributeList setAttributes(ObjectName name, AttributeList 
attributes) throws InstanceNotFoundException, ReflectionException {
+                return null;
+            }
+
+            @Override
+            public Object invoke(ObjectName name, String operationName, 
Object[] params, String[] signature) throws InstanceNotFoundException, 
MBeanException, ReflectionException {
+                return null;
+            }
+
+            @Override
+            public String getDefaultDomain() {
+                return null;
+            }
+
+            @Override
+            public String[] getDomains() {
+                return new String[0];
+            }
+
+            @Override
+            public void addNotificationListener(ObjectName name, 
NotificationListener listener, NotificationFilter filter, Object handback) 
throws InstanceNotFoundException {
+
+            }
+
+            @Override
+            public void addNotificationListener(ObjectName name, ObjectName 
listener, NotificationFilter filter, Object handback) throws 
InstanceNotFoundException {
+
+            }
+
+            @Override
+            public void removeNotificationListener(ObjectName name, ObjectName 
listener) throws InstanceNotFoundException, ListenerNotFoundException {
+
+            }
+
+            @Override
+            public void removeNotificationListener(ObjectName name, ObjectName 
listener, NotificationFilter filter, Object handback) throws 
InstanceNotFoundException, ListenerNotFoundException {
+
+            }
+
+            @Override
+            public void removeNotificationListener(ObjectName name, 
NotificationListener listener) throws InstanceNotFoundException, 
ListenerNotFoundException {
+
+            }
+
+            @Override
+            public void removeNotificationListener(ObjectName name, 
NotificationListener listener, NotificationFilter filter, Object handback) 
throws InstanceNotFoundException, ListenerNotFoundException {
+
+            }
+
+            @Override
+            public MBeanInfo getMBeanInfo(ObjectName name) throws 
InstanceNotFoundException, IntrospectionException, ReflectionException {
+                return null;
+            }
+
+            @Override
+            public boolean isInstanceOf(ObjectName name, String className) 
throws InstanceNotFoundException {
+                return false;
+            }
+
+            @Override
+            public Object instantiate(String className) throws 
ReflectionException, MBeanException {
+                return null;
+            }
+
+            @Override
+            public Object instantiate(String className, ObjectName loaderName) 
throws ReflectionException, MBeanException, InstanceNotFoundException {
+                return null;
+            }
+
+            @Override
+            public Object instantiate(String className, Object[] params, 
String[] signature) throws ReflectionException, MBeanException {
+                return null;
+            }
+
+            @Override
+            public Object instantiate(String className, ObjectName loaderName, 
Object[] params, String[] signature) throws ReflectionException, 
MBeanException, InstanceNotFoundException {
+                return null;
+            }
+
+            @Override
+            public ObjectInputStream deserialize(ObjectName name, byte[] data) 
throws InstanceNotFoundException, OperationsException {
+                return null;
+            }
+
+            @Override
+            public ObjectInputStream deserialize(String className, byte[] 
data) throws OperationsException, ReflectionException {
+                return null;
+            }
+
+            @Override
+            public ObjectInputStream deserialize(String className, ObjectName 
loaderName, byte[] data) throws InstanceNotFoundException, OperationsException, 
ReflectionException {
+                return null;
+            }
+
+            @Override
+            public ClassLoader getClassLoaderFor(ObjectName mbeanName) throws 
InstanceNotFoundException {
+                return null;
+            }
+
+            @Override
+            public ClassLoader getClassLoader(ObjectName loaderName) throws 
InstanceNotFoundException {
+                return null;
+            }
+
+            @Override
+            public ClassLoaderRepository getClassLoaderRepository() {
+                return null;
+            }
+        };
+
+
+        final BrokerService broker = new BrokerService();
+
+        ExecutorService executor = Executors.newFixedThreadPool(4);
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    broker.getManagementContext().setMBeanServer(mBeanServer);
+                    broker.start();
+                } catch (BrokerStoppedException expected) {
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    assertTrue("broker has registered mbean", 
gotBrokerMbean.await(10, TimeUnit.SECONDS));
+                    broker.stop();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        executor.shutdown();
+        assertTrue("stop tasks done", executor.awaitTermination(20, 
TimeUnit.SECONDS));
+
+        BrokerService second = new BrokerService();
+        second.getManagementContext().setMBeanServer(mBeanServer);
+        second.start();
+        second.stop();
+
+    }
+
+}

Reply via email to