Author: toad
Date: 2008-06-07 14:12:48 +0000 (Sat, 07 Jun 2008)
New Revision: 20250

Modified:
   branches/db4o/freenet/src/freenet/client/async/BackgroundBlockEncoder.java
   branches/db4o/freenet/src/freenet/client/async/DBJob.java
   branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
   branches/db4o/freenet/src/freenet/node/NodeClientCore.java
Log:
BackgroundBlockEncoder.

Modified: 
branches/db4o/freenet/src/freenet/client/async/BackgroundBlockEncoder.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/BackgroundBlockEncoder.java  
2008-06-07 05:10:41 UTC (rev 20249)
+++ branches/db4o/freenet/src/freenet/client/async/BackgroundBlockEncoder.java  
2008-06-07 14:12:48 UTC (rev 20250)
@@ -3,6 +3,10 @@
 import java.lang.ref.SoftReference;
 import java.util.ArrayList;

+import com.db4o.ObjectContainer;
+import com.db4o.ObjectSet;
+import com.db4o.query.Query;
+
 import freenet.node.PrioRunnable;
 import freenet.support.Logger;
 import freenet.support.io.NativeThread;
@@ -20,32 +24,59 @@
                queue = new ArrayList();
        }

-       public void queue(SingleBlockInserter sbi) {
+       public void queue(SingleBlockInserter sbi, ObjectContainer container, 
ClientContext context) {
                if(sbi.isCancelled()) return;
                if(sbi.resultingURI != null) return;
-               SoftReference ref = new SoftReference(sbi);
-               synchronized(this) {
-                       queue.add(ref);
-                       Logger.minor(this, "Queueing encode of "+sbi);
-                       notifyAll();
+               if(sbi.persistent()) {
+                       queuePersistent(sbi, container, context);
+                       runPersistentQueue(context);
+               } else {
+                       SoftReference ref = new SoftReference(sbi);
+                       synchronized(this) {
+                               queue.add(ref);
+                               Logger.minor(this, "Queueing encode of "+sbi);
+                               notifyAll();
+                       }
                }
        }

-       public void queue(SingleBlockInserter[] sbis) {
+       public void queue(SingleBlockInserter[] sbis, ObjectContainer 
container, ClientContext context) {
                synchronized(this) {
                        for(int i=0;i<sbis.length;i++) {
                                SingleBlockInserter inserter = sbis[i];
                                if(inserter == null) continue;
                                if(inserter.isCancelled()) continue;
                                if(inserter.resultingURI != null) continue;
+                               if(inserter.persistent()) continue;
                                Logger.minor(this, "Queueing encode of 
"+inserter);
                                SoftReference ref = new SoftReference(inserter);
                                queue.add(ref);
                        }
                        notifyAll();
                }
+               boolean anyPersistent = false;
+               for(int i=0;i<sbis.length;i++) {
+                       anyPersistent = true;
+                       SingleBlockInserter inserter = sbis[i];
+                       if(inserter == null) continue;
+                       if(inserter.isCancelled()) continue;
+                       if(inserter.resultingURI != null) continue;
+                       if(!inserter.persistent()) continue;
+                       queuePersistent(inserter, container, context);
+               }
+               if(anyPersistent)
+                       runPersistentQueue(context);
        }

+       private void runPersistentQueue(ClientContext context) {
+               context.jobRunner.queue(runner, NativeThread.LOW_PRIORITY, 
true);
+       }
+
+       private void queuePersistent(SingleBlockInserter sbi, ObjectContainer 
container, ClientContext context) {
+               BackgroundBlockEncoderTag tag = new 
BackgroundBlockEncoderTag(sbi, context);
+               container.set(tag);
+       }
+
        public void run() {
            freenet.support.Logger.OSThread.logPID(this);
                while(true) {
@@ -75,4 +106,49 @@
                return NativeThread.MIN_PRIORITY;
        }

+       static final int JOBS_PER_SLOT = 1;
+       
+       private DBJob runner = new DBJob() {
+
+               public void run(ObjectContainer container, ClientContext 
context) {
+                       Query query = container.query();
+                       query.constrain(BackgroundBlockEncoderTag.class);
+                       query.descend("nodeDBHandle").constrain(new 
Long(context.nodeDBHandle));
+                       query.descend("priority").orderAscending();
+                       query.descend("addedTime").orderAscending();
+                       ObjectSet results = query.execute();
+                       for(int x = 0; x < JOBS_PER_SLOT && results.hasNext(); 
x++) {
+                               BackgroundBlockEncoderTag tag = 
(BackgroundBlockEncoderTag) results.next();
+                               try {
+                                       SingleBlockInserter sbi = tag.inserter;
+                                       if(sbi == null) continue; // deleted
+                                       if(sbi.isCancelled()) continue;
+                                       if(sbi.resultingURI != null) continue;
+                                       sbi.tryEncode();
+                               } catch (Throwable t) {
+                                       Logger.error(this, "Caught "+t, t);
+                               } finally {
+                                       container.delete(tag);
+                               }
+                       }
+               }
+               
+       };
+       
 }
+
+class BackgroundBlockEncoderTag {
+       final SingleBlockInserter inserter;
+       final long nodeDBHandle;
+       /** For implementing FIFO ordering */
+       final long addedTime;
+       /** For implementing priority ordering */
+       final short priority;
+       
+       BackgroundBlockEncoderTag(SingleBlockInserter inserter, ClientContext 
context) {
+               this.inserter = inserter;
+               this.nodeDBHandle = context.nodeDBHandle;
+               this.addedTime = System.currentTimeMillis();
+               this.priority = inserter.getPriorityClass();
+       }
+}

Modified: branches/db4o/freenet/src/freenet/client/async/DBJob.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DBJob.java   2008-06-07 
05:10:41 UTC (rev 20249)
+++ branches/db4o/freenet/src/freenet/client/async/DBJob.java   2008-06-07 
14:12:48 UTC (rev 20250)
@@ -5,8 +5,6 @@

 import com.db4o.ObjectContainer;

-import freenet.node.RequestScheduler;
-
 /**
  * A job to be run on the database thread. We will pass a transactional 
context in,
  * and a RequestScheduler.
@@ -14,6 +12,6 @@
  */
 public interface DBJob {

-       void run(ObjectContainer container);
+       void run(ObjectContainer container, ClientContext context);

 }

Modified: branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java     
2008-06-07 05:10:41 UTC (rev 20249)
+++ branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java     
2008-06-07 14:12:48 UTC (rev 20250)
@@ -9,7 +9,7 @@
  */
 public interface DBJobRunner {

-       public void queue(DBJob job, int priority);
+       public void queue(DBJob job, int priority, boolean checkDupes);

        public boolean onDatabaseThread();


Modified: branches/db4o/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/NodeClientCore.java  2008-06-07 
05:10:41 UTC (rev 20249)
+++ branches/db4o/freenet/src/freenet/node/NodeClientCore.java  2008-06-07 
14:12:48 UTC (rev 20250)
@@ -127,6 +127,8 @@
         */
        public final PrioritizedSerialExecutor datastoreCheckerExecutor;

+       public final ClientContext clientContext;
+       
        public static int maxBackgroundUSKFetchers;

        // Client stuff that needs to be configged - FIXME
@@ -171,7 +173,8 @@
                if(logMINOR) Logger.minor(this, "Read 
throttleFS:\n"+throttleFS);

                if(logMINOR) Logger.minor(this, "Serializing 
RequestStarterGroup from:\n"+throttleFS);
-               requestStarters = new RequestStarterGroup(node, this, 
portNumber, random, config, throttleFS, new ClientContext(this));
+               clientContext = new ClientContext(this);
+               requestStarters = new RequestStarterGroup(node, this, 
portNumber, random, config, throttleFS, clientContext);

                // Temp files

@@ -304,7 +307,7 @@
                healingQueue = new 
SimpleHealingQueue(requestStarters.chkPutScheduler,
                                new InsertContext(tempBucketFactory, 
tempBucketFactory, persistentTempBucketFactory, 
                                                random, 0, 2, 1, 0, 0, new 
SimpleEventProducer(), 
-                                               
!Node.DONT_CACHE_LOCAL_REQUESTS, uskManager, backgroundBlockEncoder, 
node.executor), RequestStarter.PREFETCH_PRIORITY_CLASS, 512 /* FIXME make 
configurable */);
+                                               
!Node.DONT_CACHE_LOCAL_REQUESTS, uskManager, node.executor), 
RequestStarter.PREFETCH_PRIORITY_CLASS, 512 /* FIXME make configurable */);

                nodeConfig.register("lazyResume", false, sortOrder++, true, 
false, "NodeClientCore.lazyResume",
                                "NodeClientCore.lazyResumeLong", new 
BooleanCallback() {
@@ -1160,19 +1163,34 @@
                return requestStarters.countTransientQueuedRequests();
        }

-       public void queue(final DBJob job, int priority) {
-               this.clientDatabaseExecutor.execute(new Runnable() {
-
-                       public void run() {
-                               try {
-                                       job.run(node.db);
-                                       node.db.commit();
-                               } catch (Throwable t) {
-                                       Logger.error(this, "Failed to run 
database job "+job+" : caught "+t, t);
-                               }
+       public void queue(final DBJob job, int priority, boolean checkDupes) {
+               this.clientDatabaseExecutor.executeNoDupes(new 
DBJobWrapper(job), priority, ""+job);
+       }
+       
+       class DBJobWrapper implements Runnable {
+               
+               DBJobWrapper(DBJob job) {
+                       this.job = job;
+               }
+               
+               final DBJob job;
+               
+               public void run() {
+                       
+                       try {
+                               job.run(node.db, clientContext);
+                               node.db.commit();
+                       } catch (Throwable t) {
+                               Logger.error(this, "Failed to run database job 
"+job+" : caught "+t, t);
                        }
-                       
-               }, priority, ""+job);
+               }
+               
+               public boolean equals(Object o) {
+                       if(!(o instanceof DBJobWrapper)) return false;
+                       DBJobWrapper cmp = (DBJobWrapper) o;
+                       return (cmp.job == job);
+               }
+               
        }

        public boolean onDatabaseThread() {


Reply via email to