This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new ed5edb0  AMQ-7311 - track recovered prepared ack locations on a per 
subscriber basis, fix and test
ed5edb0 is described below

commit ed5edb03d7fe63ef27269566aaa9a9b501650eb0
Author: gtully <gary.tu...@gmail.com>
AuthorDate: Thu Sep 26 15:54:57 2019 +0100

    AMQ-7311 - track recovered prepared ack locations on a per subscriber 
basis, fix and test
---
 .../store/jdbc/JdbcMemoryTransactionStore.java     |   2 +-
 .../store/jdbc/adapter/DefaultJDBCAdapter.java     |   4 +-
 .../apache/activemq/store/kahadb/KahaDBStore.java  |  66 +++++--
 .../store/kahadb/KahaDBTransactionStore.java       |   5 +
 .../activemq/broker/XARecoveryBrokerTest.java      |  93 +++++++++-
 .../broker/mLevelDBXARecoveryBrokerTest.java       |   2 +
 .../activemq/store/jdbc/XACompletionTest.java      | 199 ++++++++++++++++++++-
 7 files changed, 351 insertions(+), 20 deletions(-)

diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
index 4bbe43d..ccf7485 100644
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JdbcMemoryTransactionStore.java
@@ -294,7 +294,7 @@ public class JdbcMemoryTransactionStore extends 
MemoryTransactionStore {
 
             @Override
             public void rollback(ConnectionContext context) throws IOException 
{
-                
((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context, priority, 
jdbcTopicMessageStore.getDestination(), subName, clientId);
+                
((JDBCPersistenceAdapter)persistenceAdapter).rollbackLastAck(context, 
jdbcTopicMessageStore.isPrioritizedMessages() ? priority : 0, 
jdbcTopicMessageStore.getDestination(), subName, clientId);
                 jdbcTopicMessageStore.complete(clientId, subName);
             }
 
diff --git 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
index 8d76fe6..031d976 100644
--- 
a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
+++ 
b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java
@@ -1008,8 +1008,8 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
                 String encodedString = rs.getString(1);
                 byte[] encodedXid = parseBase64Binary(encodedString);
                 String destination = rs.getString(2);
-                String subName = rs.getString(3);
-                String subId = rs.getString(4);
+                String subId = rs.getString(3);
+                String subName = rs.getString(4);
                 jdbcMemoryTransactionStore.recoverLastAck(encodedXid,
                         ActiveMQDestination.createDestination(destination, 
ActiveMQDestination.TOPIC_TYPE),
                         subName, subId);
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 47285ea..a8af5ae 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -419,8 +420,8 @@ public class KahaDBStore extends MessageDatabase implements 
PersistenceAdapter,
         protected KahaDestination dest;
         private final int maxAsyncJobs;
         private final Semaphore localDestinationSemaphore;
-        protected final Set<String> ackedAndPrepared = new HashSet<>();
-        protected final Set<String> rolledBackAcks = new HashSet<>();
+        protected final HashMap<String, Set<String>> ackedAndPreparedMap = new 
HashMap<String, Set<String>>();
+        protected final HashMap<String, Set<String>> rolledBackAcksMap = new 
HashMap<String, Set<String>>();
 
         double doneTasks, canceledTasks = 0;
 
@@ -437,6 +438,10 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
         }
 
 
+        private final String recoveredTxStateMapKey(ActiveMQDestination 
destination, MessageAck ack) {
+            return destination.isQueue() ? destination.getPhysicalName() : 
ack.getConsumerId().getConnectionId();
+        }
+
         // messages that have prepared (pending) acks cannot be re-dispatched 
unless the outcome is rollback,
         // till then they are skipped by the store.
         // 'at most once' XA guarantee
@@ -444,6 +449,12 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
             indexLock.writeLock().lock();
             try {
                 for (MessageAck ack : acks) {
+                    final String key = recoveredTxStateMapKey(destination, 
ack);
+                    Set ackedAndPrepared = ackedAndPreparedMap.get(key);
+                    if (ackedAndPrepared == null) {
+                        ackedAndPrepared = new LinkedHashSet<String>();
+                        ackedAndPreparedMap.put(key, ackedAndPrepared);
+                    }
                     
ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
                 }
             } finally {
@@ -457,8 +468,20 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                 try {
                     for (MessageAck ack : acks) {
                         final String id = 
ack.getLastMessageId().toProducerKey();
-                        ackedAndPrepared.remove(id);
+                        final String key = recoveredTxStateMapKey(destination, 
ack);
+                        Set ackedAndPrepared = ackedAndPreparedMap.get(key);
+                        if (ackedAndPrepared != null) {
+                            ackedAndPrepared.remove(id);
+                            if (ackedAndPreparedMap.isEmpty()) {
+                                ackedAndPreparedMap.remove(key);
+                            }
+                        }
                         if (rollback) {
+                            Set rolledBackAcks = rolledBackAcksMap.get(key);
+                            if (rolledBackAcks == null) {
+                                rolledBackAcks = new LinkedHashSet<String>();
+                                rolledBackAcksMap.put(key, rolledBackAcks);
+                            }
                             rolledBackAcks.add(id);
                             pageFile.tx().execute(tx -> {
                                 incrementAndAddSizeToStoreStat(tx, dest, 0);
@@ -646,12 +669,13 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                     @Override
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
-                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, 
listener);
+                        recoverRolledBackAcks(destination.getPhysicalName(), 
sd, tx, Integer.MAX_VALUE, listener);
                         sd.orderIndex.resetCursorPosition();
                         for (Iterator<Entry<Long, MessageKeys>> iterator = 
sd.orderIndex.iterator(tx); listener.hasSpace() && iterator
                                 .hasNext(); ) {
                             Entry<Long, MessageKeys> entry = iterator.next();
-                            if 
(ackedAndPrepared.contains(entry.getValue().messageId)) {
+                            Set ackedAndPrepared = 
ackedAndPreparedMap.get(destination.getPhysicalName());
+                            if (ackedAndPrepared != null && 
ackedAndPrepared.contains(entry.getValue().messageId)) {
                                 continue;
                             }
                             Message msg = 
loadMessage(entry.getValue().location);
@@ -673,10 +697,11 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                     public void execute(Transaction tx) throws Exception {
                         StoredDestination sd = getStoredDestination(dest, tx);
                         Entry<Long, MessageKeys> entry = null;
-                        int counter = recoverRolledBackAcks(sd, tx, 
maxReturned, listener);
+                        int counter = 
recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, 
listener);
+                        Set ackedAndPrepared = 
ackedAndPreparedMap.get(destination.getPhysicalName());
                         for (Iterator<Entry<Long, MessageKeys>> iterator = 
sd.orderIndex.iterator(tx); iterator.hasNext(); ) {
                             entry = iterator.next();
-                            if 
(ackedAndPrepared.contains(entry.getValue().messageId)) {
+                            if (ackedAndPrepared != null && 
ackedAndPrepared.contains(entry.getValue().messageId)) {
                                 continue;
                             }
                             Message msg = 
loadMessage(entry.getValue().location);
@@ -695,9 +720,14 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
             }
         }
 
-        protected int recoverRolledBackAcks(StoredDestination sd, Transaction 
tx, int maxReturned, MessageRecoveryListener listener) throws Exception {
+        protected int recoverRolledBackAcks(String recoveredTxStateMapKey, 
StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener 
listener) throws Exception {
             int counter = 0;
             String id;
+
+            Set rolledBackAcks = rolledBackAcksMap.get(recoveredTxStateMapKey);
+            if (rolledBackAcks == null) {
+                return counter;
+            }
             for (Iterator<String> iterator = rolledBackAcks.iterator(); 
iterator.hasNext(); ) {
                 id = iterator.next();
                 iterator.remove();
@@ -710,12 +740,15 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                             break;
                         }
                     } else {
-                        LOG.info("rolledback ack message {} with seq {} will 
be picked up in future batch {}", id, sequence, sd.orderIndex.cursor);
+                        LOG.debug("rolledback ack message {} with seq {} will 
be picked up in future batch {}", id, sequence, sd.orderIndex.cursor);
                     }
                 } else {
                     LOG.warn("Failed to locate rolled back ack message {} in 
{}", id, sd);
                 }
             }
+            if (rolledBackAcks.isEmpty()) {
+                rolledBackAcksMap.remove(recoveredTxStateMapKey);
+            }
             return counter;
         }
 
@@ -830,7 +863,10 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                             return statistics;
                         }
                     });
-                    
recoveredStatistics.getMessageCount().subtract(ackedAndPrepared.size());
+                    Set ackedAndPrepared = 
ackedAndPreparedMap.get(destination.getPhysicalName());
+                    if (ackedAndPrepared != null) {
+                        
recoveredStatistics.getMessageCount().subtract(ackedAndPrepared.size());
+                    }
                     
getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount());
                     
getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize());
                 } finally {
@@ -1113,11 +1149,12 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                             subAckPositions = null;
                             sd.orderIndex.setBatch(tx, cursorPos);
                         }
-                        recoverRolledBackAcks(sd, tx, Integer.MAX_VALUE, 
listener);
+                        recoverRolledBackAcks(subscriptionKey, sd, tx, 
Integer.MAX_VALUE, listener);
+                        Set ackedAndPrepared = 
ackedAndPreparedMap.get(subscriptionKey);
                         for (Iterator<Entry<Long, MessageKeys>> iterator = 
sd.orderIndex.iterator(tx); iterator
                                 .hasNext();) {
                             Entry<Long, MessageKeys> entry = iterator.next();
-                            if 
(ackedAndPrepared.contains(entry.getValue().messageId)) {
+                            if (ackedAndPrepared != null && 
ackedAndPrepared.contains(entry.getValue().messageId)) {
                                 continue;
                             }
                             //If subAckPositions is set then verify the 
sequence set contains the message still
@@ -1173,11 +1210,12 @@ public class KahaDBStore extends MessageDatabase 
implements PersistenceAdapter,
                         }
 
                         Entry<Long, MessageKeys> entry = null;
-                        int counter = recoverRolledBackAcks(sd, tx, 
maxReturned, listener);
+                        int counter = recoverRolledBackAcks(subscriptionKey, 
sd, tx, maxReturned, listener);
+                        Set ackedAndPrepared = 
ackedAndPreparedMap.get(subscriptionKey);
                         for (Iterator<Entry<Long, MessageKeys>> iterator = 
sd.orderIndex.iterator(tx, moc); iterator
                                 .hasNext();) {
                             entry = iterator.next();
-                            if 
(ackedAndPrepared.contains(entry.getValue().messageId)) {
+                            if (ackedAndPrepared != null && 
ackedAndPrepared.contains(entry.getValue().messageId)) {
                                 continue;
                             }
                             //If subAckPositions is set then verify the 
sequence set contains the message still
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
index 8b66867..b0f5c41 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
@@ -359,6 +360,10 @@ public class KahaDBTransactionStore implements 
TransactionStore {
                     MessageDatabase.RemoveOperation rmOp = 
(MessageDatabase.RemoveOperation) op;
                     Buffer ackb = rmOp.getCommand().getAck();
                     MessageAck ack = (MessageAck) wireFormat().unmarshal(new 
DataInputStream(ackb.newInput()));
+                    // allow the ack to be tracked back to its durable sub
+                    ConsumerId subKey = new ConsumerId();
+                    
subKey.setConnectionId(rmOp.getCommand().getSubscriptionKey());
+                    ack.setConsumerId(subKey);
                     ackList.add(ack);
                 }
             }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index 8415b93..9e174c2 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -713,7 +713,7 @@ public class XARecoveryBrokerTest extends 
BrokerRestartTestSupport {
 
     }
 
-    public void 
x_initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {
+    public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() 
{
         addCombinationValues("prioritySupport", new Boolean[]{Boolean.FALSE, 
Boolean.TRUE});
     }
 
@@ -793,6 +793,97 @@ public class XARecoveryBrokerTest extends 
BrokerRestartTestSupport {
         assertEquals("there are no prepared tx", 0, 
dataArrayResponse.getData().length);
     }
 
+    public void testTopicPersistentPreparedAcksNotLostOnRestartForNSubs() 
throws Exception {
+        ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        // setup durable subs
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, 
destination);
+        consumerInfo.setSubscriptionName("sub");
+        connection.send(consumerInfo);
+
+        ConsumerInfo consumerInfoX = createConsumerInfo(sessionInfo, 
destination);
+        consumerInfoX.setSubscriptionName("subX");
+        connection.send(consumerInfoX);
+        connection.send(consumerInfoX.createRemoveCommand());
+
+        final int numMessages = 4;
+        for (int i = 0; i < numMessages; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            connection.send(message);
+        }
+
+        // Begin the transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        final int messageCount = expectedMessageCount(numMessages, 
destination);
+        Message m = null;
+        for (int i = 0; i < messageCount; i++) {
+            m = receiveMessage(connection);
+            assertNotNull("unexpected null on: " + i, m);
+        }
+
+        // one ack with last received, mimic a beforeEnd synchronization
+        MessageAck ack = createAck(consumerInfo, m, messageCount, 
MessageAck.STANDARD_ACK_TYPE);
+        ack.setTransactionId(txid);
+        connection.send(ack);
+
+        connection.request(createPrepareTransaction(connectionInfo, txid));
+
+        // restart the broker.
+        restartBroker();
+
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        connection.send(connectionInfo);
+
+        // validate recovery
+        TransactionInfo recoverInfo = new 
TransactionInfo(connectionInfo.getConnectionId(), null, 
TransactionInfo.RECOVER);
+        DataArrayResponse dataArrayResponse = 
(DataArrayResponse)connection.request(recoverInfo);
+
+        assertEquals("there is a prepared tx", 1, 
dataArrayResponse.getData().length);
+        assertEquals("it matches", txid, dataArrayResponse.getData()[0]);
+
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("sub");
+        connection.send(consumerInfo);
+
+        // no redelivery, exactly once semantics unless there is rollback
+        m = receiveMessage(connection);
+        assertNull(m);
+        assertNoMessagesLeft(connection);
+
+        // ensure subX can get it's copy of the messages
+        consumerInfoX = createConsumerInfo(sessionInfo, destination);
+        consumerInfoX.setSubscriptionName("subX");
+        connection.send(consumerInfoX);
+
+        for (int i = 0; i < messageCount; i++) {
+            m = receiveMessage(connection);
+            assertNotNull("unexpected null for subX on: " + i, m);
+        }
+
+        connection.request(createCommitTransaction2Phase(connectionInfo, 
txid));
+
+        // validate recovery complete
+        dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
+        assertEquals("there are no prepared tx", 0, 
dataArrayResponse.getData().length);
+    }
+
     public void 
testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback() throws 
Exception {
 
         ActiveMQDestination destination = createDestination();
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java
index 7adb983..eb8badd 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/mLevelDBXARecoveryBrokerTest.java
@@ -74,4 +74,6 @@ public class mLevelDBXARecoveryBrokerTest extends 
XARecoveryBrokerTest {
     }
     public void testTopicPersistentPreparedAcksUnavailableTillRollback() 
throws Exception {
     }
+    public void testTopicPersistentPreparedAcksNotLostOnRestartForNSubs() 
throws Exception {
+    }
 }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
index a0c49cb..e203f96 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
@@ -29,6 +29,7 @@ import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.filter.AnyDestination;
 import org.apache.activemq.filter.DestinationMap;
@@ -408,6 +409,182 @@ public class XACompletionTest extends TestSupport {
     }
 
     @Test
+    public void testConsumeAfterAckPreparedRolledbackTopic() throws Exception {
+
+        factory = new ActiveMQXAConnectionFactory(connectionUri + 
"?jms.prefetchPolicy.all=0");
+        factory.setWatchTopicAdvisories(false);
+
+        final ActiveMQTopic destination = new ActiveMQTopic("TEST");
+
+        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) 
factory.createXAConnection();
+        activeMQXAConnection.setClientID("durable");
+        activeMQXAConnection.start();
+        XASession xaSession = activeMQXAConnection.createXASession();
+
+        MessageConsumer consumer = 
xaSession.createDurableSubscriber(destination, "sub1");
+        consumer.close();
+        consumer = xaSession.createDurableSubscriber(destination, "sub2");
+
+        sendMessagesTo(10, destination);
+
+        XAResource resource = xaSession.getXAResource();
+        resource.recover(XAResource.TMSTARTRSCAN);
+        resource.recover(XAResource.TMNOFLAGS);
+
+        dumpMessages();
+        Xid tid = createXid();
+
+        resource.start(tid, XAResource.TMNOFLAGS);
+
+        int messagesReceived = 0;
+
+        for (int i = 0; i < 5; i++) {
+
+            Message message = null;
+            try {
+                LOG.debug("Receiving message " + (messagesReceived + 1) + " of 
" + messagesExpected);
+                message = consumer.receive(2000);
+                LOG.info("Received : " + message);
+                messagesReceived++;
+            } catch (Exception e) {
+                LOG.debug("Caught exception:", e);
+            }
+        }
+
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.prepare(tid);
+
+        consumer.close();
+        activeMQXAConnection.close();
+
+        LOG.info("after close");
+
+        broker = restartBroker();
+
+        LOG.info("Try consume... after restart");
+        dumpMessages();
+
+        factory = new ActiveMQXAConnectionFactory(connectionUri + 
"?jms.prefetchPolicy.all=0");
+        factory.setWatchTopicAdvisories(false);
+
+        activeMQXAConnection = (ActiveMQXAConnection) 
factory.createXAConnection();
+        activeMQXAConnection.start();
+        xaSession = activeMQXAConnection.createXASession();
+
+        XAResource xaResource = xaSession.getXAResource();
+
+        Xid[] xids = xaResource.recover(XAResource.TMSTARTRSCAN);
+        xaResource.recover(XAResource.TMNOFLAGS);
+
+        LOG.info("Rollback outcome for ack");
+        xaResource.rollback(xids[0]);
+
+        assertTrue("got expected", consumeOnlyN(10,"durable", "sub1", 
destination));
+        assertTrue("got expected", consumeOnlyN(10, "durable", "sub2", 
destination));
+    }
+
+    @Test
+    public void testConsumeAfterAckPreparedCommitTopic() throws Exception {
+
+        factory = new ActiveMQXAConnectionFactory(connectionUri + 
"?jms.prefetchPolicy.all=0");
+        factory.setWatchTopicAdvisories(false);
+
+        final ActiveMQTopic destination = new ActiveMQTopic("TEST");
+
+        ActiveMQXAConnection activeMQXAConnection = (ActiveMQXAConnection) 
factory.createXAConnection();
+        activeMQXAConnection.setClientID("durable");
+        activeMQXAConnection.start();
+        XASession xaSession = activeMQXAConnection.createXASession();
+
+        MessageConsumer consumer = 
xaSession.createDurableSubscriber(destination, "sub1");
+        consumer.close();
+        consumer = xaSession.createDurableSubscriber(destination, "sub2");
+
+        sendMessagesTo(10, destination);
+
+        XAResource resource = xaSession.getXAResource();
+        resource.recover(XAResource.TMSTARTRSCAN);
+        resource.recover(XAResource.TMNOFLAGS);
+
+        dumpMessages();
+        Xid tid = createXid();
+
+        resource.start(tid, XAResource.TMNOFLAGS);
+
+        int messagesReceived = 0;
+
+        for (int i = 0; i < 5; i++) {
+
+            Message message = null;
+            try {
+                LOG.debug("Receiving message " + (messagesReceived + 1) + " of 
" + messagesExpected);
+                message = consumer.receive(2000);
+                LOG.info("Received : " + message);
+                messagesReceived++;
+            } catch (Exception e) {
+                LOG.debug("Caught exception:", e);
+            }
+        }
+
+        resource.end(tid, XAResource.TMSUCCESS);
+        resource.prepare(tid);
+
+        consumer.close();
+        activeMQXAConnection.close();
+
+        LOG.info("after close");
+
+        broker = restartBroker();
+
+        LOG.info("Try consume... after restart");
+        dumpMessages();
+
+        factory = new ActiveMQXAConnectionFactory(connectionUri + 
"?jms.prefetchPolicy.all=0");
+        factory.setWatchTopicAdvisories(false);
+
+        activeMQXAConnection = (ActiveMQXAConnection) 
factory.createXAConnection();
+        activeMQXAConnection.start();
+        xaSession = activeMQXAConnection.createXASession();
+
+        XAResource xaResource = xaSession.getXAResource();
+
+        Xid[] xids = xaResource.recover(XAResource.TMSTARTRSCAN);
+        xaResource.recover(XAResource.TMNOFLAGS);
+
+        LOG.info("Rollback outcome for ack");
+        xaResource.commit(xids[0], false);
+
+        assertTrue("got expected", consumeOnlyN(10,"durable", "sub1", 
destination));
+        assertTrue("got expected", consumeOnlyN(5, "durable", "sub2", 
destination));
+
+        LOG.info("at end...");
+        dumpMessages();
+
+    }
+
+    private boolean consumeOnlyN(int expected, String clientId, String 
subName, ActiveMQTopic destination) throws Exception {
+        int drained = 0;
+        ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(connectionUri + "?jms.prefetchPolicy.all=" + 
expected);
+        factory.setWatchTopicAdvisories(false);
+        javax.jms.Connection connection = factory.createConnection();
+        connection.setClientID(clientId);
+        try {
+            connection.start();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = 
session.createDurableSubscriber(destination, subName);
+            Message message = null;
+            while ( (message =consumer.receive(2000)) != null) {
+                drained++;
+                LOG.info("Sub:" + subName + ", received: " + 
message.getJMSMessageID());
+            }
+            consumer.close();
+        } finally {
+            connection.close();
+        }
+        return drained == expected;
+    }
+
+    @Test
     public void 
testStatsAndConsumeAfterAckPreparedRolledbackOutOfOrderRecovery() throws 
Exception {
 
         factory = new ActiveMQXAConnectionFactory(connectionUri + 
"?jms.prefetchPolicy.all=0");
@@ -938,16 +1115,24 @@ public class XACompletionTest extends TestSupport {
     }
 
     protected void sendMessages(int messagesExpected) throws Exception {
+        sendMessagesTo(messagesExpected, new ActiveMQQueue("TEST"));
+    }
+
+    protected void sendMessagesTo(int messagesExpected, Destination 
destination) throws Exception {
         ActiveMQConnectionFactory activeMQConnectionFactory = new 
ActiveMQConnectionFactory(connectionUri);
         activeMQConnectionFactory.setWatchTopicAdvisories(false);
-        sendMessagesWith(activeMQConnectionFactory, messagesExpected);
+        sendMessagesWithTo(activeMQConnectionFactory, messagesExpected, 
destination);
     }
 
     protected void sendMessagesWith(ConnectionFactory factory, int 
messagesExpected) throws Exception {
+        sendMessagesWithTo(factory, messagesExpected, new 
ActiveMQQueue("TEST"));
+    }
+
+    protected void sendMessagesWithTo(ConnectionFactory factory, int 
messagesExpected, Destination destination) throws Exception {
         javax.jms.Connection connection = factory.createConnection();
         connection.start();
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-        Destination destination = session.createQueue("TEST");
+
         MessageProducer producer = session.createProducer(destination);
         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
 
@@ -975,6 +1160,15 @@ public class XACompletionTest extends TestSupport {
             LOG.info("id: " + id + ", message SeqId: " + 
message.getMessageId().getBrokerSequenceId() + ", XID:" + xid + ", MSG: " + 
message);
         }
         statement.close();
+
+        statement = conn.prepareStatement("SELECT LAST_ACKED_ID, CLIENT_ID, 
SUB_NAME, PRIORITY, XID FROM ACTIVEMQ_ACKS");
+        result = statement.executeQuery();
+        LOG.info("Messages in ACKS table db...");
+        while (result.next()) {
+            LOG.info("lastAcked: {}, clientId: {}, SUB_NAME: {}, PRIORITY: {}, 
XID {}",
+                    result.getLong(1), result.getString(2), 
result.getString(3), result.getInt(4), result.getString(5));
+        }
+        statement.close();
         conn.close();
     }
 
@@ -1011,6 +1205,7 @@ public class XACompletionTest extends TestSupport {
         DestinationMap destinationMap = new DestinationMap();
         GroupPrincipal anaGroup = new GroupPrincipal(id);
         destinationMap.put(new AnyDestination(new ActiveMQDestination[]{new 
ActiveMQQueue(">")}), anaGroup);
+        destinationMap.put(new AnyDestination(new ActiveMQDestination[]{new 
ActiveMQTopic(">")}), anaGroup);
         map.setWriteACLs(destinationMap);
         map.setAdminACLs(destinationMap);
         map.setReadACLs(destinationMap);

Reply via email to