Author: j16sdiz
Date: 2008-06-05 15:32:27 +0000 (Thu, 05 Jun 2008)
New Revision: 20221
Modified:
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
Log:
newer and better resize scheme:
- does not deadlock
- reads entries in batches
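
The scheme works roughly like this: walk the old slot range from the end in
fixed-size batches, re-home every entry that still fits one of its new offsets,
and spill the remainder to a bounded scratch area instead of retrying under
lock. Below is a rough standalone sketch of that flow, not the store's real
API: the class, the Slot interface, readBatch() and the BATCH constant are
illustrative stand-ins only.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    /** Toy model of the batched resize loop; names and sizes are illustrative only. */
    public class BatchedResizeSketch {
        static final int BATCH = 256; // plays the role of RESIZE_MEMORY_ENTRIES

        interface Slot { boolean tryResolve(long entry); }

        /** Walk the old region from the end in fixed-size batches. */
        static void resize(long prevStoreSize, Slot store, List<Long> spill) {
            long start = (prevStoreSize / BATCH) * BATCH;
            for (long cur = start; cur >= 0; cur -= BATCH) {
                List<Long> batch = readBatch(cur, BATCH, prevStoreSize);
                // first try to re-home every entry at one of its new offsets
                for (Iterator<Long> it = batch.iterator(); it.hasNext();)
                    if (store.tryResolve(it.next()))
                        it.remove();
                // whatever is left goes to a bounded scratch area instead of blocking
                spill.addAll(batch);
            }
        }

        // stand-in for batchReadEntries(): just lists the slot numbers in the batch
        static List<Long> readBatch(long offset, int length, long limit) {
            List<Long> out = new ArrayList<Long>();
            for (long i = offset; i < Math.min(offset + length, limit); i++)
                out.add(i);
            return out;
        }

        public static void main(String[] args) {
            List<Long> spill = new ArrayList<Long>();
            resize(1000, new Slot() {
                public boolean tryResolve(long entry) { return entry % 2 == 0; }
            }, spill);
            System.out.println("spilled " + spill.size() + " of 1000 entries");
        }
    }
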
Modified:
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
===================================================================
--- branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java	2008-06-05 15:32:00 UTC (rev 20220)
+++ branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java	2008-06-05 15:32:27 UTC (rev 20221)
@@ -13,6 +13,9 @@
import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -686,7 +689,22 @@
return ((bf.getLong(0) & ENTRY_FLAG_OCCUPIED) == 0);
}
+
+	private byte[] getDigestedKeyFromOffset(long offset) throws IOException {
+		int split = (int) (offset % FILE_SPLIT);
+		long rawOffset = (offset / FILE_SPLIT) * entryTotalLength;
+		ByteBuffer bf = ByteBuffer.wrap(new byte[0x20]);
+
+		do {
+			int status = storeFC[split].read(bf, rawOffset + bf.position());
+			if (status == -1)
+				throw new EOFException();
+		} while (bf.hasRemaining());
+
+		return bf.array();
+	}
+
private void flushAndClose() {
for (int i = 0; i < FILE_SPLIT; i++) {
try {
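
For context, the new helper only pulls the 0x20-byte digested key out of a
slot, which is what lets the resolver compare keys without deserializing whole
entries. A hypothetical fragment like the one below (not part of this patch;
the method name is made up, and it relies on the class's isFree() and
getDigestedKeyFromOffset()) shows the kind of cheap same-key check that
resolveOldEntry builds on top of it:

    // Hypothetical helper, not in this patch: does the slot at `offset`
    // already hold an entry with the given digested routing key?
    private boolean slotHoldsKey(long offset, byte[] digestedRoutingKey) throws IOException {
        return !isFree(offset)
                && Arrays.equals(getDigestedKeyFromOffset(offset), digestedRoutingKey);
    }
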
@@ -800,18 +818,32 @@
setDaemon(true);
}
- @Override
- public void run() {
+ @Override
+ public void run() {
while (!shutdown) {
synchronized (cleanerLock) {
+ boolean resizeFinished = false;
+
configLock.readLock().lock();
try {
if (prevStoreSize != 0)
- resizeStore();
+							resizeFinished = resizeStore();
} finally {
configLock.readLock().unlock();
}
+ if (resizeFinished) {
+ configLock.writeLock().lock();
+ try {
+ prevStoreSize = 0;
+ writeConfigFile();
+ } catch (IOException ioe) {
+							Logger.error(this, "can't write store config file", ioe);
+ } finally {
+							configLock.writeLock().unlock();
+ }
+ }
+
cleanerLock.notifyAll();
try {
cleanerLock.wait(CLEANER_PERIOD);
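
The deadlock fix hinges on lock ordering in the cleaner: resizeStore() now runs
under the config read lock only and merely reports whether it finished, and the
prevStoreSize = 0 / writeConfigFile() finalisation is taken as a separate,
short step under the write lock after the read lock is released. A minimal
sketch of that read-then-write hand-off (class and field names are stand-ins,
not the real store code):

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    /** Sketch of the read-then-write lock hand-off used by the cleaner; illustrative only. */
    class ResizeLockPattern {
        private final ReadWriteLock configLock = new ReentrantReadWriteLock();
        private volatile long prevStoreSize = 1024;

        void cleanerPass() {
            boolean finished = false;
            configLock.readLock().lock(); // long-running work under the shared lock
            try {
                if (prevStoreSize != 0)
                    finished = doResize();
            } finally {
                configLock.readLock().unlock(); // released before taking the write lock
            }
            if (finished) {
                configLock.writeLock().lock(); // short, exclusive finalisation step
                try {
                    prevStoreSize = 0;
                } finally {
                    configLock.writeLock().unlock();
                }
            }
        }

        private boolean doResize() { return true; } // placeholder for the batched resize
    }
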
@@ -822,329 +854,278 @@
}
}
- /**
- * Phase 1 Rounds
- */
- private static final int RESIZE_PHASE1_ROUND = 8;
- /**
- * Maximum resize round
- */
- private static final int RESIZE_MAX_ROUND = 16;
+		private static final int RESIZE_MEMORY_ENTRIES = 256; // temporary memory store size (in # of entries)
+		private static final int RESIZE_DISK_ENTRIES = 8192; // temporary disk store size (in # of entries)
/**
- * Are we shrinking the store?
- */
- private boolean shrinking;
- private long newEntries;
- private long oldEntries;
- private long freeEntries;
- private long resolvedEntries;
- private long droppedEntries;
- private long maxOldItemOffset;
-
- /**
- * Numbers of round resize have ran
- */
- private int resizeRound;
-
- /**
* Move old entries to new location and resize store
*/
- private void resizeStore() {
- ++resizeRound;
- Logger.normal(this, "Starting datastore resize (round "
+ resizeRound + ")");
+ private boolean resizeStore() {
+ Logger.normal(this, "Starting datastore resize");
+ long startTime = System.currentTimeMillis();
- if (resizeRound == 1) { // first round
- if (storeSize < prevStoreSize) {
- shrinking = true;
- } else {
- setStoreFileSize(storeSize);
- }
- maxOldItemOffset = prevStoreSize - 1;
- }
+ if (storeSize > prevStoreSize)
+ setStoreFileSize(storeSize);
- boolean needQueue = false;
-			if (resizeRound > RESIZE_PHASE1_ROUND) // too many rounds
-				needQueue = true;
-			if (resizeRound > 1 && droppedEntries == 0 && resolvedEntries == 0) // no progress
- needQueue = true;
- moveOldEntry0(needQueue);
+ initOldEntriesFile();
- if (logMINOR)
- Logger.minor(this, "Finished resize round " +
resizeRound + ": newEntries=" + newEntries
- + ", oldEntries=" + oldEntries + ",
freeEntries=" + freeEntries + ", resolvedEntries="
- + resolvedEntries + ", droppedEntries="
+ droppedEntries);
+ List<Entry> oldEntryList = new LinkedList<Entry>();
- if (shutdown)
- return;
+			// start from end of store, make store shrinking quicker
+			long startOffset = (prevStoreSize / RESIZE_MEMORY_ENTRIES) * RESIZE_MEMORY_ENTRIES;
+			for (long curOffset = startOffset; curOffset >= 0; curOffset -= RESIZE_MEMORY_ENTRIES) {
+ if (shutdown)
+ return false;
- // Shrink store file size
- if (shrinking)
-				setStoreFileSize(Math.max(storeSize, maxOldItemOffset + 1));
+				batchReadEntries(curOffset, RESIZE_MEMORY_ENTRIES, oldEntryList);
- // Check finished
-			if (resizeRound >= RESIZE_MAX_ROUND || oldEntries == 0 || resolvedEntries + droppedEntries >= oldEntries) {
-				// Finished
-				Logger.normal(this, "Datastore resize finished (total " + resizeRound + "rounds)");
+				if (storeSize < prevStoreSize)
+					setStoreFileSize(Math.max(storeSize, curOffset));
- prevStoreSize = 0;
- resizeRound = 0;
+ // try to resolve the list
+				ListIterator<Entry> it = oldEntryList.listIterator();
+ while (it.hasNext()) {
+ if (resolveOldEntry(it.next()))
+ it.remove();
+ }
+
+ // write unresolved entry to file
+ it = oldEntryList.listIterator();
+ while (it.hasNext())
+ rrWriteOldEntry(it.next());
+
+ long processed = prevStoreSize - curOffset;
+ Logger.normal(this, "Store resize " + callback
+ ": " + processed + "/" + prevStoreSize);
}
+
+ resolveOldEntriesFile();
+
+ long endTime = System.currentTimeMillis();
+ Logger.normal(this, "Finish resizing " + callback + "
in " + (endTime - startTime) / 1000 + "s");
+
+ return true;
}
/**
- * Scan all entries and try to move them
+		 * Read a list of old items from the store; the original offsets will be marked as free. To save
+		 * some system calls, items are locked and read in batches.
+		 *
+		 * @param offset
+		 *            start offset, must be a multiple of {@link FILE_SPLIT}
+		 * @param length
+		 *            number of items to read, must be a multiple of {@link FILE_SPLIT}. If this
+		 *            exceeds the store size, read as much as possible.
+		 * @param items
+		 *            a list of items
+		 * @return <code>true</code> if the operation completed successfully; <code>false</code>
+		 *         otherwise (e.g. can't acquire locks, node shutting down)
*/
- private void moveOldEntry0(boolean queueItem) {
- newEntries = 0;
- oldEntries = 0;
- freeEntries = 0;
- resolvedEntries = 0;
- droppedEntries = 0;
+		private boolean batchReadEntries(long offset, int length, List<Entry> items) {
+ assert offset % FILE_SPLIT == 0;
+ assert length % FILE_SPLIT == 0;
- File oldItemFile = null;
- RandomAccessFile oldItemsRAF = null;
- FileChannel oldItemsFC = null;
- if (queueItem) {
- try {
-					oldItemFile = new File(baseDir, name + ".oldItems");
-					oldItemsRAF = new RandomAccessFile(oldItemFile, "rw");
-					oldItemsRAF.seek((oldItemsRAF.length() / entryTotalLength) * entryTotalLength);
- oldItemsFC = oldItemsRAF.getChannel();
- } catch (IOException e) {
+ boolean[] locked = new boolean[length];
+ try {
+				// acquire all locks in the region, will unlock in the finally block
+ for (int i = 0; i < length; i++) {
+ if (lockEntry(offset + i))
+ locked[i] = true;
+ else
+ return false;
}
- }
- long maxOffset = maxOldItemOffset;
- maxOldItemOffset = 0;
-			LOOP_ENTRIES: for (long offset = 0; offset <= maxOffset; offset++) {
-				if (logDEBUG && offset % 1024 == 0) {
-					Logger.debug(this, "Resize progress: newEntries=" + newEntries + ", oldEntries=" + oldEntries
-							+ ", freeEntries=" + freeEntries + ", resolvedEntries=" + resolvedEntries
-							+ ", droppedEntries=" + droppedEntries);
-				}
+				long startFileOffset = (offset / FILE_SPLIT) * entryTotalLength;
+ long entriesToRead = length / FILE_SPLIT;
+ long bufLen = entryTotalLength * entriesToRead;
- if (shutdown)
- return;
-
- if (!lockEntry(offset)) //lock
- continue LOOP_ENTRIES;
- try {
- if (isFree(offset)) {
- // free block
- freeEntries++;
- continue LOOP_ENTRIES;
- }
-
- if (getStoreSize(offset) == storeSize) {
- // new store size entries
- maxOldItemOffset = offset;
- newEntries++;
- continue LOOP_ENTRIES;
- }
-
-				// if (entry.getStoreSize() == prevStoreSize)
-				// old store size entries, try to move them
-				maxOldItemOffset = offset;
-				oldEntries++;
-
-				Entry entry = readEntry(offset, null);
-				entry.setStoreSize(storeSize);
-				long[] newOffset = entry.getOffset();
-
-				// Check if I can keep my current offset
-				for (int i = 0; i < newOffset.length; i++) {
-					if (newOffset[i] == offset) { // lucky!
-						writeEntry(entry, offset); // write back entry storeSize
-						resolvedEntries++;
-
-						if (logDEBUG)
-							Logger.debug(this, "old entry " + HexUtil.bytesToHex(entry.getDigestedRoutingKey())
-									+ " resolved without moving");
-
-						continue LOOP_ENTRIES;
+				ByteBuffer buf = ByteBuffer.allocate((int) bufLen);
+				for (int i = 0; i < FILE_SPLIT; i++) { // for each split file
+					buf.clear();
+					try {
+						while (buf.hasRemaining()) {
+							int status = storeFC[i].read(buf, startFileOffset + buf.position());
+							if (status == -1)
+								break;
						}
+					} catch (IOException ioe) {
+						if (shutdown)
+							return false;
+						Logger.error(this, "unexpected IOException", ioe);
					}
+					buf.flip();
-					boolean[] locked = new boolean[newOffset.length];
-					try {
-						// Lock all possible slots first
-						for (int i = 0; i < newOffset.length; i++) {
-							if (lockEntry(newOffset[i])) { // lock
-								locked[i] = true;
-							} else if (shutdown) { // oops
-								return;
-							}
-						}
+					for (int j = 0; buf.remaining() >= entryTotalLength; j++) {
+						if (shutdown)
+							return false;
-						// Probe for a free slot
-						for (int i = 0; i < newOffset.length; i++) {
-							// see what's in the new offset
-							Entry newOffsetEntry = readEntry(newOffset[i], null);
+						ByteBuffer enBuf = buf.slice();
+						buf.position(buf.position() + (int) entryTotalLength);
-							// Free slot
-							if (newOffsetEntry.isFree()) {
-								// the new offset is freeeeeeee..
-								writeEntry(entry, newOffset[i]);
-								freeOffset(offset);
-								resolvedEntries++;
+						enBuf.limit((int) entryTotalLength);
-								if (logDEBUG)
-									Logger.debug(this, "old entry " + HexUtil.bytesToHex(entry.getDigestedRoutingKey())
-											+ " resolved by moving to free block");
+						Entry entry = new Entry(enBuf);
+						entry.curOffset = offset + j * FILE_SPLIT + i;
-								continue LOOP_ENTRIES;
-							}
+						if (!entry.isFree() && entry.storeSize != storeSize) {
+							// old entry
+							items.add(entry);
-							// Same digested key: same routing key or SHA-256 collision
-							byte[] digestedRoutingKey = entry.getDigestedRoutingKey();
-							byte[] digestedRoutingKey2 = newOffsetEntry.getDigestedRoutingKey();
-							if (Arrays.equals(digestedRoutingKey, digestedRoutingKey2)) {
-								// assume same routing key, drop this as duplicate
-								freeOffset(offset);
+							try {
+								freeOffset(entry.curOffset);
								keyCount.decrementAndGet();
-								droppedEntries++;
-
-								if (logDEBUG)
-									Logger.debug(this, "old entry " + HexUtil.bytesToHex(entry.getDigestedRoutingKey())
-											+ " dropped duplicate");
-
-								continue LOOP_ENTRIES;
+							} catch (IOException ioe) {
+								Logger.error(this, "error freeing entry " + entry.curOffset + ", node shutting down?");
							}
						}
-						if (queueItem && oldItemsFC.position() < 0x10000000) { // Limit to 256MiB
-							if (logDEBUG)
-								Logger.debug(this, "old entry " + HexUtil.bytesToHex(entry.getDigestedRoutingKey())
-										+ " queued");
-							writeOldItem(oldItemsFC, entry);
-							freeOffset(offset);
-							keyCount.decrementAndGet();
-						}
-					} finally {
-						// unlock all entries
-						for (int i = 0; i < newOffset.length; i++) {
-							if (locked[i]) {
-								unlockEntry(newOffset[i]);
-							}
-						}
+						// unlock current entry
+						unlockEntry(entry.curOffset);
+						locked[(int) (entry.curOffset - offset)] = false;
					}
-				} catch (IOException e) {
-					Logger.debug(this, "IOExcception on moveOldEntries0", e);
-				} finally {
-					unlockEntry(offset);
-				}
- }
- if (queueItem) {
- try {
- oldItemsRAF.seek(0);
- putBackOldItems(oldItemsFC);
- } catch (IOException e) {
- } finally {
- try {
- oldItemsRAF.close();
- oldItemFile.delete();
- } catch (IOException e2) {
- }
- }
+ return true;
+ } finally {
+ // unlock
+ for (int i = 0; i < length; i++)
+ if (locked[i])
+ unlockEntry(offset + i);
}
}
/**
- * Put back oldItems with best effort
- *
- * @throws IOException
+		 * Put an old entry back into the store file.
+		 *
+		 * @param entry
+		 * @return <code>true</code> if the entry has been put back successfully.
*/
-		private void putBackOldItems(FileChannel oldItems) throws IOException {
- LOOP_ITEMS: while (true) {
- Entry entry = readOldItem(oldItems);
- if (entry == null)
- break;
-
+ private boolean resolveOldEntry(Entry entry) {
+			if (!lockDigestedKey(entry.getDigestedRoutingKey(), false))
+ return false;
+ try {
entry.setStoreSize(storeSize);
+ long[] offsets = entry.getOffset();
- long[] newOffset = entry.getOffset();
+ // Check for occupied entry with same key
+ for (long offset : offsets) {
+ try {
+					if (!isFree(offset)
+							&& Arrays.equals(getDigestedKeyFromOffset(offset), entry.getDigestedRoutingKey())) {
+						writeEntry(entry, offset); // overwrite, don't update key count
+						return true;
+					}
+				} catch (IOException e) {
+					Logger.debug(this, "IOException on resolveOldEntry", e);
+ }
+ }
- for (int i = 0; i < newOffset.length; i++) {
- if (!lockEntry(newOffset[i])) // lock
- continue;
+ // Check for free entry
+ for (long offset : offsets) {
try {
- if (isFree(newOffset[i])) {
- if (logDEBUG)
-						Logger
-								.debug(this, "Put back old item: "
-										+ HexUtil.bytesToHex(entry.digestedRoutingKey));
-					writeEntry(entry, newOffset[i]);
+ if (isFree(offset)) {
+						writeEntry(entry, offset);
keyCount.incrementAndGet();
- resolvedEntries++;
- continue LOOP_ITEMS;
+ return true;
}
} catch (IOException e) {
-					Logger.debug(this, "IOExcception on putBackOldItems", e);
-				} finally {
-					unlockEntry(newOffset[i]);
+					Logger.debug(this, "IOException on resolveOldEntry", e);
}
}
+ return false;
+ } finally {
+				unlockDigestedKey(entry.getDigestedRoutingKey(), false);
+ }
+ }
- if (logDEBUG)
- Logger.debug(this, "Drop old item: " +
HexUtil.bytesToHex(entry.digestedRoutingKey));
+ private File oldEntriesFile; // round-ribbon
+ private RandomAccessFile oldEntriesRAF;
+ private long oldEntriesFileOffset;
- droppedEntries++;
+ private void initOldEntriesFile() {
+ try {
+				oldEntriesFile = new File(baseDir, name + ".oldEntries");
+				oldEntriesRAF = new RandomAccessFile(oldEntriesFile, "rw");
+				oldEntriesRAF.setLength(RESIZE_DISK_ENTRIES * entryTotalLength);
+				oldEntriesFileOffset = 0;
+			} catch (IOException ioe) {
+				Logger.error(this, "Cannot create oldEntries file for resize, will use memory only", ioe);
}
}
-		private void writeOldItem(FileChannel fc, Entry e) throws IOException {
- ByteBuffer bf = e.toByteBuffer();
- do {
- fc.write(bf);
- } while (bf.hasRemaining());
+ private void resolveOldEntriesFile() {
+ if (oldEntriesRAF == null)
+ return;
+
+			for (int offset = 0; offset < RESIZE_DISK_ENTRIES; offset++) {
+				Entry oldEntry = readOldEntry(offset);
+				if (oldEntry != null && !oldEntry.isFree()) // the current position already in use
+ resolveOldEntry(oldEntry);
+ }
+ try {
+ oldEntriesRAF.close();
+ } catch (IOException ioe) {
+ // ignore
+ }
+ oldEntriesFile.delete();
}
- private Entry readOldItem(FileChannel fc) throws IOException {
-			ByteBuffer bf = ByteBuffer.allocate((int) entryTotalLength);
- do {
- int status = fc.read(bf);
- if (status == -1)
- return null;
- } while (bf.hasRemaining());
- bf.flip();
- return new Entry(bf);
+ private void rrWriteOldEntry(Entry entry) {
+ if (oldEntriesRAF == null)
+ return;
+
+			long offset = oldEntriesFileOffset++ % RESIZE_DISK_ENTRIES;
+			Entry rrOldEntry = readOldEntry(offset);
+			if (rrOldEntry != null && !rrOldEntry.isFree()) // the current position already in use
+ resolveOldEntry(rrOldEntry);
+
+ byte[] buf = new byte[(int) entryTotalLength];
+ entry.toByteBuffer().get(buf);
+ try {
+ oldEntriesRAF.seek(offset * entryTotalLength);
+ oldEntriesRAF.write(buf);
+ } catch (IOException e) {
+ Logger.debug(this, "IOException on
rrWriteOldEntry", e);
+ }
}
+
+ private Entry readOldEntry(long offset) {
+ if (oldEntriesRAF == null)
+ return null;
+
+ try {
+ byte[] buf = new byte[(int) entryTotalLength];
+ oldEntriesRAF.seek(offset * entryTotalLength);
+ oldEntriesRAF.readFully(buf);
+
+ return new Entry(ByteBuffer.wrap(buf));
+ } catch (IOException e) {
+ Logger.debug(this, "IOException on
readOldEntry", e);
+ return null;
+ }
+ }
}
public void setMaxKeys(long newStoreSize, boolean shrinkNow) throws IOException {
	Logger.normal(this, "[" + name + "] Resize newStoreSize=" + newStoreSize + ", shinkNow=" + shrinkNow);
- assert newStoreSize > 0;
-	// TODO assert newStoreSize > (141 * (3 * 3) + 13 * 3) * 2; // store size too small
+ synchronized (cleanerLock) {
+ configLock.writeLock().lock();
+ try {
+ if (newStoreSize == this.storeSize)
+ return;
- configLock.writeLock().lock();
- try {
- if (newStoreSize == this.storeSize)
- return;
-
- if (prevStoreSize != 0) {
- if (shrinkNow) {
- // TODO shrink now
- } else {
+ if (prevStoreSize != 0) {
Logger.normal(this, "[" + name + "]
resize already in progress, ignore resize request");
return;
}
- }
- prevStoreSize = storeSize;
- storeSize = newStoreSize;
- writeConfigFile();
- synchronized (cleanerLock) {
- cleanerLock.notifyAll();
+ prevStoreSize = storeSize;
+ storeSize = newStoreSize;
+ writeConfigFile();
+ } finally {
+ configLock.writeLock().unlock();
}
-
- if (shrinkNow) {
- // TODO shrink now
- }
- } finally {
- configLock.writeLock().unlock();
+		// don't notify for now, or we will be held here for a long time
}
}
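
With this change a resize request only records the new size under the config
write lock; the cleaner thread notices it on its next wake-up (at most
CLEANER_PERIOD later) instead of being kicked immediately, which is what the
"don't notify for now" comment is about. A rough illustration of that
request/worker hand-off (class and field names are stand-ins, not the store's
API, and the wait period is a made-up value):

    /** Sketch of the request/worker hand-off behind setMaxKeys(); illustrative names only. */
    class ResizeRequestSketch {
        private final Object cleanerLock = new Object();
        private long storeSize = 1024, prevStoreSize = 0;

        /** Caller side: just record the request, do not wake the worker. */
        void setMaxKeys(long newStoreSize) {
            synchronized (cleanerLock) {
                if (newStoreSize == storeSize || prevStoreSize != 0)
                    return; // no-op, or a resize is already in progress
                prevStoreSize = storeSize;
                storeSize = newStoreSize;
            }
        }

        /** Worker side: polls every period and performs the resize when one is pending. */
        void cleanerLoop() throws InterruptedException {
            while (true) {
                synchronized (cleanerLock) {
                    if (prevStoreSize != 0) {
                        // ... batched resize would run here ...
                        prevStoreSize = 0;
                    }
                    cleanerLock.wait(5000); // stand-in for CLEANER_PERIOD
                }
            }
        }
    }
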
@@ -1156,7 +1137,7 @@
// ------------- Locking
private boolean shutdown = false;
- private ReadWriteLock configLock = new ReentrantReadWriteLock();
+ private ReadWriteLock configLock = new ReentrantReadWriteLock();
private Lock entryLock = new ReentrantLock();
private Map<Long, Condition> lockMap = new HashMap<Long, Condition> ();
@@ -1443,4 +1424,4 @@
setBloomSync(true);
}
-}
+}
\ No newline at end of file