Author: j16sdiz
Date: 2008-05-11 17:05:03 +0000 (Sun, 11 May 2008)
New Revision: 19897
Modified:
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
Log:
Fix datastore resize
Modified:
branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java
===================================================================
--- branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java	2008-05-11 17:04:42 UTC (rev 19896)
+++ branches/saltedhashstore/freenet/src/freenet/store/SaltedHashFreenetStore.java	2008-05-11 17:05:03 UTC (rev 19897)
@@ -132,7 +132,7 @@
for (int i = 0; i < offset.length; i++) {
if (logDEBUG)
Logger.debug(this, "probing for i=" + i + ",
offset=" + offset[i]);
-
+
if (!lockEntry(offset[i])) {
Logger.error(this, "can't lock entry: " +
offset[i]);
continue;
@@ -310,7 +310,7 @@
/**
* @param storeSize
- * the storeSize to set
+ * the storeSize to set
*/
protected void setStoreSize(long storeSize) {
this.storeSize = storeSize;
@@ -401,8 +401,7 @@
* Verify and decrypt this entry
*
* @param routingKey
- * @return <code>true</code> if the <code>routeKey</code> match
and the entry is
- * decrypted.
+ * @return <code>true</code> if the <code>routeKey</code> match
and the entry is decrypted.
*/
private boolean decrypt(byte[] routingKey) {
if (!isEncrypted) {
@@ -599,7 +598,7 @@
*
* @param offset
* @param entry
- * entry, maybe <code>null</code>
+ * entry, maybe <code>null</code>
* @throws IOException
*/
private boolean isFree(long offset, Entry entry) throws IOException {
@@ -742,7 +741,7 @@
while (!shutdown) {
synchronized (cleanerLock) {
if (prevStoreSize != 0)
- ; //resizeStore();
+ resizeStore();
else
estimateStoreSize();
@@ -781,11 +780,9 @@
*/
private int resizeRound;
- /*-
- *
- *
+ /**
* Move old entries to new location and resize store
- *
+ */
private void resizeStore() {
++resizeRound;
Logger.normal(this, "Starting datastore resize (round "
+ resizeRound + ")");
@@ -828,7 +825,7 @@
/**
* Scan all entries and try to move them
- *
+ */
private void moveOldEntry0(boolean queueItem) {
newEntries = 0;
oldEntries = 0;
@@ -851,7 +848,7 @@
long maxOffset = maxOldItemOffset;
maxOldItemOffset = 0;
- for (long offset = 0; offset <= maxOffset; offset++) {
+ LOOP_ENTRIES: for (long offset = 0; offset <=
maxOffset; offset++) {
if (logDEBUG && offset % 1024 == 0) {
Logger.debug(this, "Resize progress:
newEntries=" + newEntries + ", oldEntries=" + oldEntries
+ ", freeEntries=" +
freeEntries + ", resolvedEntries=" + resolvedEntries
@@ -862,68 +859,104 @@
return;
if (!lockEntry(offset)) //lock
- continue;
+ continue LOOP_ENTRIES;
try {
Entry entry = readEntry(offset, null);
if (entry.isFree()) {
// free block
freeEntries++;
- } else if (entry.getStoreSize() ==
storeSize) {
+ continue LOOP_ENTRIES;
+ }
+ if (entry.getStoreSize() == storeSize) {
// new store size entries
maxOldItemOffset = offset;
newEntries++;
- } else { // if (entry.getStoreSize() ==
prevStoreSize)
- // old store size entries, try
to move them
- oldEntries++;
- maxOldItemOffset = offset;
+ continue LOOP_ENTRIES;
+ }
- entry.setStoreSize(storeSize);
- long newOffset =
entry.getOffset();
+ // if (entry.getStoreSize() ==
prevStoreSize)
+ // old store size entries, try to move
them
+ maxOldItemOffset = offset;
+ oldEntries++;
- if (newOffset == offset) { //
lucky!
- lockAndWrite(entry); //
write back entry storeSize
+ entry.setStoreSize(storeSize);
+ long[] newOffset = entry.getOffset();
+
+ // Check if I can keep my current offset
+ for (int i = 0; i < newOffset.length;
i++) {
+ if (newOffset[i] == offset) {
// lucky!
+ writeEntry(entry,
offset); // write back entry storeSize
resolvedEntries++;
- continue;
+
+ if (logDEBUG)
+
Logger.debug(this, "old entry " +
HexUtil.bytesToHex(entry.getDigestedRoutingKey())
+ + "
resolved without moving");
+
+ continue LOOP_ENTRIES;
}
+ }
- if (!lockEntry(newOffset)) //
lock
- continue;
- try {
+ boolean[] locked = new
boolean[newOffset.length];
+ try {
+ // Lock all possible slots first
+ for (int i = 0; i <
newOffset.length; i++) {
+ if
(lockEntry(newOffset[i])) { // lock
+ locked[i] =
true;
+ } else if (shutdown) {
// oops
+ return;
+ }
+ }
+
+ // Probe for a free slot
+ for (int i = 0; i <
newOffset.length; i++) {
// see what's in the
new offset
- Entry newOffsetEntry =
readEntry(newOffset, null);
+ Entry newOffsetEntry =
readEntry(newOffset[i], null);
+ // Free slot
if
(newOffsetEntry.isFree()) {
// the new
offset is freeeeeeee..
-
lockAndWrite(entry);
+
writeEntry(entry, newOffset[i]);
freeOffset(offset);
resolvedEntries++;
- } else if
(newOffsetEntry.getStoreSize() == storeSize) {
- // new offset
already have a new entry, free old entry
+
+ if (logDEBUG)
+
Logger.debug(this, "old entry " +
HexUtil.bytesToHex(entry.getDigestedRoutingKey())
+
+ " resolved by moving to free block");
+
+ continue
LOOP_ENTRIES;
+ }
+
+ // Same digested key:
same routing key or SHA-256 collision
+ byte[]
digestedRoutingKey = entry.getDigestedRoutingKey();
+ byte[]
digestedRoutingKey2 = newOffsetEntry.getDigestedRoutingKey();
+ if
(Arrays.equals(digestedRoutingKey, digestedRoutingKey2)) {
+ // assume same
routing key, drop this as duplicate
freeOffset(offset);
droppedEntries++;
- } else if
(Arrays.equals(entry.digestedRoutingKey, newOffsetEntry.digestedRoutingKey)) {
- // same
digested routing key, free the old entry
-
freeOffset(offset);
-
resolvedEntries++;
- } else if (queueItem) {
- // break tie by
moveing old item to queue
+
if (logDEBUG)
-
Logger.debug(this, "Write item "
-
+ HexUtil.bytesToHex(newOffsetEntry.digestedRoutingKey)
-
+ " to old item file");
-
writeOldItem(oldItemsFC, newOffsetEntry);
- if (newOffset >
offset) {
-
oldEntries++; // newOffset wasn't counted count it
- }
+
Logger.debug(this, "old entry " +
HexUtil.bytesToHex(entry.getDigestedRoutingKey())
+
+ " dropped duplicate");
-
lockAndWrite(entry);
-
freeOffset(offset);
-
resolvedEntries++;
+ continue
LOOP_ENTRIES;
}
- } finally {
- unlockEntry(newOffset);
}
+
+ if (queueItem) {
+ if (logDEBUG)
+
Logger.debug(this, "old entry " +
HexUtil.bytesToHex(entry.getDigestedRoutingKey())
+ + "
queued");
+
writeOldItem(oldItemsFC, entry);
+ freeOffset(offset);
+ }
+ } finally {
+ // unlock all entries
+ for (int i = 0; i <
newOffset.length; i++) {
+ if (locked[i]) {
+
unlockEntry(newOffset[i]);
+ }
+ }
}
} catch (IOException e) {
Logger.debug(this, "IOExcception on
moveOldEntries0", e);
@@ -951,40 +984,41 @@
* Put back oldItems with best effort
*
* @throws IOException
- *
+ */
private void putBackOldItems(FileChannel oldItems) throws
IOException {
- while (true) {
+ LOOP_ITEMS: while (true) {
Entry entry = readOldItem(oldItems);
if (entry == null)
break;
entry.setStoreSize(storeSize);
- long newOffset = entry.getOffset();
+ long[] newOffset = entry.getOffset();
- if (!lockEntry(newOffset)) // lock
- continue;
- boolean done = false;
- try {
- if (isFree(newOffset)) {
- if (logDEBUG)
- Logger.debug(this, "Put
back old item: " + HexUtil.bytesToHex(entry.digestedRoutingKey));
- lockAndWrite(entry);
- done = true;
- } else {
- if (logDEBUG)
- Logger.debug(this,
"Drop old item: " + HexUtil.bytesToHex(entry.digestedRoutingKey));
+ for (int i = 0; i < newOffset.length; i++) {
+ if (!lockEntry(newOffset[i])) // lock
+ continue;
+ try {
+ if (isFree(newOffset[i],
entry)) {
+ if (logDEBUG)
+ Logger
+
.debug(this, "Put back old item: "
+
+ HexUtil.bytesToHex(entry.digestedRoutingKey));
+ writeEntry(entry,
newOffset[i]);
+ resolvedEntries++;
+ continue LOOP_ITEMS;
+ } else {
+ if (logDEBUG)
+
Logger.debug(this, "Drop old item: " +
HexUtil.bytesToHex(entry.digestedRoutingKey));
+ }
+ } catch (IOException e) {
+ Logger.debug(this,
"IOExcception on putBackOldItems", e);
+ } finally {
+ unlockEntry(newOffset[i]);
}
- } catch (IOException e) {
- Logger.debug(this, "IOExcception on
putBackOldItems", e);
- } finally {
- unlockEntry(newOffset);
+ }
- if (done)
- resolvedEntries++;
- else
- droppedEntries++;
- }
+ droppedEntries++;
}
}
@@ -996,14 +1030,17 @@
}
private Entry readOldItem(FileChannel fc) throws IOException {
- ByteBuffer bf = ByteBuffer.allocate((int)
entryTotalLength);
- do {
- fc.read(bf);
- } while (bf.hasRemaining());
- bf.flip();
- return new Entry(bf);
+ try {
+ ByteBuffer bf = ByteBuffer.allocate((int)
entryTotalLength);
+ do {
+ fc.read(bf);
+ } while (bf.hasRemaining());
+ bf.flip();
+ return new Entry(bf);
+ } catch (EOFException e) {
+ return null;
+ }
}
- */
/**
* Samples to take on key count estimation
@@ -1056,6 +1093,7 @@
Logger.normal(this, "[" + name + "] Resize newStoreSize=" +
newStoreSize + ", shinkNow=" + shrinkNow);
assert newStoreSize > 0;
+ // TODO assert newStoreSize > (141 * (3 * 3) + 13 * 3) * 2; //
store size too small
synchronized (cleanerLock) {
if (newStoreSize == this.storeSize)
@@ -1088,8 +1126,8 @@
/**
* Lock the entry
*
- * This lock is <strong>not</strong> reentrance. No threads except
Cleaner should hold more
- * then one lock at a time (or deadlock may occur).
+ * This lock is <strong>not</strong> reentrance. No threads except
Cleaner should hold more then
+ * one lock at a time (or deadlock may occur).
*/
private boolean lockEntry(long offset) {
if (logDEBUG && logLOCK)
@@ -1140,7 +1178,7 @@
* Use this method to stop all read / write before database shutdown.
*
* @param timeout
- * the maximum time to wait in milliseconds.
+ * the maximum time to wait in milliseconds.
*/
private boolean lockGlobal(long timeout) {
synchronized (lockMap) {
