Repository: activemq Updated Branches: refs/heads/trunk ec35588e5 -> d60022ec6
https://issues.apache.org/jira/browse/AMQ-5162 - handle io exception in lease database locker start Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d60022ec Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d60022ec Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d60022ec Branch: refs/heads/trunk Commit: d60022ec65060619262b498c44854fd6a6e5e5e7 Parents: ec35588 Author: Dejan Bosanac <[email protected]> Authored: Fri Apr 25 15:08:05 2014 +0200 Committer: Dejan Bosanac <[email protected]> Committed: Fri Apr 25 15:08:05 2014 +0200 ---------------------------------------------------------------------- .../store/jdbc/LeaseDatabaseLocker.java | 7 ++ .../store/jdbc/JDBCIOExceptionHandlerTest.java | 83 +++++++++++++++++++- 2 files changed, 87 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/d60022ec/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java ---------------------------------------------------------------------- diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java index 39f8cfe..3e35ee8 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/LeaseDatabaseLocker.java @@ -83,6 +83,13 @@ public class LeaseDatabaseLocker extends AbstractJDBCLocker { } catch (Exception e) { LOG.debug(getLeaseHolderId() + " lease acquire failure: "+ e, e); + if (isStopping()) { + throw new Exception( + "Cannot start broker as being asked to shut down. " + + "Interrupted attempt to acquire lock: " + + e, e); + } + lockable.getBrokerService().handleIOException(IOExceptionSupport.create(e)); } finally { close(statement); close(connection); http://git-wip-us.apache.org/repos/asf/activemq/blob/d60022ec/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java index b9ffab8..50d1904 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java @@ -20,6 +20,7 @@ import java.io.PrintWriter; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; @@ -27,6 +28,7 @@ import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.Wait; import org.apache.derby.jdbc.EmbeddedDataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +49,12 @@ public class JDBCIOExceptionHandlerTest extends TestCase { private BrokerService broker; protected BrokerService createBroker(boolean withJMX) throws Exception { + return createBroker("localhost", withJMX, true, true); + } + + protected BrokerService createBroker(String name, boolean withJMX, boolean leaseLocker, boolean startStopConnectors) throws Exception { BrokerService broker = new BrokerService(); + broker.setBrokerName(name); broker.setUseJmx(withJMX); @@ -63,13 +70,16 @@ public class JDBCIOExceptionHandlerTest extends TestCase { jdbc.setDataSource(dataSource); jdbc.setLockKeepAlivePeriod(1000l); - LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); - leaseDatabaseLocker.setLockAcquireSleepInterval(2000l); - jdbc.setLocker(leaseDatabaseLocker); + if (leaseLocker) { + LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); + leaseDatabaseLocker.setLockAcquireSleepInterval(2000l); + jdbc.setLocker(leaseDatabaseLocker); + } broker.setPersistenceAdapter(jdbc); JDBCIOExceptionHandler jdbcioExceptionHandler = new JDBCIOExceptionHandler(); jdbcioExceptionHandler.setResumeCheckSleepPeriod(1000l); + jdbcioExceptionHandler.setStopStartConnectors(startStopConnectors); broker.setIoExceptionHandler(jdbcioExceptionHandler); String connectionUri = broker.addConnector(TRANSPORT_URL).getPublishableConnectString(); @@ -92,6 +102,73 @@ public class JDBCIOExceptionHandlerTest extends TestCase { recoverFromDisconnectDB(true); } + public void testSlaveStoppedLease() throws Exception { + testSlaveStopped(true); + } + + public void testSlaveStoppedDefault() throws Exception { + testSlaveStopped(false); + } + + public void testSlaveStopped(final boolean lease) throws Exception { + final BrokerService master = createBroker("master", true, lease, false); + master.start(); + master.waitUntilStarted(); + + final AtomicReference<BrokerService> slave = new AtomicReference<BrokerService>(); + + Thread slaveThread = new Thread() { + public void run() { + try { + BrokerService broker = new BrokerService(); + broker.setBrokerName("slave"); + + JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); + jdbc.setDataSource(dataSource); + + jdbc.setLockKeepAlivePeriod(1000l); + + if (lease) { + LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); + leaseDatabaseLocker.setLockAcquireSleepInterval(2000l); + jdbc.setLocker(leaseDatabaseLocker); + } + + broker.setPersistenceAdapter(jdbc); + JDBCIOExceptionHandler jdbcioExceptionHandler = new JDBCIOExceptionHandler(); + jdbcioExceptionHandler.setResumeCheckSleepPeriod(1000l); + jdbcioExceptionHandler.setStopStartConnectors(false); + broker.setIoExceptionHandler(jdbcioExceptionHandler); + slave.set(broker); + broker.start(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + + slaveThread.start(); + + Thread.sleep(5000); + + dataSource.stopDB(); + + assertTrue("Master hasn't been stopped", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return master.isStopped(); + } + })); + + assertTrue("Slave hasn't been stopped", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return slave.get().isStopped(); + } + })); + + } + public void recoverFromDisconnectDB(boolean withJMX) throws Exception { try { broker = createBroker(withJMX);
