Author: toad
Date: 2008-08-14 00:35:04 +0000 (Thu, 14 Aug 2008)
New Revision: 21830

Modified:
   branches/db4o/freenet/src/freenet/client/async/ChosenBlock.java
   branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
   branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
   branches/db4o/freenet/src/freenet/node/RequestScheduler.java
   branches/db4o/freenet/src/freenet/node/RequestStarter.java
Log:
Fix priority inversion problems.
Only add a key to keysFetching when we actually start the request, and only 
remove it when the request is done.
Check whether a key is already in keysFetching when creating a 
PersistentChosenRequest, when checking the queue size, and when grabbing a 
request from the queue.
Start a request from the best PersistentChosenRequest on the starterQueue 
(lowest prio value, ties broken by lowest retryCount), not from the first one.


Modified: branches/db4o/freenet/src/freenet/client/async/ChosenBlock.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ChosenBlock.java     
2008-08-13 23:31:21 UTC (rev 21829)
+++ branches/db4o/freenet/src/freenet/client/async/ChosenBlock.java     
2008-08-14 00:35:04 UTC (rev 21830)
@@ -32,12 +32,6 @@
                this.localRequestOnly = localRequestOnly;
                this.cacheLocalRequests = cacheLocalRequests;
                this.ignoreStore = ignoreStore;
-               // Ignore return value. Not our problem at this point.
-               // It won't have been scheduled if there were *no* valid 
blocks...
-               // it's possible, but very unlikely, that some blocks are 
scheduled
-               // and some are not.
-               if(key != null)
-                       sched.addToFetching(key);
        }

        public abstract boolean isPersistent();
@@ -66,8 +60,4 @@
        }

        public abstract SendableRequestSender getSender(ClientContext context);
-       
-       public void removeFromFetching(ClientRequestSchedulerCore core) {
-               core.removeFetchingKey(key);
-       }
 }

