Author: chirino
Date: Thu Feb  5 15:29:48 2009
New Revision: 741137

URL: http://svn.apache.org/viewvc?rev=741137&view=rev
Log:
Added the ability to customize the write batch size in the PageFile.
Added some diagnostic logging to see when store updates take too long and why.


Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=741137&r1=741136&r2=741137&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 Thu Feb  5 15:29:48 2009
@@ -415,6 +415,7 @@
 
     protected void checkpointCleanup(final boolean cleanup) {
         try {
+               long start = System.currentTimeMillis();
             synchronized (indexMutex) {
                if( !opened.get() ) {
                        return;
@@ -425,6 +426,10 @@
                     }
                 });
             }
+               long end = System.currentTimeMillis();
+               if( end-start > 100 ) { 
+                       LOG.warn("KahaDB Cleanup took "+(end-start));
+               }
         } catch (IOException e) {
                e.printStackTrace();
         }
@@ -457,12 +462,22 @@
      * durring a recovery process.
      */
     public Location store(JournalCommand data, boolean sync) throws 
IOException {
+
+       
         int size = data.serializedSizeFramed();
         DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
         os.writeByte(data.type().getNumber());
         data.writeFramed(os);
+
+        long start = System.currentTimeMillis();
         Location location = journal.write(os.toByteSequence(), sync);
+        long start2 = System.currentTimeMillis();
         process(data, location);
+       long end = System.currentTimeMillis();
+       if( end-start > 100 ) { 
+               LOG.warn("KahaDB long enqueue time: Journal Add Took: 
"+(start2-start)+" ms, Index Update took "+(end-start2)+" ms");
+       }
+
         metadata.lastUpdate = location;
         return location;
     }

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java?rev=741137&r1=741136&r2=741137&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/VerifySteadyEnqueueRate.java
 Thu Feb  5 15:29:48 2009
@@ -145,6 +145,7 @@
             KahaDBStore kaha = new KahaDBStore();
             kaha.setDirectory(new File("target/activemq-data/kahadb"));
             kaha.deleteAllMessages();
+            kaha.getPageFile().setWriteBatchSize(10);
             broker.setPersistenceAdapter(kaha);
         }
 

Modified: 
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=741137&r1=741136&r2=741137&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java 
(original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java 
Thu Feb  5 15:29:48 2009
@@ -104,6 +104,9 @@
     private int recoveryPageCount;
 
     private AtomicBoolean loaded = new AtomicBoolean();
+    // The number of pages we are aiming to write every time we 
+    // write to disk.
+    int writeBatchSize = 1000;
 
     // We keep a cache of pages recently used?
     private LRUCache<Long, Page> pageCache;
@@ -824,7 +827,7 @@
     }
     
     private boolean canStartWriteBatch() {
-        int capacityUsed = ((writes.size() * 100)/1000);
+               int capacityUsed = ((writes.size() * 100)/writeBatchSize);
         if( enableAsyncWrites ) {
             // The constant 10 here controls how soon write batches start 
going to disk..
             // would be nice to figure out how to auto tune that value.  Make 
to small and
@@ -1099,4 +1102,12 @@
                return getMainPageFile();
        }
 
+       public int getWriteBatchSize() {
+               return writeBatchSize;
+       }
+
+       public void setWriteBatchSize(int writeBatchSize) {
+               this.writeBatchSize = writeBatchSize;
+       }
+
 }


Reply via email to