Author: mrogers
Date: 2006-11-17 13:51:32 +0000 (Fri, 17 Nov 2006)
New Revision: 10972

Added:
   trunk/apps/load-balancing-sims/phase7/sim/SearchThrottle.java
Modified:
   trunk/apps/load-balancing-sims/phase7/sim/Node.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java
Log:
Rudimentary search throttle (not tested)

Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-17 11:52:30 UTC 
(rev 10971)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-17 13:51:32 UTC 
(rev 10972)
@@ -5,6 +5,7 @@
 import java.util.HashSet;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedList;

 public class Node implements EventTarget
 {
@@ -14,6 +15,7 @@
        // Flow control
        public final static boolean USE_TOKENS = false;
        public final static boolean USE_BACKOFF = false;
+       public final static boolean USE_THROTTLE = false;
        public final static int FLOW_TOKENS = 20; // Shared by all peers
        public final static double TOKEN_DELAY = 1.0; // Allocate initial tokens
        public final static double DELAY_DECAY = 0.99; // Exp moving average
@@ -37,6 +39,8 @@
        private boolean timerRunning = false;
        private int spareTokens = FLOW_TOKENS; // Tokens not allocated to a peer
        private double delay = 0.0; // Delay caused by congestion or b/w limiter
+       private LinkedList<Search> searchQueue;
+       private SearchThrottle searchThrottle;

        public Node (double txSpeed, double rxSpeed)
        {
@@ -62,6 +66,8 @@
                // Allocate flow control tokens after a short delay
                if (USE_TOKENS) Event.schedule (this, Math.random() * 0.1,
                                                ALLOCATE_TOKENS, null);
+               searchQueue = new LinkedList<Search>();
+               if (USE_THROTTLE) searchThrottle = new SearchThrottle();
        }

        // Return true if a connection was added, false if already connected
@@ -133,12 +139,9 @@
        // Reject a request or insert if the node appears to be overloaded
        private boolean rejectIfOverloaded (Peer prev, int id)
        {
+               if (prev == null) return false;
                if (shouldRejectSearch()) {
-                       if (prev == null) {
-                               // FIXME
-                               log ("rejecting local search " + id);
-                       }
-                       else prev.sendMessage (new RejectedOverload (id, true));
+                       prev.sendMessage (new RejectedOverload (id, true));
                        return true;
                }
                return false;
@@ -419,13 +422,13 @@

        public void searchSucceeded (MessageHandler m)
        {
-               // FIXME: increase the rate of the relevant throttle
                log (m + " succeeded");
+               if (USE_THROTTLE) searchThrottle.increaseRate();
        }

        public void reduceSearchRate (MessageHandler m)
        {
-               // FIXME: reduce the rate of the relevant throttle
+               if (USE_THROTTLE) searchThrottle.decreaseRate();
        }

        public void removeMessageHandler (int id)
@@ -484,36 +487,71 @@
                Event.log (net.address + " " + message);
        }

+       // Add a search to the queue
+       private void addToSearchQueue (Search s)
+       {
+               searchQueue.add (s);
+               if (USE_THROTTLE && searchQueue.size() == 1) {
+                       double now = Event.time();
+                       double then = searchThrottle.nextSearchTime (now);
+                       Event.schedule (this, then - now, SEND_SEARCH, null);
+               }
+               else sendSearch();
+       }
+       
+       // Remove the first search from the queue and send it
+       private void sendSearch()
+       {
+               Search s = searchQueue.poll();
+               if (s instanceof ChkRequest)
+                       handleChkRequest ((ChkRequest) s, null);
+               else if (s instanceof ChkInsert) {
+                       handleChkInsert ((ChkInsert) s, null);
+                       handleMessage (new DataInsert (s.id), null);
+                       for (int i = 0; i < 32; i++)
+                               handleMessage (new Block (s.id, i), null);
+               }
+               else if (s instanceof SskRequest)
+                       handleSskRequest ((SskRequest) s, null);
+               else if (s instanceof SskInsert) {
+                       pubKeyCache.put (s.key);
+                       handleSskInsert ((SskInsert) s, null);
+               }
+               if (USE_THROTTLE) {
+                       searchThrottle.searchSent();
+                       if (searchQueue.isEmpty()) return;
+                       double now = Event.time();
+                       double then = searchThrottle.nextSearchTime (now);
+                       Event.schedule (this, then - now, SEND_SEARCH, null);
+               }
+       }
+       
        public void generateChkRequest (int key)
        {
                ChkRequest cr = new ChkRequest (key, location);
                log ("generating " + cr);
-               handleChkRequest (cr, null);
+               addToSearchQueue (cr);
        }

        public void generateChkInsert (int key)
        {
                ChkInsert ci = new ChkInsert (key, location);
                log ("generating " + ci);
-               handleChkInsert (ci, null);
-               handleMessage (new DataInsert (ci.id), null);
-               for (int i = 0; i < 32; i++)
-                       handleMessage (new Block (ci.id, i), null);
+               addToSearchQueue (ci);
        }

        public void generateSskRequest (int key)
        {
                SskRequest sr = new SskRequest (key, location, true);
                log ("generating " + sr);
-               handleSskRequest (sr, null);
+               addToSearchQueue (sr);
        }

        public void generateSskInsert (int key, int value)
        {
                SskInsert si = new SskInsert (key, value, location);
                log ("generating " + si);
-               pubKeyCache.put (key);
-               handleSskInsert (si, null);
+               addToSearchQueue (si);
        }

        private void checkTimeouts()
@@ -570,6 +608,10 @@
                        case ALLOCATE_TOKENS:
                        allocateTokens();
                        break;
+                       
+                       case SEND_SEARCH:
+                       sendSearch();
+                       break;
                }
        }

