Author: toad
Date: 2008-02-12 23:17:05 +0000 (Tue, 12 Feb 2008)
New Revision: 17845

Modified:
   trunk/freenet/src/freenet/node/LocationManager.java
Log:
First implementation of swap queueing: queue incoming swap requests up to a 
limit (instead of rejecting them when we can't lock), process each one after 
the last one in unlock().

Modified: trunk/freenet/src/freenet/node/LocationManager.java
===================================================================
--- trunk/freenet/src/freenet/node/LocationManager.java 2008-02-12 22:57:16 UTC 
(rev 17844)
+++ trunk/freenet/src/freenet/node/LocationManager.java 2008-02-12 23:17:05 UTC 
(rev 17845)
@@ -6,6 +6,7 @@
 import java.security.MessageDigest;
 import java.util.Enumeration;
 import java.util.Hashtable;
+import java.util.LinkedList;
 import java.util.Vector;

 import freenet.crypt.RandomSource;
@@ -607,16 +608,34 @@
     /**
      * Unlock the node for swapping.
      * @param logSwapTime If true, log the swap time. */
-    synchronized void unlock(boolean logSwapTime) {
+    void unlock(boolean logSwapTime) {
+       Message nextMessage;
+       synchronized(this) {
         if(!locked)
             throw new IllegalStateException("Unlocking when not locked!");
-        locked = false;
         long lockTime = System.currentTimeMillis() - lockedTime;
         if(logMINOR) {
                Logger.minor(this, "Unlocking on port 
"+node.getDarknetPortNumber());
                Logger.minor(this, "lockTime: "+lockTime);
         }
         averageSwapTime.report(lockTime);
+        
+        if(incomingMessageQueue.isEmpty()) {
+               locked = false;
+               return;
+        }
+        
+        // Otherwise, stay locked, and start the next one from the queue.
+        
+        nextMessage = (Message) incomingMessageQueue.removeFirst();
+        
+       }
+       
+        long oldID = nextMessage.getLong(DMT.UID);
+        long newID = oldID+1;
+        PeerNode pn = (PeerNode) nextMessage.getSource();
+
+       innerHandleSwapRequest(oldID, newID, pn, nextMessage);
     }

     /**
@@ -734,6 +753,11 @@
         }
     }

+    /** Queue of swap requests to handle after this one. */
+    private final LinkedList incomingMessageQueue = new LinkedList();
+    
+    static final int MAX_INCOMING_QUEUE_LENGTH = 10;
+    
     /**
      * Handle an incoming SwapRequest
      * @return True if we have handled the message, false if it needs
@@ -796,6 +820,7 @@
         if(htl <= 0) {
                if(logMINOR) Logger.minor(this, "Accepting?... "+oldID);
             // Accept - handle locally
+               lockOrQueue(m, oldID, newID, pn);
             if(!lock()) {
                if(logMINOR) Logger.minor(this, "Can't obtain lock on "+oldID+" 
- rejecting to "+pn);
                 // Reject
@@ -853,6 +878,48 @@
         }
     }

+    /**
+     * If we can obtain the lock, then execute the swap by calling 
innerHandleSwapRequest().
+     * If we can queue the message, queue it.
+     * Otherwise, reject it.
+     */
+    void lockOrQueue(Message msg, long oldID, long newID, PeerNode pn) {
+       boolean runNow = false;
+       boolean reject = false;
+       synchronized(this) {
+               if(!locked) {
+                       locked = true;
+                       runNow = true;
+               } else {
+                       // Locked.
+                       if(incomingMessageQueue.size() > 
MAX_INCOMING_QUEUE_LENGTH) {
+                               // Reject anyway.
+                               runNow = false;
+                       } else {
+                               // Queue it.
+                               incomingMessageQueue.addLast(msg);
+                       }
+               }
+       }
+       if(reject) {
+            Message rejected = DMT.createFNPSwapRejected(oldID);
+            try {
+                pn.sendAsync(rejected, null, 0, null);
+            } catch (NotConnectedException e1) {
+               if(logMINOR) Logger.minor(this, "Lost connection rejecting 
SwapRequest (locked) from "+pn);
+            }
+            swapsRejectedAlreadyLocked++;
+       } else if(runNow) {
+               boolean logSwapTime = false;
+               try {
+                       innerHandleSwapRequest(oldID, newID, pn, msg);
+                       logSwapTime = true;
+               } finally {
+                       unlock(logSwapTime);
+               }
+       } // else it is queued.
+    }
+    
     private void innerHandleSwapRequest(long oldID, long newID, PeerNode pn, 
Message m) {
        RecentlyForwardedItem item = addForwardedItem(oldID, newID, pn, null);
         // Locked, do it


Reply via email to