Author: j16sdiz
Date: 2008-07-01 07:36:20 +0000 (Tue, 01 Jul 2008)
New Revision: 20886

Modified:
   
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
Log:
refactor to BatchProcessor interface and fix key count

Modified: 
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
===================================================================
--- 
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java  
    2008-07-01 04:07:26 UTC (rev 20885)
+++ 
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java  
    2008-07-01 07:36:20 UTC (rev 20886)
@@ -11,7 +11,6 @@
 import java.nio.channels.FileChannel;
 import java.security.MessageDigest;
 import java.text.DecimalFormat;
-import java.util.AbstractList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -252,8 +251,10 @@

                                        // Overwrite old offset
                                        Entry entry = new Entry(routingKey, 
header, data);
-                                       writeEntry(entry, oldOffset); // 
overwrite, don't increase keyCount
+                                       writeEntry(entry, oldOffset);
                                        writes.incrementAndGet();
+                                       if (oldEntry.getGeneration() != 
generation)
+                                               keyCount.incrementAndGet();
                                        return;
                                }

@@ -268,7 +269,7 @@
                                                if (updateBloom)
                                                        
bloomFilter.updateFilter(getDigestedRoutingKey(routingKey));
                                                writeEntry(entry, offset[i]);
-                                               long written = 
writes.incrementAndGet();
+                                               writes.incrementAndGet();
                                                keyCount.incrementAndGet();

                                                return;
@@ -280,8 +281,11 @@
                                        Logger.debug(this, "collision, write to 
i=0, offset=" + offset[0]);
                                if (updateBloom)
                                        
bloomFilter.updateFilter(getDigestedRoutingKey(routingKey));
+                               oldEntry = readEntry(offset[0], null);
                                writeEntry(entry, offset[0]);
-                               long written = writes.incrementAndGet();
+                               writes.incrementAndGet();
+                               if (oldEntry.getGeneration() != generation)
+                                       keyCount.incrementAndGet();
                        } finally {
                                unlockPlainKey(routingKey, false);
                        }
@@ -568,6 +572,14 @@
                                digestedRoutingKey = 
SaltedHashFreenetStore.this.getDigestedRoutingKey(this.plainRoutingKey);
                        return digestedRoutingKey;
                }
+
+               public byte getGeneration() {
+                       return generation;
+               }
+
+               public void setGeneration(byte generation) {
+                       this.generation = generation;
+               }
        }

        /**
@@ -820,7 +832,7 @@
        /**
         * Write config file
         */
