Author: chirino
Date: Thu Feb  5 18:07:57 2009
New Revision: 741212

URL: http://svn.apache.org/viewvc?rev=741212&view=rev
Log:
Changed the cleanup algorithm used in KahaDB. It should now be much 
faster.
Fixed a bug in BTreeNode.visit() which was making us load most of the pages 
instead of letting us filter down to just a few.


Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=741212&r1=741211&r2=741212&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 Thu Feb  5 18:07:57 2009
@@ -29,7 +29,9 @@
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.SortedSet;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -443,7 +445,6 @@
                     checkpointUpdate(tx, false);
                 }
             });
-            pageFile.flush();
             closure.execute();
         }
        }
@@ -774,46 +775,82 @@
         pageFile.flush();
 
         if( cleanup ) {
-            // Find empty journal files to remove.
-            final HashSet<Integer> inUseFiles = new HashSet<Integer>();
+               
+               final TreeSet<Integer> gcCandidateSet = new 
TreeSet<Integer>(journal.getFileMap().keySet());
+               
+               // Don't GC files under replication
+               if( journalFilesBeingReplicated!=null ) {
+                       gcCandidateSet.removeAll(journalFilesBeingReplicated);
+               }
+               
+               // Don't GC files after the first in progress tx
+               Location firstTxLocation = metadata.lastUpdate;
+            if( metadata.firstInProgressTransactionLocation!=null ) {
+                firstTxLocation = metadata.firstInProgressTransactionLocation;
+            }
+            
+            if( firstTxLocation!=null ) {
+               while( !gcCandidateSet.isEmpty() ) {
+                       Integer last = gcCandidateSet.last();
+                       if( last >= firstTxLocation.getDataFileId() ) {
+                               gcCandidateSet.remove(last);
+                       } else {
+                               break;
+                       }
+               }
+            }
+
+            // Go through all the destinations to see if any of them can 
remove GC candidates.
             for (StoredDestination sd : storedDestinations.values()) {
+               if( gcCandidateSet.isEmpty() ) {
+                       break;
+                }
                 
                 // Use a visitor to cut down the number of pages that we load
                 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
                     int last=-1;
                     public boolean isInterestedInKeysBetween(Location first, 
Location second) {
-                        if( second!=null ) {
-                            if( last+1 == second.getDataFileId() ) {
-                                last++;
-                                inUseFiles.add(last);
-                            }
-                            if( last == second.getDataFileId() ) {
-                                return false;
-                            }
-                        }
-                        return true;
+                       if( first==null ) {
+                               SortedSet<Integer> subset = 
gcCandidateSet.headSet(second.getDataFileId()+1);
+                               if( !subset.isEmpty() && subset.last() == 
second.getDataFileId() ) {
+                                       subset.remove(second.getDataFileId());
+                               }
+                                                       return 
!subset.isEmpty();
+                       } else if( second==null ) {
+                               SortedSet<Integer> subset = 
gcCandidateSet.tailSet(first.getDataFileId());
+                               if( !subset.isEmpty() && subset.first() == 
first.getDataFileId() ) {
+                                       subset.remove(first.getDataFileId());
+                               }
+                                                       return 
!subset.isEmpty();
+                       } else {
+                               SortedSet<Integer> subset = 
gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
+                               if( !subset.isEmpty() && subset.first() == 
first.getDataFileId() ) {
+                                       subset.remove(first.getDataFileId());
+                               }
+                               if( !subset.isEmpty() && subset.last() == 
second.getDataFileId() ) {
+                                       subset.remove(second.getDataFileId());
+                               }
+                                                       return 
!subset.isEmpty();
+                       }
                     }
     
                     public void visit(List<Location> keys, List<Long> values) {
-                        for (int i = 0; i < keys.size(); i++) {
-                            if( last != keys.get(i).getDataFileId() ) {
-                                inUseFiles.add(keys.get(i).getDataFileId());
-                                last = keys.get(i).getDataFileId();
+                       for (Location l : keys) {
+                            int fileId = l.getDataFileId();
+                                                       if( last != fileId ) {
+                                       gcCandidateSet.remove(fileId);
+                                last = fileId;
                             }
-                        }
-                        
+                                               }                        
                     }
     
                 });
             }
