Author: gtully
Date: Fri Aug 13 11:02:34 2010
New Revision: 985155
URL: http://svn.apache.org/viewvc?rev=985155&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2868 - make it bullit
proof with a transaction completion sync that serializes updates to a cusror
and store so that they are always in order
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=985155&r1=985154&r2=985155&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
Fri Aug 13 11:02:34 2010
@@ -259,7 +259,10 @@ public class KahaDBTransactionStore impl
} else {
KahaTransactionInfo info = getTransactionInfo(txid);
- theStore.store(new
KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
+ // ensure message order w.r.t to cursor and store for
setBatch()
+ synchronized (this) {
+ theStore.store(new
KahaCommitCommand().setTransactionInfo(info), true, preCommit, postCommit);
+ }
}
}else {
LOG.error("Null transaction passed on commit");
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java?rev=985155&r1=985154&r2=985155&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java
Fri Aug 13 11:02:34 2010
@@ -217,11 +217,13 @@ public class MemoryTransactionStore impl
}
return;
}
- tx.commit();
- if (postCommit != null) {
- postCommit.run();
+ // ensure message order w.r.t to cursor and store for setBatch()
+ synchronized (this) {
+ tx.commit();
+ if (postCommit != null) {
+ postCommit.run();
+ }
}
-
}
/**
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java?rev=985155&r1=985154&r2=985155&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
Fri Aug 13 11:02:34 2010
@@ -41,6 +41,7 @@ import javax.management.ObjectName;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -77,7 +78,7 @@ import org.apache.commons.logging.LogFac
* 2) transacted
*
*/
-public class NegativeQueueTest extends TestCase {
+public class NegativeQueueTest extends AutoFailTestSupport {
private static final Log LOG = LogFactory.getLog(NegativeQueueTest.class);
public static SimpleDateFormat formatter = new
SimpleDateFormat("yyyyMMdd,hh:mm:ss:SSS");
@@ -92,7 +93,7 @@ public class NegativeQueueTest extends T
private static final int MESSAGE_COUNT = 2000;
protected static final boolean TRANSACTED = true;
- protected static final boolean DEBUG = false;
+ protected static final boolean DEBUG = true;
protected static int NUM_CONSUMERS = 20;
protected static int PREFETCH_SIZE = 1000;
@@ -211,7 +212,25 @@ public class NegativeQueueTest extends T
consumer.setMessageListener(new
SessionAwareMessageListener(consumerSession, latch2, consumerList2));
}
- assertTrue("got all expected messages on 2", latch2.await(300000,
TimeUnit.MILLISECONDS));
+ boolean success = Wait.waitFor(new Wait.Condition() {
+ public boolean isSatisified() throws Exception {
+ boolean done = latch2.await(10, TimeUnit.SECONDS);
+ if(DEBUG){
+ System.out.println("");
+ System.out.println("Queue1 Size =
"+proxyQueue1.getQueueSize());
+ System.out.println("Queue1 Memory % Used =
"+proxyQueue1.getMemoryPercentUsage());
+ System.out.println("Queue2 Size =
"+proxyQueue2.getQueueSize());
+ System.out.println("Queue2 Memory % Used =
"+proxyQueue2.getMemoryPercentUsage());
+ System.out.println("Queue2 Memory Available =
"+proxyQueue2.getMemoryLimit());
+ }
+ return done;
+ }
+ }, 300 * 1000);
+ if (!success) {
+ dumpAllThreads("blocked waiting on 2");
+ }
+ assertTrue("got all expected messages on 2", success);
+
producerConnection.close();
for(int ix=0; ix<NUM_CONSUMERS; ix++){
consumerConnections1[ix].close();