Author: j16sdiz
Date: 2008-06-05 15:32:27 +0000 (Thu, 05 Jun 2008)
New Revision: 20221

Modified:
   
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
Log:
Newer and better resize scheme:
 - does not deadlock
 - reads entries in batches


Modified: 
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
===================================================================
--- 
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java  
    2008-06-05 15:32:00 UTC (rev 20220)
+++ 
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java  
    2008-06-05 15:32:27 UTC (rev 20221)
@@ -13,6 +13,9 @@
 import java.text.DecimalFormat;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -686,7 +689,22 @@

                return ((bf.getLong(0) & ENTRY_FLAG_OCCUPIED) == 0);
        }
+       
+       private byte[] getDigestedKeyFromOffset(long offset) throws IOException 
{
+               int split = (int) (offset % FILE_SPLIT);
+               long rawOffset = (offset / FILE_SPLIT) * entryTotalLength;

+               ByteBuffer bf = ByteBuffer.wrap(new byte[0x20]);
+
+               do {
+                       int status = storeFC[split].read(bf, rawOffset + 
bf.position());
+                       if (status == -1)
+                               throw new EOFException();
+               } while (bf.hasRemaining());
+
+               return bf.array();
+       }
+
        private void flushAndClose() {
                for (int i = 0; i < FILE_SPLIT; i++) {
                        try {
@@ -800,18 +818,32 @@
                        setDaemon(true);
                }

-        @Override
-        public void run() {
+               @Override
+               public void run() {
                        while (!shutdown) {
                                synchronized (cleanerLock) {
+                                       boolean resizeFinished = false;
+
                                        configLock.readLock().lock();
                                        try {
                                                if (prevStoreSize != 0)
-                                                       resizeStore();
+                                                       resizeFinished = 
resizeStore();
                                        } finally {
                                                configLock.readLock().unlock();
                                        }

+                                       if (resizeFinished) {
+                                               configLock.writeLock().lock();
+                                               try {
+                                                       prevStoreSize = 0;
+                                                       writeConfigFile();
+                                               } catch (IOException ioe) {
+                                                       Logger.error(this, 
"can't write store config file", ioe);
+                                               } finally {
+                                                       
configLock.writeLock().unlock();
+                                               }
+                                       }
+
                                        cleanerLock.notifyAll();
                                        try {
                                                
cleanerLock.wait(CLEANER_PERIOD);
@@ -822,329 +854,278 @@
                        }
                }

-               /**
-                * Phase 1 Rounds
-                */
-               private static final int RESIZE_PHASE1_ROUND = 8;
-               /**
-                * Maximum resize round
-                */
-               private static final int RESIZE_MAX_ROUND = 16;
+               private static final int RESIZE_MEMORY_ENTRIES = 256; // 
temporary memory store size (in # of entries)
+               private static final int RESIZE_DISK_ENTRIES = 8192; // 
temporary disk store size (in # of entries)

                /**
-                * Are we shrinking the store?
-                */
-               private boolean shrinking;
-               private long newEntries;
-               private long oldEntries;
-               private long freeEntries;
-               private long resolvedEntries;
-               private long droppedEntries;
-               private long maxOldItemOffset;
-
-               /**
-                * Numbers of round resize have ran
-                */
-               private int resizeRound;
-
-               /**
                 * Move old entries to new location and resize store
                 */
-               private void resizeStore() {
-                       ++resizeRound;
-                       Logger.normal(this, "Starting datastore resize (round " 
+ resizeRound + ")");
+               private boolean resizeStore() {
+                       Logger.normal(this, "Starting datastore resize");
+                       long startTime = System.currentTimeMillis();

-                       if (resizeRound == 1) { // first round
-                               if (storeSize < prevStoreSize) {
-                                       shrinking = true;
-                               } else {
-                                       setStoreFileSize(storeSize);
-                               }
-                               maxOldItemOffset = prevStoreSize - 1;
-                       }
+                       if (storeSize > prevStoreSize)
+                               setStoreFileSize(storeSize);

-                       boolean needQueue = false;
-                       if (resizeRound > RESIZE_PHASE1_ROUND) // too many 
rounds
-                               needQueue = true;
-                       if (resizeRound > 1 && droppedEntries == 0 && 
resolvedEntries == 0) // no progress
-                               needQueue = true;
-                       moveOldEntry0(needQueue);
+                       initOldEntriesFile();

-                       if (logMINOR)
-                               Logger.minor(this, "Finished resize round " + 
resizeRound + ": newEntries=" + newEntries
-                                       + ", oldEntries=" + oldEntries + ", 
freeEntries=" + freeEntries + ", resolvedEntries="
-                                       + resolvedEntries + ", droppedEntries=" 
+ droppedEntries);
+                       List<Entry> oldEntryList = new LinkedList<Entry>();

-                       if (shutdown)
-                               return;
+                       // start from end of store, make store shrinking 
quicker 
+                       long startOffset = (prevStoreSize / 
RESIZE_MEMORY_ENTRIES) * RESIZE_MEMORY_ENTRIES;
+                       for (long curOffset = startOffset; curOffset >= 0; 
curOffset -= RESIZE_MEMORY_ENTRIES) {
+                               if (shutdown)
+                                       return false;

-                       // Shrink store file size
-                       if (shrinking)
-                               setStoreFileSize(Math.max(storeSize, 
maxOldItemOffset + 1));
+                               batchReadEntries(curOffset, 
RESIZE_MEMORY_ENTRIES, oldEntryList);

-                       // Check finished
-                       if (resizeRound >= RESIZE_MAX_ROUND || oldEntries == 0 
|| resolvedEntries + droppedEntries >= oldEntries) {
-                               // Finished
-                               Logger.normal(this, "Datastore resize finished 
(total " + resizeRound + "rounds)");
+                               if (storeSize < prevStoreSize)
+                                       setStoreFileSize(Math.max(storeSize, 
curOffset));

-                               prevStoreSize = 0;
-                               resizeRound = 0;
+                               // try to resolve the list
+                               ListIterator<Entry> it = 
oldEntryList.listIterator();
+                               while (it.hasNext()) {
+                                       if (resolveOldEntry(it.next()))
+                                               it.remove();
+                               }
+
+                               // write unresolved entry to file
+                               it = oldEntryList.listIterator();
+                               while (it.hasNext())
+                                       rrWriteOldEntry(it.next());
+                               
+                               long processed = prevStoreSize - curOffset;
+                               Logger.normal(this, "Store resize " + callback 
+ ": " + processed + "/" + prevStoreSize);
                        }
+
+                       resolveOldEntriesFile();
+
+                       long endTime = System.currentTimeMillis();
+                       Logger.normal(this, "Finish resizing " + callback + " 
in " + (endTime - startTime) / 1000 + "s");
+                       
+                       return true;
                }

                /**
-                * Scan all entries and try to move them
+                * Read a list of old items from store, the original offset 
will be marked as free. To save
+                * some system calls, items are locked and read in batch.
+                * 
+                * @param offset
+                *            start offset, must be multiple of {@link 
FILE_SPLIT}
+                * @param length
+                *            number of items to read, must be multiple of 
{@link FILE_SPLIT}. If this
+                *            excess store size, read as much as possible.
+                * @param items
+                *            a list of items
+                * @return <code>true</code> if operation complete 
successfully; <code>false</code>
+                *         otherwise (e.g. can't acquire locks, node shutting 
down)
                 */
-               private void moveOldEntry0(boolean queueItem) {
-                       newEntries = 0;
-                       oldEntries = 0;
-                       freeEntries = 0;
-                       resolvedEntries = 0;
-                       droppedEntries = 0;
+               private boolean batchReadEntries(long offset, int length, 
List<Entry> items) {
+                       assert offset % FILE_SPLIT == 0;
+                       assert length % FILE_SPLIT == 0;

-                       File oldItemFile = null;
-                       RandomAccessFile oldItemsRAF = null;
-                       FileChannel oldItemsFC = null;
-                       if (queueItem) {
-                               try {
-                                       oldItemFile = new File(baseDir, name + 
".oldItems");
-                                       oldItemsRAF = new 
RandomAccessFile(oldItemFile, "rw");
-                                       oldItemsRAF.seek((oldItemsRAF.length() 
/ entryTotalLength) * entryTotalLength);
-                                       oldItemsFC = oldItemsRAF.getChannel();
-                               } catch (IOException e) {
+                       boolean[] locked = new boolean[length];
+                       try {
+                               // acquire all locks in the region, will unlock 
in the finally block
+                               for (int i = 0; i < length; i++) {
+                                       if (lockEntry(offset + i))
+                                               locked[i] = true;
+                                       else
+                                               return false;
                                }
-                       }

-                       long maxOffset = maxOldItemOffset;
-                       maxOldItemOffset = 0;
-                       LOOP_ENTRIES: for (long offset = 0; offset <= 
maxOffset; offset++) {
-                               if (logDEBUG && offset % 1024 == 0) {
-                                       Logger.debug(this, "Resize progress: 
newEntries=" + newEntries + ", oldEntries=" + oldEntries
-                                               + ", freeEntries=" + 
freeEntries + ", resolvedEntries=" + resolvedEntries
-                                               + ", droppedEntries=" + 
droppedEntries);
-                               }
+                               long startFileOffset = (offset / FILE_SPLIT) * 
entryTotalLength;
+                               long entriesToRead = length / FILE_SPLIT;
+                               long bufLen = entryTotalLength * entriesToRead;

-                               if (shutdown)
-                                       return;
-
-                               if (!lockEntry(offset)) //lock
-                                       continue LOOP_ENTRIES;
-                               try {
-                                       if (isFree(offset)) {
-                                               // free block
-                                               freeEntries++;
-                                               continue LOOP_ENTRIES;
-                                       }
-
-                                       if (getStoreSize(offset) == storeSize) {
-                                               // new store size entries
-                                               maxOldItemOffset = offset;
-                                               newEntries++;
-                                               continue LOOP_ENTRIES;
-                                       }
-
-                                       // if (entry.getStoreSize() == 
prevStoreSize)
-                                       // old store size entries, try to move 
them
-                                       maxOldItemOffset = offset;
-                                       oldEntries++;
-
-                                       Entry entry = readEntry(offset, null);
-                                       entry.setStoreSize(storeSize);
-                                       long[] newOffset = entry.getOffset();
-
-                                       // Check if I can keep my current offset
-                                       for (int i = 0; i < newOffset.length; 
i++) {
-                                               if (newOffset[i] == offset) { 
// lucky!
-                                                       writeEntry(entry, 
offset); // write back entry storeSize
-                                                       resolvedEntries++;
-
-                                                       if (logDEBUG)
-                                                               
Logger.debug(this, "old entry " + 
HexUtil.bytesToHex(entry.getDigestedRoutingKey())
-                                                                       + " 
resolved without moving");
-
-                                                       continue LOOP_ENTRIES;
+                               ByteBuffer buf = ByteBuffer.allocate((int) 
bufLen);
+                               for (int i = 0; i < FILE_SPLIT; i++) { // for 
each split file
+                                       buf.clear();
+                                       try {
+                                               while (buf.hasRemaining()) {
+                                                       int status = 
storeFC[i].read(buf, startFileOffset + buf.position());
+                                                       if (status == -1)
+                                                               break;
                                                }
+                                       } catch (IOException ioe) {
+                                               if (shutdown)
+                                                       return false;
+                                               Logger.error(this, "unexpected 
IOException", ioe);
                                        }
+                                       buf.flip();

-                                       boolean[] locked = new 
boolean[newOffset.length];
-                                       try {
-                                               // Lock all possible slots first
-                                               for (int i = 0; i < 
newOffset.length; i++) {
-                                                       if 
(lockEntry(newOffset[i])) { // lock
-                                                               locked[i] = 
true;
-                                                       } else if (shutdown) { 
// oops
-                                                               return;
-                                                       }
-                                               }
+                                       for (int j = 0; buf.remaining() >= 
entryTotalLength; j++) {
+                                               if (shutdown)
+                                                       return false;

-                                               // Probe for a free slot
-                                               for (int i = 0; i < 
newOffset.length; i++) {
-                                                       // see what's in the 
new offset
-                                                       Entry newOffsetEntry = 
readEntry(newOffset[i], null);
+                                               ByteBuffer enBuf = buf.slice();
+                                               buf.position(buf.position() + 
(int) entryTotalLength);

-                                                       // Free slot
-                                                       if 
(newOffsetEntry.isFree()) {
-                                                               // the new 
offset is freeeeeeee..
-                                                               
writeEntry(entry, newOffset[i]);
-                                                               
freeOffset(offset);
-                                                               
resolvedEntries++;
+                                               enBuf.limit((int) 
entryTotalLength);

-                                                               if (logDEBUG)
-                                                                       
Logger.debug(this, "old entry " + 
HexUtil.bytesToHex(entry.getDigestedRoutingKey())
-                                                                               
+ " resolved by moving to free block");
+                                               Entry entry = new Entry(enBuf);
+                                               entry.curOffset = offset + j * 
FILE_SPLIT + i;

-                                                               continue 
LOOP_ENTRIES;
-                                                       }
+                                               if (!entry.isFree() && 
entry.storeSize != storeSize) {
+                                                       // old entry
+                                                       items.add(entry);

-                                                       // Same digested key: 
same routing key or SHA-256 collision
-                                                       byte[] 
digestedRoutingKey = entry.getDigestedRoutingKey();
-                                                       byte[] 
digestedRoutingKey2 = newOffsetEntry.getDigestedRoutingKey();
-                                                       if 
(Arrays.equals(digestedRoutingKey, digestedRoutingKey2)) {
-                                                               // assume same 
routing key, drop this as duplicate
-                                                               
freeOffset(offset);
+                                                       try {
+                                                               
freeOffset(entry.curOffset);
                                                                
keyCount.decrementAndGet();
-                                                               
droppedEntries++;
-
-                                                               if (logDEBUG)
-                                                                       
Logger.debug(this, "old entry " + 
HexUtil.bytesToHex(entry.getDigestedRoutingKey())
-                                                                               
+ " dropped duplicate");
-
-                                                               continue 
LOOP_ENTRIES;
+                                                       } catch (IOException 
ioe) {
+                                                               
Logger.error(this, "error freeing entry " + entry.curOffset + ", node shutting 
down?");
                                                        }
                                                }

-                                               if (queueItem && 
oldItemsFC.position() < 0x10000000) { // Limit to 256MiB
-                                                       if (logDEBUG)
-                                                               
Logger.debug(this, "old entry " + 
HexUtil.bytesToHex(entry.getDigestedRoutingKey())
-                                                                       + " 
queued");
-                                                       
writeOldItem(oldItemsFC, entry);
-                                                       freeOffset(offset);
-                                                       
keyCount.decrementAndGet();
-                                               }
-                                       } finally {
-                                               // unlock all entries
-                                               for (int i = 0; i < 
newOffset.length; i++) {
-                                                       if (locked[i]) {
-                                                               
unlockEntry(newOffset[i]);
-                                                       }
-                                               }
+                                               // unlock current entry
+                                               unlockEntry(entry.curOffset);
+                                               locked[(int) (entry.curOffset - 
offset)] = false;
                                        }
-                               } catch (IOException e) {
-                                       Logger.debug(this, "IOExcception on 
moveOldEntries0", e);
-                               } finally {
-                                       unlockEntry(offset);
                                }
-                       }

-                       if (queueItem) {
-                               try {
-                                       oldItemsRAF.seek(0);
-                                       putBackOldItems(oldItemsFC);
-                               } catch (IOException e) {
-                               } finally {
-                                       try {
-                                               oldItemsRAF.close();
-                                               oldItemFile.delete();
-                                       } catch (IOException e2) {
-                                       }
-                               }
+                               return true;
+                       } finally {
+                               // unlock
+                               for (int i = 0; i < length; i++)
+                                       if (locked[i])
+                                               unlockEntry(offset + i);
                        }
                }

                /**
-                * Put back oldItems with best effort
-                *
-                * @throws IOException
+                * Put back an old entry to store file
+                * 
+                * @param entry
+                * @return <code>true</code> if the entry have put back 
successfully.
                 */
-               private void putBackOldItems(FileChannel oldItems) throws 
IOException {
-                       LOOP_ITEMS: while (true) {
-                               Entry entry = readOldItem(oldItems);
-                               if (entry == null)
-                                       break;
-
+               private boolean resolveOldEntry(Entry entry) {
+                       if (!lockDigestedKey(entry.getDigestedRoutingKey(), 
false))
+                               return false;
+                       try {
                                entry.setStoreSize(storeSize);
+                               long[] offsets = entry.getOffset();

-                               long[] newOffset = entry.getOffset();
+                               // Check for occupied entry with same key
+                               for (long offset : offsets) {
+                                       try {
+                                               if (!isFree(offset)
+                                                               && 
Arrays.equals(getDigestedKeyFromOffset(offset), entry.getDigestedRoutingKey())) 
{
+                                                       writeEntry(entry, 
offset);      // overwrite, don't update key count
+                                                       return true;
+                                               }
+                                       } catch (IOException e) {
+                                               Logger.debug(this, 
"IOExcception on resolveOldEntry", e);
+                                       }
+                               }

-                               for (int i = 0; i < newOffset.length; i++) {
-                                       if (!lockEntry(newOffset[i])) // lock
-                                               continue;
+                               // Check for free entry
+                               for (long offset : offsets) {
                                        try {
-                                               if (isFree(newOffset[i])) {
-                                                       if (logDEBUG)
-                                                               Logger
-                                                                       
.debug(this, "Put back old item: "
-                                                                               
+ HexUtil.bytesToHex(entry.digestedRoutingKey));
-                                                       writeEntry(entry, 
newOffset[i]);
+                                               if (isFree(offset)) {
+                                                       writeEntry(entry, 
offset);
                                                        
keyCount.incrementAndGet();
-                                                       resolvedEntries++;
-                                                       continue LOOP_ITEMS;
+                                                       return true;
                                                }
                                        } catch (IOException e) {
-                                               Logger.debug(this, 
"IOExcception on putBackOldItems", e);
-                                       } finally {
-                                               unlockEntry(newOffset[i]);
+                                               Logger.debug(this, 
"IOExcception on resolveOldEntry", e);
                                        }
                                }
+                               return false;
+                       } finally {
+                               
unlockDigestedKey(entry.getDigestedRoutingKey(), false);
+                       }
+               }

-                               if (logDEBUG)
-                                       Logger.debug(this, "Drop old item: " + 
HexUtil.bytesToHex(entry.digestedRoutingKey));
+               private File oldEntriesFile; // round-ribbon
+               private RandomAccessFile oldEntriesRAF;
+               private long oldEntriesFileOffset;

-                               droppedEntries++;
+               private void initOldEntriesFile() {
+                       try {
+                               oldEntriesFile = new File(baseDir, name + 
".oldEntries");
+                               oldEntriesRAF = new 
RandomAccessFile(oldEntriesFile, "rw");
+                               oldEntriesRAF.setLength(RESIZE_DISK_ENTRIES * 
entryTotalLength);
+                               oldEntriesFileOffset = 0;
+                       } catch (IOException ioe) {
+                               Logger.error(this, "Cannot create oldEntries 
file for resize, will use memory only", ioe);
                        }
                }

-               private void writeOldItem(FileChannel fc, Entry e) throws 
IOException {
-                       ByteBuffer bf = e.toByteBuffer();
-                       do {
-                               fc.write(bf);
-                       } while (bf.hasRemaining());
+               private void resolveOldEntriesFile() {
+                       if (oldEntriesRAF == null)
+                               return;
+
+                       for (int offset = 0; offset < RESIZE_DISK_ENTRIES; 
offset++) {
+                               Entry oldEntry = readOldEntry(offset);
+                               if (oldEntry != null && !oldEntry.isFree()) // 
the current position already in use
+                                       resolveOldEntry(oldEntry);
+                       }
+                       try {
+                               oldEntriesRAF.close();
+                       } catch (IOException ioe) {
+                               // ignore
+                       }
+                       oldEntriesFile.delete();
                }

-               private Entry readOldItem(FileChannel fc) throws IOException {
-                       ByteBuffer bf = ByteBuffer.allocate((int) 
entryTotalLength);
-                       do {
-                               int status = fc.read(bf);
-                               if (status == -1)
-                                       return null;
-                       } while (bf.hasRemaining());
-                       bf.flip();
-                       return new Entry(bf);
+               private void rrWriteOldEntry(Entry entry) {
+                       if (oldEntriesRAF == null)
+                               return;
+
+                       long offset = oldEntriesFileOffset++ % 
RESIZE_DISK_ENTRIES;
+                       Entry rrOldEntry = readOldEntry(offset);
+                       if (rrOldEntry != null && !rrOldEntry.isFree()) // the 
current position already in use
+                               resolveOldEntry(rrOldEntry);
+
+                       byte[] buf = new byte[(int) entryTotalLength];
+                       entry.toByteBuffer().get(buf);
+                       try {
+                               oldEntriesRAF.seek(offset * entryTotalLength);
+                               oldEntriesRAF.write(buf);
+                       } catch (IOException e) {
+                               Logger.debug(this, "IOException on 
rrWriteOldEntry", e);
+                       }
                }
+
+               private Entry readOldEntry(long offset) {
+                       if (oldEntriesRAF == null)
+                               return null;
+
+                       try { 
+                               byte[] buf = new byte[(int) entryTotalLength];
+                               oldEntriesRAF.seek(offset * entryTotalLength);
+                               oldEntriesRAF.readFully(buf);
+
+                               return new Entry(ByteBuffer.wrap(buf));
+                       } catch (IOException e) {
+                               Logger.debug(this, "IOException on 
readOldEntry", e);
+                               return null;
+                       }
+               }
        }

        public void setMaxKeys(long newStoreSize, boolean shrinkNow) throws 
IOException {
                Logger.normal(this, "[" + name + "] Resize newStoreSize=" + 
newStoreSize + ", shinkNow=" + shrinkNow);

-               assert newStoreSize > 0;
-               // TODO assert newStoreSize > (141 * (3 * 3) + 13 * 3) * 2; // 
store size too small
+               synchronized (cleanerLock) {
+                       configLock.writeLock().lock();
+                       try {
+                               if (newStoreSize == this.storeSize)
+                                       return;

-               configLock.writeLock().lock();
-               try {
-                       if (newStoreSize == this.storeSize)
-                               return;
-
-                       if (prevStoreSize != 0) {
-                               if (shrinkNow) {
-                                       // TODO shrink now
-                               } else {
+                               if (prevStoreSize != 0) {
                                        Logger.normal(this, "[" + name + "] 
resize already in progress, ignore resize request");
                                        return;
                                }
-                       }

-                       prevStoreSize = storeSize;
-                       storeSize = newStoreSize;
-                       writeConfigFile();
-                       synchronized (cleanerLock) {
-                               cleanerLock.notifyAll();
+                               prevStoreSize = storeSize;
+                               storeSize = newStoreSize;
+                               writeConfigFile();
+                       } finally {
+                               configLock.writeLock().unlock();
                        }
-
-                       if (shrinkNow) {
-                               // TODO shrink now
-                       }
-               } finally {
-                       configLock.writeLock().unlock();
+                       // don't notify for now, or we will be held here for a 
long time
                }
        }

@@ -1156,7 +1137,7 @@

        // ------------- Locking
        private boolean shutdown = false;
-       private ReadWriteLock configLock = new ReentrantReadWriteLock();
+       private ReadWriteLock configLock = new ReentrantReadWriteLock(); 
        private Lock entryLock = new ReentrantLock();
        private Map<Long, Condition> lockMap = new HashMap<Long, Condition> ();

@@ -1443,4 +1424,4 @@

                setBloomSync(true);
        }
-}
+}
\ No newline at end of file


Reply via email to