Author: toad
Date: 2008-06-05 18:34:18 +0000 (Thu, 05 Jun 2008)
New Revision: 20223

Added:
   branches/db4o/freenet/src/freenet/client/FECCallback.java
   branches/db4o/freenet/src/freenet/client/FECQueue.java
   branches/db4o/freenet/src/freenet/client/async/DBJob.java
   branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
Modified:
   branches/db4o/freenet/src/freenet/client/FECCodec.java
   branches/db4o/freenet/src/freenet/client/FECJob.java
   branches/db4o/freenet/src/freenet/client/StandardOnionFECCodec.java
   branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
   branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java
Log:
Separate FECQueue class.
Supports both persistent and non-persistent jobs, with a cache of 
non-persistent jobs to speed up starting jobs.
Uses DBJobRunner.
Not wired in yet.

Added: branches/db4o/freenet/src/freenet/client/FECCallback.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/FECCallback.java                   
        (rev 0)
+++ branches/db4o/freenet/src/freenet/client/FECCallback.java   2008-06-05 
18:34:18 UTC (rev 20223)
@@ -0,0 +1,20 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.client;
+
+import com.db4o.ObjectContainer;
+
+/**
+ * An interface wich has to be implemented by FECJob submitters
+ * 
+ * @author Florent Daignière <nextgens at freenetproject.org>
+ * 
+ * WARNING: the callback is expected to release the thread !
+ */
+public interface FECCallback {
+
+       public void onEncodedSegment(ObjectContainer container);
+
+       public void onDecodedSegment(ObjectContainer container);
+}
\ No newline at end of file

Modified: branches/db4o/freenet/src/freenet/client/FECCodec.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/FECCodec.java      2008-06-05 
16:48:30 UTC (rev 20222)
+++ branches/db4o/freenet/src/freenet/client/FECCodec.java      2008-06-05 
18:34:18 UTC (rev 20223)
@@ -6,22 +6,17 @@
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.LinkedList;

 import com.db4o.ObjectContainer;
 import com.onionnetworks.fec.FECCode;
 import com.onionnetworks.util.Buffer;

-import freenet.node.PrioRunnable;
 import freenet.support.Executor;
 import freenet.support.Logger;
-import freenet.support.OOMHandler;
-import freenet.support.OOMHook;
 import freenet.support.api.Bucket;
 import freenet.support.api.BucketFactory;
 import freenet.support.io.BucketTools;
 import freenet.support.io.Closer;
-import freenet.support.io.NativeThread;

 /**
  * FEC (forward error correction) handler.
@@ -30,7 +25,7 @@
  * @author root
  *
  */
