Author: chirino
Date: Fri Feb 6 18:19:31 2009
New Revision: 741659
URL: http://svn.apache.org/viewvc?rev=741659&view=rev
Log:
- Added some handy generic visitors to the BTreeVisitor interface.
- Updated the recovery process so it now rolls back changes applied to the index
that did not get synced to the journal.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=741659&r1=741658&r2=741659&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Feb 6 18:19:31 2009
@@ -360,25 +360,64 @@
long start = System.currentTimeMillis();
Location recoveryPosition = getRecoveryPosition();
- if( recoveryPosition ==null ) {
- return;
+ if( recoveryPosition!=null ) {
+ int redoCounter = 0;
+ while (recoveryPosition != null) {
+ JournalCommand message = load(recoveryPosition);
+ metadata.lastUpdate = recoveryPosition;
+ process(message, recoveryPosition);
+ redoCounter++;
+ recoveryPosition =
journal.getNextLocation(recoveryPosition);
+ }
+ long end = System.currentTimeMillis();
+ LOG.info("Replayed " + redoCounter + " operations from
the journal in " + ((end - start) / 1000.0f) + " seconds.");
}
-
- int redoCounter = 0;
- LOG.info("Journal Recovery Started from: " + journal + " at " +
recoveryPosition.getDataFileId() + ":" + recoveryPosition.getOffset());
-
- while (recoveryPosition != null) {
- JournalCommand message = load(recoveryPosition);
- metadata.lastUpdate = recoveryPosition;
- process(message, recoveryPosition);
- redoCounter++;
- recoveryPosition =
journal.getNextLocation(recoveryPosition);
- }
- long end = System.currentTimeMillis();
- LOG.info("Replayed " + redoCounter + " operations from redo log
in " + ((end - start) / 1000.0f) + " seconds.");
+
+ // We may have to undo some index updates.
+ pageFile.tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ recoverIndex(tx);
+ }
+ });
}
}
+    /**
+     * Rolls back index entries that point at journal locations at or beyond the
+     * journal's last append location.
+     * <p>
+     * Index updates can get applied before the matching journal updates; messages
+     * referenced from such locations are not in the journal, so their entries are
+     * removed from the order, location and message id indexes of every stored
+     * destination.
+     *
+     * @param tx transaction handed to every index visit and removal
+     * @throws IOException declared for the index operations invoked here
+     */
+    protected void recoverIndex(Transaction tx) throws IOException {
+        long start = System.currentTimeMillis();
+        // It is possible index updates got applied before the journal updates.
+        // In that case we need to remove references to messages that are not in the journal.
+        final Location lastAppendLocation = journal.getLastAppendLocation();
+        long undoCounter = 0;
+
+        // Go through all the destinations to see if they have messages past the lastAppendLocation.
+        for (StoredDestination sd : storedDestinations.values()) {
+
+            // Matches are collected during the visit and only removed afterwards.
+            // The matched key is kept next to its sequence id so the location entry
+            // can still be dropped when the order index cannot supply it.
+            final ArrayList<Location> matchedLocations = new ArrayList<Location>();
+            final ArrayList<Long> matches = new ArrayList<Long>();
+            // Find all the Locations that are >= the last append location.
+            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
+                @Override
+                protected void matched(Location key, Long value) {
+                    matchedLocations.add(key);
+                    matches.add(value);
+                }
+            });
+
+            for (int i = 0; i < matches.size(); i++) {
+                MessageKeys keys = sd.orderIndex.remove(tx, matches.get(i));
+                if (keys == null) {
+                    // NOTE(review): assumes remove() returns null when the order index has
+                    // no entry for this sequence id - confirm against the index implementation.
+                    // Without the guard recovery would die with a NullPointerException and
+                    // leave the stale location entry in place.
+                    sd.locationIndex.remove(tx, matchedLocations.get(i));
+                } else {
+                    sd.locationIndex.remove(tx, keys.location);
+                    sd.messageIdIndex.remove(tx, keys.messageId);
+                }
+                undoCounter++;
+                // TODO: do we need to modify the ack positions for the pub sub case?
+            }
+        }
+        long end = System.currentTimeMillis();
+        if (undoCounter > 0) {
+            // The rolled back operations are basically in-flight journal writes. To avoid
+            // getting these the end user should do sync writes to the journal.
+            LOG.info("Rolled back " + undoCounter + " operations from the index in " + ((end - start) / 1000.0f) + " seconds.");
+        }
+    }
+
private Location nextRecoveryPosition;
private Location lastRecoveryPosition;
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java?rev=741659&r1=741658&r2=741659&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java Fri Feb 6 18:19:31 2009
@@ -43,4 +43,96 @@
*/
void visit(List<Key> keys, List<Value> values);
+
+    /**
+     * Visitor that hands every entry whose key compares strictly greater than a
+     * fixed value to {@link #matched(Object, Object)}.
+     */
+    abstract class GTVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value> {
+        /** Value every visited key is compared against. */
+        private final Key threshold;
+
+        /**
+         * @param value the value keys must be greater than to be reported
+         */
+        public GTVisitor(Key value) {
+            this.threshold = value;
+        }
+
+        /**
+         * Only the {@code second} key is consulted.
+         * NOTE(review): first/second are presumably the bounds of a key range,
+         * with null meaning "no bound" - confirm against the BTreeVisitor contract.
+         *
+         * @return true when {@code second} is null or greater than the comparison value
+         */
+        public boolean isInterestedInKeysBetween(Key first, Key second) {
+            if (second == null) {
+                return true;
+            }
+            return second.compareTo(threshold) > 0;
+        }
+
+        /**
+         * Walks the keys by position and reports each key greater than the
+         * comparison value together with the value stored at the same position.
+         */
+        public void visit(List<Key> keys, List<Value> values) {
+            for (int pos = 0; pos < keys.size(); pos++) {
+                Key candidate = keys.get(pos);
+                if (candidate.compareTo(threshold) <= 0) {
+                    continue;
+                }
+                matched(candidate, values.get(pos));
+            }
+        }
+
+        /**
+         * Called once for every entry whose key passed the comparison.
+         */
+        protected abstract void matched(Key key, Value value);
+    }
+
+    /**
+     * Visitor that hands every entry whose key compares greater than or equal to
+     * a fixed value to {@link #matched(Object, Object)}.
+     */
+    abstract class GTEVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value> {
+        /** Value every visited key is compared against. */
+        private final Key threshold;
+
+        /**
+         * @param value the value keys must be greater than or equal to to be reported
+         */
+        public GTEVisitor(Key value) {
+            this.threshold = value;
+        }
+
+        /**
+         * Only the {@code second} key is consulted.
+         * NOTE(review): first/second are presumably the bounds of a key range,
+         * with null meaning "no bound" - confirm against the BTreeVisitor contract.
+         *
+         * @return true when {@code second} is null or not less than the comparison value
+         */
+        public boolean isInterestedInKeysBetween(Key first, Key second) {
+            if (second == null) {
+                return true;
+            }
+            return second.compareTo(threshold) >= 0;
+        }
+
+        /**
+         * Walks the keys by position and reports each key greater than or equal to
+         * the comparison value together with the value stored at the same position.
+         */
+        public void visit(List<Key> keys, List<Value> values) {
+            for (int pos = 0; pos < keys.size(); pos++) {
+                Key candidate = keys.get(pos);
+                if (candidate.compareTo(threshold) < 0) {
+                    continue;
+                }
+                matched(candidate, values.get(pos));
+            }
+        }
+
+        /**
+         * Called once for every entry whose key passed the comparison.
+         */
+        protected abstract void matched(Key key, Value value);
+    }
+
+    /**
+     * Visitor that hands every entry whose key compares strictly less than a
+     * fixed value to {@link #matched(Object, Object)}.
+     */
+    abstract class LTVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value> {
+        /** Value every visited key is compared against. */
+        private final Key threshold;
+
+        /**
+         * @param value the value keys must be less than to be reported
+         */
+        public LTVisitor(Key value) {
+            this.threshold = value;
+        }
+
+        /**
+         * Only the {@code first} key is consulted.
+         * NOTE(review): first/second are presumably the bounds of a key range,
+         * with null meaning "no bound" - confirm against the BTreeVisitor contract.
+         *
+         * @return true when {@code first} is null or less than the comparison value
+         */
+        public boolean isInterestedInKeysBetween(Key first, Key second) {
+            if (first == null) {
+                return true;
+            }
+            return first.compareTo(threshold) < 0;
+        }
+
+        /**
+         * Walks the keys by position and reports each key less than the comparison
+         * value together with the value stored at the same position.
+         */
+        public void visit(List<Key> keys, List<Value> values) {
+            for (int pos = 0; pos < keys.size(); pos++) {
+                Key candidate = keys.get(pos);
+                if (candidate.compareTo(threshold) >= 0) {
+                    continue;
+                }
+                matched(candidate, values.get(pos));
+            }
+        }
+
+        /**
+         * Called once for every entry whose key passed the comparison.
+         */
+        protected abstract void matched(Key key, Value value);
+    }
+
+    /**
+     * Visitor that hands every entry whose key compares less than or equal to a
+     * fixed value to {@link #matched(Object, Object)}.
+     */
+    abstract class LTEVisitor<Key extends Comparable<Key>, Value> implements BTreeVisitor<Key, Value> {
+        /** Value every visited key is compared against. */
+        private final Key threshold;
+
+        /**
+         * @param value the value keys must be less than or equal to to be reported
+         */
+        public LTEVisitor(Key value) {
+            this.threshold = value;
+        }
+
+        /**
+         * Only the {@code first} key is consulted.
+         * NOTE(review): first/second are presumably the bounds of a key range,
+         * with null meaning "no bound" - confirm against the BTreeVisitor contract.
+         *
+         * @return true when {@code first} is null or not greater than the comparison value
+         */
+        public boolean isInterestedInKeysBetween(Key first, Key second) {
+            if (first == null) {
+                return true;
+            }
+            return first.compareTo(threshold) <= 0;
+        }
+
+        /**
+         * Walks the keys by position and reports each key less than or equal to
+         * the comparison value together with the value stored at the same position.
+         */
+        public void visit(List<Key> keys, List<Value> values) {
+            for (int pos = 0; pos < keys.size(); pos++) {
+                Key candidate = keys.get(pos);
+                if (candidate.compareTo(threshold) > 0) {
+                    continue;
+                }
+                matched(candidate, values.get(pos));
+            }
+        }
+
+        /**
+         * Called once for every entry whose key passed the comparison.
+         */
+        protected abstract void matched(Key key, Value value);
+    }
}
\ No newline at end of file