Author: chirino
Date: Fri Sep 12 07:17:47 2008
New Revision: 694681

URL: http://svn.apache.org/viewvc?rev=694681&view=rev
Log:
The periodic checkpoint is enabled which will clean up old journal data files 
and fush outstanding index writes to disk.
Also enabled the recovery buffer, using synced writes and the async thread 
writer as the perf looks good with them on still.


Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
    
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java?rev=694681&r1=694680&r2=694681&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java 
(original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java 
Fri Sep 12 07:17:47 2008
@@ -520,7 +520,7 @@
                 }
             }
         } else {
-            visitor.visit(keys, values);
+            visitor.visit(Arrays.asList(keys), Arrays.asList(values));
         }
     }
     

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java?rev=694681&r1=694680&r2=694681&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java 
(original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java 
Fri Sep 12 07:17:47 2008
@@ -16,6 +16,8 @@
  */
 package org.apache.kahadb.index;
 
+import java.util.List;
+
 /**
  * Interface used to selectively visit the entries in a BTree.
  * 
@@ -39,6 +41,6 @@
      * @param keys
      * @param values
      */
-    void visit(Key[] keys, Value[] values);
+    void visit(List<Key> keys, List<Value> values);
     
 }
\ No newline at end of file

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=694681&r1=694680&r2=694681&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java 
(original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java 
Fri Sep 12 07:17:47 2008
@@ -107,11 +107,11 @@
     
     // Should first log the page write to the recovery buffer? Avoids partial
     // page write failures..
-    private boolean enableRecoveryFile=false;
+    private boolean enableRecoveryFile=true;
     // Will we sync writes to disk. Ensures that data will not be lost after a 
checkpoint()
-    private boolean enableSyncedWrites=false;
+    private boolean enableSyncedWrites=true;
     // Will writes be done in an async thread?
-    private boolean enableAsyncWrites=false;
+    private boolean enableAsyncWrites=true;
 
     // These are used if enableAsyncWrites==true 
     private AtomicBoolean stopWriter = new AtomicBoolean();
@@ -427,18 +427,17 @@
             if( writes.isEmpty()) {                
                 return;
             }
-            if( this.checkpointLatch == null ) {
-                this.checkpointLatch = new CountDownLatch(1);
-            }
-            checkpointLatch = this.checkpointLatch;
             if( enableAsyncWrites ) {
+                if( this.checkpointLatch == null ) {
+                    this.checkpointLatch = new CountDownLatch(1);
+                }
+                checkpointLatch = this.checkpointLatch;
                 writes.notify();
             } else {
-                while( !writes.isEmpty() ) {
-                    writeBatch(-1, TimeUnit.MILLISECONDS);
-                }
+                writeBatch();
+                return;
             }
-        }        
+        }
         try {
             checkpointLatch.await();        
         } catch (InterruptedException e) {
@@ -811,9 +810,7 @@
                 if( enableAsyncWrites  ) {
                     writes.notify();
                 } else {
-                    while( canStartWriteBatch() ) {
-                        writeBatch(-1, TimeUnit.MILLISECONDS);
-                    }
+                    writeBatch();
                 }
             }
         }            
@@ -865,33 +862,46 @@
     ///////////////////////////////////////////////////////////////////
     // Internal Double write implementation follows...
     ///////////////////////////////////////////////////////////////////
-    
+    /**
+     * 
+     */
+    private void pollWrites() {
+        try {
+            while( !stopWriter.get() ) {
+                // Wait for a notification...
+                synchronized( writes ) {   
+                    // If there is not enough to write, wait for a 
notification...
+                    while( !canStartWriteBatch() && !stopWriter.get() ) {
+                        writes.wait(100);
+                    }
+                    
+                    if( writes.isEmpty() ) {
+                        releaseCheckpointWaiter();
+                    }
+                }
+                writeBatch();
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+        } finally {
+            releaseCheckpointWaiter();
+        }
+    }
+
     /**
      * 
      * @param timeout
      * @param unit
-     * @return true if a write was done.
+     * @return true if there are still pending writes to do.
      * @throws InterruptedException 
      * @throws IOException 
      */
-    private boolean writeBatch(long timeout, TimeUnit unit) throws IOException 
{
-                
-        ArrayList<PageWrite> batch;
-        synchronized( writes ) {   
+    private void writeBatch() throws IOException {
             
+        CountDownLatch checkpointLatch;
+        ArrayList<PageWrite> batch;
+        synchronized( writes ) {
             // If there is not enough to write, wait for a notification...
-            if( !canStartWriteBatch() && timeout>=0 ) {
-                releaseCheckpointWaiter();
-                try {
-                    writes.wait(unit.toMillis(timeout));
-                } catch (InterruptedException e) {
-                    throw new InterruptedIOException();
-                }
-            }
-            if( writes.isEmpty() ) {
-                releaseCheckpointWaiter();
-                return false;
-            }
 
             batch = new ArrayList<PageWrite>(writes.size());
             // build a write batch from the current write cache. 
@@ -901,6 +911,11 @@
                 // page again without blocking for this write.
                 write.begin();
             }
+
+            // Grab on to the existing checkpoint latch cause once we do this 
write we can 
+            // release the folks that were waiting for those writes to hit 
disk.
+            checkpointLatch = this.checkpointLatch;
+            this.checkpointLatch=null;
         }
         
  
@@ -964,12 +979,11 @@
                     writes.remove(w.page.getPageId());
                 }
             }
-            if( writes.isEmpty() ) {
-                releaseCheckpointWaiter();
-            }
         }
         
