Author: mrogers
Date: 2006-11-17 09:40:51 +0000 (Fri, 17 Nov 2006)
New Revision: 10969

Modified:
   trunk/apps/load-balancing-sims/phase7/sim/Node.java
   trunk/apps/load-balancing-sims/phase7/sim/Peer.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
Log:
Finished backoff, made backoff and tokens easy to disable

Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-17 08:29:44 UTC (rev 10968)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-17 09:40:51 UTC (rev 10969)
@@ -12,9 +12,13 @@
        public final static double RETX_TIMER = 0.1; // Seconds

        // Flow control
+       public final static boolean USE_TOKENS = false;
+       public final static boolean USE_BACKOFF = false;
        public final static int FLOW_TOKENS = 20; // Shared by all peers
        public final static double TOKEN_DELAY = 1.0; // Allocate initial tokens
        public final static double DELAY_DECAY = 0.99; // Exp moving average
+       public final static double MAX_DELAY = 2.0; // Reject all, seconds
+       public final static double HIGH_DELAY = 1.0; // Reject some, seconds

        public double location; // Routing location
        public NetworkInterface net;
@@ -56,7 +60,8 @@
                if (Math.random() < 0.25) decrementMinHtl = true;
                bandwidth = new TokenBucket (40000, 60000);
                // Allocate flow control tokens after a short delay
-               Event.schedule (this, Math.random(), ALLOCATE_TOKENS, null);
+               if (USE_TOKENS) Event.schedule (this, Math.random() * 0.1,
+                                               ALLOCATE_TOKENS, null);
        }

        // Return true if a connection was added, false if already connected
@@ -114,6 +119,44 @@
                else return htl - 1;
        }

+       // Return true if the node appears to be overloaded
+       private boolean shouldRejectRequest()
+       {
+               if (delay > MAX_DELAY) return true;
+               if (delay > HIGH_DELAY) {
+                       double p = (delay-HIGH_DELAY) / (MAX_DELAY-HIGH_DELAY);
+                       if (Math.random() < p) return true;
+               }
+               return false;
+       }
+       
+       // Reject a request or insert if the node appears to be overloaded
+       private boolean rejectIfOverloaded (Peer prev, int id)
+       {
+               if (shouldRejectRequest()) {
+                       if (prev == null) {
+                               // FIXME: throttle
+                       }
+                       else prev.sendMessage (new RejectedOverload (id, true));
+                       return true;
+               }
+               return false;
+       }
+       
+       // Reject a request or insert if its search ID has already been seen
+       private boolean rejectIfRecentlySeen (Peer prev, int id)
+       {
+               if (recentlySeenRequests.add (id)) return false;
+               
+               log ("rejecting recently seen search " + id);
+               prev.sendMessage (new RejectedLoop (id));
+               if (USE_TOKENS) allocateToken (prev);
+               // Don't forward the same search back to prev
+               MessageHandler mh = messageHandlers.get (id);
+               if (mh != null) mh.removeNextHop (prev);
+               return true;
+       }
+       
        // Add a CHK to the cache
        public void cacheChk (int key)
        {
@@ -246,16 +289,9 @@

        private void handleChkRequest (ChkRequest r, Peer prev)
        {
-               // FIXME: reject if overloaded
-               if (!recentlySeenRequests.add (r.id)) {
-                       log ("rejecting recently seen " + r);
-                       prev.sendMessage (new RejectedLoop (r.id));
-                       // Don't forward the same search back to prev
-                       MessageHandler mh = messageHandlers.get (r.id);
-                       if (mh != null) mh.removeNextHop (prev);
-                       return;
-               }
-               if (!getToken (prev)) return;
+               if (USE_BACKOFF && rejectIfOverloaded (prev, r.id)) return;
+               if (USE_TOKENS && !getToken (prev)) return;
+               if (rejectIfRecentlySeen (prev, r.id)) return;
                // Accept the search
                if (prev != null) {
                        log ("accepting " + r);
@@ -270,7 +306,7 @@
                                for (int i = 0; i < 32; i++)
                                        prev.sendMessage (new Block (r.id, i));
                        }
-                       allocateToken (prev);
+                       if (USE_TOKENS) allocateToken (prev);
                        return;
                }
                log ("key " + r.key + " not found in CHK store");
@@ -283,7 +319,7 @@
                                for (int i = 0; i < 32; i++)
                                        prev.sendMessage (new Block (r.id, i));
                        }
-                       allocateToken (prev);
+                       if (USE_TOKENS) allocateToken (prev);
                        return;
                }
                log ("key " + r.key + " not found in CHK cache");
