Author: toad
Date: 2008-08-27 16:09:08 +0000 (Wed, 27 Aug 2008)
New Revision: 22194
Modified:
   branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
   branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
   branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherKeyListener.java
Log:
Fill the queue up, don't just add one item to it.
Activation.
Call countNegative() (stats related to false positives, here tracking negatives).
Comments.
Modified: branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java	2008-08-27 16:06:55 UTC (rev 22193)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java	2008-08-27 16:09:08 UTC (rev 22194)
@@ -601,9 +601,9 @@
 				SendableRequest request = schedCore.removeFirstInner(fuzz, random, offeredKeys, starter, schedTransient, false, true, Short.MAX_VALUE, Integer.MAX_VALUE, context, container);
 				if(request == null) return;
 				boolean full = addToStarterQueue(request, container);
+				container.deactivate(request, 1);
 				starter.wakeUp();
 				if(full) return;
-				return;
}
}
};
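For context on the hunk above: deleting the unconditional "return" means the enclosing loop (outside the context lines shown) keeps pulling requests until addToStarterQueue() reports the queue is full, and each request is deactivated once copied onto the in-RAM starter queue. A minimal self-contained sketch of that control flow; the names (Integer standing in for SendableRequest, MAX_QUEUE, fillRequestStarterQueue) are illustrative, not the actual Freenet API:

    import java.util.ArrayDeque;
    import java.util.Queue;

    class StarterQueueSketch {
        static final int MAX_QUEUE = 8;  // hypothetical capacity
        final Queue<Integer> starterQueue = new ArrayDeque<Integer>();
        final Queue<Integer> pending = new ArrayDeque<Integer>();

        /** @return true once the starter queue is full. */
        boolean addToStarterQueue(Integer req) {
            starterQueue.add(req);
            return starterQueue.size() >= MAX_QUEUE;
        }

        void fillRequestStarterQueue() {
            while(true) {
                Integer req = pending.poll();          // ~ schedCore.removeFirstInner(...)
                if(req == null) return;                // nothing left to schedule
                boolean full = addToStarterQueue(req);
                // ~ container.deactivate(request, 1); starter.wakeUp();
                if(full) return;                       // before this commit: returned after one item
            }
        }
    }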
@@ -691,8 +691,7 @@
}
/**
- * Remove a SendableGet from the list of getters we maintain for each key, indicating that we are no longer interested
- * in that key.
+ * Remove a KeyListener from the list of KeyListeners.
* @param getter
* @param complain
*/
@@ -704,8 +703,7 @@
}
/**
- * Remove a SendableGet from the list of getters we maintain for each key, indicating that we are no longer interested
- * in that key.
+ * Remove a KeyListener from the list of KeyListeners.
* @param getter
* @param complain
*/
@@ -768,8 +766,7 @@
 					schedCore.tripPendingKey(key, block, container, clientContext);
}
}, TRIP_PENDING_PRIORITY, false);
- }
-
+ } else schedCore.countNegative();
}
/** If we want the offered key, or if force is enabled, queue it */
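On countNegative(): per the log message, this feeds the Bloom-filter false-positive statistics; every block reaching tripPendingKey() either matches a pending key or is now counted as a definite negative, giving the denominator needed to estimate the observed false-positive rate. A sketch of the accounting this implies (illustrative only; the real counters live on schedCore):

    class BloomStatsSketch {
        private long hits;            // filter said yes and a request really wanted the key
        private long falsePositives;  // filter said yes but nothing wanted the key
        private long negatives;       // filter said no (what countNegative() records)

        synchronized void countHit()           { hits++; }
        synchronized void countFalsePositive() { falsePositives++; }
        synchronized void countNegative()      { negatives++; }

        /** Fraction of all checked blocks that were false positives. */
        synchronized double falsePositiveRate() {
            long total = hits + falsePositives + negatives;
            return total == 0 ? 0.0 : (double) falsePositives / total;
        }
    }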
@@ -867,8 +864,11 @@
if(logMINOR) Logger.minor(this, "Restoring key
but no keys queued?? for "+key);
}
if(reqs != null) {
- for(int i=0;i<reqs.length;i++)
+ for(int i=0;i<reqs.length;i++) {
+ container.activate(reqs[i], 1);
 				reqs[i].requeueAfterCooldown(key, now, container, clientContext);
+ container.deactivate(reqs[i], 1);
+ }
}
if(transientReqs != null) {
for(int i=0;i<transientReqs.length;i++)
@@ -905,7 +905,9 @@
jobRunner.queue(new DBJob() {
 			public void run(ObjectContainer container, ClientContext context) {
+ container.activate(get, 1);
 				get.onFailure(e, null, container, clientContext);
+ container.deactivate(get, 1);
}
}, prio, false);
@@ -919,7 +921,9 @@
jobRunner.queue(new DBJob() {
 			public void run(ObjectContainer container, ClientContext context) {
+ container.activate(insert, 1);
 				insert.onFailure(e, null, container, context);
+ container.deactivate(insert, 1);
}
}, prio, false);
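A note on the activate()/deactivate() pairs added throughout this file: db4o returns persistent objects activated only to a configured depth, so fields beyond that depth hold nulls/defaults until container.activate(obj, depth) is called, and deactivating afterwards keeps the live object graph small. A minimal sketch of the pattern; ObjectContainer.activate/deactivate are the real db4o calls, the Job interface here is illustrative:

    import com.db4o.ObjectContainer;

    class ActivationSketch {
        interface Job { void run(ObjectContainer container); }

        /** Wrap a job so the persistent target's fields are readable inside it. */
        static Job wrap(final Object persistentTarget, final Job inner) {
            return new Job() {
                public void run(ObjectContainer container) {
                    container.activate(persistentTarget, 1);       // make fields readable
                    try {
                        inner.run(container);
                    } finally {
                        container.deactivate(persistentTarget, 1); // release memory
                    }
                }
            };
        }
    }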
Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java	2008-08-27 16:06:55 UTC (rev 22193)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcher.java	2008-08-27 16:09:08 UTC (rev 22194)
@@ -120,6 +120,9 @@
this.fetchContext = newCtx;
this.archiveContext = actx;
this.decompressors = decompressors2;
+		if(decompressors.size() > 1) {
+			Logger.error(this, "Multiple decompressors: "+decompressors.size()+" - this is almost certainly a bug", new Exception("debug"));
+		}
this.clientMetadata = clientMetadata;
this.cb = rcb;
this.recursionLevel = recursionLevel + 1;
@@ -152,6 +155,8 @@
 		if(eventualLength > 0 && newCtx.maxOutputLength > 0 && eventualLength > newCtx.maxOutputLength)
 			throw new FetchException(FetchException.TOO_BIG, eventualLength, true, clientMetadata.getMIMEType());
+		this.token = token2;
+
 		if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
 			// Don't need to do much - just fetch everything and piece it together.
blocksPerSegment = -1;
@@ -193,6 +198,57 @@
", check blocks per segment:
"+checkBlocksPerSegment+", segments: "+segmentCount+
", data blocks:
"+splitfileDataBlocks.length+", check blocks: "+splitfileCheckBlocks.length);
segments = new SplitFileFetcherSegment[segmentCount]; //
initially null on all entries
+
+ // Setup bloom parameters.
+ if(persistent) {
+ // FIXME: Should this be encrypted? It's protected to
some degree by the salt...
+ // Since it isn't encrypted, it's likely to be very
sparse; we should name
+ // it appropriately...
+ try {
+ mainBloomFile =
context.persistentFG.makeRandomFile();
+ altBloomFile =
context.persistentFG.makeRandomFile();
+ } catch (IOException e) {
+ throw new
FetchException(FetchException.BUCKET_ERROR, "Unable to create Bloom filter
files", e);
+ }
+ } else {
+ // Not persistent, keep purely in RAM.
+ mainBloomFile = null;
+ altBloomFile = null;
+ }
+ int mainElementsPerKey = DEFAULT_MAIN_BLOOM_ELEMENTS_PER_KEY;
+ int origSize = splitfileDataBlocks.length +
splitfileCheckBlocks.length;
+ mainBloomK = (int) (mainElementsPerKey * 0.7);
+ long elementsLong = origSize * mainElementsPerKey;
+ // REDFLAG: SIZE LIMIT: 3.36TB limit!
+ if(elementsLong > Integer.MAX_VALUE)
+ throw new FetchException(FetchException.TOO_BIG,
"Cannot fetch splitfiles with more than
"+(Integer.MAX_VALUE/mainElementsPerKey)+" keys! (approx 3.3TB)");
+ int mainSizeBits = (int)elementsLong; // counting filter
+ if((mainSizeBits & 7) != 0)
+ mainSizeBits += (8 - (mainSizeBits & 7));
+ mainBloomFilterSizeBytes = mainSizeBits / 8 * 2; // counting
filter
+ double acceptableFalsePositives =
ACCEPTABLE_BLOOM_FALSE_POSITIVES_ALL_SEGMENTS / segments.length;
+ int perSegmentBitsPerKey = (int)
Math.ceil(Math.log(acceptableFalsePositives) / Math.log(0.6185));
+ int segBlocks = blocksPerSegment + checkBlocksPerSegment;
+ if(segBlocks > origSize)
+ segBlocks = origSize;
+ int perSegmentSize = perSegmentBitsPerKey * segBlocks;
+ if((perSegmentSize & 7) != 0)
+ perSegmentSize += (8 - (perSegmentSize & 7));
+ perSegmentBloomFilterSizeBytes = perSegmentSize / 8;
+ perSegmentK = BloomFilter.optimialK(perSegmentSize,
blocksPerSegment + checkBlocksPerSegment);
+ keyCount = origSize;
+ // Now create it.
+ if(Logger.shouldLog(Logger.MINOR, this))
+ Logger.minor(this, "Creating block filter for "+this+":
keys="+(splitfileDataBlocks.length+splitfileCheckBlocks.length)+" main bloom
size "+mainBloomFilterSizeBytes+" bytes, K="+mainBloomK+",
filename="+mainBloomFile+" alt bloom filter: filename="+altBloomFile+"
segments: "+segments.length+" each is "+perSegmentBloomFilterSizeBytes+" bytes
k="+perSegmentK);
+ try {
+ tempListener = new SplitFileFetcherKeyListener(this,
keyCount, mainBloomFile, altBloomFile, mainBloomFilterSizeBytes, mainBloomK,
!fetchContext.cacheLocalRequests, localSalt, segments.length,
perSegmentBloomFilterSizeBytes, perSegmentK, persistent, true);
+ } catch (IOException e) {
+ throw new FetchException(FetchException.BUCKET_ERROR,
"Unable to write Bloom filters for splitfile");
+ }
+
+ if(persistent)
+ container.set(this);
+
if(segmentCount == 1) {
// splitfile* will be overwritten, this is bad
// so copy them
@@ -203,8 +259,13 @@
 			System.arraycopy(splitfileCheckBlocks, 0, newSplitfileCheckBlocks, 0, splitfileCheckBlocks.length);
 			segments[0] = new SplitFileFetcherSegment(splitfileType, newSplitfileDataBlocks, newSplitfileCheckBlocks, this, archiveContext, fetchContext, maxTempLength, recursionLevel, parent, 0);
+			for(int i=0;i<newSplitfileDataBlocks.length;i++)
+				tempListener.addKey(newSplitfileDataBlocks[i].getNodeKey(), 0, context);
+			for(int i=0;i<newSplitfileCheckBlocks.length;i++)
+				tempListener.addKey(newSplitfileCheckBlocks[i].getNodeKey(), 0, context);
 			if(persistent) {
 				container.set(segments[0]);
+				segments[0].deactivateKeys(container);
 			}
} else {
int dataBlocksPtr = 0;
@@ -223,8 +284,13 @@
checkBlocksPtr += copyCheckBlocks;
 				segments[i] = new SplitFileFetcherSegment(splitfileType, dataBlocks, checkBlocks, this, archiveContext, fetchContext, maxTempLength, recursionLevel+1, parent, i);
+				for(int j=0;j<dataBlocks.length;j++)
+					tempListener.addKey(dataBlocks[j].getNodeKey(), i, context);
+				for(int j=0;j<checkBlocks.length;j++)
+					tempListener.addKey(checkBlocks[j].getNodeKey(), i, context);
 				if(persistent) {
 					container.set(segments[i]);
+					segments[i].deactivateKeys(container);
}
}
if(dataBlocksPtr != splitfileDataBlocks.length)
@@ -232,80 +298,15 @@
if(checkBlocksPtr != splitfileCheckBlocks.length)
 				throw new FetchException(FetchException.INVALID_METADATA, "Unable to allocate all check blocks to segments - buggy or malicious inserter");
}
- this.token = token2;
 		parent.addBlocks(splitfileDataBlocks.length + splitfileCheckBlocks.length, container);
 		parent.addMustSucceedBlocks(splitfileDataBlocks.length, container);
parent.notifyClients(container, context);
-		// Setup bloom parameters.
-		if(persistent) {
-			// FIXME: Should this be encrypted? It's protected to some degree by the salt...
-			// Since it isn't encrypted, it's likely to be very sparse; we should name
-			// it appropriately...
-			try {
-				mainBloomFile = context.fg.makeRandomFile();
-				altBloomFile = context.fg.makeRandomFile();
-			} catch (IOException e) {
-				throw new FetchException(FetchException.BUCKET_ERROR, "Unable to create Bloom filter files", e);
-			}
-		} else {
-			// Not persistent, keep purely in RAM.
-			mainBloomFile = null;
-			altBloomFile = null;
-		}
-		int mainElementsPerKey = DEFAULT_MAIN_BLOOM_ELEMENTS_PER_KEY;
-		int origSize = splitfileDataBlocks.length + splitfileCheckBlocks.length;
-		mainBloomK = (int) (mainElementsPerKey * 0.7);
-		long elementsLong = origSize * mainElementsPerKey;
-		// REDFLAG: SIZE LIMIT: 3.36TB limit!
-		if(elementsLong > Integer.MAX_VALUE)
-			throw new FetchException(FetchException.TOO_BIG, "Cannot fetch splitfiles with more than "+(Integer.MAX_VALUE/mainElementsPerKey)+" keys! (approx 3.3TB)");
-		int mainSizeBits = (int)elementsLong; // counting filter
-		if((mainSizeBits & 7) != 0)
-			mainSizeBits += (8 - (mainSizeBits & 7));
-		mainBloomFilterSizeBytes = mainSizeBits / 8 * 2; // counting filter
-		double acceptableFalsePositives = ACCEPTABLE_BLOOM_FALSE_POSITIVES_ALL_SEGMENTS / segments.length;
-		int perSegmentBitsPerKey = (int) Math.ceil(Math.log(acceptableFalsePositives) / Math.log(0.6185));
-		int segBlocks = blocksPerSegment + checkBlocksPerSegment;
-		if(segBlocks < origSize)
-			segBlocks = origSize;
-		int perSegmentSize = perSegmentBitsPerKey * segBlocks;
-		if((perSegmentSize & 7) != 0)
-			perSegmentSize += (8 - (perSegmentSize & 7));
-		perSegmentBloomFilterSizeBytes = perSegmentSize / 8;
-		perSegmentK = BloomFilter.optimialK(perSegmentSize, blocksPerSegment + checkBlocksPerSegment);
-		keyCount = origSize;
-		// Now create it.
-		Logger.error(this, "Creating block filter for "+this+": keys="+(splitfileDataBlocks.length+splitfileCheckBlocks.length)+" main bloom size "+mainBloomFilterSizeBytes+" bytes, K="+mainBloomK+", filename="+mainBloomFile+" alt bloom filter: segments: "+segments.length+" each is "+perSegmentBloomFilterSizeBytes+" bytes k="+perSegmentK);
try {
-			tempListener = new SplitFileFetcherKeyListener(this, keyCount, mainBloomFile, altBloomFile, mainBloomFilterSizeBytes, mainBloomK, !fetchContext.cacheLocalRequests, localSalt, segments.length, perSegmentBloomFilterSizeBytes, perSegmentK, persistent, true);
-
-			// Now add the keys
-			int dataKeysIndex = 0;
-			int checkKeysIndex = 0;
-			int segNo = 0;
-			while(dataKeysIndex < splitfileDataBlocks.length) {
-				int dataKeysEnd = dataKeysIndex + blocksPerSegment;
-				int checkKeysEnd = checkKeysIndex + checkBlocksPerSegment;
-				dataKeysEnd = Math.min(dataKeysEnd, splitfileDataBlocks.length);
-				checkKeysEnd = Math.min(checkKeysEnd, splitfileCheckBlocks.length);
-				for(int j=dataKeysIndex;j<dataKeysEnd;j++)
-					tempListener.addKey(splitfileDataBlocks[j].getNodeKey(), segNo, context);
-				for(int j=checkKeysIndex;j<checkKeysEnd;j++)
-					tempListener.addKey(splitfileCheckBlocks[j].getNodeKey(), segNo, context);
-				segNo++;
-				dataKeysIndex = dataKeysEnd;
-				checkKeysIndex = checkKeysEnd;
-			}
 			tempListener.writeFilters();
 		} catch (IOException e) {
 			throw new FetchException(FetchException.BUCKET_ERROR, "Unable to write Bloom filters for splitfile");
 		}
-		if(persistent) {
-			for(int i=0;i<segments.length;i++) {
-				segments[i].deactivateKeys(container);
-			}
-		}
}
 	/** Return the final status of the fetch. Throws an exception, or returns a
@@ -430,14 +431,21 @@
container.activate(cb, 1);
container.activate(fetchContext, 1);
}
+ int count = 0;
while(!decompressors.isEmpty()) {
 			Compressor c = (Compressor) decompressors.remove(decompressors.size()-1);
+			if(Logger.shouldLog(Logger.MINOR, this))
+				Logger.minor(this, "Decompressing with "+c);
 			long maxLen = Math.max(fetchContext.maxTempLength, fetchContext.maxOutputLength);
 			try {
 				Bucket out = returnBucket;
 				if(!decompressors.isEmpty()) out = null;
 				data = c.decompress(data, context.getBucketFactory(parent.persistent()), maxLen, maxLen * 4, out);
 			} catch (IOException e) {
+				if(e.getMessage().equals("Not in GZIP format") && count == 1) {
+					Logger.error(this, "Attempting to decompress twice, failed, returning first round data: "+this);
+					break;
+				}
 				cb.onFailure(new FetchException(FetchException.BUCKET_ERROR, e), this, container, context);
return;
} catch (CompressionOutputSizeException e) {
@@ -446,6 +454,7 @@
 				cb.onFailure(new FetchException(FetchException.TOO_BIG, e.estimatedSize, false /* FIXME */, clientMetadata.getMIMEType()), this, container, context);
return;
}
+ count++;
}
 		cb.onSuccess(new FetchResult(clientMetadata, data), this, container, context);
} catch (FetchException e) {
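Illustrative shape of the decompression loop patched above: decompressors are applied in reverse (LIFO) order, and the new count variable lets the error path detect that a second pass failed, in which case the first round's output is kept rather than failing the fetch. A self-contained sketch with stand-in types, not the real Compressor API:

    import java.io.IOException;
    import java.util.List;

    class DecompressLoopSketch {
        interface Codec { byte[] decompress(byte[] in) throws IOException; }

        static byte[] run(byte[] data, List<Codec> decompressors) throws IOException {
            int count = 0;
            while(!decompressors.isEmpty()) {
                Codec c = decompressors.remove(decompressors.size() - 1); // innermost codec last
                try {
                    data = c.decompress(data);
                } catch (IOException e) {
                    if(count == 1) break; // second round failed: keep first-round data
                    throw e;              // first round failed: genuine error
                }
                count++;
            }
            return data;
        }
    }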
@@ -504,15 +513,31 @@
*/
 	public KeyListener makeKeyListener(ObjectContainer container, ClientContext context) throws KeyListenerConstructionException {
synchronized(this) {
+ if(finished) return null;
if(tempListener != null) {
// Recently constructed
return tempListener;
}
+ File main;
+ File alt;
+ if(persistent) {
+ container.activate(mainBloomFile, 5);
+ container.activate(altBloomFile, 5);
+ main = new File(mainBloomFile.getPath());
+ alt = new File(altBloomFile.getPath());
+ container.deactivate(mainBloomFile, 1);
+ container.deactivate(altBloomFile, 1);
+ } else {
+ main = null;
+ alt = null;
+ }
try {
+			if(Logger.shouldLog(Logger.MINOR, this))
+				Logger.minor(this, "Attempting to read Bloom filter for "+this+" main file="+main+" alt file="+alt);
 			tempListener =
-				new SplitFileFetcherKeyListener(this, keyCount, mainBloomFile, altBloomFile, mainBloomFilterSizeBytes, mainBloomK, !fetchContext.cacheLocalRequests, localSalt, segments.length, perSegmentBloomFilterSizeBytes, perSegmentK, persistent, false);
+				new SplitFileFetcherKeyListener(this, keyCount, main, alt, mainBloomFilterSizeBytes, mainBloomK, !fetchContext.cacheLocalRequests, localSalt, segments.length, perSegmentBloomFilterSizeBytes, perSegmentK, persistent, false);
} catch (IOException e) {
- Logger.error(this, "Unable to read Bloom filter
for "+this+" attempting to reconstruct...");
+ Logger.error(this, "Unable to read Bloom filter
for "+this+" attempting to reconstruct...", e);
mainBloomFile.delete();
altBloomFile.delete();
try {
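On the sizing arithmetic in the large constructor hunk above: for a Bloom filter with m bits and n keys, the optimal hash count is k = (m/n) * ln 2, roughly 0.7 * m/n, and the false-positive rate is approximately p = 0.6185^(m/n), hence bits-per-key = ln(p) / ln(0.6185); the main filter's byte size is doubled because it is a counting filter that must support key removal. A self-contained sketch of the same computation; the concrete numbers are made up, not Freenet's defaults:

    class BloomSizingSketch {
        public static void main(String[] args) {
            int keys = 130000;                 // blocks in a hypothetical large splitfile
            int bitsPerKey = 19;               // ~ DEFAULT_MAIN_BLOOM_ELEMENTS_PER_KEY (assumed value)
            int k = (int) (bitsPerKey * 0.7);  // optimal hash count: (m/n) * ln 2
            long bits = (long) keys * bitsPerKey;
            long bytes = ((bits + 7) / 8) * 2; // x2: counting filter supports removal
            System.out.println("main filter: " + bytes + " bytes, k = " + k);

            double p = 0.0001;                 // acceptable per-segment false-positive rate
            int segBitsPerKey = (int) Math.ceil(Math.log(p) / Math.log(0.6185));
            System.out.println("per-segment bits/key for p = " + p + ": " + segBitsPerKey);
        }
    }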
Modified: branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherKeyListener.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherKeyListener.java	2008-08-27 16:06:55 UTC (rev 22193)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherKeyListener.java	2008-08-27 16:09:08 UTC (rev 22194)
@@ -76,11 +76,12 @@
* @throws IOException
*/
 	public SplitFileFetcherKeyListener(SplitFileFetcher parent, int keyCount, File bloomFile, File altBloomFile, int mainBloomSizeBytes, int mainBloomK, boolean dontCache, byte[] localSalt, int segments, int segmentFilterSizeBytes, int segmentBloomK, boolean persistent, boolean newFilter) throws IOException {
+ System.err.println("Persistent = "+persistent);
fetcher = parent;
this.persistent = persistent;
this.keyCount = keyCount;
-		this.mainBloomFile = persistent ? new File(bloomFile.getPath()) : null;
-		this.altBloomFile = persistent ? new File(altBloomFile.getPath()) : null;
+		this.mainBloomFile = bloomFile;
+		this.altBloomFile = altBloomFile;
+ this.mainBloomFile = bloomFile;
+ this.altBloomFile = altBloomFile;
this.dontCache = dontCache;
assert(localSalt.length == 32);
if(persistent) {
@@ -118,6 +119,8 @@
dis.readFully(segmentsFilterBuffer);
dis.close();
}
+ if(Logger.shouldLog(Logger.MINOR, this))
+ Logger.minor(this, "Created "+this+" for "+fetcher);
}
public long countKeys() {
@@ -132,6 +135,8 @@
byte[] saltedKey = context.getChkFetchScheduler().saltKey(key);
filter.addKey(saltedKey);
segmentFilters[segNo].addKey(localSaltKey(key));
+ if(!segmentFilters[segNo].checkFilter(localSaltKey(key)))
+ Logger.error(this, "Key added but not in filter:
"+key+" on "+this);
}
private byte[] localSaltKey(Key key) {
@@ -152,7 +157,21 @@
 		// Caller has already called probablyWantKey(), so don't do it again.
byte[] salted = localSaltKey(key);
for(int i=0;i<segmentFilters.length;i++) {
- if(segmentFilters[i].checkFilter(salted)) return prio;
+ if(segmentFilters[i].checkFilter(salted)) {
+ if(persistent)
+ container.activate(fetcher, 1);
+				SplitFileFetcherSegment segment = fetcher.getSegment(i);
+				if(persistent)
+					container.deactivate(fetcher, 1);
+				if(persistent)
+					container.activate(segment, 1);
+				boolean found = segment.getBlockNumber(key, container) >= 0;
+				if(!found)
+					Logger.error(this, "Found block in primary and segment bloom filters but segment doesn't want it: "+segment+" on "+this);
+ if(persistent)
+ container.deactivate(segment, 1);
+ if(found) return prio;
+ }
}
return -1;
}
@@ -162,28 +181,35 @@
 		// Caller has already called probablyWantKey(), so don't do it again.
boolean found = false;
byte[] salted = localSaltKey(key);
+ boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ if(logMINOR)
+ Logger.minor(this, "handleBlock("+key+") on "+this);
for(int i=0;i<segmentFilters.length;i++) {
- if(segmentFilters[i].checkFilter(salted)) {
+ boolean match;
+ synchronized(this) {
+ match = segmentFilters[i].checkFilter(salted);
+ }
+ if(match) {
if(persistent)
container.activate(fetcher, 1);
 				SplitFileFetcherSegment segment = fetcher.getSegment(i);
 				if(persistent)
 					container.activate(segment, 1);
+				if(logMINOR)
+					Logger.minor(this, "Key may be in segment "+segment);
 				if(segment.onGotKey(key, block, container, context)) {
keyCount--;
synchronized(this) {
filter.removeKey(saltedKey);
}
// Update the persistent keyCount.
- if(persistent)
- container.activate(fetcher, 1);
 					fetcher.setKeyCount(keyCount, container);
-					if(persistent)
-						container.deactivate(fetcher, 1);
found = true;
}
if(persistent)
container.deactivate(segment, 1);
+ if(persistent)
+ container.deactivate(fetcher, 1);
}
}
return found;
@@ -211,10 +237,16 @@
if(persistent)
container.activate(fetcher, 1);
 			SplitFileFetcherSegment segment = fetcher.getSegment(i);
+ if(persistent)
+ container.deactivate(fetcher, 1);
+ if(persistent)
+ container.activate(segment, 1);
 			int blockNum = segment.getBlockNumber(key, container);
if(blockNum >= 0) {
ret.add(segment.getSubSegmentFor(blockNum, container));
}
+ if(persistent)
+ container.deactivate(segment, 1);
}
}
return ret.toArray(new SendableGet[ret.size()]);
@@ -248,6 +280,8 @@
int segNo = segment.segNum;
segmentFilters[segNo].unsetAll();
Key[] removeKeys = segment.listKeys(container);
+		if(Logger.shouldLog(Logger.MINOR, this))
+			Logger.minor(this, "Removing segment from bloom filter: "+segment+" keys: "+removeKeys.length);
for(int i=0;i<removeKeys.length;i++) {
 			byte[] salted = context.getChkFetchScheduler().saltKey(removeKeys[i]);
if(filter.checkFilter(salted)) {
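Stepping back, the listener is a two-level structure: one global counting filter over all keys (salted with the scheduler-wide salt) answers probablyWantKey(), and small per-segment filters (salted with this fetch's localSalt) route a matching key to a candidate segment, which gives the definitive answer via getBlockNumber()/onGotKey(); that final confirmation is exactly what this commit adds to definitelyWantKey(). A toy sketch of the shape, with java.util.BitSet and a trivial hash standing in for the real salted BloomFilter:

    import java.util.BitSet;

    class TwoLevelFilterSketch {
        final BitSet main = new BitSet(1 << 16);   // ~ global salted filter
        final BitSet[] segments;                   // ~ per-segment filters

        TwoLevelFilterSketch(int segCount) {
            segments = new BitSet[segCount];
            for(int i = 0; i < segCount; i++) segments[i] = new BitSet(1 << 12);
        }

        int hash(byte[] key, int salt, int mod) {  // stand-in for salted hashing
            int h = salt;
            for(byte b : key) h = h * 31 + b;
            return Math.abs(h % mod);
        }

        void addKey(byte[] key, int segNo) {
            main.set(hash(key, 0xCAFE, 1 << 16));
            segments[segNo].set(hash(key, 0xBEEF, 1 << 12));
        }

        /** Returns the segment that may want this key, or -1. A positive is
         *  still only "probably": the real code then asks the segment itself,
         *  e.g. segment.getBlockNumber(key, container) >= 0. */
        int probablySegmentFor(byte[] key) {
            if(!main.get(hash(key, 0xCAFE, 1 << 16))) return -1;
            for(int i = 0; i < segments.length; i++)
                if(segments[i].get(hash(key, 0xBEEF, 1 << 12))) return i;
            return -1;
        }
    }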