Modified: 
branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-08-13 23:31:21 UTC (rev 21829)
+++ branches/db4o/freenet/src/freenet/client/async/ClientRequestScheduler.java  
2008-08-14 00:35:04 UTC (rev 21830)
@@ -574,9 +574,18 @@
         */
        public ChosenBlock grabRequest() {
                while(true) {
-                       PersistentChosenRequest reqGroup;
+                       PersistentChosenRequest reqGroup = null;
                        synchronized(starterQueue) {
-                               reqGroup = starterQueue.isEmpty() ? null : 
starterQueue.getFirst();
+                               short bestPriority = Short.MAX_VALUE;
+                               int bestRetryCount = Integer.MAX_VALUE;
+                               for(PersistentChosenRequest req : starterQueue) 
{
+                                       if(req.prio < bestPriority || 
+                                                       (req.prio == 
bestPriority && req.retryCount < bestRetryCount)) {
+                                               bestPriority = req.prio;
+                                               bestRetryCount = req.retryCount;
+                                               reqGroup = req;
+                                       }
+                               }
                        }
                        if(reqGroup != null) {
                                // Try to find a better non-persistent request
@@ -590,7 +599,7 @@
                        ChosenBlock block;
                        int finalLength = 0;
                        synchronized(starterQueue) {
-                               block = 
reqGroup.grabNotStarted(clientContext.fastWeakRandom);
+                               block = 
reqGroup.grabNotStarted(clientContext.fastWeakRandom, this);
                                if(block == null) {
                                        for(int i=0;i<starterQueue.size();i++) {
                                                if(starterQueue.get(i) == 
reqGroup) {
@@ -671,8 +680,10 @@
                        synchronized(starterQueue) {
                                // Recompute starterQueueLength
                                int length = 0;
-                               for(PersistentChosenRequest req : starterQueue)
+                               for(PersistentChosenRequest req : starterQueue) 
{
+                                       
req.pruneDuplicates(ClientRequestScheduler.this);
                                        length += req.sizeNotStarted();
+                               }
                                if(logMINOR) Logger.minor(this, "Queue size: 
"+length+" SSK="+isSSKScheduler+" insert="+isInsertScheduler);
                                if(length >= MAX_STARTER_QUEUE_SIZE) {
                                        if(length >= WARNING_STARTER_QUEUE_SIZE)
@@ -1126,6 +1137,10 @@
        public boolean addToFetching(Key key) {
                return schedCore.addToFetching(key);
        }
+       
+       public boolean hasFetchingKey(Key key) {
+               return schedCore.hasKey(key);
+       }

        public long countPersistentQueuedRequests(ObjectContainer container) {
                return schedCore.countQueuedRequests(container);

Modified: 
branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java
===================================================================
--- branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java 
2008-08-13 23:31:21 UTC (rev 21829)
+++ branches/db4o/freenet/src/freenet/client/async/PersistentChosenRequest.java 
2008-08-14 00:35:04 UTC (rev 21830)
@@ -4,12 +4,14 @@
 package freenet.client.async;

 import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 import java.util.Vector;

 import com.db4o.ObjectContainer;

 import freenet.client.FetchContext;
+import freenet.keys.Key;
 import freenet.node.BulkCallFailureItem;
 import freenet.node.LowLevelGetException;
 import freenet.node.RequestScheduler;
@@ -72,7 +74,13 @@
                boolean reqActive = container.ext().isActive(req);
                if(!reqActive)
                        container.activate(req, 1);
-               blocksNotStarted.addAll(req.makeBlocks(this, sched, container, 
context));
+               List<PersistentChosenBlock> candidates = req.makeBlocks(this, 
sched, container, context);
+               for(PersistentChosenBlock block : candidates) {
+                       Key key = block.key;
+                       if(key != null && sched.hasFetchingKey(key))
+                               continue;
+                       blocksNotStarted.add(block);
+               }
                sender = req.getSender(container, context);
                if(!reqActive)
                        container.deactivate(req, 1);
@@ -84,7 +92,7 @@
                        public void run() {
                                synchronized(PersistentChosenRequest.this) {
                                        if(finished) return;
-                                       Logger.error(this, "Still not finished 
after timeout: "+this);
+                                       Logger.error(this, "Still not finished 
after timeout: "+PersistentChosenRequest.this);
                                }
                        }

@@ -194,14 +202,20 @@
                scheduler.removeRunningRequest(request);
        }

-       public synchronized ChosenBlock grabNotStarted(Random random) {
-               int size = blocksNotStarted.size();
-               if(size == 0) return null;
-               PersistentChosenBlock ret;
-               if(size == 1) ret = blocksNotStarted.remove(0);
-               else ret = blocksNotStarted.remove(random.nextInt(size));
-               blocksStarted.add(ret);
-               return ret;
+       public synchronized ChosenBlock grabNotStarted(Random random, 
RequestScheduler sched) {
+               while(true) {
+                       int size = blocksNotStarted.size();
+                       if(size == 0) return null;
+                       PersistentChosenBlock ret;
+                       if(size == 1) ret = blocksNotStarted.remove(0);
+                       else ret = 
blocksNotStarted.remove(random.nextInt(size));
+                       Key key = ret.key;
+                       if(key != null && sched.hasFetchingKey(key))
+                               // Already fetching; remove from list.
+                               continue;
+                       blocksStarted.add(ret);
+                       return ret;
+               }
        }

        public synchronized int sizeNotStarted() {
@@ -211,19 +225,27 @@
        public void onDumped(ClientRequestSchedulerCore core, ObjectContainer 
container) {
                if(logMINOR)
                        Logger.minor(this, "Dumping "+this);
-               ArrayList<PersistentChosenBlock> oldNotStarted;
                boolean wasStarted;
                synchronized(this) {
-                       oldNotStarted = (ArrayList<PersistentChosenBlock>) 
blocksNotStarted.clone();
                        blocksNotStarted.clear();
                        wasStarted = !blocksStarted.isEmpty();
                }
-               for(PersistentChosenBlock block : oldNotStarted) {
-                       block.removeFromFetching(core);
-               }
                if(!wasStarted) {
                        if(logMINOR) Logger.minor(this, "Finishing immediately 
in onDumped() as nothing pending: "+this);
                        finish(container, core.sched.clientContext, true);
                }
        }
+
+       public synchronized void pruneDuplicates(ClientRequestScheduler sched) {
+               for(int i=0;i<blocksNotStarted.size();i++) {
+                       PersistentChosenBlock block = blocksNotStarted.get(i);
+                       Key key = block.key;
+                       if(key == null) continue;
+                       if(sched.hasFetchingKey(key)) {
+                               blocksNotStarted.remove(i);
+                               if(logMINOR) Logger.minor(this, "Pruned 
duplicate "+block+" from "+this);
+                               i--;
+                       }
+               }
+       }
 }

Modified: branches/db4o/freenet/src/freenet/node/RequestScheduler.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestScheduler.java        
2008-08-13 23:31:21 UTC (rev 21829)
+++ branches/db4o/freenet/src/freenet/node/RequestScheduler.java        
2008-08-14 00:35:04 UTC (rev 21830)
@@ -68,6 +68,8 @@
        public void removeRunningRequest(SendableRequest request);

        public abstract boolean isRunningRequest(SendableRequest request);
+       
+       public boolean hasFetchingKey(Key key);

        public void start(NodeClientCore core);


Modified: branches/db4o/freenet/src/freenet/node/RequestStarter.java
===================================================================
--- branches/db4o/freenet/src/freenet/node/RequestStarter.java  2008-08-13 
23:31:21 UTC (rev 21829)
+++ branches/db4o/freenet/src/freenet/node/RequestStarter.java  2008-08-14 
00:35:04 UTC (rev 21830)
@@ -179,9 +179,12 @@

        private boolean startRequest(ChosenBlock req, boolean logMINOR) {
                if((!req.isPersistent()) && req.isCancelled()) {
-                       sched.removeFetchingKey(req.key);
                        return false;
                }
+               if(req.key != null) {
+                       if(!sched.addToFetching(req.key))
+                               return false;
+               }
                if(logMINOR) Logger.minor(this, "Running request "+req+" 
priority "+req.getPriority());
                core.getExecutor().execute(new SenderThread(req, req.key), 
"RequestStarter$SenderThread for "+req);
                return true;
@@ -225,7 +228,7 @@
                        if(Logger.shouldLog(Logger.MINOR, this)) 
                                Logger.minor(this, "Finished "+req);
                        } finally {
-                               sched.removeFetchingKey(key);
+                               if(key != null) sched.removeFetchingKey(key);
                        }
                }



Reply via email to