-            inUseFiles.addAll(journalFilesBeingReplicated);
-            Location l = metadata.lastUpdate;
-            if( metadata.firstInProgressTransactionLocation!=null ) {
-                l = metadata.firstInProgressTransactionLocation;
+
+            if( !gcCandidateSet.isEmpty() ) {
+                   LOG.debug("Cleanup removing the data files: 
"+gcCandidateSet);
+                   journal.removeDataFiles(gcCandidateSet);
             }
-            
-            LOG.debug("In use files: "+inUseFiles+", lastUpdate: "+l);
-            journal.consolidateDataFilesNotIn(inUseFiles, 
l==null?null:l.getDataFileId());
         }
         
         LOG.debug("Checkpoint done.");

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java?rev=741212&r1=741211&r2=741212&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
 Thu Feb  5 18:07:57 2009
@@ -150,12 +150,12 @@
         } else {
             KahaDBStore kaha = new KahaDBStore();
             kaha.setDirectory(new File("target/activemq-data/kahadb"));
-            kaha.deleteAllMessages();
-            kaha.setCleanupInterval(1000 * 60 * 60 * 60);
             // The setEnableJournalDiskSyncs(false) setting is a little 
dangerous right now, as I have not verified 
             // what happens if the index is updated but a journal update is 
lost.
             // Index is going to be in consistent, but can it be repaired?
             kaha.setEnableJournalDiskSyncs(false);
+            // Using a bigger journal file size makes he take fewer spikes as 
it is not switching files as often.
+            kaha.getJournal().setMaxFileLength(1024*1024*100);
             kaha.getPageFile().setWriteBatchSize(100);
             kaha.getPageFile().setEnableWriteThread(true);
             broker.setPersistenceAdapter(kaha);

Modified: 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java?rev=741212&r1=741211&r2=741212&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java 
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java 
Thu Feb  5 18:07:57 2009
@@ -524,7 +524,7 @@
                 }
                 Key key2 = null;
                 if( i!=this.children.length-1 ) {
-                    key1 = keys[i];
+                    key2 = keys[i];
                 }
                 if( visitor.isInterestedInKeysBetween(key1, key2) ) {
                     BTreeNode<Key, Value> child = getChild(tx, i);

Modified: 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=741212&r1=741211&r2=741212&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java 
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java 
Thu Feb  5 18:07:57 2009
@@ -16,17 +16,12 @@
  */
 package org.apache.kahadb.journal;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -42,7 +37,6 @@
 import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
 import org.apache.kahadb.journal.DataFileAppender.WriteKey;
 import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.IOHelper;
 import org.apache.kahadb.util.LinkedNodeList;
 import org.apache.kahadb.util.Scheduler;
 
@@ -299,18 +293,10 @@
         return result;
     }
 
-    public synchronized void consolidateDataFilesNotIn(Set<Integer> inUse, 
Integer lastFile) throws IOException {
-        Set<Integer> unUsed = new HashSet<Integer>(fileMap.keySet());
-        unUsed.removeAll(inUse);
-
-        for (Integer key : unUsed) {
-            // Don't remove files that come after the lastFile
-            if (lastFile !=null && key >= lastFile ) {
-                continue;
-            }
+    public synchronized void removeDataFiles(Set<Integer> files) throws 
IOException {
+        for (Integer key : files) {
             DataFile dataFile = fileMap.get(key);
-            
-            // Can't remove the last file either.
+            // Can't remove the last file.
             if( dataFile == dataFiles.getTail() ) {
                 continue;
             }


Reply via email to