Author: dejanb
Date: Thu Nov 12 12:56:29 2009
New Revision: 835373
URL: http://svn.apache.org/viewvc?rev=835373&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQ-2042 - fix for kaha persistence store
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java?rev=835373&r1=835372&r2=835373&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java
Thu Nov 12 12:56:29 2009
@@ -24,6 +24,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -54,7 +56,7 @@
* @org.apache.xbean.XBean
* @version $Revision: 1.4 $
*/
-public class KahaPersistenceAdapter implements PersistenceAdapter {
+public class KahaPersistenceAdapter implements PersistenceAdapter,
BrokerServiceAware {
private static final int STORE_LOCKED_WAIT_DELAY = 10 * 1000;
private static final Log LOG =
LogFactory.getLog(KahaPersistenceAdapter.class);
@@ -73,6 +75,7 @@
private boolean initialized;
private final AtomicLong storeSize;
private boolean persistentIndex = true;
+ private BrokerService brokerService;
public KahaPersistenceAdapter(AtomicLong size) {
@@ -175,6 +178,7 @@
container.setValueMarshaller(new
TransactionMarshaller(wireFormat));
container.load();
transactionStore = new KahaTransactionStore(this,
container);
+ transactionStore.setBrokerService(brokerService);
break;
} catch (StoreLockedExcpetion e) {
LOG.info("Store is locked... waiting " +
(STORE_LOCKED_WAIT_DELAY / 1000)
@@ -361,6 +365,10 @@
wireFormat.setTightEncodingEnabled(true);
}
}
+
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java?rev=835373&r1=835372&r2=835373&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java
Thu Nov 12 12:56:29 2009
@@ -24,17 +24,23 @@
import javax.transaction.xa.XAException;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.journal.JournalPersistenceAdapter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* Provides a TransactionStore implementation that can create transaction aware
@@ -42,10 +48,14 @@
*
* @version $Revision: 1.4 $
*/
-public class KahaTransactionStore implements TransactionStore {
+public class KahaTransactionStore implements TransactionStore,
BrokerServiceAware {
+ private static final Log LOG =
LogFactory.getLog(KahaTransactionStore.class);
+
private Map transactions = new ConcurrentHashMap();
private Map prepared;
private KahaPersistenceAdapter adaptor;
+
+ private BrokerService brokerService;
KahaTransactionStore(KahaPersistenceAdapter adaptor, Map preparedMap) {
this.adaptor = adaptor;
@@ -130,12 +140,17 @@
* @throws IOException
*/
void addMessage(final MessageStore destination, final Message message)
throws IOException {
- if (message.isInTransaction()) {
- KahaTransaction tx = getOrCreateTx(message.getTransactionId());
- tx.add((KahaMessageStore)destination, message);
- } else {
- destination.addMessage(null, message);
- }
+ try {
+ if (message.isInTransaction()) {
+ KahaTransaction tx =
getOrCreateTx(message.getTransactionId());
+ tx.add((KahaMessageStore)destination, message);
+ } else {
+ destination.addMessage(null, message);
+ }
+ } catch (RuntimeStoreException rse) {
+ stopBroker();
+ throw rse;
+ }
}
/**
@@ -143,12 +158,17 @@
* @throws IOException
*/
final void removeMessage(final MessageStore destination, final MessageAck
ack) throws IOException {
- if (ack.isInTransaction()) {
- KahaTransaction tx = getOrCreateTx(ack.getTransactionId());
- tx.add((KahaMessageStore)destination, ack);
- } else {
- destination.removeMessage(null, ack);
- }
+ try {
+ if (ack.isInTransaction()) {
+ KahaTransaction tx =
getOrCreateTx(ack.getTransactionId());
+ tx.add((KahaMessageStore)destination, ack);
+ } else {
+ destination.removeMessage(null, ack);
+ }
+ } catch (RuntimeStoreException rse) {
+ stopBroker();
+ throw rse;
+ }
}
protected synchronized KahaTransaction getTx(TransactionId key) {
@@ -181,4 +201,20 @@
protected MessageStore getStoreById(Object id) {
return adaptor.retrieveMessageStore(id);
}
+
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
+
+ protected void stopBroker() {
+ new Thread() {
+ public void run() {
+ try {
+ brokerService.stop();
+ } catch (Exception e) {
+ LOG.warn("Failure occured while stopping broker", e);
+ }
+ }
+ }.start();
+ }
}