-public abstract class FECCodec implements OOMHook {
+public abstract class FECCodec {

        // REDFLAG: Optimal stripe size? Smaller => less memory usage, but more 
JNI overhead

@@ -38,14 +33,10 @@
        static boolean logMINOR;
        FECCode fec;
        protected final int k, n;
-       private final Executor executor;

-       protected FECCodec(Executor executor, int k, int n) {
-               this.executor = executor;
+       protected FECCodec(int k, int n) {
                this.k = k;
                this.n = n;
-               
-               OOMHandler.addOOMHook(this);
        }

        /**
@@ -339,148 +330,7 @@
         * 
         * @param FECJob
         */
-       public void addToQueue(FECJob job) {
-               addToQueue(job, this, executor);
+       public void addToQueue(FECJob job, FECQueue queue, ObjectContainer 
container) {
+               queue.addToQueue(job, this, container);
        }
-
-       public static void addToQueue(FECJob job, FECCodec codec, Executor 
executor) {
-               int maxThreads = getMaxRunningFECThreads();
-               synchronized(_awaitingJobs) {
-                       _awaitingJobs.addFirst(job);
-                       if(runningFECThreads < maxThreads) {
-                               executor.execute(fecRunner, "FEC Pool 
"+fecPoolCounter++);
-                               runningFECThreads++;
-                       }
-                       _awaitingJobs.notifyAll();
-               }
-               if(logMINOR)
-                       Logger.minor(StandardOnionFECCodec.class, "Adding a new 
job to the queue (" + _awaitingJobs.size() + ").");
-       }
-       private static final LinkedList _awaitingJobs = new LinkedList();
-       private static final FECRunner fecRunner = new FECRunner();
-       private static int runningFECThreads;
-       private static int fecPoolCounter;
-       
-       private synchronized static int getMaxRunningFECThreads() {
-               if (maxRunningFECThreads != -1)
-                       return maxRunningFECThreads;
-               String osName = System.getProperty("os.name");
-               if(osName.indexOf("Windows") == -1 && 
(osName.toLowerCase().indexOf("mac os x") > 0) || 
(!NativeThread.usingNativeCode())) {
-                       // OS/X niceness is really weak, so we don't want any 
more background CPU load than necessary
-                       // Also, on non-Windows, we need the native threads 
library to be working.
-                       maxRunningFECThreads = 1;
-               } else {
-                       // Most other OSs will have reasonable niceness, so go 
by RAM.
-                       Runtime r = Runtime.getRuntime();
-                       int max = r.availableProcessors(); // FIXME this may 
change in a VM, poll it
-                       long maxMemory = r.maxMemory();
-                       if(maxMemory < 256*1024*1024) {
-                               max = 1;
-                       } else {
-                               // Measured 11MB decode 8MB encode on amd64.
-                               // No more than 10% of available RAM, so 110MB 
for each extra processor.
-                               // No more than 3 so that we don't reach any 
FileDescriptor related limit
-                               max = Math.min(3, Math.min(max, (int) 
(Math.min(Integer.MAX_VALUE, maxMemory / (128*1024*1024)))));
-                       }
-                       maxRunningFECThreads = max;
-               }
-               Logger.minor(FECCodec.class, "Maximum FEC threads: 
"+maxRunningFECThreads);
-               return maxRunningFECThreads;
-       }
-       
-       private static int maxRunningFECThreads = -1;
-
-       /**
-        * A private Thread started by {@link FECCodec}...
-        * 
-        * @author Florent Daigni&egrave;re &lt;nextgens at 
freenetproject.org&gt;
-        */
-       private static class FECRunner implements PrioRunnable {
-
-               public void run() {
-                       freenet.support.Logger.OSThread.logPID(this);
-                       try {
-                               while(true) {
-                                       FECJob job = null;
-                                       // Get a job
-                                       synchronized (_awaitingJobs) {
-                                               while (_awaitingJobs.isEmpty()) 
{
-                                                       
_awaitingJobs.wait(Integer.MAX_VALUE);
-                                                       if (runningFECThreads > 
getMaxRunningFECThreads())
-                                                               return;
-                                               }
-                                               job = (FECJob) 
_awaitingJobs.removeLast();
-                                       }
-
-                                       // Encode it
-                                       try {
-                                               if (job.isADecodingJob)
-                                                       
job.codec.realDecode(job.dataBlockStatus, job.checkBlockStatus, job.blockLength,
-                                                               
job.bucketFactory);
-                                               else {
-                                                       
job.codec.realEncode(job.dataBlocks, job.checkBlocks, job.blockLength, 
job.bucketFactory);
-                                                       // Update 
SplitFileBlocks from buckets if necessary
-                                                       if 
((job.dataBlockStatus != null) || (job.checkBlockStatus != null)) {
-                                                               for (int i = 0; 
i < job.dataBlocks.length; i++)
-                                                                       
job.dataBlockStatus[i].setData(job.dataBlocks[i]);
-                                                               for (int i = 0; 
i < job.checkBlocks.length; i++)
-                                                                       
job.checkBlockStatus[i].setData(job.checkBlocks[i]);
-                                                       }
-                                               }
-                                       } catch (IOException e) {
-                                               Logger.error(this, "BOH! ioe:" 
+ e.getMessage());
-                                       }
-
-                                       // Call the callback
-                                       try {
-                                               if (job.isADecodingJob)
-                                                       
job.callback.onDecodedSegment();
-                                               else
-                                                       
job.callback.onEncodedSegment();
-                                       } catch (Throwable e) {
-                                               Logger.error(this, "The 
callback failed!" + e.getMessage(), e);
-                                       }
-                               }
-                       } catch (Throwable t) {
-                               Logger.error(this, "Caught "+t+" in "+this, t);
-                       }
-                       finally {
-                               synchronized (_awaitingJobs) {
-                                       runningFECThreads--;
-                               }
-                       }
-               }
-
-               public int getPriority() {
-                       return NativeThread.LOW_PRIORITY;
-               }
-       }
-
-       public void handleLowMemory() throws Exception {
-               synchronized (_awaitingJobs) {
-                       maxRunningFECThreads = Math.min(1, maxRunningFECThreads 
- 1);
-                       _awaitingJobs.notify(); // not notifyAll()
-               }
-       }
-
-       public void handleOutOfMemory() throws Exception {
-               synchronized (_awaitingJobs) {
-                       maxRunningFECThreads = 1;
-                       _awaitingJobs.notifyAll();
-               }
-       }
-
-       /**
-        * An interface wich has to be implemented by FECJob submitters
-        * 
-        * @author Florent Daigni&egrave;re &lt;nextgens at 
freenetproject.org&gt;
-        * 
-        * WARNING: the callback is expected to release the thread !
-        */
-       public interface StandardOnionFECCodecEncoderCallback {
-
-               public void onEncodedSegment(ObjectContainer container);
-
-               public void onDecodedSegment(ObjectContainer container);
-       }
 }

Modified: branches/db4o/freenet/src/freenet/client/FECJob.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/FECJob.java        2008-06-05 
16:48:30 UTC (rev 20222)
+++ branches/db4o/freenet/src/freenet/client/FECJob.java        2008-06-05 
18:34:18 UTC (rev 20223)
@@ -3,7 +3,6 @@
  */
 package freenet.client;

-import freenet.client.FECCodec.StandardOnionFECCodecEncoderCallback;
 import freenet.support.api.Bucket;
 import freenet.support.api.BucketFactory;

@@ -19,11 +18,19 @@
        final SplitfileBlock[] dataBlockStatus, checkBlockStatus;
        final BucketFactory bucketFactory;
        final int blockLength;
-       final StandardOnionFECCodecEncoderCallback callback;
+       final FECCallback callback;
        final boolean isADecodingJob;
+       final long addedTime;
+       final short priority;
+       final boolean persistent;
+       /** Parent queue */
+       final FECQueue queue;

-       public FECJob(FECCodec codec, SplitfileBlock[] dataBlockStatus, 
SplitfileBlock[] checkBlockStatus,  int blockLength, BucketFactory 
bucketFactory, StandardOnionFECCodecEncoderCallback callback, boolean 
isADecodingJob) {
+       public FECJob(FECCodec codec, FECQueue queue, SplitfileBlock[] 
dataBlockStatus, SplitfileBlock[] checkBlockStatus,  int blockLength, 
BucketFactory bucketFactory, FECCallback callback, boolean isADecodingJob, 
short priority, boolean persistent) {
                this.codec = codec;
+               this.queue = queue;
+               this.priority = priority;
+               this.addedTime = System.currentTimeMillis();
                this.dataBlockStatus = dataBlockStatus;
                this.checkBlockStatus = checkBlockStatus;

@@ -37,11 +44,15 @@
                this.blockLength = blockLength;
                this.bucketFactory = bucketFactory;
                this.callback = callback;
-               this.isADecodingJob = isADecodingJob;                   
+               this.isADecodingJob = isADecodingJob;
+               this.persistent = persistent;
        }

-       public FECJob(FECCodec codec, Bucket[] dataBlocks, Bucket[] 
checkBlocks, int blockLength, BucketFactory bucketFactory, 
StandardOnionFECCodecEncoderCallback callback, boolean isADecodingJob) {
+       public FECJob(FECCodec codec, FECQueue queue, Bucket[] dataBlocks, 
Bucket[] checkBlocks, int blockLength, BucketFactory bucketFactory, FECCallback 
callback, boolean isADecodingJob, short priority, boolean persistent) {
                this.codec = codec;
+               this.queue = queue;
+               this.priority = priority;
+               this.addedTime = System.currentTimeMillis();
                this.dataBlocks = dataBlocks;
                this.checkBlocks = checkBlocks;
                this.dataBlockStatus = null;
@@ -50,5 +61,6 @@
                this.bucketFactory = bucketFactory;
                this.callback = callback;
                this.isADecodingJob = isADecodingJob;
+               this.persistent = persistent;
        }
 }
\ No newline at end of file

Added: branches/db4o/freenet/src/freenet/client/FECQueue.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/FECQueue.java                      
        (rev 0)
+++ branches/db4o/freenet/src/freenet/client/FECQueue.java      2008-06-05 
18:34:18 UTC (rev 20223)
@@ -0,0 +1,301 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.client;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.ListIterator;
+
+import com.db4o.ObjectContainer;
+import com.db4o.ObjectSet;
+import com.db4o.query.Query;
+
+import freenet.client.async.DBJob;
+import freenet.client.async.DBJobRunner;
+import freenet.node.PrioRunnable;
+import freenet.node.RequestScheduler;
+import freenet.support.Executor;
+import freenet.support.Logger;
+import freenet.support.OOMHandler;
+import freenet.support.OOMHook;
+import freenet.support.io.NativeThread;
+
+/**
+ * The FEC queue. Uses a limited number of threads (at most one per core), a 
non-persistent queue,
+ * a persistent queue (kept in the database), and a transient cache of the 
persistent queue.
+ * Sorted by priority and then by time added.
+ * @author toad
+ */
+class FECQueue implements OOMHook {
+       
+       private transient LinkedList[] transientQueue;
+       private transient LinkedList[] persistentQueueCache;
+       private transient int maxPersistentQueueCacheSize;
+       private transient int priorities;
+       private transient DBJobRunner databaseJobRunner;
+       private transient Executor executor;
+
+       /** Called after creating or deserializing the FECQueue. Initialises 
all the transient fields. */
+       void init(int priorities, int maxCacheSize, DBJobRunner dbJobRunner, 
Executor exec) {
+               this.priorities = priorities;
+               this.maxPersistentQueueCacheSize = maxCacheSize;
+               this.databaseJobRunner = dbJobRunner;
+               this.executor = exec;
+               transientQueue = new LinkedList[priorities];
+               persistentQueueCache = new LinkedList[priorities];
+               for(int i=0;i<priorities;i++) {
+                       transientQueue[i] = new LinkedList();
+                       persistentQueueCache[i] = new LinkedList();
+               }
+               OOMHandler.addOOMHook(this);
+               queueCacheFiller();
+       }
+       
+       private void queueCacheFiller() {
+               databaseJobRunner.queue(cacheFillerJob, 
NativeThread.NORM_PRIORITY);
+       }
+
+       public void addToQueue(FECJob job, FECCodec codec, ObjectContainer 
container) {
+               boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
+               int maxThreads = getMaxRunningFECThreads();
+               if(job.persistent) {
+                       container.set(job);
+               }
+               synchronized(FECQueue.this) {
+                       if(!job.persistent) {
+                               transientQueue[job.priority].addLast(job);
+                       } else {
+                               int totalAbove = 0;
+                               for(int i=0;i<job.priority;i++) {
+                                       totalAbove += 
persistentQueueCache[i].size();
+                               }
+                               if(totalAbove >= maxPersistentQueueCacheSize) {
+                                       // Don't add.
+                                       if(logMINOR)
+                                               Logger.minor(this, "Not adding 
persistent job to in-RAM cache, too many above it");
+                               } else {
+                                       if(totalAbove + 
persistentQueueCache[job.priority].size() >= maxPersistentQueueCacheSize) {
+                                               // Still don't add, within a 
priority it's oldest first.
+                                               if(logMINOR)
+                                                       Logger.minor(this, "Not 
adding persistent job to in-RAM cache, too many at same priority");
+                                       } else {
+                                               
persistentQueueCache[job.priority].addLast(job);
+                                               int total = totalAbove + 
persistentQueueCache[job.priority].size();
+                                               for(int 
i=job.priority+1;i<priorities;i++) {
+                                                       total += 
persistentQueueCache[i].size();
+                                                       while(total >= 
maxPersistentQueueCacheSize && !persistentQueueCache[i].isEmpty()) {
+                                                               if(logMINOR)
+                                                                       
Logger.minor(this, "Removing low priority job from cache, total now "+total);
+                                                               
persistentQueueCache[job.priority].removeLast();
+                                                               total--;
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+                       if(runningFECThreads < maxThreads) {
+                               executor.execute(runner, "FEC Pool 
"+fecPoolCounter++);
+                               runningFECThreads++;
+                       }
+                       notifyAll();
+               }
+               if(logMINOR)
+                       Logger.minor(StandardOnionFECCodec.class, "Adding a new 
job to the queue.");
+       }
+       
+       /**
+        * Runs on each thread.
+        * @author nextgens
+        */
+       private final PrioRunnable runner = new PrioRunnable() {
+               public void run() {
+                       freenet.support.Logger.OSThread.logPID(this);
+                       try {
+                               while(true) {
+                                       final FECJob job;
+                                       // Get a job
+                                       synchronized (FECQueue.this) {
+                                               job = 
getFECJobBlockingNoDBAccess();
+                                       }
+
+                                       // Encode it
+                                       try {
+                                               if (job.isADecodingJob)
+                                                       
job.codec.realDecode(job.dataBlockStatus, job.checkBlockStatus, job.blockLength,
+                                                               
job.bucketFactory);
+                                               else {
+                                                       
job.codec.realEncode(job.dataBlocks, job.checkBlocks, job.blockLength, 
job.bucketFactory);
+                                                       // Update 
SplitFileBlocks from buckets if necessary
+                                                       if 
((job.dataBlockStatus != null) || (job.checkBlockStatus != null)) {
+                                                               for (int i = 0; 
i < job.dataBlocks.length; i++)
+                                                                       
job.dataBlockStatus[i].setData(job.dataBlocks[i]);
+                                                               for (int i = 0; 
i < job.checkBlocks.length; i++)
+                                                                       
job.checkBlockStatus[i].setData(job.checkBlocks[i]);
+                                                       }
+                                               }
+                                       } catch (IOException e) {
+                                               Logger.error(this, "BOH! ioe:" 
+ e.getMessage());
+                                       }
+
+                                       // Call the callback
+                                       try {
+                                               if(!job.persistent) {
+                                                       if (job.isADecodingJob)
+                                                               
job.callback.onDecodedSegment(null);
+                                                       else
+                                                               
job.callback.onEncodedSegment(null);
+                                               } else {
+                                                       
databaseJobRunner.queue(new DBJob() {
+
+                                                               public void 
run(ObjectContainer container, RequestScheduler sched) {
+                                                                       
if(job.isADecodingJob)
+                                                                               
job.callback.onDecodedSegment(container);
+                                                                       else
+                                                                               
job.callback.onEncodedSegment(container);
+                                                                       
container.delete(job);
+                                                               }
+                                                               
+                                                       }, job.priority);
+                                               }
+                                       } catch (Throwable e) {
+                                               Logger.error(this, "The 
callback failed!" + e.getMessage(), e);
+                                       }
+                               }
+                       } catch (Throwable t) {
+                               Logger.error(this, "Caught "+t+" in "+this, t);
+                       }
+                       finally {
+                               synchronized (FECQueue.this) {
+                                       runningFECThreads--;
+                               }
+                       }
+               }
+
+               public int getPriority() {
+                       return NativeThread.LOW_PRIORITY;
+               }
+
+       };
+
+       private final DBJob cacheFillerJob = new DBJob() {
+
+               public void run(ObjectContainer container, RequestScheduler 
sched) {
+                       // Try to avoid accessing the database while 
synchronized on the FECQueue.
+                       while(true) {
+                               boolean addedAny = false;
+                               int totalCached = 0;
+                               for(short i=0;i<priorities;i++) {
+                                       int grab = 0;
+                                       synchronized(FECQueue.this) {
+                                               int newCached = totalCached + 
persistentQueueCache[i].size();
+                                               if(newCached >= 
maxPersistentQueueCacheSize) return;
+                                               grab = 
maxPersistentQueueCacheSize - newCached;
+                                       }
+                                       Query query = container.query();
+                                       query.descend("priority").constrain(new 
Short(i));
+                                       
query.descend("queue").constrain(FECQueue.this);
+                                       
query.descend("addedTime").orderAscending();
+                                       ObjectSet results = query.execute();
+                                       if(results.hasNext()) {
+                                               for(int j=0;j<grab && 
results.hasNext();j++) {
+                                                       FECJob job = (FECJob) 
results.next();
+                                                       
synchronized(FECQueue.this) {
+                                                               
if(persistentQueueCache[j].contains(job)) {
+                                                                       j--;
+                                                                       
continue;
+                                                               }
+                                                               boolean added = 
false;
+                                                               
for(ListIterator it = persistentQueueCache[j].listIterator();it.hasNext();) {
+                                                                       FECJob 
cmp = (FECJob) it.next();
+                                                                       
if(cmp.addedTime >= job.addedTime) {
+                                                                               
it.previous();
+                                                                               
it.add(job);
+                                                                               
added = true;
+                                                                               
addedAny = true;
+                                                                               
break;
+                                                                       }
+                                                               }
+                                                               if(!added) 
persistentQueueCache[j].addLast(job);
+                                                       }
+                                               }
+                                       }
+                               }
+                               if(!addedAny) return;
+                       }
+               }
+               
+       };
+       
+       private int maxRunningFECThreads = -1;
+
+       private synchronized int getMaxRunningFECThreads() {
+               if (maxRunningFECThreads != -1)
+                       return maxRunningFECThreads;
+               String osName = System.getProperty("os.name");
+               if(osName.indexOf("Windows") == -1 && 
(osName.toLowerCase().indexOf("mac os x") > 0) || 
(!NativeThread.usingNativeCode())) {
+                       // OS/X niceness is really weak, so we don't want any 
more background CPU load than necessary
+                       // Also, on non-Windows, we need the native threads 
library to be working.
+                       maxRunningFECThreads = 1;
+               } else {
+                       // Most other OSs will have reasonable niceness, so go 
by RAM.
+                       Runtime r = Runtime.getRuntime();
+                       int max = r.availableProcessors(); // FIXME this may 
change in a VM, poll it
+                       long maxMemory = r.maxMemory();
+                       if(maxMemory < 256*1024*1024) {
+                               max = 1;
+                       } else {
+                               // Measured 11MB decode 8MB encode on amd64.
+                               // No more than 10% of available RAM, so 110MB 
for each extra processor.
+                               // No more than 3 so that we don't reach any 
FileDescriptor related limit
+                               max = Math.min(3, Math.min(max, (int) 
(Math.min(Integer.MAX_VALUE, maxMemory / (128*1024*1024)))));
+                       }
+                       maxRunningFECThreads = max;
+               }
+               Logger.minor(FECCodec.class, "Maximum FEC threads: 
"+maxRunningFECThreads);
+               return maxRunningFECThreads;
+       }
+
+       /**
+        * Find a FEC job to run.
+        * @return null only if there are too many FEC threads running.
+        */
+       protected synchronized FECJob getFECJobBlockingNoDBAccess() {
+               while(true) {
+                       if(runningFECThreads > getMaxRunningFECThreads())
+                               return null;
+                       for(int i=0;i<priorities;i++) {
+                               if(!transientQueue[i].isEmpty())
+                                       return (FECJob) 
transientQueue[i].removeFirst();
+                               if(!persistentQueueCache[i].isEmpty())
+                                       return (FECJob) 
persistentQueueCache[i].removeFirst();
+                       }
+                       queueCacheFiller();
+                       try {
+                               wait();
+                       } catch (InterruptedException e) {
+                               // Ignore
+                       }
+               }
+       }
+
+       private static int runningFECThreads;
+       private static int fecPoolCounter;
+
+       public void handleLowMemory() throws Exception {
+               synchronized (this) {
+                       maxRunningFECThreads = Math.min(1, maxRunningFECThreads 
- 1);
+                       notify(); // not notifyAll()
+               }
+       }
+
+       public void handleOutOfMemory() throws Exception {
+               synchronized (this) {
+                       maxRunningFECThreads = 1;
+                       notifyAll();
+               }
+       }
+
+       
+       
+}

Modified: branches/db4o/freenet/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/StandardOnionFECCodec.java 
2008-06-05 16:48:30 UTC (rev 20222)
+++ branches/db4o/freenet/src/freenet/client/StandardOnionFECCodec.java 
2008-06-05 18:34:18 UTC (rev 20223)
@@ -52,7 +52,7 @@
                        recentlyUsedCodecs.push(key, codec);
                        return codec;
                }
-               codec = new StandardOnionFECCodec(executor, dataBlocks, 
checkBlocks + dataBlocks);
+               codec = new StandardOnionFECCodec(dataBlocks, checkBlocks + 
dataBlocks);
                recentlyUsedCodecs.push(key, codec);
                while(recentlyUsedCodecs.size() > MAX_CACHED_CODECS) {
                        recentlyUsedCodecs.popKey();
@@ -60,8 +60,8 @@
                return codec;
        }

-       public StandardOnionFECCodec(Executor executor, int k, int n) {
-               super(executor, k, n);
+       public StandardOnionFECCodec(int k, int n) {
+               super(k, n);

                FECCode fec2 = null;
                if(!noNative) {

Added: branches/db4o/freenet/src/freenet/client/async/DBJob.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DBJob.java                   
        (rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/DBJob.java   2008-06-05 
18:34:18 UTC (rev 20223)
@@ -0,0 +1,19 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.client.async;
+
+import com.db4o.ObjectContainer;
+
+import freenet.node.RequestScheduler;
+
+/**
+ * A job to be run on the database thread. We will pass a transactional 
context in,
+ * and a RequestScheduler.
+ * @author toad
+ */
+public interface DBJob {
+       
+       void run(ObjectContainer container, RequestScheduler sched);
+
+}

Added: branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java             
                (rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java     
2008-06-05 18:34:18 UTC (rev 20223)
@@ -0,0 +1,14 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.client.async;
+
+/**
+ * Interface for an object which queues and runs DBJob's.
+ * @author toad
+ */
+public interface DBJobRunner {
+       
+       public void queue(DBJob job, int priority);
+
+}

Modified: 
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 
2008-06-05 16:48:30 UTC (rev 20222)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java 
2008-06-05 18:34:18 UTC (rev 20223)
@@ -10,6 +10,7 @@
 import com.db4o.ObjectContainer;

 import freenet.client.ArchiveContext;
+import freenet.client.FECCallback;
 import freenet.client.FECCodec;
 import freenet.client.FECJob;
 import freenet.client.FailureCodeTracker;
@@ -18,7 +19,6 @@
 import freenet.client.Metadata;
 import freenet.client.MetadataParseException;
 import freenet.client.SplitfileBlock;
-import freenet.client.FECCodec.StandardOnionFECCodecEncoderCallback;
 import freenet.keys.CHKBlock;
 import freenet.keys.CHKEncodeException;
 import freenet.keys.ClientCHK;
@@ -35,7 +35,7 @@
  * A single segment within a SplitFileFetcher.
  * This in turn controls a large number of SplitFileFetcherSubSegment's, which 
are registered on the ClientRequestScheduler.
  */
-public class SplitFileFetcherSegment implements 
StandardOnionFECCodecEncoderCallback {
+public class SplitFileFetcherSegment implements FECCallback {

        private static volatile boolean logMINOR;
        final short splitfileType;

Modified: 
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java    
    2008-06-05 16:48:30 UTC (rev 20222)
+++ 
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java    
    2008-06-05 18:34:18 UTC (rev 20223)
@@ -4,13 +4,13 @@

 import com.db4o.ObjectContainer;

+import freenet.client.FECCallback;
 import freenet.client.FECCodec;
 import freenet.client.FECJob;
 import freenet.client.FailureCodeTracker;
 import freenet.client.InsertContext;
 import freenet.client.InsertException;
 import freenet.client.Metadata;
-import freenet.client.FECCodec.StandardOnionFECCodecEncoderCallback;
 import freenet.keys.BaseClientKey;
 import freenet.keys.CHKBlock;
 import freenet.keys.ClientCHK;
@@ -24,7 +24,7 @@
 import freenet.support.io.SerializableToFieldSetBucket;
 import freenet.support.io.SerializableToFieldSetBucketUtil;

-public class SplitFileInserterSegment implements PutCompletionCallback, 
StandardOnionFECCodecEncoderCallback {
+public class SplitFileInserterSegment implements PutCompletionCallback, 
FECCallback {

        private static volatile boolean logMINOR;



Reply via email to