Author: toad
Date: 2006-08-04 21:14:16 +0000 (Fri, 04 Aug 2006)
New Revision: 9889
Added:
trunk/freenet/src/freenet/client/async/HealingQueue.java
trunk/freenet/src/freenet/client/async/SimpleHealingQueue.java
Modified:
trunk/freenet/src/freenet/client/FetcherContext.java
trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
trunk/freenet/src/freenet/client/async/SingleBlockInserter.java
trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
trunk/freenet/src/freenet/client/async/USKInserter.java
trunk/freenet/src/freenet/node/Node.java
trunk/freenet/src/freenet/node/Version.java
Log:
925: Implement transparent splitfile healing. Backlog can grow up to 16MB, and
runs at prefetch priority.
Modified: trunk/freenet/src/freenet/client/FetcherContext.java
===================================================================
--- trunk/freenet/src/freenet/client/FetcherContext.java 2006-08-04
18:05:05 UTC (rev 9888)
+++ trunk/freenet/src/freenet/client/FetcherContext.java 2006-08-04
21:14:16 UTC (rev 9889)
@@ -1,5 +1,6 @@
package freenet.client;
+import freenet.client.async.HealingQueue;
import freenet.client.async.USKManager;
import freenet.client.events.ClientEventProducer;
import freenet.client.events.SimpleEventProducer;
@@ -43,6 +44,7 @@
/** If true, and we get a ZIP manifest, and we have no meta-strings
left, then
* return the manifest contents as data. */
public boolean returnZIPManifests;
+ public final HealingQueue healingQueue;
public FetcherContext(long curMaxLength,
long curMaxTempLength, int maxMetadataSize, int
maxRecursionLevel, int maxArchiveRestarts, int maxArchiveLevels,
@@ -51,7 +53,7 @@
boolean allowSplitfiles, boolean followRedirects,
boolean localRequestOnly,
int maxDataBlocksPerSegment, int
maxCheckBlocksPerSegment,
RandomSource random, ArchiveManager archiveManager,
BucketFactory bucketFactory,
- ClientEventProducer producer, boolean
cacheLocalRequests, USKManager uskManager) {
+ ClientEventProducer producer, boolean
cacheLocalRequests, USKManager uskManager, HealingQueue hq) {
this.maxOutputLength = curMaxLength;
this.uskManager = uskManager;
this.maxTempLength = curMaxTempLength;
@@ -74,9 +76,11 @@
this.maxDataBlocksPerSegment = maxDataBlocksPerSegment;
this.maxCheckBlocksPerSegment = maxCheckBlocksPerSegment;
this.cacheLocalRequests = cacheLocalRequests;
+ this.healingQueue = hq;
}
public FetcherContext(FetcherContext ctx, int maskID, boolean
keepProducer) {
+ this.healingQueue = ctx.healingQueue;
if(keepProducer)
this.eventProducer = ctx.eventProducer;
else
Modified: trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
===================================================================
--- trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
2006-08-04 18:05:05 UTC (rev 9888)
+++ trunk/freenet/src/freenet/client/HighLevelSimpleClientImpl.java
2006-08-04 21:14:16 UTC (rev 9889)
@@ -5,6 +5,7 @@
import freenet.client.async.ClientGetter;
import freenet.client.async.ClientPutter;
+import freenet.client.async.HealingQueue;
import freenet.client.async.SimpleManifestPutter;
import freenet.client.events.ClientEventListener;
import freenet.client.events.ClientEventProducer;
@@ -34,6 +35,7 @@
private long curMaxTempLength;
private int curMaxMetadataLength;
private final RandomSource random;
+ private final HealingQueue healingQueue;
/** See comments in Node */
private final boolean cacheLocalRequests;
static final int MAX_RECURSION = 10;
@@ -83,6 +85,7 @@
curMaxMetadataLength = 1024 * 1024;
this.cacheLocalRequests = cacheLocalRequests;
this.persistentBucketFactory =
node.persistentEncryptedTempBucketFactory;
+ this.healingQueue = node.getHealingQueue();
}
public void setMaxLength(long maxLength) {
@@ -174,7 +177,7 @@
FETCH_SPLITFILES, FOLLOW_REDIRECTS,
LOCAL_REQUESTS_ONLY,
MAX_SPLITFILE_BLOCKS_PER_SEGMENT,
MAX_SPLITFILE_CHECK_BLOCKS_PER_SEGMENT,
random, archiveManager, bucketFactory,
globalEventProducer,
- cacheLocalRequests, node.uskManager);
+ cacheLocalRequests, node.uskManager,
healingQueue);
}
public InserterContext getInserterContext(boolean forceNonPersistent) {
Added: trunk/freenet/src/freenet/client/async/HealingQueue.java
===================================================================
--- trunk/freenet/src/freenet/client/async/HealingQueue.java 2006-08-04
18:05:05 UTC (rev 9888)
+++ trunk/freenet/src/freenet/client/async/HealingQueue.java 2006-08-04
21:14:16 UTC (rev 9889)
@@ -0,0 +1,10 @@
+package freenet.client.async;
+
+import freenet.support.io.Bucket;
+
+public interface HealingQueue {
+
+ /** Queue a Bucket of data to insert as a CHK. */
+ void queue(Bucket data);
+
+}
Added: trunk/freenet/src/freenet/client/async/SimpleHealingQueue.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SimpleHealingQueue.java
2006-08-04 18:05:05 UTC (rev 9888)
+++ trunk/freenet/src/freenet/client/async/SimpleHealingQueue.java
2006-08-04 21:14:16 UTC (rev 9889)
@@ -0,0 +1,113 @@
+package freenet.client.async;
+
+import java.util.HashMap;
+
+import freenet.client.InserterContext;
+import freenet.client.InserterException;
+import freenet.client.Metadata;
+import freenet.keys.BaseClientKey;
+import freenet.keys.CHKBlock;
+import freenet.keys.FreenetURI;
+import freenet.support.Logger;
+import freenet.support.io.Bucket;
+
+public class SimpleHealingQueue extends BaseClientPutter implements
HealingQueue, PutCompletionCallback {
+
+ final int maxRunning;
+ int counter;
+ InserterContext ctx;
+ final HashMap runningInserters;
+
+ public SimpleHealingQueue(ClientRequestScheduler scheduler,
InserterContext context, short prio, int maxRunning) {
+ super(prio, scheduler, null, context);
+ this.ctx = context;
+ this.runningInserters = new HashMap();
+ this.maxRunning = maxRunning;
+ }
+
+ public boolean innerQueue(Bucket data) {
+ SingleBlockInserter sbi;
+ int ctr;
+ synchronized(this) {
+ ctr = counter++;
+ if(runningInserters.size() > maxRunning) return false;
+ try {
+ sbi = new SingleBlockInserter(this, data,
(short)-1,
+
FreenetURI.EMPTY_CHK_URI, ctx, this, false,
+ CHKBlock.DATA_LENGTH,
ctr, false, false, false, data);
+ } catch (Throwable e) {
+ Logger.error(this, "Caught trying to insert
healing block: "+e, e);
+ return false;
+ }
+ runningInserters.put(data, sbi);
+ }
+ try {
+ sbi.schedule();
+ Logger.minor(this, "Started healing insert "+ctr+" for
"+data);
+ return true;
+ } catch (Throwable e) {
+ Logger.error(this, "Caught trying to insert healing
block: "+e, e);
+ return false;
+ }
+ }
+
+ public void queue(Bucket data) {
+ if(!innerQueue(data))
+ data.free();
+ }
+
+ public void onMajorProgress() {
+ // Ignore
+ }
+
+ public FreenetURI getURI() {
+ return FreenetURI.EMPTY_CHK_URI;
+ }
+
+ public boolean isFinished() {
+ return false;
+ }
+
+ public void notifyClients() {
+ // Do nothing
+ }
+
+ public void onSuccess(ClientPutState state) {
+ SingleBlockInserter sbi = (SingleBlockInserter)state;
+ Bucket data = (Bucket) sbi.getToken();
+ synchronized(this) {
+ runningInserters.remove(data);
+ }
+ Logger.minor(this, "Successfully inserted healing block:
"+sbi.getURI()+" for "+data+" ("+sbi.token+")");
+ data.free();
+ }
+
+ public void onFailure(InserterException e, ClientPutState state) {
+ SingleBlockInserter sbi = (SingleBlockInserter)state;
+ Bucket data = (Bucket) sbi.getToken();
+ synchronized(this) {
+ runningInserters.remove(data);
+ }
+ Logger.minor(this, "Failed to insert healing block:
"+sbi.getURI()+" : "+e+" for "+data+" ("+sbi.token+")", e);
+ data.free();
+ }
+
+ public void onEncode(BaseClientKey usk, ClientPutState state) {
+ // Ignore
+ }
+
+ public void onTransition(ClientPutState oldState, ClientPutState
newState) {
+ // Should never happen
+ Logger.error(this, "impossible: onTransition on
SimpleHealingQueue from "+oldState+" to "+newState, new Exception("debug"));
+ }
+
+ public void onMetadata(Metadata m, ClientPutState state) {
+ // Should never happen
+ Logger.error(this, "Got metadata on SimpleHealingQueue from
"+state+": "+m, new Exception("debug"));
+ }
+
+ public void onBlockSetFinished(ClientPutState state) {
+ // Ignore
+ }
+
+}
Modified: trunk/freenet/src/freenet/client/async/SingleBlockInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SingleBlockInserter.java
2006-08-04 18:05:05 UTC (rev 9888)
+++ trunk/freenet/src/freenet/client/async/SingleBlockInserter.java
2006-08-04 21:14:16 UTC (rev 9889)
@@ -44,7 +44,7 @@
final int sourceLength;
private int consecutiveRNFs;
- public SingleBlockInserter(BaseClientPutter parent, Bucket data, short
compressionCodec, FreenetURI uri, InserterContext ctx, PutCompletionCallback
cb, boolean isMetadata, int sourceLength, int token, boolean getCHKOnly,
boolean addToParent, boolean dontSendEncoded, Object tokenObject) throws
InserterException {
+ public SingleBlockInserter(BaseClientPutter parent, Bucket data, short
compressionCodec, FreenetURI uri, InserterContext ctx, PutCompletionCallback
cb, boolean isMetadata, int sourceLength, int token, boolean getCHKOnly,
boolean addToParent, boolean dontSendEncoded, Object tokenObject) {
this.consecutiveRNFs = 0;
this.tokenObject = tokenObject;
this.token = token;
Modified: trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2006-08-04 18:05:05 UTC (rev 9888)
+++ trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2006-08-04 21:14:16 UTC (rev 9889)
@@ -186,6 +186,13 @@
// Now decode
Logger.minor(this, "Decoding "+this);
+ boolean[] dataBlocksSucceeded = new
boolean[dataBuckets.length];
+ boolean[] checkBlocksSucceeded = new
boolean[checkBuckets.length];
+ for(int i=0;i<dataBuckets.length;i++)
+ dataBlocksSucceeded[i] = dataBuckets[i].data !=
null;
+ for(int i=0;i<checkBuckets.length;i++)
+ checkBlocksSucceeded[i] = checkBuckets[i].data
!= null;
+
FECCodec codec = FECCodec.getCodec(splitfileType,
dataBlocks.length, checkBlocks.length);
try {
if(splitfileType !=
Metadata.SPLITFILE_NONREDUNDANT) {
@@ -218,51 +225,49 @@
// Now heal
-// // Encode any check blocks we don't have
-// if(codec != null) {
-// try {
-// codec.encode(dataBuckets, checkBuckets,
32768, fetcherContext.bucketFactory);
-// } catch (IOException e) {
-// Logger.error(this, "Bucket error while
healing: "+e, e);
-// }
-// }
-//
-// // Now insert *ALL* blocks on which we had at least one
failure, and didn't eventually succeed
-// for(int i=0;i<dataBlockStatus.length;i++) {
-// if(dataBuckets[i].getData() != null) continue;
-// SingleFileFetcher fetcher = dataBlockStatus[i];
-// if(fetcher.getRetryCount() == 0) {
-// // 80% chance of not inserting, if we
never tried it
-// if(fetcherContext.random.nextInt(5) ==
0) continue;
-// }
-// queueHeal(dataBuckets[i].getData());
-// }
-// for(int i=0;i<checkBlockStatus.length;i++) {
-// if(checkBuckets[i].getData() != null) continue;
-// SingleFileFetcher fetcher = checkBlockStatus[i];
-// if(fetcher.getRetryCount() == 0) {
-// // 80% chance of not inserting, if we
never tried it
-// if(fetcherContext.random.nextInt(5) ==
0) continue;
-// }
-// queueHeal(checkBuckets[i].getData());
-// }
+ /** Splitfile healing:
+ * Any block which we have tried and failed to download
should be
+ * reconstructed and reinserted.
+ */
- for(int i=0;i<dataBlocks.length;i++) {
- MinimalSplitfileBlock b = dataBuckets[i];
- if(b != null) {
- Bucket d = b.getData();
- if(d != null) d.free();
+ // Encode any check blocks we don't have
+ if(codec != null) {
+ try {
+ codec.encode(dataBuckets, checkBuckets,
32768, fetcherContext.bucketFactory);
+ } catch (IOException e) {
+ Logger.error(this, "Bucket error while
healing: "+e, e);
}
+ }
+
+ // Now insert *ALL* blocks on which we had at least one
failure, and didn't eventually succeed
+ for(int i=0;i<dataBlockStatus.length;i++) {
+ boolean heal = false;
+ if(!dataBlocksSucceeded[i]) {
+ SingleFileFetcher fetcher =
dataBlockStatus[i];
+ if(fetcher.getRetryCount() > 0)
+ heal = true;
+ }
+ if(heal) {
+ queueHeal(dataBuckets[i].getData());
+ } else {
+ dataBuckets[i].data.free();
+ }
dataBuckets[i] = null;
dataBlockStatus[i] = null;
dataBlocks[i] = null;
}
- for(int i=0;i<checkBlocks.length;i++) {
- MinimalSplitfileBlock b = checkBuckets[i];
- if(b != null) {
- Bucket d = b.getData();
- if(d != null) d.free();
+ for(int i=0;i<checkBlockStatus.length;i++) {
+ boolean heal = false;
+ if(!checkBlocksSucceeded[i]) {
+ SingleFileFetcher fetcher =
checkBlockStatus[i];
+ if(fetcher.getRetryCount() > 0)
+ heal = true;
}
+ if(heal) {
+ queueHeal(checkBuckets[i].getData());
+ } else {
+ checkBuckets[i].data.free();
+ }
checkBuckets[i] = null;
checkBlockStatus[i] = null;
checkBlocks[i] = null;
@@ -272,8 +277,8 @@
}
private void queueHeal(Bucket data) {
- // TODO Auto-generated method stub
-
+ Logger.minor(this, "Queueing healing insert");
+ fetcherContext.healingQueue.queue(data);
}
/** This is after any retries and therefore is either out-of-retries or
fatal */
Modified: trunk/freenet/src/freenet/client/async/USKInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/async/USKInserter.java 2006-08-04
18:05:05 UTC (rev 9888)
+++ trunk/freenet/src/freenet/client/async/USKInserter.java 2006-08-04
21:14:16 UTC (rev 9889)
@@ -104,14 +104,9 @@
if(finished) return;
edition = edNo;
Logger.minor(this, "scheduling insert for "+pubUSK.getURI()+"
"+edition);
+ sbi = new SingleBlockInserter(parent, data, compressionCodec,
privUSK.getInsertableSSK(edition).getInsertURI(),
+ ctx, this, isMetadata, sourceLength, token,
getCHKOnly, false, true /* we don't use it */, tokenObject);
try {
- sbi = new SingleBlockInserter(parent, data,
compressionCodec, privUSK.getInsertableSSK(edition).getInsertURI(),
- ctx, this, isMetadata, sourceLength,
token, getCHKOnly, false, true /* we don't use it */, tokenObject);
- } catch (InserterException e) {
- cb.onFailure(e, this);
- return;
- }
- try {
sbi.schedule();
} catch (InserterException e) {
cb.onFailure(e, this);
Modified: trunk/freenet/src/freenet/node/Node.java
===================================================================
--- trunk/freenet/src/freenet/node/Node.java 2006-08-04 18:05:05 UTC (rev
9888)
+++ trunk/freenet/src/freenet/node/Node.java 2006-08-04 21:14:16 UTC (rev
9889)
@@ -39,13 +39,17 @@
import freenet.client.FetcherContext;
import freenet.client.HighLevelSimpleClient;
import freenet.client.HighLevelSimpleClientImpl;
+import freenet.client.InserterContext;
import freenet.client.InserterException;
import freenet.client.async.BaseClientPutter;
import freenet.client.async.ClientCallback;
import freenet.client.async.ClientGetter;
import freenet.client.async.ClientPutter;
import freenet.client.async.ClientRequestScheduler;
+import freenet.client.async.HealingQueue;
+import freenet.client.async.SimpleHealingQueue;
import freenet.client.async.USKManager;
+import freenet.client.events.SimpleEventProducer;
import freenet.clients.http.BookmarkManager;
import freenet.clients.http.FProxyToadlet;
import freenet.clients.http.SimpleToadletServer;
@@ -665,6 +669,7 @@
final RequestStarter sskInsertStarter;
public final UserAlertManager alerts;
final TimeDecayingRunningAverage throttledPacketSendAverage;
+ private final HealingQueue healingQueue;
/** Must be included as a hidden field in order for any dangerous HTTP
operation to complete successfully. */
public final String formPassword;
final TimeDecayingRunningAverage remoteChkFetchBytesSentAverage;
@@ -1626,6 +1631,11 @@
System.out.println("Initializing Plugin Manager");
pluginManager = new PluginManager(this);
+ healingQueue = new SimpleHealingQueue(chkPutScheduler,
+ new InserterContext(tempBucketFactory,
tempBucketFactory, persistentTempBucketFactory,
+ random, 0, 2, 1, 0, 0, new
SimpleEventProducer(),
+ false, uskManager),
RequestStarter.PREFETCH_PRIORITY_CLASS, 512 /* FIXME make configurable */);
+
FetcherContext ctx = makeClient((short)0).getFetcherContext();
ctx.allowSplitfiles = false;
@@ -3631,4 +3641,8 @@
redetectAddress();
shouldInsertARK();
}
+
+ public HealingQueue getHealingQueue() {
+ return healingQueue;
+ }
}
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2006-08-04 18:05:05 UTC (rev
9888)
+++ trunk/freenet/src/freenet/node/Version.java 2006-08-04 21:14:16 UTC (rev
9889)
@@ -18,7 +18,7 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- private static final int buildNumber = 924;
+ private static final int buildNumber = 925;
/** Oldest build of Fred we will talk to */
private static final int oldLastGoodBuild = 874;