Author: toad
Date: 2008-05-22 22:10:15 +0000 (Thu, 22 May 2008)
New Revision: 20048

Modified:
   branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
   branches/db4o/freenet/src/freenet/node/NodeClientCore.java
   branches/db4o/freenet/src/freenet/node/RequestScheduler.java
   branches/db4o/freenet/src/freenet/node/RequestStarterGroup.java
   branches/db4o/freenet/src/freenet/node/SendableGet.java
   branches/db4o/freenet/src/freenet/node/simulator/RealNodeBusyNetworkTest.java
Log:
Beginnings of single-database-thread support.

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-05-22 19:52:56 UTC (rev 20047)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-05-22 22:10:15 UTC (rev 20048)
@@ -24,6 +24,7 @@
 import freenet.node.SendableInsert;
 import freenet.node.SendableRequest;
 import freenet.support.Logger;
+import freenet.support.SerialExecutor;
 import freenet.support.api.StringCallback;

 /**
@@ -92,6 +93,7 @@
        public final String name;
        private final CooldownQueue transientCooldownQueue;
        private final CooldownQueue persistentCooldownQueue;
+       private final SerialExecutor databaseExecutor;

        public static final String PRIORITY_NONE = "NONE";
        public static final String PRIORITY_SOFT = "SOFT";
@@ -103,6 +105,7 @@
                schedCore = ClientRequestSchedulerCore.create(node, forInserts, 
forSSKs, selectorContainer, COOLDOWN_PERIOD);
                schedTransient = new ClientRequestSchedulerNonPersistent(this);
                persistentCooldownQueue = schedCore.persistentCooldownQueue;
+               this.databaseExecutor = core.clientDatabaseExecutor;
                this.starter = starter;
                this.random = random;
                this.node = node;
@@ -138,7 +141,7 @@
                choosenPriorityScheduler = val;
        }

-       public void register(SendableRequest req) {
+       public void register(final SendableRequest req) {
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
                if(logMINOR) Logger.minor(this, "Registering "+req, new 
Exception("debug"));
                if(isInsertScheduler != (req instanceof SendableInsert))
@@ -196,17 +199,35 @@
                                }
                        }
                }
-               if(req.persistent())
-                       schedCore.innerRegister(req, random);
-               else
+               if(req.persistent()) {
+                       databaseExecutor.execute(new Runnable() {
+                               public void run() {
+                                       try {
+                                               schedCore.innerRegister(req, 
random);
+                                               starter.wakeUp();
+                                       } catch (Throwable t) {
+                                               Logger.error(this, "Caught "+t, 
t);
+                                       }
+                               }
+                       }, "Register request");
+               } else {
                        schedTransient.innerRegister(req, random);
-               starter.wakeUp();
+                       starter.wakeUp();
+               }
        }

-       void addPendingKey(ClientKey key, SendableGet getter) {
-               if(getter.persistent())
-                       schedCore.addPendingKey(key, getter);
-               else
+       void addPendingKey(final ClientKey key, final SendableGet getter) {
+               if(getter.persistent()) {
+                       databaseExecutor.execute(new Runnable() {
+                               public void run() {
+                                       try {
+                                               schedCore.addPendingKey(key, 
getter);
+                                       } catch (Throwable t) {
+                                               Logger.error(this, "Caught "+t, 
t);
+                                       }
+                               }
+                       }, "Add pending key");
+               } else
                        schedTransient.addPendingKey(key, getter);
        }

@@ -220,18 +241,30 @@
                return schedCore.removeFirst(fuzz, random, offeredKeys, 
starter, schedTransient);
        }

-       public void removePendingKey(SendableGet getter, boolean complain, Key 
key) {
-               boolean dropped = 
-                       schedCore.removePendingKey(getter, complain, key) |
-                       schedTransient.removePendingKey(getter, complain, key);
+       public void removePendingKey(final SendableGet getter, final boolean 
complain, final Key key) {
+               boolean dropped = schedTransient.removePendingKey(getter, 
complain, key);
                if(dropped && offeredKeys != null && !node.peersWantKey(key)) {
                        for(int i=0;i<offeredKeys.length;i++)
                                offeredKeys[i].remove(key);
                }
                if(transientCooldownQueue != null)
                        transientCooldownQueue.removeKey(key, getter, 
getter.getCooldownWakeupByKey(key), null);
-               if(persistentCooldownQueue != null)
-                       persistentCooldownQueue.removeKey(key, getter, 
getter.getCooldownWakeupByKey(key), selectorContainer);
+               
+               // Now the persistent clients...
+               
+               databaseExecutor.execute(new Runnable() {
+                       public void run() {
+                               try {
+                                       schedCore.removePendingKey(getter, 
complain, key);
+                                       if(persistentCooldownQueue != null)
+                                               
persistentCooldownQueue.removeKey(key, getter, 
getter.getCooldownWakeupByKey(key), selectorContainer);
+                               } catch (Throwable t) {
+                                       Logger.error(this, "Caught "+t, t);
+                               }
+                       }
+                       
+               }, "removePendingKey");
+               
        }

        /**
@@ -241,6 +274,7 @@
         * @param complain
         */
        public void removePendingKeys(SendableGet getter, boolean complain) {
+               // FIXME should this be a single databaseExecutor thread??
                Object[] keyTokens = getter.allKeys();
                for(int i=0;i<keyTokens.length;i++) {
                        Object tok = keyTokens[i];
@@ -254,47 +288,58 @@
                }
        }