@@ -580,4 +622,5 @@
        public final static int SSK_COLLISION = 5;
        private final static int CHECK_TIMEOUTS = 6;
        private final static int ALLOCATE_TOKENS = 7;
+       private final static int SEND_SEARCH = 8;
 }

Added: trunk/apps/load-balancing-sims/phase7/sim/SearchThrottle.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/SearchThrottle.java       
2006-11-17 11:52:30 UTC (rev 10971)
+++ trunk/apps/load-balancing-sims/phase7/sim/SearchThrottle.java       
2006-11-17 13:51:32 UTC (rev 10972)
@@ -0,0 +1,45 @@
+// An AIMD token bucket
+
+package sim;
+
+public class SearchThrottle
+{
+       public final static double INITIAL_RATE = 5.0; // Searches per second
+       public final static double MAX_RATE = 50.0;
+       public final static double MIN_RATE = 1.0 / 300.0;
+       public final static double ALPHA = 0.082; // AIMD increase parameter
+       public final static double BETA = 0.969; // AIMD decrease parameter
+       
+       private double rate = INITIAL_RATE;
+       private double tokens = INITIAL_RATE, size = INITIAL_RATE;
+       private double lastUpdated = 0.0; // Time
+       
+       public void increaseRate()
+       {
+               rate += ALPHA;
+               if (rate > MAX_RATE) rate = MAX_RATE;
+               Event.log ("rate increased to " + rate);
+       }
+       
+       public void decreaseRate()
+       {
+               rate *= BETA;
+               if (rate < MIN_RATE) rate = MIN_RATE;
+               Event.log ("rate decreased to " + rate);
+       }
+       
+       // Return the time when the next search can be sent
+       public double nextSearchTime (double now)
+       {
+               tokens += (now - lastUpdated) * rate;
+               if (tokens > size) tokens = size;
+               lastUpdated = now;
+               if (tokens >= 1.0) return now;
+               else return now + (1.0 - tokens) * rate;
+       }
+       
+       public void searchSent()
+       {
+               tokens--;
+       }
+}

