Author: chirino
Date: Fri Feb 6 21:35:39 2009
New Revision: 741741
URL: http://svn.apache.org/viewvc?rev=741741&view=rev
Log:
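Change the way journal records are validated on recovery so that validation is
consistent: each write batch is now preceded by a batch control record, and an
optional Adler32 checksum of the batch (see Journal.setChecksum) is verified
during recovery. The NIO-based appender is removed.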
Removed:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/NioJournalTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Fri Feb 6 21:35:39 2009
@@ -1277,7 +1277,6 @@
Journal manager = new Journal();
manager.setDirectory(directory);
manager.setMaxFileLength(getJournalMaxFileLength());
- manager.setUseNio(false);
return manager;
}
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java
Fri Feb 6 21:35:39 2009
@@ -29,7 +29,6 @@
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
-import org.apache.kahadb.page.Transaction.Closure;
import org.apache.kahadb.util.Marshaller;
/**
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
Fri Feb 6 21:35:39 2009
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.io.RandomAccessFile;
-import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
import org.apache.kahadb.util.IOHelper;
import org.apache.kahadb.util.LinkedNode;
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
(original)
+++
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
Fri Feb 6 21:35:39 2009
@@ -81,12 +81,12 @@
if (location.getSize() == Location.NOT_SET) {
file.seek(location.getOffset());
location.setSize(file.readInt());
- file.seek(location.getOffset() + Journal.ITEM_HEAD_SPACE);
+ location.setType(file.readByte());
} else {
- file.seek(location.getOffset() + Journal.ITEM_HEAD_SPACE);
+ file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE);
}
- byte[] data = new byte[location.getSize() -
Journal.ITEM_HEAD_FOOT_SPACE];
+ byte[] data = new byte[location.getSize() -
Journal.RECORD_HEAD_SPACE];
file.readFully(data);
return new ByteSequence(data, 0, data.length);
@@ -94,6 +94,11 @@
throw new IOException("Invalid location: " + location + ", : " +
e);
}
}
+
+ public void read(long offset, byte data[]) throws IOException {
+ file.seek(offset);
+ file.readFully(data);
+ }
public void readLocationDetails(Location location) throws IOException {
WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new
WriteKey(location));
@@ -107,42 +112,42 @@
}
}
- public boolean readLocationDetailsAndValidate(Location location) {
- try {
- WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new
WriteKey(location));
- if (asyncWrite != null) {
- location.setSize(asyncWrite.location.getSize());
- location.setType(asyncWrite.location.getType());
- } else {
- file.seek(location.getOffset());
- location.setSize(file.readInt());
- location.setType(file.readByte());
-
- byte data[] = new byte[3];
- file.seek(location.getOffset() +
Journal.ITEM_HEAD_OFFSET_TO_SOR);
- file.readFully(data);
- if (data[0] != Journal.ITEM_HEAD_SOR[0]
- || data[1] != Journal.ITEM_HEAD_SOR[1]
- || data[2] != Journal.ITEM_HEAD_SOR[2]) {
- return false;
- }
- file.seek(location.getOffset() + location.getSize() -
Journal.ITEM_FOOT_SPACE);
- file.readFully(data);
- if (data[0] != Journal.ITEM_HEAD_EOR[0]
- || data[1] != Journal.ITEM_HEAD_EOR[1]
- || data[2] != Journal.ITEM_HEAD_EOR[2]) {
- return false;
- }
- }
- } catch (IOException e) {
- return false;
- }
- return true;
- }
+// public boolean readLocationDetailsAndValidate(Location location) {
+// try {
+// WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new
WriteKey(location));
+// if (asyncWrite != null) {
+// location.setSize(asyncWrite.location.getSize());
+// location.setType(asyncWrite.location.getType());
+// } else {
+// file.seek(location.getOffset());
+// location.setSize(file.readInt());
+// location.setType(file.readByte());
+//
+// byte data[] = new byte[3];
+// file.seek(location.getOffset() +
Journal.ITEM_HEAD_OFFSET_TO_SOR);
+// file.readFully(data);
+// if (data[0] != Journal.ITEM_HEAD_SOR[0]
+// || data[1] != Journal.ITEM_HEAD_SOR[1]
+// || data[2] != Journal.ITEM_HEAD_SOR[2]) {
+// return false;
+// }
+// file.seek(location.getOffset() + location.getSize() -
Journal.ITEM_FOOT_SPACE);
+// file.readFully(data);
+// if (data[0] != Journal.ITEM_HEAD_EOR[0]
+// || data[1] != Journal.ITEM_HEAD_EOR[1]
+// || data[2] != Journal.ITEM_HEAD_EOR[2]) {
+// return false;
+// }
+// }
+// } catch (IOException e) {
+// return false;
+// }
+// return true;
+// }
public void updateRecord(Location location, ByteSequence data, boolean
sync) throws IOException {
- file.seek(location.getOffset() + Journal.ITEM_HEAD_SPACE);
+ file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE);
int size = Math.min(data.getLength(), location.getSize());
file.write(data.getData(), data.getOffset(), size);
if (sync) {
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java
(original)
+++
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java
Fri Feb 6 21:35:39 2009
@@ -30,7 +30,7 @@
*/
public class DataFileAccessorPool {
- private final Journal dataManager;
+ private final Journal journal;
private final Map<Integer, Pool> pools = new HashMap<Integer, Pool>();
private boolean closed;
private int maxOpenReadersPerFile = 5;
@@ -50,9 +50,9 @@
public DataFileAccessor openDataFileReader() throws IOException {
DataFileAccessor rc = null;
if (pool.isEmpty()) {
- rc = new DataFileAccessor(dataManager, file);
+ rc = new DataFileAccessor(journal, file);
} else {
- rc = (DataFileAccessor)pool.remove(pool.size() - 1);
+ rc = pool.remove(pool.size() - 1);
}
used = true;
openCounter++;
@@ -91,12 +91,12 @@
}
public DataFileAccessorPool(Journal dataManager) {
- this.dataManager = dataManager;
+ this.journal = dataManager;
}
synchronized void clearUsedMark() {
- for (Iterator iter = pools.values().iterator(); iter.hasNext();) {
- Pool pool = (Pool)iter.next();
+ for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();)
{
+ Pool pool = iter.next();
pool.clearUsedMark();
}
}
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
(original)
+++
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
Fri Feb 6 21:35:39 2009
@@ -21,6 +21,8 @@
import java.io.RandomAccessFile;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayOutputStream;
@@ -36,10 +38,9 @@
*/
class DataFileAppender {
- protected static final byte[] RESERVED_SPACE = new
byte[Journal.ITEM_HEAD_RESERVED_SPACE];
protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4;
- protected final Journal dataManager;
+ protected final Journal journal;
protected final Map<WriteKey, WriteCommand> inflightWrites;
protected final Object enqueueMutex = new Object() {
};
@@ -84,19 +85,21 @@
public final LinkedNodeList<WriteCommand> writes = new
LinkedNodeList<WriteCommand>();
public final CountDownLatch latch = new CountDownLatch(1);
- public int size;
+ private final int offset;
+ public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
- public WriteBatch(DataFile dataFile, WriteCommand write) throws
IOException {
+ public WriteBatch(DataFile dataFile, int offset, WriteCommand write)
throws IOException {
this.dataFile = dataFile;
- this.writes.addLast(write);
- size += write.location.getSize();
+ this.offset = offset;
+ this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+ this.size=Journal.BATCH_CONTROL_RECORD_SIZE;
+ journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+ append(write);
}
- public boolean canAppend(DataFile dataFile, WriteCommand write) {
- if (dataFile != this.dataFile) {
- return false;
- }
- if (size + write.location.getSize() >= maxWriteBatchSize) {
+ public boolean canAppend(WriteCommand write) {
+ int newSize = size + write.location.getSize();
+ if (newSize >= maxWriteBatchSize || offset+newSize >
journal.getMaxFileLength() ) {
return false;
}
return true;
@@ -104,7 +107,12 @@
public void append(WriteCommand write) throws IOException {
this.writes.addLast(write);
- size += write.location.getSize();
+ write.location.setDataFileId(dataFile.getDataFileId());
+ write.location.setOffset(offset+size);
+ int s = write.location.getSize();
+ size += s;
+ dataFile.incrementLength(s);
+ journal.addToTotalLength(s);
}
}
@@ -135,8 +143,8 @@
* @param fileId
*/
public DataFileAppender(Journal dataManager) {
- this.dataManager = dataManager;
- this.inflightWrites = this.dataManager.getInflightWrites();
+ this.journal = dataManager;
+ this.inflightWrites = this.journal.getInflightWrites();
}
/**
@@ -153,7 +161,7 @@
public Location storeItem(ByteSequence data, byte type, boolean sync)
throws IOException {
// Write the packet our internal buffer.
- int size = data.getLength() + Journal.ITEM_HEAD_FOOT_SPACE;
+ int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
final Location location = new Location();
location.setSize(size);
@@ -168,12 +176,7 @@
// by the data manager (which is basically just appending)
synchronized (this) {
- // Find the position where this item will land at.
- DataFile dataFile = dataManager.allocateLocation(location);
- if (!sync) {
- inflightWrites.put(new WriteKey(location), write);
- }
- batch = enqueue(dataFile, write);
+ batch = enqueue(write);
}
location.setLatch(batch.latch);
if (sync) {
@@ -182,6 +185,8 @@
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
+ } else {
+ inflightWrites.put(new WriteKey(location), write);
}
return location;
@@ -189,7 +194,7 @@
public Location storeItem(ByteSequence data, byte type, Runnable
onComplete) throws IOException {
// Write the packet our internal buffer.
- int size = data.getLength() + Journal.ITEM_HEAD_FOOT_SPACE;
+ int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
final Location location = new Location();
location.setSize(size);
@@ -198,23 +203,15 @@
WriteBatch batch;
WriteCommand write = new WriteCommand(location, data, onComplete);
- // Locate datafile and enqueue into the executor in sychronized block
so
- // that writes get equeued onto the executor in order that they were
- // assigned
- // by the data manager (which is basically just appending)
-
synchronized (this) {
- // Find the position where this item will land at.
- DataFile dataFile = dataManager.allocateLocation(location);
- inflightWrites.put(new WriteKey(location), write);
- batch = enqueue(dataFile, write);
+ batch = enqueue(write);
}
+ inflightWrites.put(new WriteKey(location), write);
location.setLatch(batch.latch);
-
return location;
}
- private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws
IOException {
+ private WriteBatch enqueue(WriteCommand write) throws IOException {
synchronized (enqueueMutex) {
WriteBatch rc = null;
if (shutdown) {
@@ -237,35 +234,37 @@
thread.start();
}
- if (nextWriteBatch == null) {
- nextWriteBatch = new WriteBatch(dataFile, write);
- rc = nextWriteBatch;
- enqueueMutex.notify();
- } else {
- // Append to current batch if possible..
- if (nextWriteBatch.canAppend(dataFile, write)) {
- nextWriteBatch.append(write);
- rc = nextWriteBatch;
- } else {
- // Otherwise wait for the queuedCommand to be null
- try {
- while (nextWriteBatch != null) {
- enqueueMutex.wait();
- }
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
- }
- if (shutdown) {
- throw new IOException("Async Writter Thread Shutdown");
- }
-
- // Start a new batch.
- nextWriteBatch = new WriteBatch(dataFile, write);
- rc = nextWriteBatch;
- enqueueMutex.notify();
- }
+ while ( true ) {
+ if (nextWriteBatch == null) {
+ DataFile file = journal.getCurrentWriteFile();
+ if( file.getLength() > journal.getMaxFileLength() ) {
+ file = journal.rotateWriteFile();
+ }
+
+ nextWriteBatch = new WriteBatch(file, file.getLength(),
write);
+ rc = nextWriteBatch;
+ enqueueMutex.notify();
+ return rc;
+ } else {
+ // Append to current batch if possible..
+ if (nextWriteBatch.canAppend(write)) {
+ nextWriteBatch.append(write);
+ return nextWriteBatch;
+ } else {
+ // Otherwise wait for the queuedCommand to be null
+ try {
+ while (nextWriteBatch != null) {
+ enqueueMutex.wait();
+ }
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
+ }
+ if (shutdown) {
+ throw new IOException("Async Writter Thread
Shutdown");
+ }
+ }
+ }
}
- return rc;
}
}
@@ -331,67 +330,57 @@
}
dataFile = wb.dataFile;
file = dataFile.openRandomAccessFile();
- if( file.length() < dataManager.preferedFileLength ) {
- file.setLength(dataManager.preferedFileLength);
+ if( file.length() < journal.preferedFileLength ) {
+ file.setLength(journal.preferedFileLength);
}
}
WriteCommand write = wb.writes.getHead();
- // Write all the data.
- // Only need to seek to first location.. all others
- // are in sequence.
- file.seek(write.location.getOffset());
-
+ // Write an empty batch control record.
+ buff.reset();
+ buff.writeInt(Journal.BATCH_CONTROL_RECORD_SIZE);
+ buff.writeByte(Journal.BATCH_CONTROL_RECORD_TYPE);
+ buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC);
+ buff.writeInt(0);
+ buff.writeLong(0);
+
boolean forceToDisk = false;
+ while (write != null) {
+ forceToDisk |= write.sync | write.onComplete != null;
+ buff.writeInt(write.location.getSize());
+ buff.writeByte(write.location.getType());
+ buff.write(write.data.getData(), write.data.getOffset(),
write.data.getLength());
+ write = write.getNext();
+ }
- //
- // is it just 1 big write?
- ReplicationTarget replicationTarget =
dataManager.getReplicationTarget();
- if (wb.size == write.location.getSize() &&
replicationTarget==null) {
- forceToDisk = write.sync | write.onComplete != null;
-
- // Just write it directly..
- file.writeInt(write.location.getSize());
- file.writeByte(write.location.getType());
- file.write(RESERVED_SPACE);
- file.write(Journal.ITEM_HEAD_SOR);
- file.write(write.data.getData(), write.data.getOffset(),
write.data.getLength());
- file.write(Journal.ITEM_HEAD_EOR);
-
- } else {
-
- // We are going to do 1 big write.
- while (write != null) {
- forceToDisk |= write.sync | write.onComplete != null;
-
- buff.writeInt(write.location.getSize());
- buff.writeByte(write.location.getType());
- buff.write(RESERVED_SPACE);
- buff.write(Journal.ITEM_HEAD_SOR);
- buff.write(write.data.getData(),
write.data.getOffset(), write.data.getLength());
- buff.write(Journal.ITEM_HEAD_EOR);
-
- write = write.getNext();
- }
-
- // Now do the 1 big write.
- ByteSequence sequence = buff.toByteSequence();
- file.write(sequence.getData(), sequence.getOffset(),
sequence.getLength());
-
- if( replicationTarget!=null ) {
-
replicationTarget.replicate(wb.writes.getHead().location, sequence,
forceToDisk);
- }
-
- buff.reset();
+ ByteSequence sequence = buff.toByteSequence();
+
+ // Now we can fill in the batch control record properly.
+ buff.reset();
+ buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length);
+
buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
+ if( journal.isChecksum() ) {
+ Checksum checksum = new Adler32();
+ checksum.update(sequence.getData(),
sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE,
sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
+ buff.writeLong(checksum.getValue());
}
+ // Now do the 1 big write.
+ file.seek(wb.offset);
+ file.write(sequence.getData(), sequence.getOffset(),
sequence.getLength());
+
+ ReplicationTarget replicationTarget =
journal.getReplicationTarget();
+ if( replicationTarget!=null ) {
+
replicationTarget.replicate(wb.writes.getHead().location, sequence,
forceToDisk);
+ }
+
if (forceToDisk) {
file.getFD().sync();
}
WriteCommand lastWrite = wb.writes.getTail();
- dataManager.setLastAppendLocation(lastWrite.location);
+ journal.setLastAppendLocation(lastWrite.location);
// Now that the data is on disk, remove the writes from the in
// flight
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
Fri Feb 6 21:35:39 2009
@@ -19,6 +19,7 @@
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -31,12 +32,15 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
import org.apache.kahadb.journal.DataFileAppender.WriteKey;
import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.LinkedNodeList;
import org.apache.kahadb.util.Scheduler;
@@ -47,24 +51,17 @@
*/
public class Journal {
- public static final int CONTROL_RECORD_MAX_LENGTH = 1024;
- public static final int ITEM_HEAD_RESERVED_SPACE = 21;
- // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
- public static final int ITEM_HEAD_SPACE = 4 + 1 + ITEM_HEAD_RESERVED_SPACE
+ 3;
- public static final int ITEM_HEAD_OFFSET_TO_SOR = ITEM_HEAD_SPACE - 3;
- public static final int ITEM_FOOT_SPACE = 3; // EOR
-
- public static final int ITEM_HEAD_FOOT_SPACE = ITEM_HEAD_SPACE +
ITEM_FOOT_SPACE;
-
- public static final byte[] ITEM_HEAD_SOR = new byte[] {
- 'S', 'O', 'R'
- }; //
- public static final byte[] ITEM_HEAD_EOR = new byte[] {
- 'E', 'O', 'R'
- }; //
+ private static final int MAX_BATCH_SIZE = 32*1024*1024;
- public static final byte DATA_ITEM_TYPE = 1;
- public static final byte REDO_ITEM_TYPE = 2;
+ // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
+ public static final int RECORD_HEAD_SPACE = 4 + 1;
+
+ public static final byte USER_RECORD_TYPE = 1;
+ public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
+ // Batch Control Item holds a 4 byte size of the batch and a 8 byte
checksum of the batch.
+ public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE
BATCH");
+ public static final int BATCH_CONTROL_RECORD_SIZE =
RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
+
public static final String DEFAULT_DIRECTORY = ".";
public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
public static final String DEFAULT_FILE_PREFIX = "db-";
@@ -82,8 +79,7 @@
protected String filePrefix = DEFAULT_FILE_PREFIX;
protected String fileSuffix = DEFAULT_FILE_SUFFIX;
protected boolean started;
- protected boolean useNio = true;
-
+
protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
@@ -99,8 +95,8 @@
protected final AtomicLong totalLength = new AtomicLong();
protected boolean archiveDataLogs;
private ReplicationTarget replicationTarget;
+ protected boolean checksum;
- @SuppressWarnings("unchecked")
public synchronized void start() throws IOException {
if (started) {
return;
@@ -111,11 +107,7 @@
started = true;
preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() -
PREFERED_DIFF);
- if (useNio) {
- appender = new NIODataFileAppender(this);
- } else {
- appender = new DataFileAppender(this);
- }
+ appender = new DataFileAppender(this);
File[] files = directory.listFiles(new FilenameFilter() {
public boolean accept(File dir, String n) {
@@ -148,26 +140,14 @@
}
}
- // Need to check the current Write File to see if there was a partial
- // write to it.
- if (!dataFiles.isEmpty()) {
-
- // See if the lastSyncedLocation is valid..
- Location l = lastAppendLocation.get();
- if (l != null && l.getDataFileId() !=
dataFiles.getTail().getDataFileId()) {
- l = null;
- }
-
- // If we know the last location that was ok.. then we can skip lots
- // of checking
- try {
- l = recoveryCheck(dataFiles.getTail(), l);
- lastAppendLocation.set(l);
- } catch (IOException e) {
- LOG.warn("recovery check failed", e);
- }
+ getCurrentWriteFile();
+ try {
+ Location l = recoveryCheck(dataFiles.getTail());
+ lastAppendLocation.set(l);
+ } catch (IOException e) {
+ LOG.warn("recovery check failed", e);
}
-
+
cleanupTask = new Runnable() {
public void run() {
cleanup();
@@ -178,43 +158,97 @@
LOG.trace("Startup took: "+(end-start)+" ms");
}
- protected Location recoveryCheck(DataFile dataFile, Location location)
throws IOException {
- if (location == null) {
- location = new Location();
- location.setDataFileId(dataFile.getDataFileId());
- location.setOffset(0);
- }
- DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+ private static byte[] bytes(String string) {
+ try {
+ return string.getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected Location recoveryCheck(DataFile dataFile) throws IOException {
+ byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
+ DataByteArrayInputStream controlIs = new
DataByteArrayInputStream(controlRecord);
+
+ Location location = new Location();
+ location.setDataFileId(dataFile.getDataFileId());
+ location.setOffset(0);
+
+ DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
- while (reader.readLocationDetailsAndValidate(location)) {
- location.setOffset(location.getOffset() + location.getSize());
- }
- } finally {
+ while( true ) {
+ reader.read(location.getOffset(), controlRecord);
+ controlIs.restart();
+
+ // Assert that it's a batch record.
+ if( controlIs.readInt() != BATCH_CONTROL_RECORD_SIZE ) {
+ break;
+ }
+ if( controlIs.readByte() != BATCH_CONTROL_RECORD_TYPE )
{
+ break;
+ }
+ for( int i=0; i < BATCH_CONTROL_RECORD_MAGIC.length;
i++ ) {
+ if( controlIs.readByte() !=
BATCH_CONTROL_RECORD_MAGIC[i] ) {
+ break;
+ }
+ }
+
+ int size = controlIs.readInt();
+ if( size > MAX_BATCH_SIZE ) {
+ break;
+ }
+
+ if( isChecksum() ) {
+
+ long expectedChecksum = controlIs.readLong();
+
+ byte data[] = new byte[size];
+
reader.read(location.getOffset()+BATCH_CONTROL_RECORD_SIZE, data);
+
+ Checksum checksum = new Adler32();
+ checksum.update(data, 0, data.length);
+
+ if( expectedChecksum!=checksum.getValue() ) {
+ break;
+ }
+
+ }
+
+
+
location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
+ }
+
+ } catch (IOException e) {
+ } finally {
accessorPool.closeDataFileAccessor(reader);
}
+
dataFile.setLength(location.getOffset());
return location;
}
- synchronized DataFile allocateLocation(Location location) throws
IOException {
- if (dataFiles.isEmpty()|| ((dataFiles.getTail().getLength() +
location.getSize()) > maxFileLength)) {
- int nextNum = !dataFiles.isEmpty() ?
dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
-
- File file = getFile(nextNum);
- DataFile nextWriteFile = new DataFile(file, nextNum,
preferedFileLength);
- // actually allocate the disk space
- fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
- fileByFileMap.put(file, nextWriteFile);
- dataFiles.addLast(nextWriteFile);
- }
- DataFile currentWriteFile = dataFiles.getTail();
- location.setOffset(currentWriteFile.getLength());
- location.setDataFileId(currentWriteFile.getDataFileId().intValue());
- int size = location.getSize();
- currentWriteFile.incrementLength(size);
- totalLength.addAndGet(size);
- return currentWriteFile;
- }
+ void addToTotalLength(int size) {
+ totalLength.addAndGet(size);
+ }
+
+
+ synchronized DataFile getCurrentWriteFile() throws IOException {
+ if (dataFiles.isEmpty()) {
+ rotateWriteFile();
+ }
+ return dataFiles.getTail();
+ }
+
+ synchronized DataFile rotateWriteFile() {
+ int nextNum = !dataFiles.isEmpty() ?
dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
+ File file = getFile(nextNum);
+ DataFile nextWriteFile = new DataFile(file, nextNum,
preferedFileLength);
+ // actually allocate the disk space
+ fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
+ fileByFileMap.put(file, nextWriteFile);
+ dataFiles.addLast(nextWriteFile);
+ return nextWriteFile;
+ }
public File getFile(int nextNum) {
String fileName = filePrefix + nextNum + fileSuffix;
@@ -285,11 +319,7 @@
// reopen open file handles...
accessorPool = new DataFileAccessorPool(this);
- if (useNio) {
- appender = new NIODataFileAppender(this);
- } else {
- appender = new DataFileAppender(this);
- }
+ appender = new DataFileAppender(this);
return result;
}
@@ -411,7 +441,7 @@
if (cur.getType() == 0) {
return null;
- } else if (cur.getType() > 0) {
+ } else if (cur.getType() == USER_RECORD_TYPE) {
// Only return user records.
return cur;
}
@@ -496,10 +526,6 @@
return loc;
}
- public synchronized Location write(ByteSequence data, byte type, boolean
sync) throws IOException, IllegalStateException {
- return appender.storeItem(data, type, sync);
- }
-
public void update(Location location, ByteSequence data, boolean sync)
throws IOException {
DataFile dataFile = getDataFile(location);
DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
@@ -538,14 +564,6 @@
this.lastAppendLocation.set(lastSyncedLocation);
}
- public boolean isUseNio() {
- return useNio;
- }
-
- public void setUseNio(boolean useNio) {
- this.useNio = useNio;
- }
-
public File getDirectoryArchive() {
return directoryArchive;
}
@@ -614,5 +632,13 @@
this.fileSuffix = fileSuffix;
}
+ public boolean isChecksum() {
+ return checksum;
+ }
+
+ public void setChecksum(boolean checksumWrites) {
+ this.checksum = checksumWrites;
+ }
+
}
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Location.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
Fri Feb 6 21:35:39 2009
@@ -71,13 +71,6 @@
this.size = size;
}
- /**
- * @return the size of the payload of the record.
- */
- public int getPaylodSize() {
- return size - Journal.ITEM_HEAD_FOOT_SPACE;
- }
-
public int getOffset() {
return offset;
}
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java
(original)
+++
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java
Fri Feb 6 21:35:39 2009
@@ -20,8 +20,6 @@
import java.io.IOException;
import java.io.RandomAccessFile;
-import org.apache.kahadb.util.IOHelper;
-
/**
* Allows you to open a data file in read only mode. Useful when working with
* archived data files.
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java
(original)
+++
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java
Fri Feb 6 21:35:39 2009
@@ -23,23 +23,18 @@
import java.util.Collections;
import java.util.List;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
/**
* An AsyncDataManager that works in read only mode against multiple data
directories.
* Useful for reading back archived data files.
*/
public class ReadOnlyJournal extends Journal {
- private static final Log LOG = LogFactory.getLog(ReadOnlyJournal.class);
private final ArrayList<File> dirs;
public ReadOnlyJournal(final ArrayList<File> dirs) {
this.dirs = dirs;
}
- @SuppressWarnings("unchecked")
public synchronized void start() throws IOException {
if (started) {
return;
Modified:
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java
(original)
+++
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java
Fri Feb 6 21:35:39 2009
@@ -41,7 +41,6 @@
}
protected void configure(Journal dataManager) {
- dataManager.setUseNio(false);
}
@Override