This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/master by this push:
new ed5edb0 AMQ-7311 - track recovered prepared ack locations on a per
subscriber basis, fix and test
ed5edb0 is described below
commit ed5edb03d7fe63ef27269566aaa9a9b501650eb0
Author: gtully <[email protected]>
AuthorDate: Thu Sep 26 15:54:57 2019 +0100
AMQ-7311 - track recovered prepared ack locations on a per subscriber
basis, fix and test
---
.../store/jdbc/JdbcMemoryTransactionStore.java | 2 +-
.../store/jdbc/adapter/DefaultJDBCAdapter.java | 4 +-
.../apache/activemq/store/kahadb/KahaDBStore.java | 66 +++++--
.../store/kahadb/KahaDBTransactionStore.java | 5 +
.../activemq/broker/XARecoveryBrokerTest.java | 93 +++++++++-
.../broker/mLevelDBXARecoveryBrokerTest.java | 2 +
.../activemq/store/jdbc/XACompletionTest.java | 199 ++++++++++++++++++++-
7 files changed, 351 insertions(+), 20 deletions(-)
diff --git
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
index 4bbe43d..ccf7485 100644
---
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
+++
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
@@ -294,7 +294,7 @@ public class JdbcMemoryTransactionStore extends
MemoryTransactionStore {
@Override
public void rollback(ConnectionContext context) throws IOException
{
-
((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context, priority,
jdbcTopicMessageStore.getDestination(), subName, clientId);
+
((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context,
jdbcTopicMessageStore.isPrioritizedMessages() ? priority : 0,
jdbcTopicMessageStore.getDestination(), subName, clientId);
jdbcTopicMessageStore.complete(clientId, subName);
}
diff --git
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
index 8d76fe6..031d976 100644
---
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
+++
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
@@ -1008,8 +1008,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
String encodedString = rs.getString(1);
byte[] encodedXid = parseBase64Binary(encodedString);
String destination = rs.getString(2);
- String subName = rs.getString(3);
- String subId = rs.getString(4);
+ String subId = rs.getString(3);
+ String subName = rs.getString(4);
jdbcMemoryTransactionStore.recoverLastAck(encodedXid,
ActiveMQDestination.createDestination(destination,
ActiveMQDestination.TOPIC_TYPE),
subName, subId);
diff --git
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 47285ea..a8af5ae 100644
---
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -419,8 +420,8 @@ public class KahaDBStore extends MessageDatabase implements
PersistenceAdapter,
protected KahaDestination dest;
private final int maxAsyncJobs;
private final Semaphore localDestinationSemaphore;
- protected final Set<String> ackedAndPrepared = new HashSet<>();
- protected final Set<String> rolledBackAcks = new HashSet<>();
+ protected final HashMap<String, Set<String>> ackedAndPreparedMap = new
HashMap<String, Set<String>>();
+ protected final HashMap<String, Set<String>> rolledBackAcksMap = new
HashMap<String, Set<String>>();
double doneTasks, canceledTasks = 0;
@@ -437,6 +438,10 @@ public class KahaDBStore extends MessageDatabase
implements PersistenceAdapter,
}
+ private final String recoveredTxStateMapKey(ActiveMQDestination
destination, MessageAck ack) {
+ return destination.isQueue() ? destination.getPhysicalName() :
ack.getConsumerId().getConnectionId();
+ }
+
// messages that have prepared (pending) acks cannot be re-dispatched
unless the outcome is rollback,
// till then they are skipped by the store.
// 'at most once' XA guarantee
@@ -444,6 +449,12 @@ public class KahaDBStore extends MessageDatabase
implements PersistenceAdapter,
indexLock.writeLock().lock();
try {
for (MessageAck ack : acks) {
+ final String key = recoveredTxStateMapKey(destination,
ack);
+ Set ackedAndPrepared = ackedAndPreparedMap.get(key);
+ if (ackedAndPrepared == null) {
+ ackedAndPrepared = new LinkedHashSet<String>();
+ ackedAndPreparedMap.put(key, ackedAndPrepared);
+ }
ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
}
} finally {
@@ -457,8 +468,20 @@ public class KahaDBStore extends MessageDatabase
implements PersistenceAdapter,
try {
for (MessageAck ack : acks) {
final String id =
ack.getLastMessageId().toProducerKey();
- ackedAndPrepared.remove(id);
+ final String key = recoveredTxStateMapKey(destination,
ack);
+ Set ackedAndPrepared = ackedAndPreparedMap.get(key);
+ if (ackedAndPrepared != null) {
+ ackedAndPrepared.remove(id);
+ if (ackedAndPreparedMap.isEmpty()) {
+ ackedAndPreparedMap.remove(key);
+ }
+ }
if (rollback) {
+ Set rolledBackAcks = rolledBackAcksMap.get(key);
+ if (rolledBackAcks == null) {
+ rolledBackAcks = new LinkedHashSet<String>();
+ rolledBackAcksMap.put(key, rolledBackAcks);
+ }
rolledBackAcks.add(id);
pageFile.tx().execute(tx -> {
incrementAndAddSizeToStoreStat(tx, dest, 0);
@@ -646,12 +669,13 @@ public class KahaDBStore extends MessageDatabase
implements PersistenceAdapter,
@Override
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
- recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE,
listener);
+ recoverRolledBackAcks(destination.getPhysicalName(),
sd, tx, Integer.MAX_VALUE, listener);
sd.orderIndex.resetCursorPosition();
for (Iterator<Entry<Long, MessageKeys>> iterator =
sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
.hasNext(); ) {
Entry<Long, MessageKeys> entry = iterator.next();
- if
(ackedAndPrepared.contains(entry.getValue().messageId)) {
+ Set ackedAndPrepared =
ackedAndPreparedMap.get(destination.getPhysicalName());
+ if (ackedAndPrepared != null &&
ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
Message msg =
loadMessage(entry.getValue().location);
@@ -673,10 +697,11 @@ public class KahaDBStore extends MessageDatabase
implements PersistenceAdapter,
public void execute(Transaction tx) throws Exception {
StoredDestination sd = getStoredDestination(dest, tx);
Entry<Long, MessageKeys> entry = null;
- int counter = recoverRolledBackAcks(sd, tx,
maxReturned, listener);
+ int counter =
recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned,
listener);
+ Set ackedAndPrepared =
ackedAndPreparedMap.get(destination.getPhysicalName());
for (Iterator<Entry<Long, MessageKeys>> iterator =
sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
entry = iterator.next();
- if
(ackedAndPrepared.contains(entry.getValue().messageId)) {
+ if (ackedAndPrepared != null &&
ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
Message msg =
loadMessage(entry.getValue().location);
@@ -695,9 +720,14 @@ public class KahaDBStore extends MessageDatabase
implements PersistenceAdapter,
}
}
- protected int recoverRolledBackAcks(StoredDestination sd, Transaction
tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
+ protected int recoverRolledBackAcks(String recoveredTxStateMapKey,
StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener
listener) throws Exception {
int counter = 0;
String id;
+
+ Set rolledBackAcks = rolledBackAcksMap.get(recoveredTxStateMapKey);
+ if (rolledBackAcks == null) {
+ return counter;
+ }
for (Iterator<String> iterator = rolledBackAcks.iterator();
iterator.hasNext(); ) {
id = iterator.next();
iterator.remove();
@@ -710,12 +740,15 @@ public class KahaDBStore extends MessageDatabase
implements PersistenceAdapter,
break;
}
} else {
- LOG.info("rolledback ack message {} with seq {} will
be picked up in future batch {}", id, sequence, sd.orderIndex.cursor);
+ LOG.debug("rolledback ack message {} with seq {} will
be picked up in future batch {}", id, sequence, sd.orderIndex.cursor);
}
} else {
LOG.warn("Failed to locate rolled back ack message {} in
{}", id, sd);
}
}
+ if (rolledBackAcks.isEmpty()) {
+ rolledBackAcksMap.remove(recoveredTxStateMapKey);
+ }
return counter;
}
@@ -830,7 +863,10 @@ public class KahaDBStore extends MessageDatabase
implements PersistenceAdapter,
return statistics;
}
});
-
recoveredStatistics.getMessageCount().subtract(ackedAndPrepared.size());
+ Set ackedAndPrepared =
ackedAndPreparedMap.get(destination.getPhysicalName());
+ if (ackedAndPrepared != null) {
+
recoveredStatistics.getMessageCount().subtract(ackedAndPrepared.size());
+ }
getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount());
getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize());
} finally {
@@ -1113,11 +1149,12 @@ public class KahaDBStore extends MessageDatabase
implements PersistenceAdapter,
subAckPositions = null;
sd.orderIndex.setBatch(tx, cursorPos);
}
- recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE,
listener);
+ recoverRolledBackAcks(subscriptionKey, sd, tx,
Integer.MAX_VALUE, listener);
+ Set ackedAndPrepared =
ackedAndPreparedMap.get(subscriptionKey);
for (Iterator<Entry<Long, MessageKeys>> iterator =
sd.orderIndex.iterator(tx); iterator
.hasNext();) {
Entry<Long, MessageKeys> entry = iterator.next();
- if
(ackedAndPrepared.contains(entry.getValue().messageId)) {
+ if (ackedAndPrepared != null &&
ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
//If subAckPositions is set then verify the
sequence set contains the message still
@@ -1173,11 +1210,12 @@ public class KahaDBStore extends MessageDatabase
implements PersistenceAdapter,
}
Entry<Long, MessageKeys> entry = null;
- int counter = recoverRolledBackAcks(sd, tx,
maxReturned, listener);
+ int counter = recoverRolledBackAcks(subscriptionKey,
sd, tx, maxReturned, listener);
+ Set ackedAndPrepared =
ackedAndPreparedMap.get(subscriptionKey);
for (Iterator<Entry<Long, MessageKeys>> iterator =
sd.orderIndex.iterator(tx, moc); iterator
.hasNext();) {
entry = iterator.next();
- if
(ackedAndPrepared.contains(entry.getValue().messageId)) {
+ if (ackedAndPrepared != null &&
ackedAndPrepared.contains(entry.getValue().messageId)) {
continue;
}
//If subAckPositions is set then verify the
sequence set contains the message still
diff --git
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
index 8b66867..b0f5c41 100644
---
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
+++
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
@@ -359,6 +360,10 @@ public class KahaDBTransactionStore implements
TransactionStore {
MessageDatabase.RemoveOperation rmOp =
(MessageDatabase.RemoveOperation) op;
Buffer ackb = rmOp.getCommand().getAck();
MessageAck ack = (MessageAck) wireFormat().unmarshal(new
DataInputStream(ackb.newInput()));
+ // allow the ack to be tracked back to its durable sub
+ ConsumerId subKey = new ConsumerId();
+
subKey.setConnectionId(rmOp.getCommand().getSubscriptionKey());
+ ack.setConsumerId(subKey);
ackList.add(ack);
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index 8415b93..9e174c2 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -713,7 +713,7 @@ public class XARecoveryBrokerTest extends
BrokerRestartTestSupport {
}
- public void
x_initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {
+ public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart()
{
addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE,
Boolean.TRUE});
}
@@ -793,6 +793,97 @@ public class XARecoveryBrokerTest extends
BrokerRestartTestSupport {
assertEquals("there are no prepared tx", 0,
dataArrayResponse.getData().length);
}
+ public void testTopicPersistentPreparedAcksNotLostOnRestartForNSubs()
throws Exception {
+ ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
+
+ // Setup the producer and send the message.
+ StubConnection connection = createConnection();
+ ConnectionInfo connectionInfo = createConnectionInfo();
+ connectionInfo.setClientId("durable");
+ SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+ connection.send(connectionInfo);
+ connection.send(sessionInfo);
+ connection.send(producerInfo);
+
+ // setup durable subs
+ ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo,
destination);
+ consumerInfo.setSubscriptionName("sub");
+ connection.send(consumerInfo);
+
+ ConsumerInfo consumerInfoX = createConsumerInfo(sessionInfo,
destination);
+ consumerInfoX.setSubscriptionName("subX");
+ connection.send(consumerInfoX);
+ connection.send(consumerInfoX.createRemoveCommand());
+
+ final int numMessages = 4;
+ for (int i = 0; i < numMessages; i++) {
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(true);
+ connection.send(message);
+ }
+
+ // Begin the transaction.
+ XATransactionId txid = createXATransaction(sessionInfo);
+ connection.send(createBeginTransaction(connectionInfo, txid));
+
+ final int messageCount = expectedMessageCount(numMessages,
destination);
+ Message m = null;
+ for (int i = 0; i < messageCount; i++) {
+ m = receiveMessage(connection);
+ assertNotNull("unexpected null on: " + i, m);
+ }
+
+ // one ack with last received, mimic a beforeEnd synchronization
+ MessageAck ack = createAck(consumerInfo, m, messageCount,
MessageAck.STANDARD_ACK_TYPE);
+ ack.setTransactionId(txid);
+ connection.send(ack);
+
+ connection.request(createPrepareTransaction(connectionInfo, txid));
+
+ // restart the broker.
+ restartBroker();
+
+ connection = createConnection();
+ connectionInfo = createConnectionInfo();
+ connectionInfo.setClientId("durable");
+ connection.send(connectionInfo);
+
+ // validate recovery
+ TransactionInfo recoverInfo = new
TransactionInfo(connectionInfo.getConnectionId(), null,
TransactionInfo.RECOVER);
+ DataArrayResponse dataArrayResponse =
(DataArrayResponse)connection.request(recoverInfo);
+
+ assertEquals("there is a prepared tx", 1,
dataArrayResponse.getData().length);
+ assertEquals("it matches", txid, dataArrayResponse.getData()[0]);
+
+ sessionInfo = createSessionInfo(connectionInfo);
+ connection.send(sessionInfo);
+ consumerInfo = createConsumerInfo(sessionInfo, destination);
+ consumerInfo.setSubscriptionName("sub");
+ connection.send(consumerInfo);
+
+ // no redelivery, exactly once semantics unless there is rollback
+ m = receiveMessage(connection);
+ assertNull(m);
+ assertNoMessagesLeft(connection);
+
+ // ensure subX can get its copy of the messages
+ consumerInfoX = createConsumerInfo(sessionInfo, destination);
+ consumerInfoX.setSubscriptionName("subX");
+ connection.send(consumerInfoX);
+
+ for (int i = 0; i < messageCount; i++) {
+ m = receiveMessage(connection);
+ assertNotNull("unexpected null for subX on: " + i, m);
+ }
+
+ connection.request(createCommitTransaction2Phase(connectionInfo,
txid));
+
+ // validate recovery complete
+ dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
+ assertEquals("there are no prepared tx", 0,
dataArrayResponse.getData().length);
+ }
+
public void
testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws
Exception {
ActiveMQDestination destination = createDestination();
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java
index 7adb983..eb8badd 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java
@@ -74,4 +74,6 @@ public class mLevelDBXARecoveryBrokerTest extends
XARecoveryBrokerTest {
}
public void testTopicPersistentPreparedAcksUnavailableTillRollback()
throws Exception {
}
+ public void testTopicPersistentPreparedAcksNotLostOnRestartForNSubs()
throws Exception {
+ }
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
index a0c49cb..e203f96 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
@@ -29,6 +29,7 @@ import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.filter.AnyDestination;
import org.apache.activemq.filter.DestinationMap;
@@ -408,6 +409,182 @@ public class XACompletionTest extends TestSupport {
}
@Test
+ public void testConsumeAfterAckPreparedRolledbackTopic() throws Exception {
+
+ factory = new ActiveMQXAConnectionFactory(connectionUri +
"?jms.prefetchPolicy.all=0");
+ factory.setWatchTopicAdvisories(false);
+
+ final ActiveMQTopic destination = new ActiveMQTopic("TEST");
+
+ ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection)
factory.createXAConnection();
+ activeMQXAConnection.setClientID("durable");
+ activeMQXAConnection.start();
+ XASession xaSession = activeMQXAConnection.createXASession();
+
+ MessageConsumer consumer =
xaSession.createDurableSubscriber(destination, "sub1");
+ consumer.close();
+ consumer = xaSession.createDurableSubscriber(destination, "sub2");
+
+ sendMessagesTo(10, destination);
+
+ XAResource resource = xaSession.getXAResource();
+ resource.recover(XAResource.TMSTARTRSCAN);
+ resource.recover(XAResource.TMNOFLAGS);
+
+ dumpMessages();
+ Xid tid = createXid();
+
+ resource.start(tid, XAResource.TMNOFLAGS);
+
+ int messagesReceived = 0;
+
+ for (int i = 0; i < 5; i++) {
+
+ Message message = null;
+ try {
+ LOG.debug("Receiving message " + (messagesReceived + 1) + " of
" + messagesExpected);
+ message = consumer.receive(2000);
+ LOG.info("Received : " + message);
+ messagesReceived++;
+ } catch (Exception e) {
+ LOG.debug("Caught exception:", e);
+ }
+ }
+
+ resource.end(tid, XAResource.TMSUCCESS);
+ resource.prepare(tid);
+
+ consumer.close();
+ activeMQXAConnection.close();
+
+ LOG.info("after close");
+
+ broker = restartBroker();
+
+ LOG.info("Try consume... after restart");
+ dumpMessages();
+
+ factory = new ActiveMQXAConnectionFactory(connectionUri +
"?jms.prefetchPolicy.all=0");
+ factory.setWatchTopicAdvisories(false);
+
+ activeMQXAConnection = (ActiveMQXAConnection)
factory.createXAConnection();
+ activeMQXAConnection.start();
+ xaSession = activeMQXAConnection.createXASession();
+
+ XAResource xaResource = xaSession.getXAResource();
+
+ Xid[] xids = xaResource.recover(XAResource.TMSTARTRSCAN);
+ xaResource.recover(XAResource.TMNOFLAGS);
+
+ LOG.info("Rollback outcome for ack");
+ xaResource.rollback(xids[0]);
+
+ assertTrue("got expected", consumeOnlyN(10,"durable", "sub1",
destination));
+ assertTrue("got expected", consumeOnlyN(10, "durable", "sub2",
destination));
+ }
+
+ @Test
+ public void testConsumeAfterAckPreparedCommitTopic() throws Exception {
+
+ factory = new ActiveMQXAConnectionFactory(connectionUri +
"?jms.prefetchPolicy.all=0");
+ factory.setWatchTopicAdvisories(false);
+
+ final ActiveMQTopic destination = new ActiveMQTopic("TEST");
+
+ ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection)
factory.createXAConnection();
+ activeMQXAConnection.setClientID("durable");
+ activeMQXAConnection.start();
+ XASession xaSession = activeMQXAConnection.createXASession();
+
+ MessageConsumer consumer =
xaSession.createDurableSubscriber(destination, "sub1");
+ consumer.close();
+ consumer = xaSession.createDurableSubscriber(destination, "sub2");
+
+ sendMessagesTo(10, destination);
+
+ XAResource resource = xaSession.getXAResource();
+ resource.recover(XAResource.TMSTARTRSCAN);
+ resource.recover(XAResource.TMNOFLAGS);
+
+ dumpMessages();
+ Xid tid = createXid();
+
+ resource.start(tid, XAResource.TMNOFLAGS);
+
+ int messagesReceived = 0;
+
+ for (int i = 0; i < 5; i++) {
+
+ Message message = null;
+ try {
+ LOG.debug("Receiving message " + (messagesReceived + 1) + " of
" + messagesExpected);
+ message = consumer.receive(2000);
+ LOG.info("Received : " + message);
+ messagesReceived++;
+ } catch (Exception e) {
+ LOG.debug("Caught exception:", e);
+ }
+ }
+
+ resource.end(tid, XAResource.TMSUCCESS);
+ resource.prepare(tid);
+
+ consumer.close();
+ activeMQXAConnection.close();
+
+ LOG.info("after close");
+
+ broker = restartBroker();
+
+ LOG.info("Try consume... after restart");
+ dumpMessages();
+
+ factory = new ActiveMQXAConnectionFactory(connectionUri +
"?jms.prefetchPolicy.all=0");
+ factory.setWatchTopicAdvisories(false);
+
+ activeMQXAConnection = (ActiveMQXAConnection)
factory.createXAConnection();
+ activeMQXAConnection.start();
+ xaSession = activeMQXAConnection.createXASession();
+
+ XAResource xaResource = xaSession.getXAResource();
+
+ Xid[] xids = xaResource.recover(XAResource.TMSTARTRSCAN);
+ xaResource.recover(XAResource.TMNOFLAGS);
+
+ LOG.info("Rollback outcome for ack");
+ xaResource.commit(xids[0], false);
+
+ assertTrue("got expected", consumeOnlyN(10,"durable", "sub1",
destination));
+ assertTrue("got expected", consumeOnlyN(5, "durable", "sub2",
destination));
+
+ LOG.info("at end...");
+ dumpMessages();
+
+ }
+
+ private boolean consumeOnlyN(int expected, String clientId, String
subName, ActiveMQTopic destination) throws Exception {
+ int drained = 0;
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=" +
expected);
+ factory.setWatchTopicAdvisories(false);
+ javax.jms.Connection connection = factory.createConnection();
+ connection.setClientID(clientId);
+ try {
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer =
session.createDurableSubscriber(destination, subName);
+ Message message = null;
+ while ( (message =consumer.receive(2000)) != null) {
+ drained++;
+ LOG.info("Sub:" + subName + ", received: " +
message.getJMSMessageID());
+ }
+ consumer.close();
+ } finally {
+ connection.close();
+ }
+ return drained == expected;
+ }
+
+ @Test
public void
testStatsAndConsumeAfterAckPreparedRolledbackOutOfOrderRecovery() throws
Exception {
factory = new ActiveMQXAConnectionFactory(connectionUri +
"?jms.prefetchPolicy.all=0");
@@ -938,16 +1115,24 @@ public class XACompletionTest extends TestSupport {
}
protected void sendMessages(int messagesExpected) throws Exception {
+ sendMessagesTo(messagesExpected, new ActiveMQQueue("TEST"));
+ }
+
+ protected void sendMessagesTo(int messagesExpected, Destination
destination) throws Exception {
ActiveMQConnectionFactory activeMQConnectionFactory = new
ActiveMQConnectionFactory(connectionUri);
activeMQConnectionFactory.setWatchTopicAdvisories(false);
- sendMessagesWith(activeMQConnectionFactory, messagesExpected);
+ sendMessagesWithTo(activeMQConnectionFactory, messagesExpected,
destination);
}
protected void sendMessagesWith(ConnectionFactory factory, int
messagesExpected) throws Exception {
+ sendMessagesWithTo(factory, messagesExpected, new
ActiveMQQueue("TEST"));
+ }
+
+ protected void sendMessagesWithTo(ConnectionFactory factory, int
messagesExpected, Destination destination) throws Exception {
javax.jms.Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createQueue("TEST");
+
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
@@ -975,6 +1160,15 @@ public class XACompletionTest extends TestSupport {
LOG.info("id: " + id + ", message SeqId: " +
message.getMessageId().getBrokerSequenceId() + ", XID:" + xid + ", MSG: " +
message);
}
statement.close();
+
+ statement = conn.prepareStatement("SELECT LAST_ACKED_ID, CLIENT_ID,
SUB_NAME, PRIORITY, XID FROM ACTIVEMQ_ACKS");
+ result = statement.executeQuery();
+ LOG.info("Messages in ACKS table db...");
+ while (result.next()) {
+ LOG.info("lastAcked: {}, clientId: {}, SUB_NAME: {}, PRIORITY: {},
XID {}",
+ result.getLong(1), result.getString(2),
result.getString(3), result.getInt(4), result.getString(5));
+ }
+ statement.close();
conn.close();
}
@@ -1011,6 +1205,7 @@ public class XACompletionTest extends TestSupport {
DestinationMap destinationMap = new DestinationMap();
GroupPrincipal anaGroup = new GroupPrincipal(id);
destinationMap.put(new AnyDestination(new ActiveMQDestination[]{new
ActiveMQQueue(">")}), anaGroup);
+ destinationMap.put(new AnyDestination(new ActiveMQDestination[]{new
ActiveMQTopic(">")}), anaGroup);
map.setWriteACLs(destinationMap);
map.setAdminACLs(destinationMap);
map.setReadACLs(destinationMap);