Author: chirino
Date: Fri Feb  6 15:42:22 2009
New Revision: 741597

URL: http://svn.apache.org/viewvc?rev=741597&view=rev
Log:
better synchronization of the metadata.lastUpdate var

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=741597&r1=741596&r2=741597&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 Fri Feb  6 15:42:22 2009
@@ -356,44 +356,48 @@
      * @throws IllegalStateException
      */
     private void recover() throws IllegalStateException, IOException {
-        long start = System.currentTimeMillis();
-        
-        Location recoveryPosition = getRecoveryPosition();
-        if( recoveryPosition ==null ) {
-               return;
-        }
-        
-        int redoCounter = 0;
-        LOG.info("Journal Recovery Started from: " + journal + " at " + 
recoveryPosition.getDataFileId() + ":" + recoveryPosition.getOffset());
-
-        while (recoveryPosition != null) {
-            JournalCommand message = load(recoveryPosition);
-            metadata.lastUpdate = recoveryPosition;
-            process(message, recoveryPosition);
-            redoCounter++;
-            recoveryPosition = journal.getNextLocation(recoveryPosition);
+        synchronized (indexMutex) {
+               long start = System.currentTimeMillis();
+               
+               Location recoveryPosition = getRecoveryPosition();
+               if( recoveryPosition ==null ) {
+                       return;
+               }
+               
+               int redoCounter = 0;
+               LOG.info("Journal Recovery Started from: " + journal + " at " + 
recoveryPosition.getDataFileId() + ":" + recoveryPosition.getOffset());
+       
+               while (recoveryPosition != null) {
+                   JournalCommand message = load(recoveryPosition);
+                   metadata.lastUpdate = recoveryPosition;
+                   process(message, recoveryPosition);
+                   redoCounter++;
+                   recoveryPosition = 
journal.getNextLocation(recoveryPosition);
+               }
+               long end = System.currentTimeMillis();
+               LOG.info("Replayed " + redoCounter + " operations from redo log 
in " + ((end - start) / 1000.0f) + " seconds.");
         }
-        long end = System.currentTimeMillis();
-        LOG.info("Replayed " + redoCounter + " operations from redo log in " + 
((end - start) / 1000.0f) + " seconds.");
     }
     
        private Location nextRecoveryPosition;
        private Location lastRecoveryPosition;
 
        public void incrementalRecover() throws IOException {
-        if( nextRecoveryPosition == null ) {
-               if( lastRecoveryPosition==null ) {
-                       nextRecoveryPosition = getRecoveryPosition();
-               } else {
-                nextRecoveryPosition = 
journal.getNextLocation(lastRecoveryPosition);
-               }               
-        }
-        while (nextRecoveryPosition != null) {
-               lastRecoveryPosition = nextRecoveryPosition;
-            metadata.lastUpdate = lastRecoveryPosition;
-            JournalCommand message = load(lastRecoveryPosition);
-            process(message, lastRecoveryPosition);            
-            nextRecoveryPosition = 
journal.getNextLocation(lastRecoveryPosition);
+        synchronized (indexMutex) {
+               if( nextRecoveryPosition == null ) {
+                       if( lastRecoveryPosition==null ) {
+                               nextRecoveryPosition = getRecoveryPosition();
+                       } else {
+                       nextRecoveryPosition = 
journal.getNextLocation(lastRecoveryPosition);
+                       }               
+               }
+               while (nextRecoveryPosition != null) {
+                       lastRecoveryPosition = nextRecoveryPosition;
+                   metadata.lastUpdate = lastRecoveryPosition;
+                   JournalCommand message = load(lastRecoveryPosition);
+                   process(message, lastRecoveryPosition);            
+                   nextRecoveryPosition = 
journal.getNextLocation(lastRecoveryPosition);
+               }
         }
        }
        
@@ -482,7 +486,9 @@
                LOG.warn("KahaDB long enqueue time: Journal Add Took: 
"+(start2-start)+" ms, Index Update took "+(end-start2)+" ms");
        }
 
-        metadata.lastUpdate = location;
+        synchronized (indexMutex) {
+               metadata.lastUpdate = location;
+        }
         return location;
     }
 


Reply via email to