Repository: activemq
Updated Branches:
  refs/heads/master b4513004b -> 28819aea4


AMQ-7015 - Changed attribute to purgeRecoveredXATransactionStrategy and
allow NEVER, COMMIT, and ROLLBACK


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/28819aea
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/28819aea
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/28819aea

Branch: refs/heads/master
Commit: 28819aea4aa981d33c710d9d5e26f3cb6e03c1de
Parents: b451300
Author: Jeff Genender <[email protected]>
Authored: Wed Jul 25 12:52:49 2018 -0600
Committer: Jeff Genender <[email protected]>
Committed: Wed Jul 25 12:57:17 2018 -0600

----------------------------------------------------------------------
 .../store/kahadb/KahaDBPersistenceAdapter.java  |  8 +--
 .../activemq/store/kahadb/MessageDatabase.java  | 43 ++++++++----
 .../activemq/broker/XARecoveryBrokerTest.java   | 72 +++++++++++++++++++-
 3 files changed, 104 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/28819aea/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
index fa0f7c2..fbeda4c 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
@@ -547,12 +547,12 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
         letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
     }
 
-    public boolean isPurgeRecoveredXATransactions() {
-        return letter.isPurgeRecoveredXATransactions();
+    public String getPurgeRecoveredXATransactionStrategy() {
+        return letter.getPurgeRecoveredXATransactionStrategy();
     }
 
-    public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) {
-        letter.setPurgeRecoveredXATransactions(purgeRecoveredXATransactions);
+    public void setPurgeRecoveredXATransactionStrategy(String purgeRecoveredXATransactionStrategy) {
+        letter.setPurgeRecoveredXATransactionStrategy(purgeRecoveredXATransactionStrategy);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/28819aea/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 413e137..0e5c237 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -240,6 +240,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         }
     }
 
+    public enum PurgeRecoveredXATransactionStrategy {
+        NEVER,
+        COMMIT,
+        ROLLBACK;
+    }
+
     protected PageFile pageFile;
     protected Journal journal;
     protected Metadata metadata = new Metadata();
@@ -272,7 +278,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     private boolean ignoreMissingJournalfiles = false;
     private int indexCacheSize = 10000;
     private boolean checkForCorruptJournalFiles = false;
-    private boolean purgeRecoveredXATransactions = false;
+    protected PurgeRecoveredXATransactionStrategy purgeRecoveredXATransactionStrategy = PurgeRecoveredXATransactionStrategy.NEVER;
     private boolean checksumJournalFiles = true;
     protected boolean forceRecoverIndex = false;
     private boolean archiveCorruptedIndex = false;
@@ -746,14 +752,20 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             }
 
             synchronized (preparedTransactions) {
-                for (TransactionId txId : preparedTransactions.keySet()) {
-                    LOG.warn("Recovered prepared XA TX: [{}]", txId);
-                }
-
-                if (purgeRecoveredXATransactions){
-                    if (!preparedTransactions.isEmpty()){
-                        LOG.warn("Purging " +  preparedTransactions.size() + " recovered prepared XA TXs" );
-                        preparedTransactions.clear();
+                Set<TransactionId> txIds = new LinkedHashSet<TransactionId>(preparedTransactions.keySet());
+                for (TransactionId txId : txIds) {
+                    switch (purgeRecoveredXATransactionStrategy){
+                        case NEVER:
+                            LOG.warn("Recovered prepared XA TX: [{}]", txId);
+                            break;
+                        case COMMIT:
+                            store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(txId)), false, null, null);
+                            LOG.warn("Recovered and Committing prepared XA TX: [{}]", txId);
+                            break;
+                        case ROLLBACK:
+                            store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convert(txId)), false, null, null);
+                            LOG.warn("Recovered and Rolling Back prepared XA TX: [{}]", txId);
+                            break;
                     }
                 }
             }
@@ -3315,12 +3327,17 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
     }
 
-    public boolean isPurgeRecoveredXATransactions() {
-        return purgeRecoveredXATransactions;
+    public PurgeRecoveredXATransactionStrategy getPurgeRecoveredXATransactionStrategyEnum() {
+        return purgeRecoveredXATransactionStrategy;
+    }
+
+    public String getPurgeRecoveredXATransactionStrategy() {
+        return purgeRecoveredXATransactionStrategy.name();
     }
 
-    public void setPurgeRecoveredXATransactions(boolean purgeRecoveredXATransactions) {
-        this.purgeRecoveredXATransactions = purgeRecoveredXATransactions;
+    public void setPurgeRecoveredXATransactionStrategy(String purgeRecoveredXATransactionStrategy) {
+        this.purgeRecoveredXATransactionStrategy = PurgeRecoveredXATransactionStrategy.valueOf(
+                purgeRecoveredXATransactionStrategy.trim().toUpperCase());
     }
 
     public boolean isChecksumJournalFiles() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/28819aea/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
index 60d3b8b..c9154a3 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -267,7 +267,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
         assertEmptyDLQ();
     }
 
-    public void testPreparedTransactionRecoveredPurgeOnRestart() throws Exception {
+    public void testPreparedTransactionRecoveredPurgeRollbackOnRestart() throws Exception {
 
         ActiveMQDestination destination = createDestination();
 
@@ -306,7 +306,7 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
         stopBroker();
         if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
             KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
-            adapter.setPurgeRecoveredXATransactions(true);
+            adapter.setPurgeRecoveredXATransactionStrategy("ROLLBACK");
             LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
         }
         broker.start();
@@ -320,9 +320,77 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
         consumerInfo = createConsumerInfo(sessionInfo, destination);
         connection.send(consumerInfo);
 
+        // Since rolledback but not committed.. they should not get delivered.
+        assertNull(receiveMessage(connection));
+        assertNoMessagesLeft(connection);
+
+        Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
+        assertNotNull(response);
+        DataArrayResponse dar = (DataArrayResponse)response;
+
+        //These should be purged so expect 0
+        assertEquals(0, dar.getData().length);
+
+    }
+
+    public void testPreparedTransactionRecoveredPurgeCommitOnRestart() throws Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // Prepare 4 message sends.
+        for (int i = 0; i < 4; i++) {
+            // Begin the transaction.
+            XATransactionId txid = createXATransaction(sessionInfo);
+            connection.send(createBeginTransaction(connectionInfo, txid));
+
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            message.setTransactionId(txid);
+            connection.send(message);
+
+            // Prepare
+            connection.send(createPrepareTransaction(connectionInfo, txid));
+        }
+
         // Since prepared but not committed.. they should not get delivered.
         assertNull(receiveMessage(connection));
         assertNoMessagesLeft(connection);
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // restart the broker.
+        stopBroker();
+        if (broker.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
+            KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter)broker.getPersistenceAdapter();
+            adapter.setPurgeRecoveredXATransactionStrategy("COMMIT");
+            LOG.info("Setting purgeRecoveredXATransactions to true on the KahaDBPersistenceAdapter");
+        }
+        broker.start();
+
+        // Setup the consumer and try receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // Since committed ... they should get delivered.
+        for (int i = 0; i < 4; i++) {
+            assertNotNull(receiveMessage(connection));
+        }
+        assertNoMessagesLeft(connection);
 
         Response response = connection.request(new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
         assertNotNull(response);

Reply via email to