-       private void writeConfigFile() throws IOException {
+       private void writeConfigFile() {
                configLock.writeLock().lock();
                try {
                        File tempConfig = new File(configFile.getPath() + 
".tmp");
@@ -838,6 +850,8 @@
                        raf.close();

                        FileUtil.renameTo(tempConfig, configFile);
+               } catch (IOException ioe) {
+                       Logger.error(this, "error writing config file for " + 
name, ioe);
                } finally {
                        configLock.writeLock().unlock();
                }
@@ -849,6 +863,11 @@
        private static Lock cleanerGlobalLock = new ReentrantLock(); // global 
across all datastore
        private Cleaner cleanerThread;

+       private interface BatchProcessor {
+               // return <code>null</code> to free the entry
+               Entry processs(Entry entry);
+       }
+       
        private class Cleaner extends Thread {
                /**
                 * How often the clean should run
@@ -898,12 +917,7 @@
                                        } catch (Exception e) { // may throw 
IOException (even if it is not defined)
                                                Logger.error(this, "Can't force 
bloom filter", e);
                                        }
-                                       try {
-                                               writeConfigFile();
-                                       } catch (IOException e) {
-                                               Logger.error(this, "Can't write 
config file", e);
-                                       }
-
+                                       writeConfigFile();
                                        cleanerLock.notifyAll();

                                        try {
@@ -930,7 +944,7 @@

                        initOldEntriesFile();

-                       List<Entry> oldEntryList = new LinkedList<Entry>();
+                       final List<Entry> oldEntryList = new 
LinkedList<Entry>();

                        // start from end of store, make store shrinking 
quicker 
                        long startOffset = (_prevStoreSize / 
RESIZE_MEMORY_ENTRIES) * RESIZE_MEMORY_ENTRIES;
@@ -938,8 +952,16 @@
                                if (shutdown)
                                        return;

-                               batchReadEntries(curOffset, 
RESIZE_MEMORY_ENTRIES, oldEntryList, true);
+                               batchReadEntries(curOffset, 
RESIZE_MEMORY_ENTRIES, new BatchProcessor() {
+                                       public Entry processs(Entry entry) {
+                                               if (entry.getStoreSize() == 
storeSize) // new size
+                                                       return entry;

+                                               oldEntryList.add(entry);
+                                               return null;
+                                       }
+                               });
+
                                if (storeSize < _prevStoreSize)
                                        setStoreFileSize(Math.max(storeSize, 
curOffset));

@@ -984,32 +1006,33 @@

                        Logger.normal(this, "Start rebuilding bloom filter for 
" + callback);

-                       bloomFilter.fork();
-                       List<Entry> buildList = new AbstractList<Entry>() {
-                               @Override
-                               public void add(int index, Entry entry) {
-                                       
bloomFilter.updateFilter(entry.getDigestedRoutingKey());
-                               }

-                               @Override
-                               public Entry get(int index) {
-                                       return null;
-                               }
+                       configLock.writeLock().lock();
+                       try {
+                               generation++;
+                               bloomFilter.fork();
+                               keyCount.set(0);
+                       } finally {
+                               configLock.writeLock().unlock();
+                       }

-                               @Override
-                               public int size() {
-                                       return 0;
-                               }
-
-                       };
-
                        for (long curOffset = 0; curOffset < storeSize; 
curOffset += RESIZE_MEMORY_ENTRIES) {
                                if (shutdown) {
                                        bloomFilter.discard();
                                        return;
                                }
-                               batchReadEntries(curOffset, 
RESIZE_MEMORY_ENTRIES, buildList, false);
+                               batchReadEntries(curOffset, 
RESIZE_MEMORY_ENTRIES, new BatchProcessor() {
+                                       public Entry processs(Entry entry) {
+                                               if (entry.getGeneration() != 
generation) {
+                                                       
bloomFilter.updateFilter(entry.getDigestedRoutingKey());
+                                                       
keyCount.incrementAndGet();
+                                               }
+                                               return entry;
+                                       }
+                               });
+                               
                                Logger.normal(this, "Rebuilding bloom filter 
for " + callback + ": " + curOffset + "/" + storeSize);
+                               writeConfigFile();
                        }

                        bloomFilter.merge();
@@ -1024,23 +1047,19 @@
                }

                /**
-                * Read a list of items from store. In resizing mode, only old 
items are read and the
-                * original offsets are freed.
+                * Read a list of items from store.
                 * 
                 * @param offset
                 *            start offset, must be multiple of {@link 
FILE_SPLIT}
                 * @param length
                 *            number of items to read, must be multiple of 
{@link FILE_SPLIT}. If this
                 *            excess store size, read as much as possible.
-                * @param items
-                *            a list of items
-                * @param resizing
-                *            If <code>true</code>, only get old items and free 
the offset. Otherwise, get
-                *            all items.
+                * @param processor
+                *            batch processor
                 * @return <code>true</code> if operation complete 
successfully; <code>false</code>
                 *         otherwise (e.g. can't acquire locks, node shutting 
down)
                 */
-               private boolean batchReadEntries(long offset, int length, 
List<Entry> items, boolean resizing) {
+               private boolean batchReadEntries(long offset, int length, 
BatchProcessor processor) {
                        assert offset % FILE_SPLIT == 0;
                        assert length % FILE_SPLIT == 0;

@@ -1060,6 +1079,7 @@

                                ByteBuffer buf = ByteBuffer.allocate((int) 
bufLen);
                                for (int i = 0; i < FILE_SPLIT; i++) { // for 
each split file
+                                       boolean dirty = false;
                                        buf.clear();
                                        try {
                                                while (buf.hasRemaining()) {
@@ -1074,27 +1094,31 @@
                                        }
                                        buf.flip();

-                                       for (int j = 0; buf.remaining() >= 
entryTotalLength; j++) {
-                                               if (shutdown)
-                                                       return false;
+                                       try { 
+                                               for (int j = 0; buf.limit() >= 
j * entryTotalLength; j++) {
+                                                       if (shutdown)
+                                                               return false;

-                                               ByteBuffer enBuf = buf.slice();
-                                               buf.position(buf.position() + 
(int) entryTotalLength);
+                                                       buf.position((int) (j * 
entryTotalLength));
+                                                       if (buf.remaining() < 
entryTotalLength) // EOF
+                                                               break;

-                                               enBuf.limit((int) 
entryTotalLength);
+                                                       ByteBuffer enBuf = 
buf.slice();
+                                                       enBuf.limit((int) 
entryTotalLength);

-                                               Entry entry = new Entry(enBuf);
-                                               entry.curOffset = offset + j * 
FILE_SPLIT + i;
+                                                       Entry entry = new 
Entry(enBuf);
+                                                       entry.curOffset = 
offset + j * FILE_SPLIT + i;

-                                               try {
                                                        if (entry.isFree())
                                                                continue; // 
not occupied
-                                                       if (resizing && 
entry.storeSize != storeSize)
-                                                               continue; // 
resizing mode, not new item

-                                                       items.add(entry);
-
-                                                       if (resizing) { // free 
the offset
+                                                       Entry newEntry = 
processor.processs(entry);
+                                                       if (newEntry != null) {
+                                                               // write back
+                                                               
buf.position((int) (j * entryTotalLength));
+                                                               
buf.put(newEntry.toByteBuffer());
+                                                               dirty = true;
+                                                       } else { // free the 
offset
                                                                try {
                                                                        
freeOffset(entry.curOffset);
                                                                        
keyCount.decrementAndGet();
@@ -1103,11 +1127,20 @@
                                                                                
Logger.error(this, "error freeing entry " + entry.curOffset, ioe);
                                                                }
                                                        }
-                                               } finally {
-                                                       // unlock current entry
-                                                       
unlockEntry(entry.curOffset);
-                                                       locked[(int) 
(entry.curOffset - offset)] = false;
                                                }
+                                       } finally {
+                                               // write back.
+                                               if (dirty) {
+                                                       buf.flip();
+
+                                                       try {
+                                                               while 
(buf.hasRemaining()) {
+                                                                       
storeFC[i].write(buf, startFileOffset + buf.position());
+                                                               }
+                                                       } catch (IOException 
ioe) {
+                                                               
Logger.error(this, "unexpected IOException", ioe);
+                                                       }
+                                               }
                                        }
                                }

@@ -1401,11 +1434,7 @@
                        try {
                                flushAndClose();
                                flags &= ~FLAG_DIRTY; // clean shutdown
-                               try {
-                                       writeConfigFile();
-                               } catch (IOException e) {
-                                       Logger.error(this, "error writing store 
config", e);
-                               }
+                               writeConfigFile();
                        } finally {
                                configLock.writeLock().unlock();
                        }


Reply via email to