Author: toad
Date: 2005-11-05 20:22:00 +0000 (Sat, 05 Nov 2005)
New Revision: 7475
Modified:
trunk/freenet/src/freenet/client/FECCodec.java
trunk/freenet/src/freenet/client/FetcherContext.java
trunk/freenet/src/freenet/client/Segment.java
trunk/freenet/src/freenet/client/SplitFetcher.java
trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
Log:
More splitfiles.
Modified: trunk/freenet/src/freenet/client/FECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/FECCodec.java 2005-11-05 20:01:19 UTC
(rev 7474)
+++ trunk/freenet/src/freenet/client/FECCodec.java 2005-11-05 20:22:00 UTC
(rev 7475)
@@ -1,6 +1,9 @@
package freenet.client;
+import java.io.IOException;
+
import freenet.client.Segment.BlockStatus;
+import freenet.support.BucketFactory;
/**
* FEC (forward error correction) handler.
@@ -12,17 +15,34 @@
abstract class FECCodec {
public static FECCodec getCodec(short splitfileType, int dataBlocks,
int checkBlocks) {
+ if(splitfileType == Metadata.SPLITFILE_NONREDUNDANT)
+ return null;
if(splitfileType == Metadata.SPLITFILE_ONION_STANDARD)
return StandardOnionFECCodec.getInstance(dataBlocks,
checkBlocks);
else return null;
}
/**
- * Decode all missing blocks.
+ * Decode all missing *data* blocks.
+ * Requires that the total number of available blocks is equal to or
greater than the length of
+ * the data blocks array (i.e. it is >= k).
* @param dataBlockStatus The data blocks.
* @param checkBlockStatus The check blocks.
- * @param packetLength The packet length in bytes.
+ * @param blockLength The block length in bytes.
+ * @param bf The BucketFactory to use to generate buckets.
+ * @throws IOException If there is an error in decoding caused by an
I/O error (usually involving buckets).
*/
- public abstract void decode(BlockStatus[] dataBlockStatus,
BlockStatus[] checkBlockStatus, int packetLength);
+ public abstract void decode(SplitfileBlock[] dataBlockStatus,
SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) throws
IOException;
+ /**
+ * Encode all missing *check* blocks.
+ * Requires that all the data blocks be present.
+ * @param dataBlockStatus The data blocks.
+ * @param checkBlockStatus The check blocks.
+ * @param blockLength The block length in bytes.
+ * @param bf The BucketFactory to use to generate buckets.
+ * @throws IOException If there is an error in encoding caused by an
I/O error (usually involving buckets).
+ */
+ public abstract void encode(BlockStatus[] dataBlockStatus,
BlockStatus[] checkBlockStatus, int blockLength, BucketFactory bucketFactory);
+
}
Modified: trunk/freenet/src/freenet/client/FetcherContext.java
===================================================================
--- trunk/freenet/src/freenet/client/FetcherContext.java 2005-11-05
20:01:19 UTC (rev 7474)
+++ trunk/freenet/src/freenet/client/FetcherContext.java 2005-11-05
20:22:00 UTC (rev 7475)
@@ -5,7 +5,7 @@
import freenet.support.BucketFactory;
/** Context for a Fetcher. Contains all the settings a Fetcher needs to know
about. */
-public class FetcherContext {
+public class FetcherContext implements Cloneable {
/** Low-level client to send low-level requests to. */
final SimpleLowLevelClient client;
@@ -47,4 +47,14 @@
this.localRequestOnly = localRequestOnly;
}
+ /** Make public, but just call parent for a field-by-field copy. */
+ public Object clone() {
+ try {
+ return super.clone();
+ } catch (CloneNotSupportedException e) {
+ // Impossible
+ throw new Error(e);
+ }
+ }
+
}
Modified: trunk/freenet/src/freenet/client/Segment.java
===================================================================
--- trunk/freenet/src/freenet/client/Segment.java 2005-11-05 20:01:19 UTC
(rev 7474)
+++ trunk/freenet/src/freenet/client/Segment.java 2005-11-05 20:22:00 UTC
(rev 7475)
@@ -26,12 +26,14 @@
final FreenetURI uri;
int completedTries;
Bucket fetchedData;
+ boolean actuallyFetched;
public BlockStatus(FreenetURI freenetURI, int index) {
uri = freenetURI;
completedTries = 0;
fetchedData = null;
this.index = index;
+ actuallyFetched = false;
}
public void startFetch() {
@@ -60,6 +62,7 @@
try {
FetchResult fr = f.realRun(new
ClientMetadata(), recursionLevel, uri,
(!nonFullBlocksAllowed)
|| fetcherContext.dontEnterImplicitArchives);
+ actuallyFetched = true;
fetchedData = fr.data;
} catch (MetadataParseException e) {
fatalError(e);
@@ -124,6 +127,16 @@
public boolean succeeded() {
return fetchedData != null;
}
+
+ /**
+ * Queue a healing block for insert.
+ * Will be implemented using the download manager.
+ * FIXME: implement!
+ */
+ public void queueHeal() {
+ // TODO Auto-generated method stub
+
+ }
}
final short splitfileType;
@@ -192,11 +205,11 @@
Vector firstSet = new
Vector(dataBlocks.length+checkBlocks.length);
blocksNotTried.add(0, firstSet);
for(int i=0;i<dataBlocks.length;i++) {
- dataBlockStatus[i] = new BlockStatus(dataBlocks[i]);
+ dataBlockStatus[i] = new BlockStatus(dataBlocks[i], i);
firstSet.add(dataBlockStatus[i]);
}
for(int i=0;i<checkBlocks.length;i++) {
- checkBlockStatus[i] = new BlockStatus(checkBlocks[i]);
+ checkBlockStatus[i] = new BlockStatus(checkBlocks[i],
dataBlockStatus.length + i);
firstSet.add(checkBlockStatus[i]);
}
recentlyCompletedFetches = new LinkedList();
@@ -208,6 +221,10 @@
} else throw new MetadataParseException("Unknown splitfile type
"+splitfileType);
minRetryLevel = 0;
this.recursionLevel = recursionLevel;
+ // FIXME be a bit more flexible here depending on flags
+ blockFetchContext = (FetcherContext) fetcherContext.clone();
+ blockFetchContext.allowSplitfiles = false;
+ blockFetchContext.dontEnterImplicitArchives = true;
}
/**
@@ -335,10 +352,12 @@
// Now decode
+ FECCodec codec = FECCodec.getCodec(splitfileType,
dataBlocks.length, checkBlocks.length);
try {
if(splitfileType != Metadata.SPLITFILE_NONREDUNDANT) {
- FECCodec codec =
FECCodec.getCodec(splitfileType, dataBlocks.length, checkBlocks.length);
- codec.decode(dataBlockStatus, checkBlockStatus);
+ // FIXME hardcoded block size below.
+ codec.decode(dataBlockStatus, checkBlockStatus,
32768, fetcherContext.bucketFactory);
+ // Now have all the data blocks (not
necessarily all the check blocks)
}
Bucket output =
fetcherContext.bucketFactory.makeBucket(-1);
@@ -350,13 +369,28 @@
fetcherContext.bucketFactory.freeBucket(data);
}
os.close();
+ parentFetcher.decoded(this, output);
} catch (IOException e) {
parentFetcher.internalBucketError(this, e);
+ return;
}
- parentFetcher.decoded(this, output);
+ // Now heal
- // TODO create healing blocks
+ // Encode any check blocks we don't have
+ if(codec != null)
+ codec.encode(dataBlockStatus, checkBlockStatus, 32768,
fetcherContext.bucketFactory);
+
+ // Now insert *ALL* blocks on which we had at least one
failure, and didn't eventually succeed
+ for(int i=0;i<dataBlockStatus.length;i++) {
+ BlockStatus block = dataBlockStatus[i];
+ if(block.actuallyFetched) continue;
+ if(block.completedTries == 0) {
+ // NOTE(review): comment said "80% chance of not inserting", but
+ // nextInt(5) == 0 skips only ~20% of the time — comment and code disagree; confirm the intended probability
+ if(fetcherContext.random.nextInt(5) == 0)
continue;
+ }
+ block.queueHeal();
+ }
}
/**
Modified: trunk/freenet/src/freenet/client/SplitFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitFetcher.java 2005-11-05 20:01:19 UTC
(rev 7474)
+++ trunk/freenet/src/freenet/client/SplitFetcher.java 2005-11-05 20:22:00 UTC
(rev 7475)
@@ -212,9 +212,14 @@
}
- public void decoded(Segment segment) {
+ public void decoded(Segment segment, Bucket output) {
// TODO Auto-generated method stub
}
+ public void internalBucketError(Segment segment, IOException e) {
+ // TODO Auto-generated method stub
+
+ }
+
}
Modified: trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2005-11-05
20:01:19 UTC (rev 7474)
+++ trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2005-11-05
20:22:00 UTC (rev 7475)
@@ -1,16 +1,15 @@
package freenet.client;
-import java.io.InputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
import java.io.OutputStream;
import com.onionnetworks.fec.DefaultFECCodeFactory;
import com.onionnetworks.fec.FECCode;
+import com.onionnetworks.util.Buffer;
-import freenet.client.Segment.BlockStatus;
import freenet.support.Bucket;
import freenet.support.BucketFactory;
-import freenet.support.Fields;
-import freenet.support.LRUHashBag;
import freenet.support.LRUHashtable;
/**
@@ -22,6 +21,8 @@
private static int MAX_CACHED_CODECS = 16;
// REDFLAG: Optimal stripe size? Smaller => less memory usage, but more
JNI overhead
private static int STRIPE_SIZE = 4096;
+ // REDFLAG: Make this configurable, maybe make it depend on # CPUs
+ private static int PARALLEL_DECODES = 1;
private static class MyKey {
/** Number of input blocks */
@@ -72,24 +73,46 @@
code = DefaultFECCodeFactory.getDefault().createFECCode(k,n);
}
- public void decode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[]
checkBlockStatus, int blockLength, BucketFactory bf) {
+ private static Object runningDecodesSync = new Object();
+ private static int runningDecodes;
+
+ public void decode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[]
checkBlockStatus, int blockLength, BucketFactory bf) throws IOException {
// Ensure that there are only PARALLEL_DECODES simultaneous running decodes.
// NOTE(review): the added code calls wait() on 'this' while holding the
// runningDecodesSync monitor — that throws IllegalMonitorStateException; it
// should be runningDecodesSync.wait(). Also the decrement in the finally
// block never calls runningDecodesSync.notify(), so waiters would block forever.
+ synchronized(runningDecodesSync) {
+ while(runningDecodes >= PARALLEL_DECODES) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ runningDecodes++;
+ }
+ try {
+ realDecode(dataBlockStatus, checkBlockStatus,
blockLength, bf);
+ } finally {
+ synchronized(runningDecodesSync) {
+ runningDecodes--;
+ }
+ }
}
- public void realDecode(SplitfileBlock[] dataBlockStatus,
SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) {
+ public void realDecode(SplitfileBlock[] dataBlockStatus,
SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) throws
IOException {
if(dataBlockStatus.length + checkBlockStatus.length != n)
throw new IllegalArgumentException();
if(dataBlockStatus.length != k)
throw new IllegalArgumentException();
- byte[][] packets = new byte[k][];
+ Buffer[] packets = new Buffer[k];
Bucket[] buckets = new Bucket[k];
- InputStream[] readers = new InputStream[k];
+ DataInputStream[] readers = new DataInputStream[k];
OutputStream[] writers = new OutputStream[k];
int[] toDecode = new int[n-k];
- int numberToDecode; // can be less than n-k
+ int numberToDecode = 0; // can be less than n-k
+ byte[] realBuffer = new byte[k * STRIPE_SIZE];
+
for(int i=0;i<n;i++)
- packets[i] = new byte[STRIPE_SIZE];
+ // NOTE(review): loop bound is n but 'packets' and 'realBuffer' are
+ // sized for k entries — i >= k overruns both; should this loop run to k?
+ packets[i] = new Buffer(realBuffer, i*STRIPE_SIZE,
STRIPE_SIZE);
for(int i=0;i<dataBlockStatus.length;i++) {
buckets[i] = dataBlockStatus[i].getData();
@@ -100,7 +123,7 @@
toDecode[numberToDecode++] = i;
} else {
writers[i] = null;
- readers[i] = buckets[i].getInputStream();
+ readers[i] = new
DataInputStream(buckets[i].getInputStream());
}
}
for(int i=0;i<checkBlockStatus.length;i++) {
@@ -112,7 +135,7 @@
toDecode[numberToDecode++] = i+k;
} else {
writers[i+k] = null;
- readers[i+k] = buckets[i+k].getInputStream();
+ readers[i+k] = new
DataInputStream(buckets[i+k].getInputStream());
}
}
@@ -127,18 +150,26 @@
if(numberToDecode > 0) {
// Do the (striped) decode
- for(int
offset=0;offset<packetLength;offset+=STRIPE_SIZE) {
+ for(int
offset=0;offset<blockLength;offset+=STRIPE_SIZE) {
// Read the data in first
for(int i=0;i<n;i++) {
if(readers[i] != null) {
- Fields.readFully(readers[i],
packets[i]);
+
readers[i].readFully(realBuffer, i*STRIPE_SIZE, STRIPE_SIZE);
}
}
// Do the decode
- code.decode(packets, offsets, toDecode,
blockLength, true);
+ // Not shuffled
+ code.decode(packets, offsets);
+ // packets now contains an array of decoded
blocks, in order
+ // Write the data out
+ // NOTE(review): writers[i] is null for blocks that were already
+ // present (only missing blocks get writers) — this loop NPEs unless
+ // it skips null entries.
+ for(int i=0;i<n;i++) {
+ writers[i].write(realBuffer,
i*STRIPE_SIZE, STRIPE_SIZE);
+ }
+ }
}
- // TODO Auto-generated method stub
-
+ for(int i=0;i<k;i++) {
+ writers[i].close();
+ readers[i].close();
+ }
}
-
}