Author: toad
Date: 2008-05-29 23:10:06 +0000 (Thu, 29 May 2008)
New Revision: 20138

Modified:
   branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
   
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
   branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java
Log:
Persistent request registration queue:
Instead of scheduling a persistent request to be registered on the database 
thread, add it to the registration queue. Do this on the database thread, but 
do it immediately if we are already on the database thread.
On startup, and when adding a new request, this causes a job to be queued on 
the database thread which actually calls innerRegister().
Thus, once register() has returned, the request WILL BE REGISTERED.

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-05-29 20:14:00 UTC (rev 20137)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-05-29 23:10:06 UTC (rev 20138)
@@ -27,6 +27,7 @@
 import freenet.support.PrioritizedSerialExecutor;
 import freenet.support.SerialExecutor;
 import freenet.support.api.StringCallback;
+import freenet.support.io.NativeThread;

 /**
  * Every X seconds, the RequestSender calls the ClientRequestScheduler to
@@ -94,7 +95,7 @@
        public final String name;
        private final CooldownQueue transientCooldownQueue;
        private final CooldownQueue persistentCooldownQueue;
-       private final PrioritizedSerialExecutor databaseExecutor;
+       final PrioritizedSerialExecutor databaseExecutor;

        public static final String PRIORITY_NONE = "NONE";
        public static final String PRIORITY_SOFT = "SOFT";
@@ -103,7 +104,7 @@

        public ClientRequestScheduler(boolean forInserts, boolean forSSKs, 
RandomSource random, RequestStarter starter, Node node, NodeClientCore core, 
SubConfig sc, String name) {
                this.selectorContainer = node.db;
-               schedCore = ClientRequestSchedulerCore.create(node, forInserts, 
forSSKs, selectorContainer, COOLDOWN_PERIOD);
+               schedCore = ClientRequestSchedulerCore.create(node, forInserts, 
forSSKs, selectorContainer, COOLDOWN_PERIOD, core.clientDatabaseExecutor);
                schedTransient = new ClientRequestSchedulerNonPersistent(this);
                persistentCooldownQueue = schedCore.persistentCooldownQueue;
                this.databaseExecutor = core.clientDatabaseExecutor;
@@ -143,6 +144,10 @@
        }

        public void register(final SendableRequest req) {
+               register(req, databaseExecutor.onThread()); // delegate, passing whether the caller is already on the database executor's thread
+       }
+       
+       public void register(final SendableRequest req, boolean 
onDatabaseThread) {
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
                if(logMINOR) Logger.minor(this, "Registering "+req, new 
Exception("debug"));
                if(isInsertScheduler != (req instanceof SendableInsert))
@@ -201,17 +206,21 @@
                        }
                }
                if(req.persistent()) {
-                       databaseExecutor.execute(new Runnable() {
-                               public void run() {
-                                       try {
-                                               schedCore.innerRegister(req, 
random);
-                                               starter.wakeUp();
-                                       } catch (Throwable t) {
-                                               Logger.error(this, "Caught "+t, 
t);
+                       // Add to the persistent registration queue
+                       if(onDatabaseThread) {
+                               if(!databaseExecutor.onThread()) {
+                                       throw new IllegalStateException("Not on 
database thread!");
+                               }
+                               schedCore.queueRegister(req, databaseExecutor);
+                       } else {
+                               databaseExecutor.execute(new Runnable() {
+                                       public void run() {
+                                               schedCore.queueRegister(req, 
databaseExecutor);
                                        }
-                               }
-                       }, "Register request");
+                               }, NativeThread.NORM_PRIORITY, "Add persistent 
job to queue");
+                       }
                } else {
+                       // Register immediately.
                        schedTransient.innerRegister(req, random);
                        starter.wakeUp();
                }

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java  
    2008-05-29 20:14:00 UTC (rev 20137)
+++ 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java  
    2008-05-29 23:10:06 UTC (rev 20138)
@@ -8,6 +8,7 @@
 import com.db4o.ObjectContainer;
 import com.db4o.ObjectSet;
 import com.db4o.query.Predicate;
+import com.db4o.query.Query;
 import com.db4o.types.Db4oList;
 import com.db4o.types.Db4oMap;

@@ -15,12 +16,15 @@
 import freenet.node.BaseSendableGet;
 import freenet.node.Node;
 import freenet.node.RequestStarter;
+import freenet.node.SendableGet;
 import freenet.node.SendableRequest;
 import freenet.support.Logger;
+import freenet.support.PrioritizedSerialExecutor;
 import freenet.support.RandomGrabArray;
 import freenet.support.SectoredRandomGrabArrayWithInt;
 import freenet.support.SectoredRandomGrabArrayWithObject;
 import freenet.support.SortedVectorByNumber;
+import freenet.support.io.NativeThread;

 /**
  * @author toad
@@ -35,6 +39,8 @@
        /** Identifier in the database for the node we are attached to */
        private final long nodeDBHandle;
        final PersistentCooldownQueue persistentCooldownQueue;
+       private transient RandomSource random;
+       private transient PrioritizedSerialExecutor databaseExecutor;

        /**
         * Fetch a ClientRequestSchedulerCore from the database, or create a 
new one.
@@ -42,9 +48,10 @@
         * @param forInserts
         * @param forSSKs
         * @param selectorContainer
+        * @param executor 
         * @return
         */
