Author: chirino
Date: Fri Feb  6 18:19:31 2009
New Revision: 741659

URL: http://svn.apache.org/viewvc?rev=741659&view=rev
Log:
- added some handy generic visitors to the BTreeVisitor class.
- Updated the recovery process so it now rollsback changes applied to the index 
which did not get synced to the journal.


Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=741659&r1=741658&r2=741659&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 Fri Feb  6 18:19:31 2009
@@ -360,25 +360,64 @@
                long start = System.currentTimeMillis();
                
                Location recoveryPosition = getRecoveryPosition();
-               if( recoveryPosition ==null ) {
-                       return;
+               if( recoveryPosition!=null ) {
+                       int redoCounter = 0;
+                       while (recoveryPosition != null) {
+                           JournalCommand message = load(recoveryPosition);
+                           metadata.lastUpdate = recoveryPosition;
+                           process(message, recoveryPosition);
+                           redoCounter++;
+                           recoveryPosition = 
journal.getNextLocation(recoveryPosition);
+                       }
+                       long end = System.currentTimeMillis();
+                       LOG.info("Replayed " + redoCounter + " operations from 
the journal in " + ((end - start) / 1000.0f) + " seconds.");
                }
-               
-               int redoCounter = 0;
-               LOG.info("Journal Recovery Started from: " + journal + " at " + 
recoveryPosition.getDataFileId() + ":" + recoveryPosition.getOffset());
-       
-               while (recoveryPosition != null) {
-                   JournalCommand message = load(recoveryPosition);
-                   metadata.lastUpdate = recoveryPosition;
-                   process(message, recoveryPosition);
-                   redoCounter++;
-                   recoveryPosition = 
journal.getNextLocation(recoveryPosition);
-               }
-               long end = System.currentTimeMillis();
-               LOG.info("Replayed " + redoCounter + " operations from redo log 
in " + ((end - start) / 1000.0f) + " seconds.");
+            
+               // We may have to undo some index updates.
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    recoverIndex(tx);
+                }
+            });
         }
     }
     
+       protected void recoverIndex(Transaction tx) throws IOException {
+        long start = System.currentTimeMillis();
+        // It is possible index updates got applied before the journal 
updates.. 
+        // in that case we need to removed references to messages that are not 
in the journal
+        final Location lastAppendLocation = journal.getLastAppendLocation();
+        long undoCounter=0;
+        
+        // Go through all the destinations to see if they have messages past 
the lastAppendLocation
+        for (StoredDestination sd : storedDestinations.values()) {
+               
+            final ArrayList<Long> matches = new ArrayList<Long>();
+            // Find all the Locations that are >= than the last Append 
Location.
+            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, 
Long>(lastAppendLocation) {
+                               @Override
+                               protected void matched(Location key, Long 
value) {
+                                       matches.add(value);
+                               }
+            });
+            
+            
+            for (Long sequenceId : matches) {
+                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+                sd.locationIndex.remove(tx, keys.location);
+                sd.messageIdIndex.remove(tx, keys.messageId);
+                undoCounter++;
+                // TODO: do we need to modify the ack positions for the pub 
sub case?
+                       }
+        }
+        long end = System.currentTimeMillis();
+        if( undoCounter > 0 ) {
+               // The rolledback operations are basically in flight journal 
writes.  To avoid getting these the end user
+               // should do sync writes to the journal.
+               LOG.info("Rolled back " + undoCounter + " operations from the 
index in " + ((end - start) / 1000.0f) + " seconds.");
+        }
+       }
+
        private Location nextRecoveryPosition;
        private Location lastRecoveryPosition;
 

Modified: 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java?rev=741659&r1=741658&r2=741659&view=diff
==============================================================================
--- 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java 
(original)
+++ 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java 
Fri Feb  6 18:19:31 2009
@@ -43,4 +43,96 @@
      */
     void visit(List<Key> keys, List<Value> values);
     
+    
+    abstract class GTVisitor<Key extends Comparable<Key>, Value> implements 
BTreeVisitor<Key, Value>{
+               final private Key value;
+
+               public GTVisitor(Key value) {
+                       this.value = value;
+               }
+
+               public boolean isInterestedInKeysBetween(Key first, Key second) 
{
+               return second==null || second.compareTo(value)>0;
+               }
+
+               public void visit(List<Key> keys, List<Value> values) {
+                       for( int i=0; i < keys.size(); i++) {
+                               Key key = keys.get(i);
+                               if( key.compareTo(value)>0 ) {
+                                       matched(key, values.get(i));
+                               }
+                       }
+               }
+
+               abstract protected void matched(Key key, Value value);
+    }
+    
+    abstract class GTEVisitor<Key extends Comparable<Key>, Value> implements 
BTreeVisitor<Key, Value>{
+               final private Key value;
+
+               public GTEVisitor(Key value) {
+                       this.value = value;
+               }
+
+               public boolean isInterestedInKeysBetween(Key first, Key second) 
{
+               return second==null || second.compareTo(value)>=0;
+               }
+
+               public void visit(List<Key> keys, List<Value> values) {
+                       for( int i=0; i < keys.size(); i++) {
+                               Key key = keys.get(i);
+                               if( key.compareTo(value)>=0 ) {
+                                       matched(key, values.get(i));
+                               }
+                       }
+               }
+
+               abstract protected void matched(Key key, Value value);
+    }
+    
+    abstract class LTVisitor<Key extends Comparable<Key>, Value> implements 
BTreeVisitor<Key, Value>{
+               final private Key value;
+
+               public LTVisitor(Key value) {
+                       this.value = value;
+               }
+
+               public boolean isInterestedInKeysBetween(Key first, Key second) 
{
+               return first==null || first.compareTo(value)<0;
+               }
+
+               public void visit(List<Key> keys, List<Value> values) {
+                       for( int i=0; i < keys.size(); i++) {
+                               Key key = keys.get(i);
+                               if( key.compareTo(value)<0 ) {
+                                       matched(key, values.get(i));
+                               }
+                       }
+               }
+
+               abstract protected void matched(Key key, Value value);
+    }
+    
+    abstract class LTEVisitor<Key extends Comparable<Key>, Value> implements 
BTreeVisitor<Key, Value>{
+               final private Key value;
+
+               public LTEVisitor(Key value) {
+                       this.value = value;
+               }
+
+               public boolean isInterestedInKeysBetween(Key first, Key second) 
{
+               return first==null || first.compareTo(value)<=0;
+               }
+
+               public void visit(List<Key> keys, List<Value> values) {
+                       for( int i=0; i < keys.size(); i++) {
+                               Key key = keys.get(i);
+                               if( key.compareTo(value)<=0 ) {
+                                       matched(key, values.get(i));
+                               }
+                       }
+               }
+
+               abstract protected void matched(Key key, Value value);
+    }
 }
\ No newline at end of file


Reply via email to