Repository: activemq
Updated Branches:
  refs/heads/master dc36c19c8 -> 79c74998d


AMQ-7082 - recover index free pages in parallel with start, merge in flush, 
clean shutdown if complete. follow up on AMQ-6590


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/79c74998
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/79c74998
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/79c74998

Branch: refs/heads/master
Commit: 79c74998dc1efb72b05d32f920052a1df4b6dd8e
Parents: dc36c19
Author: gtully <[email protected]>
Authored: Fri Oct 19 16:00:23 2018 +0100
Committer: gtully <[email protected]>
Committed: Fri Oct 19 16:00:23 2018 +0100

----------------------------------------------------------------------
 .../store/kahadb/disk/page/PageFile.java        | 94 ++++++++++++++++----
 .../store/kahadb/disk/page/PageFileTest.java    | 27 ++++--
 2 files changed, 96 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/79c74998/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
index b426a72..2c6348e 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
@@ -134,7 +134,7 @@ public class PageFile {
     // Keeps track of free pages.
     private final AtomicLong nextFreePageId = new AtomicLong();
     private SequenceSet freeList = new SequenceSet();
-
+    private SequenceSet recoveredFreeList = null;
     private final AtomicLong nextTxid = new AtomicLong();
 
     // Persistent settings stored in the page file.
@@ -423,11 +423,70 @@ public class PageFile {
             storeMetaData();
             getFreeFile().delete();
             startWriter();
+            if (needsFreePageRecovery) {
+                asyncFreePageRecovery();
+            }
         } else {
             throw new IllegalStateException("Cannot load the page file when it 
is already loaded.");
         }
     }
 
+    private void asyncFreePageRecovery() {
+        Thread thread = new Thread("KahaDB Index Free Page Recovery") {
+            @Override
+            public void run() {
+                try {
+                    recoverFreePages();
+                } catch (Throwable e) {
+                    if (loaded.get()) {
+                        LOG.warn("Error recovering index free page list", e);
+                    }
+                }
+            }
+        };
+        thread.setPriority(Thread.NORM_PRIORITY);
+        thread.setDaemon(true);
+        thread.start();
+    }
+
+    /**
+     * Rebuilds the list of free pages after an unclean shutdown.
+     * Iterates the pages of a separate {@code PageFile} instance opened on
+     * the same directory and name, collecting the ids of all pages whose
+     * type is {@link Page#PAGE_FREE_TYPE}. A non-empty result is handed
+     * over through {@code recoveredFreeList}; this method never touches
+     * {@code freeList} directly. {@code needsFreePageRecovery} is cleared
+     * only when the iteration finished without an exception.
+     *
+     * @throws Exception if the recovery page file cannot be opened or read
+     */
+    private void recoverFreePages() throws Exception {
+        LOG.info(toString() + ". Recovering pageFile free list due to prior 
unclean shutdown..");
+        SequenceSet newFreePages = new SequenceSet();
+        // need new pageFile instance to get unshared readFile
+        PageFile recoveryPageFile = new PageFile(directory, name);
+        try {
+            // inside the try: loadForRecovery opens readFile before it
+            // loads the meta data, so a failure there must still close it
+            recoveryPageFile.loadForRecovery(nextFreePageId.get());
+            for (Iterator<Page> i = new 
Transaction(recoveryPageFile).iterator(true); i.hasNext(); ) {
+                Page page = i.next();
+                if (page.getType() == Page.PAGE_FREE_TYPE) {
+                    newFreePages.add(page.getPageId());
+                }
+            }
+        } finally {
+            // readFile is still null when opening the file itself failed
+            if (recoveryPageFile.readFile != null) {
+                recoveryPageFile.readFile.close();
+            }
+        }
+
+        LOG.info(toString() + ". Recovered pageFile free list of size: " + 
newFreePages.rangeSize());
+        if (!newFreePages.isEmpty()) {
+
+            // allow flush (with index lock held) to merge
+            recoveredFreeList = newFreePages;
+        }
+        // all set for clean shutdown
+        needsFreePageRecovery = false;
+    }
+
+    private void loadForRecovery(long nextFreePageIdSnap) throws Exception { // minimal load, called from recoverFreePages()
+        loaded.set(true); // mark as loaded without going through load()
+        enablePageCaching = false;
+        File file = getMainPageFile();
+        readFile = new RecoverableRandomAccessFile(file, "r"); // "r": presumably read-only, as in java.io.RandomAccessFile -- confirm
+        loadMetaData();
+        pageSize = metaData.getPageSize(); // take the page size from the meta data just loaded
+        enableRecoveryFile = false;
+        nextFreePageId.set(nextFreePageIdSnap); // value the caller read from its own nextFreePageId
+    }
+
 
     /**
      * Unloads a previously loaded PageFile.  This deallocates OS related 
resources like file handles.
@@ -445,22 +504,6 @@ public class PageFile {
                 throw new InterruptedIOException();
             }
 
-            if (needsFreePageRecovery) {
-                LOG.info(toString() + ". Recovering pageFile free list due to 
prior unclean shutdown..");
-                freeList = new SequenceSet();
-                loaded.set(true);
-                try {
-                    for (Iterator<Page> i = new 
Transaction(this).iterator(true); i.hasNext(); ) {
-                        Page page = i.next();
-                        if (page.getType() == Page.PAGE_FREE_TYPE) {
-                            freeList.add(page.getPageId());
-                        }
-                    }
-                } finally {
-                    loaded.set(false);
-                }
-            }
-
             if (freeList.isEmpty()) {
                 metaData.setFreePages(0);
             } else {
@@ -469,7 +512,12 @@ public class PageFile {
             }
 
             metaData.setLastTxId(nextTxid.get() - 1);
-            metaData.setCleanShutdown(true);
+            if (needsFreePageRecovery) {
+                // async recovery incomplete, will have to try again
+                metaData.setCleanShutdown(false);
+            } else {
+                metaData.setCleanShutdown(true);
+            }
             storeMetaData();
 
             if (readFile != null) {
@@ -513,6 +561,16 @@ public class PageFile {
             throw new IOException("Page file already stopped: checkpointing is 
not allowed");
         }
 
+        SequenceSet toMerge = recoveredFreeList;
+        if (toMerge != null) {
+            recoveredFreeList = null;
+            Sequence seq = toMerge.getHead();
+            while (seq != null) {
+                freeList.add(seq);
+                seq = seq.getNext();
+            }
+        }
+
         // Setup a latch that gets notified when all buffered writes hits the 
disk.
         CountDownLatch checkpointLatch;
         synchronized (writes) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/79c74998/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
 
b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
index 1bdbe6f..72e8d7b 100644
--- 
a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
+++ 
b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
@@ -27,10 +27,15 @@ import java.util.HashSet;
 import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
 
 import junit.framework.TestCase;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("rawtypes")
 public class PageFileTest extends TestCase {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(PageFileTest.class);
+
     public void testCRUD() throws IOException {
 
         PageFile pf = new PageFile(new File("target/test-data"), getName());
@@ -219,12 +224,20 @@ public class PageFileTest extends TestCase {
         PageFile pf2 = new PageFile(new File("target/test-data"), getName());
         pf2.setEnableRecoveryFile(false);
         pf2.load();
-        pf2.unload();
-        pf2.load();
-        long freePages = pf2.getFreePageCount();
-        pf.unload();
-
-        //Make sure that all 10 pages are still tracked
-        assertEquals(10, freePages);
+        try {
+            assertTrue("We have 10 free pages", Wait.waitFor(new 
Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+
+                    pf2.flush();
+                    long freePages = pf2.getFreePageCount();
+                    LOG.info("free page count: " + freePages);
+                    return  freePages == 10l;
+                }
+            }, 12000000));
+        } finally {
+            pf.unload();
+            pf2.unload();
+        }
     }
 }

Reply via email to