Author: nextgens
Date: 2007-03-24 00:53:02 +0000 (Sat, 24 Mar 2007)
New Revision: 12293
Modified:
trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
Log:
More work on #403
Experimental code: might eat your cat, break your downloads/inserts and so on.
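
Overview of the change: StandardOnionFECCodec gains a single-threaded encode queue (jobs pushed onto a LinkedList, drained by one low-priority "FEC Pool" daemon thread, a callback fired per job), and SplitFileInserterSegment becomes that callback instead of spawning a "Blocks encoder" thread per segment. The standalone sketch below shows the same producer-consumer shape. It is illustrative only, not Freenet code: the names (EncodeQueue, Job, Callback) are hypothetical, and the real payload (Bucket[], BucketFactory, block length) is reduced to a Runnable. The sketch waits and notifies on the queue's own monitor and starts the worker eagerly, which sidesteps the lost-wakeup and double-start races a lazily started worker must guard against.

import java.util.LinkedList;

// Illustrative sketch only -- hypothetical names throughout.
public class EncodeQueue {

	public interface Callback {
		void onEncoded();
	}

	private static class Job {
		final Runnable work; // stands in for (dataBlocks, checkBlocks, blockLength, bucketFactory)
		final Callback callback;
		Job(Runnable work, Callback callback) {
			this.work = work;
			this.callback = callback;
		}
	}

	private final LinkedList<Job> jobs = new LinkedList<Job>();
	private final Thread worker;

	public EncodeQueue() {
		worker = new Thread(new Runnable() {
			public void run() {
				while (true) {
					Job job;
					synchronized (jobs) {
						// Wait on the queue's own monitor: a producer that adds
						// and notifies under the same lock can never be missed.
						while (jobs.isEmpty()) {
							try {
								jobs.wait();
							} catch (InterruptedException e) {
								// ignore; the loop re-checks the queue
							}
						}
						job = jobs.removeLast(); // FIFO: producers addFirst()
					}
					try {
						job.work.run(); // the expensive encode
					} catch (Throwable t) {
						t.printStackTrace();
					}
					try {
						job.callback.onEncoded(); // report completion
					} catch (Throwable t) {
						t.printStackTrace();
					}
				}
			}
		}, "FEC Pool (sketch)");
		worker.setDaemon(true);
		worker.setPriority(Thread.MIN_PRIORITY);
		// Start eagerly: starting lazily on first use must take care not to
		// call start() twice, nor to run the loop on the caller's thread.
		worker.start();
	}

	public void addToQueue(Runnable work, Callback callback) {
		synchronized (jobs) {
			jobs.addFirst(new Job(work, callback));
			jobs.notifyAll();
		}
	}
}

One shared minimum-priority encoder thread bounds how many FEC encodes run at once (and how much memory they pin), instead of one encoder thread per segment.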
Modified: trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/StandardOnionFECCodec.java	2007-03-23 22:56:49 UTC (rev 12292)
+++ trunk/freenet/src/freenet/client/StandardOnionFECCodec.java	2007-03-24 00:53:02 UTC (rev 12293)
@@ -6,6 +6,7 @@
import java.io.DataInputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.LinkedList;
import com.onionnetworks.fec.FECCode;
import com.onionnetworks.fec.Native8Code;
@@ -120,6 +121,9 @@
// fec = new PureCode(k,n);
		// Crashes are caused by bugs which cause to use 320/128 etc. - n > 256, k < 256.
+ fecRunnerThread = new Thread(fecRunner, "FEC Pool");
+ fecRunnerThread.setDaemon(true);
+ fecRunnerThread.setPriority(Thread.MIN_PRIORITY);
logMINOR = Logger.shouldLog(Logger.MINOR, this);
}
@@ -472,4 +476,81 @@
public int countCheckBlocks() {
return n-k;
}
+
+ // ###############################
+
+	public void addToQueue(Bucket[] dataBlocks, Bucket[] checkBlocks, int blockLength, BucketFactory bucketFactory, StandardOnionFECCodecEncoderCallback callback){
+		// Start the worker thread on first use; calling fecRunner.run() here would
+		// run the encode loop on the caller's thread and never return.
+		if(!fecRunner.getIsStarted()) fecRunnerThread.start();
+
+		synchronized (_awaitingJobs) {
+			_awaitingJobs.addFirst(new FECJob(dataBlocks, checkBlocks, blockLength, bucketFactory, callback));
+			// Notify under the same lock the runner waits on, so no wakeup is lost.
+			_awaitingJobs.notifyAll();
+		}
+		if(logMINOR) Logger.minor(this, "Adding a new job to the queue (" + _awaitingJobs.size() + ").");
+ }
+
+ private final LinkedList _awaitingJobs = new LinkedList();
+ private final FECRunner fecRunner = new FECRunner();
+ private final Thread fecRunnerThread;
+
+ public interface StandardOnionFECCodecEncoderCallback{
+ public void onEncodedSegment();
+ }
+
+ private class FECJob {
+ final Bucket[] dataBlocks, checkBlocks;
+ final BucketFactory bucketFactory;
+ final int blockLength;
+ final StandardOnionFECCodecEncoderCallback callback;
+
+		FECJob(Bucket[] dataBlocks, Bucket[] checkBlocks, int blockLength, BucketFactory bucketFactory, StandardOnionFECCodecEncoderCallback callback) {
+ this.dataBlocks = dataBlocks;
+ this.checkBlocks = checkBlocks;
+ this.blockLength = blockLength;
+ this.bucketFactory = bucketFactory;
+ this.callback = callback;
+ }
+ }
+
+ private class FECRunner implements Runnable {
+		private volatile boolean isStarted = false;
+
+ public void run(){
+ isStarted = true;
+ while(true){
+ FECJob job = null;
+				// Get a job; block on the queue's own monitor while it is empty
+				// (removeLast() on an empty LinkedList would throw and kill the thread).
+				synchronized (_awaitingJobs) {
+					while(_awaitingJobs.isEmpty()) {
+						try {
+							_awaitingJobs.wait();
+						} catch (InterruptedException e) {}
+					}
+					job = (FECJob) _awaitingJobs.removeLast();
+				}
+
+				// Encode it
+				try {
+					encode(job.dataBlocks, job.checkBlocks, job.blockLength, job.bucketFactory);
+				} catch (IOException e) {
+					Logger.error(this, "IOException while encoding: " + e.getMessage(), e);
+				}
+				// Call the callback
+				try {
+					job.callback.onEncodedSegment();
+				} catch (Throwable e) {
+					Logger.error(this, "The callback failed: " + e.getMessage(), e);
+				}
+			}
+ }
+
+ public boolean getIsStarted(){
+ return isStarted;
+ }
+ }
}
Modified: trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java	2007-03-23 22:56:49 UTC (rev 12292)
+++ trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java	2007-03-24 00:53:02 UTC (rev 12293)
@@ -1,6 +1,5 @@
package freenet.client.async;
-import java.io.IOException;
import java.net.MalformedURLException;
import freenet.client.FECCodec;
@@ -8,6 +7,8 @@
import freenet.client.InserterContext;
import freenet.client.InserterException;
import freenet.client.Metadata;
+import freenet.client.StandardOnionFECCodec;
+import freenet.client.StandardOnionFECCodec.StandardOnionFECCodecEncoderCallback;
import freenet.keys.BaseClientKey;
import freenet.keys.CHKBlock;
import freenet.keys.ClientCHK;
@@ -21,30 +22,52 @@
import freenet.support.io.SerializableToFieldSetBucket;
import freenet.support.io.SerializableToFieldSetBucketUtil;
-public class SplitFileInserterSegment implements PutCompletionCallback {
+public class SplitFileInserterSegment implements PutCompletionCallback,
+ StandardOnionFECCodecEncoderCallback {
private static boolean logMINOR;
+
final SplitFileInserter parent;
+
final FECCodec splitfileAlgo;
+
final Bucket[] dataBlocks;
+
final Bucket[] checkBlocks;
+
final ClientCHK[] dataURIs;
+
final ClientCHK[] checkURIs;
+
final SingleBlockInserter[] dataBlockInserters;
+
final SingleBlockInserter[] checkBlockInserters;
+
final InserterContext blockInsertContext;
+
final int segNo;
+
private boolean encoded;
+
private boolean finished;
+
private final boolean getCHKOnly;
+
private boolean hasURIs;
+
private InserterException toThrow;
+
private final FailureCodeTracker errors;
+
private int blocksGotURI;
+
private int blocksCompleted;
+
private boolean started;
-
-	public SplitFileInserterSegment(SplitFileInserter parent, FECCodec splitfileAlgo, Bucket[] origDataBlocks, InserterContext blockInsertContext, boolean getCHKOnly, int segNo) {
+
+ public SplitFileInserterSegment(SplitFileInserter parent,
+ FECCodec splitfileAlgo, Bucket[] origDataBlocks,
+			InserterContext blockInsertContext, boolean getCHKOnly, int segNo) {
logMINOR = Logger.shouldLog(Logger.MINOR, this);
this.parent = parent;
this.getCHKOnly = getCHKOnly;
@@ -52,127 +75,159 @@
this.blockInsertContext = blockInsertContext;
this.splitfileAlgo = splitfileAlgo;
this.dataBlocks = origDataBlocks;
-		int checkBlockCount = splitfileAlgo == null ? 0 : splitfileAlgo.countCheckBlocks();
+ int checkBlockCount = splitfileAlgo == null ? 0 : splitfileAlgo
+ .countCheckBlocks();
checkBlocks = new Bucket[checkBlockCount];
checkURIs = new ClientCHK[checkBlockCount];
dataURIs = new ClientCHK[origDataBlocks.length];
dataBlockInserters = new SingleBlockInserter[dataBlocks.length];
		checkBlockInserters = new SingleBlockInserter[checkBlocks.length];
-		parent.parent.addBlocks(dataURIs.length+checkURIs.length);
-		parent.parent.addMustSucceedBlocks(dataURIs.length+checkURIs.length);
+		parent.parent.addBlocks(dataURIs.length + checkURIs.length);
+		parent.parent.addMustSucceedBlocks(dataURIs.length + checkURIs.length);
this.segNo = segNo;
}
-
- /** Resume an insert segment
- * @throws ResumeException */
-	public SplitFileInserterSegment(SplitFileInserter parent, SimpleFieldSet fs, short splitfileAlgorithm, InserterContext ctx, boolean getCHKOnly, int segNo) throws ResumeException {
+
+ /**
+ * Resume an insert segment
+ *
+ * @throws ResumeException
+ */
+ public SplitFileInserterSegment(SplitFileInserter parent,
+			SimpleFieldSet fs, short splitfileAlgorithm, InserterContext ctx,
+ boolean getCHKOnly, int segNo) throws ResumeException {
this.parent = parent;
this.getCHKOnly = getCHKOnly;
this.blockInsertContext = ctx;
this.segNo = segNo;
- if(!"SplitFileInserterSegment".equals(fs.get("Type")))
-			throw new ResumeException("Wrong Type: "+fs.get("Type"));
+		if (!"SplitFileInserterSegment".equals(fs.get("Type")))
+			throw new ResumeException("Wrong Type: " + fs.get("Type"));
finished = Fields.stringToBool(fs.get("Finished"), false);
encoded = true;
started = Fields.stringToBool(fs.get("Started"), false);
SimpleFieldSet errorsFS = fs.subset("Errors");
- if(errorsFS != null)
+ if (errorsFS != null)
this.errors = new FailureCodeTracker(true, errorsFS);
else
this.errors = new FailureCodeTracker(true);
- if(finished && !errors.isEmpty())
+ if (finished && !errors.isEmpty())
toThrow = InserterException.construct(errors);
blocksGotURI = 0;
blocksCompleted = 0;
SimpleFieldSet dataFS = fs.subset("DataBlocks");
- if(dataFS == null)
+ if (dataFS == null)
throw new ResumeException("No data blocks");
String tmp = dataFS.get("Count");
-		if(tmp == null) throw new ResumeException("No data block count");
+ if (tmp == null)
+ throw new ResumeException("No data block count");
int dataBlockCount;
try {
dataBlockCount = Integer.parseInt(tmp);
} catch (NumberFormatException e) {
-			throw new ResumeException("Corrupt data blocks count: "+e+" : "+tmp);
+			throw new ResumeException("Corrupt data blocks count: " + e + " : "
+					+ tmp);
}
-
+
hasURIs = true;
-
+
dataBlocks = new Bucket[dataBlockCount];
dataURIs = new ClientCHK[dataBlockCount];
dataBlockInserters = new SingleBlockInserter[dataBlockCount];
-		// Check blocks first, because if there are missing check blocks, we need
+		// Check blocks first, because if there are missing check blocks, we
+		// need
// all the data blocks so we can re-encode.
-
+
SimpleFieldSet checkFS = fs.subset("CheckBlocks");
- if(checkFS != null) {
+ if (checkFS != null) {
tmp = checkFS.get("Count");
-			if(tmp == null) throw new ResumeException("Check blocks but no check block count");
+			if (tmp == null)
+				throw new ResumeException(
+						"Check blocks but no check block count");
int checkBlockCount;
try {
checkBlockCount = Integer.parseInt(tmp);
} catch (NumberFormatException e) {
-				throw new ResumeException("Corrupt check blocks count: "+e+" : "+tmp);
+				throw new ResumeException("Corrupt check blocks count: " + e
+						+ " : " + tmp);
}
checkBlocks = new Bucket[checkBlockCount];
checkURIs = new ClientCHK[checkBlockCount];
			checkBlockInserters = new SingleBlockInserter[checkBlockCount];
- for(int i=0;i<checkBlockCount;i++) {
+ for (int i = 0; i < checkBlockCount; i++) {
String index = Integer.toString(i);
SimpleFieldSet blockFS = checkFS.subset(index);
- if(blockFS == null) {
+ if (blockFS == null) {
hasURIs = false;
encoded = false;
-					Logger.normal(this, "Clearing encoded because block "+i+" of "+segNo+" missing");
+					Logger.normal(this, "Clearing encoded because block " + i
+							+ " of " + segNo + " missing");
continue;
}
tmp = blockFS.get("URI");
- if(tmp != null) {
+ if (tmp != null) {
try {
-						checkURIs[i] = (ClientCHK) ClientKey.getBaseKey(new FreenetURI(tmp));
+						checkURIs[i] = (ClientCHK) ClientKey
+								.getBaseKey(new FreenetURI(tmp));
blocksGotURI++;
} catch (MalformedURLException e) {
-						throw new ResumeException("Corrupt URI: "+e+" : "+tmp);
+						throw new ResumeException("Corrupt URI: " + e + " : "
+								+ tmp);
}
} else {
hasURIs = false;
}
-				boolean blockFinished = Fields.stringToBool(blockFS.get("Finished"), false) && checkURIs[i] != null;
-				if(blockFinished && checkURIs[i] == null) {
-					Logger.error(this, "No URI for check block "+i+" of "+segNo+" yet apparently finished?");
+				boolean blockFinished = Fields.stringToBool(blockFS
+						.get("Finished"), false)
+						&& checkURIs[i] != null;
+				if (blockFinished && checkURIs[i] == null) {
+					Logger.error(this, "No URI for check block " + i + " of "
+							+ segNo + " yet apparently finished?");
encoded = false;
}
				// Read data; only necessary if the block isn't finished.
- if(!blockFinished) {
+ if (!blockFinished) {
					SimpleFieldSet bucketFS = blockFS.subset("Data");
- if(bucketFS != null) {
+ if (bucketFS != null) {
try {
-						checkBlocks[i] = SerializableToFieldSetBucketUtil.create(bucketFS, ctx.random, ctx.persistentFileTracker);
-						if(logMINOR)
-							Logger.minor(this, "Check block "+i+" : "+checkBlocks[i]);
+						checkBlocks[i] = SerializableToFieldSetBucketUtil
+								.create(bucketFS, ctx.random,
+										ctx.persistentFileTracker);
+						if (logMINOR)
+							Logger.minor(this, "Check block " + i + " : "
+									+ checkBlocks[i]);
					} catch (CannotCreateFromFieldSetException e) {
-						Logger.error(this, "Failed to deserialize check block "+i+" of "+segNo+" : "+e, e);
+						Logger.error(this,
+								"Failed to deserialize check block " + i
+										+ " of " + segNo + " : " + e, e);
// Re-encode it.
checkBlocks[i] = null;
encoded = false;
}
-					if(checkBlocks[i] == null)
-						throw new ResumeException("Check block "+i+" of "+segNo+" not finished but no data (create returned null)");
+					if (checkBlocks[i] == null)
+						throw new ResumeException(
+								"Check block "
+										+ i
+										+ " of "
+										+ segNo
+										+ " not finished but no data (create returned null)");
}
-				// Don't create fetcher yet; that happens in start()
-			} else blocksCompleted++;
-			if(checkBlocks[i] == null && checkURIs[i] == null) {
-				Logger.normal(this, "Clearing encoded because block "+i+" of "+segNo+" missing");
+					// Don't create fetcher yet; that happens in start()
+				} else
+					blocksCompleted++;
+				if (checkBlocks[i] == null && checkURIs[i] == null) {
+					Logger.normal(this, "Clearing encoded because block " + i
+							+ " of " + segNo + " missing");
encoded = false;
}
checkFS.removeSubset(index);
}
-			splitfileAlgo = FECCodec.getCodec(splitfileAlgorithm, dataBlockCount, checkBlocks.length);
+ splitfileAlgo = FECCodec.getCodec(splitfileAlgorithm,
+ dataBlockCount, checkBlocks.length);
} else {
			Logger.normal(this, "Not encoded because no check blocks");
encoded = false;
-			splitfileAlgo = FECCodec.getCodec(splitfileAlgorithm, dataBlockCount);
+ splitfileAlgo = FECCodec.getCodec(splitfileAlgorithm,
+ dataBlockCount);
int checkBlocksCount = splitfileAlgo.countCheckBlocks();
this.checkURIs = new ClientCHK[checkBlocksCount];
this.checkBlocks = new Bucket[checkBlocksCount];
@@ -180,166 +235,208 @@
hasURIs = false;
}
- for(int i=0;i<dataBlockCount;i++) {
+ for (int i = 0; i < dataBlockCount; i++) {
String index = Integer.toString(i);
SimpleFieldSet blockFS = dataFS.subset(index);
-			if(blockFS == null) throw new ResumeException("No data block "+i+" on segment "+segNo);
+			if (blockFS == null)
+				throw new ResumeException("No data block " + i + " on segment "
+						+ segNo);
tmp = blockFS.get("URI");
- if(tmp != null) {
+ if (tmp != null) {
try {
-					dataURIs[i] = (ClientCHK) ClientKey.getBaseKey(new FreenetURI(tmp));
+					dataURIs[i] = (ClientCHK) ClientKey
+							.getBaseKey(new FreenetURI(tmp));
blocksGotURI++;
} catch (MalformedURLException e) {
-					throw new ResumeException("Corrupt URI: "+e+" : "+tmp);
+					throw new ResumeException("Corrupt URI: " + e + " : " + tmp);
}
- } else hasURIs = false;
-			boolean blockFinished = Fields.stringToBool(blockFS.get("Finished"), false);
-			if(blockFinished && dataURIs[i] == null)
-				throw new ResumeException("Block "+i+" of "+segNo+" finished but no URI");
-			if(!blockFinished)
+			} else
+				hasURIs = false;
+			boolean blockFinished = Fields.stringToBool(
+					blockFS.get("Finished"), false);
+			if (blockFinished && dataURIs[i] == null)
+				throw new ResumeException("Block " + i + " of " + segNo
+						+ " finished but no URI");
+ if (!blockFinished)
finished = false;
else
blocksCompleted++;
-
+
// Read data
SimpleFieldSet bucketFS = blockFS.subset("Data");
- if(bucketFS == null) {
- if(!blockFinished)
-					throw new ResumeException("Block "+i+" of "+segNo+" not finished but no data");
-				else if(splitfileAlgorithm > 0 && !encoded)
-					throw new ResumeException("Block "+i+" of "+segNo+" data not available even though not encoded");
+ if (bucketFS == null) {
+ if (!blockFinished)
+					throw new ResumeException("Block " + i + " of " + segNo
+							+ " not finished but no data");
+				else if (splitfileAlgorithm > 0 && !encoded)
+					throw new ResumeException("Block " + i + " of " + segNo
+							+ " data not available even though not encoded");
} else {
try {
-				dataBlocks[i] = SerializableToFieldSetBucketUtil.create(bucketFS, ctx.random, ctx.persistentFileTracker);
-				if(logMINOR)
-					Logger.minor(this, "Data block "+i+" : "+checkBlocks[i]);
+				dataBlocks[i] = SerializableToFieldSetBucketUtil.create(
+						bucketFS, ctx.random, ctx.persistentFileTracker);
+				if (logMINOR)
+					// Log the data block, not the check block (fixes a copy-paste slip).
+					Logger.minor(this, "Data block " + i + " : "
+							+ dataBlocks[i]);
} catch (CannotCreateFromFieldSetException e) {
-				throw new ResumeException("Failed to deserialize block "+i+" of "+segNo+" : "+e, e);
+				throw new ResumeException("Failed to deserialize block "
+						+ i + " of " + segNo + " : " + e, e);
}
-				if(dataBlocks[i] == null)
-					throw new ResumeException("Block "+i+" of "+segNo+" could not serialize data (create returned null) from "+bucketFS);
+				if (dataBlocks[i] == null)
+					throw new ResumeException(
+							"Block "
+									+ i
+									+ " of "
+									+ segNo
+									+ " could not serialize data (create returned null) from "
+									+ bucketFS);
				// Don't create fetcher yet; that happens in start()
}
dataFS.removeSubset(index);
}
- if(!encoded) {
+ if (!encoded) {
finished = false;
hasURIs = false;
- for(int i=0;i<dataBlocks.length;i++)
- if(dataBlocks[i] == null)
-					throw new ResumeException("Missing data block "+i+" and need to reconstruct check blocks");
+			for (int i = 0; i < dataBlocks.length; i++)
+				if (dataBlocks[i] == null)
+					throw new ResumeException("Missing data block " + i
+							+ " and need to reconstruct check blocks");
}
-		parent.parent.addBlocks(dataURIs.length+checkURIs.length);
-		parent.parent.addMustSucceedBlocks(dataURIs.length+checkURIs.length);
+		parent.parent.addBlocks(dataURIs.length + checkURIs.length);
+		parent.parent.addMustSucceedBlocks(dataURIs.length + checkURIs.length);
}
public synchronized SimpleFieldSet getProgressFieldset() {
SimpleFieldSet fs = new SimpleFieldSet(false); // these get BIG
fs.putSingle("Type", "SplitFileInserterSegment");
fs.put("Finished", finished);
- // If true, check blocks which are null are finished
+ // If true, check blocks which are null are finished
fs.put("Encoded", encoded);
// If true, data blocks which are null are finished
fs.put("Started", started);
fs.tput("Errors", errors.toFieldSet(false));
SimpleFieldSet dataFS = new SimpleFieldSet(false);
dataFS.put("Count", dataBlocks.length);
- for(int i=0;i<dataBlocks.length;i++) {
+ for (int i = 0; i < dataBlocks.length; i++) {
SimpleFieldSet block = new SimpleFieldSet(false);
- if(dataURIs[i] != null)
+ if (dataURIs[i] != null)
				block.putSingle("URI", dataURIs[i].getURI().toString());
- SingleBlockInserter sbi =
- dataBlockInserters[i];
+ SingleBlockInserter sbi = dataBlockInserters[i];
// If started, then sbi = null => block finished.
boolean finished = started && sbi == null;
- if(started) {
+ if (started) {
block.put("Finished", finished);
}
Bucket data = dataBlocks[i];
- if(data == null && finished) {
+ if (data == null && finished) {
// Ignore
-				if(logMINOR) Logger.minor(this, "Could not save to disk: null");
-			} else if(data instanceof SerializableToFieldSetBucket) {
-				SimpleFieldSet tmp = ((SerializableToFieldSetBucket)data).toFieldSet();
-				if(tmp == null) {
-					if(logMINOR) Logger.minor(this, "Could not save to disk: "+data);
+				if (logMINOR)
+					Logger.minor(this, "Could not save to disk: null");
+			} else if (data instanceof SerializableToFieldSetBucket) {
+				SimpleFieldSet tmp = ((SerializableToFieldSetBucket) data)
+						.toFieldSet();
+				if (tmp == null) {
+					if (logMINOR)
+						Logger.minor(this, "Could not save to disk: " + data);
return null;
}
block.put("Data", tmp);
} else {
-				if(logMINOR) Logger.minor(this, "Could not save to disk (not serializable to fieldset): "+data);
+				if (logMINOR)
+					Logger.minor(this,
+							"Could not save to disk (not serializable to fieldset): "
+									+ data);
return null;
}
- if(!block.isEmpty())
+ if (!block.isEmpty())
dataFS.put(Integer.toString(i), block);
}
fs.put("DataBlocks", dataFS);
SimpleFieldSet checkFS = new SimpleFieldSet(false);
checkFS.put("Count", checkBlocks.length);
- for(int i=0;i<checkBlocks.length;i++) {
+ for (int i = 0; i < checkBlocks.length; i++) {
SimpleFieldSet block = new SimpleFieldSet(false);
- if(checkURIs[i] != null)
+ if (checkURIs[i] != null)
				block.putSingle("URI", checkURIs[i].getURI().toString());
- SingleBlockInserter sbi =
- checkBlockInserters[i];
+ SingleBlockInserter sbi = checkBlockInserters[i];
// If encoded, then sbi == null => block finished
			boolean finished = encoded && sbi == null && checkURIs[i] != null;
- if(encoded) {
+ if (encoded) {
block.put("Finished", finished);
}
- if(!finished) {
+ if (!finished) {
Bucket data = checkBlocks[i];
- if(data != null &&
-						data instanceof SerializableToFieldSetBucket) {
-					SimpleFieldSet tmp = ((SerializableToFieldSetBucket)data).toFieldSet();
-					if(tmp == null)
-						Logger.error(this, "Could not serialize "+data+" - check block "+i+" of "+segNo);
+				if (data != null
+						&& data instanceof SerializableToFieldSetBucket) {
+					SimpleFieldSet tmp = ((SerializableToFieldSetBucket) data)
+							.toFieldSet();
+					if (tmp == null)
+						Logger.error(this, "Could not serialize " + data
+								+ " - check block " + i + " of " + segNo);
else
block.put("Data", tmp);
- } else if(encoded) {
-					Logger.error(this, "Could not save to disk (null or not serializable to fieldset): "+data);
+				} else if (encoded) {
+					Logger.error(this,
+							"Could not save to disk (null or not serializable to fieldset): "
+									+ data);
return null;
}
}
- if(!block.isEmpty())
+ if (!block.isEmpty())
checkFS.put(Integer.toString(i), block);
}
fs.put("CheckBlocks", checkFS);
return fs;
}
-
+
public void start() throws InserterException {
-		if(logMINOR) Logger.minor(this, "Starting segment "+segNo+" of "+parent+" ("+parent.dataLength+"): "+this+" ( finished="+finished+" encoded="+encoded+" hasURIs="+hasURIs+ ')');
+		if (logMINOR)
+			Logger.minor(this, "Starting segment " + segNo + " of " + parent
+					+ " (" + parent.dataLength + "): " + this + " ( finished="
+					+ finished + " encoded=" + encoded + " hasURIs=" + hasURIs
+					+ ')');
boolean fin = true;
-
- for(int i=0;i<dataBlockInserters.length;i++) {
-			if(dataBlocks[i] != null) { // else already finished on creation
-				dataBlockInserters[i] =
-					new SingleBlockInserter(parent.parent, dataBlocks[i], (short)-1, FreenetURI.EMPTY_CHK_URI, blockInsertContext, this, false, CHKBlock.DATA_LENGTH, i, getCHKOnly, false, false, parent.token);
+
+		for (int i = 0; i < dataBlockInserters.length; i++) {
+			if (dataBlocks[i] != null) { // else already finished on creation
+				dataBlockInserters[i] = new SingleBlockInserter(parent.parent,
+						dataBlocks[i], (short) -1, FreenetURI.EMPTY_CHK_URI,
+						blockInsertContext, this, false, CHKBlock.DATA_LENGTH,
+						i, getCHKOnly, false, false, parent.token);
dataBlockInserters[i].schedule();
fin = false;
} else {
parent.parent.completedBlock(true);
}
}
- //parent.parent.notifyClients();
+ // parent.parent.notifyClients();
started = true;
- if(!encoded) {
-			if(logMINOR) Logger.minor(this, "Segment "+segNo+" of "+parent+" ("+parent.dataLength+") is not encoded");
-			if(splitfileAlgo != null) {
-				if(logMINOR) Logger.minor(this, "Encoding segment "+segNo+" of "+parent+" ("+parent.dataLength+ ')');
+		if (!encoded) {
+			if (logMINOR)
+				Logger.minor(this, "Segment " + segNo + " of " + parent + " ("
+						+ parent.dataLength + ") is not encoded");
+			if (splitfileAlgo != null) {
+				if (logMINOR)
+					Logger.minor(this, "Encoding segment " + segNo + " of "
+							+ parent + " (" + parent.dataLength + ')');
				// Encode blocks
-				Thread t = new Thread(new EncodeBlocksRunnable(), "Blocks encoder for "+this);
-				t.setDaemon(true);
-				t.start();
+				synchronized(this) {
+					if(!encoded){
+						StandardOnionFECCodec fec = (StandardOnionFECCodec) splitfileAlgo;
+						fec.addToQueue(dataBlocks, checkBlocks, CHKBlock.DATA_LENGTH, blockInsertContext.persistentBucketFactory, this);
+					}
+				}
fin = false;
}
} else {
- for(int i=0;i<checkBlockInserters.length;i++) {
- if(checkBlocks[i] != null) {
- checkBlockInserters[i] =
-					new SingleBlockInserter(parent.parent, checkBlocks[i], (short)-1, FreenetURI.EMPTY_CHK_URI, blockInsertContext, this, false, CHKBlock.DATA_LENGTH, i + dataBlocks.length, getCHKOnly, false, false, parent.token);
+			for (int i = 0; i < checkBlockInserters.length; i++) {
+				if (checkBlocks[i] != null) {
+					checkBlockInserters[i] = new SingleBlockInserter(
+							parent.parent, checkBlocks[i], (short) -1,
+							FreenetURI.EMPTY_CHK_URI, blockInsertContext, this,
+							false, CHKBlock.DATA_LENGTH, i + dataBlocks.length,
+							getCHKOnly, false, false, parent.token);
checkBlockInserters[i].schedule();
fin = false;
} else
@@ -348,63 +445,51 @@
onEncodedSegment();
parent.encodedSegment(this);
}
- if(hasURIs) {
+ if (hasURIs) {
parent.segmentHasURIs(this);
}
boolean fetchable;
- synchronized(this) {
+ synchronized (this) {
fetchable = (blocksCompleted > dataBlocks.length);
}
- if(fetchable)
+ if (fetchable)
parent.segmentFetchable(this);
- if(fin) finish();
- if(finished) {
+ if (fin)
+ finish();
+ if (finished) {
parent.segmentFinished(this);
}
}
-
- private class EncodeBlocksRunnable implements Runnable {
-
- public void run() {
- encode();
- }
- }
- void encode() {
+ public void onEncodedSegment() {
+ // Start the inserts
try {
- synchronized(this) {
- if(encoded) return;
- }
-			splitfileAlgo.encode(dataBlocks, checkBlocks, CHKBlock.DATA_LENGTH, blockInsertContext.persistentBucketFactory);
-			// Start the inserts
-			for(int i=0;i<checkBlockInserters.length;i++) {
-				checkBlockInserters[i] =
-					new SingleBlockInserter(parent.parent, checkBlocks[i], (short)-1, FreenetURI.EMPTY_CHK_URI, blockInsertContext, this, false, CHKBlock.DATA_LENGTH, i + dataBlocks.length, getCHKOnly, false, false, parent.token);
+			for (int i = 0; i < checkBlockInserters.length; i++) {
+				checkBlockInserters[i] = new SingleBlockInserter(parent.parent,
+						checkBlocks[i], (short) -1, FreenetURI.EMPTY_CHK_URI,
+						blockInsertContext, this, false, CHKBlock.DATA_LENGTH,
+						i + dataBlocks.length, getCHKOnly, false, false,
+						parent.token);
checkBlockInserters[i].schedule();
}
- // Tell parent only after have started the inserts.
- // Because of the counting.
- synchronized(this) {
- encoded = true;
- }
- parent.encodedSegment(this);
- onEncodedSegment();
- } catch (IOException e) {
- InserterException ex =
-				new InserterException(InserterException.BUCKET_ERROR, e, null);
- finish(ex);
} catch (Throwable t) {
-			Logger.error(this, "Caught "+t+" while encoding "+this, t);
-			InserterException ex =
-				new InserterException(InserterException.INTERNAL_ERROR, t, null);
+			Logger.error(this, "Caught " + t + " while encoding " + this, t);
+			InserterException ex = new InserterException(
+					InserterException.INTERNAL_ERROR, t, null);
finish(ex);
}
- }
- private void onEncodedSegment() {
- synchronized(this) {
- for(int i=0;i<dataBlockInserters.length;i++) {
-			if(dataBlockInserters[i] == null && dataBlocks[i] != null) {
+ synchronized (this) {
+ encoded = true;
+ }
+
+ // Tell parent only after have started the inserts.
+ // Because of the counting.
+ parent.encodedSegment(this);
+
+ synchronized (this) {
+ for (int i = 0; i < dataBlockInserters.length; i++) {
+			if (dataBlockInserters[i] == null && dataBlocks[i] != null) {
dataBlocks[i].free();
dataBlocks[i] = null;
}
@@ -413,9 +498,11 @@
}
private void finish(InserterException ex) {
-		if(logMINOR) Logger.minor(this, "Finishing "+this+" with "+ex, ex);
-		synchronized(this) {
-			if(finished) return;
+		if (logMINOR)
+			Logger.minor(this, "Finishing " + this + " with " + ex, ex);
+ synchronized (this) {
+ if (finished)
+ return;
finished = true;
toThrow = ex;
}
@@ -423,43 +510,46 @@
}
private void finish() {
- synchronized(this) {
- if(finished) return;
+ synchronized (this) {
+ if (finished)
+ return;
finished = true;
toThrow = InserterException.construct(errors);
}
parent.segmentFinished(this);
}
-
+
public void onEncode(BaseClientKey k, ClientPutState state) {
- ClientCHK key = (ClientCHK)k;
- SingleBlockInserter sbi = (SingleBlockInserter)state;
+ ClientCHK key = (ClientCHK) k;
+ SingleBlockInserter sbi = (SingleBlockInserter) state;
int x = sbi.token;
- synchronized(this) {
- if(finished) return;
- if(x >= dataBlocks.length) {
- if(checkURIs[x-dataBlocks.length] != null) {
+ synchronized (this) {
+ if (finished)
+ return;
+ if (x >= dataBlocks.length) {
+ if (checkURIs[x - dataBlocks.length] != null) {
return;
}
- checkURIs[x-dataBlocks.length] = key;
+ checkURIs[x - dataBlocks.length] = key;
} else {
- if(dataURIs[x] != null) {
+ if (dataURIs[x] != null) {
return;
}
dataURIs[x] = key;
}
blocksGotURI++;
-			if(blocksGotURI != dataBlocks.length + checkBlocks.length) return;
+			if (blocksGotURI != dataBlocks.length + checkBlocks.length)
+				return;
// Double check
- for(int i=0;i<checkURIs.length;i++) {
- if(checkURIs[i] == null) {
-					Logger.error(this, "Check URI "+i+" is null");
+			for (int i = 0; i < checkURIs.length; i++) {
+				if (checkURIs[i] == null) {
+					Logger.error(this, "Check URI " + i + " is null");
return;
}
}
- for(int i=0;i<dataURIs.length;i++) {
- if(dataURIs[i] == null) {
-					Logger.error(this, "Data URI "+i+" is null");
+			for (int i = 0; i < dataURIs.length; i++) {
+				if (dataURIs[i] == null) {
+					Logger.error(this, "Data URI " + i + " is null");
return;
}
}
@@ -469,21 +559,21 @@
}
public void onSuccess(ClientPutState state) {
- if(parent.parent.isCancelled()) {
+ if (parent.parent.isCancelled()) {
parent.cancel();
return;
}
- SingleBlockInserter sbi = (SingleBlockInserter)state;
+ SingleBlockInserter sbi = (SingleBlockInserter) state;
int x = sbi.token;
completed(x);
}
public void onFailure(InserterException e, ClientPutState state) {
- if(parent.parent.isCancelled()) {
+ if (parent.parent.isCancelled()) {
parent.cancel();
return;
}
- SingleBlockInserter sbi = (SingleBlockInserter)state;
+ SingleBlockInserter sbi = (SingleBlockInserter) state;
int x = sbi.token;
errors.merge(e);
completed(x);
@@ -491,40 +581,50 @@
private void completed(int x) {
int total = innerCompleted(x);
- if(total == -1) return;
- if(total == dataBlockInserters.length) {
+ if (total == -1)
+ return;
+ if (total == dataBlockInserters.length) {
parent.segmentFetchable(this);
}
-		if(total != dataBlockInserters.length + checkBlockInserters.length) return;
+		if (total != dataBlockInserters.length + checkBlockInserters.length)
+			return;
finish();
}
-
+
/**
* Called when a block has completed.
- * @param x The block number.
-	 * @return -1 if the segment has already finished, otherwise the number of completed
-	 * blocks.
+	 *
+	 * @param x
+	 *            The block number.
+	 * @return -1 if the segment has already finished, otherwise the number of
+	 *         completed blocks.
*/
private synchronized int innerCompleted(int x) {
-		if(logMINOR) Logger.minor(this, "Completed: "+x+" on "+this+" ( completed="+blocksCompleted+", total="+(dataBlockInserters.length+checkBlockInserters.length));
+		if (logMINOR)
+			Logger.minor(this, "Completed: " + x + " on " + this
+					+ " ( completed=" + blocksCompleted + ", total="
+					+ (dataBlockInserters.length + checkBlockInserters.length));
- if(finished) return -1;
- if(x >= dataBlocks.length) {
+ if (finished)
+ return -1;
+ if (x >= dataBlocks.length) {
x -= dataBlocks.length;
- if(checkBlockInserters[x] == null) {
-				Logger.error(this, "Completed twice: check block "+x+" on "+this, new Exception());
+			if (checkBlockInserters[x] == null) {
+				Logger.error(this, "Completed twice: check block " + x + " on "
+						+ this, new Exception());
return blocksCompleted;
}
checkBlockInserters[x] = null;
checkBlocks[x].free();
checkBlocks[x] = null;
} else {
- if(dataBlockInserters[x] == null) {
-				Logger.error(this, "Completed twice: data block "+x+" on "+this, new Exception());
+			if (dataBlockInserters[x] == null) {
+				Logger.error(this, "Completed twice: data block " + x + " on "
+						+ this, new Exception());
return blocksCompleted;
}
dataBlockInserters[x] = null;
- if(encoded) {
+ if (encoded) {
dataBlocks[x].free();
dataBlocks[x] = null;
}
@@ -536,7 +636,7 @@
public synchronized boolean isFinished() {
return finished;
}
-
+
public boolean isEncoded() {
return encoded;
}
@@ -544,7 +644,6 @@
public int countCheckBlocks() {
return checkBlocks.length;
}
-
public int countDataBlocks() {
return dataBlocks.length;
@@ -557,36 +656,37 @@
public ClientCHK[] getDataCHKs() {
return dataURIs;
}
-
+
InserterException getException() {
synchronized (this) {
- return toThrow;
+ return toThrow;
}
}
public void cancel() {
- synchronized(this) {
- if(finished) return;
+ synchronized (this) {
+ if (finished)
+ return;
finished = true;
- if(toThrow != null)
+ if (toThrow != null)
				toThrow = new InserterException(InserterException.CANCELLED);
}
- for(int i=0;i<dataBlockInserters.length;i++) {
+ for (int i = 0; i < dataBlockInserters.length; i++) {
SingleBlockInserter sbi = dataBlockInserters[i];
- if(sbi != null)
+ if (sbi != null)
sbi.cancel();
Bucket d = dataBlocks[i];
- if(d != null) {
+ if (d != null) {
d.free();
dataBlocks[i] = null;
}
}
- for(int i=0;i<checkBlockInserters.length;i++) {
+ for (int i = 0; i < checkBlockInserters.length; i++) {
SingleBlockInserter sbi = checkBlockInserters[i];
- if(sbi != null)
+ if (sbi != null)
sbi.cancel();
Bucket d = checkBlocks[i];
- if(d != null) {
+ if (d != null) {
d.free();
checkBlocks[i] = null;
}
@@ -595,16 +695,18 @@
}
	public void onTransition(ClientPutState oldState, ClientPutState newState) {
-		Logger.error(this, "Illegal transition in SplitFileInserterSegment: "+oldState+" -> "+newState);
+		Logger.error(this, "Illegal transition in SplitFileInserterSegment: "
+				+ oldState + " -> " + newState);
}
public void onMetadata(Metadata m, ClientPutState state) {
- Logger.error(this, "Got onMetadata from "+state);
+ Logger.error(this, "Got onMetadata from " + state);
}
public void onBlockSetFinished(ClientPutState state) {
// Ignore
-		Logger.error(this, "Should not happen: onBlockSetFinished("+state+") on "+this);
+		Logger.error(this, "Should not happen: onBlockSetFinished(" + state
+				+ ") on " + this);
}
public synchronized boolean hasURIs() {
@@ -619,7 +721,10 @@
// Ignore
}
-	/** Force the remaining blocks which haven't been encoded so far to be encoded ASAP. */
+ /**
+ * Force the remaining blocks which haven't been encoded so far to be
+ * encoded ASAP.
+ */
public void forceEncode() {
blockInsertContext.backgroundBlockEncoder.queue(dataBlockInserters);
blockInsertContext.backgroundBlockEncoder.queue(checkBlockInserters);