Author: chirino
Date: Tue Aug 18 19:44:22 2009
New Revision: 805550
URL: http://svn.apache.org/viewvc?rev=805550&view=rev
Log:
- The Journal now checks its data files for corruption on startup when the
checkForCorruptionOnStartup option is enabled.
- Corrupted journal records can be inspected via
DataFile.getCorruptedBlocks().
- OR and AND BTreeVisitors are now supported, allowing more complex
queries to be run against the BTree.
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java?rev=805550&r1=805549&r2=805550&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
(original)
+++
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
Tue Aug 18 19:44:22 2009
@@ -25,7 +25,7 @@
* @param <Value>
*/
public interface BTreeVisitor<Key,Value> {
-
+
/**
* Do you want to visit the range of BTree entries between the first and
and second key?
*
@@ -34,7 +34,7 @@
* @return true if you want to visit the values between the first and
second key.
*/
boolean isInterestedInKeysBetween(Key first, Key second);
-
+
/**
* The keys and values of a BTree leaf node.
*
@@ -42,32 +42,111 @@
* @param values
*/
void visit(List<Key> keys, List<Value> values);
-
-
- abstract class GTVisitor<Key extends Comparable<Key>, Value> implements
BTreeVisitor<Key, Value>{
- final private Key value;
-
- public GTVisitor(Key value) {
- this.value = value;
- }
- public boolean isInterestedInKeysBetween(Key first, Key second)
{
- return second==null || second.compareTo(value)>0;
- }
+ /**
+ * A condition on BTree keys. Predicates can be combined with OrVisitor and
+ * AndVisitor, and PredicateVisitor uses one to select entries.
+ */
+ public interface Predicate<Key> {
+ /**
+ * @return true if some key in the range bounded by first and second could
+ *         satisfy this condition (used to decide whether to visit that range).
+ *         The existing implementations treat a null bound as unbounded.
+ */
+ boolean isInterestedInKeysBetween(Key first, Key second);
+ /** @return true if the given key satisfies this condition. */
+ boolean isInterestedInKey(Key key);
+ }
+ abstract class PredicateVisitor<Key, Value> implements BTreeVisitor<Key,
Value>, Predicate<Key> {
public void visit(List<Key> keys, List<Value> values) {
for( int i=0; i < keys.size(); i++) {
Key key = keys.get(i);
- if( key.compareTo(value)>0 ) {
+ if( isInterestedInKey(key) ) {
matched(key, values.get(i));
}
}
}
- abstract protected void matched(Key key, Value value);
+ protected void matched(Key key, Value value) {
+ }
}
-
- abstract class BetweenVisitor<Key extends Comparable<Key>, Value>
implements BTreeVisitor<Key, Value>{
+
+ /**
+ * A PredicateVisitor that selects keys satisfying at least one of the given
+ * conditions (logical OR). With an empty condition list, no key and no key
+ * range is of interest.
+ *
+ * NOTE(review): the conditions list is stored by reference, not copied, so
+ * later changes to the caller's list are visible to this visitor.
+ */
+ class OrVisitor<Key, Value> extends PredicateVisitor<Key, Value> {
+ private final List<Predicate<Key>> conditions;
+
+ public OrVisitor(List<Predicate<Key>> conditions) {
+ this.conditions = conditions;
+ }
+
+ // A range is worth visiting if any single condition might match inside it.
+ public boolean isInterestedInKeysBetween(Key first, Key second)
{
+ for (Predicate<Key> condition : conditions) {
+ if( condition.isInterestedInKeysBetween(first, second) ) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // A key matches as soon as any condition accepts it (short-circuits).
+ public boolean isInterestedInKey(Key key) {
+ for (Predicate<Key> condition : conditions) {
+ if( condition.isInterestedInKey(key) ) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // Renders as "(c1) OR (c2) ..." using each condition's toString().
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ boolean first=true;
+ for (Predicate<Key> condition : conditions) {
+ if( !first ) {
+ sb.append(" OR ");
+ }
+ first=false;
+ sb.append("(");
+ sb.append(condition);
+ sb.append(")");
+ }
+ return sb.toString();
+ }
+ }
+
+ /**
+ * A PredicateVisitor that selects keys satisfying every one of the given
+ * conditions (logical AND). With an empty condition list, every key and every
+ * key range is of interest.
+ *
+ * NOTE(review): the conditions list is stored by reference, not copied, so
+ * later changes to the caller's list are visible to this visitor.
+ */
+ class AndVisitor<Key, Value> extends PredicateVisitor<Key, Value> {
+ private final List<Predicate<Key>> conditions;
+
+ public AndVisitor(List<Predicate<Key>> conditions) {
+ this.conditions = conditions;
+ }
+
+ // A range can be skipped as soon as one condition rules it out, since no key
+ // in it could then satisfy all conditions.
+ public boolean isInterestedInKeysBetween(Key first, Key second)
{
+ for (Predicate<Key> condition : conditions) {
+ if( !condition.isInterestedInKeysBetween(first, second) ) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // A key matches only if no condition rejects it (short-circuits on the first rejection).
+ public boolean isInterestedInKey(Key key) {
+ for (Predicate<Key> condition : conditions) {
+ if( !condition.isInterestedInKey(key) ) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Renders as "(c1) AND (c2) ..." using each condition's toString().
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ boolean first=true;
+ for (Predicate<Key> condition : conditions) {
+ if( !first ) {
+ sb.append(" AND ");
+ }
+ first=false;
+ sb.append("(");
+ sb.append(condition);
+ sb.append(")");
+ }
+ return sb.toString();
+ }
+ }
+
+ class BetweenVisitor<Key extends Comparable<Key>, Value> extends
PredicateVisitor<Key, Value> {
private final Key first;
private final Key last;
@@ -81,19 +160,38 @@
&& (first==null || first.compareTo(last)<0);
}
- public void visit(List<Key> keys, List<Value> values) {
- for( int i=0; i < keys.size(); i++) {
- Key key = keys.get(i);
- if( key.compareTo(first)>=0 &&
key.compareTo(last)<0 ) {
- matched(key, values.get(i));
- }
- }
+ public boolean isInterestedInKey(Key key) {
+ return key.compareTo(first) >=0 && key.compareTo(last) <0;
+ }
+
+ @Override
+ public String toString() {
+ return first+" <= key < "+last;
+ }
+ }
+
+ class GTVisitor<Key extends Comparable<Key>, Value> extends
PredicateVisitor<Key, Value> {
+ final private Key value;
+
+ public GTVisitor(Key value) {
+ this.value = value;
+ }
+
+ public boolean isInterestedInKeysBetween(Key first, Key second)
{
+ return second==null || second.compareTo(value)>0;
}
- abstract protected void matched(Key key, Value value);
+ public boolean isInterestedInKey(Key key) {
+ return key.compareTo(value)>0;
+ }
+
+ @Override
+ public String toString() {
+ return "key > "+ value;
+ }
}
- abstract class GTEVisitor<Key extends Comparable<Key>, Value> implements
BTreeVisitor<Key, Value>{
+ class GTEVisitor<Key extends Comparable<Key>, Value> extends
PredicateVisitor<Key, Value> {
final private Key value;
public GTEVisitor(Key value) {
@@ -104,19 +202,17 @@
return second==null || second.compareTo(value)>=0;
}
- public void visit(List<Key> keys, List<Value> values) {
- for( int i=0; i < keys.size(); i++) {
- Key key = keys.get(i);
- if( key.compareTo(value)>=0 ) {
- matched(key, values.get(i));
- }
- }
- }
+ public boolean isInterestedInKey(Key key) {
+ return key.compareTo(value)>=0;
+ }
- abstract protected void matched(Key key, Value value);
+ @Override
+ public String toString() {
+ return "key >= "+ value;
+ }
}
- abstract class LTVisitor<Key extends Comparable<Key>, Value> implements
BTreeVisitor<Key, Value>{
+ class LTVisitor<Key extends Comparable<Key>, Value> extends
PredicateVisitor<Key, Value> {
final private Key value;
public LTVisitor(Key value) {
@@ -127,19 +223,17 @@
return first==null || first.compareTo(value)<0;
}
- public void visit(List<Key> keys, List<Value> values) {
- for( int i=0; i < keys.size(); i++) {
- Key key = keys.get(i);
- if( key.compareTo(value)<0 ) {
- matched(key, values.get(i));
- }
- }
- }
+ public boolean isInterestedInKey(Key key) {
+ return key.compareTo(value)<0;
+ }
- abstract protected void matched(Key key, Value value);
+ @Override
+ public String toString() {
+ return "key < "+ value;
+ }
}
- abstract class LTEVisitor<Key extends Comparable<Key>, Value> implements
BTreeVisitor<Key, Value>{
+ class LTEVisitor<Key extends Comparable<Key>, Value> extends
PredicateVisitor<Key, Value> {
final private Key value;
public LTEVisitor(Key value) {
@@ -150,15 +244,13 @@
return first==null || first.compareTo(value)<=0;
}
- public void visit(List<Key> keys, List<Value> values) {
- for( int i=0; i < keys.size(); i++) {
- Key key = keys.get(i);
- if( key.compareTo(value)<=0 ) {
- matched(key, values.get(i));
- }
- }
- }
+ public boolean isInterestedInKey(Key key) {
+ return key.compareTo(value)<=0;
+ }
- abstract protected void matched(Key key, Value value);
+ @Override
+ public String toString() {
+ return "key <= "+ value;
+ }
}
}
\ No newline at end of file
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java?rev=805550&r1=805549&r2=805550&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
Tue Aug 18 19:44:22 2009
@@ -22,6 +22,7 @@
import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.LinkedNode;
+import org.apache.kahadb.util.SequenceSet;
/**
* DataFile
@@ -33,6 +34,7 @@
protected final File file;
protected final Integer dataFileId;
protected int length;
+ protected final SequenceSet corruptedBlocks = new SequenceSet();
DataFile(File file, int number, int preferedSize) {
this.file = file;
@@ -80,6 +82,10 @@
IOHelper.moveFile(file,targetDirectory);
}
+ /**
+ * Returns the byte-offset ranges of this data file that the Journal found to
+ * contain invalid journal records.
+ *
+ * NOTE(review): this is the live internal set, not a copy; callers that
+ * modify it change this DataFile's state.
+ */
+ public SequenceSet getCorruptedBlocks() {
+ return corruptedBlocks;
+ }
+
public int compareTo(DataFile df) {
return dataFileId - df.dataFileId;
}
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java?rev=805550&r1=805549&r2=805550&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
(original)
+++
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
Tue Aug 18 19:44:22 2009
@@ -95,11 +95,16 @@
}
}
- public void read(long offset, byte data[]) throws IOException {
+ public void readFully(long offset, byte data[]) throws IOException {
file.seek(offset);
file.readFully(data);
}
+ /**
+ * Seeks to offset and delegates to file.read(data), returning its result.
+ * Unlike readFully(long, byte[]), this does not guarantee the buffer is filled.
+ * The return value is presumably the number of bytes read, or -1 at end of
+ * file (TODO confirm against the declared type of file).
+ *
+ * NOTE(review): the previous void read(long, byte[]) was renamed to readFully.
+ * Existing call sites of read(...) still compile but now get partial-read
+ * semantics; audit them.
+ */
+ public int read(long offset, byte data[]) throws IOException {
+ file.seek(offset);
+ return file.read(data);
+ }
+
public void readLocationDetails(Location location) throws IOException {
WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new
WriteKey(location));
if (asyncWrite != null) {
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=805550&r1=805549&r2=805550&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
Tue Aug 18 19:44:22 2009
@@ -20,15 +20,7 @@
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -39,10 +31,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
import org.apache.kahadb.journal.DataFileAppender.WriteKey;
-import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.DataByteArrayInputStream;
-import org.apache.kahadb.util.LinkedNodeList;
-import org.apache.kahadb.util.Scheduler;
+import org.apache.kahadb.util.*;
/**
* Manages DataFiles
@@ -61,7 +50,22 @@
// Batch Control Item holds a 4 byte size of the batch and a 8 byte
checksum of the batch.
public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE
BATCH");
public static final int BATCH_CONTROL_RECORD_SIZE =
RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
-
+ public static final byte[] BATCH_CONTROL_RECORD_HEADER =
createBatchControlRecordHeader();
+
+ /**
+ * Builds the fixed prefix of a batch control record: the record size (int),
+ * the record type (byte) and the magic bytes. The recovery code matches raw
+ * file bytes against this prefix to recognise batch control records.
+ */
+ private static byte[] createBatchControlRecordHeader() {
+ try {
+ DataByteArrayOutputStream os = new DataByteArrayOutputStream();
+ os.writeInt(BATCH_CONTROL_RECORD_SIZE);
+ os.writeByte(BATCH_CONTROL_RECORD_TYPE);
+ os.write(BATCH_CONTROL_RECORD_MAGIC);
+ ByteSequence sequence = os.toByteSequence();
+ // Presumably trims the backing array so getData() holds exactly the bytes written.
+ sequence.compact();
+ return sequence.getData();
+ } catch (IOException e) {
+ // Presumably unreachable for an in-memory stream; keep the cause if it happens.
+ throw new RuntimeException("Could not create batch control record header.", e);
+ }
+ }
+
public static final String DEFAULT_DIRECTORY = ".";
public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
public static final String DEFAULT_FILE_PREFIX = "db-";
@@ -96,6 +100,7 @@
protected boolean archiveDataLogs;
private ReplicationTarget replicationTarget;
protected boolean checksum;
+ protected boolean checkForCorruptionOnStartup;
public synchronized void start() throws IOException {
if (started) {
@@ -137,17 +142,20 @@
for (DataFile df : l) {
dataFiles.addLast(df);
fileByFileMap.put(df.getFile(), df);
+
+ if( isCheckForCorruptionOnStartup() ) {
+ lastAppendLocation.set(recoveryCheck(df));
+ }
}
}
getCurrentWriteFile();
- try {
- Location l = recoveryCheck(dataFiles.getTail());
- lastAppendLocation.set(l);
- } catch (IOException e) {
- LOG.warn("recovery check failed", e);
+
+ if( lastAppendLocation.get()==null ) {
+ DataFile df = dataFiles.getTail();
+ lastAppendLocation.set(recoveryCheck(df));
}
-
+
cleanupTask = new Runnable() {
public void run() {
cleanup();
@@ -177,56 +185,108 @@
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
while( true ) {
- reader.read(location.getOffset(), controlRecord);
- controlIs.restart();
-
- // Assert that it's a batch record.
- if( controlIs.readInt() != BATCH_CONTROL_RECORD_SIZE ) {
- break;
- }
- if( controlIs.readByte() != BATCH_CONTROL_RECORD_TYPE )
{
- break;
- }
- for( int i=0; i < BATCH_CONTROL_RECORD_MAGIC.length;
i++ ) {
- if( controlIs.readByte() !=
BATCH_CONTROL_RECORD_MAGIC[i] ) {
- break;
- }
- }
-
- int size = controlIs.readInt();
- if( size > MAX_BATCH_SIZE ) {
- break;
- }
-
- if( isChecksum() ) {
-
- long expectedChecksum = controlIs.readLong();
-
- byte data[] = new byte[size];
-
reader.read(location.getOffset()+BATCH_CONTROL_RECORD_SIZE, data);
-
- Checksum checksum = new Adler32();
- checksum.update(data, 0, data.length);
-
- if( expectedChecksum!=checksum.getValue() ) {
- break;
- }
-
- }
-
-
-
location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
+ int size = checkBatchRecord(reader, location.getOffset());
+ if ( size>=0 ) {
+
location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
+ } else {
+
+ // Perhaps it's just some corruption... scan through the
file to find the next valid batch record. We
+ // may have subsequent valid batch records.
+ int nextOffset = findNextBatchRecord(reader,
location.getOffset()+1);
+ if( nextOffset >=0 ) {
+ Sequence sequence = new Sequence(location.getOffset(),
nextOffset - 1);
+ LOG.info("Corrupt journal records found in
'"+dataFile.getFile()+"' between offsets: "+sequence);
+ dataFile.corruptedBlocks.add(sequence);
+ location.setOffset(nextOffset);
+ } else {
+ break;
+ }
+ }
}
} catch (IOException e) {
} finally {
accessorPool.closeDataFileAccessor(reader);
}
-
+
dataFile.setLength(location.getOffset());
+
+ if( !dataFile.corruptedBlocks.isEmpty() ) {
+ // Is the end of the data file corrupted?
+ if( dataFile.corruptedBlocks.getTail().getLast()+1 ==
location.getOffset() ) {
+ dataFile.setLength((int)
dataFile.corruptedBlocks.removeLastSequence().getFirst());
+ }
+ }
+
return location;
}
+ /**
+ * Scans forward from offset, in 4 KiB chunks, for the next occurrence of the
+ * batch control record header.
+ *
+ * @param reader accessor for the data file being scanned
+ * @param offset file offset at which to start scanning
+ * @return the file offset at which the header starts, or -1 if it is not found
+ *         before the end of the file
+ */
+ private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
+ ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
+ byte data[] = new byte[1024*4];
+ while( true ) {
+ int count = reader.read(offset, data);
+ if( count <= 0 ) {
+ // Nothing (more) could be read: we are at EOF. Handle this explicitly
+ // rather than building a ByteSequence with a negative length.
+ return -1;
+ }
+ ByteSequence bs = new ByteSequence(data, 0, count);
+ int pos = bs.indexOf(header, 0);
+ if( pos >= 0 ) {
+ return offset+pos;
+ }
+ if( count != data.length ) {
+ // If we had a short read then we were at EOF.
+ return -1;
+ }
+ // Load the next chunk, backing up by the header length so that a header
+ // straddling the chunk boundary is still found in the next chunk.
+ offset += count-BATCH_CONTROL_RECORD_HEADER.length;
+ }
+ }
+
+
+ /**
+ * Validates the batch control record expected at the given offset.
+ *
+ * @param reader accessor for the data file being checked
+ * @param offset file offset at which a batch control record is expected
+ * @return the size of the batch payload that follows the control record, or -1
+ *         if the bytes at offset are not a valid batch control record (header
+ *         mismatch, size out of range, or checksum mismatch)
+ * @throws IOException if the control record or the payload cannot be read,
+ *         presumably including when it extends past the end of the file
+ */
+ public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
+ byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
+ DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);
+
+ reader.readFully(offset, controlRecord);
+
+ // Assert that it's a batch record.
+ for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) {
+ if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) {
+ return -1;
+ }
+ }
+
+ // A corrupted size field may be negative. Reject it here instead of letting
+ // new byte[size] below throw NegativeArraySizeException, which is not an
+ // IOException and would escape the recovery scan's error handling.
+ int size = controlIs.readInt();
+ if( size < 0 || size > MAX_BATCH_SIZE ) {
+ return -1;
+ }
+
+ if( isChecksum() ) {
+
+ long expectedChecksum = controlIs.readLong();
+ if( expectedChecksum == 0 ) {
+ // Checksumming was not enabled when the record was stored,
+ // so we can't validate the record.
+ return size;
+ }
+
+ byte data[] = new byte[size];
+ reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data);
+
+ Checksum checksum = new Adler32();
+ checksum.update(data, 0, data.length);
+
+ if( expectedChecksum!=checksum.getValue() ) {
+ return -1;
+ }
+
+ }
+ return size;
+ }
+
+
void addToTotalLength(int size) {
totalLength.addAndGet(size);
}
@@ -640,5 +700,11 @@
this.checksum = checksumWrites;
}
+ /**
+ * @return true if, on startup, every data file is scanned for corrupted
+ *         records, rather than only the last data file
+ */
+ public boolean isCheckForCorruptionOnStartup() {
+ return checkForCorruptionOnStartup;
+ }
+ /**
+ * Enables or disables scanning every data file for corrupted records on
+ * startup. Presumably only takes effect if set before the journal is started.
+ */
+ public void setCheckForCorruptionOnStartup(boolean
checkForCorruptionOnStartup) {
+ this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
+ }
}
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java?rev=805550&r1=805549&r2=805550&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java
(original)
+++
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/ByteSequence.java
Tue Aug 18 19:44:22 2009
@@ -71,4 +71,35 @@
}
}
+ /**
+ * Finds the first occurrence of needle within this sequence.
+ *
+ * @param needle the byte pattern to search for
+ * @param pos index, relative to the start of this sequence, at which to begin
+ *        searching; negative values are treated as 0
+ * @return index (relative to the start of this sequence) of the first match at
+ *         or after pos, or -1 if there is none
+ */
+ public int indexOf(ByteSequence needle, int pos) {
+ // The last position where the needle still fits is length - needle.length,
+ // so it must be included ("<="), otherwise a match at the very end is missed.
+ int max = length - needle.length;
+ for (int i = Math.max(pos, 0); i <= max; i++) {
+ if (matches(needle, i)) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ // Returns true if every byte of needle equals the byte of this sequence at the
+ // same position, starting at index pos (relative to this sequence's offset).
+ // No bounds checking is done: callers must ensure pos + needle.length stays
+ // within this sequence.
+ private boolean matches(ByteSequence needle, int pos) {
+ for (int i = 0; i < needle.length; i++) {
+ if( data[offset + pos+ i] != needle.data[needle.offset + i] ) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Returns the byte at index i relative to this sequence's offset.
+ // NOTE(review): not called anywhere in this change; consider removing it.
+ private byte getByte(int i) {
+ return data[offset+i];
+ }
+
+ /**
+ * Finds the first occurrence of a byte value within this sequence.
+ *
+ * @param value the byte to search for
+ * @param pos index, relative to the start of this sequence, at which to begin
+ *        searching; negative values are treated as 0 so the search never reads
+ *        before this sequence's offset
+ * @return index (relative to the start of this sequence) of the first matching
+ *         byte at or after pos, or -1 if there is none
+ */
+ final public int indexOf(byte value, int pos) {
+ for (int i = Math.max(pos, 0); i < length; i++) {
+ if (data[offset + i] == value) {
+ return i;
+ }
+ }
+ return -1;
+ }
}
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java?rev=805550&r1=805549&r2=805550&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java
Tue Aug 18 19:44:22 2009
@@ -175,7 +175,18 @@
Sequence rc = removeFirstSequence(1);
return rc.first;
}
-
+
+
+ /**
+ * Removes and returns the last sequence in this set.
+ *
+ * @return the removed (unlinked) last sequence, or null if the set is empty
+ */
+ public Sequence removeLastSequence() {
+ if (isEmpty()) {
+ return null;
+ }
+
+ Sequence rc = getTail();
+ rc.unlink();
+ return rc;
+ }
+
/**
* Removes and returns the first sequence that is count range large.
*