Author: toad
Date: 2005-11-09 20:58:30 +0000 (Wed, 09 Nov 2005)
New Revision: 7505
Added:
trunk/freenet/src/freenet/client/SplitFetchException.java
Modified:
trunk/freenet/src/freenet/client/FECCodec.java
trunk/freenet/src/freenet/client/FetchException.java
trunk/freenet/src/freenet/client/InsertSegment.java
trunk/freenet/src/freenet/client/Segment.java
trunk/freenet/src/freenet/client/SplitFetcher.java
trunk/freenet/src/freenet/client/SplitInserter.java
trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
trunk/freenet/src/freenet/node/Version.java
Log:
147:
Might just work now...
Modified: trunk/freenet/src/freenet/client/FECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/FECCodec.java 2005-11-09 20:14:20 UTC
(rev 7504)
+++ trunk/freenet/src/freenet/client/FECCodec.java 2005-11-09 20:58:30 UTC
(rev 7505)
@@ -78,7 +78,7 @@
* @param bf The BucketFactory to use to generate buckets.
* @throws IOException If there is an error in decoding caused by an
I/O error (usually involving buckets).
*/
- public abstract void encode(SplitfileBlock[] dataBlocks,
SplitfileBlock[] checkBlocks, int blockLength, BucketFactory bucketFactory);
+ public abstract void encode(SplitfileBlock[] dataBlocks,
SplitfileBlock[] checkBlocks, int blockLength, BucketFactory bucketFactory)
throws IOException;
/**
* How many check blocks?
Modified: trunk/freenet/src/freenet/client/FetchException.java
===================================================================
--- trunk/freenet/src/freenet/client/FetchException.java 2005-11-09
20:14:20 UTC (rev 7504)
+++ trunk/freenet/src/freenet/client/FetchException.java 2005-11-09
20:58:30 UTC (rev 7505)
@@ -72,4 +72,6 @@
public static final int INTERNAL_ERROR = 17;
/** The node found the data but the transfer failed */
public static final int TRANSFER_FAILED = 18;
+ /** Splitfile error. This should be a SplitFetchException. */
+ public static final int SPLITFILE_ERROR = 19;
}
Modified: trunk/freenet/src/freenet/client/InsertSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/InsertSegment.java 2005-11-09 20:14:20 UTC
(rev 7504)
+++ trunk/freenet/src/freenet/client/InsertSegment.java 2005-11-09 20:58:30 UTC
(rev 7505)
@@ -1,5 +1,7 @@
package freenet.client;
+import java.io.IOException;
+
import freenet.keys.FreenetURI;
import freenet.support.BucketFactory;
@@ -37,8 +39,9 @@
/**
* Encode the data blocks into check blocks.
* @return The number of check blocks generated.
+ * @throws IOException If the encode fails due to a bucket error.
*/
- public int encode(int offset, RetryTracker tracker, InserterContext
ctx) {
+ public int encode(int offset, RetryTracker tracker, InserterContext
ctx) throws IOException {
if(codec == null) return 0; // no FEC
for(int i=0;i<checkBlocks.length;i++)
checkBlocks[i] = new BlockInserter(null, offset + i,
tracker, ctx);
Modified: trunk/freenet/src/freenet/client/Segment.java
===================================================================
--- trunk/freenet/src/freenet/client/Segment.java 2005-11-09 20:14:20 UTC
(rev 7504)
+++ trunk/freenet/src/freenet/client/Segment.java 2005-11-09 20:58:30 UTC
(rev 7505)
@@ -9,6 +9,7 @@
import freenet.keys.FreenetURI;
import freenet.support.Bucket;
import freenet.support.BucketTools;
+import freenet.support.Logger;
/**
* A segment, within a splitfile.
@@ -48,6 +49,7 @@
final int recursionLevel;
/** Retry tracker */
private final RetryTracker tracker;
+ private FetchException failureException;
/**
* Create a Segment.
@@ -173,9 +175,12 @@
public void finished(SplitfileBlock[] succeeded, SplitfileBlock[]
failed, SplitfileBlock[] fatalErrors) {
if(succeeded.length > minFetched)
+ // Not finished yet, need to decode
successfulFetch();
else {
- parentFetcher.failed(this, minFetched,
succeeded.length, failed.length, fatalErrors.length);
+ failureException = new
SplitFetchException(failed.length, fatalErrors.length);
+ finished = true;
+ parentFetcher.segmentFinished(this);
}
}
@@ -204,17 +209,27 @@
fetcherContext.bucketFactory.freeBucket(data);
}
os.close();
- parentFetcher.decoded(this, output);
+ // Must set finished BEFORE calling parentFetcher.
+ // Otherwise a race is possible that might result in it
not seeing our finishing.
+ finished = true;
+ parentFetcher.segmentFinished(this);
} catch (IOException e) {
- parentFetcher.internalBucketError(this, e);
+ finished = true;
+ failureException = new
FetchException(FetchException.BUCKET_ERROR);
+ parentFetcher.segmentFinished(this);
return;
}
// Now heal
// Encode any check blocks we don't have
- if(codec != null)
- codec.encode(dataBlockStatus, checkBlockStatus, 32768,
fetcherContext.bucketFactory);
+ if(codec != null) {
+ try {
+ codec.encode(dataBlockStatus, checkBlockStatus,
32768, fetcherContext.bucketFactory);
+ } catch (IOException e) {
+ Logger.error(this, "Bucket error while healing:
"+e, e);
+ }
+ }
// Now insert *ALL* blocks on which we had at least one
failure, and didn't eventually succeed
for(int i=0;i<dataBlockStatus.length;i++) {
@@ -226,5 +241,7 @@
}
block.queueHeal();
}
+
+ // FIXME heal check blocks too
}
}
Added: trunk/freenet/src/freenet/client/SplitFetchException.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitFetchException.java 2005-11-09
20:14:20 UTC (rev 7504)
+++ trunk/freenet/src/freenet/client/SplitFetchException.java 2005-11-09
20:58:30 UTC (rev 7505)
@@ -0,0 +1,20 @@
+package freenet.client;
+
+public class SplitFetchException extends FetchException {
+
+ final int failed;
+ final int fatal;
+
+ public SplitFetchException(int failed, int fatal) {
+ super(FetchException.SPLITFILE_ERROR);
+ this.failed = failed;
+ this.fatal = fatal;
+ }
+
+ public String getMessage() {
+ return "Splitfile fetch failure: "+failed+" failed, "+fatal+"
fatal errors";
+ }
+
+ private static final long serialVersionUID = 1523809424508826893L;
+
+}
Modified: trunk/freenet/src/freenet/client/SplitFetcher.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitFetcher.java 2005-11-09 20:14:20 UTC
(rev 7504)
+++ trunk/freenet/src/freenet/client/SplitFetcher.java 2005-11-09 20:58:30 UTC
(rev 7505)
@@ -170,6 +170,9 @@
finalLength += s.decodedLength();
// Healing is done by Segment
}
+ if(finalLength > overrideLength)
+ finalLength = overrideLength;
+
long bytesWritten = 0;
OutputStream os = null;
Bucket output;
@@ -196,33 +199,15 @@
}
public void gotBlocks(Segment segment) {
- // TODO Auto-generated method stub
-
+ synchronized(this) {
+ fetchingSegment = null;
+ }
}
- public void decoded(Segment segment, Bucket output) {
- // TODO Auto-generated method stub
-
+ public void segmentFinished(Segment segment) {
+ synchronized(this) {
+ notifyAll();
+ }
}
- public void internalBucketError(Segment segment, IOException e) {
- // TODO Auto-generated method stub
-
- }
-
- /**
- * The segment fetch failed.
- * @param segment The segment that failed.
- * @param minFetched The minimum number of successful blocks for a
successful fetch.
- * @param successfulBlocks The number of blocks successfully fetched.
- * @param failedBlocks The number of blocks that failed because they got
- * non-fatal errors on every try, and ran out of retries.
- * @param fatalErrors The number of blocks that got fatal errors.
- */
- public void failed(Segment segment, int minFetched, int
successfulBlocks,
- int failedBlocks, int fatalErrors) {
- // TODO Auto-generated method stub
-
- }
-
}
Modified: trunk/freenet/src/freenet/client/SplitInserter.java
===================================================================
--- trunk/freenet/src/freenet/client/SplitInserter.java 2005-11-09 20:14:20 UTC
(rev 7504)
+++ trunk/freenet/src/freenet/client/SplitInserter.java 2005-11-09 20:58:30 UTC
(rev 7505)
@@ -67,8 +67,13 @@
startInsertingDataBlocks();
splitIntoSegments(segmentSize);
// Backwards, because the last is the shortest
- for(int i=segments.length-1;i>=0;i--)
- countCheckBlocks += encodeSegment(i,
origDataBlocks.length + checkSegmentSize * i);
+ try {
+ for(int i=segments.length-1;i>=0;i--) {
+ countCheckBlocks += encodeSegment(i,
origDataBlocks.length + checkSegmentSize * i);
+ }
+ } catch (IOException e) {
+ throw new
InserterException(InserterException.BUCKET_ERROR);
+ }
// Wait for the insertion thread to finish
return waitForCompletion();
}
@@ -136,7 +141,7 @@
return uris;
}
- private int encodeSegment(int i, int offset) {
+ private int encodeSegment(int i, int offset) throws IOException {
encodingSegment = segments[i];
return encodingSegment.encode(offset, tracker, ctx);
}
Modified: trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2005-11-09
20:14:20 UTC (rev 7504)
+++ trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2005-11-09
20:58:30 UTC (rev 7505)
@@ -103,13 +103,13 @@
if(dataBlockStatus.length != k)
throw new IllegalArgumentException();
Buffer[] packets = new Buffer[k];
- Bucket[] buckets = new Bucket[k];
+ Bucket[] buckets = new Bucket[n];
DataInputStream[] readers = new DataInputStream[k];
OutputStream[] writers = new OutputStream[k];
int[] toDecode = new int[n-k];
int numberToDecode = 0; // can be less than n-k
- byte[] realBuffer = new byte[k * STRIPE_SIZE];
+ byte[] realBuffer = new byte[n * STRIPE_SIZE];
for(int i=0;i<n;i++)
packets[i] = new Buffer(realBuffer, i*STRIPE_SIZE,
STRIPE_SIZE);
@@ -145,9 +145,6 @@
toDecode = newToDecode;
}
- int[] offsets = new int[n];
- for(int i=0;i<n;i++) offsets[i] = 0;
-
if(numberToDecode > 0) {
// Do the (striped) decode
for(int
offset=0;offset<blockLength;offset+=STRIPE_SIZE) {
@@ -159,23 +156,115 @@
}
// Do the decode
// Not shuffled
- code.decode(packets, offsets);
+ code.decode(packets, toDecode);
// packets now contains an array of decoded
blocks, in order
// Write the data out
for(int i=0;i<n;i++) {
- writers[i].write(realBuffer,
i*STRIPE_SIZE, STRIPE_SIZE);
+ if(writers[i] != null)
+ writers[i].write(realBuffer,
i*STRIPE_SIZE, STRIPE_SIZE);
}
}
}
for(int i=0;i<k;i++) {
- writers[i].close();
- readers[i].close();
+ if(writers[i] != null) writers[i].close();
+ if(readers[i] != null) readers[i].close();
}
+ // Set new buckets only after have a successful decode.
+ for(int i=0;i<dataBlockStatus.length;i++) {
+ dataBlockStatus[i].setData(buckets[i]);
+ }
+ for(int i=0;i<checkBlockStatus.length;i++) {
+ checkBlockStatus[i].setData(buckets[i+k]);
+ }
}
- public void encode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[]
checkBlockStatus, int blockLength, BucketFactory bucketFactory) {
- // TODO Auto-generated method stub
+ public void encode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[]
checkBlockStatus, int blockLength, BucketFactory bf) throws IOException {
+ // Encodes count as decodes.
+ synchronized(runningDecodesSync) {
+ while(runningDecodes >= PARALLEL_DECODES) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ runningDecodes++;
+ }
+ try {
+ realEncode(dataBlockStatus, checkBlockStatus,
blockLength, bf);
+ } finally {
+ synchronized(runningDecodesSync) {
+ runningDecodes--;
+ }
+ }
+ }
+
+ /**
+ * Do the actual encode.
+ */
+ private void realEncode(SplitfileBlock[] dataBlockStatus,
SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) throws
IOException {
+ if(dataBlockStatus.length + checkBlockStatus.length != n)
+ throw new IllegalArgumentException();
+ if(dataBlockStatus.length != k)
+ throw new IllegalArgumentException();
+ Buffer[] dataPackets = new Buffer[k];
+ Buffer[] checkPackets = new Buffer[n-k];
+ Bucket[] buckets = new Bucket[n];
+ DataInputStream[] readers = new DataInputStream[k];
+ OutputStream[] writers = new OutputStream[n-k];
+ int[] toEncode = new int[n-k];
+ int numberToEncode = 0; // can be less than n-k
+ byte[] realBuffer = new byte[n * STRIPE_SIZE];
+
+ for(int i=0;i<k;i++)
+ dataPackets[i] = new Buffer(realBuffer, i*STRIPE_SIZE,
STRIPE_SIZE);
+ for(int i=0;i<n-k;i++)
+ checkPackets[i] = new Buffer(realBuffer,
(i+k)*STRIPE_SIZE, STRIPE_SIZE);
+
+ for(int i=0;i<dataBlockStatus.length;i++) {
+ buckets[i] = dataBlockStatus[i].getData();
+ readers[i] = new
DataInputStream(buckets[i].getInputStream());
+ }
+ for(int i=0;i<checkBlockStatus.length;i++) {
+ buckets[i+k] = checkBlockStatus[i].getData();
+ if(buckets[i+k] == null) {
+ buckets[i+k] = bf.makeBucket(blockLength);
+ writers[i+k] = buckets[i+k].getOutputStream();
+ readers[i+k] = null;
+ toEncode[numberToEncode++] = i+k;
+ } else {
+ writers[i+k] = null;
+ readers[i+k] = new
DataInputStream(buckets[i+k].getInputStream());
+ }
+ }
+
+ if(numberToEncode > 0) {
+ // Do the (striped) decode
+ for(int
offset=0;offset<blockLength;offset+=STRIPE_SIZE) {
+ // Read the data in first
+ for(int i=0;i<n;i++) {
+ readers[i].readFully(realBuffer,
i*STRIPE_SIZE, STRIPE_SIZE);
+ }
+ // Do the encode
+ // Not shuffled
+ code.encode(dataPackets, checkPackets,
toEncode);
+ // packets now contains an array of decoded
blocks, in order
+ // Write the data out
+ for(int i=k;i<n;i++) {
+ if(writers[i] != null)
+ writers[i].write(realBuffer,
i*STRIPE_SIZE, STRIPE_SIZE);
+ }
+ }
+ }
+ for(int i=0;i<n;i++) {
+ if(writers[i] != null) writers[i].close();
+ if(readers[i] != null) readers[i].close();
+ }
+ // Set new buckets only after have a successful decode.
+ for(int i=0;i<checkBlockStatus.length;i++) {
+ checkBlockStatus[i].setData(buckets[i+k]);
+ }
}
public int countCheckBlocks() {
Modified: trunk/freenet/src/freenet/node/Version.java
===================================================================
--- trunk/freenet/src/freenet/node/Version.java 2005-11-09 20:14:20 UTC (rev
7504)
+++ trunk/freenet/src/freenet/node/Version.java 2005-11-09 20:58:30 UTC (rev
7505)
@@ -20,7 +20,7 @@
public static final String protocolVersion = "1.0";
/** The build number of the current revision */
- public static final int buildNumber = 146;
+ public static final int buildNumber = 147;
/** Oldest build of Fred we will talk to */
public static final int lastGoodBuild = 144;