Author: j16sdiz
Date: 2008-07-17 13:38:00 +0000 (Thu, 17 Jul 2008)
New Revision: 21167
Modified:
branches/saltedhashstore/freenet/src/freenet/store/saltedhash/SaltedHashFreenetStore.java
Log:
refactor BatchProcessor code, prepare for UserAlert progress report
Modified:
branches/saltedhashstore/freenet/src/freenet/store/saltedhash/SaltedHashFreenetStore.java
===================================================================
--- branches/saltedhashstore/freenet/src/freenet/store/saltedhash/SaltedHashFreenetStore.java	2008-07-17 13:37:40 UTC (rev 21166)
+++ branches/saltedhashstore/freenet/src/freenet/store/saltedhash/SaltedHashFreenetStore.java	2008-07-17 13:38:00 UTC (rev 21167)
@@ -136,7 +136,7 @@
cleanerThread = new Cleaner();
// finish all resizing before continue
if (prevStoreSize != 0 && cleanerGlobalLock.tryLock()) {
- System.out.println("Resizing datastore (" + name + "),
see freenet-latest.log for progress.");
+ System.out.println("Resizing datastore (" + name + ")");
try {
cleanerThread.resizeStore(prevStoreSize, false);
} finally {
@@ -830,6 +830,18 @@
private final Entry NOT_MODIFIED = new Entry();
private interface BatchProcessor {
+ // initialize
+ void init();
+
+ // call this after reading RESIZE_MEMORY_ENTRIES entries
+ // return false to abort
+ boolean batch(long entriesLeft);
+
+ // call this on abort (e.g. node shutdown)
+ void abort();
+
+ void finish();
+
// return <code>null</code> to free the entry
// return NOT_MODIFIED to keep the old entry
Entry process(Entry entry);
@@ -915,115 +927,112 @@
/**
* Move old entries to new location and resize store
*/
- private void resizeStore(long _prevStoreSize, boolean sleep) {
+ private void resizeStore(final long _prevStoreSize, final
boolean sleep) {
Logger.normal(this, "Starting datastore resize");
- long startTime = System.currentTimeMillis();
- if (storeSize > _prevStoreSize)
- setStoreFileSize(storeSize);
+ BatchProcessor resizeProcesser = new BatchProcessor() {
+ List<Entry> oldEntryList = new
LinkedList<Entry>();
+ int optimialK;
- int optimialK = BloomFilter.optimialK(bloomFilterSize,
storeSize);
- configLock.writeLock().lock();
- try {
- generation++;
- bloomFilter.fork(optimialK);
- keyCount.set(0);
- } finally {
- configLock.writeLock().unlock();
- }
+ public void init() {
+ if (storeSize > _prevStoreSize)
+ setStoreFileSize(storeSize);
- final List<Entry> oldEntryList = new
LinkedList<Entry>();
+ optimialK =
BloomFilter.optimialK(bloomFilterSize, storeSize);
+ configLock.writeLock().lock();
+ try {
+ generation++;
+ bloomFilter.fork(optimialK);
+ keyCount.set(0);
+ } finally {
+ configLock.writeLock().unlock();
+ }
- // start from end of store, make store shrinking
quicker
- long startOffset = (_prevStoreSize /
RESIZE_MEMORY_ENTRIES) * RESIZE_MEMORY_ENTRIES;
- int i = 0;
- for (long curOffset = startOffset; curOffset >= 0;
curOffset -= RESIZE_MEMORY_ENTRIES) {
- if (shutdown || _prevStoreSize !=
prevStoreSize) {
- bloomFilter.discard();
- return;
+
WrapperManager.signalStarting(RESIZE_MEMORY_ENTRIES * 30 * 1000 + 1000);
}
-
WrapperManager.signalStarting(RESIZE_MEMORY_ENTRIES * 30 * 1000 + 1000);
- batchProcessEntries(curOffset,
RESIZE_MEMORY_ENTRIES, new BatchProcessor() {
- public Entry process(Entry entry) {
- int oldGeneration =
entry.generation;
- if (oldGeneration !=
generation) {
- entry.generation =
generation;
-
keyCount.incrementAndGet();
- }
+ public Entry process(Entry entry) {
+ int oldGeneration = entry.generation;
+ if (oldGeneration != generation) {
+ entry.generation = generation;
+ keyCount.incrementAndGet();
+ }
- if (entry.storeSize ==
storeSize) {
- // new size, don't have
to relocate
- if (entry.generation !=
generation) {
- // update filter
-
bloomFilter.addKey(entry.getDigestedRoutingKey());
- return entry;
- } else {
- return
NOT_MODIFIED;
- }
+ if (entry.storeSize == storeSize) {
+ // new size, don't have to
relocate
+ if (entry.generation !=
generation) {
+ // update filter
+
bloomFilter.addKey(entry.getDigestedRoutingKey());
+ return entry;
+ } else {
+ return NOT_MODIFIED;
}
+ }
- // remove from store, prepare
for relocation
- if (oldGeneration ==
generation) {
- // should be impossible
- Logger.error(this, //
- "new generation
object with wrong storeSize. DigestedRoutingKey=" //
- +
HexUtil.bytesToHex(entry.getDigestedRoutingKey()) //
- + ", Offset=" +
entry.curOffset);
-
bloomFilter.removeKey(entry.getDigestedRoutingKey());
- }
- try {
-
entry.setData(readHeader(entry.curOffset), readData(entry.curOffset));
- oldEntryList.add(entry);
- if (oldEntryList.size()
> RESIZE_MEMORY_ENTRIES)
-
oldEntryList.remove(0);
- } catch (IOException e) {
- Logger.error(this,
"error reading entry (offset=" + entry.curOffset + ")", e);
- }
- return null;
+ // remove from store, prepare for
relocation
+ if (oldGeneration == generation) {
+ // should be impossible
+ Logger.error(this, //
+ "new generation object
with wrong storeSize. DigestedRoutingKey=" //
+ +
HexUtil.bytesToHex(entry.getDigestedRoutingKey()) //
+ + ", Offset=" +
entry.curOffset);
+
bloomFilter.removeKey(entry.getDigestedRoutingKey());
}
- });
+ try {
+
entry.setData(readHeader(entry.curOffset), readData(entry.curOffset));
+ oldEntryList.add(entry);
+ if (oldEntryList.size() >
RESIZE_MEMORY_ENTRIES)
+ oldEntryList.remove(0);
+ } catch (IOException e) {
+ Logger.error(this, "error
reading entry (offset=" + entry.curOffset + ")", e);
+ }
+ return null;
+ }
- // shrink data file to current size
- if (storeSize < _prevStoreSize)
- setStoreFileSize(Math.max(storeSize,
curOffset));
+ int i = 0;
+ public boolean batch(long entriesLeft) {
+
WrapperManager.signalStarting(RESIZE_MEMORY_ENTRIES * 30 * 1000 + 1000);
- // try to resolve the list
- ListIterator<Entry> it =
oldEntryList.listIterator();
- while (it.hasNext()) {
- if (resolveOldEntry(it.next()))
- it.remove();
+ if (i++ % 16 == 0)
+ writeConfigFile();
+
+ // shrink data file to current size
+ if (storeSize < _prevStoreSize)
+
setStoreFileSize(Math.max(storeSize, entriesLeft));
+
+ // try to resolve the list
+ ListIterator<Entry> it =
oldEntryList.listIterator();
+ while (it.hasNext())
+ if (resolveOldEntry(it.next()))
+ it.remove();
+
+ return _prevStoreSize == prevStoreSize;
}
- long processed = _prevStoreSize - curOffset;
- if (i++ % 16 == 0)
- Logger.normal(this, "Store resize (" +
name + "): " + processed + "/" + _prevStoreSize);
-
- try {
- if (sleep)
- Thread.sleep(500);
- } catch (InterruptedException e) {
+ public void abort() {
bloomFilter.discard();
- return;
}
- }
- long endTime = System.currentTimeMillis();
- Logger.normal(this, "Finish resizing (" + name + ") in
" + (endTime - startTime) / 1000 + "s");
+ public void finish() {
+ configLock.writeLock().lock();
+ try {
+ if (_prevStoreSize !=
prevStoreSize)
+ return;
+ bloomFilter.merge();
+ prevStoreSize = 0;
- configLock.writeLock().lock();
- try {
- if (_prevStoreSize != prevStoreSize)
- return;
- bloomFilter.merge();
- prevStoreSize = 0;
+ flags &= ~FLAG_REBUILD_BLOOM;
+ checkBloom = true;
+ bloomFilterK = optimialK;
+ } finally {
+ configLock.writeLock().unlock();
+ }
- flags &= ~FLAG_REBUILD_BLOOM;
- checkBloom = true;
- bloomFilterK = optimialK;
- } finally {
- configLock.writeLock().unlock();
- }
+ Logger.normal(this, "Finish resizing ("
+ name + ")");
+ }
+ };
+
+ batchProcessEntries(resizeProcesser, _prevStoreSize,
true, sleep);
}
/**
@@ -1032,66 +1041,115 @@
private void rebuildBloom(boolean sleep) {
if (bloomFilter == null)
return;
-
Logger.normal(this, "Start rebuilding bloom filter (" +
name + ")");
- long startTime = System.currentTimeMillis();
- int optimialK = BloomFilter.optimialK(bloomFilterSize,
storeSize);
- configLock.writeLock().lock();
- try {
- generation++;
- bloomFilter.fork(bloomFilterK);
- keyCount.set(0);
- } finally {
- configLock.writeLock().unlock();
- }
+ BatchProcessor rebuildBloomProcessor = new
BatchProcessor() {
+ int optimialK;
- int i = 0;
- for (long curOffset = 0; curOffset < storeSize;
curOffset += RESIZE_MEMORY_ENTRIES) {
- if (shutdown || prevStoreSize != 0) {
- bloomFilter.discard();
- return;
+ public void init() {
+ optimialK =
BloomFilter.optimialK(bloomFilterSize, storeSize);
+
+ configLock.writeLock().lock();
+ try {
+ generation++;
+ bloomFilter.fork(bloomFilterK);
+ keyCount.set(0);
+ } finally {
+ configLock.writeLock().unlock();
+ }
+
+
WrapperManager.signalStarting(RESIZE_MEMORY_ENTRIES * 5 * 1000 + 1000);
}
-
WrapperManager.signalStarting(RESIZE_MEMORY_ENTRIES * 5 * 1000 + 1000);
- batchProcessEntries(curOffset,
RESIZE_MEMORY_ENTRIES, new BatchProcessor() {
- public Entry process(Entry entry) {
- if (entry.generation !=
generation) {
-
bloomFilter.addKey(entry.getDigestedRoutingKey());
-
keyCount.incrementAndGet();
- entry.generation =
generation;
- return entry;
- }
- return NOT_MODIFIED;
+ public Entry process(Entry entry) {
+ if (entry.generation != generation) {
+
bloomFilter.addKey(entry.getDigestedRoutingKey());
+ keyCount.incrementAndGet();
+
+ entry.generation = generation;
+ return entry;
}
- });
+ return NOT_MODIFIED;
+ }
- if (i++ % 16 == 0) {
- Logger.normal(this, "Rebuilding bloom
filter (" + name + "): " + curOffset + "/" + storeSize);
- writeConfigFile();
+ int i = 0;
+ public boolean batch(long entriesLeft) {
+
WrapperManager.signalStarting(RESIZE_MEMORY_ENTRIES * 5 * 1000 + 1000);
+
+ if (i++ % 16 == 0)
+ writeConfigFile();
+
+ return prevStoreSize == 0;
}
- try {
- if (sleep)
- Thread.sleep(500);
- } catch (InterruptedException e) {
+ public void abort() {
bloomFilter.discard();
- return;
}
+
+ public void finish() {
+ bloomFilter.merge();
+ configLock.writeLock().lock();
+ try {
+ flags &= ~FLAG_REBUILD_BLOOM;
+ checkBloom = true;
+ bloomFilterK = optimialK;
+ } finally {
+ configLock.writeLock().unlock();
+ }
+
+ Logger.normal(this, "Finish rebuilding
bloom filter (" + name + ")");
+ }
+ };
+
+ batchProcessEntries(rebuildBloomProcessor, storeSize,
false, sleep);
+ }
+
+ private volatile long entriesLeft;
+ private volatile long entriesTotal;
+
+ private void batchProcessEntries(BatchProcessor processor, long
storeSize, boolean reverse, boolean sleep) {
+ entriesLeft = entriesTotal = storeSize;
+
+ long startOffset, step;
+ if (!reverse) {
+ startOffset = 0;
+ step = RESIZE_MEMORY_ENTRIES;
+ } else {
+ startOffset = ((storeSize - 1) /
RESIZE_MEMORY_ENTRIES) * RESIZE_MEMORY_ENTRIES;
+ step = -RESIZE_MEMORY_ENTRIES;
}
- bloomFilter.merge();
- long endTime = System.currentTimeMillis();
- Logger.normal(this, "Finish rebuilding bloom filter ("
+ name + ") in " + (endTime - startTime) / 1000
- + "s");
+ int i = 0;
+ processor.init();
+ try {
+ for (long curOffset = startOffset; curOffset >=
0 && curOffset < storeSize; curOffset += step) {
+ if (shutdown) {
+ processor.abort();
+ return;
+ }
+
+ if (i++ % 64 == 0)
+ System.err.println(name + "
cleaner in progress: " + (entriesTotal - entriesLeft) + "/"
+ + entriesTotal);
+
+ batchProcessEntries(curOffset,
RESIZE_MEMORY_ENTRIES, processor);
+ entriesLeft = reverse ? curOffset :
Math.max(storeSize - curOffset - RESIZE_MEMORY_ENTRIES, 0);
+ if (!processor.batch(entriesLeft)) {
+ processor.abort();
+ return;
+ }
- configLock.writeLock().lock();
- try {
- flags &= ~FLAG_REBUILD_BLOOM;
- checkBloom = true;
- bloomFilterK = optimialK;
- } finally {
- configLock.writeLock().unlock();
+ try {
+ if (sleep)
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ processor.abort();
+ return;
+ }
+ }
+ processor.finish();
+ } catch (Exception e) {
+ processor.abort();
}
}
