Author: gtully
Date: Fri Feb  6 10:59:47 2009
New Revision: 741528

URL: http://svn.apache.org/viewvc?rev=741528&view=rev
Log:
expose relevant index attributes for configuration from kahadb

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=741528&r1=741527&r2=741528&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Feb  6 10:59:47 2009
@@ -151,6 +151,9 @@
     protected boolean enableJournalDiskSyncs=true;
     long checkpointInterval = 5*1000;
     long cleanupInterval = 30*1000;
+    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
+    boolean enableIndexWriteAsync = false;
+    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 
     
     protected AtomicBoolean started = new AtomicBoolean();
     protected AtomicBoolean opened = new AtomicBoolean();
@@ -1138,7 +1141,7 @@
     // /////////////////////////////////////////////////////////////////
     protected final LinkedHashMap<TransactionId, ArrayList<Operation>> 
inflightTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
     protected final LinkedHashMap<TransactionId, ArrayList<Operation>> 
preparedTransactions = new LinkedHashMap<TransactionId, ArrayList<Operation>>();
-
+ 
     private ArrayList<Operation> getInflightTx(KahaTransactionInfo info, 
Location location) {
         TransactionId key = key(info);
         ArrayList<Operation> tx = inflightTransactions.get(key);
@@ -1219,13 +1222,16 @@
     // /////////////////////////////////////////////////////////////////
 
     private PageFile createPageFile() {
-        return new PageFile(directory, "db");
+        PageFile index = new PageFile(directory, "db");
+        index.setEnableWriteThread(isEnableIndexWriteAsync());
+        index.setWriteBatchSize(getIndexWriteBatchSize());
+        return index;
     }
 
     private Journal createJournal() {
         Journal manager = new Journal();
         manager.setDirectory(directory);
-        manager.setMaxFileLength(1024 * 1024 * 20);
+        manager.setMaxFileLength(getJournalMaxFileLength());
         manager.setUseNio(false);
         return manager;
     }
@@ -1245,7 +1251,23 @@
     public void setDeleteAllMessages(boolean deleteAllMessages) {
         this.deleteAllMessages = deleteAllMessages;
     }
+    
+    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
+        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
+    }
 
+    public int getIndexWriteBatchSize() {
+        return setIndexWriteBatchSize;
+    }
+    
+    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
+        this.enableIndexWriteAsync = enableIndexWriteAsync;
+    }
+    
+    boolean isEnableIndexWriteAsync() {
+        return enableIndexWriteAsync;
+    }
+    
     public boolean isEnableJournalDiskSyncs() {
         return enableJournalDiskSyncs;
     }
@@ -1270,6 +1292,14 @@
         this.cleanupInterval = cleanupInterval;
     }
 
+    public void setJournalMaxFileLength(int journalMaxFileLength) {
+        this.journalMaxFileLength = journalMaxFileLength;
+    }
+    
+    public int getJournalMaxFileLength() {
+        return journalMaxFileLength;
+    }
+    
     public PageFile getPageFile() {
         if (pageFile == null) {
             pageFile = createPageFile();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java?rev=741528&r1=741527&r2=741528&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java Fri Feb  6 10:59:47 2009
@@ -56,7 +56,7 @@
         broker.stop();
     }
 
-    public void testForDataFileNotDeleted() throws Exception {
+    public void testEnqueueRateCanMeetSLA() throws Exception {
         if (true) {
             return;
         }
@@ -68,8 +68,8 @@
         final AtomicLong total = new AtomicLong(0);
         final AtomicLong slaViolations = new AtomicLong(0);
         final AtomicLong max = new AtomicLong(0);
-        long reportTime = 0;
-
+        final int numThreads = 6;
+        
         Runnable runner = new Runnable() {
 
             public void run() {
@@ -108,7 +108,7 @@
             }
         };
         ExecutorService executor = Executors.newCachedThreadPool();
-        int numThreads = 6;
+        
         for (int i = 0; i < numThreads; i++) {
             executor.execute(runner);
         }
@@ -127,7 +127,7 @@
 
     private void startBroker() throws Exception {
         broker = new BrokerService();
-        broker.setDeleteAllMessagesOnStartup(true);
+        //broker.setDeleteAllMessagesOnStartup(true);
         broker.setPersistent(true);
         broker.setUseJmx(true);
 
@@ -155,9 +155,13 @@
             // Index is going to be in consistent, but can it be repaired?
             kaha.setEnableJournalDiskSyncs(false);
             // Using a bigger journal file size makes he take fewer spikes as 
it is not switching files as often.
-            kaha.getJournal().setMaxFileLength(1024*1024*100);
-            kaha.getPageFile().setWriteBatchSize(100);
-            kaha.getPageFile().setEnableWriteThread(true);
+            kaha.setJournalMaxFileLength(1024*1024*100);
+            
+            // small batch means more frequent and smaller writes
+            kaha.setIndexWriteBatchSize(100);
+            // do the index write in a separate thread
+            kaha.setEnableIndexWriteAsync(true);
+            
             broker.setPersistenceAdapter(kaha);
         }
 

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=741528&r1=741527&r2=741528&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Fri Feb  6 10:59:47 2009
@@ -69,6 +69,7 @@
     
     // 4k Default page size.
     public static final int DEFAULT_PAGE_SIZE = 
Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4)); 
+    public static final int DEFAULT_WRITE_BATCH_SIZE = 
Integer.parseInt(System.getProperty("defaultWriteBatchSize", ""+1000));
     private static final int RECOVERY_FILE_HEADER_SIZE=1024*4;
     private static final int PAGE_FILE_HEADER_SIZE=1024*4;
 
@@ -101,7 +102,7 @@
     private AtomicBoolean loaded = new AtomicBoolean();
     // The number of pages we are aiming to write every time we 
     // write to disk.
-    int writeBatchSize = 1000;
+    int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;
 
     // We keep a cache of pages recently used?
     private LRUCache<Long, Page> pageCache;


Reply via email to