Repository: activemq
Updated Branches:
  refs/heads/master 3ac3a420a -> 7c890d477


AMQ-7067 - test and fix for eager ack compaction moving acks out of data files
with in-progress transactions and thereby clearing the link from the prepare
record, which now uses the same ack map


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7c890d47
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7c890d47
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7c890d47

Branch: refs/heads/master
Commit: 7c890d477663d91aef518e30d60cf3c13827877a
Parents: 3ac3a42
Author: gtully <[email protected]>
Authored: Fri Oct 12 14:51:37 2018 +0100
Committer: gtully <[email protected]>
Committed: Fri Oct 12 14:51:37 2018 +0100

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  | 27 +++++-
 .../org/apache/activemq/bugs/AMQ7067Test.java   | 90 +++++++++++++++++++-
 2 files changed, 113 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7c890d47/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index d231a86..dfb40ec 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -65,7 +65,6 @@ import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Queue;
 import org.apache.activemq.broker.region.Topic;
-import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.protobuf.Buffer;
@@ -2037,7 +2036,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                     }
 
                     // Check if we found one, or if we only found the current file being written to.
-                    if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
+                    if (journalToAdvance == -1 || blockedFromCompaction(journalToAdvance)) {
                         return;
                     }
 
@@ -2077,8 +2076,30 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         }
     }
 
+    // called with the index lock held
+    private boolean blockedFromCompaction(int journalToAdvance) {
+        // don't forward the current data file
+        if (journalToAdvance == journal.getCurrentDataFileId()) {
+            return true;
+        }
+        // don't forward any data file with inflight transaction records because it will whack the tx - data file link
+        // in the ack map when all acks are migrated (now that the ack map is not just for acks)
+        // TODO: prepare records can be dropped but completion records (maybe only commit outcomes) need to be migrated
+        // as part of the forward work.
+        Location[] inProgressTxRange = getInProgressTxLocationRange();
+        if (inProgressTxRange[0] != null) {
+            for (int pendingTx = inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
+                if (journalToAdvance == pendingTx) {
+                    LOG.trace("Compaction target:{} blocked by inflight transaction records: {}", journalToAdvance, inProgressTxRange);
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
     private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException {
-        LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead);
+        LOG.trace("Attempting to move all acks in journal:{} to the front. Referenced files:{}", journalToRead, journalLogsReferenced);
 
         DataFile forwardsFile = journal.reserveDataFile();
         forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE);

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c890d47/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
index d00ee41..4997632 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
@@ -13,9 +13,12 @@ import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MessageDatabase;
 import org.apache.activemq.util.JMXSupport;
 import org.apache.activemq.util.Wait;
 import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -114,6 +117,82 @@ public class AMQ7067Test {
 
         ((org.apache.activemq.broker.region.Queue) broker.getRegionBroker().getDestinationMap().get(queue)).purge();
 
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == getQueueSize(queue.getQueueName());
+            }
+        });
+
+        // force gc
+        broker.getPersistenceAdapter().checkpoint(true);
+
+        Xid[] xids = xaRes.recover(TMSTARTRSCAN);
+
+        //Should be 1 since we have only 1 prepared
+        assertEquals(1, xids.length);
+        connection.close();
+
+        broker.stop();
+        broker.waitUntilStopped();
+        createBroker();
+
+        setupXAConnection();
+        xids = xaRes.recover(TMSTARTRSCAN);
+
+        System.out.println("****** recovered = " + xids);
+
+        // THIS SHOULD NOT FAIL AS THERE SHOULD DBE ONLY 1 TRANSACTION!
+        assertEquals(1, xids.length);
+    }
+
+    @Test
+    public void testXAPrepareWithAckCompactionDoesNotLooseInflight() throws Exception {
+
+        // investigate liner gc issue - store usage not getting released
+        org.apache.log4j.Logger.getLogger(MessageDatabase.class).setLevel(Level.TRACE);
+
+
+        setupXAConnection();
+
+        Queue holdKahaDb = xaSession.createQueue("holdKahaDb");
+
+        MessageProducer holdKahaDbProducer = xaSession.createProducer(holdKahaDb);
+
+        XATransactionId txid = createXATransaction();
+        System.out.println("****** create new txid = " + txid);
+        xaRes.start(txid, TMNOFLAGS);
+
+        TextMessage helloMessage = xaSession.createTextMessage(StringUtils.repeat("a", 10));
+        holdKahaDbProducer.send(helloMessage);
+        xaRes.end(txid, TMSUCCESS);
+
+        Queue queue = xaSession.createQueue("test");
+
+        produce(xaRes, xaSession, queue, 100, 512 * 1024);
+
+        xaRes.prepare(txid);
+
+        produce(xaRes, xaSession, queue, 100, 512 * 1024);
+
+        ((org.apache.activemq.broker.region.Queue) broker.getRegionBroker().getDestinationMap().get(queue)).purge();
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == getQueueSize(queue.getQueueName());
+            }
+        });
+
+        // force gc, two data files requires two cycles
+        int limit = ((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).getCompactAcksAfterNoGC() + 1;
+        for (int i=0; i<limit*2; i++) {
+            broker.getPersistenceAdapter().checkpoint(true);
+        }
+
+        // ack compaction task operates in the background
+        TimeUnit.SECONDS.sleep(5);
+
         Xid[] xids = xaRes.recover(TMSTARTRSCAN);
 
         //Should be 1 since we have only 1 prepared
@@ -160,6 +239,16 @@ public class AMQ7067Test {
 
         ((org.apache.activemq.broker.region.Queue) broker.getRegionBroker().getDestinationMap().get(queue)).purge();
 
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == getQueueSize(queue.getQueueName());
+            }
+        });
+
+        // force gc
+        broker.getPersistenceAdapter().checkpoint(true);
+
         Xid[] xids = xaRes.recover(TMSTARTRSCAN);
 
         //Should be 1 since we have only 1 prepared
@@ -343,7 +432,6 @@ public class AMQ7067Test {
             TextMessage helloMessage = xaSession.createTextMessage(StringUtils.repeat("a", messageSize));
             producer.send(helloMessage);
             xaRes.end(txid, TMSUCCESS);
-            xaRes.prepare(txid);
             xaRes.commit(txid, true);
         }
     }

Reply via email to