Author: toad
Date: 2005-11-05 18:28:07 +0000 (Sat, 05 Nov 2005)
New Revision: 7473
Added:
trunk/freenet/src/freenet/client/FECCodec.java
trunk/freenet/src/freenet/client/SplitfileBlock.java
trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
Modified:
trunk/freenet/src/freenet/client/FetchException.java
trunk/freenet/src/freenet/client/Segment.java
trunk/freenet/src/freenet/client/SplitFetcher.java
Log:
Lots of work on splitfiles.
Added: trunk/freenet/src/freenet/client/FECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/FECCodec.java 2005-11-05 18:14:59 UTC (rev 7472)
+++ trunk/freenet/src/freenet/client/FECCodec.java 2005-11-05 18:28:07 UTC (rev 7473)
@@ -0,0 +1,28 @@
+package freenet.client;
+
+import freenet.support.BucketFactory;
+
+/**
+ * FEC (forward error correction) handler.
+ * I didn't keep the old freenet.client.FEC* etc as it seemed grossly overengineered with
+ * a lot of code there only because of API confusion.
+ * @author root
+ *
+ */
+abstract class FECCodec {
+
+ public static FECCodec getCodec(short splitfileType, int dataBlocks, int checkBlocks) {
+ if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD)
+ return StandardOnionFECCodec.getInstance(dataBlocks, checkBlocks);
+ else return null;
+ }
+
+ /**
+ * Decode all missing blocks, if possible.
+ * @param dataBlockStatus The data blocks.
+ * @param checkBlockStatus The check blocks.
+ * @param blockLength The block length in bytes.
+ * @param bf The BucketFactory used to create buckets for decoded blocks.
+ */
+ public abstract void decode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf);
+
+}
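For reference, the intended call pattern for the codec factory (a sketch, not code from this commit; assumes the caller already has the block arrays, the block length, and a BucketFactory):

    // Hypothetical caller, illustrating the FECCodec API above
    static void decodeSegment(short splitfileType, SplitfileBlock[] data,
            SplitfileBlock[] check, int blockLength, BucketFactory bf) {
        FECCodec codec = FECCodec.getCodec(splitfileType, data.length, check.length);
        if(codec == null)
            return; // SPLITFILE_NONREDUNDANT (or unknown type): nothing to decode
        // On return, every data block should satisfy hasData()
        codec.decode(data, check, blockLength, bf);
    }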
Modified: trunk/freenet/src/freenet/client/FetchException.java
===================================================================
--- trunk/freenet/src/freenet/client/FetchException.java 2005-11-05 18:14:59 UTC (rev 7472)
+++ trunk/freenet/src/freenet/client/FetchException.java 2005-11-05 18:28:07 UTC (rev 7473)
@@ -41,7 +41,7 @@
/** Don't know what to do with splitfile */
public static final int UNKNOWN_SPLITFILE_METADATA = 2;
/** Too many ordinary redirects */
- public static final int TOO_MANY_REDIRECTS = 3;
+ public static final int TOO_MANY_REDIRECTS = 16;
/** Don't know what to do with metadata */
public static final int UNKNOWN_METADATA = 3;
/** Got a MetadataParseException */
@@ -62,4 +62,10 @@
public static final int HAS_MORE_METASTRINGS = 11;
/** Internal error, probably failed to read from a bucket */
public static final int BUCKET_ERROR = 12;
+ /** Data not found */
+ public static final int DATA_NOT_FOUND = 13;
+ /** Route not found */
+ public static final int ROUTE_NOT_FOUND = 14;
+ /** Downstream overload */
+ public static final int REJECTED_OVERLOAD = 15;
}
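Note that TOO_MANY_REDIRECTS moves from 3 to 16 because it collided with UNKNOWN_METADATA (also 3), which would make dispatch on e.getMode() ambiguous. A reflection check of the following kind would catch future collisions (hypothetical test, not part of the tree):

    import java.lang.reflect.Field;
    import java.lang.reflect.Modifier;
    import java.util.HashSet;

    public class FetchExceptionCodesTest {
        public static void main(String[] args) throws Exception {
            HashSet seen = new HashSet();
            Field[] fields = FetchException.class.getFields();
            for(int i=0;i<fields.length;i++) {
                Field f = fields[i];
                if(f.getType() == int.class && Modifier.isStatic(f.getModifiers())) {
                    Integer value = new Integer(f.getInt(null));
                    if(!seen.add(value))
                        throw new IllegalStateException("duplicate code "+value+" at "+f.getName());
                }
            }
            System.out.println(seen.size()+" distinct error codes");
        }
    }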
Modified: trunk/freenet/src/freenet/client/Segment.java
===================================================================
--- trunk/freenet/src/freenet/client/Segment.java 2005-11-05 18:14:59 UTC (rev 7472)
+++ trunk/freenet/src/freenet/client/Segment.java 2005-11-05 18:28:07 UTC (rev 7473)
@@ -19,17 +19,25 @@
*/
public class Segment implements Runnable {
- public class BlockStatus implements Runnable {
+ public class BlockStatus implements Runnable, SplitfileBlock {
+ /** Splitfile index - [0,k[ is the data blocks, [k,n[ is the check blocks */
+ final int index;
final FreenetURI uri;
int completedTries;
+ Bucket fetchedData;
- public BlockStatus(FreenetURI freenetURI) {
+ public BlockStatus(FreenetURI freenetURI, int index) {
uri = freenetURI;
completedTries = 0;
+ fetchedData = null;
+ this.index = index;
}
public void startFetch() {
+ if(fetchedData != null) {
+ throw new IllegalStateException("Already have data");
+ }
synchronized(runningFetches) {
runningFetches.add(this);
try {
@@ -50,22 +58,51 @@
// Do the fetch
Fetcher f = new Fetcher(uri, blockFetchContext);
try {
- f.realRun(new ClientMetadata(), recursionLevel, uri,
+ FetchResult fr = f.realRun(new ClientMetadata(), recursionLevel, uri,
(!nonFullBlocksAllowed) || fetcherContext.dontEnterImplicitArchives);
+ fetchedData = fr.data;
} catch (MetadataParseException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ fatalError(e);
} catch (FetchException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ int code = e.getMode();
+ switch(code) {
+ case FetchException.ARCHIVE_FAILURE:
+ case FetchException.BLOCK_DECODE_ERROR:
+ case FetchException.HAS_MORE_METASTRINGS:
+ case FetchException.INVALID_METADATA:
+ case FetchException.NOT_IN_ARCHIVE:
+ case FetchException.TOO_DEEP_ARCHIVE_RECURSION:
+ case FetchException.TOO_MANY_ARCHIVE_RESTARTS:
+ case FetchException.TOO_MANY_METADATA_LEVELS:
+ case FetchException.TOO_MANY_REDIRECTS:
+ case FetchException.TOO_MUCH_RECURSION:
+ case FetchException.UNKNOWN_METADATA:
+ case FetchException.UNKNOWN_SPLITFILE_METADATA:
+ // Fatal, probably an error on insert
+ fatalError(e);
+ return;
+
+ case FetchException.DATA_NOT_FOUND:
+ case FetchException.ROUTE_NOT_FOUND:
+ case FetchException.REJECTED_OVERLOAD:
+ // Non-fatal
+ nonfatalError(e);
+ break;
+
+ case FetchException.BUCKET_ERROR:
+ // Maybe fatal
+ nonfatalError(e);
+ break;
+ }
} catch (ArchiveFailureException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ fatalError(e);
} catch (ArchiveRestartException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- };
+ fatalError(e);
+ }
} finally {
+ // fatalError() sets completedTries to -1; don't overwrite the marker
+ if(completedTries >= 0)
+ completedTries++;
+ // Add before removing from runningFetches, to avoid race
+ synchronized(recentlyCompletedFetches) {
+ recentlyCompletedFetches.add(this);
+ }
synchronized(runningFetches) {
runningFetches.remove(this);
}
@@ -75,6 +112,18 @@
}
}
+ private void fatalError(Exception e) {
+ Logger.normal(this, "Giving up on block: "+this+": "+e);
+ completedTries = -1;
+ }
+
+ private void nonfatalError(Exception e) {
+ Logger.minor(this, "Non-fatal error on "+this+": "+e);
+ }
+
+ public boolean succeeded() {
+ return fetchedData != null;
+ }
+
+ // Implement SplitfileBlock, so the FEC codec can read and write our data
+ public int getNumber() {
+ return index;
+ }
+
+ public boolean hasData() {
+ return fetchedData != null;
+ }
+
+ public Bucket getData() {
+ return fetchedData;
+ }
+
+ public void setData(Bucket data) {
+ fetchedData = data;
+ }
}
final short splitfileType;
@@ -113,6 +162,8 @@
private final FetcherContext blockFetchContext;
/** Recursion level */
private final int recursionLevel;
+ /** Number of blocks which got fatal errors */
+ private int fatalErrorCount;
/**
* Create a Segment.
@@ -235,7 +286,11 @@
// Now wait for any thread to complete
synchronized(this) {
- wait(10*1000);
+ try {
+ wait(10*1000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
}
while(true) {
@@ -244,10 +299,12 @@
block = (BlockStatus) recentlyCompletedFetches.removeFirst();
}
if(block == null) break;
- if(block.failed()) {
+ if(!block.succeeded()) {
// Retry
int retryLevel = block.completedTries;
- if(retryLevel == maxRetryLevel) {
+ if(retryLevel == maxRetryLevel || retryLevel == -1) {
+ if(retryLevel == -1)
+ fatalErrorCount++;
// This block failed
} else {
Vector levelSet = (Vector) blocksNotTried.get(retryLevel);
@@ -262,7 +319,7 @@
// Can't start a fetch
if(runningFetches() == 0) {
// Failed
- parentFetcher.failedNotEnoughBlocks();
+ parentFetcher.failedNotEnoughBlocks(this);
return;
}
}
@@ -275,15 +332,29 @@
}
parentFetcher.gotBlocks(this);
-
+
// Now decode
- if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT) {
- // TODO put the data together
- } else {
- // TODO decode via onion
+
+ Bucket output = null;
+ try {
+ if(splitfileType != Metadata.SPLITFILE_NONREDUNDANT) {
+ FECCodec codec = FECCodec.getCodec(splitfileType, dataBlocks.length, checkBlocks.length);
+ // Assumption: splitfile blocks have a fixed length (CHKBlock.DATA_LENGTH)
+ codec.decode(dataBlockStatus, checkBlockStatus, CHKBlock.DATA_LENGTH, fetcherContext.bucketFactory);
+ }
+
+ output = fetcherContext.bucketFactory.makeBucket(-1);
+ OutputStream os = output.getOutputStream();
+ for(int i=0;i<dataBlockStatus.length;i++) {
+ BlockStatus status = dataBlockStatus[i];
+ Bucket data = status.fetchedData;
+ BucketTools.copyTo(data, os, Long.MAX_VALUE);
+ fetcherContext.bucketFactory.freeBucket(data);
+ }
+ os.close();
+ } catch (IOException e) {
+ parentFetcher.internalBucketError(this, e);
+ return;
+ }
- parentFetcher.decoded(this);
+ parentFetcher.decoded(this, output);
// TODO create healing blocks
}
@@ -304,11 +375,16 @@
private boolean startFetch() {
if(minRetryLevel == maxRetryLevel) return false; // nothing to start
// Don't need to synchronize as these are only accessed by main thread
- Vector v = (Vector) blocksNotTried.get(minRetryLevel);
- int len = v.size();
- int idx = fetcherContext.random.nextInt(len);
- BlockStatus b = (BlockStatus) v.remove(idx);
- if(v.isEmpty()) minRetryLevel++;
- b.startFetch();
+ while(true) {
+ if(minRetryLevel >= blocksNotTried.size())
+ return false;
+ Vector v = (Vector) blocksNotTried.get(minRetryLevel);
+ if(v.isEmpty()) {
+ // Skip exhausted levels rather than calling nextInt(0)
+ minRetryLevel++;
+ continue;
+ }
+ int idx = fetcherContext.random.nextInt(v.size());
+ BlockStatus b = (BlockStatus) v.remove(idx);
+ if(v.isEmpty()) minRetryLevel++;
+ b.startFetch();
+ return true;
+ }
}
}
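For reference, the retry bookkeeping that run() and startFetch() rely on: blocksNotTried is a Vector of Vectors indexed by completedTries, so the scheduler always drains the shallowest (least-retried) level first, picking randomly within a level. A standalone sketch of that structure (assumed shape; the field declarations are outside the hunks shown):

    import java.util.Random;
    import java.util.Vector;

    class RetryQueue {
        private final Vector levels = new Vector(); // Vector of Vectors of blocks
        private final Random random = new Random();

        void add(Object block, int triesSoFar) {
            while(levels.size() <= triesSoFar)
                levels.add(new Vector());
            ((Vector) levels.get(triesSoFar)).add(block);
        }

        /** Remove and return a random block from the lowest non-empty level, or null. */
        Object removeNext() {
            for(int level=0;level<levels.size();level++) {
                Vector v = (Vector) levels.get(level);
                if(!v.isEmpty())
                    return v.remove(random.nextInt(v.size()));
            }
            return null;
        }
    }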
Modified: trunk/freenet/src/freenet/client/SplitFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitFetcher.java 2005-11-05 18:14:59 UTC (rev 7472)
+++ trunk/freenet/src/freenet/client/SplitFetcher.java 2005-11-05 18:28:07 UTC (rev 7473)
@@ -202,4 +202,19 @@
return output;
}
+ public void failedNotEnoughBlocks(Segment segment) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void gotBlocks(Segment segment) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void decoded(Segment segment, Bucket data) {
+ // TODO Auto-generated method stub
+
+ }
+
}
Added: trunk/freenet/src/freenet/client/SplitfileBlock.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitfileBlock.java 2005-11-05 18:14:59 UTC (rev 7472)
+++ trunk/freenet/src/freenet/client/SplitfileBlock.java 2005-11-05 18:28:07 UTC (rev 7473)
@@ -0,0 +1,20 @@
+package freenet.client;
+
+import freenet.support.Bucket;
+
+/** Simple interface for a splitfile block */
+public interface SplitfileBlock {
+
+ /** Get block number. [0,k[ = data blocks, [k, n[ = check blocks */
+ int getNumber();
+
+ /** Has data? */
+ boolean hasData();
+
+ /** Get data */
+ Bucket getData();
+
+ /** Set data */
+ void setData(Bucket data);
+
+}
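A minimal implementation of the interface, e.g. for unit tests (sketch; the real implementor in this commit is Segment.BlockStatus):

    import freenet.support.Bucket;

    class SimpleSplitfileBlock implements SplitfileBlock {
        private final int number;
        private Bucket data;

        SimpleSplitfileBlock(int number) {
            this.number = number;
        }

        public int getNumber() { return number; }
        public boolean hasData() { return data != null; }
        public Bucket getData() { return data; }
        public void setData(Bucket data) { this.data = data; }
    }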
Added: trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2005-11-05 18:14:59 UTC (rev 7472)
+++ trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2005-11-05 18:28:07 UTC (rev 7473)
@@ -0,0 +1,144 @@
+package freenet.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import com.onionnetworks.fec.DefaultFECCodeFactory;
+import com.onionnetworks.fec.FECCode;
+
+import freenet.client.Segment.BlockStatus;
+import freenet.support.Bucket;
+import freenet.support.BucketFactory;
+import freenet.support.Fields;
+import freenet.support.LRUHashBag;
+import freenet.support.LRUHashtable;
+
+/**
+ * FECCodec implementation using the onion code.
+ */
+public class StandardOnionFECCodec extends FECCodec {
+
+ // REDFLAG: How big is one of these?
+ private static int MAX_CACHED_CODECS = 16;
+ // REDFLAG: Optimal stripe size? Smaller => less memory usage, but more JNI overhead
+ private static int STRIPE_SIZE = 4096;
+
+ private static class MyKey {
+ /** Number of input blocks */
+ int k;
+ /** Number of output blocks, including input blocks */
+ int n;
+
+ public MyKey(int n, int k) {
+ this.n = n;
+ this.k = k;
+ }
+
+ public boolean equals(Object o) {
+ if(o instanceof MyKey) {
+ MyKey key = (MyKey)o;
+ return key.n == n && key.k == k;
+ } else return false;
+ }
+
+ public int hashCode() {
+ return (n << 16) + k;
+ }
+ }
+
+ private static final LRUHashtable recentlyUsedCodecs = new LRUHashtable();
+
+ public synchronized static FECCodec getInstance(int dataBlocks, int checkBlocks) {
+ // MyKey takes (n, k), where n includes the data blocks
+ MyKey key = new MyKey(dataBlocks + checkBlocks, dataBlocks);
+ StandardOnionFECCodec codec = (StandardOnionFECCodec) recentlyUsedCodecs.get(key);
+ if(codec != null) {
+ recentlyUsedCodecs.push(key, codec);
+ return codec;
+ }
+ codec = new StandardOnionFECCodec(dataBlocks, checkBlocks + dataBlocks);
+ recentlyUsedCodecs.push(key, codec);
+ while(recentlyUsedCodecs.size() > MAX_CACHED_CODECS) {
+ recentlyUsedCodecs.popKey();
+ }
+ return codec;
+ }
+
+ private FECCode code;
+
+ private int k;
+ private int n;
+
+ public StandardOnionFECCodec(int k, int n) {
+ this.k = k;
+ this.n = n;
+ code = DefaultFECCodeFactory.getDefault().createFECCode(k,n);
+ }
+
+ public void decode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) {
+ // Ensure that there are only K simultaneous running decodes.
+ }
+
+ public void realDecode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) throws IOException {
+ if(dataBlockStatus.length + checkBlockStatus.length != n)
+ throw new IllegalArgumentException();
+ if(dataBlockStatus.length != k)
+ throw new IllegalArgumentException();
+ byte[][] packets = new byte[n][];
+ Bucket[] buckets = new Bucket[n];
+ InputStream[] readers = new InputStream[n];
+ OutputStream[] writers = new OutputStream[n];
+ int[] toDecode = new int[n-k];
+ int numberToDecode = 0; // can be less than n-k
+
+ for(int i=0;i<n;i++)
+ packets[i] = new byte[STRIPE_SIZE];
+
+ for(int i=0;i<dataBlockStatus.length;i++) {
+ buckets[i] = dataBlockStatus[i].getData();
+ if(buckets[i] == null) {
+ buckets[i] = bf.makeBucket(blockLength);
+ writers[i] = buckets[i].getOutputStream();
+ readers[i] = null;
+ toDecode[numberToDecode++] = i;
+ } else {
+ writers[i] = null;
+ readers[i] = buckets[i].getInputStream();
+ }
+ }
+ for(int i=0;i<checkBlockStatus.length;i++) {
+ buckets[i+k] = checkBlockStatus[i].getData();
+ if(buckets[i+k] == null) {
+ buckets[i+k] = bf.makeBucket(blockLength);
+ writers[i+k] = buckets[i+k].getOutputStream();
+ readers[i+k] = null;
+ toDecode[numberToDecode++] = i+k;
+ } else {
+ writers[i+k] = null;
+ readers[i+k] = buckets[i+k].getInputStream();
+ }
+ }
+
+ if(numberToDecode != toDecode.length) {
+ int[] newToDecode = new int[numberToDecode];
+ System.arraycopy(toDecode, 0, newToDecode, 0, numberToDecode);
+ toDecode = newToDecode;
+ }
+
+ int[] offsets = new int[n];
+ for(int i=0;i<n;i++) offsets[i] = 0;
+
+ if(numberToDecode > 0) {
+ // Do the (striped) decode
+ for(int offset=0;offset<blockLength;offset+=STRIPE_SIZE) {
+ // Read the data in first
+ for(int i=0;i<n;i++) {
+ if(readers[i] != null) {
+ Fields.readFully(readers[i], packets[i]);
+ }
+ }
+ // Do the decode; each pass covers one STRIPE_SIZE slice of every block
+ code.decode(packets, offsets, toDecode, STRIPE_SIZE, true);
+ }
+ // TODO write the decoded stripes back out through writers[], then close streams
+ }
+
+ }
+
+}
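On the striping trade-off flagged by the REDFLAG above: decoding stripe-by-stripe bounds resident memory at n * STRIPE_SIZE bytes instead of n * blockLength, at the cost of one JNI decode call per stripe. Illustrative numbers only:

    public class StripeMath {
        public static void main(String[] args) {
            int n = 256;             // example segment size in blocks
            int blockLength = 32768; // example block payload size
            int stripeSize = 4096;   // STRIPE_SIZE above
            System.out.println("whole blocks: " + ((long) n * blockLength) + " bytes resident");
            System.out.println("striped:      " + ((long) n * stripeSize) + " bytes resident, "
                    + (blockLength / stripeSize) + " decode passes");
        }
    }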