Author: toad
Date: 2008-06-24 21:07:10 +0000 (Tue, 24 Jun 2008)
New Revision: 20663
Modified:
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
Log:
More activation and updating work for fetches.
Modified:
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2008-06-24 20:52:49 UTC (rev 20662)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2008-06-24 21:07:10 UTC (rev 20663)
@@ -157,6 +157,8 @@
}
public void onSuccess(Bucket data, int blockNo,
SplitFileFetcherSubSegment seg, ClientKeyBlock block, ObjectContainer
container, RequestScheduler sched) {
+ if(persistent)
+ container.activate(this, 1);
boolean decodeNow = false;
logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "Fetched block "+blockNo+" on
"+seg);
@@ -173,6 +175,8 @@
}
dataRetries[blockNo] = 0; // Prevent healing of
successfully fetched block.
dataKeys[blockNo] = null;
+ if(persistent)
+
container.activate(dataBuckets[blockNo], 1);
dataBuckets[blockNo].setData(data);
} else if(blockNo < checkKeys.length + dataKeys.length)
{
blockNo -= dataKeys.length;
@@ -183,6 +187,8 @@
}
checkRetries[blockNo] = 0; // Prevent healing
of successfully fetched block.
checkKeys[blockNo] = null;
+ if(persistent)
+
container.activate(checkBuckets[blockNo], 1);
checkBuckets[blockNo].setData(data);
} else
Logger.error(this, "Unrecognized block number:
"+blockNo, new Exception("error"));
@@ -199,6 +205,8 @@
}
dontNotify = !scheduled;
}
+ if(persistent)
+ container.set(this);
parentFetcher.parent.completedBlock(dontNotify, container,
sched.getContext());
seg.possiblyRemoveFromParent(container);
if(decodeNow) {
@@ -208,10 +216,14 @@
}
public void decode(ObjectContainer container, ClientContext context,
RequestScheduler sched) {
+ if(persistent)
+ container.activate(this, 1);
// Now decode
if(logMINOR) Logger.minor(this, "Decoding
"+SplitFileFetcherSegment.this);
codec = FECCodec.getCodec(splitfileType, dataKeys.length,
checkKeys.length, sched.getContext().mainExecutor);
+ if(persistent)
+ container.set(this);
if(splitfileType != Metadata.SPLITFILE_NONREDUNDANT) {
FECQueue queue = sched.getFECQueue();
@@ -222,6 +234,8 @@
}
public void onDecodedSegment(ObjectContainer container, ClientContext
context, FECJob job, Bucket[] dataBuckets2, Bucket[] checkBuckets2,
SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus) {
+ if(persistent)
+ container.activate(this, 1);
// Because we use SplitfileBlock, we DON'T have to copy here.
// See FECCallback comments for explanation.
try {
@@ -241,7 +255,9 @@
OutputStream os = decodedData.getOutputStream();
for(int i=0;i<dataBuckets.length;i++) {
SplitfileBlock status = dataBuckets[i];
+ if(persistent) container.activate(status, 1);
Bucket data = status.getData();
+ if(persistent) container.activate(data, 1);
BucketTools.copyTo(data, os, Long.MAX_VALUE);
}
if(logMINOR) Logger.minor(this, "Copied data");
@@ -251,12 +267,14 @@
finished = true;
if(codec == null || !isCollectingBinaryBlob())
parentFetcher.segmentFinished(SplitFileFetcherSegment.this, container, context);
+ if(persistent) container.set(this);
} catch (IOException e) {
Logger.normal(this, "Caught bucket error?: "+e, e);
synchronized(this) {
finished = true;
failureException = new
FetchException(FetchException.BUCKET_ERROR);
}
+ if(persistent) container.set(this);
parentFetcher.segmentFinished(SplitFileFetcherSegment.this, container, context);
return;
}
@@ -276,13 +294,19 @@
}
public void onEncodedSegment(ObjectContainer container, ClientContext
context, FECJob job, Bucket[] dataBuckets2, Bucket[] checkBuckets2,
SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus) {
+ if(persistent)
+ container.activate(this, 1);
// Because we use SplitfileBlock, we DON'T have to copy here.
// See FECCallback comments for explanation.
synchronized(this) {
// Now insert *ALL* blocks on which we had at least one
failure, and didn't eventually succeed
for(int i=0;i<dataBuckets.length;i++) {
boolean heal = false;
+ if(persistent)
+ container.activate(dataBuckets[i], 1);
Bucket data = dataBuckets[i].getData();
+ if(persistent)
+ container.activate(data, 1);
if(dataRetries[i] > 0)
heal = true;
if(heal) {
@@ -296,7 +320,11 @@
}
for(int i=0;i<checkBuckets.length;i++) {
boolean heal = false;
+ if(persistent)
+ container.activate(checkBuckets[i], 1);
Bucket data = checkBuckets[i].getData();
+ if(persistent)
+ container.activate(data, 1);
try {
maybeAddToBinaryBlob(data, i, true,
container, context);
} catch (FetchException e) {
@@ -314,6 +342,8 @@
checkKeys[i] = null;
}
}
+ if(persistent)
+ container.set(this);
// Defer the completion until we have generated healing blocks
if we are collecting binary blobs.
if(isCollectingBinaryBlob())
parentFetcher.segmentFinished(SplitFileFetcherSegment.this, container, context);
@@ -352,6 +382,8 @@
/** This is after any retries and therefore is either out-of-retries or
fatal
* @param container */
public synchronized void onFatalFailure(FetchException e, int blockNo,
SplitFileFetcherSubSegment seg, ObjectContainer container, ClientContext
context) {
+ if(persistent)
+ container.activate(this, 1);
logMINOR = Logger.shouldLog(Logger.MINOR, this);
if(logMINOR) Logger.minor(this, "Permanently failed block:
"+blockNo+" on "+this+" : "+e, e);
boolean allFailed;
@@ -385,6 +417,8 @@
// Once it is no longer possible to have a successful
fetch, fail...
allFailed = failedBlocks + fatallyFailedBlocks >
(dataKeys.length + checkKeys.length - minFetched);
}
+ if(persistent)
+ container.set(this);
if(allFailed)
fail(new FetchException(FetchException.SPLITFILE_ERROR,
errors), container, context);
else
@@ -394,6 +428,8 @@
/** A request has failed non-fatally, so the block may be retried
* @param container */
public void onNonFatalFailure(FetchException e, int blockNo,
SplitFileFetcherSubSegment seg, RequestScheduler sched, ObjectContainer
container) {
+ if(persistent)
+ container.activate(this, 1);
ClientContext context = sched.getContext();
int tries;
int maxTries = blockFetchContext.maxNonSplitfileRetries;
@@ -405,6 +441,8 @@
if(isFinished()) return;
if(blockNo < dataKeys.length) {
key = dataKeys[blockNo];
+ if(persistent)
+ container.activate(key, 5);
tries = ++dataRetries[blockNo];
if(tries > maxTries && maxTries >= 0) failed =
true;
else {
@@ -421,6 +459,8 @@
} else {
int checkNo = blockNo - dataKeys.length;
key = checkKeys[checkNo];
+ if(persistent)
+ container.activate(key, 5);
tries = ++checkRetries[checkNo];
if(tries > maxTries && maxTries >= 0) failed =
true;
else {
@@ -436,6 +476,8 @@
}
}
}
+ if(persistent)
+ container.set(this);
if(failed) {
onFatalFailure(e, blockNo, seg, container, context);
if(logMINOR)
@@ -496,12 +538,21 @@
}
}
removeSubSegments(container);
+ if(persistent)
+ container.set(this);
parentFetcher.segmentFinished(this, container, context);
}
public void schedule(ObjectContainer container, ClientContext context) {
+ if(persistent) {
+ container.activate(this, 1);
+ container.activate(parentFetcher, 1);
+ container.activate(parentFetcher.parent, 1);
+ }
try {
SplitFileFetcherSubSegment seg = getSubSegment(0);
+ if(persistent)
+ container.activate(seg, 1);
for(int
i=0;i<dataRetries.length+checkRetries.length;i++)
seg.add(i, true, container, context);
@@ -509,6 +560,8 @@
synchronized(this) {
scheduled = true;
}
+ if(persistent)
+ container.set(this);
parentFetcher.parent.notifyClients(container, context);
if(logMINOR)
Logger.minor(this, "scheduling "+seg+" :
"+seg.blockNums);
@@ -585,6 +638,8 @@
deadSegs = (SplitFileFetcherSubSegment[])
subSegments.toArray(new SplitFileFetcherSubSegment[subSegments.size()]);
subSegments.clear();
}
+ if(persistent && deadSegs.length > 0)
+ container.set(this);
for(int i=0;i<deadSegs.length;i++) {
deadSegs[i].kill(container);
}
@@ -598,6 +653,8 @@
}
public void requeueAfterCooldown(Key key, long time, ObjectContainer
container, ClientContext context) {
+ if(persistent)
+ container.activate(this, 1);
Vector v = null;
boolean notFound = true;
synchronized(this) {
@@ -707,6 +764,8 @@
}
finished = true;
}
+ if(persistent)
+ container.activate(this, 1);
this.fail(new FetchException(FetchException.INTERNAL_ERROR,
"FEC failure: "+t, t), container, context);
}
}