Author: toad
Date: 2008-06-07 14:12:48 +0000 (Sat, 07 Jun 2008)
New Revision: 20250
Modified:
branches/db4o/freenet/src/freenet/client/async/BackgroundBlockEncoder.java
branches/db4o/freenet/src/freenet/client/async/DBJob.java
branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
branches/db4o/freenet/src/freenet/node/NodeClientCore.java
Log:
BackgroundBlockEncoder.
Modified:
branches/db4o/freenet/src/freenet/client/async/BackgroundBlockEncoder.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/BackgroundBlockEncoder.java
2008-06-07 05:10:41 UTC (rev 20249)
+++ branches/db4o/freenet/src/freenet/client/async/BackgroundBlockEncoder.java
2008-06-07 14:12:48 UTC (rev 20250)
@@ -3,6 +3,10 @@
import java.lang.ref.SoftReference;
import java.util.ArrayList;
+import com.db4o.ObjectContainer;
+import com.db4o.ObjectSet;
+import com.db4o.query.Query;
+
import freenet.node.PrioRunnable;
import freenet.support.Logger;
import freenet.support.io.NativeThread;
@@ -20,32 +24,59 @@
queue = new ArrayList();
}
- public void queue(SingleBlockInserter sbi) {
+ /**
+ * Queue one inserter for background encoding.
+ * Returns without queueing if the inserter is cancelled or already has a
+ * resulting URI. A persistent inserter is recorded in the database via
+ * queuePersistent() and the database job is then scheduled; any other
+ * inserter is wrapped in a SoftReference and appended to the in-memory
+ * queue, after which waiters on this object are notified.
+ * @param sbi the inserter to encode
+ * @param container database handle, used only for persistent inserters
+ * @param context client context, used only for persistent inserters
+ */
+ public void queue(SingleBlockInserter sbi, ObjectContainer container,
ClientContext context) {
if(sbi.isCancelled()) return;
if(sbi.resultingURI != null) return;
- SoftReference ref = new SoftReference(sbi);
- synchronized(this) {
- queue.add(ref);
- Logger.minor(this, "Queueing encode of "+sbi);
- notifyAll();
+ if(sbi.persistent()) {
+ // Persistent path: store a tag, then ask the job runner to process tags.
+ queuePersistent(sbi, container, context);
+ runPersistentQueue(context);
+ } else {
+ // Transient path: in-memory queue guarded by this object's monitor.
+ SoftReference ref = new SoftReference(sbi);
+ synchronized(this) {
+ queue.add(ref);
+ Logger.minor(this, "Queueing encode of "+sbi);
+ notifyAll();
+ }
}
}
- public void queue(SingleBlockInserter[] sbis) {
+ /**
+ * Queue several inserters for background encoding.
+ * Entries that are null, cancelled, or already have a resulting URI are
+ * skipped. Non-persistent inserters are added to the in-memory queue under
+ * this object's monitor; persistent ones are recorded in the database via
+ * queuePersistent(). The database job is scheduled once, and only if at
+ * least one persistent inserter was actually recorded.
+ * @param sbis inserters to encode; individual entries may be null
+ * @param container database handle, used only for persistent inserters
+ * @param context client context, used only for persistent inserters
+ */
+ public void queue(SingleBlockInserter[] sbis, ObjectContainer container, ClientContext context) {
synchronized(this) {
for(int i=0;i<sbis.length;i++) {
SingleBlockInserter inserter = sbis[i];
if(inserter == null) continue;
if(inserter.isCancelled()) continue;
if(inserter.resultingURI != null) continue;
+ if(inserter.persistent()) continue;
Logger.minor(this, "Queueing encode of "+inserter);
SoftReference ref = new SoftReference(inserter);
queue.add(ref);
}
notifyAll();
}
+ boolean anyPersistent = false;
+ for(int i=0;i<sbis.length;i++) {
+ SingleBlockInserter inserter = sbis[i];
+ if(inserter == null) continue;
+ if(inserter.isCancelled()) continue;
+ if(inserter.resultingURI != null) continue;
+ if(!inserter.persistent()) continue;
+ queuePersistent(inserter, container, context);
+ // Set the flag only once a tag has really been stored; setting it at
+ // the top of the loop scheduled the database job for any non-empty
+ // array, even one with no persistent inserters.
+ anyPersistent = true;
+ }
+ if(anyPersistent)
+ runPersistentQueue(context);
}
+ /**
+ * Ask the context's job runner to run the persistent-queue job ("runner")
+ * at NativeThread.LOW_PRIORITY, passing true for the checkDupes argument.
+ * @param context supplies the jobRunner the job is queued on
+ */
+ private void runPersistentQueue(ClientContext context) {
+ context.jobRunner.queue(runner, NativeThread.LOW_PRIORITY,
true);
+ }
+
+ /**
+ * Record a persistent inserter for later encoding by creating a
+ * BackgroundBlockEncoderTag for it and passing the tag to container.set().
+ * This does not schedule the database job; callers follow up with
+ * runPersistentQueue().
+ * @param sbi the inserter the tag refers to
+ * @param container database handle the tag is stored through
+ * @param context supplies the nodeDBHandle recorded in the tag
+ */
+ private void queuePersistent(SingleBlockInserter sbi, ObjectContainer
container, ClientContext context) {
+ BackgroundBlockEncoderTag tag = new
BackgroundBlockEncoderTag(sbi, context);
+ container.set(tag);
+ }
+
public void run() {
freenet.support.Logger.OSThread.logPID(this);
while(true) {
@@ -75,4 +106,49 @@
return NativeThread.MIN_PRIORITY;
}
+ /** Upper bound on the number of query results handled in one run of the database job. */
+ static final int JOBS_PER_SLOT = 1;
+
+ /**
+ * Database job that encodes inserters recorded as BackgroundBlockEncoderTag
+ * objects. It queries tags whose nodeDBHandle equals context.nodeDBHandle,
+ * requests ascending order on "priority" and on "addedTime", and handles at
+ * most JOBS_PER_SLOT results. Every handled tag is deleted, whether its
+ * inserter was encoded, skipped, or threw.
+ * NOTE(review): results beyond JOBS_PER_SLOT stay in the database and
+ * nothing visible here re-queues the job for them - confirm that a later
+ * queue() call is what is expected to pick them up.
+ */
+ private DBJob runner = new DBJob() {
+
+ public void run(ObjectContainer container, ClientContext
context) {
+ Query query = container.query();
+ query.constrain(BackgroundBlockEncoderTag.class);
+ query.descend("nodeDBHandle").constrain(new
Long(context.nodeDBHandle));
+ // NOTE(review): presumably priority is the primary sort key and
+ // addedTime the tie-breaker - verify against the db4o version's
+ // precedence rule for multiple ordering calls.
+ query.descend("priority").orderAscending();
+ query.descend("addedTime").orderAscending();
+ ObjectSet results = query.execute();
+ for(int x = 0; x < JOBS_PER_SLOT && results.hasNext();
x++) {
+ BackgroundBlockEncoderTag tag =
(BackgroundBlockEncoderTag) results.next();
+ // Each "continue" below still runs the finally block and the
+ // loop increment, so a skipped tag is deleted and also counts
+ // against JOBS_PER_SLOT.
+ try {
+ SingleBlockInserter sbi = tag.inserter;
+ if(sbi == null) continue; // deleted
+ if(sbi.isCancelled()) continue;
+ if(sbi.resultingURI != null) continue;
+ sbi.tryEncode();
+ } catch (Throwable t) {
+ // Log and carry on; the tag is still removed below.
+ Logger.error(this, "Caught "+t, t);
+ } finally {
+ container.delete(tag);
+ }
+ }
+ }
+
+ };
+
}
+
+/**
+ * Database record marking one SingleBlockInserter as waiting for background
+ * encoding. Instances are stored by BackgroundBlockEncoder.queuePersistent()
+ * and queried, processed and deleted by its database job.
+ */
+class BackgroundBlockEncoderTag {
+ /** The inserter to encode; the database job treats null here as "deleted". */
+ final SingleBlockInserter inserter;
+ /** Copied from the ClientContext at construction; the database job filters on it. */
+ final long nodeDBHandle;
+ /** For implementing FIFO ordering */
+ final long addedTime;
+ /** For implementing priority ordering */
+ final short priority;
+
+ /**
+ * @param inserter the inserter to tag; must not be null, since its
+ * priority class is read here
+ * @param context supplies the nodeDBHandle recorded in the tag
+ */
+ BackgroundBlockEncoderTag(SingleBlockInserter inserter, ClientContext
context) {
+ this.inserter = inserter;
+ this.nodeDBHandle = context.nodeDBHandle;
+ // Wall-clock creation time, used as the FIFO sort key.
+ this.addedTime = System.currentTimeMillis();
+ this.priority = inserter.getPriorityClass();
+ }
+}
Modified: branches/db4o/freenet/src/freenet/client/async/DBJob.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DBJob.java 2008-06-07
05:10:41 UTC (rev 20249)
+++ branches/db4o/freenet/src/freenet/client/async/DBJob.java 2008-06-07
14:12:48 UTC (rev 20250)
@@ -5,8 +5,6 @@
import com.db4o.ObjectContainer;
-import freenet.node.RequestScheduler;
-
/**
* A job to be run on the database thread. We will pass a transactional
context in,
* and a RequestScheduler.
@@ -14,6 +12,6 @@
*/
public interface DBJob {
- void run(ObjectContainer container);
+ /**
+ * Run the job.
+ * NOTE(review): the interface Javadoc still says a RequestScheduler is
+ * passed in; with this signature the second argument is a ClientContext -
+ * confirm and update that text.
+ * @param container the database container handed to the job by its runner
+ * @param context the client context handed to the job by its runner
+ */
+ void run(ObjectContainer container, ClientContext context);
}
Modified: branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
2008-06-07 05:10:41 UTC (rev 20249)
+++ branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
2008-06-07 14:12:48 UTC (rev 20250)
@@ -9,7 +9,7 @@
*/
public interface DBJobRunner {
- public void queue(DBJob job, int priority);
+ public void queue(DBJob job, int priority, boolean checkDupes);
public boolean onDatabaseThread();
Modified: branches/db4o/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/NodeClientCore.java 2008-06-07
05:10:41 UTC (rev 20249)
+++ branches/db4o/freenet/src/freenet/node/NodeClientCore.java 2008-06-07
14:12:48 UTC (rev 20250)
@@ -127,6 +127,8 @@
*/
public final PrioritizedSerialExecutor datastoreCheckerExecutor;
+ public final ClientContext clientContext;
+
public static int maxBackgroundUSKFetchers;
// Client stuff that needs to be configged - FIXME
@@ -171,7 +173,8 @@
if(logMINOR) Logger.minor(this, "Read
throttleFS:\n"+throttleFS);
if(logMINOR) Logger.minor(this, "Serializing
RequestStarterGroup from:\n"+throttleFS);
- requestStarters = new RequestStarterGroup(node, this,
portNumber, random, config, throttleFS, new ClientContext(this));
+ clientContext = new ClientContext(this);
+ requestStarters = new RequestStarterGroup(node, this,
portNumber, random, config, throttleFS, clientContext);
// Temp files
@@ -304,7 +307,7 @@
healingQueue = new
SimpleHealingQueue(requestStarters.chkPutScheduler,
new InsertContext(tempBucketFactory,
tempBucketFactory, persistentTempBucketFactory,
random, 0, 2, 1, 0, 0, new
SimpleEventProducer(),
-
!Node.DONT_CACHE_LOCAL_REQUESTS, uskManager, backgroundBlockEncoder,
node.executor), RequestStarter.PREFETCH_PRIORITY_CLASS, 512 /* FIXME make
configurable */);
+
!Node.DONT_CACHE_LOCAL_REQUESTS, uskManager, node.executor),
RequestStarter.PREFETCH_PRIORITY_CLASS, 512 /* FIXME make configurable */);
nodeConfig.register("lazyResume", false, sortOrder++, true,
false, "NodeClientCore.lazyResume",
"NodeClientCore.lazyResumeLong", new
BooleanCallback() {
@@ -1160,19 +1163,34 @@
return requestStarters.countTransientQueuedRequests();
}
- public void queue(final DBJob job, int priority) {
- this.clientDatabaseExecutor.execute(new Runnable() {
-
- public void run() {
- try {
- job.run(node.db);
- node.db.commit();
- } catch (Throwable t) {
- Logger.error(this, "Failed to run database job "+job+" : caught "+t, t);
- }
+ /**
+ * Queue a job on the client database executor.
+ * @param job the job to run with node.db and clientContext
+ * @param priority priority passed through to the executor
+ * @param checkDupes if true the job is submitted with executeNoDupes();
+ * otherwise it is submitted with execute(). Previously this flag was
+ * ignored and executeNoDupes() was always used.
+ */
+ public void queue(final DBJob job, int priority, boolean checkDupes) {
+ DBJobWrapper wrapper = new DBJobWrapper(job);
+ if(checkDupes)
+ this.clientDatabaseExecutor.executeNoDupes(wrapper, priority, ""+job);
+ else
+ this.clientDatabaseExecutor.execute(wrapper, priority, ""+job);
+ }
+
+ /**
+ * Runnable adapter around a DBJob: runs the job against node.db with
+ * clientContext, then commits. Two wrappers are equal when they wrap the
+ * same DBJob instance.
+ */
+ class DBJobWrapper implements Runnable {
+
+ DBJobWrapper(DBJob job) {
+ this.job = job;
+ }
+
+ final DBJob job;
+
+ public void run() {
+
+ try {
+ job.run(node.db, clientContext);
+ node.db.commit();
+ } catch (Throwable t) {
+ // Log and swallow so one failing job does not propagate out of run().
+ Logger.error(this, "Failed to run database job "+job+" : caught "+t, t);
}
-
- }, priority, ""+job);
+ }
+
+ public boolean equals(Object o) {
+ if(!(o instanceof DBJobWrapper)) return false;
+ DBJobWrapper cmp = (DBJobWrapper) o;
+ return (cmp.job == job);
+ }
+
+ /** Identity-based, to stay consistent with the identity comparison in equals(). */
+ public int hashCode() {
+ return System.identityHashCode(job);
+ }
+
}
public boolean onDatabaseThread() {