Author: gtully
Date: Fri Feb 6 10:59:47 2009
New Revision: 741528
URL: http://svn.apache.org/viewvc?rev=741528&view=rev
Log:
expose relevant index attributes for configuration from kahadb
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=741528&r1=741527&r2=741528&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Fri Feb 6 10:59:47 2009
@@ -151,6 +151,9 @@
protected boolean enableJournalDiskSyncs=true;
long checkpointInterval = 5*1000;
long cleanupInterval = 30*1000;
+ int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
+ boolean enableIndexWriteAsync = false;
+ int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
protected AtomicBoolean started = new AtomicBoolean();
protected AtomicBoolean opened = new AtomicBoolean();
@@ -1138,7 +1141,7 @@
// /////////////////////////////////////////////////////////////////
protected final LinkedHashMap<TransactionId, ArrayList<Operation>>
inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
protected final LinkedHashMap<TransactionId, ArrayList<Operation>>
preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
-
+
private ArrayList<Operation> getInflightTx(KahaTransactionInfo info,
Location location) {
TransactionId key = key(info);
ArrayList<Operation> tx = inflightTransactions.get(key);
@@ -1219,13 +1222,16 @@
// /////////////////////////////////////////////////////////////////
private PageFile createPageFile() {
- return new PageFile(directory, "db");
+ PageFile index = new PageFile(directory, "db");
+ index.setEnableWriteThread(isEnableIndexWriteAsync());
+ index.setWriteBatchSize(getIndexWriteBatchSize());
+ return index;
}
private Journal createJournal() {
Journal manager = new Journal();
manager.setDirectory(directory);
- manager.setMaxFileLength(1024 * 1024 * 20);
+ manager.setMaxFileLength(getJournalMaxFileLength());
manager.setUseNio(false);
return manager;
}
@@ -1245,7 +1251,23 @@
public void setDeleteAllMessages(boolean deleteAllMessages) {
this.deleteAllMessages = deleteAllMessages;
}
+
+ public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
+ this.setIndexWriteBatchSize = setIndexWriteBatchSize;
+ }
+ public int getIndexWriteBatchSize() {
+ return setIndexWriteBatchSize;
+ }
+
+ public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
+ this.enableIndexWriteAsync = enableIndexWriteAsync;
+ }
+
+ boolean isEnableIndexWriteAsync() {
+ return enableIndexWriteAsync;
+ }
+
public boolean isEnableJournalDiskSyncs() {
return enableJournalDiskSyncs;
}
@@ -1270,6 +1292,14 @@
this.cleanupInterval = cleanupInterval;
}
+ public void setJournalMaxFileLength(int journalMaxFileLength) {
+ this.journalMaxFileLength = journalMaxFileLength;
+ }
+
+ public int getJournalMaxFileLength() {
+ return journalMaxFileLength;
+ }
+
public PageFile getPageFile() {
if (pageFile == null) {
pageFile = createPageFile();
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java?rev=741528&r1=741527&r2=741528&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
Fri Feb 6 10:59:47 2009
@@ -56,7 +56,7 @@
broker.stop();
}
- public void testForDataFileNotDeleted() throws Exception {
+ public void testEnqueueRateCanMeetSLA() throws Exception {
if (true) {
return;
}
@@ -68,8 +68,8 @@
final AtomicLong total = new AtomicLong(0);
final AtomicLong slaViolations = new AtomicLong(0);
final AtomicLong max = new AtomicLong(0);
- long reportTime = 0;
-
+ final int numThreads = 6;
+
Runnable runner = new Runnable() {
public void run() {
@@ -108,7 +108,7 @@
}
};
ExecutorService executor = Executors.newCachedThreadPool();
- int numThreads = 6;
+
for (int i = 0; i < numThreads; i++) {
executor.execute(runner);
}
@@ -127,7 +127,7 @@
private void startBroker() throws Exception {
broker = new BrokerService();
- broker.setDeleteAllMessagesOnStartup(true);
+ //broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(true);
broker.setUseJmx(true);
@@ -155,9 +155,13 @@
// Index is going to be inconsistent, but can it be repaired?
kaha.setEnableJournalDiskSyncs(false);
// Using a bigger journal file size makes it take fewer spikes as
it is not switching files as often.
- kaha.getJournal().setMaxFileLength(1024*1024*100);
- kaha.getPageFile().setWriteBatchSize(100);
- kaha.getPageFile().setEnableWriteThread(true);
+ kaha.setJournalMaxFileLength(1024*1024*100);
+
+ // small batch means more frequent and smaller writes
+ kaha.setIndexWriteBatchSize(100);
+ // do the index write in a separate thread
+ kaha.setEnableIndexWriteAsync(true);
+
broker.setPersistenceAdapter(kaha);
}
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=741528&r1=741527&r2=741528&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
Fri Feb 6 10:59:47 2009
@@ -69,6 +69,7 @@
// 4k Default page size.
public static final int DEFAULT_PAGE_SIZE =
Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4));
+ public static final int DEFAULT_WRITE_BATCH_SIZE =
Integer.parseInt(System.getProperty("defaultWriteBatchSize", ""+1000));
private static final int RECOVERY_FILE_HEADER_SIZE=1024*4;
private static final int PAGE_FILE_HEADER_SIZE=1024*4;
@@ -101,7 +102,7 @@
private AtomicBoolean loaded = new AtomicBoolean();
// The number of pages we are aiming to write every time we
// write to disk.
- int writeBatchSize = 1000;
+ int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;
// We keep a cache of pages recently used?
private LRUCache<Long, Page> pageCache;