Author: j16sdiz
Date: 2008-07-01 07:36:20 +0000 (Tue, 01 Jul 2008)
New Revision: 20886
Modified:
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
Log:
refactor to BatchProcessor interface and fix key count
Modified:
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
===================================================================
--- branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java	2008-07-01 04:07:26 UTC (rev 20885)
+++ branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java	2008-07-01 07:36:20 UTC (rev 20886)
@@ -11,7 +11,6 @@
import java.nio.channels.FileChannel;
import java.security.MessageDigest;
import java.text.DecimalFormat;
-import java.util.AbstractList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
@@ -252,8 +251,10 @@
// Overwrite old offset
Entry entry = new Entry(routingKey,
header, data);
- writeEntry(entry, oldOffset); //
overwrite, don't increase keyCount
+ writeEntry(entry, oldOffset);
writes.incrementAndGet();
+ if (oldEntry.getGeneration() !=
generation)
+ keyCount.incrementAndGet();
return;
}
@@ -268,7 +269,7 @@
if (updateBloom)
bloomFilter.updateFilter(getDigestedRoutingKey(routingKey));
writeEntry(entry, offset[i]);
- long written =
writes.incrementAndGet();
+ writes.incrementAndGet();
keyCount.incrementAndGet();
return;
@@ -280,8 +281,11 @@
Logger.debug(this, "collision, write to
i=0, offset=" + offset[0]);
if (updateBloom)
bloomFilter.updateFilter(getDigestedRoutingKey(routingKey));
+ oldEntry = readEntry(offset[0], null);
writeEntry(entry, offset[0]);
- long written = writes.incrementAndGet();
+ writes.incrementAndGet();
+ if (oldEntry.getGeneration() != generation)
+ keyCount.incrementAndGet();
} finally {
unlockPlainKey(routingKey, false);
}
@@ -568,6 +572,14 @@
digestedRoutingKey =
SaltedHashFreenetStore.this.getDigestedRoutingKey(this.plainRoutingKey);
return digestedRoutingKey;
}
+
+ public byte getGeneration() {
+ return generation;
+ }
+
+ public void setGeneration(byte generation) {
+ this.generation = generation;
+ }
}
/**
@@ -820,7 +832,7 @@
/**
* Write config file
*/
- private void writeConfigFile() throws IOException {
+ private void writeConfigFile() {
configLock.writeLock().lock();
try {
File tempConfig = new File(configFile.getPath() +
".tmp");
@@ -838,6 +850,8 @@
raf.close();
FileUtil.renameTo(tempConfig, configFile);
+ } catch (IOException ioe) {
+ Logger.error(this, "error writing config file for " +
name, ioe);
} finally {
configLock.writeLock().unlock();
}
@@ -849,6 +863,11 @@
private static Lock cleanerGlobalLock = new ReentrantLock(); // global
across all datastore
private Cleaner cleanerThread;
+ private interface BatchProcessor {
+ // return <code>null</code> to free the entry
+ Entry processs(Entry entry);
+ }
+
private class Cleaner extends Thread {
/**
* How often the clean should run
@@ -898,12 +917,7 @@
} catch (Exception e) { // may throw
IOException (even if it is not defined)
Logger.error(this, "Can't force
bloom filter", e);
}
- try {
- writeConfigFile();
- } catch (IOException e) {
- Logger.error(this, "Can't write
config file", e);
- }
-
+ writeConfigFile();
cleanerLock.notifyAll();
try {
@@ -930,7 +944,7 @@
initOldEntriesFile();
- List<Entry> oldEntryList = new LinkedList<Entry>();
+ final List<Entry> oldEntryList = new
LinkedList<Entry>();
// start from end of store, make store shrinking
quicker
long startOffset = (_prevStoreSize /
RESIZE_MEMORY_ENTRIES) * RESIZE_MEMORY_ENTRIES;
@@ -938,8 +952,16 @@
if (shutdown)
return;
- batchReadEntries(curOffset,
RESIZE_MEMORY_ENTRIES, oldEntryList, true);
+ batchReadEntries(curOffset,
RESIZE_MEMORY_ENTRIES, new BatchProcessor() {
+ public Entry processs(Entry entry) {
+ if (entry.getStoreSize() ==
storeSize) // new size
+ return entry;
+ oldEntryList.add(entry);
+ return null;
+ }
+ });
+
if (storeSize < _prevStoreSize)
setStoreFileSize(Math.max(storeSize,
curOffset));
@@ -984,32 +1006,33 @@
Logger.normal(this, "Start rebuilding bloom filter for
" + callback);
- bloomFilter.fork();
- List<Entry> buildList = new AbstractList<Entry>() {
- @Override
- public void add(int index, Entry entry) {
-
bloomFilter.updateFilter(entry.getDigestedRoutingKey());
- }
- @Override
- public Entry get(int index) {
- return null;
- }
+ configLock.writeLock().lock();
+ try {
+ generation++;
+ bloomFilter.fork();
+ keyCount.set(0);
+ } finally {
+ configLock.writeLock().unlock();
+ }
- @Override
- public int size() {
- return 0;
- }
-
- };
-
for (long curOffset = 0; curOffset < storeSize;
curOffset += RESIZE_MEMORY_ENTRIES) {
if (shutdown) {
bloomFilter.discard();
return;
}
- batchReadEntries(curOffset,
RESIZE_MEMORY_ENTRIES, buildList, false);
+ batchReadEntries(curOffset,
RESIZE_MEMORY_ENTRIES, new BatchProcessor() {
+ public Entry processs(Entry entry) {
+ if (entry.getGeneration() !=
generation) {
+
bloomFilter.updateFilter(entry.getDigestedRoutingKey());
+
keyCount.incrementAndGet();
+ }
+ return entry;
+ }
+ });
+
Logger.normal(this, "Rebuilding bloom filter
for " + callback + ": " + curOffset + "/" + storeSize);
+ writeConfigFile();
}
bloomFilter.merge();
@@ -1024,23 +1047,19 @@
}
/**
- * Read a list of items from store. In resizing mode, only old
items are read and the
- * original offsets are freed.
+ * Read a list of items from store.
*
* @param offset
* start offset, must be multiple of {@link
FILE_SPLIT}
* @param length
* number of items to read, must be multiple of
{@link FILE_SPLIT}. If this
* excess store size, read as much as possible.
- * @param items
- * a list of items
- * @param resizing
- * If <code>true</code>, only get old items and free
the offset. Otherwise, get
- * all items.
+ * @param processor
+ * batch processor
* @return <code>true</code> if operation complete
successfully; <code>false</code>
* otherwise (e.g. can't acquire locks, node shutting
down)
*/
- private boolean batchReadEntries(long offset, int length,
List<Entry> items, boolean resizing) {
+ private boolean batchReadEntries(long offset, int length,
BatchProcessor processor) {
assert offset % FILE_SPLIT == 0;
assert length % FILE_SPLIT == 0;
@@ -1060,6 +1079,7 @@
ByteBuffer buf = ByteBuffer.allocate((int)
bufLen);
for (int i = 0; i < FILE_SPLIT; i++) { // for
each split file
+ boolean dirty = false;
buf.clear();
try {
while (buf.hasRemaining()) {
@@ -1074,27 +1094,31 @@
}
buf.flip();
- for (int j = 0; buf.remaining() >=
entryTotalLength; j++) {
- if (shutdown)
- return false;
+ try {
+ for (int j = 0; buf.limit() >=
j * entryTotalLength; j++) {
+ if (shutdown)
+ return false;
- ByteBuffer enBuf = buf.slice();
- buf.position(buf.position() +
(int) entryTotalLength);
+ buf.position((int) (j *
entryTotalLength));
+ if (buf.remaining() <
entryTotalLength) // EOF
+ break;
- enBuf.limit((int)
entryTotalLength);
+ ByteBuffer enBuf =
buf.slice();
+ enBuf.limit((int)
entryTotalLength);
- Entry entry = new Entry(enBuf);
- entry.curOffset = offset + j *
FILE_SPLIT + i;
+ Entry entry = new
Entry(enBuf);
+ entry.curOffset =
offset + j * FILE_SPLIT + i;
- try {
if (entry.isFree())
continue; //
not occupied
- if (resizing &&
entry.storeSize != storeSize)
- continue; //
resizing mode, not new item
- items.add(entry);
-
- if (resizing) { // free
the offset
+ Entry newEntry =
processor.processs(entry);
+ if (newEntry != null) {
+ // write back
+
buf.position((int) (j * entryTotalLength));
+
buf.put(newEntry.toByteBuffer());
+ dirty = true;
+ } else { // free the
offset
try {
freeOffset(entry.curOffset);
keyCount.decrementAndGet();
@@ -1103,11 +1127,20 @@
Logger.error(this, "error freeing entry " + entry.curOffset, ioe);
}
}
- } finally {
- // unlock current entry
-
unlockEntry(entry.curOffset);
- locked[(int)
(entry.curOffset - offset)] = false;
}
+ } finally {
+ // write back.
+ if (dirty) {
+ buf.flip();
+
+ try {
+ while
(buf.hasRemaining()) {
+
storeFC[i].write(buf, startFileOffset + buf.position());
+ }
+ } catch (IOException
ioe) {
+
Logger.error(this, "unexpected IOException", ioe);
+ }
+ }
}
}
@@ -1401,11 +1434,7 @@
try {
flushAndClose();
flags &= ~FLAG_DIRTY; // clean shutdown
- try {
- writeConfigFile();
- } catch (IOException e) {
- Logger.error(this, "error writing store
config", e);
- }
+ writeConfigFile();
} finally {
configLock.writeLock().unlock();
}
