Author: j16sdiz
Date: 2008-05-09 02:09:10 +0000 (Fri, 09 May 2008)
New Revision: 19857
Modified:
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
Log:
Initial datastore resize code
Modified:
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
===================================================================
---
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
2008-05-09 02:08:50 UTC (rev 19856)
+++
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
2008-05-09 02:09:10 UTC (rev 19857)
@@ -28,6 +28,9 @@
import freenet.support.io.FileUtil;
import freenet.support.math.RunningAverage;
import freenet.support.math.SimpleRunningAverage;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
/**
* Index-less data store based on salted hash
@@ -36,7 +39,7 @@
*/
public class SaltedHashFreenetStore implements FreenetStore {
private static final boolean OPTION_SAVE_PLAINKEY = true;
-
+
private static boolean logMINOR;
private static boolean logDEBUG;
@@ -116,8 +119,12 @@
}
private Entry probeEntry(byte[] routingKey) throws IOException {
- // TODO probe store resize
- return probeEntry0(routingKey, storeSize);
+ Entry entry = probeEntry0(routingKey, storeSize);
+
+ if (entry == null && prevStoreSize != 0)
+ entry = probeEntry0(routingKey, prevStoreSize);
+
+ return entry;
}
private Entry probeEntry0(byte[] routingKey, long probeStoreSize)
throws IOException {
@@ -132,7 +139,6 @@
entry = readEntry(offset, routingKey);
} catch (EOFException e) {
// may occur on resize, silent it a bit
- //TODO store resize
Logger.error(this, "EOFException on probeEntry", e);
return null;
} finally {
@@ -252,7 +258,7 @@
System.arraycopy(header, 0, this.header, 0,
headerBlockLength);
this.data = new byte[dataBlockLength];
System.arraycopy(data, 0, this.data, 0,
dataBlockLength);
-
+
if (OPTION_SAVE_PLAINKEY) {
flag |= ENTRY_FLAG_PLAINKEY;
}
@@ -260,6 +266,21 @@
isEncrypted = false;
}
+ /**
+ * @return the storeSize
+ */
+ protected long getStoreSize() {
+ return storeSize;
+ }
+
+ /**
+ * @param storeSize
+ * the storeSize to set
+ */
+ protected void setStoreSize(long storeSize) {
+ this.storeSize = storeSize;
+ }
+
public Entry(ByteBuffer in) {
assert in.remaining() == entryTotalLength;
@@ -276,7 +297,7 @@
plainRoutingKey = new byte[0x20];
in.get(plainRoutingKey);
}
-
+
// reserved bytes
in.position((int) ENTRY_HEADER_LENGTH);
@@ -300,10 +321,10 @@
out.putLong(flag);
out.putLong(storeSize);
- if (OPTION_SAVE_PLAINKEY) {
+ if (OPTION_SAVE_PLAINKEY && plainRoutingKey != null) {
out.put(plainRoutingKey);
}
-
+
// reserved bytes
out.position((int) ENTRY_HEADER_LENGTH);
@@ -356,7 +377,7 @@
else
return false;
}
-
+
if (plainRoutingKey != null) {
// we knew the key
if (!Arrays.equals(this.plainRoutingKey,
routingKey)) {
@@ -416,6 +437,10 @@
throw new RuntimeException(e);
}
}
+
+ public boolean isFree() {
+ return (flag & ENTRY_FLAG_OCCUPIED) == 0;
+ }
}
/**
@@ -435,13 +460,13 @@
storeFiles[i] = new File(baseDir, name + ".data-" +
fmt.format(i));
storeRAF[i] = new RandomAccessFile(storeFiles[i], "rw");
- //TODO store resize
- if (storeRAF[i].length() == 0) { // New file?
- storeRAF[i].setLength(entryTotalLength *
(storeSize / FILE_SPLIT + 1));
- }
+
storeFC[i] = storeRAF[i].getChannel();
storeFC[i].lock();
}
+
+ long storeFileSize = Math.max(storeSize, prevStoreSize);
+ setStoreFileSize(storeFileSize);
}
/**
@@ -637,6 +662,11 @@
private Cleaner cleanerThread;
private class Cleaner extends Thread {
+ /**
+ * How often the clean should run
+ */
+ private static final int CLEANER_PERIOD = 10 * 60 * 1000; // 10
minutes
+
public Cleaner() {
setName("Store-" + name + "-Cleaner");
setPriority(MIN_PRIORITY);
@@ -645,14 +675,15 @@
public void run() {
while (!shutdown) {
- if (prevStoreSize != 0)
- moveOldEntries();
- else
- estimateStoreSize();
+ synchronized (cleanerLock) {
+ if (prevStoreSize != 0)
+ resizeStore();
+ else
+ estimateStoreSize();
- synchronized (cleanerLock) {
+ cleanerLock.notifyAll();
try {
- cleanerLock.wait(10 * 60 *
1000); // 10 minutes
+
cleanerLock.wait(CLEANER_PERIOD);
} catch (InterruptedException e) {
Logger.debug(this,
"interrupted", e);
}
@@ -661,19 +692,227 @@
}
/**
- * Move old entries to new location
+ * Maximum memory to be used on resize
*/
- private void moveOldEntries() {
- Logger.minor(this, "move old entries");
- prevStoreSize = 0;
+ private static final int RESIZE_MEMORY = 2 * 1024 * 1024; //
2MiB
+ /**
+ * Phase 1 Rounds
+ */
+ private static final int RESIZE_PHASE1_ROUND = 12;
+ /**
+ * Maximum resize round
+ */
+ private static final int RESIZE_MAX_ROUND = 16;
+
+ /**
+ * Are we shrinking the store?
+ */
+ private boolean shrinking;
+ private long newEntries;
+ private long oldEntries;
+ private long freeEntries;
+ private long resolvedEntries;
+ private long droppedEntries;
+ private long maxOldItemOffset;
+
+ /**
+ * Numbers of round resize have ran
+ */
+ private int resizeRound;
+
+ /**
+ * Move old entries to new location and resize store
+ */
+ private void resizeStore() {
+ ++resizeRound;
+ Logger.normal(this, "Starting datastore resize (round "
+ resizeRound + ")");
+
+ if (resizeRound == 1) { // first round
+ if (storeSize < prevStoreSize) {
+ shrinking = true;
+ } else {
+ setStoreFileSize(storeSize);
+ }
+ maxOldItemOffset = prevStoreSize - 1;
+ }
+
+ moveOldEntry0(resizeRound > RESIZE_PHASE1_ROUND);
+
+ if (logMINOR)
+ Logger.minor(this, "Finished resize round " +
resizeRound + ": newEntries=" + newEntries
+ + ", oldEntries=" + oldEntries + ",
freeEntries=" + freeEntries + ", resolvedEntries="
+ + resolvedEntries + ", droppedEntries="
+ droppedEntries);
+
+ if (shutdown)
+ return;
+
+ // report key count
+ estimatedCount.report(newEntries + oldEntries -
droppedEntries);
+
+ // Shrink store file size
+ if (shrinking)
+ setStoreFileSize(Math.max(storeSize,
maxOldItemOffset));
+
+ // Check finished
+ if (resizeRound >= RESIZE_MAX_ROUND || oldEntries == 0
|| resolvedEntries + droppedEntries >= oldEntries) {
+ // Finished
+ Logger.normal(this, "Datastore resize finished
(total " + resizeRound + "rounds)");
+
+ prevStoreSize = 0;
+ resizeRound = 0;
+ }
}
/**
- * Sample to take at a time
+ * Scan all entries and try to move them
*/
- private static final double SAMPLE_RATE = 0.05; // 5%
+ private void moveOldEntry0(boolean queueItem) {
+ newEntries = 0;
+ oldEntries = 0;
+ freeEntries = 0;
+ resolvedEntries = 0;
+ droppedEntries = 0;
+ List oldItems = null;
+ if (queueItem) {
+ oldItems = new ArrayList();
+ }
+
+ long maxOffset = maxOldItemOffset;
+ maxOldItemOffset = 0;
+ for (long offset = 0; offset <= maxOffset; offset++) {
+ if (logDEBUG && offset % 1024 == 0) {
+ Logger.debug(this, "Resize progress:
newEntries=" + newEntries + ", oldEntries=" + oldEntries
+ + ", freeEntries=" +
freeEntries + ", resolvedEntries=" + resolvedEntries
+ + ", droppedEntries=" +
droppedEntries);
+ }
+
+ if (shutdown)
+ return;
+
+ if (!lockEntry(offset)) //lock
+ continue;
+ try {
+ Entry entry = readEntry(offset, null);
+
+ if (entry.isFree()) {
+ // free block
+ freeEntries++;
+ } else if (entry.getStoreSize() ==
storeSize) {
+ // new store size entries
+ maxOldItemOffset = offset;
+ newEntries++;
+ } else { // if (entry.getStoreSize() ==
prevStoreSize)
+ // old store size entries, try
to move them
+ oldEntries++;
+ maxOldItemOffset = offset;
+
+ entry.setStoreSize(storeSize);
+ long newOffset =
entry.getOffset();
+
+ if (newOffset == offset) { //
lucky!
+ writeEntry(entry); //
write back entry storeSize
+ resolvedEntries++;
+ continue;
+ }
+
+ if (!lockEntry(newOffset)) //
lock
+ continue;
+ try {
+ // see what's in the
new offset
+ Entry newOffsetEntry =
readEntry(newOffset, null);
+
+ if
(newOffsetEntry.isFree()) {
+ // the new
offset is freeeeeeee..
+
writeEntry(entry);
+
freeOffset(offset);
+
resolvedEntries++;
+ } else if
(newOffsetEntry.getStoreSize() == storeSize) {
+ // new offset
already have a new entry, free old entry
+
freeOffset(offset);
+
droppedEntries++;
+ } else if
(Arrays.equals(entry.digestedRoutingKey, newOffsetEntry.digestedRoutingKey)) {
+ // same
digested routing key, free the old entry
+
freeOffset(offset);
+
resolvedEntries++;
+ } else if (queueItem) {
+ // break tie by
moveing old item to queue
+ if
(oldItems.size() * entryTotalLength < RESIZE_MEMORY) {
+
oldItems.add(newOffsetEntry);
+ if
(newOffset > offset) {
+
oldEntries++; // newOffset wasn't counted count it
+ }
+
+
writeEntry(entry);
+
freeOffset(offset);
+
resolvedEntries++;
+ }
+ }
+ } finally {
+ unlockEntry(newOffset);
+ }
+ }
+ } catch (IOException e) {
+ Logger.debug(this, "IOExcception on
moveOldEntries0", e);
+ } finally {
+ unlockEntry(offset);
+ }
+ }
+
+ if (queueItem) {
+ putBackOldItems(oldItems);
+ }
+ }
+
/**
+ * Put back oldItems with best effort
+ */
+ private void putBackOldItems(List oldItems) {
+ Iterator it = oldItems.iterator();
+ while (it.hasNext()) {
+ boolean done = false;
+
+ Entry entry = (Entry) it.next();
+ entry.setStoreSize(storeSize);
+
+ long newOffset = entry.getOffset();
+
+ if (!lockEntry(newOffset)) // lock
+ continue;
+ try {
+ if (isFree(newOffset)) {
+ if (logDEBUG)
+ Logger.debug(this, "Put
back old item: " + HexUtil.bytesToHex(entry.digestedRoutingKey));
+ writeEntry(entry);
+ done = true;
+ } else {
+ if (logDEBUG)
+ Logger.debug(this,
"Drop old item: " + HexUtil.bytesToHex(entry.digestedRoutingKey));
+ }
+ } catch (IOException e) {
+ Logger.debug(this, "IOExcception on
putBackOldItems", e);
+ } finally {
+ unlockEntry(newOffset);
+ it.remove();
+
+ if (done)
+ resolvedEntries++;
+ else
+ droppedEntries++;
+ }
+ }
+ oldItems.clear();
+ }
+
+ /**
+ * Samples to take on key count estimation
+ */
+ private static final double SAMPLE_RATE = 0.05; // 5%
+ /**
+ * Minimum samples to take on key count estimation
+ */
+ private static final int MIN_SAMPLE = 10000;
+ /**
* Last sample position
*/
private long samplePos = 0;
@@ -683,7 +922,7 @@
*/
private void estimateStoreSize() {
Logger.minor(this, "start estimating key count");
- long numSample = Math.min((long) (SAMPLE_RATE *
storeSize), 10000);
+ long numSample = Math.min((long) (SAMPLE_RATE *
storeSize), MIN_SAMPLE);
long sampled = 0;
long occupied = 0;
while (sampled < numSample) {
@@ -712,9 +951,32 @@
}
}
- public void setMaxKeys(long maxStoreKeys, boolean shrinkNow) throws
IOException {
- // TODO
- // NO-OP now
+ public void setMaxKeys(long newStoreSize, boolean shrinkNow) throws
IOException {
+ Logger.normal(this, "[" + name + "] Resize newStoreSize=" +
newStoreSize + ", shinkNow=" + shrinkNow);
+
+ assert newStoreSize > 0;
+
+ synchronized (cleanerLock) {
+ if (newStoreSize == this.storeSize)
+ return;
+
+ if (prevStoreSize != 0) {
+ if (shrinkNow) {
+ // TODO shrink now
+ } else {
+ Logger.normal(this, "[" + name + "]
resize already in progress, ignore resize request");
+ return;
+ }
+ }
+
+ prevStoreSize = storeSize;
+ storeSize = newStoreSize;
+ cleanerLock.notifyAll();
+
+ if (shrinkNow) {
+ // TODO shrink now
+ }
+ }
}
// ------------- Locking
@@ -941,4 +1203,19 @@
public long getMaxKeys() {
return storeSize;
}
+
+ /**
+ * Change on disk store file size
+ *
+ * @param storeFileSize
+ */
+ private void setStoreFileSize(long storeFileSize) {
+ for (int i = 0; i < FILE_SPLIT; i++) {
+ try {
+ storeRAF[i].setLength(entryTotalLength *
(storeFileSize / FILE_SPLIT + 1));
+ } catch (IOException e) {
+ Logger.error(this, "error resizing store file",
e);
+ }
+ }
+ }
}