Author: nextgens
Date: 2007-04-14 01:18:50 +0000 (Sat, 14 Apr 2007)
New Revision: 12676
Modified:
trunk/freenet/src/freenet/client/FECCodec.java
trunk/freenet/src/freenet/client/FECJob.java
trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
Log:
Implement 1291: Single FECRunner for all codecs
Now FECRunner belongs to FECCodec and is private static final; that should do it.
Modified: trunk/freenet/src/freenet/client/FECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/FECCodec.java 2007-04-14 01:01:06 UTC (rev 12675)
+++ trunk/freenet/src/freenet/client/FECCodec.java 2007-04-14 01:18:50 UTC (rev 12676)
@@ -6,6 +6,8 @@
import java.io.DataInputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
import com.onionnetworks.fec.FECCode;
import com.onionnetworks.util.Buffer;
@@ -24,10 +26,10 @@
*/
public abstract class FECCodec {
+ // REDFLAG: Optimal stripe size? Smaller => less memory usage, but more
JNI overhead
+ private static int STRIPE_SIZE = 4096;
static boolean logMINOR;
- // REDFLAG: Optimal stripe size? Smaller => less memory usage, but more
JNI overhead
- private static int STRIPE_SIZE = 4096;
FECCode fec;
int k, n;
@@ -64,9 +66,6 @@
*/
public abstract int countCheckBlocks();
- /** Queue an asynchronous encode or decode job */
- public abstract void addToQueue(FECJob job);
-
protected void realDecode(SplitfileBlock[] dataBlockStatus,
SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory bf) throws
IOException {
if(logMINOR)
Logger.minor(this, "Doing decode: " +
dataBlockStatus.length
@@ -328,4 +327,109 @@
checkBlockStatus[i] = data;
}
}
+
+ /**
+ * The method used to submit {@link FECJob}s to the pool
+ *
+ * @author Florent Daignière <nextgens at
freenetproject.org>
+ *
+ * @param FECJob
+ */
+ public void addToQueue(FECJob job) {
+ addToQueue(job, this);
+ }
+
+ public static void addToQueue(FECJob job, FECCodec codec){
+ synchronized (_awaitingJobs) {
+ if(fecRunnerThread == null) {
+ if(fecRunnerThread != null)
Logger.error(FECCodec.class, "The callback died!! restarting a new one, please
report that error.");
+ fecRunnerThread = new Thread(fecRunner, "FEC
Pool");
+ fecRunnerThread.setDaemon(true);
+
fecRunnerThread.setPriority(Thread.MIN_PRIORITY);
+
+ fecRunnerThread.start();
+ }
+
+ _awaitingJobs.addFirst(job);
+ }
+ if(logMINOR) Logger.minor(StandardOnionFECCodec.class, "Adding
a new job to the queue (" +_awaitingJobs.size() + ").");
+ synchronized (fecRunner){
+ fecRunner.notifyAll();
+ }
+ }
+
+ private static final LinkedList _awaitingJobs = new LinkedList();
+ private static final FECRunner fecRunner = new FECRunner();
+ private static Thread fecRunnerThread;
+
+ /**
+ * A private Thread started by {@link FECCodec}...
+ *
+ * @author Florent Daignière <nextgens at
freenetproject.org>
+ *
+ * TODO: maybe it ought to start more than one thread on SMP
system ? (take care, it's memory consumpsive)
+ */
+ private static class FECRunner implements Runnable {
+
+ public void run(){
+ try {
+ while(true){
+ FECJob job = null;
+ try {
+ // Get a job
+ synchronized (_awaitingJobs) {
+ job = (FECJob)
_awaitingJobs.removeLast();
+ }
+
+ // Encode it
+ try {
+ if(job.isADecodingJob) {
+
job.codec.realDecode(job.dataBlockStatus, job.checkBlockStatus,
job.blockLength, job.bucketFactory);
+ } else {
+
job.codec.realEncode(job.dataBlocks, job.checkBlocks, job.blockLength,
job.bucketFactory);
+ // Update
SplitFileBlocks from buckets if necessary
+ if((job.dataBlockStatus
!= null) || (job.checkBlockStatus != null)){
+ for(int
i=0;i<job.dataBlocks.length;i++)
+
job.dataBlockStatus[i].setData(job.dataBlocks[i]);
+ for(int
i=0;i<job.checkBlocks.length;i++)
+
job.checkBlockStatus[i].setData(job.checkBlocks[i]);
+ }
+ }
+ } catch (IOException e) {
+ Logger.error(this, "BOH! ioe:"
+ e.getMessage());
+ }
+
+ // Call the callback
+ try {
+ if(job.isADecodingJob)
+
job.callback.onDecodedSegment();
+ else
+
job.callback.onEncodedSegment();
+
+ } catch (Throwable e) {
+ Logger.error(this, "The
callback failed!" + e.getMessage(), e);
+ }
+ } catch (NoSuchElementException ne) {
+ try {
+ synchronized (this) {
+
wait(Integer.MAX_VALUE);
+ }
+ } catch (InterruptedException e) {}
+ }
+ }
+ } finally { fecRunnerThread = null; }
+ }
+ }
+
+ /**
+ * An interface wich has to be implemented by FECJob submitters
+ *
+ * @author Florent Daignière <nextgens at
freenetproject.org>
+ *
+ * WARNING: the callback is expected to release the thread !
+ */
+ public interface StandardOnionFECCodecEncoderCallback{
+ public void onEncodedSegment();
+ public void onDecodedSegment();
+ }
}
Modified: trunk/freenet/src/freenet/client/FECJob.java
===================================================================
--- trunk/freenet/src/freenet/client/FECJob.java 2007-04-14 01:01:06 UTC (rev 12675)
+++ trunk/freenet/src/freenet/client/FECJob.java 2007-04-14 01:18:50 UTC (rev 12676)
@@ -3,7 +3,7 @@
*/
package freenet.client;
-import
freenet.client.StandardOnionFECCodec.StandardOnionFECCodecEncoderCallback;
+import freenet.client.FECCodec.StandardOnionFECCodecEncoderCallback;
import freenet.support.api.Bucket;
import freenet.support.api.BucketFactory;
Modified: trunk/freenet/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2007-04-14 01:01:06 UTC (rev 12675)
+++ trunk/freenet/src/freenet/client/StandardOnionFECCodec.java 2007-04-14 01:18:50 UTC (rev 12676)
@@ -3,10 +3,6 @@
* http://www.gnu.org/ for further details of the GPL. */
package freenet.client;
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.NoSuchElementException;
-
import com.onionnetworks.fec.FECCode;
import com.onionnetworks.fec.Native8Code;
import com.onionnetworks.fec.PureCode;
@@ -100,111 +96,4 @@
public int countCheckBlocks() {
return n-k;
}
-
- // ###############################
-
- /**
- * The method used to submit {@link FECJob}s to the pool
- *
- * @author Florent Daignière <nextgens at
freenetproject.org>
- *
- * @param FECJob
- */
- public void addToQueue(FECJob job) {
- addToQueue(job, this);
- }
-
- public static void addToQueue(FECJob job, StandardOnionFECCodec codec){
- synchronized (_awaitingJobs) {
- if(fecRunnerThread == null) {
- if(fecRunnerThread != null)
Logger.error(StandardOnionFECCodec.class, "The callback died!! restarting a new
one, please report that error.");
- fecRunnerThread = new Thread(fecRunner, "FEC
Pool");
- fecRunnerThread.setDaemon(true);
-
fecRunnerThread.setPriority(Thread.MIN_PRIORITY);
-
- fecRunnerThread.start();
- }
-
- _awaitingJobs.addFirst(job);
- }
- if(logMINOR) Logger.minor(StandardOnionFECCodec.class, "Adding
a new job to the queue (" +_awaitingJobs.size() + ").");
- synchronized (fecRunner){
- fecRunner.notifyAll();
- }
- }
-
- private static final LinkedList _awaitingJobs = new LinkedList();
- private static final FECRunner fecRunner = new FECRunner();
- private static Thread fecRunnerThread;
-
- /**
- * An interface wich has to be implemented by FECJob submitters
- *
- * @author Florent Daignière <nextgens at
freenetproject.org>
- *
- * WARNING: the callback is expected to release the thread !
- */
- public interface StandardOnionFECCodecEncoderCallback{
- public void onEncodedSegment();
- public void onDecodedSegment();
- }
-
- /**
- * A private Thread started by {@link StandardOnionFECCodec}...
- *
- * @author Florent Daignière <nextgens at
freenetproject.org>
- *
- * TODO: maybe it ought to start more than one thread on SMP
system ? (take care, it's memory consumpsive)
- */
- private static class FECRunner implements Runnable {
-
- public void run(){
- try {
- while(true){
- FECJob job = null;
- try {
- // Get a job
- synchronized (_awaitingJobs) {
- job = (FECJob)
_awaitingJobs.removeLast();
- }
-
- // Encode it
- try {
- if(job.isADecodingJob) {
-
job.codec.realDecode(job.dataBlockStatus, job.checkBlockStatus,
job.blockLength, job.bucketFactory);
- } else {
-
job.codec.realEncode(job.dataBlocks, job.checkBlocks, job.blockLength,
job.bucketFactory);
- // Update
SplitFileBlocks from buckets if necessary
- if((job.dataBlockStatus
!= null) || (job.checkBlockStatus != null)){
- for(int
i=0;i<job.dataBlocks.length;i++)
-
job.dataBlockStatus[i].setData(job.dataBlocks[i]);
- for(int
i=0;i<job.checkBlocks.length;i++)
-
job.checkBlockStatus[i].setData(job.checkBlocks[i]);
- }
- }
- } catch (IOException e) {
- Logger.error(this, "BOH! ioe:"
+ e.getMessage());
- }
-
- // Call the callback
- try {
- if(job.isADecodingJob)
-
job.callback.onDecodedSegment();
- else
-
job.callback.onEncodedSegment();
-
- } catch (Throwable e) {
- Logger.error(this, "The
callback failed!" + e.getMessage(), e);
- }
- } catch (NoSuchElementException ne) {
- try {
- synchronized (this) {
-
wait(Integer.MAX_VALUE);
- }
- } catch (InterruptedException e) {}
- }
- }
- } finally { fecRunnerThread = null; }
- }
- }
}
Modified: trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 2007-04-14 01:01:06 UTC (rev 12675)
+++ trunk/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 2007-04-14 01:18:50 UTC (rev 12676)
@@ -16,7 +16,7 @@
import freenet.client.Metadata;
import freenet.client.MetadataParseException;
import freenet.client.SplitfileBlock;
-import
freenet.client.StandardOnionFECCodec.StandardOnionFECCodecEncoderCallback;
+import freenet.client.FECCodec.StandardOnionFECCodecEncoderCallback;
import freenet.keys.CHKBlock;
import freenet.keys.ClientCHK;
import freenet.support.Logger;
Modified: trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java
===================================================================
--- trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java 2007-04-14 01:01:06 UTC (rev 12675)
+++ trunk/freenet/src/freenet/client/async/SplitFileInserterSegment.java 2007-04-14 01:18:50 UTC (rev 12676)
@@ -8,7 +8,7 @@
import freenet.client.InserterContext;
import freenet.client.InserterException;
import freenet.client.Metadata;
-import
freenet.client.StandardOnionFECCodec.StandardOnionFECCodecEncoderCallback;
+import freenet.client.FECCodec.StandardOnionFECCodecEncoderCallback;
import freenet.keys.BaseClientKey;
import freenet.keys.CHKBlock;
import freenet.keys.ClientCHK;
@@ -22,8 +22,7 @@
import freenet.support.io.SerializableToFieldSetBucket;
import freenet.support.io.SerializableToFieldSetBucketUtil;
-public class SplitFileInserterSegment implements PutCompletionCallback,
- StandardOnionFECCodecEncoderCallback {
+public class SplitFileInserterSegment implements PutCompletionCallback,
StandardOnionFECCodecEncoderCallback {
private static boolean logMINOR;