Author: gtully
Date: Mon Mar 28 11:00:06 2011
New Revision: 1086182
URL: http://svn.apache.org/viewvc?rev=1086182&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-1780 - ActiveMQ broker does not
automatically reconnect if the connection to the database is lost - extend the
DefaultIOExceptionHandler
http://activemq.apache.org/configurable-ioexception-handling.html to be aware
of sql exceptions and provide a connector stop/resume option. This is now
called in the event of a failure to get a jdbc connection; the default
behaviour is to stop as before, but new options to ignore some sql exceptions
or to stop/resume connectors are supported, which allow a db restart to be
recovered without restarting the broker.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1086182&r1=1086181&r2=1086182&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Mon Mar 28 11:00:06 2011
@@ -1639,7 +1639,7 @@ public class BrokerService implements Se
}
}
- protected void stopAllConnectors(ServiceStopper stopper) {
+ public void stopAllConnectors(ServiceStopper stopper) {
for (Iterator<NetworkConnector> iter =
getNetworkConnectors().iterator(); iter.hasNext();) {
NetworkConnector connector = iter.next();
unregisterNetworkConnectorMBean(connector);
@@ -2063,7 +2063,7 @@ public class BrokerService implements Se
*
* @throws Exception
*/
- protected void startAllConnectors() throws Exception {
+ public void startAllConnectors() throws Exception {
if (!isSlave()) {
Set<ActiveMQDestination> durableDestinations =
getBroker().getDurableDestinations();
List<TransportConnector> al = new ArrayList<TransportConnector>();
@@ -2330,12 +2330,22 @@ public class BrokerService implements Se
public void setPassiveSlave(boolean passiveSlave) {
this.passiveSlave = passiveSlave;
}
-
+
+ /**
+ * override the Default IOException handler, called when persistence
adapter
+ * has experiences File or JDBC I/O Exceptions
+ *
+ * @param ioExceptionHandler
+ */
public void setIoExceptionHandler(IOExceptionHandler ioExceptionHandler) {
- ioExceptionHandler.setBrokerService(this);
+ configureService(ioExceptionHandler);
this.ioExceptionHandler = ioExceptionHandler;
}
+ public IOExceptionHandler getIoExceptionHandler() {
+ return ioExceptionHandler;
+ }
+
/**
* @return the schedulerSupport
*/
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java?rev=1086182&r1=1086181&r2=1086182&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DefaultDatabaseLocker.java
Mon Mar 28 11:00:06 2011
@@ -88,7 +88,7 @@ public class DefaultDatabaseLocker imple
+
exceptionHandler.getClass().getCanonicalName()
+ " threw this exception: "
+ handlerException
- + " while trying to handle this excpetion:
"
+ + " while trying to handle this exception:
"
+ e, handlerException);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=1086182&r1=1086181&r2=1086182&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
Mon Mar 28 11:00:06 2011
@@ -18,6 +18,7 @@ package org.apache.activemq.store.jdbc;
import java.io.File;
import java.io.IOException;
+import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Set;
@@ -489,7 +490,7 @@ public class JDBCPersistenceAdapter exte
}
public TransactionContext getTransactionContext() throws IOException {
- TransactionContext answer = new TransactionContext(getDataSource());
+ TransactionContext answer = new TransactionContext(this);
if (transactionIsolation > 0) {
answer.setTransactionIsolation(transactionIsolation);
}
@@ -619,7 +620,7 @@ public class JDBCPersistenceAdapter exte
try {
brokerService.stop();
} catch (Exception e) {
- LOG.warn("Failure occured while stopping broker");
+ LOG.warn("Failure occurred while stopping broker");
}
}
@@ -642,7 +643,23 @@ public class JDBCPersistenceAdapter exte
public void setDirectory(File dir) {
}
+ // interesting bit here is proof that DB is ok
public void checkpoint(boolean sync) throws IOException {
+ // by pass TransactionContext to avoid IO Exception handler
+ Connection connection = null;
+ try {
+ connection = getDataSource().getConnection();
+ } catch (SQLException e) {
+ LOG.debug("Could not get JDBC connection for checkpoint: " + e);
+ throw IOExceptionSupport.create(e);
+ } finally {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (Throwable ignored) {
+ }
+ }
+ }
}
public long size(){
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java?rev=1086182&r1=1086181&r2=1086182&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/TransactionContext.java
Mon Mar 28 11:00:06 2011
@@ -38,6 +38,7 @@ public class TransactionContext {
private static final Logger LOG =
LoggerFactory.getLogger(TransactionContext.class);
private final DataSource dataSource;
+ private final JDBCPersistenceAdapter persistenceAdapter;
private Connection connection;
private boolean inTx;
private PreparedStatement addMessageStatement;
@@ -46,8 +47,9 @@ public class TransactionContext {
// a cheap dirty level that we can live with
private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED;
- public TransactionContext(DataSource dataSource) {
- this.dataSource = dataSource;
+ public TransactionContext(JDBCPersistenceAdapter persistenceAdapter)
throws IOException {
+ this.persistenceAdapter = persistenceAdapter;
+ this.dataSource = persistenceAdapter.getDataSource();
}
public Connection getConnection() throws IOException {
@@ -60,7 +62,10 @@ public class TransactionContext {
}
} catch (SQLException e) {
JDBCPersistenceAdapter.log("Could not get JDBC connection: ",
e);
- throw IOExceptionSupport.create(e);
+ IOException ioe = IOExceptionSupport.create(e);
+ persistenceAdapter.getBrokerService().handleIOException(ioe);
+ throw ioe;
+
}
try {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java?rev=1086182&r1=1086181&r2=1086182&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/DefaultIOExceptionHandler.java
Mon Mar 28 11:00:06 2011
@@ -17,26 +17,37 @@
package org.apache.activemq.util;
import java.io.IOException;
+import java.sql.SQLException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DefaultIOExceptionHandler implements IOExceptionHandler {
+/**
+ * @org.apache.xbean.XBean
+ */
+ public class DefaultIOExceptionHandler implements IOExceptionHandler {
private static final Logger LOG = LoggerFactory
.getLogger(DefaultIOExceptionHandler.class);
private BrokerService broker;
private boolean ignoreAllErrors = false;
private boolean ignoreNoSpaceErrors = true;
+ private boolean ignoreSQLExceptions = true;
+ private boolean stopStartConnectors = false;
private String noSpaceMessage = "space";
+ private String sqlExceptionMessage = ""; // match all
+ private long resumeCheckSleepPeriod = 5*1000;
+ private AtomicBoolean stopStartInProgress = new AtomicBoolean(false);
public void handle(IOException exception) {
if (ignoreAllErrors) {
LOG.info("Ignoring IO exception, " + exception, exception);
return;
}
-
+
if (ignoreNoSpaceErrors) {
Throwable cause = exception;
while (cause != null && cause instanceof IOException) {
@@ -48,13 +59,71 @@ public class DefaultIOExceptionHandler i
}
}
+ if (ignoreSQLExceptions) {
+ Throwable cause = exception;
+ while (cause != null) {
+ if (cause instanceof SQLException &&
cause.getMessage().contains(sqlExceptionMessage)) {
+ LOG.info("Ignoring SQLException, " + exception, cause);
+ return;
+ }
+ cause = cause.getCause();
+ }
+ }
+
+ if (stopStartConnectors) {
+ if (!stopStartInProgress.compareAndSet(false, true)) {
+ // we are already working on it
+ return;
+ }
+ LOG.info("Initiating stop/restart of broker transport due to IO
exception, " + exception, exception);
+
+ new Thread("stop transport connectors on IO exception") {
+ public void run() {
+ try {
+ ServiceStopper stopper = new ServiceStopper();
+ broker.stopAllConnectors(stopper);
+ } catch (Exception e) {
+ LOG.warn("Failure occurred while stopping broker
connectors", e);
+ }
+ }
+ }.start();
+
+ // resume again
+ new Thread("restart transport connectors post IO exception") {
+ public void run() {
+ try {
+ while (isPersistenceAdapterDown()) {
+ LOG.info("waiting for broker persistence adapter
checkpoint to succeed before restarting transports");
+
TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
+ }
+ broker.startAllConnectors();
+ } catch (Exception e) {
+ LOG.warn("Failure occurred while restarting broker
connectors", e);
+ } finally {
+ stopStartInProgress.compareAndSet(true, false);
+ }
+ }
+
+ private boolean isPersistenceAdapterDown() {
+ boolean checkpointSuccess = false;
+ try {
+ broker.getPersistenceAdapter().checkpoint(true);
+ checkpointSuccess = true;
+ } catch (Throwable ignored) {}
+ return !checkpointSuccess;
+ }
+ }.start();
+
+ return;
+ }
+
LOG.info("Stopping the broker due to IO exception, " + exception,
exception);
- new Thread() {
+ new Thread("Stopping the broker due to IO exception") {
public void run() {
try {
broker.stop();
} catch (Exception e) {
- LOG.warn("Failure occured while stopping broker", e);
+ LOG.warn("Failure occurred while stopping broker", e);
}
}
}.start();
@@ -88,4 +157,35 @@ public class DefaultIOExceptionHandler i
this.noSpaceMessage = noSpaceMessage;
}
+ public boolean isIgnoreSQLExceptions() {
+ return ignoreSQLExceptions;
+ }
+
+ public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) {
+ this.ignoreSQLExceptions = ignoreSQLExceptions;
+ }
+
+ public String getSqlExceptionMessage() {
+ return sqlExceptionMessage;
+ }
+
+ public void setSqlExceptionMessage(String sqlExceptionMessage) {
+ this.sqlExceptionMessage = sqlExceptionMessage;
+ }
+
+ public boolean isStopStartConnectors() {
+ return stopStartConnectors;
+ }
+
+ public void setStopStartConnectors(boolean stopStartConnectors) {
+ this.stopStartConnectors = stopStartConnectors;
+ }
+
+ public long getResumeCheckSleepPeriod() {
+ return resumeCheckSleepPeriod;
+ }
+
+ public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) {
+ this.resumeCheckSleepPeriod = resumeCheckSleepPeriod;
+ }
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java?rev=1086182&r1=1086181&r2=1086182&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTopicSendReceiveWithTwoConnectionsTest.java
Mon Mar 28 11:00:06 2011
@@ -49,8 +49,8 @@ public class JmsTopicSendReceiveWithTwoC
LOG.info("Created sendConnection: " + sendConnection);
LOG.info("Created receiveConnection: " + receiveConnection);
- session = sendConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- receiveSession = receiveConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ session = createSendSession(sendConnection);
+ receiveSession = createReceiveSession(receiveConnection);
LOG.info("Created sendSession: " + session);
LOG.info("Created receiveSession: " + receiveSession);
@@ -80,6 +80,14 @@ public class JmsTopicSendReceiveWithTwoC
LOG.info("Started connections");
}
+ protected Session createReceiveSession(Connection receiveConnection)
throws Exception {
+ return receiveConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ }
+
+ protected Session createSendSession(Connection sendConnection) throws
Exception {
+ return sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
protected Connection createReceiveConnection() throws Exception {
return createConnection();
}
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java?rev=1086182&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
Mon Mar 28 11:00:06 2011
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.jdbc.DataSourceSupport;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DbRestartJDBCQueueTest extends
JmsTopicSendReceiveWithTwoConnectionsTest implements ExceptionListener {
+ private static final transient Logger LOG =
LoggerFactory.getLogger(DbRestartJDBCQueueTest.class);
+
+ public boolean transactedSends = false;
+ public int failureCount = 25; // or 20 for even tx batch boundary
+
+ int inflightMessageCount = 0;
+ EmbeddedDataSource sharedDs;
+ BrokerService broker;
+ final CountDownLatch restartDBLatch = new CountDownLatch(1);
+
+ protected void setUp() throws Exception {
+ setAutoFail(true);
+ topic = false;
+ verbose = true;
+ // startup db
+ sharedDs = (EmbeddedDataSource) new
DataSourceSupport().getDataSource();
+
+ broker = new BrokerService();
+
+ DefaultIOExceptionHandler handler = new DefaultIOExceptionHandler();
+ handler.setIgnoreSQLExceptions(false);
+ handler.setStopStartConnectors(true);
+ broker.setIoExceptionHandler(handler);
+ broker.addConnector("tcp://localhost:0");
+ broker.setUseJmx(false);
+ broker.setPersistent(true);
+ broker.setDeleteAllMessagesOnStartup(true);
+ JDBCPersistenceAdapter persistenceAdapter = new
JDBCPersistenceAdapter();
+ persistenceAdapter.setDataSource(sharedDs);
+ persistenceAdapter.setUseDatabaseLock(false);
+ persistenceAdapter.setLockKeepAlivePeriod(500);
+ persistenceAdapter.setLockAcquireSleepInterval(500);
+ broker.setPersistenceAdapter(persistenceAdapter);
+ broker.start();
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+ broker.stop();
+ }
+
+
+ protected Session createSendSession(Connection sendConnection) throws
Exception {
+ if (transactedSends) {
+ return sendConnection.createSession(true,
Session.SESSION_TRANSACTED);
+ } else {
+ return sendConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ }
+ }
+
+ protected ActiveMQConnectionFactory createConnectionFactory() throws
Exception {
+ ActiveMQConnectionFactory f =
+ new ActiveMQConnectionFactory("failover://" +
broker.getTransportConnectors().get(0).getPublishableConnectString());
+ f.setExceptionListener(this);
+ return f;
+ }
+
+ @Override
+ protected void messageSent() throws Exception {
+ if (++inflightMessageCount == failureCount) {
+ LOG.info("STOPPING DB!@!!!!");
+ final EmbeddedDataSource ds = sharedDs;
+ ds.setShutdownDatabase("shutdown");
+ try {
+ ds.getConnection();
+ } catch (Exception ignored) {
+ }
+ LOG.info("DB STOPPED!@!!!!");
+
+ Thread dbRestartThread = new Thread("db-re-start-thread") {
+ public void run() {
+ LOG.info("Sleeping for 10 seconds before allowing db
restart");
+ try {
+ restartDBLatch.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ ds.setShutdownDatabase("false");
+ LOG.info("DB RESTARTED!@!!!!");
+ }
+ };
+ dbRestartThread.start();
+ }
+ }
+
+ protected void sendToProducer(MessageProducer producer,
+ Destination producerDestination, Message message) throws
JMSException {
+ {
+ // do some retries as db failures filter back to the client until
broker sees
+ // db lock failure and shuts down
+ boolean sent = false;
+ do {
+ try {
+ producer.send(producerDestination, message);
+
+ if (transactedSends && ((inflightMessageCount+1) %10 == 0
|| (inflightMessageCount+1) >= messageCount)) {
+ LOG.info("committing on send: " + inflightMessageCount
+ " message: " + message);
+ session.commit();
+ }
+
+ sent = true;
+ } catch (JMSException e) {
+ LOG.info("Exception on producer send:", e);
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ } while(!sent);
+
+ }
+ }
+
+ @Override
+ public void onException(JMSException exception) {
+ LOG.error("exception on connection: ", exception);
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date