Author: chirino
Date: Fri Feb 6 15:42:22 2009
New Revision: 741597
URL: http://svn.apache.org/viewvc?rev=741597&view=rev
Log:
better synchronization of the metadata.lastUpdate var
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=741597&r1=741596&r2=741597&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Feb 6 15:42:22 2009
@@ -356,44 +356,48 @@
* @throws IllegalStateException
*/
private void recover() throws IllegalStateException, IOException {
- long start = System.currentTimeMillis();
-
- Location recoveryPosition = getRecoveryPosition();
- if( recoveryPosition ==null ) {
- return;
- }
-
- int redoCounter = 0;
- LOG.info("Journal Recovery Started from: " + journal + " at " + recoveryPosition.getDataFileId() + ":" + recoveryPosition.getOffset());
-
- while (recoveryPosition != null) {
- JournalCommand message = load(recoveryPosition);
- metadata.lastUpdate = recoveryPosition;
- process(message, recoveryPosition);
- redoCounter++;
- recoveryPosition = journal.getNextLocation(recoveryPosition);
+ synchronized (indexMutex) {
+ long start = System.currentTimeMillis();
+
+ Location recoveryPosition = getRecoveryPosition();
+ if( recoveryPosition ==null ) {
+ return;
+ }
+
+ int redoCounter = 0;
+ LOG.info("Journal Recovery Started from: " + journal + " at " + recoveryPosition.getDataFileId() + ":" + recoveryPosition.getOffset());
+
+ while (recoveryPosition != null) {
+ JournalCommand message = load(recoveryPosition);
+ metadata.lastUpdate = recoveryPosition;
+ process(message, recoveryPosition);
+ redoCounter++;
+ recoveryPosition = journal.getNextLocation(recoveryPosition);
+ }
+ long end = System.currentTimeMillis();
+ LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
}
- long end = System.currentTimeMillis();
- LOG.info("Replayed " + redoCounter + " operations from redo log in " + ((end - start) / 1000.0f) + " seconds.");
}
private Location nextRecoveryPosition;
private Location lastRecoveryPosition;
public void incrementalRecover() throws IOException {
- if( nextRecoveryPosition == null ) {
- if( lastRecoveryPosition==null ) {
- nextRecoveryPosition = getRecoveryPosition();
- } else {
- nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
- }
- }
- while (nextRecoveryPosition != null) {
- lastRecoveryPosition = nextRecoveryPosition;
- metadata.lastUpdate = lastRecoveryPosition;
- JournalCommand message = load(lastRecoveryPosition);
- process(message, lastRecoveryPosition);
- nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
+ synchronized (indexMutex) {
+ if( nextRecoveryPosition == null ) {
+ if( lastRecoveryPosition==null ) {
+ nextRecoveryPosition = getRecoveryPosition();
+ } else {
+ nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
+ }
+ }
+ while (nextRecoveryPosition != null) {
+ lastRecoveryPosition = nextRecoveryPosition;
+ metadata.lastUpdate = lastRecoveryPosition;
+ JournalCommand message = load(lastRecoveryPosition);
+ process(message, lastRecoveryPosition);
+ nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
+ }
}
}
@@ -482,7 +486,9 @@
LOG.warn("KahaDB long enqueue time: Journal Add Took: "+(start2-start)+" ms, Index Update took "+(end-start2)+" ms");
}
- metadata.lastUpdate = location;
+ synchronized (indexMutex) {
+ metadata.lastUpdate = location;
+ }
return location;
}