Modified: 
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java    
2006-11-17 11:52:30 UTC (rev 10971)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java    
2006-11-17 13:51:32 UTC (rev 10972)
@@ -113,7 +113,6 @@
        private void handleInsertReply (InsertReply ir)
        {
                if (searchState != ACCEPTED) node.log (ir + " out of order");
-               next.successNotOverload(); // Reset the backoff length
                if (prev == null) node.searchSucceeded (this);
                else prev.sendMessage (ir); // Forward the message
                finish();

Modified: 
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java   
2006-11-17 11:52:30 UTC (rev 10971)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java   
2006-11-17 13:51:32 UTC (rev 10972)
@@ -40,7 +40,6 @@
        {
                if (searchState != ACCEPTED) node.log (df + " out of order");
                searchState = TRANSFERRING;
-               next.successNotOverload(); // Reset the backoff length
                if (prev != null) prev.sendMessage (df); // Forward the message
                // If we have all the blocks and the headers, cache the data
                if (blocksReceived == 32) {

Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java      
2006-11-17 11:52:30 UTC (rev 10971)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java      
2006-11-17 13:51:32 UTC (rev 10972)
@@ -134,7 +134,6 @@
        protected void handleRouteNotFound (RouteNotFound rnf)
        {
                if (searchState != ACCEPTED) node.log (rnf + " out of order");
-               next.successNotOverload(); // Reset the backoff length
                // Use the remaining htl to try another peer
                if (rnf.htl < htl) htl = rnf.htl;
                forwardSearch();

Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java      
2006-11-17 11:52:30 UTC (rev 10971)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java      
2006-11-17 13:51:32 UTC (rev 10972)
@@ -29,7 +29,6 @@
        protected void handleDataNotFound (DataNotFound dnf)
        {
                if (searchState != ACCEPTED) node.log (dnf + " out of order");
-               next.successNotOverload(); // Reset the backoff length
                if (prev == null) node.log (this + " failed");
                else prev.sendMessage (dnf); // Forward the message
                finish();

Modified: 
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java    
2006-11-17 11:52:30 UTC (rev 10971)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java    
2006-11-17 13:51:32 UTC (rev 10972)
@@ -82,6 +82,7 @@
        {
                if (searchState != SENT) node.log (sa + " out of order");
                searchState = ACCEPTED;
+               next.successNotOverload(); // Reset the backoff length
                // Wait 60 seconds for a reply to the search
                Event.schedule (this, 60.0, SEARCH_TIMEOUT, next);
                // Send the public key if requested
@@ -91,7 +92,6 @@
        private void handleCollision (SskDataFound sdf)
        {
                if (searchState != ACCEPTED) node.log (sdf + " out of order");
-               // FIXME: should we reset the backoff length?
                if (prev == null) node.log (this + " collided");
                else prev.sendMessage (sdf); // Forward the message
                data = sdf.data; // Is this safe?
@@ -100,7 +100,6 @@
        private void handleInsertReply (InsertReply ir)
        {
                if (searchState != ACCEPTED) node.log (ir + " out of order");
-               next.successNotOverload(); // Reset the backoff length
                if (prev == null) node.searchSucceeded (this);
                else prev.sendMessage (ir); // Forward the message
                finish();

Modified: 
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java   
2006-11-17 11:52:30 UTC (rev 10971)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java   
2006-11-17 13:51:32 UTC (rev 10972)
@@ -42,7 +42,6 @@
        private void handleSskDataFound (SskDataFound df)
        {
                if (searchState != ACCEPTED) node.log (df + " out of order");
-               next.successNotOverload(); // Reset the backoff length
                dataFound = df;
                if (pubKey == null) return; // Keep waiting
                if (prev == null) node.searchSucceeded (this);
@@ -58,7 +57,6 @@
        private void handleSskPubKey (SskPubKey pk)
        {
                if (searchState != ACCEPTED) node.log (pk + " out of order");
-               next.successNotOverload(); // Reset the backoff length
                pubKey = pk;
                if (dataFound == null) return; // Keep waiting
                if (prev == null) node.searchSucceeded (this);


Reply via email to