Author: dejanb
Date: Tue Dec 13 15:43:53 2011
New Revision: 1213743
URL: http://svn.apache.org/viewvc?rev=1213743&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3634 - corrupted index recovery
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1213743&r1=1213742&r2=1213743&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
Tue Dec 13 15:43:53 2011
@@ -16,20 +16,11 @@
*/
package org.apache.activemq.store.kahadb;
-import java.io.File;
-import java.io.IOException;
-import java.util.Set;
import org.apache.activeio.journal.Journal;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.LocalTransactionId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.command.*;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@@ -40,6 +31,10 @@ import org.apache.activemq.store.kahadb.
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.usage.SystemUsage;
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
/**
* An implementation of {@link PersistenceAdapter} designed for use with a
* {@link Journal} and then check pointing asynchronously on a timeout with
some
@@ -500,6 +495,14 @@ public class KahaDBPersistenceAdapter im
letter.setForceRecoverIndex(forceRecoverIndex);
}
+ public boolean isArchiveCorruptedIndex() {
+ return letter.isArchiveCorruptedIndex();
+ }
+
+ public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
+ letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
+ }
+
/**
* When true, persist the redelivery status such that the message
redelivery flag can survive a broker failure
* used with
org.apache.activemq.ActiveMQConnectionFactory#setTransactedIndividualAck(boolean)
true
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1213743&r1=1213742&r2=1213743&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Tue Dec 13 15:43:53 2011
@@ -16,38 +16,6 @@
*/
package org.apache.activemq.store.kahadb;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.EOFException;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.Stack;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
@@ -55,18 +23,7 @@ import org.apache.activemq.command.Messa
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.protobuf.Buffer;
-import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
-import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
-import org.apache.activemq.store.kahadb.data.KahaDestination;
-import org.apache.activemq.store.kahadb.data.KahaEntryType;
-import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
-import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
-import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
-import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
-import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
-import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
-import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
-import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
+import org.apache.activemq.store.kahadb.data.*;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
@@ -80,20 +37,19 @@ import org.apache.kahadb.journal.Locatio
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
-import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.DataByteArrayInputStream;
-import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.LocationMarshaller;
-import org.apache.kahadb.util.LockFile;
-import org.apache.kahadb.util.LongMarshaller;
-import org.apache.kahadb.util.Marshaller;
-import org.apache.kahadb.util.Sequence;
-import org.apache.kahadb.util.SequenceSet;
-import org.apache.kahadb.util.StringMarshaller;
-import org.apache.kahadb.util.VariableMarshaller;
+import org.apache.kahadb.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.*;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
public abstract class MessageDatabase extends ServiceSupport implements
BrokerServiceAware {
protected BrokerService brokerService;
@@ -225,6 +181,7 @@ public abstract class MessageDatabase ex
protected boolean forceRecoverIndex = false;
private final Object checkpointThreadLock = new Object();
private boolean rewriteOnRedelivery = false;
+ private boolean archiveCorruptedIndex = false;
public MessageDatabase() {
}
@@ -333,7 +290,21 @@ public abstract class MessageDatabase ex
public void open() throws IOException {
if( opened.compareAndSet(false, true) ) {
getJournal().start();
- loadPageFile();
+ try {
+ loadPageFile();
+ } catch (IOException ioe) {
+ LOG.warn("Index corrupted, trying to recover ...", ioe);
+ // try to recover index
+ try {
+ pageFile.unload();
+ } catch (Exception ignore) {}
+ if (archiveCorruptedIndex) {
+ pageFile.archive();
+ } else {
+ pageFile.delete();
+ }
+ loadPageFile();
+ }
startCheckpoint();
recover();
}
@@ -2295,6 +2266,14 @@ public abstract class MessageDatabase ex
this.rewriteOnRedelivery = rewriteOnRedelivery;
}
+ public boolean isArchiveCorruptedIndex() {
+ return archiveCorruptedIndex;
+ }
+
+ public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
+ this.archiveCorruptedIndex = archiveCorruptedIndex;
+ }
+
// /////////////////////////////////////////////////////////////////
// Internal conversion methods.
// /////////////////////////////////////////////////////////////////
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java?rev=1213743&r1=1213742&r2=1213743&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreRecoveryBrokerTest.java
Tue Dec 13 15:43:53 2011
@@ -16,24 +16,15 @@
*/
package org.apache.activemq.store.kahadb;
-import java.io.File;
-import java.net.URI;
-import java.util.ArrayList;
-
import junit.framework.Test;
-
-import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.RecoveryBrokerTest;
import org.apache.activemq.broker.StubConnection;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.*;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
/**
@@ -53,8 +44,20 @@ public class KahaDBStoreRecoveryBrokerTe
}
protected BrokerService createRestartedBroker() throws Exception {
+
+ // corrupting index
+ File index = new File("target/activemq-data/kahadb/db.data");
+ index.delete();
+ RandomAccessFile raf = new RandomAccessFile(index, "rw");
+ raf.seek(index.length());
+ raf.writeBytes("corrupt");
+ raf.close();
+
+ // starting broker
BrokerService broker = new BrokerService();
KahaDBStore kaha = new KahaDBStore();
+ // uncomment if you want to test archiving
+ //kaha.setArchiveCorruptedIndex(true);
kaha.setDirectory(new File("target/activemq-data/kahadb"));
broker.setPersistenceAdapter(kaha);
return broker;
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=1213743&r1=1213742&r2=1213743&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Tue Dec 13 15:43:53 2011
@@ -16,16 +16,13 @@
*/
package org.apache.kahadb.page;
+import org.apache.kahadb.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.RandomAccessFile;
+import java.io.*;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
@@ -34,16 +31,6 @@ import java.util.concurrent.atomic.Atomi
import java.util.zip.Adler32;
import java.util.zip.Checksum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.IOExceptionSupport;
-import org.apache.kahadb.util.IOHelper;
-import org.apache.kahadb.util.IntrospectionSupport;
-import org.apache.kahadb.util.LRUCache;
-import org.apache.kahadb.util.Sequence;
-import org.apache.kahadb.util.SequenceSet;
-
/**
* A PageFile provides you random access to fixed sized disk pages. This
object is not thread safe and therefore access to it should
* be externally synchronized.
@@ -310,6 +297,16 @@ public class PageFile {
delete(getFreeFile());
delete(getRecoveryFile());
}
+
+ public void archive() throws IOException {
+ if( loaded.get() ) {
+ throw new IllegalStateException("Cannot delete page file data when
the page file is loaded");
+ }
+ long timestamp = System.currentTimeMillis();
+ archive(getMainPageFile(), String.valueOf(timestamp));
+ archive(getFreeFile(), String.valueOf(timestamp));
+ archive(getRecoveryFile(), String.valueOf(timestamp));
+ }
/**
* @param file
@@ -323,6 +320,15 @@ public class PageFile {
}
}
+ private void archive(File file, String suffix) throws IOException {
+ if( file.exists() ) {
+ File archive = new File(file.getPath() + "-" + suffix);
+ if( !file.renameTo(archive) ) {
+ throw new IOException("Could not archive: " + file.getPath() +
" to " + file.getPath());
+ }
+ }
+ }
+
/**
* Loads the page file so that it can be accessed for read/write purposes.
This allocates OS resources. If this is the
* first time the page file is loaded, then this creates the page file in
the file system.