Author: chirino
Date: Thu Feb 5 15:29:48 2009
New Revision: 741137
URL: http://svn.apache.org/viewvc?rev=741137&view=rev
Log:
Added the ability to customize the write batch size in the PageFile.
Added some diagnostic logging to see when store updates take too long and why.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=741137&r1=741136&r2=741137&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Thu Feb 5 15:29:48 2009
@@ -415,6 +415,7 @@
protected void checkpointCleanup(final boolean cleanup) {
try {
+ long start = System.currentTimeMillis();
synchronized (indexMutex) {
if( !opened.get() ) {
return;
@@ -425,6 +426,10 @@
}
});
}
+ long end = System.currentTimeMillis();
+ if( end-start > 100 ) {
+ LOG.warn("KahaDB Cleanup took "+(end-start));
+ }
} catch (IOException e) {
e.printStackTrace();
}
@@ -457,12 +462,22 @@
* durring a recovery process.
*/
public Location store(JournalCommand data, boolean sync) throws
IOException {
+
+
int size = data.serializedSizeFramed();
DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
os.writeByte(data.type().getNumber());
data.writeFramed(os);
+
+ long start = System.currentTimeMillis();
Location location = journal.write(os.toByteSequence(), sync);
+ long start2 = System.currentTimeMillis();
process(data, location);
+ long end = System.currentTimeMillis();
+ if( end-start > 100 ) {
+ LOG.warn("KahaDB long enqueue time: Journal Add Took:
"+(start2-start)+" ms, Index Update took "+(end-start2)+" ms");
+ }
+
metadata.lastUpdate = location;
return location;
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java?rev=741137&r1=741136&r2=741137&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
Thu Feb 5 15:29:48 2009
@@ -145,6 +145,7 @@
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(new File("target/activemq-data/kahadb"));
kaha.deleteAllMessages();
+ kaha.getPageFile().setWriteBatchSize(10);
broker.setPersistenceAdapter(kaha);
}
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=741137&r1=741136&r2=741137&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Thu Feb 5 15:29:48 2009
@@ -104,6 +104,9 @@
private int recoveryPageCount;
private AtomicBoolean loaded = new AtomicBoolean();
+ // The number of pages we are aiming to write every time we
+ // write to disk.
+ int writeBatchSize = 1000;
// We keep a cache of pages recently used?
private LRUCache<Long, Page> pageCache;
@@ -824,7 +827,7 @@
}
private boolean canStartWriteBatch() {
- int capacityUsed = ((writes.size() * 100)/1000);
+ int capacityUsed = ((writes.size() * 100)/writeBatchSize);
if( enableAsyncWrites ) {
// The constant 10 here controls how soon write batches start
going to disk..
// would be nice to figure out how to auto tune that value. Make
to small and
@@ -1099,4 +1102,12 @@
return getMainPageFile();
}
+ public int getWriteBatchSize() {
+ return writeBatchSize;
+ }
+
+ public void setWriteBatchSize(int writeBatchSize) {
+ this.writeBatchSize = writeBatchSize;
+ }
+
}