https://issues.apache.org/jira/browse/AMQ-4904 - ensure transports are not restarted without a store lock
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/77b4d700 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/77b4d700 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/77b4d700 Branch: refs/heads/activemq-5.9 Commit: 77b4d700b139793dd3489f54bbf54386ce5258b1 Parents: fcc773a Author: gtully <[email protected]> Authored: Wed Nov 27 12:48:16 2013 +0000 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Mar 12 13:10:35 2014 -0400 ---------------------------------------------------------------------- .../util/DefaultIOExceptionHandler.java | 8 +- .../store/jdbc/JDBCIOExceptionHandler.java | 4 + .../jdbc/JDBCIOExceptionHandlerMockeryTest.java | 108 +++++++++++++++++++ 3 files changed, 117 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/77b4d700/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java index ab35800..c65ec65 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java +++ b/activemq-broker/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java @@ -94,10 +94,12 @@ import org.slf4j.LoggerFactory; LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports"); TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod); } - broker.startAllConnectors(); - LOG.info("Successfully restarted transports on " + broker); + if (hasLockOwnership()) { + broker.startAllConnectors(); + LOG.info("Successfully restarted transports on " + broker); + } } catch (Exception e) { - LOG.warn("Stopping " + broker + " due to failure while restarting transports", e); + LOG.warn("Stopping " + broker + " due to failure restarting transports", e); stopBroker(e); } finally { handlingException.compareAndSet(true, false); http://git-wip-us.apache.org/repos/asf/activemq/blob/77b4d700/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java index a92856c..d0ea276 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandler.java @@ -21,11 +21,14 @@ import java.io.IOException; import org.apache.activemq.broker.Locker; import org.apache.activemq.broker.SuppressReplyException; import org.apache.activemq.util.DefaultIOExceptionHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @org.apache.xbean.XBean */ public class JDBCIOExceptionHandler extends DefaultIOExceptionHandler { + private static final Logger LOG = LoggerFactory.getLogger(JDBCIOExceptionHandler.class); public JDBCIOExceptionHandler() { setIgnoreSQLExceptions(false); @@ -49,6 +52,7 @@ public class JDBCIOExceptionHandler extends DefaultIOExceptionHandler { } if (!hasLock) { + LOG.warn("Lock keepAlive failed, no longer lock owner with: {}", locker); throw new IOException("Lock keepAlive failed, no longer lock owner with: " + locker); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/77b4d700/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java new file mode 100644 index 0000000..dd5d506 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerMockeryTest.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.jdbc; + +import java.io.IOException; +import java.util.HashMap; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.Locker; +import org.apache.activemq.broker.SuppressReplyException; +import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.Wait; +import org.jmock.Expectations; +import org.jmock.Mockery; +import org.jmock.States; +import org.jmock.lib.legacy.ClassImposteriser; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class JDBCIOExceptionHandlerMockeryTest { + + private static final Logger LOG = LoggerFactory.getLogger(JDBCIOExceptionHandlerMockeryTest.class); + private HashMap<Thread, Throwable> exceptions = new HashMap<Thread, Throwable>(); + + @Test + public void testShutdownWithoutTransportRestart() throws Exception { + + Mockery context = new Mockery() {{ + setImposteriser(ClassImposteriser.INSTANCE); + }}; + + Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.error("unexpected exception {} on thread {}", e, t); + exceptions.put(t, e); + } + }); + + final BrokerService brokerService = context.mock(BrokerService.class); + final JDBCPersistenceAdapter jdbcPersistenceAdapter = context.mock(JDBCPersistenceAdapter.class); + final Locker locker = context.mock(Locker.class); + + final States jdbcConn = context.states("jdbc").startsAs("down"); + final States broker = context.states("broker").startsAs("started"); + + // simulate jdbc up between hasLock and checkpoint, so hasLock fails to verify + context.checking(new Expectations() {{ + allowing(brokerService).isRestartAllowed(); + will(returnValue(false)); + allowing(brokerService).stopAllConnectors(with(any(ServiceStopper.class))); + allowing(brokerService).getPersistenceAdapter(); + will(returnValue(jdbcPersistenceAdapter)); + allowing(jdbcPersistenceAdapter).getLocker(); + will(returnValue(locker)); + allowing(locker).keepAlive(); + when(jdbcConn.is("down")); + will(returnValue(true)); + allowing(locker).keepAlive(); + when(jdbcConn.is("up")); + will(returnValue(false)); + + allowing(jdbcPersistenceAdapter).checkpoint(with(true)); + then(jdbcConn.is("up")); + allowing(brokerService).stop(); + then(broker.is("stopped")); + + }}); + + JDBCIOExceptionHandler underTest = new JDBCIOExceptionHandler(); + underTest.setBrokerService(brokerService); + + try { + underTest.handle(new IOException()); + fail("except suppress reply ex"); + } catch (SuppressReplyException expected) { + } + + assertTrue("broker stopped state triggered", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("broker state {}", broker); + return broker.is("stopped").isActive(); + } + })); + context.assertIsSatisfied(); + + assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); + } +}
