Author: j16sdiz
Date: 2008-05-09 02:09:10 +0000 (Fri, 09 May 2008)
New Revision: 19857

Modified:
   
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
Log:
Initial datastore resize code


Modified: 
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
===================================================================
--- 
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java  
    2008-05-09 02:08:50 UTC (rev 19856)
+++ 
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java  
    2008-05-09 02:09:10 UTC (rev 19857)
@@ -28,6 +28,9 @@
 import freenet.support.io.FileUtil;
 import freenet.support.math.RunningAverage;
 import freenet.support.math.SimpleRunningAverage;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;

 /**
  * Index-less data store based on salted hash
@@ -36,7 +39,7 @@
  */
 public class SaltedHashFreenetStore implements FreenetStore {
        private static final boolean OPTION_SAVE_PLAINKEY = true;
-       
+
        private static boolean logMINOR;
        private static boolean logDEBUG;

@@ -116,8 +119,12 @@
        }

        private Entry probeEntry(byte[] routingKey) throws IOException {
-               // TODO probe store resize
-               return probeEntry0(routingKey, storeSize);
+               Entry entry = probeEntry0(routingKey, storeSize);
+
+               if (entry == null && prevStoreSize != 0)
+                       entry = probeEntry0(routingKey, prevStoreSize);
+
+               return entry;
        }

        private Entry probeEntry0(byte[] routingKey, long probeStoreSize) 
throws IOException {
@@ -132,7 +139,6 @@
                        entry = readEntry(offset, routingKey);
                } catch (EOFException e) {
                        // may occur on resize, silent it a bit
-                       //TODO store resize
                        Logger.error(this, "EOFException on probeEntry", e);
                        return null;
                } finally {
@@ -252,7 +258,7 @@
                        System.arraycopy(header, 0, this.header, 0, 
headerBlockLength);
                        this.data = new byte[dataBlockLength];
                        System.arraycopy(data, 0, this.data, 0, 
dataBlockLength);
-                       
+
                        if (OPTION_SAVE_PLAINKEY) {
                                flag |= ENTRY_FLAG_PLAINKEY;
                        }
@@ -260,6 +266,21 @@
                        isEncrypted = false;
                }

+               /**
+                * @return the storeSize
+                */
+               protected long getStoreSize() {
+                       return storeSize;
+               }
+
+               /**
+                * @param storeSize
+                *            the storeSize to set
+                */
+               protected void setStoreSize(long storeSize) {
+                       this.storeSize = storeSize;
+               }
+
                public Entry(ByteBuffer in) {
                        assert in.remaining() == entryTotalLength;

@@ -276,7 +297,7 @@
                                plainRoutingKey = new byte[0x20];
                                in.get(plainRoutingKey);
                        }
-                       
+
                        // reserved bytes
                        in.position((int) ENTRY_HEADER_LENGTH);

@@ -300,10 +321,10 @@
                        out.putLong(flag);
                        out.putLong(storeSize);

-                       if (OPTION_SAVE_PLAINKEY) {
+                       if (OPTION_SAVE_PLAINKEY && plainRoutingKey != null) {
                                out.put(plainRoutingKey);
                        }
-                       
+
                        // reserved bytes
                        out.position((int) ENTRY_HEADER_LENGTH);

@@ -356,7 +377,7 @@
                                else
                                        return false;
                        }
-                       
+
                        if (plainRoutingKey != null) {
                                // we knew the key
                                if (!Arrays.equals(this.plainRoutingKey, 
routingKey)) {
@@ -416,6 +437,10 @@
                                throw new RuntimeException(e);
                        }
                }
