Author: toad
Date: 2008-06-05 18:34:18 +0000 (Thu, 05 Jun 2008)
New Revision: 20223
Added:
branches/db4o/freenet/src/freenet/client/FECCallback.java
branches/db4o/freenet/src/freenet/client/FECQueue.java
branches/db4o/freenet/src/freenet/client/async/DBJob.java
branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
Modified:
branches/db4o/freenet/src/freenet/client/FECCodec.java
branches/db4o/freenet/src/freenet/client/FECJob.java
branches/db4o/freenet/src/freenet/client/StandardOnionFECCodec.java
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java
Log:
Separate FECQueue class.
Supports both persistent and non-persistent jobs, with a cache of
non-persistent jobs to speed up starting jobs.
Uses DBJobRunner.
Not wired in yet.
Added: branches/db4o/freenet/src/freenet/client/FECCallback.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/FECCallback.java
(rev 0)
+++ branches/db4o/freenet/src/freenet/client/FECCallback.java 2008-06-05
18:34:18 UTC (rev 20223)
@@ -0,0 +1,20 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.client;
+
+import com.db4o.ObjectContainer;
+
+/**
+ * An interface wich has to be implemented by FECJob submitters
+ *
+ * @author Florent Daignière <nextgens at freenetproject.org>
+ *
+ * WARNING: the callback is expected to release the thread !
+ */
+public interface FECCallback {
+
+ public void onEncodedSegment(ObjectContainer container);
+
+ public void onDecodedSegment(ObjectContainer container);
+}
\ No newline at end of file
Modified: branches/db4o/freenet/src/freenet/client/FECCodec.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/FECCodec.java 2008-06-05
16:48:30 UTC (rev 20222)
+++ branches/db4o/freenet/src/freenet/client/FECCodec.java 2008-06-05
18:34:18 UTC (rev 20223)
@@ -6,22 +6,17 @@
import java.io.DataInputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.util.LinkedList;
import com.db4o.ObjectContainer;
import com.onionnetworks.fec.FECCode;
import com.onionnetworks.util.Buffer;
-import freenet.node.PrioRunnable;
import freenet.support.Executor;
import freenet.support.Logger;
-import freenet.support.OOMHandler;
-import freenet.support.OOMHook;
import freenet.support.api.Bucket;
import freenet.support.api.BucketFactory;
import freenet.support.io.BucketTools;
import freenet.support.io.Closer;
-import freenet.support.io.NativeThread;
/**
* FEC (forward error correction) handler.
@@ -30,7 +25,7 @@
* @author root
*
*/
-public abstract class FECCodec implements OOMHook {
+public abstract class FECCodec {
// REDFLAG: Optimal stripe size? Smaller => less memory usage, but more
JNI overhead
@@ -38,14 +33,10 @@
static boolean logMINOR;
FECCode fec;
protected final int k, n;
- private final Executor executor;
- protected FECCodec(Executor executor, int k, int n) {
- this.executor = executor;
+ protected FECCodec(int k, int n) {
this.k = k;
this.n = n;
-
- OOMHandler.addOOMHook(this);
}
/**
@@ -339,148 +330,7 @@
*
* @param FECJob
*/
- public void addToQueue(FECJob job) {
- addToQueue(job, this, executor);
+ public void addToQueue(FECJob job, FECQueue queue, ObjectContainer
container) {
+ queue.addToQueue(job, this, container);
}
-
- public static void addToQueue(FECJob job, FECCodec codec, Executor
executor) {
- int maxThreads = getMaxRunningFECThreads();
- synchronized(_awaitingJobs) {
- _awaitingJobs.addFirst(job);
- if(runningFECThreads < maxThreads) {
- executor.execute(fecRunner, "FEC Pool
"+fecPoolCounter++);
- runningFECThreads++;
- }
- _awaitingJobs.notifyAll();
- }
- if(logMINOR)
- Logger.minor(StandardOnionFECCodec.class, "Adding a new
job to the queue (" + _awaitingJobs.size() + ").");
- }
- private static final LinkedList _awaitingJobs = new LinkedList();
- private static final FECRunner fecRunner = new FECRunner();
- private static int runningFECThreads;
- private static int fecPoolCounter;
-
- private synchronized static int getMaxRunningFECThreads() {
- if (maxRunningFECThreads != -1)
- return maxRunningFECThreads;
- String osName = System.getProperty("os.name");
- if(osName.indexOf("Windows") == -1 &&
(osName.toLowerCase().indexOf("mac os x") > 0) ||
(!NativeThread.usingNativeCode())) {
- // OS/X niceness is really weak, so we don't want any
more background CPU load than necessary
- // Also, on non-Windows, we need the native threads
library to be working.
- maxRunningFECThreads = 1;
- } else {
- // Most other OSs will have reasonable niceness, so go
by RAM.
- Runtime r = Runtime.getRuntime();
- int max = r.availableProcessors(); // FIXME this may
change in a VM, poll it
- long maxMemory = r.maxMemory();
- if(maxMemory < 256*1024*1024) {
- max = 1;
- } else {
- // Measured 11MB decode 8MB encode on amd64.
- // No more than 10% of available RAM, so 110MB
for each extra processor.
- // No more than 3 so that we don't reach any
FileDescriptor related limit
- max = Math.min(3, Math.min(max, (int)
(Math.min(Integer.MAX_VALUE, maxMemory / (128*1024*1024)))));
- }
- maxRunningFECThreads = max;
- }
- Logger.minor(FECCodec.class, "Maximum FEC threads:
"+maxRunningFECThreads);
- return maxRunningFECThreads;
- }
-
- private static int maxRunningFECThreads = -1;
-
- /**
- * A private Thread started by {@link FECCodec}...
- *
- * @author Florent Daignière <nextgens at
freenetproject.org>
- */
- private static class FECRunner implements PrioRunnable {
-
- public void run() {
- freenet.support.Logger.OSThread.logPID(this);
- try {
- while(true) {
- FECJob job = null;
- // Get a job
- synchronized (_awaitingJobs) {
- while (_awaitingJobs.isEmpty())
{
-
_awaitingJobs.wait(Integer.MAX_VALUE);
- if (runningFECThreads >
getMaxRunningFECThreads())
- return;
- }
- job = (FECJob)
_awaitingJobs.removeLast();
- }
-
- // Encode it
- try {
- if (job.isADecodingJob)
-
job.codec.realDecode(job.dataBlockStatus, job.checkBlockStatus, job.blockLength,
-
job.bucketFactory);
- else {
-
job.codec.realEncode(job.dataBlocks, job.checkBlocks, job.blockLength,
job.bucketFactory);
- // Update
SplitFileBlocks from buckets if necessary
- if
((job.dataBlockStatus != null) || (job.checkBlockStatus != null)) {
- for (int i = 0;
i < job.dataBlocks.length; i++)
-
job.dataBlockStatus[i].setData(job.dataBlocks[i]);
- for (int i = 0;
i < job.checkBlocks.length; i++)
-
job.checkBlockStatus[i].setData(job.checkBlocks[i]);
- }
- }
- } catch (IOException e) {
- Logger.error(this, "BOH! ioe:"
+ e.getMessage());
- }
-
- // Call the callback
- try {
- if (job.isADecodingJob)
-
job.callback.onDecodedSegment();
- else
-
job.callback.onEncodedSegment();
- } catch (Throwable e) {
- Logger.error(this, "The
callback failed!" + e.getMessage(), e);
- }
- }
- } catch (Throwable t) {
- Logger.error(this, "Caught "+t+" in "+this, t);
- }
- finally {
- synchronized (_awaitingJobs) {
- runningFECThreads--;
- }
- }
- }
-
- public int getPriority() {
- return NativeThread.LOW_PRIORITY;
- }
- }
-
- public void handleLowMemory() throws Exception {
- synchronized (_awaitingJobs) {
- maxRunningFECThreads = Math.min(1, maxRunningFECThreads
- 1);
- _awaitingJobs.notify(); // not notifyAll()
- }
- }
-
- public void handleOutOfMemory() throws Exception {
- synchronized (_awaitingJobs) {
- maxRunningFECThreads = 1;
- _awaitingJobs.notifyAll();
- }
- }
-
- /**
- * An interface wich has to be implemented by FECJob submitters
- *
- * @author Florent Daignière <nextgens at
freenetproject.org>
- *
- * WARNING: the callback is expected to release the thread !
- */
- public interface StandardOnionFECCodecEncoderCallback {
-
- public void onEncodedSegment(ObjectContainer container);
-
- public void onDecodedSegment(ObjectContainer container);
- }
}
Modified: branches/db4o/freenet/src/freenet/client/FECJob.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/FECJob.java 2008-06-05
16:48:30 UTC (rev 20222)
+++ branches/db4o/freenet/src/freenet/client/FECJob.java 2008-06-05
18:34:18 UTC (rev 20223)
@@ -3,7 +3,6 @@
*/
package freenet.client;
-import freenet.client.FECCodec.StandardOnionFECCodecEncoderCallback;
import freenet.support.api.Bucket;
import freenet.support.api.BucketFactory;
@@ -19,11 +18,19 @@
final SplitfileBlock[] dataBlockStatus, checkBlockStatus;
final BucketFactory bucketFactory;
final int blockLength;
- final StandardOnionFECCodecEncoderCallback callback;
+ final FECCallback callback;
final boolean isADecodingJob;
+ final long addedTime;
+ final short priority;
+ final boolean persistent;
+ /** Parent queue */
+ final FECQueue queue;
- public FECJob(FECCodec codec, SplitfileBlock[] dataBlockStatus,
SplitfileBlock[] checkBlockStatus, int blockLength, BucketFactory
bucketFactory, StandardOnionFECCodecEncoderCallback callback, boolean
isADecodingJob) {
+ public FECJob(FECCodec codec, FECQueue queue, SplitfileBlock[]
dataBlockStatus, SplitfileBlock[] checkBlockStatus, int blockLength,
BucketFactory bucketFactory, FECCallback callback, boolean isADecodingJob,
short priority, boolean persistent) {
this.codec = codec;
+ this.queue = queue;
+ this.priority = priority;
+ this.addedTime = System.currentTimeMillis();
this.dataBlockStatus = dataBlockStatus;
this.checkBlockStatus = checkBlockStatus;
@@ -37,11 +44,15 @@
this.blockLength = blockLength;
this.bucketFactory = bucketFactory;
this.callback = callback;
- this.isADecodingJob = isADecodingJob;
+ this.isADecodingJob = isADecodingJob;
+ this.persistent = persistent;
}
- public FECJob(FECCodec codec, Bucket[] dataBlocks, Bucket[]
checkBlocks, int blockLength, BucketFactory bucketFactory,
StandardOnionFECCodecEncoderCallback callback, boolean isADecodingJob) {
+ public FECJob(FECCodec codec, FECQueue queue, Bucket[] dataBlocks,
Bucket[] checkBlocks, int blockLength, BucketFactory bucketFactory, FECCallback
callback, boolean isADecodingJob, short priority, boolean persistent) {
this.codec = codec;
+ this.queue = queue;
+ this.priority = priority;
+ this.addedTime = System.currentTimeMillis();
this.dataBlocks = dataBlocks;
this.checkBlocks = checkBlocks;
this.dataBlockStatus = null;
@@ -50,5 +61,6 @@
this.bucketFactory = bucketFactory;
this.callback = callback;
this.isADecodingJob = isADecodingJob;
+ this.persistent = persistent;
}
}
\ No newline at end of file
Added: branches/db4o/freenet/src/freenet/client/FECQueue.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/FECQueue.java
(rev 0)
+++ branches/db4o/freenet/src/freenet/client/FECQueue.java 2008-06-05
18:34:18 UTC (rev 20223)
@@ -0,0 +1,301 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.client;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.ListIterator;
+
+import com.db4o.ObjectContainer;
+import com.db4o.ObjectSet;
+import com.db4o.query.Query;
+
+import freenet.client.async.DBJob;
+import freenet.client.async.DBJobRunner;
+import freenet.node.PrioRunnable;
+import freenet.node.RequestScheduler;
+import freenet.support.Executor;
+import freenet.support.Logger;
+import freenet.support.OOMHandler;
+import freenet.support.OOMHook;
+import freenet.support.io.NativeThread;
+
+/**
+ * The FEC queue. Uses a limited number of threads (at most one per core), a
non-persistent queue,
+ * a persistent queue (kept in the database), and a transient cache of the
persistent queue.
+ * Sorted by priority and then by time added.
+ * @author toad
+ */
+class FECQueue implements OOMHook {
+
+ private transient LinkedList[] transientQueue;
+ private transient LinkedList[] persistentQueueCache;
+ private transient int maxPersistentQueueCacheSize;
+ private transient int priorities;
+ private transient DBJobRunner databaseJobRunner;
+ private transient Executor executor;
+
+ /** Called after creating or deserializing the FECQueue. Initialises
all the transient fields. */
+ void init(int priorities, int maxCacheSize, DBJobRunner dbJobRunner,
Executor exec) {
+ this.priorities = priorities;
+ this.maxPersistentQueueCacheSize = maxCacheSize;
+ this.databaseJobRunner = dbJobRunner;
+ this.executor = exec;
+ transientQueue = new LinkedList[priorities];
+ persistentQueueCache = new LinkedList[priorities];
+ for(int i=0;i<priorities;i++) {
+ transientQueue[i] = new LinkedList();
+ persistentQueueCache[i] = new LinkedList();
+ }
+ OOMHandler.addOOMHook(this);
+ queueCacheFiller();
+ }
+
+ private void queueCacheFiller() {
+ databaseJobRunner.queue(cacheFillerJob,
NativeThread.NORM_PRIORITY);
+ }
+
+ public void addToQueue(FECJob job, FECCodec codec, ObjectContainer
container) {
+ boolean logMINOR = Logger.shouldLog(Logger.MINOR, this);
+ int maxThreads = getMaxRunningFECThreads();
+ if(job.persistent) {
+ container.set(job);
+ }
+ synchronized(FECQueue.this) {
+ if(!job.persistent) {
+ transientQueue[job.priority].addLast(job);
+ } else {
+ int totalAbove = 0;
+ for(int i=0;i<job.priority;i++) {
+ totalAbove +=
persistentQueueCache[i].size();
+ }
+ if(totalAbove >= maxPersistentQueueCacheSize) {
+ // Don't add.
+ if(logMINOR)
+ Logger.minor(this, "Not adding
persistent job to in-RAM cache, too many above it");
+ } else {
+ if(totalAbove +
persistentQueueCache[job.priority].size() >= maxPersistentQueueCacheSize) {
+ // Still don't add, within a
priority it's oldest first.
+ if(logMINOR)
+ Logger.minor(this, "Not
adding persistent job to in-RAM cache, too many at same priority");
+ } else {
+
persistentQueueCache[job.priority].addLast(job);
+ int total = totalAbove +
persistentQueueCache[job.priority].size();
+ for(int
i=job.priority+1;i<priorities;i++) {
+ total +=
persistentQueueCache[i].size();
+ while(total >=
maxPersistentQueueCacheSize && !persistentQueueCache[i].isEmpty()) {
+ if(logMINOR)
+
Logger.minor(this, "Removing low priority job from cache, total now "+total);
+
persistentQueueCache[job.priority].removeLast();
+ total--;
+ }
+ }
+ }
+ }
+ }
+ if(runningFECThreads < maxThreads) {
+ executor.execute(runner, "FEC Pool
"+fecPoolCounter++);
+ runningFECThreads++;
+ }
+ notifyAll();
+ }
+ if(logMINOR)
+ Logger.minor(StandardOnionFECCodec.class, "Adding a new
job to the queue.");
+ }
+
+ /**
+ * Runs on each thread.
+ * @author nextgens
+ */
+ private final PrioRunnable runner = new PrioRunnable() {
+ public void run() {
+ freenet.support.Logger.OSThread.logPID(this);
+ try {
+ while(true) {
+ final FECJob job;
+ // Get a job
+ synchronized (FECQueue.this) {
+ job =
getFECJobBlockingNoDBAccess();
+ }
+
+ // Encode it
+ try {
+ if (job.isADecodingJob)
+
job.codec.realDecode(job.dataBlockStatus, job.checkBlockStatus, job.blockLength,
+
job.bucketFactory);
+ else {
+
job.codec.realEncode(job.dataBlocks, job.checkBlocks, job.blockLength,
job.bucketFactory);
+ // Update
SplitFileBlocks from buckets if necessary
+ if
((job.dataBlockStatus != null) || (job.checkBlockStatus != null)) {
+ for (int i = 0;
i < job.dataBlocks.length; i++)
+
job.dataBlockStatus[i].setData(job.dataBlocks[i]);
+ for (int i = 0;
i < job.checkBlocks.length; i++)
+
job.checkBlockStatus[i].setData(job.checkBlocks[i]);
+ }
+ }
+ } catch (IOException e) {
+ Logger.error(this, "BOH! ioe:"
+ e.getMessage());
+ }
+
+ // Call the callback
+ try {
+ if(!job.persistent) {
+ if (job.isADecodingJob)
+
job.callback.onDecodedSegment(null);
+ else
+
job.callback.onEncodedSegment(null);
+ } else {
+
databaseJobRunner.queue(new DBJob() {
+
+ public void
run(ObjectContainer container, RequestScheduler sched) {
+
if(job.isADecodingJob)
+
job.callback.onDecodedSegment(container);
+ else
+
job.callback.onEncodedSegment(container);
+
container.delete(job);
+ }
+
+ }, job.priority);
+ }
+ } catch (Throwable e) {
+ Logger.error(this, "The
callback failed!" + e.getMessage(), e);
+ }
+ }
+ } catch (Throwable t) {
+ Logger.error(this, "Caught "+t+" in "+this, t);
+ }
+ finally {
+ synchronized (FECQueue.this) {
+ runningFECThreads--;
+ }
+ }
+ }
+
+ public int getPriority() {
+ return NativeThread.LOW_PRIORITY;
+ }
+
+ };
+
+ private final DBJob cacheFillerJob = new DBJob() {
+
+ public void run(ObjectContainer container, RequestScheduler
sched) {
+ // Try to avoid accessing the database while
synchronized on the FECQueue.
+ while(true) {
+ boolean addedAny = false;
+ int totalCached = 0;
+ for(short i=0;i<priorities;i++) {
+ int grab = 0;
+ synchronized(FECQueue.this) {
+ int newCached = totalCached +
persistentQueueCache[i].size();
+ if(newCached >=
maxPersistentQueueCacheSize) return;
+ grab =
maxPersistentQueueCacheSize - newCached;
+ }
+ Query query = container.query();
+ query.descend("priority").constrain(new
Short(i));
+
query.descend("queue").constrain(FECQueue.this);
+
query.descend("addedTime").orderAscending();
+ ObjectSet results = query.execute();
+ if(results.hasNext()) {
+ for(int j=0;j<grab &&
results.hasNext();j++) {
+ FECJob job = (FECJob)
results.next();
+
synchronized(FECQueue.this) {
+
if(persistentQueueCache[j].contains(job)) {
+ j--;
+
continue;
+ }
+ boolean added =
false;
+
for(ListIterator it = persistentQueueCache[j].listIterator();it.hasNext();) {
+ FECJob
cmp = (FECJob) it.next();
+
if(cmp.addedTime >= job.addedTime) {
+
it.previous();
+
it.add(job);
+
added = true;
+
addedAny = true;
+
break;
+ }
+ }
+ if(!added)
persistentQueueCache[j].addLast(job);
+ }
+ }
+ }
+ }
+ if(!addedAny) return;
+ }
+ }
+
+ };
+
+ private int maxRunningFECThreads = -1;
+
+ private synchronized int getMaxRunningFECThreads() {
+ if (maxRunningFECThreads != -1)
+ return maxRunningFECThreads;
+ String osName = System.getProperty("os.name");
+ if(osName.indexOf("Windows") == -1 &&
(osName.toLowerCase().indexOf("mac os x") > 0) ||
(!NativeThread.usingNativeCode())) {
+ // OS/X niceness is really weak, so we don't want any
more background CPU load than necessary
+ // Also, on non-Windows, we need the native threads
library to be working.
+ maxRunningFECThreads = 1;
+ } else {
+ // Most other OSs will have reasonable niceness, so go
by RAM.
+ Runtime r = Runtime.getRuntime();
+ int max = r.availableProcessors(); // FIXME this may
change in a VM, poll it
+ long maxMemory = r.maxMemory();
+ if(maxMemory < 256*1024*1024) {
+ max = 1;
+ } else {
+ // Measured 11MB decode 8MB encode on amd64.
+ // No more than 10% of available RAM, so 110MB
for each extra processor.
+ // No more than 3 so that we don't reach any
FileDescriptor related limit
+ max = Math.min(3, Math.min(max, (int)
(Math.min(Integer.MAX_VALUE, maxMemory / (128*1024*1024)))));
+ }
+ maxRunningFECThreads = max;
+ }
+ Logger.minor(FECCodec.class, "Maximum FEC threads:
"+maxRunningFECThreads);
+ return maxRunningFECThreads;
+ }
+
+ /**
+ * Find a FEC job to run.
+ * @return null only if there are too many FEC threads running.
+ */
+ protected synchronized FECJob getFECJobBlockingNoDBAccess() {
+ while(true) {
+ if(runningFECThreads > getMaxRunningFECThreads())
+ return null;
+ for(int i=0;i<priorities;i++) {
+ if(!transientQueue[i].isEmpty())
+ return (FECJob)
transientQueue[i].removeFirst();
+ if(!persistentQueueCache[i].isEmpty())
+ return (FECJob)
persistentQueueCache[i].removeFirst();
+ }
+ queueCacheFiller();
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+
+ private static int runningFECThreads;
+ private static int fecPoolCounter;
+
+ public void handleLowMemory() throws Exception {
+ synchronized (this) {
+ maxRunningFECThreads = Math.min(1, maxRunningFECThreads
- 1);
+ notify(); // not notifyAll()
+ }
+ }
+
+ public void handleOutOfMemory() throws Exception {
+ synchronized (this) {
+ maxRunningFECThreads = 1;
+ notifyAll();
+ }
+ }
+
+
+
+}
Modified: branches/db4o/freenet/src/freenet/client/StandardOnionFECCodec.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/StandardOnionFECCodec.java
2008-06-05 16:48:30 UTC (rev 20222)
+++ branches/db4o/freenet/src/freenet/client/StandardOnionFECCodec.java
2008-06-05 18:34:18 UTC (rev 20223)
@@ -52,7 +52,7 @@
recentlyUsedCodecs.push(key, codec);
return codec;
}
- codec = new StandardOnionFECCodec(executor, dataBlocks,
checkBlocks + dataBlocks);
+ codec = new StandardOnionFECCodec(dataBlocks, checkBlocks +
dataBlocks);
recentlyUsedCodecs.push(key, codec);
while(recentlyUsedCodecs.size() > MAX_CACHED_CODECS) {
recentlyUsedCodecs.popKey();
@@ -60,8 +60,8 @@
return codec;
}
- public StandardOnionFECCodec(Executor executor, int k, int n) {
- super(executor, k, n);
+ public StandardOnionFECCodec(int k, int n) {
+ super(k, n);
FECCode fec2 = null;
if(!noNative) {
Added: branches/db4o/freenet/src/freenet/client/async/DBJob.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DBJob.java
(rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/DBJob.java 2008-06-05
18:34:18 UTC (rev 20223)
@@ -0,0 +1,19 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.client.async;
+
+import com.db4o.ObjectContainer;
+
+import freenet.node.RequestScheduler;
+
+/**
+ * A job to be run on the database thread. We will pass a transactional
context in,
+ * and a RequestScheduler.
+ * @author toad
+ */
+public interface DBJob {
+
+ void run(ObjectContainer container, RequestScheduler sched);
+
+}
Added: branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
(rev 0)
+++ branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
2008-06-05 18:34:18 UTC (rev 20223)
@@ -0,0 +1,14 @@
+/* This code is part of Freenet. It is distributed under the GNU General
+ * Public License, version 2 (or at your option any later version). See
+ * http://www.gnu.org/ for further details of the GPL. */
+package freenet.client.async;
+
+/**
+ * Interface for an object which queues and runs DBJob's.
+ * @author toad
+ */
+public interface DBJobRunner {
+
+ public void queue(DBJob job, int priority);
+
+}
Modified:
branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2008-06-05 16:48:30 UTC (rev 20222)
+++ branches/db4o/freenet/src/freenet/client/async/SplitFileFetcherSegment.java
2008-06-05 18:34:18 UTC (rev 20223)
@@ -10,6 +10,7 @@
import com.db4o.ObjectContainer;
import freenet.client.ArchiveContext;
+import freenet.client.FECCallback;
import freenet.client.FECCodec;
import freenet.client.FECJob;
import freenet.client.FailureCodeTracker;
@@ -18,7 +19,6 @@
import freenet.client.Metadata;
import freenet.client.MetadataParseException;
import freenet.client.SplitfileBlock;
-import freenet.client.FECCodec.StandardOnionFECCodecEncoderCallback;
import freenet.keys.CHKBlock;
import freenet.keys.CHKEncodeException;
import freenet.keys.ClientCHK;
@@ -35,7 +35,7 @@
* A single segment within a SplitFileFetcher.
* This in turn controls a large number of SplitFileFetcherSubSegment's, which
are registered on the ClientRequestScheduler.
*/
-public class SplitFileFetcherSegment implements
StandardOnionFECCodecEncoderCallback {
+public class SplitFileFetcherSegment implements FECCallback {
private static volatile boolean logMINOR;
final short splitfileType;
Modified:
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java
===================================================================
---
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java
2008-06-05 16:48:30 UTC (rev 20222)
+++
branches/db4o/freenet/src/freenet/client/async/SplitFileInserterSegment.java
2008-06-05 18:34:18 UTC (rev 20223)
@@ -4,13 +4,13 @@
import com.db4o.ObjectContainer;
+import freenet.client.FECCallback;
import freenet.client.FECCodec;
import freenet.client.FECJob;
import freenet.client.FailureCodeTracker;
import freenet.client.InsertContext;
import freenet.client.InsertException;
import freenet.client.Metadata;
-import freenet.client.FECCodec.StandardOnionFECCodecEncoderCallback;
import freenet.keys.BaseClientKey;
import freenet.keys.CHKBlock;
import freenet.keys.ClientCHK;
@@ -24,7 +24,7 @@
import freenet.support.io.SerializableToFieldSetBucket;
import freenet.support.io.SerializableToFieldSetBucketUtil;
-public class SplitFileInserterSegment implements PutCompletionCallback,
StandardOnionFECCodecEncoderCallback {
+public class SplitFileInserterSegment implements PutCompletionCallback,
FECCallback {
private static volatile boolean logMINOR;