Author: gtully
Date: Thu Dec 22 21:44:01 2011
New Revision: 1222471
URL: http://svn.apache.org/viewvc?rev=1222471&view=rev
Log:
add experimental appender that takes the buffering burden from the writer
thread, it and some trace enabled via
-Dorg.apache.kahadb.journal.appender.WRITE_STAT_WINDOW=10000
-Dorg.apache.kahadb.journal.CALLER_BUFFER_APPENDER=true. Additional accessors
on KahaDb to further configure index for the fast but may need recovery case
Added:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java
- copied, changed from r1222219,
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=1222471&r1=1222470&r2=1222471&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
Thu Dec 22 21:44:01 2011
@@ -537,6 +537,30 @@ public class KahaDBPersistenceAdapter im
letter.setUseIndexLFRUEviction(useIndexLFRUEviction);
}
+ public void setEnableIndexDiskSyncs(boolean diskSyncs) {
+ letter.setEnableIndexDiskSyncs(diskSyncs);
+ }
+
+ public boolean isEnableIndexDiskSyncs() {
+ return letter.isEnableIndexDiskSyncs();
+ }
+
+ public void setEnableIndexRecoveryFile(boolean enable) {
+ letter.setEnableIndexRecoveryFile(enable);
+ }
+
+ public boolean isEnableIndexRecoveryFile() {
+ return letter.isEnableIndexRecoveryFile();
+ }
+
+ public void setEnableIndexPageCaching(boolean enable) {
+ letter.setEnableIndexPageCaching(enable);
+ }
+
+ public boolean isEnableIndexPageCaching() {
+ return letter.isEnableIndexPageCaching(); // delegate to the wrapped store, like the sibling accessors
+ }
+
public KahaDBStore getStore() {
return letter;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1222471&r1=1222470&r2=1222471&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Thu Dec 22 21:44:01 2011
@@ -184,6 +184,9 @@ public abstract class MessageDatabase ex
private boolean archiveCorruptedIndex = false;
private boolean useIndexLFRUEviction = false;
private float indexLFUEvictionFactor = 0.2f;
+ private boolean enableIndexDiskSyncs = true;
+ private boolean enableIndexRecoveryFile = true;
+ private boolean enableIndexPageCaching = true;
public MessageDatabase() {
}
@@ -2058,6 +2061,9 @@ public abstract class MessageDatabase ex
index.setPageCacheSize(indexCacheSize);
index.setUseLFRUEviction(isUseIndexLFRUEviction());
index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
+ index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
+ index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
+ index.setEnablePageCaching(isEnableIndexPageCaching());
return index;
}
@@ -2297,6 +2303,30 @@ public abstract class MessageDatabase ex
this.useIndexLFRUEviction = useIndexLFRUEviction;
}
+ public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
+ this.enableIndexDiskSyncs = enableIndexDiskSyncs;
+ }
+
+ public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
+ this.enableIndexRecoveryFile = enableIndexRecoveryFile;
+ }
+
+ public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
+ this.enableIndexPageCaching = enableIndexPageCaching;
+ }
+
+ public boolean isEnableIndexDiskSyncs() {
+ return enableIndexDiskSyncs;
+ }
+
+ public boolean isEnableIndexRecoveryFile() {
+ return enableIndexRecoveryFile;
+ }
+
+ public boolean isEnableIndexPageCaching() {
+ return enableIndexPageCaching;
+ }
+
// /////////////////////////////////////////////////////////////////
// Internal conversion methods.
// /////////////////////////////////////////////////////////////////
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java?rev=1222471&r1=1222470&r2=1222471&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBFastEnqueueTest.java
Thu Dec 22 21:44:01 2011
@@ -35,6 +35,7 @@ import org.apache.activemq.command.Activ
import org.apache.activemq.command.ConnectionControl;
import org.junit.After;
import org.junit.Ignore;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +52,7 @@ public class KahaDBFastEnqueueTest {
private boolean useBytesMessage= true;
private final int parallelProducer = 2;
private Vector<Exception> exceptions = new Vector<Exception>();
- final long toSend = 500000;
+ final long toSend = 1000;//500000;
@Ignore("not ready yet, exploring getting broker disk bound")
public void testPublishNoConsumer() throws Exception {
@@ -91,8 +92,8 @@ public class KahaDBFastEnqueueTest {
System.out.println("Journal writes %: " +
kahaDBPersistenceAdapter.getStore().getJournal().length() / (double)totalSent *
100 + "%");
//System.out.println("Index writes %: " +
kahaDBPersistenceAdapter.getStore().getPageFile().totalWritten /
(double)totalSent * 100 + "%");
- //restartBroker(0);
- //consumeMessages(toSend);
+ restartBroker(0);
+ consumeMessages(toSend);
}
private void consumeMessages(long count) throws Exception {
@@ -158,11 +159,13 @@ public class KahaDBFastEnqueueTest {
kahaDBPersistenceAdapter.setCheckpointInterval(20 * 60 * 1000);
// optimise for disk best batch rate
- //kahaDBPersistenceAdapter.setJournalMaxWriteBatchSize(128*1024);
//4mb default
- kahaDBPersistenceAdapter.setJournalMaxFileLength(1024*1024*1024); //
32mb default
+ kahaDBPersistenceAdapter.setJournalMaxWriteBatchSize(24*1024*1024);
//4mb default
+ kahaDBPersistenceAdapter.setJournalMaxFileLength(128*1024*1024); //
32mb default
// keep index in memory
kahaDBPersistenceAdapter.setIndexCacheSize(500000);
kahaDBPersistenceAdapter.setIndexWriteBatchSize(500000);
+ kahaDBPersistenceAdapter.setEnableIndexRecoveryFile(false);
+ kahaDBPersistenceAdapter.setEnableIndexDiskSyncs(false);
broker.setUseJmx(false);
broker.addConnector("tcp://0.0.0.0:0");
Copied:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java
(from r1222219,
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java)
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java?p2=activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java&p1=activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java&r1=1222219&r2=1222471&rev=1222471&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
(original)
+++
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/CallerBufferingDataFileAppender.java
Thu Dec 22 21:44:01 2011
@@ -21,26 +21,26 @@ import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
-
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.LinkedNode;
import org.apache.kahadb.util.LinkedNodeList;
/**
* An optimized writer to do batch appends to a data file. This object is
thread
* safe and gains throughput as you increase the number of concurrent writes it
* does.
- *
+ * The thread calling enqueue does the file open and buffering of the data,
which
+ * reduces the round trip of the write thread.
*
*/
-class DataFileAppender {
+class CallerBufferingDataFileAppender implements FileAppender {
protected final Journal journal;
- protected final Map<WriteKey, WriteCommand> inflightWrites;
+ protected final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
protected final Object enqueueMutex = new Object() {
};
protected WriteBatch nextWriteBatch;
@@ -53,51 +53,34 @@ class DataFileAppender {
private boolean running;
private Thread thread;
- public static class WriteKey {
- private final int file;
- private final long offset;
- private final int hash;
-
- public WriteKey(Location item) {
- file = item.getDataFileId();
- offset = item.getOffset();
- // TODO: see if we can build a better hash
- hash = (int)(file ^ offset);
- }
-
- public int hashCode() {
- return hash;
- }
-
- public boolean equals(Object obj) {
- if (obj instanceof WriteKey) {
- WriteKey di = (WriteKey)obj;
- return di.file == file && di.offset == offset;
- }
- return false;
- }
- }
-
+ final DataByteArrayOutputStream cachedBuffers[] = new
DataByteArrayOutputStream[] {
+ new DataByteArrayOutputStream(maxWriteBatchSize),
+ new DataByteArrayOutputStream(maxWriteBatchSize)
+ };
+ AtomicInteger writeBatchInstanceCount = new AtomicInteger();
public class WriteBatch {
+ DataByteArrayOutputStream buff =
cachedBuffers[writeBatchInstanceCount.getAndIncrement()%2];
public final DataFile dataFile;
- public final LinkedNodeList<WriteCommand> writes = new
LinkedNodeList<WriteCommand>();
+ public final LinkedNodeList<Journal.WriteCommand> writes = new
LinkedNodeList<Journal.WriteCommand>();
public final CountDownLatch latch = new CountDownLatch(1);
private final int offset;
public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
public AtomicReference<IOException> exception = new
AtomicReference<IOException>();
+ public boolean forceToDisk;
- public WriteBatch(DataFile dataFile, int offset, WriteCommand write)
throws IOException {
+ public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand
write) throws IOException {
this.dataFile = dataFile;
this.offset = offset;
this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
this.size=Journal.BATCH_CONTROL_RECORD_SIZE;
journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+ initBuffer(buff);
append(write);
}
- public boolean canAppend(WriteCommand write) {
+ public boolean canAppend(Journal.WriteCommand write) {
int newSize = size + write.location.getSize();
if (newSize >= maxWriteBatchSize || offset+newSize >
journal.getMaxFileLength() ) {
return false;
@@ -105,7 +88,7 @@ class DataFileAppender {
return true;
}
- public void append(WriteCommand write) throws IOException {
+ public void append(Journal.WriteCommand write) throws IOException {
this.writes.addLast(write);
write.location.setDataFileId(dataFile.getDataFileId());
write.location.setOffset(offset+size);
@@ -113,34 +96,22 @@ class DataFileAppender {
size += s;
dataFile.incrementLength(s);
journal.addToTotalLength(s);
+ forceToDisk |= appendToBuffer(write, buff);
}
}
- public static class WriteCommand extends LinkedNode<WriteCommand> {
- public final Location location;
- public final ByteSequence data;
- final boolean sync;
- public final Runnable onComplete;
-
- public WriteCommand(Location location, ByteSequence data, boolean
sync) {
- this.location = location;
- this.data = data;
- this.sync = sync;
- this.onComplete = null;
- }
-
- public WriteCommand(Location location, ByteSequence data, Runnable
onComplete) {
- this.location = location;
- this.data = data;
- this.onComplete = onComplete;
- this.sync = false;
- }
+ private void initBuffer(DataByteArrayOutputStream buff) throws IOException
{
+ // Write an empty batch control record.
+ buff.reset();
+ buff.write(Journal.BATCH_CONTROL_RECORD_HEADER);
+ buff.writeInt(0);
+ buff.writeLong(0);
}
/**
* Construct a Store writer
*/
- public DataFileAppender(Journal dataManager) {
+ public CallerBufferingDataFileAppender(Journal dataManager) {
this.journal = dataManager;
this.inflightWrites = this.journal.getInflightWrites();
this.maxWriteBatchSize = this.journal.getWriteBatchSize();
@@ -155,7 +126,7 @@ class DataFileAppender {
location.setSize(size);
location.setType(type);
- WriteCommand write = new WriteCommand(location, data, sync);
+ Journal.WriteCommand write = new Journal.WriteCommand(location, data,
sync);
WriteBatch batch = enqueue(write);
location.setLatch(batch.latch);
@@ -182,7 +153,7 @@ class DataFileAppender {
location.setSize(size);
location.setType(type);
- WriteCommand write = new WriteCommand(location, data, onComplete);
+ Journal.WriteCommand write = new Journal.WriteCommand(location, data,
onComplete);
WriteBatch batch = enqueue(write);
@@ -190,7 +161,7 @@ class DataFileAppender {
return location;
}
- private WriteBatch enqueue(WriteCommand write) throws IOException {
+ private WriteBatch enqueue(Journal.WriteCommand write) throws IOException {
synchronized (enqueueMutex) {
if (shutdown) {
throw new IOException("Async Writter Thread Shutdown");
@@ -220,7 +191,7 @@ class DataFileAppender {
if( file.getLength() > journal.getMaxFileLength() ) {
file = journal.rotateWriteFile();
}
-
+
nextWriteBatch = new WriteBatch(file, file.getLength(),
write);
enqueueMutex.notifyAll();
break;
@@ -249,7 +220,7 @@ class DataFileAppender {
}
}
if (!write.sync) {
- inflightWrites.put(new WriteKey(write.location), write);
+ inflightWrites.put(new Journal.WriteKey(write.location),
write);
}
return nextWriteBatch;
}
@@ -293,7 +264,6 @@ class DataFileAppender {
WriteBatch wb = null;
try {
- DataByteArrayOutputStream buff = new
DataByteArrayOutputStream(maxWriteBatchSize);
while (true) {
Object o = null;
@@ -327,24 +297,8 @@ class DataFileAppender {
}
}
- WriteCommand write = wb.writes.getHead();
-
- // Write an empty batch control record.
- buff.reset();
- buff.writeInt(Journal.BATCH_CONTROL_RECORD_SIZE);
- buff.writeByte(Journal.BATCH_CONTROL_RECORD_TYPE);
- buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC);
- buff.writeInt(0);
- buff.writeLong(0);
-
- boolean forceToDisk = false;
- while (write != null) {
- forceToDisk |= write.sync | write.onComplete != null;
- buff.writeInt(write.location.getSize());
- buff.writeByte(write.location.getType());
- buff.write(write.data.getData(), write.data.getOffset(),
write.data.getLength());
- write = write.getNext();
- }
+ final DataByteArrayOutputStream buff = wb.buff;
+ final boolean forceToDisk = wb.forceToDisk;
ByteSequence sequence = buff.toByteSequence();
@@ -372,7 +326,6 @@ class DataFileAppender {
}
}
file.write(sequence.getData(), sequence.getOffset(),
sequence.getLength());
-
ReplicationTarget replicationTarget =
journal.getReplicationTarget();
if( replicationTarget!=null ) {
replicationTarget.replicate(wb.writes.getHead().location, sequence,
forceToDisk);
@@ -382,16 +335,16 @@ class DataFileAppender {
file.getFD().sync();
}
- WriteCommand lastWrite = wb.writes.getTail();
+ Journal.WriteCommand lastWrite = wb.writes.getTail();
journal.setLastAppendLocation(lastWrite.location);
// Now that the data is on disk, remove the writes from the in
// flight
// cache.
- write = wb.writes.getHead();
+ Journal.WriteCommand write = wb.writes.getHead();
while (write != null) {
if (!write.sync) {
- inflightWrites.remove(new WriteKey(write.location));
+ inflightWrites.remove(new
Journal.WriteKey(write.location));
}
if (write.onComplete != null) {
try {
@@ -431,4 +384,10 @@ class DataFileAppender {
}
}
+ private boolean appendToBuffer(Journal.WriteCommand write,
DataByteArrayOutputStream buff) throws IOException {
+ buff.writeInt(write.location.getSize());
+ buff.writeByte(write.location.getType());
+ buff.write(write.data.getData(), write.data.getOffset(),
write.data.getLength());
+ return write.sync | write.onComplete != null;
+ }
}
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java?rev=1222471&r1=1222470&r2=1222471&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
Thu Dec 22 21:44:01 2011
@@ -33,7 +33,7 @@ public class DataFile extends LinkedNode
protected final File file;
protected final Integer dataFileId;
- protected int length;
+ protected volatile int length;
protected final SequenceSet corruptedBlocks = new SequenceSet();
DataFile(File file, int number, int preferedSize) {
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java?rev=1222471&r1=1222470&r2=1222471&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
(original)
+++
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
Thu Dec 22 21:44:01 2011
@@ -20,8 +20,6 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Map;
-import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
-import org.apache.kahadb.journal.DataFileAppender.WriteKey;
import org.apache.kahadb.util.ByteSequence;
/**
@@ -33,7 +31,7 @@ import org.apache.kahadb.util.ByteSequen
final class DataFileAccessor {
private final DataFile dataFile;
- private final Map<WriteKey, WriteCommand> inflightWrites;
+ private final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
private final RandomAccessFile file;
private boolean disposed;
@@ -71,7 +69,7 @@ final class DataFileAccessor {
throw new IOException("Invalid location: " + location);
}
- WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new
WriteKey(location));
+ Journal.WriteCommand asyncWrite =
(Journal.WriteCommand)inflightWrites.get(new Journal.WriteKey(location));
if (asyncWrite != null) {
return asyncWrite.data;
}
@@ -106,7 +104,7 @@ final class DataFileAccessor {
}
public void readLocationDetails(Location location) throws IOException {
- WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new
WriteKey(location));
+ Journal.WriteCommand asyncWrite =
(Journal.WriteCommand)inflightWrites.get(new Journal.WriteKey(location));
if (asyncWrite != null) {
location.setSize(asyncWrite.location.getSize());
location.setType(asyncWrite.location.getType());
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=1222471&r1=1222470&r2=1222471&view=diff
==============================================================================
---
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
(original)
+++
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
Thu Dec 22 21:44:01 2011
@@ -27,7 +27,6 @@ import java.util.zip.Checksum;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.LinkedNode;
import org.apache.kahadb.util.LinkedNodeList;
/**
@@ -37,10 +36,10 @@ import org.apache.kahadb.util.LinkedNode
*
*
*/
-class DataFileAppender {
+class DataFileAppender implements FileAppender {
protected final Journal journal;
- protected final Map<WriteKey, WriteCommand> inflightWrites;
+ protected final Map<Journal.WriteKey, Journal.WriteCommand> inflightWrites;
protected final Object enqueueMutex = new Object() {
};
protected WriteBatch nextWriteBatch;
@@ -82,13 +81,13 @@ class DataFileAppender {
public final DataFile dataFile;
- public final LinkedNodeList<WriteCommand> writes = new
LinkedNodeList<WriteCommand>();
+ public final LinkedNodeList<Journal.WriteCommand> writes = new
LinkedNodeList<Journal.WriteCommand>();
public final CountDownLatch latch = new CountDownLatch(1);
private final int offset;
public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
public AtomicReference<IOException> exception = new
AtomicReference<IOException>();
- public WriteBatch(DataFile dataFile, int offset, WriteCommand write)
throws IOException {
+ public WriteBatch(DataFile dataFile, int offset, Journal.WriteCommand
write) throws IOException {
this.dataFile = dataFile;
this.offset = offset;
this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
@@ -97,7 +96,7 @@ class DataFileAppender {
append(write);
}
- public boolean canAppend(WriteCommand write) {
+ public boolean canAppend(Journal.WriteCommand write) {
int newSize = size + write.location.getSize();
if (newSize >= maxWriteBatchSize || offset+newSize >
journal.getMaxFileLength() ) {
return false;
@@ -105,7 +104,7 @@ class DataFileAppender {
return true;
}
- public void append(WriteCommand write) throws IOException {
+ public void append(Journal.WriteCommand write) throws IOException {
this.writes.addLast(write);
write.location.setDataFileId(dataFile.getDataFileId());
write.location.setOffset(offset+size);
@@ -116,27 +115,6 @@ class DataFileAppender {
}
}
- public static class WriteCommand extends LinkedNode<WriteCommand> {
- public final Location location;
- public final ByteSequence data;
- final boolean sync;
- public final Runnable onComplete;
-
- public WriteCommand(Location location, ByteSequence data, boolean
sync) {
- this.location = location;
- this.data = data;
- this.sync = sync;
- this.onComplete = null;
- }
-
- public WriteCommand(Location location, ByteSequence data, Runnable
onComplete) {
- this.location = location;
- this.data = data;
- this.onComplete = onComplete;
- this.sync = false;
- }
- }
-
/**
* Construct a Store writer
*/
@@ -155,7 +133,7 @@ class DataFileAppender {
location.setSize(size);
location.setType(type);
- WriteCommand write = new WriteCommand(location, data, sync);
+ Journal.WriteCommand write = new Journal.WriteCommand(location, data,
sync);
WriteBatch batch = enqueue(write);
location.setLatch(batch.latch);
@@ -182,7 +160,7 @@ class DataFileAppender {
location.setSize(size);
location.setType(type);
- WriteCommand write = new WriteCommand(location, data, onComplete);
+ Journal.WriteCommand write = new Journal.WriteCommand(location, data,
onComplete);
WriteBatch batch = enqueue(write);
@@ -190,7 +168,7 @@ class DataFileAppender {
return location;
}
- private WriteBatch enqueue(WriteCommand write) throws IOException {
+ private WriteBatch enqueue(Journal.WriteCommand write) throws IOException {
synchronized (enqueueMutex) {
if (shutdown) {
throw new IOException("Async Writter Thread Shutdown");
@@ -220,7 +198,7 @@ class DataFileAppender {
if( file.getLength() > journal.getMaxFileLength() ) {
file = journal.rotateWriteFile();
}
-
+
nextWriteBatch = new WriteBatch(file, file.getLength(),
write);
enqueueMutex.notifyAll();
break;
@@ -249,7 +227,7 @@ class DataFileAppender {
}
}
if (!write.sync) {
- inflightWrites.put(new WriteKey(write.location), write);
+ inflightWrites.put(new Journal.WriteKey(write.location),
write);
}
return nextWriteBatch;
}
@@ -327,7 +305,7 @@ class DataFileAppender {
}
}
- WriteCommand write = wb.writes.getHead();
+ Journal.WriteCommand write = wb.writes.getHead();
// Write an empty batch control record.
buff.reset();
@@ -336,7 +314,7 @@ class DataFileAppender {
buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC);
buff.writeInt(0);
buff.writeLong(0);
-
+
boolean forceToDisk = false;
while (write != null) {
forceToDisk |= write.sync | write.onComplete != null;
@@ -382,7 +360,7 @@ class DataFileAppender {
file.getFD().sync();
}
- WriteCommand lastWrite = wb.writes.getTail();
+ Journal.WriteCommand lastWrite = wb.writes.getTail();
journal.setLastAppendLocation(lastWrite.location);
// Now that the data is on disk, remove the writes from the in
@@ -391,7 +369,7 @@ class DataFileAppender {
write = wb.writes.getHead();
while (write != null) {
if (!write.sync) {
- inflightWrites.remove(new WriteKey(write.location));
+ inflightWrites.remove(new
Journal.WriteKey(write.location));
}
if (write.onComplete != null) {
try {
Added:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java?rev=1222471&view=auto
==============================================================================
---
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java
(added)
+++
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java
Thu Dec 22 21:44:01 2011
@@ -0,0 +1,15 @@
+package org.apache.kahadb.journal;
+
+import java.io.IOException;
+import org.apache.kahadb.util.ByteSequence;
+
+/**
+ * User: gtully
+ */
+public interface FileAppender {
+ Location storeItem(ByteSequence data, byte type, boolean sync) throws
IOException;
+
+ Location storeItem(ByteSequence data, byte type, Runnable onComplete)
throws IOException;
+
+ void close() throws IOException;
+}
Propchange:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/FileAppender.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=1222471&r1=1222470&r2=1222471&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
Thu Dec 22 21:44:01 2011
@@ -36,10 +36,9 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
+import org.apache.kahadb.util.LinkedNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
-import org.apache.kahadb.journal.DataFileAppender.WriteKey;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;
@@ -53,6 +52,8 @@ import org.apache.kahadb.util.Sequence;
*
*/
public class Journal {
+ public static final String CALLER_BUFFER_APPENDER =
"org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
+ public static final boolean callerBufferAppender =
Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));
private static final int MAX_BATCH_SIZE = 32*1024*1024;
@@ -103,7 +104,7 @@ public class Journal {
protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;
- protected DataFileAppender appender;
+ protected FileAppender appender;
protected DataFileAccessorPool accessorPool;
protected Map<Integer, DataFile> fileMap = new HashMap<Integer,
DataFile>();
@@ -130,7 +131,7 @@ public class Journal {
started = true;
preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() -
PREFERED_DIFF);
- appender = new DataFileAppender(this);
+ appender = callerBufferAppender ? new
CallerBufferingDataFileAppender(this) : new DataFileAppender(this);
File[] files = directory.listFiles(new FilenameFilter() {
public boolean accept(File dir, String n) {
@@ -751,4 +752,50 @@ public class Journal {
public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
this.totalLength = storeSizeAccumulator;
}
+
+ public static class WriteCommand extends LinkedNode<WriteCommand> {
+ public final Location location;
+ public final ByteSequence data;
+ final boolean sync;
+ public final Runnable onComplete;
+
+ public WriteCommand(Location location, ByteSequence data, boolean
sync) {
+ this.location = location;
+ this.data = data;
+ this.sync = sync;
+ this.onComplete = null;
+ }
+
+ public WriteCommand(Location location, ByteSequence data, Runnable
onComplete) {
+ this.location = location;
+ this.data = data;
+ this.onComplete = onComplete;
+ this.sync = false;
+ }
+ }
+
+ public static class WriteKey {
+ private final int file;
+ private final long offset;
+ private final int hash;
+
+ public WriteKey(Location item) {
+ file = item.getDataFileId();
+ offset = item.getOffset();
+ // TODO: see if we can build a better hash
+ hash = (int)(file ^ offset);
+ }
+
+ public int hashCode() {
+ return hash;
+ }
+
+ public boolean equals(Object obj) {
+ if (obj instanceof WriteKey) {
+ WriteKey di = (WriteKey)obj;
+ return di.file == file && di.offset == offset;
+ }
+ return false;
+ }
+ }
}
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=1222471&r1=1222470&r2=1222471&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Thu Dec 22 21:44:01 2011
@@ -1039,7 +1039,9 @@ public class PageFile {
}
Checksum checksum = new Adler32();
- recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+ if (enableRecoveryFile) {
+ recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+ }
for (PageWrite w : batch) {
if (enableRecoveryFile) {
try {
@@ -1078,7 +1080,9 @@ public class PageFile {
if (enableDiskSyncs) {
// Sync to make sure recovery buffer writes land on disk..
- recoveryFile.getFD().sync();
+ if (enableRecoveryFile) {
+ recoveryFile.getFD().sync();
+ }
writeFile.getFD().sync();
}
} finally {