+
+               public boolean isFree() {
+                       return (flag & ENTRY_FLAG_OCCUPIED) == 0;
+               }
        }

        /**
@@ -435,13 +460,13 @@
                        storeFiles[i] = new File(baseDir, name + ".data-" + 
fmt.format(i));

                        storeRAF[i] = new RandomAccessFile(storeFiles[i], "rw");
-                       //TODO store resize
-                       if (storeRAF[i].length() == 0) { // New file?
-                               storeRAF[i].setLength(entryTotalLength * 
(storeSize / FILE_SPLIT + 1));
-                       }
+
                        storeFC[i] = storeRAF[i].getChannel();
                        storeFC[i].lock();
                }
+
+               long storeFileSize = Math.max(storeSize, prevStoreSize);
+               setStoreFileSize(storeFileSize);
        }

        /**
@@ -637,6 +662,11 @@
        private Cleaner cleanerThread;

        private class Cleaner extends Thread {
+               /**
+                * How often the clean should run
+                */
+               private static final int CLEANER_PERIOD = 10 * 60 * 1000; // 10 
minutes
+
                public Cleaner() {
                        setName("Store-" + name + "-Cleaner");
                        setPriority(MIN_PRIORITY);
@@ -645,14 +675,15 @@

                public void run() {
                        while (!shutdown) {
-                               if (prevStoreSize != 0)
-                                       moveOldEntries();
-                               else
-                                       estimateStoreSize();
+                               synchronized (cleanerLock) {
+                                       if (prevStoreSize != 0)
+                                               resizeStore();
+                                       else
+                                               estimateStoreSize();

-                               synchronized (cleanerLock) {
+                                       cleanerLock.notifyAll();
                                        try {
-                                               cleanerLock.wait(10 * 60 * 
1000); // 10 minutes
+                                               
cleanerLock.wait(CLEANER_PERIOD);
                                        } catch (InterruptedException e) {
                                                Logger.debug(this, 
"interrupted", e);
                                        }
@@ -661,19 +692,227 @@
                }

                /**
-                * Move old entries to new location
+                * Maximum memory to be used on resize
                 */
-               private void moveOldEntries() {
-                       Logger.minor(this, "move old entries");
-                       prevStoreSize = 0;
+               private static final int RESIZE_MEMORY = 2 * 1024 * 1024; // 
2MiB
+               /**
+                * Phase 1 Rounds
+                */
+               private static final int RESIZE_PHASE1_ROUND = 12;
+               /**
+                * Maximum resize round
+                */
+               private static final int RESIZE_MAX_ROUND = 16;
+
+               /**
+                * Are we shrinking the store?
+                */
+               private boolean shrinking;
+               private long newEntries;
+               private long oldEntries;
+               private long freeEntries;
+               private long resolvedEntries;
+               private long droppedEntries;
+               private long maxOldItemOffset;
+
+               /**
+                * Numbers of round resize have ran
+                */
+               private int resizeRound;
+
+               /**
+                * Move old entries to new location and resize store
+                */
+               private void resizeStore() {
+                       ++resizeRound;
+                       Logger.normal(this, "Starting datastore resize (round " 
+ resizeRound + ")");
+
+                       if (resizeRound == 1) { // first round
+                               if (storeSize < prevStoreSize) {
+                                       shrinking = true;
+                               } else {
+                                       setStoreFileSize(storeSize);
+                               }
+                               maxOldItemOffset = prevStoreSize - 1;
+                       }
+
+                       moveOldEntry0(resizeRound > RESIZE_PHASE1_ROUND);
+
+                       if (logMINOR)
+                               Logger.minor(this, "Finished resize round " + 
resizeRound + ": newEntries=" + newEntries
+                                       + ", oldEntries=" + oldEntries + ", 
freeEntries=" + freeEntries + ", resolvedEntries="
+                                       + resolvedEntries + ", droppedEntries=" 
+ droppedEntries);
+
+                       if (shutdown)
+                               return;
+
+                       // report key count
+                       estimatedCount.report(newEntries + oldEntries - 
droppedEntries);
+
+                       // Shrink store file size
+                       if (shrinking)
+                               setStoreFileSize(Math.max(storeSize, 
maxOldItemOffset));
+
+                       // Check finished
+                       if (resizeRound >= RESIZE_MAX_ROUND || oldEntries == 0 
|| resolvedEntries + droppedEntries >= oldEntries) {
+                               // Finished
+                               Logger.normal(this, "Datastore resize finished 
(total " + resizeRound + "rounds)");
+
+                               prevStoreSize = 0;
+                               resizeRound = 0;
+                       }
                }

                /**
-                * Sample to take at a time
+                * Scan all entries and try to move them
                 */
-               private static final double SAMPLE_RATE = 0.05; // 5%
+               private void moveOldEntry0(boolean queueItem) {
+                       newEntries = 0;
+                       oldEntries = 0;
+                       freeEntries = 0;
+                       resolvedEntries = 0;
+                       droppedEntries = 0;

+                       List oldItems = null;
+                       if (queueItem) {
+                               oldItems = new ArrayList();
+                       }
+
+                       long maxOffset = maxOldItemOffset;
+                       maxOldItemOffset = 0;
+                       for (long offset = 0; offset <= maxOffset; offset++) {
+                               if (logDEBUG && offset % 1024 == 0) {
+                                       Logger.debug(this, "Resize progress: 
newEntries=" + newEntries + ", oldEntries=" + oldEntries
+                                               + ", freeEntries=" + 
freeEntries + ", resolvedEntries=" + resolvedEntries
+                                               + ", droppedEntries=" + 
droppedEntries);
+                               }
+
+                               if (shutdown)
+                                       return;
+
+                               if (!lockEntry(offset)) //lock 
+                                       continue;
+                               try {
+                                       Entry entry = readEntry(offset, null);
+
+                                       if (entry.isFree()) {
+                                               // free block
+                                               freeEntries++;
+                                       } else if (entry.getStoreSize() == 
storeSize) {
+                                               // new store size entries
+                                               maxOldItemOffset = offset;
+                                               newEntries++;
+                                       } else { // if (entry.getStoreSize() == 
prevStoreSize)
+                                               // old store size entries, try 
to move them
+                                               oldEntries++;
+                                               maxOldItemOffset = offset;
+
+                                               entry.setStoreSize(storeSize);
+                                               long newOffset = 
entry.getOffset();
+
+                                               if (newOffset == offset) { // 
lucky! 
+                                                       writeEntry(entry); // 
write back entry storeSize
+                                                       resolvedEntries++;
+                                                       continue;
+                                               }
+
+                                               if (!lockEntry(newOffset)) // 
lock
+                                                       continue;
+                                               try {
+                                                       // see what's in the 
new offset
+                                                       Entry newOffsetEntry = 
readEntry(newOffset, null);
+
+                                                       if 
(newOffsetEntry.isFree()) {
+                                                               // the new 
offset is freeeeeeee..
+                                                               
writeEntry(entry);
+                                                               
freeOffset(offset);
+                                                               
resolvedEntries++;
+                                                       } else if 
(newOffsetEntry.getStoreSize() == storeSize) {
+                                                               // new offset 
already have a new entry, free old entry
+                                                               
freeOffset(offset);
+                                                               
droppedEntries++;
+                                                       } else if 
(Arrays.equals(entry.digestedRoutingKey, newOffsetEntry.digestedRoutingKey)) {
+                                                               // same 
digested routing key, free the old entry
+                                                               
freeOffset(offset);
+                                                               
resolvedEntries++;
+                                                       } else if (queueItem) {
+                                                               // break tie by 
moveing old item to queue
+                                                               if 
(oldItems.size() * entryTotalLength < RESIZE_MEMORY) {
+                                                                       
oldItems.add(newOffsetEntry);
+                                                                       if 
(newOffset > offset) {
+                                                                               
oldEntries++; // newOffset wasn't counted count it
+                                                                       }
+
+                                                                       
writeEntry(entry);
+                                                                       
freeOffset(offset);
+                                                                       
resolvedEntries++;
+                                                               }
+                                                       }
+                                               } finally {
+                                                       unlockEntry(newOffset);
+                                               }
+                                       }
+                               } catch (IOException e) {
+                                       Logger.debug(this, "IOExcception on 
moveOldEntries0", e);
+                               } finally {
+                                       unlockEntry(offset);
+                               }
+                       }
+
+                       if (queueItem) {
+                               putBackOldItems(oldItems);
+                       }
+               }
+
                /**
+                * Put back oldItems with best effort
+                */
+               private void putBackOldItems(List oldItems) {
+                       Iterator it = oldItems.iterator();
+                       while (it.hasNext()) {
+                               boolean done = false;
+
+                               Entry entry = (Entry) it.next();
+                               entry.setStoreSize(storeSize);
+
+                               long newOffset = entry.getOffset();
+
+                               if (!lockEntry(newOffset)) // lock
+                                       continue;
+                               try {
+                                       if (isFree(newOffset)) {
+                                               if (logDEBUG)
+                                                       Logger.debug(this, "Put 
back old item: " + HexUtil.bytesToHex(entry.digestedRoutingKey));
+                                               writeEntry(entry);
+                                               done = true;
+                                       } else {
+                                               if (logDEBUG)
+                                                       Logger.debug(this, 
"Drop old item: " + HexUtil.bytesToHex(entry.digestedRoutingKey));
+                                       }
+                               } catch (IOException e) {
+                                       Logger.debug(this, "IOExcception on 
putBackOldItems", e);
+                               } finally {
+                                       unlockEntry(newOffset);
+                                       it.remove();
+
+                                       if (done)
+                                               resolvedEntries++;
+                                       else
+                                               droppedEntries++;
+                               }
+                       }
+                       oldItems.clear();
+               }
+
+               /**
+                * Samples to take on key count estimation
+                */
+               private static final double SAMPLE_RATE = 0.05; // 5%
+               /**
+                * Minimum samples to take on key count estimation
+                */
+               private static final int MIN_SAMPLE = 10000;
+               /**
                 * Last sample position
                 */
                private long samplePos = 0;
@@ -683,7 +922,7 @@
                 */
                private void estimateStoreSize() {
                        Logger.minor(this, "start estimating key count");
-                       long numSample = Math.min((long) (SAMPLE_RATE * 
storeSize), 10000);
+                       long numSample = Math.min((long) (SAMPLE_RATE * 
storeSize), MIN_SAMPLE);
                        long sampled = 0;
                        long occupied = 0;
                        while (sampled < numSample) {
@@ -712,9 +951,32 @@
                }
        }

-       public void setMaxKeys(long maxStoreKeys, boolean shrinkNow) throws 
IOException {
-               // TODO
-               // NO-OP now
+       public void setMaxKeys(long newStoreSize, boolean shrinkNow) throws 
IOException {
+               Logger.normal(this, "[" + name + "] Resize newStoreSize=" + 
newStoreSize + ", shinkNow=" + shrinkNow);
+
+               assert newStoreSize > 0;
+
+               synchronized (cleanerLock) {
+                       if (newStoreSize == this.storeSize)
+                               return;
+
+                       if (prevStoreSize != 0) {
+                               if (shrinkNow) {
+                                       // TODO shrink now
+                               } else {
+                                       Logger.normal(this, "[" + name + "] 
resize already in progress, ignore resize request");
+                                       return;
+                               }
+                       }
+
+                       prevStoreSize = storeSize;
+                       storeSize = newStoreSize;
+                       cleanerLock.notifyAll();
+
+                       if (shrinkNow) {
+                               // TODO shrink now
+                       }
+               }
        }

        // ------------- Locking
@@ -941,4 +1203,19 @@
        public long getMaxKeys() {
                return storeSize;
        }
+
+       /**
+        * Change on disk store file size
+        * 
+        * @param storeFileSize
+        */
+       private void setStoreFileSize(long storeFileSize) {
+               for (int i = 0; i < FILE_SPLIT; i++) {
+                       try {
+                               storeRAF[i].setLength(entryTotalLength * 
(storeFileSize / FILE_SPLIT + 1));
+                       } catch (IOException e) {
+                               Logger.error(this, "error resizing store file", 
e);
+                       }
+               }
+       }
 }


Reply via email to