Author: j16sdiz
Date: 2008-07-17 13:38:00 +0000 (Thu, 17 Jul 2008)
New Revision: 21167

Modified:
   
branches/saltedhashstore/freenet/src/freenet/store/saltedhash/SaltedHashFreenetStore.java
Log:
refactor BatchProcessor code, prepare for UserAlert progress report

Modified: 
branches/saltedhashstore/freenet/src/freenet/store/saltedhash/SaltedHashFreenetStore.java
===================================================================
--- 
branches/saltedhashstore/freenet/src/freenet/store/saltedhash/SaltedHashFreenetStore.java
   2008-07-17 13:37:40 UTC (rev 21166)
+++ 
branches/saltedhashstore/freenet/src/freenet/store/saltedhash/SaltedHashFreenetStore.java
   2008-07-17 13:38:00 UTC (rev 21167)
@@ -136,7 +136,7 @@
                cleanerThread = new Cleaner();
                // finish all resizing before continue
                if (prevStoreSize != 0 && cleanerGlobalLock.tryLock()) {
-                       System.out.println("Resizing datastore (" + name + "), 
see freenet-latest.log for progress.");
+                       System.out.println("Resizing datastore (" + name + ")");
                        try {
                                cleanerThread.resizeStore(prevStoreSize, false);
                        } finally {
@@ -830,6 +830,18 @@
        private final Entry NOT_MODIFIED = new Entry();

        private interface BatchProcessor {
+               // initialize
+               void init();
+
+               // call this after reading RESIZE_MEMORY_ENTRIES entries
+               // return false to abort
+               boolean batch(long entriesLeft);
+
+               // call this on abort (e.g. node shutdown)
+               void abort();
+
+               void finish();
+               
                // return <code>null</code> to free the entry
                // return NOT_MODIFIED to keep the old entry
                Entry process(Entry entry);
@@ -915,115 +927,112 @@
                /**
                 * Move old entries to new location and resize store
                 */
-               private void resizeStore(long _prevStoreSize, boolean sleep) {
+               private void resizeStore(final long _prevStoreSize, final 
boolean sleep) {
                        Logger.normal(this, "Starting datastore resize");
-                       long startTime = System.currentTimeMillis();

-                       if (storeSize > _prevStoreSize)
-                               setStoreFileSize(storeSize);
+                       BatchProcessor resizeProcesser = new BatchProcessor() {
+                               List<Entry> oldEntryList = new 
LinkedList<Entry>();
+                               int optimialK;

-                       int optimialK = BloomFilter.optimialK(bloomFilterSize, 
storeSize);
-                       configLock.writeLock().lock();
-                       try {
-                               generation++;
-                               bloomFilter.fork(optimialK);
-                               keyCount.set(0);
-                       } finally {
-                               configLock.writeLock().unlock();
-                       }
+                               public void init() {
+                                       if (storeSize > _prevStoreSize)
+                                               setStoreFileSize(storeSize);

-                       final List<Entry> oldEntryList = new 
LinkedList<Entry>();
+                                       optimialK = 
BloomFilter.optimialK(bloomFilterSize, storeSize);
+                                       configLock.writeLock().lock();
+                                       try {
+                                               generation++;
+                                               bloomFilter.fork(optimialK);
+                                               keyCount.set(0);
+                                       } finally {
+                                               configLock.writeLock().unlock();
+                                       }

-                       // start from end of store, make store shrinking 
quicker 
-                       long startOffset = (_prevStoreSize / 
RESIZE_MEMORY_ENTRIES) * RESIZE_MEMORY_ENTRIES;
-                       int i = 0;
-                       for (long curOffset = startOffset; curOffset >= 0; 
curOffset -= RESIZE_MEMORY_ENTRIES) {
-                               if (shutdown || _prevStoreSize != 
prevStoreSize) {
-                                       bloomFilter.discard();
-                                       return;
+                                       
WrapperManager.signalStarting(RESIZE_MEMORY_ENTRIES * 30 * 1000 + 1000);
                                }

-                               
WrapperManager.signalStarting(RESIZE_MEMORY_ENTRIES * 30 * 1000 + 1000);
-                               batchProcessEntries(curOffset, 
RESIZE_MEMORY_ENTRIES, new BatchProcessor() {
-                                       public Entry process(Entry entry) {
-                                               int oldGeneration = 
entry.generation;
-                                               if (oldGeneration != 
generation) {
-                                                       entry.generation = 
generation;
-                                                       
keyCount.incrementAndGet();
-                                               }
+                               public Entry process(Entry entry) {
+                                       int oldGeneration = entry.generation;
+                                       if (oldGeneration != generation) {
+                                               entry.generation = generation;
+                                               keyCount.incrementAndGet();
+                                       }

-                                               if (entry.storeSize == 
storeSize) {
-                                                       // new size, don't have 
to relocate
-                                                       if (entry.generation != 
generation) {
-                                                               // update filter
-                                                               
bloomFilter.addKey(entry.getDigestedRoutingKey());
-                                                               return entry;
-                                                       } else {
-                                                               return 
NOT_MODIFIED;
-                                                       }
+                                       if (entry.storeSize == storeSize) {
+                                               // new size, don't have to 
relocate
+                                               if (entry.generation != 
generation) {
+                                                       // update filter
+                                                       
bloomFilter.addKey(entry.getDigestedRoutingKey());
+                                                       return entry;
+                                               } else {
+                                                       return NOT_MODIFIED;
                                                }
+                                       }

-                                               // remove from store, prepare 
for relocation
-                                               if (oldGeneration == 
generation) {
-                                                       // should be impossible
-                                                       Logger.error(this, //
-                                                               "new generation 
object with wrong storeSize. DigestedRoutingKey=" //
-                                                               + 
HexUtil.bytesToHex(entry.getDigestedRoutingKey()) //
-                                                               + ", Offset=" + 
entry.curOffset);
-                                                       
bloomFilter.removeKey(entry.getDigestedRoutingKey());
-                                               }
-                                               try {
-                                                       
entry.setData(readHeader(entry.curOffset), readData(entry.curOffset));
-                                                       oldEntryList.add(entry);
-                                                       if (oldEntryList.size() 
> RESIZE_MEMORY_ENTRIES)
-                                                               
oldEntryList.remove(0);
-                                               } catch (IOException e) {
-                                                       Logger.error(this, 
"error reading entry (offset=" + entry.curOffset + ")", e);
-                                               }
-                                               return null;
+                                       // remove from store, prepare for 
relocation
+                                       if (oldGeneration == generation) {
+                                               // should be impossible
+                                               Logger.error(this, //
+                                                       "new generation object 
with wrong storeSize. DigestedRoutingKey=" //
+                                                               + 
HexUtil.bytesToHex(entry.getDigestedRoutingKey()) //
+                                                               + ", Offset=" + 
entry.curOffset);
+                                               
bloomFilter.removeKey(entry.getDigestedRoutingKey());
                                        }
-                               });
+                                       try {
+                                               
entry.setData(readHeader(entry.curOffset), readData(entry.curOffset));
+                                               oldEntryList.add(entry);
+                                               if (oldEntryList.size() > 
RESIZE_MEMORY_ENTRIES)
+                                                       oldEntryList.remove(0);
+                                       } catch (IOException e) {
+                                               Logger.error(this, "error 
reading entry (offset=" + entry.curOffset + ")", e);
+                                       }
+                                       return null;
+                               }

-                               // shrink data file to current size
-                               if (storeSize < _prevStoreSize)
-                                       setStoreFileSize(Math.max(storeSize, 
curOffset));
+                               int i = 0;
+                               public boolean batch(long entriesLeft) {
+                                       
WrapperManager.signalStarting(RESIZE_MEMORY_ENTRIES * 30 * 1000 + 1000);

-                               // try to resolve the list
-                               ListIterator<Entry> it = 
oldEntryList.listIterator();
-                               while (it.hasNext()) {
-                                       if (resolveOldEntry(it.next()))
-                                               it.remove();
+                                       if (i++ % 16 == 0)
+                                               writeConfigFile();
+
+                                       // shrink data file to current size
+                                       if (storeSize < _prevStoreSize)
+                                               
setStoreFileSize(Math.max(storeSize, entriesLeft));
+
+                                       // try to resolve the list
+                                       ListIterator<Entry> it = 
oldEntryList.listIterator();
+                                       while (it.hasNext())
+                                               if (resolveOldEntry(it.next()))
+                                                       it.remove();
+
+                                       return _prevStoreSize == prevStoreSize;
                                }

-                               long processed = _prevStoreSize - curOffset;
-                               if (i++ % 16 == 0)
-                                       Logger.normal(this, "Store resize (" + 
name + "): " + processed + "/" + _prevStoreSize);
-
-                               try {
-                                       if (sleep)
-                                               Thread.sleep(500);
-                               } catch (InterruptedException e) {
+                               public void abort() {
                                        bloomFilter.discard();
-                                       return;
                                }
-                       }

-                       long endTime = System.currentTimeMillis();
-                       Logger.normal(this, "Finish resizing (" + name + ") in 
" + (endTime - startTime) / 1000 + "s");
+                               public void finish() {
+                                       configLock.writeLock().lock();
+                                       try {
+                                               if (_prevStoreSize != 
prevStoreSize)
+                                                       return;
+                                               bloomFilter.merge();
+                                               prevStoreSize = 0;

-                       configLock.writeLock().lock();
-                       try {
-                               if (_prevStoreSize != prevStoreSize)
-                                       return;
-                               bloomFilter.merge();
-                               prevStoreSize = 0;
+                                               flags &= ~FLAG_REBUILD_BLOOM;
+                                               checkBloom = true;
+                                               bloomFilterK = optimialK;
+                                       } finally {
+                                               configLock.writeLock().unlock();
+                                       }

-                               flags &= ~FLAG_REBUILD_BLOOM;
-                               checkBloom = true;
-                               bloomFilterK = optimialK;
-                       } finally {
-                               configLock.writeLock().unlock();
-                       }
+                                       Logger.normal(this, "Finish resizing (" 
+ name + ")");
+                               }
+                       };
+
+                       batchProcessEntries(resizeProcesser, _prevStoreSize, 
true, sleep);
                }

                /**
@@ -1032,66 +1041,115 @@
                private void rebuildBloom(boolean sleep) {
                        if (bloomFilter == null)
                                return;
-
                        Logger.normal(this, "Start rebuilding bloom filter (" + 
name + ")");
-                       long startTime = System.currentTimeMillis();
-                       int optimialK = BloomFilter.optimialK(bloomFilterSize, 
storeSize);

-                       configLock.writeLock().lock();
-                       try {
-                               generation++;
-                               bloomFilter.fork(bloomFilterK);
-                               keyCount.set(0);
-                       } finally {
-                               configLock.writeLock().unlock();
-                       }
+                       BatchProcessor rebuildBloomProcessor = new 
BatchProcessor() {
+                               int optimialK;

-                       int i = 0;
-                       for (long curOffset = 0; curOffset < storeSize; 
curOffset += RESIZE_MEMORY_ENTRIES) {
-                               if (shutdown || prevStoreSize != 0) {
-                                       bloomFilter.discard();
-                                       return;
+                               public void init() {
+                                       optimialK = 
BloomFilter.optimialK(bloomFilterSize, storeSize);
+
+                                       configLock.writeLock().lock();
+                                       try {
+                                               generation++;
+                                               bloomFilter.fork(bloomFilterK);
+                                               keyCount.set(0);
+                                       } finally {
+                                               configLock.writeLock().unlock();
+                                       }
+
+                                       
WrapperManager.signalStarting(RESIZE_MEMORY_ENTRIES * 5 * 1000 + 1000);
                                }
-                               
WrapperManager.signalStarting(RESIZE_MEMORY_ENTRIES * 5 * 1000 + 1000);
-                               batchProcessEntries(curOffset, 
RESIZE_MEMORY_ENTRIES, new BatchProcessor() {
-                                       public Entry process(Entry entry) {
-                                               if (entry.generation != 
generation) {
-                                                       
bloomFilter.addKey(entry.getDigestedRoutingKey());
-                                                       
keyCount.incrementAndGet();

-                                                       entry.generation = 
generation;
-                                                       return entry;
-                                               }
-                                               return NOT_MODIFIED;
+                               public Entry process(Entry entry) {
+                                       if (entry.generation != generation) {
+                                               
bloomFilter.addKey(entry.getDigestedRoutingKey());
+                                               keyCount.incrementAndGet();
+
+                                               entry.generation = generation;
+                                               return entry;
                                        }
-                               });
+                                       return NOT_MODIFIED;
+                               }

-                               if (i++ % 16 == 0) {
-                                       Logger.normal(this, "Rebuilding bloom 
filter (" + name + "): " + curOffset + "/" + storeSize);
-                                       writeConfigFile();
+                               int i = 0;
+                               public boolean batch(long entriesLeft) {
+                                       
WrapperManager.signalStarting(RESIZE_MEMORY_ENTRIES * 5 * 1000 + 1000);
+
+                                       if (i++ % 16 == 0)
+                                               writeConfigFile();
+                                       
+                                       return prevStoreSize == 0;
                                }

-                               try {
-                                       if (sleep)
-                                               Thread.sleep(500);
-                               } catch (InterruptedException e) {
+                               public void abort() {
                                        bloomFilter.discard();
-                                       return;
                                }
+
+                               public void finish() {
+                                       bloomFilter.merge();
+                                       configLock.writeLock().lock();
+                                       try {
+                                               flags &= ~FLAG_REBUILD_BLOOM;
+                                               checkBloom = true;
+                                               bloomFilterK = optimialK;
+                                       } finally {
+                                               configLock.writeLock().unlock();
+                                       }
+
+                                       Logger.normal(this, "Finish rebuilding 
bloom filter (" + name + ")");
+                               }
+                       };
+
+                       batchProcessEntries(rebuildBloomProcessor, storeSize, 
false, sleep);
+               }
+
+               private volatile long entriesLeft;
+               private volatile long entriesTotal;
+               
+               private void batchProcessEntries(BatchProcessor processor, long 
storeSize, boolean reverse, boolean sleep) {
+                       entriesLeft = entriesTotal = storeSize;
+                       
+                       long startOffset, step;
+                       if (!reverse) {
+                               startOffset = 0;
+                               step = RESIZE_MEMORY_ENTRIES;
+                       } else {
+                               startOffset = ((storeSize - 1) / 
RESIZE_MEMORY_ENTRIES) * RESIZE_MEMORY_ENTRIES;
+                               step = -RESIZE_MEMORY_ENTRIES;
                        }

-                       bloomFilter.merge();
-                       long endTime = System.currentTimeMillis();
-                       Logger.normal(this, "Finish rebuilding bloom filter (" 
+ name + ") in " + (endTime - startTime) / 1000
-                               + "s");
+                       int i = 0;
+                       processor.init();
+                       try {
+                               for (long curOffset = startOffset; curOffset >= 
0 && curOffset < storeSize; curOffset += step) {
+                                       if (shutdown) {
+                                               processor.abort();
+                                               return;
+                                       }
+                                       
+                                       if (i++ % 64 == 0)
+                                               System.err.println(name + " 
cleaner in progress: " + (entriesTotal - entriesLeft) + "/"
+                                                       + entriesTotal);
+                                               
+                                       batchProcessEntries(curOffset, 
RESIZE_MEMORY_ENTRIES, processor);
+                                       entriesLeft = reverse ? curOffset : 
Math.max(storeSize - curOffset - RESIZE_MEMORY_ENTRIES, 0);
+                                       if (!processor.batch(entriesLeft)) {
+                                               processor.abort();
+                                               return;
+                                       }

-                       configLock.writeLock().lock();
-                       try {
-                               flags &= ~FLAG_REBUILD_BLOOM;
-                               checkBloom = true;
-                               bloomFilterK = optimialK;
-                       } finally {
-                               configLock.writeLock().unlock();
+                                       try {
+                                               if (sleep) 
+                                                       Thread.sleep(500);
+                                       } catch (InterruptedException e) {
+                                               processor.abort();
+                                               return;
+                                       }
+                               }
+                               processor.finish();
+                       } catch (Exception e) {
+                               processor.abort();
                        }
                }



Reply via email to