-       public void reregisterAll(ClientRequester request) {
-               if(request.persistent())
-                       schedCore.reregisterAll(request, random, this);
-               else
+       public void reregisterAll(final ClientRequester request) {
+               if(request.persistent()) {
+                       databaseExecutor.execute(new Runnable() {
+                               public void run() {
+                                       try {
+                                               
schedCore.reregisterAll(request, random, ClientRequestScheduler.this);
+                                               starter.wakeUp();
+                                       } catch (Throwable t) {
+                                               Logger.error(this, "Caught "+t, 
t);
+                                       }
+                               }
+                       }, "Reregister for "+request);
+               } else {
                        schedTransient.reregisterAll(request, random, this);
-               starter.wakeUp();
+                       starter.wakeUp();
+               }
        }

        public String getChoosenPriorityScheduler() {
                return choosenPriorityScheduler;
        }

-       public synchronized void succeeded(BaseSendableGet succeeded) {
-               if(succeeded.persistent())
-                       schedCore.succeeded(succeeded);
-               else
+       public synchronized void succeeded(final BaseSendableGet succeeded) {
+               if(succeeded.persistent()) {
+                       databaseExecutor.execute(new Runnable() {
+                               public void run() {
+                                       try {
+                                               schedCore.succeeded(succeeded);
+                                       } catch (Throwable t) {
+                                               Logger.error(this, "Caught "+t, 
t);
+                                       }
+                               }
+                       }, "Mark success for "+succeeded);
+               } else
                        schedTransient.succeeded(succeeded);
        }

        public void tripPendingKey(final KeyBlock block) {
                if(logMINOR) Logger.minor(this, 
"tripPendingKey("+block.getKey()+")");
+               
+               // First the transient stuff
+               
                if(offeredKeys != null) {
                        for(int i=0;i<offeredKeys.length;i++) {
                                offeredKeys[i].remove(block.getKey());
                        }
                }
                final Key key = block.getKey();
-               final SendableGet[] gets = schedCore.removePendingKey(key);
                final SendableGet[] transientGets = 
schedTransient.removePendingKey(key);
-               if(gets == null) return;
-               if(transientCooldownQueue != null) {
-                       for(int i=0;i<gets.length;i++)
-                               transientCooldownQueue.removeKey(key, 
transientGets[i], transientGets[i].getCooldownWakeupByKey(key), null);
-               }
-               if(persistentCooldownQueue != null) {
-                       for(int i=0;i<gets.length;i++)
-                               persistentCooldownQueue.removeKey(key, gets[i], 
gets[i].getCooldownWakeupByKey(key), selectorContainer);
-               }
-               Runnable r = new Runnable() {
+               node.executor.execute(new Runnable() {
                        public void run() {
-                               if(logMINOR) Logger.minor(this, "Running 
"+gets.length+" callbacks off-thread for "+block.getKey());
+                               if(logMINOR) Logger.minor(this, "Running 
"+transientGets.length+" callbacks off-thread for "+block.getKey());
                                for(int i=0;i<transientGets.length;i++) {
                                        try {
                                                if(logMINOR) Logger.minor(this, 
"Calling callback for "+transientGets[i]+" for "+key);
@@ -303,6 +348,27 @@
                                                Logger.error(this, "Caught 
"+t+" running callback "+transientGets[i]+" for "+key);
                                        }
                                }
+                       }
+               }, "Running off-thread callbacks for "+block.getKey());
+               if(transientCooldownQueue != null) {
+                       for(int i=0;i<transientGets.length;i++)
+                               transientCooldownQueue.removeKey(key, 
transientGets[i], transientGets[i].getCooldownWakeupByKey(key), null);
+               }
+               
+               // Now the persistent stuff
+               
+               databaseExecutor.execute(new Runnable() {
+
+                       public void run() {
+                               final SendableGet[] gets = 
schedCore.removePendingKey(key);
+                               if(gets == null) return;
+                               if(persistentCooldownQueue != null) {
+                                       for(int i=0;i<gets.length;i++)
+                                               
persistentCooldownQueue.removeKey(key, gets[i], 
gets[i].getCooldownWakeupByKey(key), selectorContainer);
+                               }
+                               // Call the callbacks on the database executor 
thread, because the first thing
+                               // they will need to do is access the database 
to decide whether they need to
+                               // decode, and if so to find the key to decode 
with.
                                for(int i=0;i<gets.length;i++) {
                                        try {
                                                if(logMINOR) Logger.minor(this, 
"Calling callback for "+gets[i]+" for "+key);
@@ -313,16 +379,13 @@
                                }
                                if(logMINOR) Logger.minor(this, "Finished 
running callbacks");
                        }
-               };
-               node.getTicker().queueTimedJob(r, 0); // FIXME ideally these 
would be completed on a single thread; when we have 1.5, use a dedicated 
non-parallel Executor
+                       
+               }, "tripPendingKey for "+block.getKey());
+               
        }

-       public boolean anyWantKey(Key key) {
-               return schedTransient.anyWantKey(key) || 
schedCore.anyWantKey(key);
-       }
-
        /** If we want the offered key, or if force is enabled, queue it */
-       public void maybeQueueOfferedKey(Key key, boolean force) {
+       public void maybeQueueOfferedKey(final Key key, boolean force) {
                if(logMINOR)
                        Logger.minor(this, 
"maybeQueueOfferedKey("+key+","+force);
                short priority = Short.MAX_VALUE;
@@ -331,12 +394,21 @@
                        priority = 
RequestStarter.IMMEDIATE_SPLITFILE_PRIORITY_CLASS;
                }
                priority = schedCore.getKeyPrio(key, priority);
-               priority = schedTransient.getKeyPrio(key, priority);
-               if(priority == Short.MAX_VALUE) return;
-               if(logMINOR)
-                       Logger.minor(this, "Priority: "+priority);
-               offeredKeys[priority].queueKey(key);
-               starter.wakeUp();
+               if(priority < Short.MAX_VALUE) {
+                       offeredKeys[priority].queueKey(key);
+                       starter.wakeUp();
+               }
+               
+               final short oldPrio = priority;
+               
+               databaseExecutor.execute(new Runnable() {
+                       public void run() {
+                               short priority = schedTransient.getKeyPrio(key, 
oldPrio);
+                               if(priority >= oldPrio) return; // already on 
list at >= priority
+                               offeredKeys[priority].queueKey(key);
+                               starter.wakeUp();
+                       }
+               }, "maybeQueueOfferedKey");
        }

        public void dequeueOfferedKey(Key key) {
@@ -345,8 +417,11 @@
                }
        }

+       /**
+        * MUST be called from database thread!
+        */
        public long queueCooldown(ClientKey key, SendableGet getter) {
-               if(getter.persistent())
+               if(!getter.persistent())
                        return persistentCooldownQueue.add(key.getNodeKey(), 
getter, selectorContainer);
                else
                        return transientCooldownQueue.add(key.getNodeKey(), 
getter, null);
@@ -354,7 +429,15 @@

        public void moveKeysFromCooldownQueue() {
                moveKeysFromCooldownQueue(transientCooldownQueue, null);
-               moveKeysFromCooldownQueue(persistentCooldownQueue, 
selectorContainer);
+               databaseExecutor.execute(new Runnable() {
+                       public void run() {
+                               try {
+                                       
moveKeysFromCooldownQueue(persistentCooldownQueue, selectorContainer);
+                               } catch (Throwable t) {
+                                       Logger.error(this, "Caught "+t, t);
+                               }
+                       }
+               }, "moveKeysFromCooldownQueue");
        }

        private void moveKeysFromCooldownQueue(CooldownQueue queue, 
ObjectContainer container) {
@@ -386,7 +469,7 @@
                }
        }

-       public long countQueuedRequests() {
+       public long countTransientQueuedRequests() {
                // Approximately... there might be some overlap in the two 
pendingKeys's...
                return schedCore.countQueuedRequests() + 
schedTransient.countQueuedRequests();
        }

Modified: branches/db4o/freenet/src/freenet/node/NodeClientCore.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/NodeClientCore.java  2008-05-22 
19:52:56 UTC (rev 20047)
+++ branches/db4o/freenet/src/freenet/node/NodeClientCore.java  2008-05-22 
22:10:15 UTC (rev 20048)
@@ -105,6 +105,15 @@
        private boolean lazyResume;
        protected final Persister persister;
        private final SerialExecutor clientSlowSerialExecutor[];
+       /** All client-layer database access occurs on a SerialExecutor, so 
that we don't need
+        * to have multiple parallel transactions. Advantages:
+        * - We never have two copies of the same object in RAM, and more 
broadly, we don't
+        *   need to worry about interactions between objects from different 
transactions.
+        * - Only one weak-reference cache for the database.
+        * - No need to refresh live objects. 
+        * - Deactivation is simpler.
+        */
+       public final SerialExecutor clientDatabaseExecutor;

        public static int maxBackgroundUSKFetchers;

@@ -130,6 +139,7 @@
                        else prio = NativeThread.MIN_PRIORITY;
                        clientSlowSerialExecutor[i] = new SerialExecutor(prio);
                }
+               clientDatabaseExecutor = new 
SerialExecutor(NativeThread.NORM_PRIORITY);
                byte[] pwdBuf = new byte[16];
                random.nextBytes(pwdBuf);
                this.formPassword = Base64.encode(pwdBuf);
@@ -1106,18 +1116,6 @@
                return tempDir;
        }

-       /**
-        * Has any client registered an interest in this particular key?
-        */
-       public boolean clientWantKey(Key key) {
-               if(key instanceof NodeCHK)
-                       return 
requestStarters.chkFetchScheduler.anyWantKey(key);
-               else if(key instanceof NodeSSK)
-                       return 
requestStarters.sskFetchScheduler.anyWantKey(key);
-               else
-                       throw new IllegalArgumentException("Not a CHK and not 
an SSK!");
-       }
-
        public boolean hasLoadedQueue() {
                return fcpServer.hasFinishedStart();
        }
@@ -1144,7 +1142,7 @@
                return toadletContainer.getBookmarkURIs();
        }

-       public long countQueuedRequests() {
-               return requestStarters.countQueuedRequests();
+       public long countTransientQueuedRequests() {
+               return requestStarters.countTransientQueuedRequests();
        }
 }

Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestScheduler.java        
2008-05-22 19:52:56 UTC (rev 20047)
+++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java        
2008-05-22 22:10:15 UTC (rev 20048)
@@ -4,7 +4,6 @@
 package freenet.node;

 import freenet.keys.ClientKey;
-import freenet.support.RandomGrabArray;

 public interface RequestScheduler {

@@ -25,7 +24,7 @@
         * @param key The key to be added.
         * @return The time at which the key will leave the cooldown queue.
         */
-       public long queueCooldown(ClientKey key, SendableGet getter);
+       long queueCooldown(ClientKey key, SendableGet getter);

        /**
         * Remove keys from the cooldown queue who have now served their time 
and can be requested 
@@ -40,6 +39,6 @@
         * Note: If you don't want your requests to be subject to cooldown 
(e.g. in fproxy), make 
         * your max retry count less than this (and more than -1). */
        public static final int COOLDOWN_RETRIES = 3;
-       public long countQueuedRequests();
+       public long countTransientQueuedRequests();

 }

Modified: branches/db4o/freenet/src/freenet/node/RequestStarterGroup.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestStarterGroup.java     
2008-05-22 19:52:56 UTC (rev 20047)
+++ branches/db4o/freenet/src/freenet/node/RequestStarterGroup.java     
2008-05-22 22:10:15 UTC (rev 20048)
@@ -224,11 +224,11 @@
                return throttleWindow.realCurrentValue();
        }

-       public long countQueuedRequests() {
-               return chkFetchScheduler.countQueuedRequests() +
-                       sskFetchScheduler.countQueuedRequests() +
-                       chkPutScheduler.countQueuedRequests() +
-                       sskPutScheduler.countQueuedRequests();
+       public long countTransientQueuedRequests() {
+               return chkFetchScheduler.countTransientQueuedRequests() +
+                       sskFetchScheduler.countTransientQueuedRequests() +
+                       chkPutScheduler.countTransientQueuedRequests() +
+                       sskPutScheduler.countTransientQueuedRequests();
        }

 }

Modified: branches/db4o/freenet/src/freenet/node/SendableGet.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/SendableGet.java     2008-05-22 
19:52:56 UTC (rev 20047)
+++ branches/db4o/freenet/src/freenet/node/SendableGet.java     2008-05-22 
22:10:15 UTC (rev 20048)
@@ -110,6 +110,12 @@
                        return parent.chkScheduler;
        }

+       /**
+        * Callback for when a block is found. Will be called on the database 
executor thread.
+        * @param key
+        * @param block
+        * @param sched
+        */
        public abstract void onGotKey(Key key, KeyBlock block, RequestScheduler 
sched);

        /**

Modified: 
branches/db4o/freenet/src/freenet/node/simulator/RealNodeBusyNetworkTest.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/node/simulator/RealNodeBusyNetworkTest.java   
    2008-05-22 19:52:56 UTC (rev 20047)
+++ 
branches/db4o/freenet/src/freenet/node/simulator/RealNodeBusyNetworkTest.java   
    2008-05-22 22:10:15 UTC (rev 20048)
@@ -143,7 +143,7 @@
                }
                long totalRunningRequests = 0;
                for(int j=0;j<nodes.length;j++) {
-                       totalRunningRequests += 
nodes[j].clientCore.countQueuedRequests();
+                       totalRunningRequests += 
nodes[j].clientCore.countTransientQueuedRequests();
                }
                System.err.println("Running requests: "+totalRunningRequests);
         }
@@ -153,7 +153,7 @@
         while(true) {
                long totalRunningRequests = 0;
                for(int i=0;i<nodes.length;i++) {
-                       totalRunningRequests += 
nodes[i].clientCore.countQueuedRequests();
+                       totalRunningRequests += 
nodes[i].clientCore.countTransientQueuedRequests();
                }
                System.err.println("Running requests: "+totalRunningRequests);
                if(totalRunningRequests == 0) return;


Reply via email to