Author: gtully
Date: Mon Oct 10 11:47:09 2011
New Revision: 1180884
URL: http://svn.apache.org/viewvc?rev=1180884&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3519 - fix regression in
org.apache.activemq.bugs.AMQ2983Test, transacted ack needs to use async remove
to ensure a concurrent dispatch does not precede an ack
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java?rev=1180884&r1=1180883&r2=1180884&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
Mon Oct 10 11:47:09 2011
@@ -454,7 +454,7 @@ public class KahaDBTransactionStore impl
if (ack.isInTransaction()) {
if (ack.getTransactionId().isXATransaction() ||
theStore.isConcurrentStoreAndDispatchTransactions()==false) {
- destination.removeMessage(context, ack);
+ destination.removeAsyncMessage(context, ack);
} else {
Tx tx = getTx(ack.getTransactionId());
tx.add(new RemoveMessageCommand(context) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1180884&r1=1180883&r2=1180884&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Mon Oct 10 11:47:09 2011
@@ -1148,7 +1148,6 @@ public abstract class MessageDatabase ex
// store a DUP
// message. Bad BOY! Don't do it, and log a warning.
LOG.warn("Duplicate message add attempt rejected. Destination:
" + command.getDestination().getName() + ", Message id: " +
command.getMessageId());
- // TODO: consider just rolling back the tx.
sd.messageIdIndex.put(tx, command.getMessageId(), previous);
sd.locationIndex.remove(tx, location);
}
@@ -1159,7 +1158,6 @@ public abstract class MessageDatabase ex
// indexes would
// be wrong..
//
- // TODO: consider just rolling back the tx.
sd.locationIndex.put(tx, location, previous);
}
// record this id in any event, initial send or recovery
@@ -1178,7 +1176,11 @@ public abstract class MessageDatabase ex
if (keys != null) {
sd.locationIndex.remove(tx, keys.location);
recordAckMessageReferenceLocation(ackLocation,
keys.location);
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("message not found in order index: " +
sequenceId + " for: " + command.getMessageId());
}
+ } else if (LOG.isDebugEnabled()) {
+ LOG.debug("message not found in sequence id index: " +
command.getMessageId());
}
} else {
// In the topic case we need remove the message once it's been
acked