Author: gtully Date: Fri Feb 6 10:59:47 2009 New Revision: 741528 URL: http://svn.apache.org/viewvc?rev=741528&view=rev Log: expose relevant index attributes for configuration from kahadb
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=741528&r1=741527&r2=741528&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Feb 6 10:59:47 2009 @@ -151,6 +151,9 @@ protected boolean enableJournalDiskSyncs=true; long checkpointInterval = 5*1000; long cleanupInterval = 30*1000; + int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; + boolean enableIndexWriteAsync = false; + int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; protected AtomicBoolean started = new AtomicBoolean(); protected AtomicBoolean opened = new AtomicBoolean(); @@ -1138,7 +1141,7 @@ // ///////////////////////////////////////////////////////////////// protected final LinkedHashMap<TransactionId, ArrayList<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>(); protected final LinkedHashMap<TransactionId, ArrayList<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>(); - + private ArrayList<Operation> getInflightTx(KahaTransactionInfo info, Location location) { TransactionId key = key(info); ArrayList<Operation> tx = inflightTransactions.get(key); @@ -1219,13 +1222,16 @@ // ///////////////////////////////////////////////////////////////// private PageFile createPageFile() { - return new 
PageFile(directory, "db"); + PageFile index = new PageFile(directory, "db"); + index.setEnableWriteThread(isEnableIndexWriteAsync()); + index.setWriteBatchSize(getIndexWriteBatchSize()); + return index; } private Journal createJournal() { Journal manager = new Journal(); manager.setDirectory(directory); - manager.setMaxFileLength(1024 * 1024 * 20); + manager.setMaxFileLength(getJournalMaxFileLength()); manager.setUseNio(false); return manager; } @@ -1245,7 +1251,23 @@ public void setDeleteAllMessages(boolean deleteAllMessages) { this.deleteAllMessages = deleteAllMessages; } + + public void setIndexWriteBatchSize(int setIndexWriteBatchSize) { + this.setIndexWriteBatchSize = setIndexWriteBatchSize; + } + public int getIndexWriteBatchSize() { + return setIndexWriteBatchSize; + } + + public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { + this.enableIndexWriteAsync = enableIndexWriteAsync; + } + + boolean isEnableIndexWriteAsync() { + return enableIndexWriteAsync; + } + public boolean isEnableJournalDiskSyncs() { return enableJournalDiskSyncs; } @@ -1270,6 +1292,14 @@ this.cleanupInterval = cleanupInterval; } + public void setJournalMaxFileLength(int journalMaxFileLength) { + this.journalMaxFileLength = journalMaxFileLength; + } + + public int getJournalMaxFileLength() { + return journalMaxFileLength; + } + public PageFile getPageFile() { if (pageFile == null) { pageFile = createPageFile(); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java?rev=741528&r1=741527&r2=741528&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java 
Fri Feb 6 10:59:47 2009 @@ -56,7 +56,7 @@ broker.stop(); } - public void testForDataFileNotDeleted() throws Exception { + public void testEnqueueRateCanMeetSLA() throws Exception { if (true) { return; } @@ -68,8 +68,8 @@ final AtomicLong total = new AtomicLong(0); final AtomicLong slaViolations = new AtomicLong(0); final AtomicLong max = new AtomicLong(0); - long reportTime = 0; - + final int numThreads = 6; + Runnable runner = new Runnable() { public void run() { @@ -108,7 +108,7 @@ } }; ExecutorService executor = Executors.newCachedThreadPool(); - int numThreads = 6; + for (int i = 0; i < numThreads; i++) { executor.execute(runner); } @@ -127,7 +127,7 @@ private void startBroker() throws Exception { broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(true); + //broker.setDeleteAllMessagesOnStartup(true); broker.setPersistent(true); broker.setUseJmx(true); @@ -155,9 +155,13 @@ // Index is going to be inconsistent, but can it be repaired? kaha.setEnableJournalDiskSyncs(false); // Using a bigger journal file size makes it take fewer spikes as it is not switching files as often. 
- kaha.getJournal().setMaxFileLength(1024*1024*100); - kaha.getPageFile().setWriteBatchSize(100); - kaha.getPageFile().setEnableWriteThread(true); + kaha.setJournalMaxFileLength(1024*1024*100); + + // small batch means more frequent and smaller writes + kaha.setIndexWriteBatchSize(100); + // do the index write in a separate thread + kaha.setEnableIndexWriteAsync(true); + broker.setPersistenceAdapter(kaha); } Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=741528&r1=741527&r2=741528&view=diff ============================================================================== --- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original) +++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Fri Feb 6 10:59:47 2009 @@ -69,6 +69,7 @@ // 4k Default page size. public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4)); + public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.parseInt(System.getProperty("defaultWriteBatchSize", ""+1000)); private static final int RECOVERY_FILE_HEADER_SIZE=1024*4; private static final int PAGE_FILE_HEADER_SIZE=1024*4; @@ -101,7 +102,7 @@ private AtomicBoolean loaded = new AtomicBoolean(); // The number of pages we are aiming to write every time we // write to disk. - int writeBatchSize = 1000; + int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE; // We keep a cache of pages recently used? private LRUCache<Long, Page> pageCache;