Author: nextgens
Date: 2007-03-24 02:59:23 +0000 (Sat, 24 Mar 2007)
New Revision: 12301
Modified:
trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
Log:
More untested code: if it works, every FEC operation is serialized, using a
FIFO policy
Modified: trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2007-03-24
02:18:15 UTC (rev 12300)
+++ trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2007-03-24
02:59:23 UTC (rev 12301)
@@ -13,6 +13,7 @@
import com.onionnetworks.fec.PureCode;
import com.onionnetworks.util.Buffer;
+import freenet.client.async.SplitFileFetcher;
import freenet.support.LRUHashtable;
import freenet.support.Logger;
import freenet.support.api.Bucket;
@@ -111,6 +112,9 @@
logMINOR = Logger.shouldLog(Logger.MINOR, this);
}
+ /**
+ * @deprecated
+ */
public void decode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[]
checkBlockStatus, int blockLength, BucketFactory bf) throws IOException {
logMINOR = Logger.shouldLog(Logger.MINOR, getClass());
if(logMINOR)
@@ -251,7 +255,7 @@
}
/**
- * @decapreted
+ * @deprecated
*/
public void encode(Bucket[] dataBlockStatus, Bucket[] checkBlockStatus,
int blockLength, BucketFactory bf) throws IOException {
logMINOR = Logger.shouldLog(Logger.MINOR, getClass());
@@ -279,7 +283,7 @@
}
/**
- * @decapreted
+ * @deprecated
*/
public void encode(SplitfileBlock[] dataBlockStatus, SplitfileBlock[]
checkBlockStatus, int blockLength, BucketFactory bf) throws IOException {
Bucket[] dataBlocks = new Bucket[dataBlockStatus.length];
@@ -435,7 +439,7 @@
// ###############################
- public void addToQueue(Bucket[] dataBlocks, Bucket[] checkBlocks, int
blockLength, BucketFactory bucketFactory, StandardOnionFECCodecEncoderCallback
callback){
+ public void addToQueue(Bucket[] dataBlocks, Bucket[] checkBlocks, int
blockLength, BucketFactory bucketFactory, StandardOnionFECCodecEncoderCallback
callback, boolean isADecodingJob){
synchronized (_awaitingJobs) {
if((fecRunnerThread == null) ||
!fecRunnerThread.isAlive()){
if(fecRunnerThread != null) Logger.error(this,
"The callback died!! restarting a new one, please report that error.");
@@ -446,7 +450,7 @@
fecRunnerThread.start();
}
- _awaitingJobs.addFirst(new FECJob(dataBlocks,
checkBlocks, blockLength, bucketFactory, callback));
+ _awaitingJobs.addFirst(new FECJob(dataBlocks,
checkBlocks, blockLength, bucketFactory, callback, isADecodingJob));
}
if(logMINOR) Logger.minor(this, "Adding a new job to the queue
(" +_awaitingJobs.size() + ").");
synchronized (fecRunner){
@@ -454,14 +458,14 @@
}
}
- public void addToQueue(SplitfileBlock[] dataBlockStatus,
SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory
bucketFactory, StandardOnionFECCodecEncoderCallback callback){
+ public void addToQueue(SplitfileBlock[] dataBlockStatus,
SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory
bucketFactory, StandardOnionFECCodecEncoderCallback callback, boolean
isADecodingJob){
Bucket[] dataBlocks = new Bucket[dataBlockStatus.length];
Bucket[] checkBlocks = new Bucket[checkBlockStatus.length];
for(int i=0;i<dataBlocks.length;i++)
dataBlocks[i] = dataBlockStatus[i].getData();
for(int i=0;i<checkBlocks.length;i++)
checkBlocks[i] = checkBlockStatus[i].getData();
- addToQueue(dataBlocks, checkBlocks, blockLength, bucketFactory,
callback);
+ addToQueue(dataBlocks, checkBlocks, blockLength, bucketFactory,
callback, isADecodingJob);
for(int i=0;i<dataBlocks.length;i++)
dataBlockStatus[i].setData(dataBlocks[i]);
for(int i=0;i<checkBlocks.length;i++)
@@ -474,6 +478,7 @@
public interface StandardOnionFECCodecEncoderCallback{
public void onEncodedSegment();
+ public void onDecodedSegment();
}
private class FECJob {
@@ -481,13 +486,15 @@
final BucketFactory bucketFactory;
final int blockLength;
final StandardOnionFECCodecEncoderCallback callback;
+ final boolean isADecodingJob;
- FECJob(Bucket[] dataBlocks, Bucket[] checkBlocks, int
blockLength, BucketFactory bucketFactory, StandardOnionFECCodecEncoderCallback
callback) {
+ FECJob(Bucket[] dataBlocks, Bucket[] checkBlocks, int
blockLength, BucketFactory bucketFactory, StandardOnionFECCodecEncoderCallback
callback, boolean isADecodingJob) {
this.dataBlocks = dataBlocks;
this.checkBlocks = checkBlocks;
this.blockLength = blockLength;
this.bucketFactory = bucketFactory;
this.callback = callback;
+ this.isADecodingJob = isADecodingJob;
}
}
@@ -503,13 +510,19 @@
if(job != null){
// Encode it
try {
- encode(job.dataBlocks,
job.checkBlocks, job.blockLength, job.bucketFactory);
+ if(job.isADecodingJob)
+
realDecode(job.dataBlocks, job.checkBlocks, job.blockLength, job.bucketFactory);
+ else
+
realEncode(job.dataBlocks, job.checkBlocks, job.blockLength, job.bucketFactory);
} catch (IOException e) {
Logger.error(this, "BOH! ioe:"
+ e.getMessage());
}
// Call the callback
try {
- job.callback.onEncodedSegment();
+ if(job.isADecodingJob)
+
job.callback.onDecodedSegment();
+ else
+
job.callback.onEncodedSegment();
} catch (Throwable e) {
Logger.error(this, "The
callback failed!" + e.getMessage());
}
Modified: trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2007-03-24 02:18:15 UTC (rev 12300)
+++ trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2007-03-24 02:59:23 UTC (rev 12301)
@@ -27,7 +27,7 @@
* A single segment within a SplitFileFetcher.
* This in turn controls a large number of SplitFileFetcherSubSegment's, which
are registered on the ClientRequestScheduler.
*/
-public class SplitFileFetcherSegment {
+public class SplitFileFetcherSegment implements
StandardOnionFECCodecEncoderCallback {
private static boolean logMINOR;
final short splitfileType;
@@ -59,6 +59,8 @@
final FailureCodeTracker errors;
private boolean finishing;
+ private FECCodec codec;
+
public SplitFileFetcherSegment(short splitfileType, ClientCHK[]
splitfileDataKeys, ClientCHK[] splitfileCheckKeys, SplitFileFetcher fetcher,
ArchiveContext archiveContext, FetchContext fetchContext, long maxTempLength,
int recursionLevel) throws MetadataParseException, FetchException {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
this.parentFetcher = fetcher;
@@ -174,98 +176,95 @@
}
parentFetcher.parent.completedBlock(dontNotify);
if(decodeNow) {
- Runnable r = new Decoder();
- Thread t = new Thread(r, "Decoder for "+this);
- t.setDaemon(true);
- t.start();
+ decode();
}
}
- class Decoder implements Runnable, StandardOnionFECCodecEncoderCallback
{
+ public void decode() {
+ // Now decode
+ if(logMINOR) Logger.minor(this, "Decoding
"+SplitFileFetcherSegment.this);
- public void run() {
-
- // Now decode
- if(logMINOR) Logger.minor(this, "Decoding
"+SplitFileFetcherSegment.this);
-
- boolean[] dataBlocksSucceeded = new
boolean[dataBuckets.length];
- boolean[] checkBlocksSucceeded = new
boolean[checkBuckets.length];
- for(int i=0;i<dataBuckets.length;i++)
- dataBlocksSucceeded[i] = dataBuckets[i].data !=
null;
- for(int i=0;i<checkBuckets.length;i++)
- checkBlocksSucceeded[i] = checkBuckets[i].data
!= null;
-
- FECCodec codec = FECCodec.getCodec(splitfileType,
dataKeys.length, checkKeys.length);
- try {
- if(splitfileType !=
Metadata.SPLITFILE_NONREDUNDANT) {
- codec.decode(dataBuckets, checkBuckets,
CHKBlock.DATA_LENGTH, fetchContext.bucketFactory);
- // Now have all the data blocks (not
necessarily all the check blocks)
- }
-
- decodedData =
fetchContext.bucketFactory.makeBucket(-1);
- if(logMINOR) Logger.minor(this, "Copying data
from data blocks");
- OutputStream os = decodedData.getOutputStream();
- for(int i=0;i<dataBuckets.length;i++) {
- SplitfileBlock status = dataBuckets[i];
- Bucket data = status.getData();
- BucketTools.copyTo(data, os,
Long.MAX_VALUE);
- }
- if(logMINOR) Logger.minor(this, "Copied data");
- os.close();
- // Must set finished BEFORE calling
parentFetcher.
- // Otherwise a race is possible that might
result in it not seeing our finishing.
- finished = true;
-
parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
- } catch (IOException e) {
- Logger.normal(this, "Caught bucket error?: "+e,
e);
- finished = true;
- failureException = new
FetchException(FetchException.BUCKET_ERROR);
-
parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
- return;
- }
-
- // Now heal
-
- /** Splitfile healing:
- * Any block which we have tried and failed to download
should be
- * reconstructed and reinserted.
- */
-
- // Encode any check blocks we don't have
- if(codec != null) {
- StandardOnionFECCodec fec =
(StandardOnionFECCodec) codec;
- fec.addToQueue(dataBuckets, checkBuckets,
32768, fetchContext.bucketFactory, this);
- }
- }
+ boolean[] dataBlocksSucceeded = new boolean[dataBuckets.length];
+ boolean[] checkBlocksSucceeded = new
boolean[checkBuckets.length];
+ for(int i=0;i<dataBuckets.length;i++)
+ dataBlocksSucceeded[i] = dataBuckets[i].data != null;
+ for(int i=0;i<checkBuckets.length;i++)
+ checkBlocksSucceeded[i] = checkBuckets[i].data != null;
+
+ codec = FECCodec.getCodec(splitfileType, dataKeys.length,
checkKeys.length);
- public void onEncodedSegment() {
- // Now insert *ALL* blocks on which we had at least one
failure, and didn't eventually succeed
+ if(splitfileType != Metadata.SPLITFILE_NONREDUNDANT) {
+ StandardOnionFECCodec fec =
(StandardOnionFECCodec)codec;
+ fec.addToQueue(dataBuckets, checkBuckets,
CHKBlock.DATA_LENGTH, fetchContext.bucketFactory, this, true);
+ // Now have all the data blocks (not necessarily all
the check blocks)
+ }
+ }
+
+ public void onDecodedSegment() {
+ try {
+ decodedData = fetchContext.bucketFactory.makeBucket(-1);
+ if(logMINOR) Logger.minor(this, "Copying data from data
blocks");
+ OutputStream os = decodedData.getOutputStream();
for(int i=0;i<dataBuckets.length;i++) {
- boolean heal = false;
- if(dataRetries[i] > 0)
- heal = true;
- if(heal) {
- queueHeal(dataBuckets[i].getData());
- } else {
- dataBuckets[i].data.free();
- dataBuckets[i].data = null;
- }
- dataBuckets[i] = null;
- dataKeys[i] = null;
+ SplitfileBlock status = dataBuckets[i];
+ Bucket data = status.getData();
+ BucketTools.copyTo(data, os, Long.MAX_VALUE);
}
- for(int i=0;i<checkBuckets.length;i++) {
- boolean heal = false;
- if(checkRetries[i] > 0)
- heal = true;
- if(heal) {
- queueHeal(checkBuckets[i].getData());
- } else {
- checkBuckets[i].data.free();
- }
- checkBuckets[i] = null;
- checkKeys[i] = null;
+ if(logMINOR) Logger.minor(this, "Copied data");
+ os.close();
+ // Must set finished BEFORE calling parentFetcher.
+ // Otherwise a race is possible that might result in it
not seeing our finishing.
+ finished = true;
+
parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
+ } catch (IOException e) {
+ Logger.normal(this, "Caught bucket error?: "+e, e);
+ finished = true;
+ failureException = new
FetchException(FetchException.BUCKET_ERROR);
+
parentFetcher.segmentFinished(SplitFileFetcherSegment.this);
+ return;
+ }
+
+ // Now heal
+
+ /** Splitfile healing:
+ * Any block which we have tried and failed to download should
be
+ * reconstructed and reinserted.
+ */
+
+ // Encode any check blocks we don't have
+ if(codec != null) {
+ StandardOnionFECCodec fec = (StandardOnionFECCodec)
codec;
+ fec.addToQueue(dataBuckets, checkBuckets, 32768,
fetchContext.bucketFactory, this, false);
+ }
+ }
+
+ public void onEncodedSegment() {
+ // Now insert *ALL* blocks on which we had at least one
failure, and didn't eventually succeed
+ for(int i=0;i<dataBuckets.length;i++) {
+ boolean heal = false;
+ if(dataRetries[i] > 0)
+ heal = true;
+ if(heal) {
+ queueHeal(dataBuckets[i].getData());
+ } else {
+ dataBuckets[i].data.free();
+ dataBuckets[i].data = null;
}
+ dataBuckets[i] = null;
+ dataKeys[i] = null;
}
+ for(int i=0;i<checkBuckets.length;i++) {
+ boolean heal = false;
+ if(checkRetries[i] > 0)
+ heal = true;
+ if(heal) {
+ queueHeal(checkBuckets[i].getData());
+ } else {
+ checkBuckets[i].data.free();
+ }
+ checkBuckets[i] = null;
+ checkKeys[i] = null;
+ }
}
private void queueHeal(Bucket data) {
Modified: trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
2007-03-24 02:18:15 UTC (rev 12300)
+++ trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
2007-03-24 02:59:23 UTC (rev 12301)
@@ -424,7 +424,7 @@
synchronized(this) {
if(!encoded){
StandardOnionFECCodec fec =
(StandardOnionFECCodec) splitfileAlgo;
- fec.addToQueue(dataBlocks,
checkBlocks, CHKBlock.DATA_LENGTH, blockInsertContext.persistentBucketFactory,
this);
+ fec.addToQueue(dataBlocks,
checkBlocks, CHKBlock.DATA_LENGTH, blockInsertContext.persistentBucketFactory,
this, false);
}
}
fin = false;
@@ -461,6 +461,8 @@
}
}
+ public void onDecodedSegment() {} // irrelevant
+
public void onEncodedSegment() {
// Start the inserts
try {