Author: toad
Date: 2008-07-19 16:48:15 +0000 (Sat, 19 Jul 2008)
New Revision: 21250

Modified:
   branches/db4o/freenet/src/freenet/client/async/ClientContext.java
   branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
   branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
   branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
   branches/db4o/freenet/src/freenet/client/async/RegisterMe.java
   branches/db4o/freenet/src/freenet/node/FailureTable.java
   branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java
Log:
If the database job queue is full, just write the RegisterMe to the database
and tell ClientRequestSchedulerCore (CRSCore) to re-run the RegisterMeRunner.
That way we don't need to add another item to the queue, which saves memory etc.

Modified: branches/db4o/freenet/src/freenet/client/async/ClientContext.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientContext.java   
2008-07-19 14:46:50 UTC (rev 21249)
+++ branches/db4o/freenet/src/freenet/client/async/ClientContext.java   
2008-07-19 16:48:15 UTC (rev 21250)
@@ -42,8 +42,10 @@
        public transient final HealingQueue healingQueue;
        public transient final USKManager uskManager;
        public transient final Random fastWeakRandom;
+       public transient final long bootID;

        public ClientContext(NodeClientCore core) {
+               this.bootID = core.node.bootID;
                this.fecQueue = core.fecQueue;
                jobRunner = core;
                this.mainExecutor = core.getExecutor();

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-07-19 14:46:50 UTC (rev 21249)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-07-19 16:48:15 UTC (rev 21250)
@@ -156,17 +156,24 @@
        public void registerInsert(final SendableRequest req, boolean 
persistent, boolean regmeOnly) {
                registerInsert(req, persistent, regmeOnly, 
databaseExecutor.onThread());
        }
+
+       static final int QUEUE_THRESHOLD = 250;

        public void registerInsert(final SendableRequest req, boolean 
persistent, boolean regmeOnly, boolean onDatabaseThread) {
                if(persistent) {
                        if(onDatabaseThread) {
                                if(regmeOnly) {
-                                       final RegisterMe regme = new 
RegisterMe(null, null, req, req.getPriorityClass(selectorContainer), schedCore, 
null);
+                                       long bootID = 0;
+                                       boolean queueFull = 
jobRunner.getQueueSize(NativeThread.NORM_PRIORITY) <= QUEUE_THRESHOLD;
+                                       if(!queueFull)
+                                               bootID = this.node.bootID;
+                                       final RegisterMe regme = new 
RegisterMe(null, null, req, req.getPriorityClass(selectorContainer), schedCore, 
null, bootID);
                                        selectorContainer.set(regme);
                                        if(logMINOR)
                                                Logger.minor(this, "Added 
insert RegisterMe: "+regme);
+                                       if(!queueFull) {
                                        jobRunner.queue(new DBJob() {
-
+                                               
                                                public void run(ObjectContainer 
container, ClientContext context) {
                                                        container.delete(regme);
                                                        container.activate(req, 
1);
@@ -174,6 +181,9 @@
                                                }

                                        }, NativeThread.NORM_PRIORITY, false);
+                                       } else {
+                                               
schedCore.rerunRegisterMeRunner(jobRunner);
+                                       }
                                        selectorContainer.deactivate(req, 1);
                                        return;
                                }
@@ -258,12 +268,19 @@
                if(listener != null) {
                        if(registerOffThread) {
                                short prio = 
listener.getPriorityClass(selectorContainer);
+                               boolean queueFull = false;
                                if(reg == null) {
-                                       reg = new RegisterMe(listener, getters, 
null, prio, schedCore, blocks);
+                                       long bootID = 0;
+                                       queueFull = 
jobRunner.getQueueSize(NativeThread.NORM_PRIORITY) <= QUEUE_THRESHOLD;
+                                       if(!queueFull)
+                                               bootID = this.node.bootID;
+
+                                       reg = new RegisterMe(listener, getters, 
null, prio, schedCore, blocks, bootID);
                                        selectorContainer.set(reg);
                                }
                                final RegisterMe regme = reg;
                                if(logMINOR) Logger.minor(this, "Added regme: 
"+regme);
+                               if(!queueFull) {
                                jobRunner.queue(new DBJob() {

                                        public void run(ObjectContainer 
container, ClientContext context) {
@@ -277,12 +294,15 @@
                                        }

                                }, NativeThread.NORM_PRIORITY, false);
+                               } else {
+                                       
schedCore.rerunRegisterMeRunner(jobRunner);
+                               }
                                return;
                        } else {
                                short prio = 
listener.getPriorityClass(selectorContainer);
                                schedCore.addPendingKeys(listener, 
selectorContainer);
                                if(reg == null && getters != null) {
-                                       reg = new RegisterMe(null, getters, 
null, prio, schedCore, blocks);
+                                       reg = new RegisterMe(null, getters, 
null, prio, schedCore, blocks, node.bootID);
                                        selectorContainer.set(reg);
                                        if(logMINOR) Logger.minor(this, "Added 
regme: "+reg);
                                } else {

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java  
    2008-07-19 14:46:50 UTC (rev 21249)
+++ 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java  
    2008-07-19 16:48:15 UTC (rev 21250)
@@ -131,6 +131,9 @@
                preRegisterMeRunner = new DBJob() {

                        public void run(ObjectContainer container, 
ClientContext context) {
+                               synchronized(ClientRequestSchedulerCore.this) {
+                                       if(registerMeSet != null) return;
+                               }
                                long tStart = System.currentTimeMillis();
                                // FIXME REDFLAG EVIL DB4O BUG!!!
                                // FIXME verify and file a bug
@@ -182,7 +185,10 @@
 //                             query.constrain(eval);
 //                             
query.descend("key").descend("priority").orderAscending();
 //                             
query.descend("key").descend("addedTime").orderAscending();
-                               registerMeSet = query.execute();
+                               ObjectSet results = query.execute();
+                               synchronized(ClientRequestSchedulerCore.this) {
+                                       registerMeSet = results;
+                               }
                        long tEnd = System.currentTimeMillis();
                        if(logMINOR)
                                Logger.minor(this, "RegisterMe query took 
"+(tEnd-tStart)+" hasNext="+registerMeSet.hasNext()+" for 
insert="+isInsertScheduler+" ssk="+isSSKScheduler);
@@ -198,9 +204,13 @@
        private transient DBJob preRegisterMeRunner;

        void start(DBJobRunner runner) {
-                       runner.queue(preRegisterMeRunner, 
NativeThread.NORM_PRIORITY, true);
+               startRegisterMeRunner(runner);
        }

+       private final void startRegisterMeRunner(DBJobRunner runner) {
+               runner.queue(preRegisterMeRunner, NativeThread.NORM_PRIORITY, 
true);
+       }
+       
        void fillStarterQueue(ObjectContainer container) {
                ObjectSet results = container.query(new Predicate() {
                        public boolean match(PersistentChosenRequest req) {
@@ -560,6 +570,8 @@

        private transient RegisterMeRunner registerMeRunner;

+       private transient boolean shouldReRunRegisterMeRunner;
+       
        class RegisterMeRunner implements DBJob {

                public void run(ObjectContainer container, ClientContext 
context) {
@@ -586,6 +598,10 @@
                                long startNext = System.currentTimeMillis();
                                RegisterMe reg = (RegisterMe) 
registerMeSet.next();
                                container.activate(reg, 1);
+                               if(reg.bootID == context.bootID) {
+                                       if(logMINOR) Logger.minor(this, "Not 
registering block as was added to the queue");
+                                       continue;
+                               }
                                // FIXME remove the leftover/old core handling 
at some point, an NPE is acceptable long-term.
                                if(reg.core != ClientRequestSchedulerCore.this) 
{
                                        if(reg.core == null) {
@@ -652,7 +668,15 @@
                                context.jobRunner.queue(registerMeRunner, 
NativeThread.NORM_PRIORITY-1, true);
                        else {
                                if(logMINOR) Logger.minor(this, 
"RegisterMeRunner finished");
-                               registerMeSet = null;
+                               boolean rerun;
+                               synchronized(ClientRequestSchedulerCore.this) {
+                                       rerun = shouldReRunRegisterMeRunner;
+                                       shouldReRunRegisterMeRunner = false;
+                                       registerMeSet = null;
+                               }
+                               if(rerun) {
+                                       preRegisterMeRunner.run(container, 
context);
+                               }
                        }
                }

@@ -859,5 +883,13 @@
                }
        }

+       public void rerunRegisterMeRunner(DBJobRunner runner) {
+               synchronized(this) {
+                       shouldReRunRegisterMeRunner = true;
+                       if(registerMeSet != null) return;
+               }
+               startRegisterMeRunner(runner);
+       }
+
 }


Modified: branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java     
2008-07-19 14:46:50 UTC (rev 21249)
+++ branches/db4o/freenet/src/freenet/client/async/DBJobRunner.java     
2008-07-19 16:48:15 UTC (rev 21250)
@@ -13,4 +13,6 @@

        public boolean onDatabaseThread();

+       public int getQueueSize(int priority);
+
 }

Modified: branches/db4o/freenet/src/freenet/client/async/RegisterMe.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/RegisterMe.java      
2008-07-19 14:46:50 UTC (rev 21249)
+++ branches/db4o/freenet/src/freenet/client/async/RegisterMe.java      
2008-07-19 16:48:15 UTC (rev 21250)
@@ -16,11 +16,16 @@
        final SendableRequest nonGetRequest;
        final ClientRequestSchedulerCore core;
        final RegisterMeSortKey key;
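+       /**
+        * Only set (to the current node's boot ID) if the key is on the queue; 0 otherwise. RegisterMeRunner skips entries whose bootID matches the running node's.
+        */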
+       final long bootID;
        private final int hashCode;
        public final BlockSet blocks;

-       RegisterMe(GotKeyListener listener, SendableGet[] getters, 
SendableRequest nonGetRequest, short prio, ClientRequestSchedulerCore core, 
BlockSet blocks) {
+       RegisterMe(GotKeyListener listener, SendableGet[] getters, 
SendableRequest nonGetRequest, short prio, ClientRequestSchedulerCore core, 
BlockSet blocks, long bootID) {
                this.listener = listener;
+               this.bootID = bootID;
                this.getters = getters;
                this.core = core;
                this.nonGetRequest = nonGetRequest;

Modified: branches/db4o/freenet/src/freenet/node/FailureTable.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/FailureTable.java    2008-07-19 
14:46:50 UTC (rev 21249)
+++ branches/db4o/freenet/src/freenet/node/FailureTable.java    2008-07-19 
16:48:15 UTC (rev 21250)
@@ -49,9 +49,9 @@
        private final Node node;

        /** Maximum number of keys to track */
-       static final int MAX_ENTRIES = 20*1000;
+       static final int MAX_ENTRIES = 2*1000;
        /** Maximum number of offers to track */
-       static final int MAX_OFFERS = 10*1000;
+       static final int MAX_OFFERS = 1*1000;
        /** Terminate a request if there was a DNF on the same key less than 10 
minutes ago */
        static final int REJECT_TIME = 10*60*1000;
        /** After 1 hour we forget about an entry completely */

Modified: 
branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java
===================================================================
--- branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java    
2008-07-19 14:46:50 UTC (rev 21249)
+++ branches/db4o/freenet/src/freenet/support/PrioritizedSerialExecutor.java    
2008-07-19 16:48:15 UTC (rev 21250)
@@ -230,4 +230,10 @@
                return retval;
        }

+       public int getQueueSize(int priority) {
+               synchronized(jobs) {
+                       return jobs[priority].size();
+               }
+       }
+
 }


Reply via email to