Author: gtully
Date: Thu Aug 14 11:33:48 2008
New Revision: 685988
URL: http://svn.apache.org/viewvc?rev=685988&view=rev
Log:
fix for AMQ-1885, allow jdbc slave broker to outlive a db outage
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
(with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/apache_derby_embedded_jdbc_driver
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=685988&r1=685987&r2=685988&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
Thu Aug 14 11:33:48 2008
@@ -176,6 +176,13 @@
LOG.warn("No databaseLocker configured for the JDBC Persistence Adapter");
} else {
service.start();
+ if (lockKeepAlivePeriod > 0) {
+ getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ databaseLockKeepAlive();
+ }
+ }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
+ }
if (brokerService != null) {
brokerService.getBroker().nowMasterBroker();
}
@@ -258,13 +265,6 @@
public DatabaseLocker getDatabaseLocker() throws IOException {
if (databaseLocker == null) {
databaseLocker = createDatabaseLocker();
- if (lockKeepAlivePeriod > 0) {
- getScheduledThreadPoolExecutor().scheduleAtFixedRate(new Runnable() {
- public void run() {
- databaseLockKeepAlive();
- }
- }, lockKeepAlivePeriod, lockKeepAlivePeriod, TimeUnit.MILLISECONDS);
- }
}
return databaseLocker;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java?rev=685988&r1=685987&r2=685988&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
Thu Aug 14 11:33:48 2008
@@ -122,6 +122,7 @@
// This will fail usually since the tables will be
// created already.
try {
+ LOG.debug("Executing SQL: " + dropStatments[i]);
s.execute(dropStatments[i]);
} catch (SQLException e) {
LOG.warn("Could not drop JDBC tables; they may not exist."
+ " Failure was: "
@@ -187,7 +188,9 @@
}
} finally {
if (!batchStatments) {
- s.close();
+ if (s!=null) {
+ s.close();
+ }
}
}
}
Modified:
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/apache_derby_embedded_jdbc_driver
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/apache_derby_embedded_jdbc_driver?rev=685988&r1=685987&r2=685988&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/apache_derby_embedded_jdbc_driver
(original)
+++
activemq/trunk/activemq-core/src/main/resources/META-INF/services/org/apache/activemq/store/jdbc/apache_derby_embedded_jdbc_driver
Thu Aug 14 11:33:48 2008
@@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-class=org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter
\ No newline at end of file
+class=org.apache.activemq.store.jdbc.adapter.DB2JDBCAdapter
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java?rev=685988&r1=685987&r2=685988&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java
Thu Aug 14 11:33:48 2008
@@ -85,7 +85,6 @@
*/
public void testSendReceive() throws Exception {
messages.clear();
-
for (int i = 0; i < data.length; i++) {
Message message = session.createTextMessage(data[i]);
message.setStringProperty("stringProperty", data[i]);
@@ -97,7 +96,7 @@
}
}
- producer.send(producerDestination, message);
+ sendToProducer(producer, producerDestination, message);
messageSent();
}
@@ -106,6 +105,18 @@
}
/**
+ * Sends a message to a destination using the supplied producer
+ * @param producer
+ * @param producerDestination
+ * @param message
+ * @throws JMSException
+ */
+ protected void sendToProducer(MessageProducer producer,
+ Destination producerDestination, Message message) throws JMSException {
+ producer.send(producerDestination, message);
+ }
+
+ /**
* Asserts messages are received.
*
* @throws JMSException
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java?rev=685988&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
Thu Aug 14 11:33:48 2008
@@ -0,0 +1,53 @@
+package org.apache.activemq.broker.ft;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+public class DbRestartJDBCQueueMasterSlaveTest extends JDBCQueueMasterSlaveTest {
+ private static final transient Log LOG = LogFactory.getLog(DbRestartJDBCQueueMasterSlaveTest.class);
+
+ protected void messageSent() throws Exception {
+ if (++inflightMessageCount == failureCount) {
+ final EmbeddedDataSource ds = getExistingDataSource();
+ ds.setShutdownDatabase("shutdown");
+ LOG.info("DB STOPPED!@!!!!");
+
+ Thread dbRestartThread = new Thread("db-re-start-thread") {
+ public void run() {
+ LOG.info("Waiting for master broker to Stop");
+ master.waitUntilStopped();
+ ds.setShutdownDatabase("false");
+ LOG.info("DB RESTARTED!@!!!!");
+ }
+ };
+ dbRestartThread.start();
+ }
+ }
+
+ protected void sendToProducer(MessageProducer producer,
+ Destination producerDestination, Message message) throws JMSException {
+ {
+ // do some retries as db failures filter back to the client until broker sees
+ // db lock failure and shuts down
+ boolean sent = false;
+ do {
+ try {
+ producer.send(producerDestination, message);
+ sent = true;
+ } catch (JMSException e) {
+ LOG.info("Exception on producer send:", e);
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ } while(!sent);
+ }
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/DbRestartJDBCQueueMasterSlaveTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java?rev=685988&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
Thu Aug 14 11:33:48 2008
@@ -0,0 +1,62 @@
+package org.apache.activemq.broker.ft;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.jdbc.DataSourceSupport;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+
+public class JDBCQueueMasterSlaveTest extends QueueMasterSlaveTest {
+ protected EmbeddedDataSource sharedDs;
+ protected String MASTER_URL = "tcp://localhost:62001";
+ protected String SLAVE_URL = "tcp://localhost:62002";
+
+ protected void setUp() throws Exception {
+ // startup db
+ sharedDs = (EmbeddedDataSource) new DataSourceSupport().getDataSource();
+ super.setUp();
+ }
+
+ protected void createMaster() throws Exception {
+ master = new BrokerService();
+ master.addConnector(MASTER_URL);
+ master.setUseJmx(false);
+ master.setPersistent(true);
+ master.setDeleteAllMessagesOnStartup(true);
+ JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
+ persistenceAdapter.setDataSource(getExistingDataSource());
+ persistenceAdapter.setLockKeepAlivePeriod(500);
+ master.setPersistenceAdapter(persistenceAdapter);
+ master.start();
+ }
+
+ protected void createSlave() throws Exception {
+ // use a separate thread as the slave will block waiting for
+ // the exclusive db lock
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ BrokerService broker = new BrokerService();
+ broker.addConnector(SLAVE_URL);
+ // no need for broker.setMasterConnectorURI(masterConnectorURI)
+ // as the db lock provides the slave/master initialisation
+ broker.setUseJmx(false);
+ broker.setPersistent(true);
+ JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
+ persistenceAdapter.setDataSource(getExistingDataSource());
+ persistenceAdapter.setCreateTablesOnStartup(false);
+ broker.setPersistenceAdapter(persistenceAdapter);
+ broker.start();
+ slave.set(broker);
+ slaveStarted.countDown();
+ } catch (Exception e) {
+ fail("failed to start slave broker, reason:" + e);
+ }
+ }
+ };
+ t.start();
+ }
+
+ protected EmbeddedDataSource getExistingDataSource() throws Exception {
+ return sharedDs;
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/JDBCQueueMasterSlaveTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java?rev=685988&r1=685987&r2=685988&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
Thu Aug 14 11:33:48 2008
@@ -82,8 +82,7 @@
}
protected void messageSent() throws Exception {
- if (++inflightMessageCount >= failureCount) {
- inflightMessageCount = 0;
+ if (++inflightMessageCount == failureCount) {
Thread.sleep(1000);
LOG.error("MASTER STOPPED!@!!!!");
master.stop();