Repository: activemq
Updated Branches:
  refs/heads/master 35f3df4e7 -> 2311749aa


AMQ-6706 Allow for hooks before broker stop occurs

When Camel is run embedded within ActiveMQ, its shutdown is called at
the point the broker is already stopped. This can lead to data loss, as
there could be exchanges that have not ended.
This adds the ability to register pre-shutdown hook(s), via
BrokerService.addPreShutdownHook, that are invoked during ActiveMQ
shutdown but before the broker is shut down.

With this, Camel can gracefully shut down while the broker is still
running.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/155461a0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/155461a0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/155461a0

Branch: refs/heads/master
Commit: 155461a0ba0a082f5a8459f17ae6166bae950231
Parents: 35f3df4
Author: Zoran Regvart <[email protected]>
Authored: Fri Jun 16 15:29:55 2017 +0200
Committer: Zoran Regvart <[email protected]>
Committed: Mon Jun 19 14:29:05 2017 +0200

----------------------------------------------------------------------
 .../apache/activemq/broker/BrokerService.java   | 26 ++++++-
 .../activemq/broker/BrokerServiceTest.java      | 60 +++++++++++++++
 .../activemq/camel/CamelShutdownHook.java       | 73 ++++++++++++++++++
 .../camel/BrokerPreShutdownHookTest.java        | 80 ++++++++++++++++++++
 4 files changed, 238 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/155461a0/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index de70d29..97fb814 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -268,6 +268,7 @@ public class BrokerService implements Service {
     private boolean rollbackOnlyOnAsyncException = true;
 
     private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
+    private List<Runnable> preShutdownHooks = new CopyOnWriteArrayList<>();
 
     static {
 
@@ -478,6 +479,16 @@ public class BrokerService implements Service {
         return connector;
     }
 
+    /**
+     * Adds a {@link Runnable} hook that will be invoked before the
+     * broker is stopped. This allows performing cleanup actions
+     * before the broker is stopped. The hook should not throw
+     * exceptions or block.
+     */
+    public final void addPreShutdownHook(final Runnable hook) {
+        preShutdownHooks.add(hook);
+    }
+
     public JmsConnector removeJmsConnector(JmsConnector connector) {
         if (jmsConnectors.remove(connector)) {
             return connector;
@@ -788,6 +799,16 @@ public class BrokerService implements Service {
      */
     @Override
     public void stop() throws Exception {
+        final ServiceStopper stopper = new ServiceStopper();
+
+        for (Runnable hook : preShutdownHooks) {
+            try {
+                hook.run();
+            } catch (Throwable e) {
+                stopper.onException(hook, e);
+            }
+        }
+
         if (!stopping.compareAndSet(false, true)) {
             LOG.trace("Broker already stopping/stopped");
             return;
@@ -812,7 +833,6 @@ public class BrokerService implements Service {
             this.scheduler.stop();
             this.scheduler = null;
         }
-        ServiceStopper stopper = new ServiceStopper();
         if (services != null) {
             for (Service service : services) {
                 stopper.stop(service);
@@ -2846,6 +2866,10 @@ public class BrokerService implements Service {
         this.regionBroker = regionBroker;
     }
 
+    public final void removePreShutdownHook(final Runnable hook) {
+        preShutdownHooks.remove(hook);
+    }
+
     public void addShutdownHook(Runnable hook) {
         synchronized (shutdownHooks) {
             shutdownHooks.add(hook);

http://git-wip-us.apache.org/repos/asf/activemq/blob/155461a0/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerServiceTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerServiceTest.java
 
b/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerServiceTest.java
new file mode 100644
index 0000000..e4fcb09
--- /dev/null
+++ 
b/activemq-broker/src/test/java/org/apache/activemq/broker/BrokerServiceTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class BrokerServiceTest {
+
+    static class Hook implements Runnable {
+
+        boolean invoked = false;
+
+        @Override
+        public void run() {
+            invoked = true;
+        }
+    }
+
+    @Test
+    public void removedPreShutdownHooksShouldNotBeInvokedWhenStopping() throws 
Exception {
+        final BrokerService brokerService = new BrokerService();
+
+        final Hook hook = new Hook();
+        brokerService.addPreShutdownHook(hook);
+        brokerService.removePreShutdownHook(hook);
+
+        brokerService.stop();
+
+        assertFalse("Removed pre-shutdown hook should not have been invoked", 
hook.invoked);
+    }
+
+    @Test
+    public void shouldInvokePreShutdownHooksBeforeStopping() throws Exception {
+        final BrokerService brokerService = new BrokerService();
+
+        final Hook hook = new Hook();
+        brokerService.addPreShutdownHook(hook);
+
+        brokerService.stop();
+
+        assertTrue("Pre-shutdown hook should have been invoked", hook.invoked);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/155461a0/activemq-camel/src/main/java/org/apache/activemq/camel/CamelShutdownHook.java
----------------------------------------------------------------------
diff --git 
a/activemq-camel/src/main/java/org/apache/activemq/camel/CamelShutdownHook.java 
b/activemq-camel/src/main/java/org/apache/activemq/camel/CamelShutdownHook.java
new file mode 100644
index 0000000..217a1a6
--- /dev/null
+++ 
b/activemq-camel/src/main/java/org/apache/activemq/camel/CamelShutdownHook.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * A shutdown hook that can be used to shutdown {@link CamelContext} before the
+ * ActiveMQ broker is shut down. This is sometimes important as if the broker 
is
+ * shutdown before Camel there could be a loss of data due to inflight 
exchanges
+ * not yet completed.
+ * <p>
+ * This hook can be added to ActiveMQ configuration ({@code activemq.xml}) as 
in
+ * the following example:
+ * <p>
+ * <code>
+ * &lt;bean xmlns=&quot;http://www.springframework.org/schema/beans&quot; 
class=&quot;org.apache.activemq.camel.CamelShutdownHook&quot; /&gt;
+ * </code>
+ */
+public final class CamelShutdownHook implements Runnable, CamelContextAware {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamelShutdownHook.class);
+
+    private CamelContext camelContext;
+
+    @Autowired
+    public CamelShutdownHook(final BrokerService brokerService) {
+        brokerService.addPreShutdownHook(this);
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void run() {
+        if (camelContext != null) {
+            try {
+                camelContext.stop();
+            } catch (final Exception e) {
+                LOG.warn("Unable to stop CamelContext", e);
+            }
+        } else {
+            LOG.warn("Unable to stop CamelContext, no CamelContext was set!");
+        }
+    }
+
+    @Override
+    public void setCamelContext(final CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/155461a0/activemq-camel/src/test/java/org/apache/activemq/camel/BrokerPreShutdownHookTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-camel/src/test/java/org/apache/activemq/camel/BrokerPreShutdownHookTest.java
 
b/activemq-camel/src/test/java/org/apache/activemq/camel/BrokerPreShutdownHookTest.java
new file mode 100644
index 0000000..4b6122d
--- /dev/null
+++ 
b/activemq-camel/src/test/java/org/apache/activemq/camel/BrokerPreShutdownHookTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.camel;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.camel.component.ActiveMQComponent;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class BrokerPreShutdownHookTest {
+
+    static class TestProcessor implements Processor {
+
+        boolean messageReceived;
+
+        @Override
+        public void process(final Exchange exchange) throws Exception {
+            messageReceived = true;
+        }
+    }
+
+    @Test
+    public void testShouldCleanlyShutdownCamelBeforeStoppingBroker() throws 
Exception {
+        final BrokerService broker = new BrokerService();
+        broker.setBrokerName("testBroker");
+        broker.setUseJmx(true);
+        broker.setPersistent(false);
+        broker.addConnector("vm://testBroker");
+
+        final DefaultCamelContext camel = new DefaultCamelContext();
+        camel.setName("test-camel");
+
+        final CamelShutdownHook hook = new CamelShutdownHook(broker);
+        hook.setCamelContext(camel);
+
+        broker.start();
+
+        camel.addComponent("testq", 
ActiveMQComponent.activeMQComponent("vm://testBroker?create=false"));
+
+        final TestProcessor processor = new TestProcessor();
+        camel.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("testq:test.in").delay(200).process(processor);
+            }
+        });
+        camel.start();
+
+        final ProducerTemplate producer = camel.createProducerTemplate();
+        producer.sendBody("testq:test.in", "Hi!");
+        producer.stop();
+
+        broker.stop();
+
+        assertTrue("Message should be received", processor.messageReceived);
+        assertTrue("Camel context should be stopped", camel.isStopped());
+        assertTrue("Broker should be stopped", broker.isStopped());
+
+    }
+}

Reply via email to