@@ -295,16 +331,9 @@

        private void handleChkInsert (ChkInsert i, Peer prev)
        {
-               // FIXME: reject if overloaded
-               if (!recentlySeenRequests.add (i.id)) {
-                       log ("rejecting recently seen " + i);
-                       prev.sendMessage (new RejectedLoop (i.id));
-                       // Don't forward the same search back to prev
-                       MessageHandler mh = messageHandlers.get (i.id);
-                       if (mh != null) mh.removeNextHop (prev);
-                       return;
-               }
-               if (!getToken (prev)) return;
+               if (USE_BACKOFF && rejectIfOverloaded (prev, i.id)) return;
+               if (USE_TOKENS && !getToken (prev)) return;
+               if (rejectIfRecentlySeen (prev, i.id)) return;
                // Accept the search
                if (prev != null) {
                        log ("accepting " + i);
@@ -318,16 +347,9 @@

        private void handleSskRequest (SskRequest r, Peer prev)
        {
-               // FIXME: reject if overloaded
-               if (!recentlySeenRequests.add (r.id)) {
-                       log ("rejecting recently seen " + r);
-                       prev.sendMessage (new RejectedLoop (r.id));
-                       // Don't forward the same search back to prev
-                       MessageHandler mh = messageHandlers.get (r.id);
-                       if (mh != null) mh.removeNextHop (prev);
-                       return;
-               }
-               if (!getToken (prev)) return;
+               if (USE_BACKOFF && rejectIfOverloaded (prev, r.id)) return;
+               if (USE_TOKENS && !getToken (prev)) return;
+               if (rejectIfRecentlySeen (prev, r.id)) return;
                // Look up the public key
                boolean pub = pubKeyStore.get (r.key) || pubKeyCache.get(r.key);
                if (pub) log ("public key " + r.key + " found in cache");
@@ -348,7 +370,7 @@
                                        prev.sendMessage
                                                (new SskPubKey (r.id, r.key));
                        }
-                       allocateToken (prev);
+                       if (USE_TOKENS) allocateToken (prev);
                        return;
                }
                log ("key " + r.key + " not found in SSK store");
@@ -363,7 +385,7 @@
                                        prev.sendMessage
                                                (new SskPubKey (r.id, r.key));
                        }
-                       allocateToken (prev);
+                       if (USE_TOKENS) allocateToken (prev);
                        return;
                }
                log ("key " + r.key + " not found in SSK cache");
@@ -375,16 +397,9 @@

        private void handleSskInsert (SskInsert i, Peer prev)
        {
-               // FIXME: reject if overloaded
-               if (!recentlySeenRequests.add (i.id)) {
-                       log ("rejecting recently seen " + i);
-                       prev.sendMessage (new RejectedLoop (i.id));
-                       // Don't forward the same search back to prev
-                       MessageHandler mh = messageHandlers.get (i.id);
-                       if (mh != null) mh.removeNextHop (prev);
-                       return;
-               }
-               if (!getToken (prev)) return;
+               if (USE_BACKOFF && rejectIfOverloaded (prev, i.id)) return;
+               if (USE_TOKENS && !getToken (prev)) return;
+               if (rejectIfRecentlySeen (prev, i.id)) return;
                // Look up the public key
                boolean pub = pubKeyStore.get (i.key) || pubKeyCache.get(i.key);
                if (pub) log ("public key " + i.key + " found in cache");
@@ -406,7 +421,7 @@
                if (mh == null) log ("no message handler to remove for " + id);
                else {
                        log ("removing message handler for " + id);
-                       allocateToken (mh.prev);
+                       if (USE_TOKENS) allocateToken (mh.prev);
                }
        }


Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-17 08:29:44 UTC (rev 10968)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-17 09:40:51 UTC (rev 10969)
@@ -24,7 +24,7 @@
        // Backoff
        public final static double INITIAL_BACKOFF = 1.0; // Seconds
        public final static double BACKOFF_MULTIPLIER = 2.0;
-       public final static double MAX_BACKOFF = 10800000.0; // Three hours!?
+       public final static double MAX_BACKOFF = 10800.0; // Three hours!?

        // Out-of-order delivery with duplicate detection
        public final static int SEQ_RANGE = 65536;
@@ -253,6 +253,7 @@
        // When a local RejectedOverload is received, back off unless backed off
        public void localRejectedOverload()
        {
+               if (!Node.USE_BACKOFF) return;
                double now = Event.time();
                if (now < backoffUntil) return; // Already backed off
                backoffLength *= BACKOFF_MULTIPLIER;
@@ -264,6 +265,7 @@
        // When a search is accepted, reset the backoff length unless backed off
        public void successNotOverload()
        {
+               if (!Node.USE_BACKOFF) return;
                if (Event.time() < backoffUntil) return;
                backoffLength = INITIAL_BACKOFF;
                log ("resetting backoff length");

Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java 2006-11-17 08:29:44 UTC (rev 10968)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java 2006-11-17 09:40:51 UTC (rev 10969)
@@ -78,7 +78,7 @@
                        htl = node.decrementHtl (htl);
                node.log (this + " has htl " + htl);
                // Consume a token
-               next.tokensOut--;
+               if (Node.USE_TOKENS) next.tokensOut--;
                // Forward the search
                node.log ("forwarding " + this + " to " + next.address);
                next.sendMessage (makeSearchMessage());
@@ -96,11 +96,11 @@
                double closestDist = Double.POSITIVE_INFINITY;
                Peer closestPeer = null;
                for (Peer peer : nexts) {
-                       if (peer.tokensOut == 0) {
+                       if (Node.USE_TOKENS && peer.tokensOut == 0) {
                                node.log ("bypassing busy peer " + peer);
                                continue;
                        }
-                       if (now < peer.backoffUntil) {
+                       if (Node.USE_BACKOFF && now < peer.backoffUntil) {
                                node.log ("bypassing backed off peer " + peer);
                                continue;
                        }
@@ -117,7 +117,6 @@
        {
                if (searchState != SENT) node.log (rl + " out of order");
                next.successNotOverload(); // Reset the backoff length
-               next.tokensOut++; // No token was consumed
                forwardSearch();
        }



Reply via email to