Repository: activemq
Updated Branches:
  refs/heads/master 7f4851f31 -> 24b9ae2ed


AMQ-7015 Added a purgeRecoveredXATransactions property on the KahaDB adapter to
purge prepared XA transactions on recovery


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/24b9ae2e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/24b9ae2e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/24b9ae2e

Branch: refs/heads/master
Commit: 24b9ae2ed3c01ae9c67a94bfa50ca4037228f27b
Parents: 7f4851f
Author: hkesler <hkes...@contractor.cengage.com>
Authored: Thu Jul 19 11:53:04 2018 -0600
Committer: Jeff Genender <jgenen...@savoirtech.com>
Committed: Thu Jul 19 12:01:01 2018 -0600

----------------------------------------------------------------------
 .../store/kahadb/KahaDBPersistenceAdapter.java  |  8 +++
 .../activemq/store/kahadb/MessageDatabase.java  | 16 +++++
 .../broker/BrokerRestartTestSupport.java        |  7 ++-
 .../activemq/broker/XARecoveryBrokerTest.java   | 66 ++++++++++++++++++++
 4 files changed, 95 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/24b9ae2e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index c4f480c..fa0f7c2 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -547,6 +547,14 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
         letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
     }
 
+    public boolean isPurgeRecoveredXATransactions() {
+        return letter.isPurgeRecoveredXATransactions();
+    }
+
+    public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) {
+        letter.setPurgeRecoveredXATransactions(purgeRecoveredXATransactions);
+    }
+
     @Override
     public void setBrokerService(BrokerService brokerService) {
         super.setBrokerService(brokerService);

http://git-wip-us.apache.org/repos/asf/activemq/blob/24b9ae2e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 26e8cd0..413e137 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -272,6 +272,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     private boolean ignoreMissingJournalfiles = false;
     private int indexCacheSize = 10000;
     private boolean checkForCorruptJournalFiles = false;
+    private boolean purgeRecoveredXATransactions = false;
     private boolean checksumJournalFiles = true;
     protected boolean forceRecoverIndex = false;
     private boolean archiveCorruptedIndex = false;
@@ -748,6 +749,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 for (TransactionId txId : preparedTransactions.keySet()) {
                     LOG.warn("Recovered prepared XA TX: [{}]", txId);
                 }
+
+                if (purgeRecoveredXATransactions){
+                    if (!preparedTransactions.isEmpty()){
+                        LOG.warn("Purging " +  preparedTransactions.size() + " recovered prepared XA TXs" );
+                        preparedTransactions.clear();
+                    }
+                }
             }
 
         } finally {
@@ -3307,6 +3315,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
     }
 
+    public boolean isPurgeRecoveredXATransactions() {
+        return purgeRecoveredXATransactions;
+    }
+
+    public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) {
+        this.purgeRecoveredXATransactions = purgeRecoveredXATransactions;
+    }
+
     public boolean isChecksumJournalFiles() {
         return checksumJournalFiles;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/24b9ae2e/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
index c4e3848..111494a 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/BrokerRestartTestSupport.java
@@ -58,10 +58,13 @@ public class BrokerRestartTestSupport extends BrokerTestSupport {
      * @throws URISyntaxException
      */
     protected void restartBroker() throws Exception {
+        stopBroker();
+        broker.start();
+    }
+
+    protected void stopBroker() throws Exception {
         broker.stop();
         broker.waitUntilStopped();
         broker = createRestartedBroker();
-        broker.start();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/24b9ae2e/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index 6a8b3f4..60d3b8b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -267,6 +267,72 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
         assertEmptyDLQ();
     }
 
+    public void testPreparedTransactionRecoveredPurgeOnRestart() throws Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // Prepare 4 message sends.
+        for (int i = 0; i < 4; i++) {
+            // Begin the transaction.
+            XATransactionId txid = createXATransaction(sessionInfo);
+            connection.send(createBeginTransaction(connectionInfo, txid));
+
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            message.setTransactionId(txid);
+            connection.send(message);
+
+            // Prepare
+            connection.send(createPrepareTransaction(connectionInfo, txid));
+        }
+
+        // Since prepared but not committed.. they should not get delivered.
+        assertNull(receiveMessage(connection));
+        assertNoMessagesLeft(connection);
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // restart the broker.
+        stopBroker();
+        if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
+            KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
+            adapter.setPurgeRecoveredXATransactions(true);
+            LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
+        }
+        broker.start();
+
+        // Setup the consumer and try receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // Since prepared but not committed.. they should not get delivered.
+        assertNull(receiveMessage(connection));
+        assertNoMessagesLeft(connection);
+
+        Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
+        assertNotNull(response);
+        DataArrayResponse dar = (DataArrayResponse)response;
+
+        //These should be purged so expect 0
+        assertEquals(0, dar.getData().length);
+
+    }
+
     private void assertEmptyDLQ() throws Exception {
         try {
             DestinationViewMBean destinationView = getProxyToDestination(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));

Reply via email to