Author: chirino
Date: Thu Aug 14 10:22:15 2008
New Revision: 685966
URL: http://svn.apache.org/viewvc?rev=685966&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1886
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?rev=685966&r1=685965&r2=685966&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
Thu Aug 14 10:22:15 2008
@@ -134,7 +134,7 @@
public TransactionStore createTransactionStore() throws IOException {
if (transactionStore == null) {
- transactionStore = new MemoryTransactionStore();
+ transactionStore = new MemoryTransactionStore(this);
}
return this.transactionStore;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java?rev=685966&r1=685965&r2=685966&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java
Thu Aug 14 10:22:15 2008
@@ -89,7 +89,7 @@
public TransactionStore createTransactionStore() throws IOException {
if (transactionStore == null) {
- transactionStore = new MemoryTransactionStore();
+ transactionStore = new MemoryTransactionStore(this);
}
return transactionStore;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?rev=685966&r1=685965&r2=685966&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
Thu Aug 14 10:22:15 2008
@@ -29,6 +29,7 @@
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
@@ -44,12 +45,12 @@
public class MemoryTransactionStore implements TransactionStore {
ConcurrentHashMap<Object, Tx> inflightTransactions = new
ConcurrentHashMap<Object, Tx>();
-
ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new
ConcurrentHashMap<TransactionId, Tx>();
+ final PersistenceAdapter persistenceAdapter;
private boolean doingRecover;
- public static class Tx {
+ public class Tx {
private ArrayList<AddMessageCommand> messages = new
ArrayList<AddMessageCommand>();
private ArrayList<RemoveMessageCommand> acks = new
ArrayList<RemoveMessageCommand>();
@@ -86,29 +87,43 @@
* @throws IOException
*/
public void commit() throws IOException {
- // Do all the message adds.
- for (Iterator<AddMessageCommand> iter = messages.iterator();
iter.hasNext();) {
- AddMessageCommand cmd = iter.next();
- cmd.run();
- }
- // And removes..
- for (Iterator<RemoveMessageCommand> iter = acks.iterator();
iter.hasNext();) {
- RemoveMessageCommand cmd = iter.next();
- cmd.run();
+ ConnectionContext ctx = new ConnectionContext();
+ persistenceAdapter.beginTransaction(ctx);
+ try {
+
+ // Do all the message adds.
+ for (Iterator<AddMessageCommand> iter = messages.iterator();
iter.hasNext();) {
+ AddMessageCommand cmd = iter.next();
+ cmd.run(ctx);
+ }
+ // And removes..
+ for (Iterator<RemoveMessageCommand> iter = acks.iterator();
iter.hasNext();) {
+ RemoveMessageCommand cmd = iter.next();
+ cmd.run(ctx);
+ }
+
+ } catch ( IOException e ) {
+ persistenceAdapter.rollbackTransaction(ctx);
+ throw e;
}
+ persistenceAdapter.commitTransaction(ctx);
}
}
-
+
public interface AddMessageCommand {
Message getMessage();
- void run() throws IOException;
+ void run(ConnectionContext context) throws IOException;
}
public interface RemoveMessageCommand {
MessageAck getMessageAck();
- void run() throws IOException;
+ void run(ConnectionContext context) throws IOException;
+ }
+
+ public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
+ this.persistenceAdapter=persistenceAdapter;
}
public MessageStore proxy(MessageStore messageStore) {
@@ -221,15 +236,16 @@
return message;
}
- public void run() throws IOException {
- destination.addMessage(null, message);
+ public void run(ConnectionContext ctx) throws IOException {
+ destination.addMessage(ctx, message);
}
+
});
} else {
destination.addMessage(null, message);
}
}
-
+
/**
* @param ack
* @throws IOException
@@ -246,8 +262,8 @@
return ack;
}
- public void run() throws IOException {
- destination.removeMessage(null, ack);
+ public void run(ConnectionContext ctx) throws IOException {
+ destination.removeMessage(ctx, ack);
}
});
} else {