-       public static ClientRequestSchedulerCore create(Node node, final 
boolean forInserts, final boolean forSSKs, ObjectContainer selectorContainer, 
long cooldownTime) {
+       public static ClientRequestSchedulerCore create(Node node, final 
boolean forInserts, final boolean forSSKs, ObjectContainer selectorContainer, 
long cooldownTime, PrioritizedSerialExecutor databaseExecutor) {
                final long nodeDBHandle = node.nodeDBHandle;
                ObjectSet results = selectorContainer.query(new Predicate() {
                        public boolean match(ClientRequestSchedulerCore core) {
@@ -61,7 +68,7 @@
                        core = new ClientRequestSchedulerCore(node, forInserts, 
forSSKs, selectorContainer, cooldownTime);
                }
                logMINOR = Logger.shouldLog(Logger.MINOR, 
ClientRequestSchedulerCore.class);
-               core.onStarted(selectorContainer, cooldownTime);
+               core.onStarted(selectorContainer, cooldownTime, node.random, 
databaseExecutor);
                return core;
        }

@@ -76,7 +83,7 @@
                }
        }

-       private void onStarted(ObjectContainer container, long cooldownTime) {
+       private void onStarted(ObjectContainer container, long cooldownTime, 
RandomSource random, PrioritizedSerialExecutor databaseExecutor) {
                ((Db4oMap)pendingKeys).activationDepth(1);
                ((Db4oMap)allRequestsByClientRequest).activationDepth(1);
                ((Db4oList)recentSuccesses).activationDepth(1);
@@ -84,6 +91,9 @@
                if(!isInsertScheduler) {
                        persistentCooldownQueue.setCooldownTime(cooldownTime);
                }
+               this.random = random;
+               this.databaseExecutor = databaseExecutor;
+               databaseExecutor.execute(registerMeRunner, 
NativeThread.NORM_PRIORITY, "Register request");
        }

        // We pass in the schedTransient to the next two methods so that we can 
select between either of them.
@@ -316,5 +326,53 @@
                return container;
        }

+       private final RegisterMeRunner registerMeRunner = new 
RegisterMeRunner();

+       class RegisterMeRunner implements Runnable { // Job submitted to databaseExecutor: registers at most one queued RegisterMe per run.
+
+               public void run() {
+                       Query query = container.query();
+                       query.constrain(RegisterMe.class);
+                       
query.descend("core").constrain(ClientRequestSchedulerCore.this); // only entries queued for this scheduler core
+                       query.descend("priority").orderAscending();
+                       query.descend("addedTime").orderAscending();
+                       ObjectSet result = query.execute();
+                       if(result.hasNext()) {
+                               RegisterMe reg = (RegisterMe) result.next();
+                               if(result.hasNext()) {
+                                       
databaseExecutor.execute(registerMeRunner, NativeThread.NORM_PRIORITY, 
"Register request"); // more entries remain: requeue this job instead of looping here
+                               }
+                               container.delete(reg); // NOTE(review): entry is removed before innerRegister runs and nothing here catches exceptions - confirm a failure cannot lose the request
+                               // Don't need to activate, fields should exist? 
FIXME
+                               innerRegister(reg.getter, random); // NOTE(review): the inline job this replaces also called starter.wakeUp() - confirm a wake-up still happens
+                       }
+               }
+               
+       }
+       public void queueRegister(SendableRequest req, 
PrioritizedSerialExecutor databaseExecutor) { // Stores a RegisterMe for req and schedules registerMeRunner; the parameter shadows the field of the same name.
+               if(!databaseExecutor.onThread()) { // must be called on the database executor's thread
+                       throw new IllegalStateException("Not on database 
thread!");
+               }
+               RegisterMe reg = new RegisterMe(req, this);
+               container.set(reg); // presumably persists the queue entry in the db4o container - TODO confirm
+               databaseExecutor.execute(registerMeRunner, 
NativeThread.NORM_PRIORITY, "Register request");
+       }
+       
+       
+       
 }
+
+class RegisterMe { // Queue entry: one request waiting to be registered with a ClientRequestSchedulerCore.
+       final SendableRequest getter; // the request to register; RegisterMeRunner passes it to innerRegister
+       final ClientRequestSchedulerCore core; // owning core; RegisterMeRunner's query constrains on this field
+       final short priority; // getter.getPriorityClass() as captured at construction
+       final long addedTime; // System.currentTimeMillis() at construction
+       
+       RegisterMe(SendableRequest getter, ClientRequestSchedulerCore core) {
+               this.getter = getter;
+               this.core = core;
+               this.addedTime = System.currentTimeMillis();
+               this.priority = getter.getPriorityClass();
+       }
+}
+

Modified: 
branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java    
2008-05-29 20:14:00 UTC (rev 20137)
+++ branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java    
2008-05-29 23:10:06 UTC (rev 20138)
@@ -17,14 +17,19 @@
        private boolean running;

        private static final int NEWJOB_TIMEOUT = 5*60*1000;
+
+       private final Runner runner = new Runner();

-       private final Runnable runner = new PrioRunnable() {
+       class Runner implements PrioRunnable {

+               Thread current;
+               
                public int getPriority() {
                        return priority;
                }

                public void run() {
+                       current = Thread.currentThread();
                        while(true) {
                                Runnable job = null;
                                synchronized(jobs) {
@@ -136,4 +141,12 @@
                return retval;
        }

+       /** @return true iff the calling thread is the thread that most recently entered this executor's Runner.run(). */
+       public boolean onThread() {
+               Thread caller = Thread.currentThread();
+               synchronized(jobs) {
+                       return runner.current == caller; // runner is final and never null; the old "!= null" guard made this always return false
+               }
+       }
+
 }


Reply via email to