Author: chirino
Date: Mon Aug 11 13:24:15 2008
New Revision: 684905

URL: http://svn.apache.org/viewvc?rev=684905&view=rev
Log:
Delaying the write thread a bit so that it does work in larger batches.  

Modified:
    
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java
    
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java

Modified: 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java?rev=684905&r1=684904&r2=684905&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java
 (original)
+++ 
activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/HashIndex.java
 Mon Aug 11 13:24:15 2008
@@ -58,7 +58,7 @@
     public static final int MAXIMUM_CAPACITY;
     public static final int DEFAULT_LOAD_FACTOR;
     private static final int LOW_WATER_MARK=1024*16;
-    private static final int MAX_PAGES_IN_RECOVERY_BUFFER=100;
+    private static final int MAX_PAGES_IN_RECOVERY_BUFFER=1000;
     // Recovery header is (long offset) + (int data_size) 
     private static final int RECOVERY_HEADER_SIZE=12;  
     
@@ -83,6 +83,8 @@
     private LinkedList<HashPage> freeList = new LinkedList<HashPage>();
     private AtomicBoolean loaded = new AtomicBoolean();
     private LRUCache<Long, HashPage> pageCache;
+    private boolean enableRecoveryBuffer=true;
+    private boolean enableSyncedWrites=true;
     
     private boolean enablePageCaching=false;//this is off by default - see AMQ-1667
     private int pageCacheSize = 10;
@@ -711,10 +713,24 @@
             } else {
                 write.setCurrent(data);
             }
-            writes.notify();
+            
+            // Once we start approaching capacity, notify the writer to start writing
+            if( canStartWriteBatch() ) {
+                writes.notify();
+            }
         }
     }
 
+    private boolean canStartWriteBatch() {
+        int capacityUsed = ((writes.size() * 100)/MAXIMUM_CAPACITY);
+        
+        // The constant 10 here controls how soon write batches start going to disk.
+        // It would be nice to figure out how to auto-tune that value.  Make it too small and
+        // we reduce throughput because we are locking the write mutex too often doing writes.
+    
+        return capacityUsed >= 10 || checkpointLatch!=null;
+    }
+
     
     /**
      * 
@@ -725,13 +741,13 @@
      * @throws IOException 
      */
     public boolean doWrites(long timeout, TimeUnit unit) throws IOException {
-        
+                
         int batchLength=8+4; // Account for the:  lastTxid + recovery record counter
         ArrayList<PageWrite> batch = new ArrayList<PageWrite>(MAX_PAGES_IN_RECOVERY_BUFFER);
         
         synchronized( writes ) {            
-            // If there is nothing to write, wait for a notification...
-            if( writes.isEmpty() ) {
+            // If there is not enough to write, wait for a notification...
+            if( !canStartWriteBatch() ) {
                 releaseCheckpointWaiter();
                 try {
                     writes.wait(unit.toMillis(timeout));
@@ -744,7 +760,6 @@
                 return false;
             }
             
-              
             // build a write batch from the current write cache. 
             for (PageWrite write : writes.values()) {
                 
@@ -765,22 +780,28 @@
         }
        long txId = nextTxid.get();
         
-       StringBuilder pageOffsets = new StringBuilder();
-       
-        // Now the batch array has all the writes, write the batch to the recovery buffer.
-        writeIndexFile.seek(0);
-        writeIndexFile.writeLong(txId); // write txid of the batch
-        writeIndexFile.writeInt(batch.size()); // write the recovery record counter.
-        for (PageWrite w : batch) {
-            writeIndexFile.writeLong(w.page.getOffset());
-            writeIndexFile.writeInt(w.diskBound.length);
-            writeIndexFile.write(w.diskBound.data, w.diskBound.offset, w.diskBound.length);
+ 
+       if (enableRecoveryBuffer) {
+            // Now the batch array has all the writes, write the batch to the
+            // recovery buffer.
+            writeIndexFile.seek(0);
+            writeIndexFile.writeLong(txId); // write txid of the batch
+            writeIndexFile.writeInt(batch.size()); // write the recovery record
+                                                    // counter.
+            for (PageWrite w : batch) {
+                writeIndexFile.writeLong(w.page.getOffset());
+                writeIndexFile.writeInt(w.diskBound.length);
+                writeIndexFile.write(w.diskBound.data, w.diskBound.offset, w.diskBound.length);
+            }
+            if( enableSyncedWrites ) {
+                // Sync to make sure recovery buffer writes land on disk..
+                writeIndexFile.getFD().sync();
+            }
         }
-        
-        // Sync to make sure recovery buffer writes land on disk..
-        writeIndexFile.getFD().sync(); 
+       
         
         // Now update the actual index locations
+        StringBuilder pageOffsets = new StringBuilder();
         for (PageWrite w : batch) {
             if( pageOffsets.length()!=0 ) {
                 pageOffsets.append(", ");
@@ -791,10 +812,13 @@
         }
         
         // Sync again
-        writeIndexFile.getFD().sync();
-        LOG.debug("Index write complete tx: "+txId+", pages: "+pageOffsets);
-
+        if( enableSyncedWrites ) {
+            writeIndexFile.getFD().sync();
+            LOG.debug("Index write complete tx: "+txId+", pages: "+pageOffsets);
+        }
+        
         nextTxid.incrementAndGet();
+
         synchronized( writes ) {
             for (PageWrite w : batch) {
                 // If there are no more pending writes, then remove it from the write cache.
@@ -806,6 +830,7 @@
                 releaseCheckpointWaiter();
             }
         }
+        
         return true;
     }
 
@@ -839,6 +864,7 @@
                 this.checkpointLatch = new CountDownLatch(1);
             }
             checkpointLatch = this.checkpointLatch;
+            writes.notify();
         }        
         try {
             checkpointLatch.await();        
@@ -963,7 +989,7 @@
             public void run() {
                 try {
                     while( !stopWriter.get() ) {
-                        doWrites(500, TimeUnit.MILLISECONDS);
+                        doWrites(1000, TimeUnit.MILLISECONDS);
                     }
                 } catch (IOException e) {
                     e.printStackTrace();
@@ -980,4 +1006,20 @@
         writerThread.join();
     }
 
+    public boolean isEnableRecoveryBuffer() {
+        return enableRecoveryBuffer;
+    }
+
+    public void setEnableRecoveryBuffer(boolean doubleBuffer) {
+        this.enableRecoveryBuffer = doubleBuffer;
+    }
+
+    public boolean isEnableSyncedWrites() {
+        return enableSyncedWrites;
+    }
+
+    public void setEnableSyncedWrites(boolean syncWrites) {
+        this.enableSyncedWrites = syncWrites;
+    }
+
 }

Modified: 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java
URL: 
http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java?rev=684905&r1=684904&r2=684905&view=diff
==============================================================================
--- 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java
 (original)
+++ 
activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/robusthash/HashIndexBenchMark.java
 Mon Aug 11 13:24:15 2008
@@ -28,9 +28,9 @@
     @Override
     protected Index createIndex(File root, String name) throws Exception {
         HashIndex index = new HashIndex(root, name, indexManager);
-        //index.setNumberOfBins(12);
-        //index.setPageSize(32 * 1024);
         index.setKeyMarshaller(Store.STRING_MARSHALLER);
+//        index.setEnableRecoveryBuffer(false);
+//        index.setEnableSyncedWrites(false);
         return index;
     }
 


Reply via email to