Author: toad
Date: 2008-06-20 20:48:24 +0000 (Fri, 20 Jun 2008)
New Revision: 20535

Modified:
   branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
Log:
Move various executor jobs to dbjob's.

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-06-20 18:37:18 UTC (rev 20534)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-06-20 20:48:24 UTC (rev 20535)
@@ -102,6 +102,7 @@
        final PrioritizedSerialExecutor databaseExecutor;
        final PrioritizedSerialExecutor datastoreCheckerExecutor;
        public final ClientContext clientContext;
+       final DBJobRunner jobRunner;

        public static final String PRIORITY_NONE = "NONE";
        public static final String PRIORITY_SOFT = "SOFT";
@@ -142,6 +143,7 @@
                        transientCooldownQueue = new 
RequestCooldownQueue(COOLDOWN_PERIOD);
                else
                        transientCooldownQueue = null;
+               jobRunner = clientContext.jobRunner;
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
        }

@@ -181,15 +183,15 @@

                                }, getter.getPriorityClass(), "Checking 
datastore");
                        } else if(persistent) {
-                               databaseExecutor.execute(new Runnable() {
+                               jobRunner.queue(new DBJob() {

-                                       public void run() {
-                                               
schedCore.addPendingKeys(getter, selectorContainer);
+                                       public void run(ObjectContainer 
container, ClientContext context) {
+                                               
schedCore.addPendingKeys(getter, container);
                                                schedCore.queueRegister(getter, 
databaseExecutor);
-                                               final Object[] keyTokens = 
getter.sendableKeys(selectorContainer);
+                                               final Object[] keyTokens = 
getter.sendableKeys(container);
                                                final ClientKey[] keys = new 
ClientKey[keyTokens.length];
                                                for(int 
i=0;i<keyTokens.length;i++)
-                                                       keys[i] = 
getter.getKey(keyTokens[i], selectorContainer);
+                                                       keys[i] = 
getter.getKey(keyTokens[i], container);
                                                
datastoreCheckerExecutor.execute(new Runnable() {

                                                        public void run() {
@@ -199,7 +201,7 @@
                                                }, getter.getPriorityClass(), 
"Checking datastore");
                                        }

-                               }, NativeThread.NORM_PRIORITY, "Registering 
request");
+                               }, NativeThread.NORM_PRIORITY, false);
                        } else {
                                // Not persistent
                                schedTransient.addPendingKeys(getter, null);
@@ -263,12 +265,13 @@
                                else {
                                        final SendableGet g = getter;
                                        final Object token = tok;
-                                       databaseExecutor.execute(new Runnable() 
{
-                                               public void run() {
-                                                       g.onFailure(new 
LowLevelGetException(LowLevelGetException.DECODE_FAILED), token, 
ClientRequestScheduler.this, selectorContainer, clientContext);
-                                                       
selectorContainer.commit();
+                                       jobRunner.queue(new DBJob() {
+
+                                               public void run(ObjectContainer 
container, ClientContext context) {
+                                                       g.onFailure(new 
LowLevelGetException(LowLevelGetException.DECODE_FAILED), token, 
ClientRequestScheduler.this, container, context);
                                                }
-                                       }, NativeThread.NORM_PRIORITY, "Block 
decode failed");
+                                               
+                                       }, NativeThread.NORM_PRIORITY, false);
                                }
                                continue; // other keys might be valid
                        }
@@ -280,11 +283,13 @@
                                        final ClientKeyBlock b = block;
                                        final Object t = tok;
                                        final SendableGet g = getter;
-                                       databaseExecutor.execute(new Runnable() 
{
-                                               public void run() {
-                                                       g.onSuccess(b, true, t, 
ClientRequestScheduler.this, selectorContainer, clientContext);
+                                       jobRunner.queue(new DBJob() {
+
+                                               public void run(ObjectContainer 
container, ClientContext context) {
+                                                       g.onSuccess(b, true, t, 
ClientRequestScheduler.this, container, context);
                                                }
-                                       }, NativeThread.NORM_PRIORITY, "Block 
found on register");
+                                               
+                                       }, NativeThread.NORM_PRIORITY, false);
                                }
                                // Even with working thread priorities, we 
still get very high latency accessing
                                // the datastore when background threads are 
doing it in parallel.
@@ -310,14 +315,15 @@
                                schedCore.deleteRegisterMe(req);
                                starter.wakeUp();
                        } else {
-                               databaseExecutor.execute(new Runnable() {
-                                       public void run() {
+                               jobRunner.queue(new DBJob() {
+
+                                       public void run(ObjectContainer 
container, ClientContext context) {
                                                if(anyValid)
-                                                       
schedCore.innerRegister(req, random, selectorContainer);
+                                                       
schedCore.innerRegister(req, random, container);
                                                schedCore.deleteRegisterMe(req);
-                                               selectorContainer.commit();
                                        }
-                               }, NativeThread.NORM_PRIORITY, "Add persistent 
job to queue");
+                                       
+                               }, NativeThread.NORM_PRIORITY, false);
                        }
                } else {
                        // Register immediately.
@@ -371,8 +377,7 @@
        }

        public void queueFillRequestStarterQueue() {
-               databaseExecutor.executeNoDupes(requestStarterQueueFiller, 
-                               NativeThread.MAX_PRIORITY, "Fill request 
starter queue");
+               jobRunner.queue(requestStarterQueueFiller, 
NativeThread.MAX_PRIORITY, true);
        }

        void addToStarterQueue(PersistentChosenRequest req) {
@@ -381,8 +386,8 @@
                }
        }

-       private Runnable requestStarterQueueFiller = new Runnable() {
-               public void run() {
+       private DBJob requestStarterQueueFiller = new DBJob() {
+               public void run(ObjectContainer container, ClientContext 
context) {
                        if(isInsertScheduler && !isSSKScheduler) {
                                if(logMINOR) Logger.minor(this, "Scheduling 
inserts...");
                        }
@@ -398,7 +403,6 @@
                                        }
                                        if(starterQueue.size() >= 
MAX_STARTER_QUEUE_SIZE) finished = true;
                                }
-                               selectorContainer.commit();
                                if(req == null || finished) return;
                        }
                }
@@ -414,19 +418,15 @@
                        if(transientCooldownQueue != null)
                                transientCooldownQueue.removeKey(key, getter, 
getter.getCooldownWakeupByKey(key, null), null);
                } else {
-                       databaseExecutor.execute(new Runnable() {
-                               public void run() {
-                                       try {
-                                               
schedCore.removePendingKey(getter, complain, key);
-                                               if(persistentCooldownQueue != 
null)
-                                                       
persistentCooldownQueue.removeKey(key, getter, 
getter.getCooldownWakeupByKey(key, selectorContainer), selectorContainer);
-                                               selectorContainer.commit();
-                                       } catch (Throwable t) {
-                                               Logger.error(this, "Caught "+t, 
t);
-                                       }
+                       jobRunner.queue(new DBJob() {
+
+                               public void run(ObjectContainer container, 
ClientContext context) {
+                                       schedCore.removePendingKey(getter, 
complain, key);
+                                       if(persistentCooldownQueue != null)
+                                               
persistentCooldownQueue.removeKey(key, getter, 
getter.getCooldownWakeupByKey(key, container), container);
                                }

-                       }, "removePendingKey");
+                       }, NativeThread.NORM_PRIORITY, false);
                }
        }

@@ -470,18 +470,15 @@

        public synchronized void succeeded(final BaseSendableGet succeeded, 
final ChosenRequest req) {
                if(succeeded.persistent()) {
-                       databaseExecutor.execute(new Runnable() {
-                               public void run() {
-                                       try {
-                                               schedCore.succeeded(succeeded);
-                                               if(succeeded.persistent())
-                                                       
selectorContainer.delete((PersistentChosenRequest)req);
-                                               selectorContainer.commit();
-                                       } catch (Throwable t) {
-                                               Logger.error(this, "Caught "+t, 
t);
-                                       }
+                       jobRunner.queue(new DBJob() {
+
+                               public void run(ObjectContainer container, 
ClientContext context) {
+                                       schedCore.succeeded(succeeded);
+                                       if(succeeded.persistent())
+                                               
container.delete((PersistentChosenRequest)req);
                                }
-                       }, "Mark success for "+succeeded);
+                               
+                       }, NativeThread.NORM_PRIORITY, false);
                } else
                        schedTransient.succeeded(succeeded);
        }
@@ -520,14 +517,14 @@

                // Now the persistent stuff

-               databaseExecutor.execute(new Runnable() {
+               jobRunner.queue(new DBJob() {

-                       public void run() {
+                       public void run(ObjectContainer container, 
ClientContext context) {
                                final SendableGet[] gets = 
schedCore.removePendingKey(key);
                                if(gets == null) return;
                                if(persistentCooldownQueue != null) {
                                        for(int i=0;i<gets.length;i++)
-                                               
persistentCooldownQueue.removeKey(key, gets[i], 
gets[i].getCooldownWakeupByKey(key, selectorContainer), selectorContainer);
+                                               
persistentCooldownQueue.removeKey(key, gets[i], 
gets[i].getCooldownWakeupByKey(key, container), container);
                                }
                                // Call the callbacks on the database executor 
thread, because the first thing
                                // they will need to do is access the database 
to decide whether they need to
@@ -535,16 +532,15 @@
                                for(int i=0;i<gets.length;i++) {
                                        try {
                                                if(logMINOR) Logger.minor(this, 
"Calling callback for "+gets[i]+" for "+key);
-                                               gets[i].onGotKey(key, block, 
ClientRequestScheduler.this, selectorContainer, clientContext);
+                                               gets[i].onGotKey(key, block, 
ClientRequestScheduler.this, container, context);
                                        } catch (Throwable t) {
                                                Logger.error(this, "Caught 
"+t+" running callback "+gets[i]+" for "+key);
                                        }
                                }
                                if(logMINOR) Logger.minor(this, "Finished 
running callbacks");
-                               selectorContainer.commit();
                        }

-               }, "tripPendingKey for "+block.getKey());
+               }, NativeThread.NORM_PRIORITY, false);

        }

@@ -565,15 +561,16 @@

                final short oldPrio = priority;

-               databaseExecutor.execute(new Runnable() {
-                       public void run() {
+               jobRunner.queue(new DBJob() {
+
+                       public void run(ObjectContainer container, 
ClientContext context) {
                                short priority = schedCore.getKeyPrio(key, 
oldPrio);
                                if(priority >= oldPrio) return; // already on 
list at >= priority
                                offeredKeys[priority].queueKey(key);
                                starter.wakeUp();
-                               // No need to commit
                        }
-               }, "maybeQueueOfferedKey");
+                       
+               }, NativeThread.NORM_PRIORITY, false);
        }

        public void dequeueOfferedKey(Key key) {
@@ -594,17 +591,14 @@

        public void moveKeysFromCooldownQueue() {
                moveKeysFromCooldownQueue(transientCooldownQueue, null);
-               databaseExecutor.execute(new Runnable() {
-                       public void run() {
-                               try {
-                                       
if(moveKeysFromCooldownQueue(persistentCooldownQueue, selectorContainer))
-                                               starter.wakeUp();
-                                       selectorContainer.commit();
-                               } catch (Throwable t) {
-                                       Logger.error(this, "Caught "+t, t);
-                               }
+               jobRunner.queue(new DBJob() {
+
+                       public void run(ObjectContainer container, 
ClientContext context) {
+                               
if(moveKeysFromCooldownQueue(persistentCooldownQueue, selectorContainer))
+                                       starter.wakeUp();
                        }
-               }, "moveKeysFromCooldownQueue");
+                       
+               }, NativeThread.NORM_PRIORITY, false);
        }

        private boolean moveKeysFromCooldownQueue(CooldownQueue queue, 
ObjectContainer container) {
@@ -656,36 +650,39 @@
        }

        public void callFailure(final SendableGet get, final 
LowLevelGetException e, final Object keyNum, int prio, String name, final 
ChosenRequest req) {
-               databaseExecutor.execute(new Runnable() {
-                       public void run() {
+               jobRunner.queue(new DBJob() {
+
+                       public void run(ObjectContainer container, 
ClientContext context) {
                                get.onFailure(e, keyNum, 
ClientRequestScheduler.this, selectorContainer, clientContext);
                                if(get.persistent())
                                        
selectorContainer.delete((PersistentChosenRequest)req);
-                               selectorContainer.commit();
                        }
-               }, prio, name);
+                       
+               }, NativeThread.NORM_PRIORITY, false);
        }

        public void callFailure(final SendableInsert put, final 
LowLevelPutException e, final Object keyNum, int prio, String name, final 
ChosenRequest req) {
-               databaseExecutor.execute(new Runnable() {
-                       public void run() {
+               jobRunner.queue(new DBJob() {
+
+                       public void run(ObjectContainer container, 
ClientContext context) {
                                put.onFailure(e, keyNum, selectorContainer, 
clientContext);
                                if(put.persistent())
                                        
selectorContainer.delete((PersistentChosenRequest)req);
-                               selectorContainer.commit();
                        }
-               }, prio, name);
+                       
+               }, NativeThread.NORM_PRIORITY, false);
        }

        public void callSuccess(final SendableInsert put, final Object keyNum, 
int prio, String name, final ChosenRequest req) {
-               databaseExecutor.execute(new Runnable() {
-                       public void run() {
+               jobRunner.queue(new DBJob() {
+
+                       public void run(ObjectContainer container, 
ClientContext context) {
                                put.onSuccess(keyNum, selectorContainer, 
clientContext);
                                if(put.persistent())
                                        
selectorContainer.delete((PersistentChosenRequest)req);
-                               selectorContainer.commit();
                        }
-               }, prio, name);
+                       
+               }, NativeThread.NORM_PRIORITY, false);
        }

        public FECQueue getFECQueue() {


Reply via email to