Author: gtully
Date: Wed Aug 4 12:56:42 2010
New Revision: 982240
URL: http://svn.apache.org/viewvc?rev=982240&view=rev
Log:
revert http://fisheye6.atlassian.com/changelog/activemq/?cs=958009 - concurrent
dispatch with transaction completion breaks when the cache is disabled, either
through memory limit or through durable subs with existing messages in the
store. cannot see the use case though as the transaction semantic is broken if
a message can get dispatched but a commit can still fail. With dispatch
ocurring postCommit like before, the option for kahaDB to do concurrent
transaction dispatch is disabled
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=982240&r1=982239&r2=982240&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Wed Aug 4 12:56:42 2010
@@ -679,7 +679,7 @@ public class Queue extends BaseDestinati
context.getTransaction().addSynchronization(new
Synchronization() {
@Override
- public void beforeCommit() throws Exception {
+ public void afterCommit() throws Exception {
sendLock.lockInterruptibly();
try {
// It could take while before we receive the commit
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=982240&r1=982239&r2=982240&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Wed Aug 4 12:56:42 2010
@@ -452,7 +452,7 @@ public class Topic extends BaseDestinati
if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new Synchronization() {
@Override
- public void beforeCommit() throws Exception {
+ public void afterCommit() throws Exception {
// It could take while before we receive the commit
// operration.. by that time the message could have
// expired..
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=982240&r1=982239&r2=982240&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
Wed Aug 4 12:56:42 2010
@@ -460,21 +460,6 @@ public class KahaDBPersistenceAdapter im
letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
}
- /**
- * @return the concurrentStoreAndDispatchTransactions
- */
- public boolean isConcurrentStoreAndDispatchTransactions() {
- return letter.isConcurrentStoreAndDispatchTransactions();
- }
-
- /**
- * @param concurrentStoreAndDispatchTransactions
- * the concurrentStoreAndDispatchTransactions to set
- */
- public void setConcurrentStoreAndDispatchTransactions(boolean
concurrentStoreAndDispatchTransactions) {
-
letter.setConcurrentStoreAndDispatchTransactions(concurrentStoreAndDispatchTransactions);
- }
-
public int getMaxAsyncJobs() {
return letter.getMaxAsyncJobs();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=982240&r1=982239&r2=982240&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
Wed Aug 4 12:56:42 2010
@@ -106,7 +106,7 @@ public class KahaDBStore extends Message
Semaphore globalTopicSemaphore;
private boolean concurrentStoreAndDispatchQueues = true;
private boolean concurrentStoreAndDispatchTopics = true;
- private boolean concurrentStoreAndDispatchTransactions = true;
+ private boolean concurrentStoreAndDispatchTransactions = false;
private int maxAsyncJobs = MAX_ASYNC_JOBS;
private final KahaDBTransactionStore transactionStore;
@@ -155,21 +155,10 @@ public class KahaDBStore extends Message
this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
}
- /**
- * @return the concurrentStoreAndDispatchTransactions
- */
public boolean isConcurrentStoreAndDispatchTransactions() {
return this.concurrentStoreAndDispatchTransactions;
}
-
- /**
- * @param concurrentStoreAndDispatchTransactions
- * the concurrentStoreAndDispatchTransactions to set
- */
- public void setConcurrentStoreAndDispatchTransactions(boolean
concurrentStoreAndDispatchTransactions) {
- this.concurrentStoreAndDispatchTransactions =
concurrentStoreAndDispatchTransactions;
- }
-
+
/**
* @return the maxAsyncJobs
*/
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=982240&r1=982239&r2=982240&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
Wed Aug 4 12:56:42 2010
@@ -217,7 +217,6 @@ public class KahaDBTransactionStore impl
}
/**
- * @throws XAException
* @see
org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid, boolean wasPrepared, Runnable
preCommit, Runnable postCommit)
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java?rev=982240&r1=982239&r2=982240&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java
Wed Aug 4 12:56:42 2010
@@ -42,7 +42,6 @@ public class TransactedTopicMasterSlaveT
File dir = new File ("target" + File.separator + "slave");
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setDirectory(dir);
- adapter.setConcurrentStoreAndDispatchTransactions(false);
broker.start();
slave = new BrokerService();
slave.setBrokerName("slave");
@@ -73,7 +72,6 @@ public class TransactedTopicMasterSlaveT
File dir = new File ("target" + File.separator + "master");
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setDirectory(dir);
- adapter.setConcurrentStoreAndDispatchTransactions(false);
BrokerService broker = new BrokerService();
broker.setBrokerName("master");
broker.setPersistenceAdapter(adapter);