-        return true;
+        if( checkpointLatch!=null ) {
+            checkpointLatch.countDown();
+        }
     }
 
     private long recoveryFileSizeForPages(int pageCount) {
@@ -1054,31 +1068,22 @@
         synchronized( writes ) {
             if( enableAsyncWrites ) {
                 stopWriter.set(false);
-                writerThread = new Thread("Page Writer") {
+                writerThread = new Thread("KahaDB Page Writer") {
                     @Override
                     public void run() {
-                        try {
-                            while( !stopWriter.get() ) {
-                                writeBatch(1000, TimeUnit.MILLISECONDS);
-                            }
-                        } catch (Throwable e) {
-                            e.printStackTrace();
-                        } finally {
-                            releaseCheckpointWaiter();
-                        }
+                        pollWrites();
                     }
                 };
+                writerThread.setPriority(Thread.MAX_PRIORITY);
                 writerThread.start();
             }
         }
     }
  
     private void stopWriter() throws InterruptedException {
-        synchronized( writes ) {
-            if( enableAsyncWrites ) {
-                stopWriter.set(true);
-                writerThread.join();
-            }
+        if( enableAsyncWrites ) {
+            stopWriter.set(true);
+            writerThread.join();
         }
     }
 

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=694681&r1=694680&r2=694681&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
 Fri Sep 12 07:17:47 2008
@@ -150,6 +150,7 @@
     protected boolean recovering;
     protected Thread checkpointThread;
     protected boolean syncWrites;
+    int checkpointInterval = 30*1000;
     
     protected AtomicBoolean started = new AtomicBoolean();
 
@@ -258,7 +259,7 @@
                     while (started.get()) {
                         Thread.sleep(500);
                         long now = System.currentTimeMillis();
-                        if( now - start >= 1000*1000 ) {
+                        if( now - start >= checkpointInterval ) {
                             checkpoint();
                             start = now;
                         }
@@ -663,9 +664,12 @@
      */
     private void checkpointUpdate(Transaction tx) throws IOException {
 
+        LOG.debug("Checkpoint started.");
+
         // Find empty journal files to remove.
         final HashSet<Integer> inUseFiles = new HashSet<Integer>();
         
+        
         for (StoredDestination sd : storedDestinations.values()) {
             // Use a visitor to cut down the number of pages that we load
             sd.orderIndex.visit(tx, new BTreeVisitor<Location, String>() {
@@ -683,18 +687,19 @@
                     return true;
                 }
 
-                public void visit(Location[] keys, String[] values) {
-                    for (int i = 0; i < keys.length; i++) {
-                        if( last == keys[i].getDataFileId() ) {
-                            inUseFiles.add(keys[i].getDataFileId());
-                            last = keys[i].getDataFileId();
+                public void visit(List<Location> keys, List<String> values) {
+                    for (int i = 0; i < keys.size(); i++) {
+                        if( last != keys.get(i).getDataFileId() ) {
+                            inUseFiles.add(keys.get(i).getDataFileId());
+                            last = keys.get(i).getDataFileId();
                         }
                     }
                     
                 }
+
             });
         }
-        
+                
         metadata.state = OPEN_STATE;
         metadata.firstInProgressTransactionLocation = 
getFirstInProgressTxLocation();
         tx.store(metadata.page, metadataMarshaller, true);
@@ -703,7 +708,13 @@
         if( metadata.firstInProgressTransactionLocation!=null ) {
             l = metadata.firstInProgressTransactionLocation;
         }
+        
+        LOG.debug("In use files: "+inUseFiles+", lastUpdate: "+l);
+
+        pageFile.flush();
         asyncDataManager.consolidateDataFilesNotIn(inUseFiles, 
l==null?null:l.getDataFileId());
+        
+        LOG.debug("Checkpoint done.");
     }
 
 
@@ -1058,4 +1069,12 @@
         this.syncWrites = syncWrites;
     }
 
+    public int getCheckpointInterval() {
+        return checkpointInterval;
+    }
+
+    public void setCheckpointInterval(int checkpointInterval) {
+        this.checkpointInterval = checkpointInterval;
+    }
+
 }

Modified: 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java?rev=694681&r1=694680&r2=694681&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
 (original)
+++ 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
 Fri Sep 12 07:17:47 2008
@@ -19,6 +19,7 @@
 import java.io.PrintWriter;
 import java.text.NumberFormat;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.kahadb.LongMarshaller;
@@ -143,6 +144,36 @@
         tx.commit();
     }
     
+    
+    public void testVisitor() throws Exception {
+        createPageFileAndIndex(100);
+        BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
+        this.index.load(tx);
+        tx.commit();
+          
+        // Insert in reverse order..
+        doInsert(1000);
+        
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
+
+        // BTree should iterate it in sorted order.
+        
+        index.visit(tx, new BTreeVisitor<String, Long>(){
+            public boolean isInterestedInKeysBetween(String first, String 
second) {
+                return true;
+            }
+            public void visit(List<String> keys, List<Long> values) {
+            }
+        });
+        
+
+        this.index.unload(tx);
+        tx.commit();
+    }
+    
     void doInsertReverse(int count) throws Exception {
         for (int i = count-1; i >= 0; i--) {
             index.put(tx, key(i), (long)i);


Reply via email to