Author: toad
Date: 2008-06-24 15:43:05 +0000 (Tue, 24 Jun 2008)
New Revision: 20648

Modified:
   branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
   
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
   
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
   
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
Log:
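Remove the stored ObjectContainer field (and its container() accessor) from ClientRequestSchedulerCore (CRSCore); the container is now passed explicitly to the methods that need it.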

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-06-24 15:34:20 UTC (rev 20647)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-06-24 15:43:05 UTC (rev 20648)
@@ -115,7 +115,7 @@
                this.selectorContainer = node.db;
                schedCore = ClientRequestSchedulerCore.create(node, forInserts, 
forSSKs, selectorContainer, COOLDOWN_PERIOD, core.clientDatabaseExecutor, this, 
context);
                schedTransient = new ClientRequestSchedulerNonPersistent(this);
-               schedCore.fillStarterQueue();
+               schedCore.fillStarterQueue(selectorContainer);
                schedCore.start(core);
                persistentCooldownQueue = schedCore.persistentCooldownQueue;
                this.databaseExecutor = core.clientDatabaseExecutor;
@@ -165,7 +165,7 @@
        public void register(final SendableRequest req, boolean 
onDatabaseThread, final RegisterMe reg) {
                logMINOR = Logger.shouldLog(Logger.MINOR, this);
                if(logMINOR) Logger.minor(this, "Registering "+req, new 
Exception("debug"));
-               boolean persistent = req.persistent();
+               final boolean persistent = req.persistent();
                if(isInsertScheduler != (req instanceof SendableInsert))
                        throw new IllegalArgumentException("Expected a 
SendableInsert: "+req);
                if(req instanceof SendableGet) {
@@ -173,7 +173,7 @@

                        if(persistent && onDatabaseThread) {
                                schedCore.addPendingKeys(getter, 
selectorContainer);
-                               schedCore.queueRegister(getter, 
databaseExecutor);
+                               schedCore.queueRegister(getter, 
databaseExecutor, selectorContainer);
                                final Object[] keyTokens = 
getter.sendableKeys(selectorContainer);
                                final ClientKey[] keys = new 
ClientKey[keyTokens.length];
                                for(int i=0;i<keyTokens.length;i++)
@@ -190,7 +190,7 @@

                                        public void run(ObjectContainer 
container, ClientContext context) {
                                                
schedCore.addPendingKeys(getter, container);
-                                               schedCore.queueRegister(getter, 
databaseExecutor);
+                                               schedCore.queueRegister(getter, 
databaseExecutor, container);
                                                final Object[] keyTokens = 
getter.sendableKeys(container);
                                                final ClientKey[] keys = new 
ClientKey[keyTokens.length];
                                                for(int 
i=0;i<keyTokens.length;i++)
@@ -222,12 +222,26 @@
                                }, getter.getPriorityClass(), "Checking 
datastore");
                        }
                } else {
-                       if(persistent)
-                               schedCore.queueRegister(req, databaseExecutor);
-                       // Pretend to not be on the database thread.
-                       // In some places (e.g. SplitFileInserter.start(), we 
call register() *many* times within a single transaction.
-                       // We can greatly improve responsiveness at the cost of 
some throughput and RAM by only adding the tags at this point.
-                       finishRegister(req, persistent, false, true, reg);
+                       if(persistent) {
+                               if(onDatabaseThread) {
+                                       schedCore.queueRegister(req, 
databaseExecutor, selectorContainer);
+                                       finishRegister(req, persistent, false, 
true, reg);
+                               } else {
+                                       jobRunner.queue(new DBJob() {
+
+                                               public void run(ObjectContainer 
container, ClientContext context) {
+                                                       
schedCore.queueRegister(req, databaseExecutor, selectorContainer);
+                                                       // Pretend to not be on 
the database thread.
+                                                       // In some places (e.g. 
SplitFileInserter.start()), we call register() *many* times within a single 
transaction.
+                                                       // We can greatly 
improve responsiveness at the cost of some throughput and RAM by only adding 
the tags at this point.
+                                                       finishRegister(req, 
persistent, false, true, reg);
+                                               }
+                                               
+                                       }, NativeThread.NORM_PRIORITY, false);
+                               }
+                       } else {
+                               finishRegister(req, persistent, false, true, 
reg);
+                       }
                }
        }

@@ -366,7 +380,7 @@
                        schedTransient.addPendingKey(key, getter);
        }

-       private synchronized ChosenRequest removeFirst() {
+       private synchronized ChosenRequest removeFirst(ObjectContainer 
container) {
                if(!databaseExecutor.onThread()) {
                        throw new IllegalStateException("Not on database 
thread!");
                }
@@ -376,7 +390,7 @@
                else if(PRIORITY_HARD.equals(choosenPriorityScheduler))
                        fuzz = 0;       
                // schedCore juggles both
-               return schedCore.removeFirst(fuzz, random, offeredKeys, 
starter, schedTransient, false, Short.MAX_VALUE, Short.MAX_VALUE, 
clientContext);
+               return schedCore.removeFirst(fuzz, random, offeredKeys, 
starter, schedTransient, false, Short.MAX_VALUE, Short.MAX_VALUE, 
clientContext, container);
        }

        public ChosenRequest getBetterNonPersistentRequest(ChosenRequest req) {
@@ -386,10 +400,10 @@
                else if(PRIORITY_HARD.equals(choosenPriorityScheduler))
                        fuzz = 0;       
                if(req == null)
-                       return schedCore.removeFirst(fuzz, random, offeredKeys, 
starter, schedTransient, true, Short.MAX_VALUE, Integer.MAX_VALUE, 
clientContext);
+                       return schedCore.removeFirst(fuzz, random, offeredKeys, 
starter, schedTransient, true, Short.MAX_VALUE, Integer.MAX_VALUE, 
clientContext, null);
                short prio = req.request.getPriorityClass();
                int retryCount = req.request.getRetryCount();
-               return schedCore.removeFirst(fuzz, random, offeredKeys, 
starter, schedTransient, true, prio, retryCount, clientContext);
+               return schedCore.removeFirst(fuzz, random, offeredKeys, 
starter, schedTransient, true, prio, retryCount, clientContext, null);
        }

        private static final int MAX_STARTER_QUEUE_SIZE = 100;
@@ -424,7 +438,7 @@
                                }
                        }
                        while(true) {
-                               req = removeFirst();
+                               req = removeFirst(container);
                                if(req == null) return;
                                synchronized(starterQueue) {
                                        if(req != null) {

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java  
    2008-06-24 15:34:20 UTC (rev 20647)
+++ 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerBase.java  
    2008-06-24 15:43:05 UTC (rev 20648)
@@ -61,9 +61,6 @@

        abstract boolean persistent();

-       /** @return The container if this is persistent, otherwise null */
-       abstract ObjectContainer container();
-       
        protected ClientRequestSchedulerBase(boolean forInserts, boolean 
forSSKs, Map pendingKeys, Map allRequestsByClientRequest, List recentSuccesses) 
{
                this.isInsertScheduler = forInserts;
                this.isSSKScheduler = forSSKs;
@@ -244,14 +241,14 @@
                addToGrabArray(req.getPriorityClass(), retryCount, 
fixRetryCount(retryCount), req.getClient(), req.getClientRequest(), req, 
random, container);
                Set v = (Set) 
allRequestsByClientRequest.get(req.getClientRequest());
                if(v == null) {
-                       v = makeSetForAllRequestsByClientRequest();
+                       v = makeSetForAllRequestsByClientRequest(container);
                        allRequestsByClientRequest.put(req.getClientRequest(), 
v);
                }
                v.add(req);
                if(logMINOR) Logger.minor(this, "Registered "+req+" on 
prioclass="+req.getPriorityClass()+", retrycount="+req.getRetryCount()+" 
v.size()="+v.size());
        }

-       protected abstract Set makeSetForAllRequestsByClientRequest();
+       protected abstract Set 
makeSetForAllRequestsByClientRequest(ObjectContainer container);

        void addToGrabArray(short priorityClass, int retryCount, int rc, Object 
client, ClientRequester cr, SendableRequest req, RandomSource random, 
ObjectContainer container) {
                if((priorityClass > RequestStarter.MINIMUM_PRIORITY_CLASS) || 
(priorityClass < RequestStarter.MAXIMUM_PRIORITY_CLASS))

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java  
    2008-06-24 15:34:20 UTC (rev 20647)
+++ 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerCore.java  
    2008-06-24 15:43:05 UTC (rev 20648)
@@ -40,7 +40,6 @@
  */
 class ClientRequestSchedulerCore extends ClientRequestSchedulerBase implements 
KeysFetchingLocally {

-       private ObjectContainer container;
        private static boolean logMINOR;
        /** Identifier in the database for the node we are attached to */
        private final long nodeDBHandle;
@@ -92,7 +91,6 @@
        ClientRequestSchedulerCore(Node node, boolean forInserts, boolean 
forSSKs, ObjectContainer selectorContainer, long cooldownTime) {
                super(forInserts, forSSKs, forInserts ? null : 
selectorContainer.ext().collections().newHashMap(1024), 
selectorContainer.ext().collections().newHashMap(32), 
selectorContainer.ext().collections().newLinkedList());
                this.nodeDBHandle = node.nodeDBHandle;
-               this.container = selectorContainer;
                if(!forInserts) {
                        this.persistentCooldownQueue = new 
PersistentCooldownQueue();
                } else {
@@ -105,7 +103,6 @@
                        
((Db4oMap)pendingKeys).activationDepth(Integer.MAX_VALUE);
                ((Db4oMap)allRequestsByClientRequest).activationDepth(1);
                ((Db4oList)recentSuccesses).activationDepth(Integer.MAX_VALUE);
-               this.container = container;
                if(!isInsertScheduler) {
                        persistentCooldownQueue.setCooldownTime(cooldownTime);
                }
@@ -147,7 +144,7 @@
                runner.queue(preRegisterMeRunner, NativeThread.NORM_PRIORITY, 
true);
        }

-       void fillStarterQueue() {
+       void fillStarterQueue(ObjectContainer container) {
                ObjectSet results = container.query(new Predicate() {
                        public boolean match(PersistentChosenRequest req) {
                                if(req.core != ClientRequestSchedulerCore.this) 
return false;
@@ -206,8 +203,8 @@
        // We prevent a number of race conditions (e.g. adding a retry count 
and then another 
        // thread removes it cos its empty) ... and in addToGrabArray etc we 
already sync on this.
        // The worry is ... is there any nested locking outside of the 
hierarchy?
-       ChosenRequest removeFirst(int fuzz, RandomSource random, 
OfferedKeysList[] offeredKeys, RequestStarter starter, 
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, 
short maxPrio, int retryCount, ClientContext context) {
-               SendableRequest req = removeFirstInner(fuzz, random, 
offeredKeys, starter, schedTransient, transientOnly, maxPrio, retryCount, 
context);
+       ChosenRequest removeFirst(int fuzz, RandomSource random, 
OfferedKeysList[] offeredKeys, RequestStarter starter, 
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, 
short maxPrio, int retryCount, ClientContext context, ObjectContainer 
container) {
+               SendableRequest req = removeFirstInner(fuzz, random, 
offeredKeys, starter, schedTransient, transientOnly, maxPrio, retryCount, 
context, container);
                if(req == null) return null;
                Object token = req.chooseKey(this, req.persistent() ? container 
: null, context);
                if(token == null) {
@@ -243,7 +240,7 @@
                }
        }

-       SendableRequest removeFirstInner(int fuzz, RandomSource random, 
OfferedKeysList[] offeredKeys, RequestStarter starter, 
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, 
short maxPrio, int retryCount, ClientContext context) {
+       SendableRequest removeFirstInner(int fuzz, RandomSource random, 
OfferedKeysList[] offeredKeys, RequestStarter starter, 
ClientRequestSchedulerNonPersistent schedTransient, boolean transientOnly, 
short maxPrio, int retryCount, ClientContext context, ObjectContainer 
container) {
                // Priorities start at 0
                if(logMINOR) Logger.minor(this, "removeFirst()");
                boolean tryOfferedKeys = offeredKeys != null && 
random.nextBoolean();
@@ -453,10 +450,6 @@
                return true;
        }

-       ObjectContainer container() {
-               return container;
-       }
-
        private transient ObjectSet registerMeSet;

        private transient RegisterMeRunner registerMeRunner;
@@ -513,7 +506,7 @@
                }

        }
-       public void queueRegister(SendableRequest req, 
PrioritizedSerialExecutor databaseExecutor) {
+       public void queueRegister(SendableRequest req, 
PrioritizedSerialExecutor databaseExecutor, ObjectContainer container) {
                if(!databaseExecutor.onThread()) {
                        throw new IllegalStateException("Not on database 
thread!");
                }
@@ -531,8 +524,8 @@
                synchronized(keysFetching) {
                        keysFetching.remove(key);
                }
-               sched.databaseExecutor.execute(new Runnable() {
-                       public void run() {
+               sched.clientContext.jobRunner.queue(new DBJob() {
+                       public void run(ObjectContainer container, 
ClientContext context) {
                                ObjectSet results = container.query(new 
Predicate() {
                                        public boolean 
match(PersistentChosenRequest req) {
                                                if(req.core != 
ClientRequestSchedulerCore.this) return false;
@@ -545,10 +538,10 @@
                                        container.commit();
                                }
                        }
-               }, NativeThread.NORM_PRIORITY, "Remove fetching key");
+               }, NativeThread.NORM_PRIORITY, false);
        }

-       protected Set makeSetForAllRequestsByClientRequest() {
+       protected Set makeSetForAllRequestsByClientRequest(ObjectContainer 
container) {
                return new Db4oSet(container, 1);
        }


Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
===================================================================
--- 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
     2008-06-24 15:34:20 UTC (rev 20647)
+++ 
branches/db4o/freenet/src/freenet/client/async/ClientRequestSchedulerNonPersistent.java
     2008-06-24 15:43:05 UTC (rev 20648)
@@ -39,7 +39,7 @@
                return null;
        }

-       protected Set makeSetForAllRequestsByClientRequest() {
+       protected Set makeSetForAllRequestsByClientRequest(ObjectContainer 
ignored) {
                return new HashSet();
        }



Reply via email to