Author: mrogers
Date: 2006-11-17 09:40:51 +0000 (Fri, 17 Nov 2006)
New Revision: 10969
Modified:
trunk/apps/load-balancing-sims/phase7/sim/Node.java
trunk/apps/load-balancing-sims/phase7/sim/Peer.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
Log:
Finished backoff, made backoff and tokens easy to disable
Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-17 08:29:44 UTC (rev 10968)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-17 09:40:51 UTC (rev 10969)
@@ -12,9 +12,13 @@
public final static double RETX_TIMER = 0.1; // Seconds
// Flow control
+ public final static boolean USE_TOKENS = false;
+ public final static boolean USE_BACKOFF = false;
public final static int FLOW_TOKENS = 20; // Shared by all peers
public final static double TOKEN_DELAY = 1.0; // Allocate initial tokens
public final static double DELAY_DECAY = 0.99; // Exp moving average
+ public final static double MAX_DELAY = 2.0; // Reject all, seconds
+ public final static double HIGH_DELAY = 1.0; // Reject some, seconds
public double location; // Routing location
public NetworkInterface net;
@@ -56,7 +60,8 @@
if (Math.random() < 0.25) decrementMinHtl = true;
bandwidth = new TokenBucket (40000, 60000);
// Allocate flow control tokens after a short delay
- Event.schedule (this, Math.random(), ALLOCATE_TOKENS, null);
+ if (USE_TOKENS) Event.schedule (this, Math.random() * 0.1,
+ ALLOCATE_TOKENS, null);
}
// Return true if a connection was added, false if already connected
@@ -114,6 +119,44 @@
else return htl - 1;
}
+ // Return true if the node appears to be overloaded, judging by 'delay'
+ private boolean shouldRejectRequest()
+ {
+ if (delay > MAX_DELAY) return true; // Above MAX_DELAY: always reject
+ if (delay > HIGH_DELAY) { // Between HIGH_DELAY and MAX_DELAY: reject some
+ double p = (delay-HIGH_DELAY) / (MAX_DELAY-HIGH_DELAY); // Linear in delay, in (0,1]
+ if (Math.random() < p) return true; // Reject with probability p
+ }
+ return false; // delay <= HIGH_DELAY, or spared by the random draw
+ }
+
+ // Reject a request or insert if the node appears to be overloaded; returns true if rejected
+ private boolean rejectIfOverloaded (Peer prev, int id)
+ {
+ if (shouldRejectRequest()) {
+ if (prev == null) { // No peer to notify - presumably a locally originated search (confirm)
+ // FIXME: throttle
+ }
+ else prev.sendMessage (new RejectedOverload (id, true)); // NOTE(review): 'true' presumably marks a local rejection - confirm
+ return true; // Rejected: the handlers shown in this diff return immediately
+ }
+ return false; // Not overloaded: the caller carries on handling the search
+ }
+
+ // Reject a request or insert if its search ID has already been seen; returns true if rejected
+ private boolean rejectIfRecentlySeen (Peer prev, int id)
+ {
+ if (recentlySeenRequests.add (id)) return false; // add() returning true is taken to mean a new ID
+
+ log ("rejecting recently seen search " + id);
+ prev.sendMessage (new RejectedLoop (id)); // NOTE(review): prev is not null-checked here, unlike rejectIfOverloaded - confirm
+ if (USE_TOKENS) allocateToken (prev); // Presumably hands back the token the caller took via getToken - confirm
+ // Don't forward the same search back to prev
+ MessageHandler mh = messageHandlers.get (id);
+ if (mh != null) mh.removeNextHop (prev); // Skipped when no handler is registered for this ID
+ return true;
+ }
+
// Add a CHK to the cache
public void cacheChk (int key)
{
@@ -246,16 +289,9 @@
private void handleChkRequest (ChkRequest r, Peer prev)
{
- // FIXME: reject if overloaded
- if (!recentlySeenRequests.add (r.id)) {
- log ("rejecting recently seen " + r);
- prev.sendMessage (new RejectedLoop (r.id));
- // Don't forward the same search back to prev
- MessageHandler mh = messageHandlers.get (r.id);
- if (mh != null) mh.removeNextHop (prev);
- return;
- }
- if (!getToken (prev)) return;
+ if (USE_BACKOFF && rejectIfOverloaded (prev, r.id)) return;
+ if (USE_TOKENS && !getToken (prev)) return;
+ if (rejectIfRecentlySeen (prev, r.id)) return;
// Accept the search
if (prev != null) {
log ("accepting " + r);
@@ -270,7 +306,7 @@
for (int i = 0; i < 32; i++)
prev.sendMessage (new Block (r.id, i));
}
- allocateToken (prev);
+ if (USE_TOKENS) allocateToken (prev);
return;
}
log ("key " + r.key + " not found in CHK store");
@@ -283,7 +319,7 @@
for (int i = 0; i < 32; i++)
prev.sendMessage (new Block (r.id, i));
}
- allocateToken (prev);
+ if (USE_TOKENS) allocateToken (prev);
return;
}
log ("key " + r.key + " not found in CHK cache");
@@ -295,16 +331,9 @@
private void handleChkInsert (ChkInsert i, Peer prev)
{
- // FIXME: reject if overloaded
- if (!recentlySeenRequests.add (i.id)) {
- log ("rejecting recently seen " + i);
- prev.sendMessage (new RejectedLoop (i.id));
- // Don't forward the same search back to prev
- MessageHandler mh = messageHandlers.get (i.id);
- if (mh != null) mh.removeNextHop (prev);
- return;
- }
- if (!getToken (prev)) return;
+ if (USE_BACKOFF && rejectIfOverloaded (prev, i.id)) return;
+ if (USE_TOKENS && !getToken (prev)) return;
+ if (rejectIfRecentlySeen (prev, i.id)) return;
// Accept the search
if (prev != null) {
log ("accepting " + i);
@@ -318,16 +347,9 @@
private void handleSskRequest (SskRequest r, Peer prev)
{
- // FIXME: reject if overloaded
- if (!recentlySeenRequests.add (r.id)) {
- log ("rejecting recently seen " + r);
- prev.sendMessage (new RejectedLoop (r.id));
- // Don't forward the same search back to prev
- MessageHandler mh = messageHandlers.get (r.id);
- if (mh != null) mh.removeNextHop (prev);
- return;
- }
- if (!getToken (prev)) return;
+ if (USE_BACKOFF && rejectIfOverloaded (prev, r.id)) return;
+ if (USE_TOKENS && !getToken (prev)) return;
+ if (rejectIfRecentlySeen (prev, r.id)) return;
// Look up the public key
boolean pub = pubKeyStore.get (r.key) || pubKeyCache.get(r.key);
if (pub) log ("public key " + r.key + " found in cache");
@@ -348,7 +370,7 @@
prev.sendMessage
(new SskPubKey (r.id, r.key));
}
- allocateToken (prev);
+ if (USE_TOKENS) allocateToken (prev);
return;
}
log ("key " + r.key + " not found in SSK store");
@@ -363,7 +385,7 @@
prev.sendMessage
(new SskPubKey (r.id, r.key));
}
- allocateToken (prev);
+ if (USE_TOKENS) allocateToken (prev);
return;
}
log ("key " + r.key + " not found in SSK cache");
@@ -375,16 +397,9 @@
private void handleSskInsert (SskInsert i, Peer prev)
{
- // FIXME: reject if overloaded
- if (!recentlySeenRequests.add (i.id)) {
- log ("rejecting recently seen " + i);
- prev.sendMessage (new RejectedLoop (i.id));
- // Don't forward the same search back to prev
- MessageHandler mh = messageHandlers.get (i.id);
- if (mh != null) mh.removeNextHop (prev);
- return;
- }
- if (!getToken (prev)) return;
+ if (USE_BACKOFF && rejectIfOverloaded (prev, i.id)) return;
+ if (USE_TOKENS && !getToken (prev)) return;
+ if (rejectIfRecentlySeen (prev, i.id)) return;
// Look up the public key
boolean pub = pubKeyStore.get (i.key) || pubKeyCache.get(i.key);
if (pub) log ("public key " + i.key + " found in cache");
@@ -406,7 +421,7 @@
if (mh == null) log ("no message handler to remove for " + id);
else {
log ("removing message handler for " + id);
- allocateToken (mh.prev);
+ if (USE_TOKENS) allocateToken (mh.prev);
}
}
Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-17 08:29:44 UTC (rev 10968)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-17 09:40:51 UTC (rev 10969)
@@ -24,7 +24,7 @@
// Backoff
public final static double INITIAL_BACKOFF = 1.0; // Seconds
public final static double BACKOFF_MULTIPLIER = 2.0;
- public final static double MAX_BACKOFF = 10800000.0; // Three hours!?
+ public final static double MAX_BACKOFF = 10800.0; // Three hours!?
// Out-of-order delivery with duplicate detection
public final static int SEQ_RANGE = 65536;
@@ -253,6 +253,7 @@
// When a local RejectedOverload is received, back off unless backed off
public void localRejectedOverload()
{
+ if (!Node.USE_BACKOFF) return;
double now = Event.time();
if (now < backoffUntil) return; // Already backed off
backoffLength *= BACKOFF_MULTIPLIER;
@@ -264,6 +265,7 @@
// When a search is accepted, reset the backoff length unless backed off
public void successNotOverload()
{
+ if (!Node.USE_BACKOFF) return;
if (Event.time() < backoffUntil) return;
backoffLength = INITIAL_BACKOFF;
log ("resetting backoff length");
Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java 2006-11-17 08:29:44 UTC (rev 10968)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java 2006-11-17 09:40:51 UTC (rev 10969)
@@ -78,7 +78,7 @@
htl = node.decrementHtl (htl);
node.log (this + " has htl " + htl);
// Consume a token
- next.tokensOut--;
+ if (Node.USE_TOKENS) next.tokensOut--;
// Forward the search
node.log ("forwarding " + this + " to " + next.address);
next.sendMessage (makeSearchMessage());
@@ -96,11 +96,11 @@
double closestDist = Double.POSITIVE_INFINITY;
Peer closestPeer = null;
for (Peer peer : nexts) {
- if (peer.tokensOut == 0) {
+ if (Node.USE_TOKENS && peer.tokensOut == 0) {
node.log ("bypassing busy peer " + peer);
continue;
}
- if (now < peer.backoffUntil) {
+ if (Node.USE_BACKOFF && now < peer.backoffUntil) {
node.log ("bypassing backed off peer " + peer);
continue;
}
@@ -117,7 +117,6 @@
{
if (searchState != SENT) node.log (rl + " out of order");
next.successNotOverload(); // Reset the backoff length
- next.tokensOut++; // No token was consumed
forwardSearch();
}