Author: mrogers
Date: 2006-11-17 13:51:32 +0000 (Fri, 17 Nov 2006)
New Revision: 10972
Added:
trunk/apps/load-balancing-sims/phase7/sim/SearchThrottle.java
Modified:
trunk/apps/load-balancing-sims/phase7/sim/Node.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java
Log:
Rudimentary search throttle (not tested)
Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-17 11:52:30 UTC
(rev 10971)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-17 13:51:32 UTC
(rev 10972)
@@ -5,6 +5,7 @@
import java.util.HashSet;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedList;
public class Node implements EventTarget
{
@@ -14,6 +15,7 @@
// Flow control
public final static boolean USE_TOKENS = false;
public final static boolean USE_BACKOFF = false;
+ public final static boolean USE_THROTTLE = false;
public final static int FLOW_TOKENS = 20; // Shared by all peers
public final static double TOKEN_DELAY = 1.0; // Allocate initial tokens
public final static double DELAY_DECAY = 0.99; // Exp moving average
@@ -37,6 +39,8 @@
private boolean timerRunning = false;
private int spareTokens = FLOW_TOKENS; // Tokens not allocated to a peer
private double delay = 0.0; // Delay caused by congestion or b/w limiter
+ private LinkedList<Search> searchQueue;
+ private SearchThrottle searchThrottle;
public Node (double txSpeed, double rxSpeed)
{
@@ -62,6 +66,8 @@
// Allocate flow control tokens after a short delay
if (USE_TOKENS) Event.schedule (this, Math.random() * 0.1,
ALLOCATE_TOKENS, null);
+ searchQueue = new LinkedList<Search>();
+ if (USE_THROTTLE) searchThrottle = new SearchThrottle();
}
// Return true if a connection was added, false if already connected
@@ -133,12 +139,9 @@
// Reject a request or insert if the node appears to be overloaded
private boolean rejectIfOverloaded (Peer prev, int id)
{
+ if (prev == null) return false;
if (shouldRejectSearch()) {
- if (prev == null) {
- // FIXME
- log ("rejecting local search " + id);
- }
- else prev.sendMessage (new RejectedOverload (id, true));
+ prev.sendMessage (new RejectedOverload (id, true));
return true;
}
return false;
@@ -419,13 +422,13 @@
public void searchSucceeded (MessageHandler m)
{
- // FIXME: increase the rate of the relevant throttle
log (m + " succeeded");
+ if (USE_THROTTLE) searchThrottle.increaseRate();
}
public void reduceSearchRate (MessageHandler m)
{
- // FIXME: reduce the rate of the relevant throttle
+ if (USE_THROTTLE) searchThrottle.decreaseRate();
}
public void removeMessageHandler (int id)
@@ -484,36 +487,71 @@
Event.log (net.address + " " + message);
}
+ // Add a search to the queue
+ private void addToSearchQueue (Search s)
+ {
+ searchQueue.add (s);
+ if (USE_THROTTLE && searchQueue.size() == 1) {
+ double now = Event.time();
+ double then = searchThrottle.nextSearchTime (now);
+ Event.schedule (this, then - now, SEND_SEARCH, null);
+ }
+ else sendSearch();
+ }
+
+ // Remove the first search from the queue and send it
+ private void sendSearch()
+ {
+ Search s = searchQueue.poll();
+ if (s instanceof ChkRequest)
+ handleChkRequest ((ChkRequest) s, null);
+ else if (s instanceof ChkInsert) {
+ handleChkInsert ((ChkInsert) s, null);
+ handleMessage (new DataInsert (s.id), null);
+ for (int i = 0; i < 32; i++)
+ handleMessage (new Block (s.id, i), null);
+ }
+ else if (s instanceof SskRequest)
+ handleSskRequest ((SskRequest) s, null);
+ else if (s instanceof SskInsert) {
+ pubKeyCache.put (s.key);
+ handleSskInsert ((SskInsert) s, null);
+ }
+ if (USE_THROTTLE) {
+ searchThrottle.searchSent();
+ if (searchQueue.isEmpty()) return;
+ double now = Event.time();
+ double then = searchThrottle.nextSearchTime (now);
+ Event.schedule (this, then - now, SEND_SEARCH, null);
+ }
+ }
+
public void generateChkRequest (int key)
{
ChkRequest cr = new ChkRequest (key, location);
log ("generating " + cr);
- handleChkRequest (cr, null);
+ addToSearchQueue (cr);
}
public void generateChkInsert (int key)
{
ChkInsert ci = new ChkInsert (key, location);
log ("generating " + ci);
- handleChkInsert (ci, null);
- handleMessage (new DataInsert (ci.id), null);
- for (int i = 0; i < 32; i++)
- handleMessage (new Block (ci.id, i), null);
+ addToSearchQueue (ci);
}
public void generateSskRequest (int key)
{
SskRequest sr = new SskRequest (key, location, true);
log ("generating " + sr);
- handleSskRequest (sr, null);
+ addToSearchQueue (sr);
}
public void generateSskInsert (int key, int value)
{
SskInsert si = new SskInsert (key, value, location);
log ("generating " + si);
- pubKeyCache.put (key);
- handleSskInsert (si, null);
+ addToSearchQueue (si);
}
private void checkTimeouts()
@@ -570,6 +608,10 @@
case ALLOCATE_TOKENS:
allocateTokens();
break;
+
+ case SEND_SEARCH:
+ sendSearch();
+ break;
}
}
@@ -580,4 +622,5 @@
public final static int SSK_COLLISION = 5;
private final static int CHECK_TIMEOUTS = 6;
private final static int ALLOCATE_TOKENS = 7;
+ private final static int SEND_SEARCH = 8;
}
Added: trunk/apps/load-balancing-sims/phase7/sim/SearchThrottle.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/SearchThrottle.java
2006-11-17 11:52:30 UTC (rev 10971)
+++ trunk/apps/load-balancing-sims/phase7/sim/SearchThrottle.java
2006-11-17 13:51:32 UTC (rev 10972)
@@ -0,0 +1,45 @@
+// An AIMD token bucket
+
+package sim;
+
+public class SearchThrottle
+{
+ public final static double INITIAL_RATE = 5.0; // Searches per second
+ public final static double MAX_RATE = 50.0;
+ public final static double MIN_RATE = 1.0 / 300.0;
+ public final static double ALPHA = 0.082; // AIMD increase parameter
+ public final static double BETA = 0.969; // AIMD decrease parameter
+
+ private double rate = INITIAL_RATE;
+ private double tokens = INITIAL_RATE, size = INITIAL_RATE;
+ private double lastUpdated = 0.0; // Time
+
+ public void increaseRate()
+ {
+ rate += ALPHA;
+ if (rate > MAX_RATE) rate = MAX_RATE;
+ Event.log ("rate increased to " + rate);
+ }
+
+ public void decreaseRate()
+ {
+ rate *= BETA;
+ if (rate < MIN_RATE) rate = MIN_RATE;
+ Event.log ("rate decreased to " + rate);
+ }
+
+ // Return the time when the next search can be sent
+ public double nextSearchTime (double now)
+ {
+ tokens += (now - lastUpdated) * rate;
+ if (tokens > size) tokens = size;
+ lastUpdated = now;
+ if (tokens >= 1.0) return now;
+ else return now + (1.0 - tokens) * rate;
+ }
+
+ public void searchSent()
+ {
+ tokens--;
+ }
+}
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
2006-11-17 11:52:30 UTC (rev 10971)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
2006-11-17 13:51:32 UTC (rev 10972)
@@ -113,7 +113,6 @@
private void handleInsertReply (InsertReply ir)
{
if (searchState != ACCEPTED) node.log (ir + " out of order");
- next.successNotOverload(); // Reset the backoff length
if (prev == null) node.searchSucceeded (this);
else prev.sendMessage (ir); // Forward the message
finish();
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
2006-11-17 11:52:30 UTC (rev 10971)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
2006-11-17 13:51:32 UTC (rev 10972)
@@ -40,7 +40,6 @@
{
if (searchState != ACCEPTED) node.log (df + " out of order");
searchState = TRANSFERRING;
- next.successNotOverload(); // Reset the backoff length
if (prev != null) prev.sendMessage (df); // Forward the message
// If we have all the blocks and the headers, cache the data
if (blocksReceived == 32) {
Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
2006-11-17 11:52:30 UTC (rev 10971)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
2006-11-17 13:51:32 UTC (rev 10972)
@@ -134,7 +134,6 @@
protected void handleRouteNotFound (RouteNotFound rnf)
{
if (searchState != ACCEPTED) node.log (rnf + " out of order");
- next.successNotOverload(); // Reset the backoff length
// Use the remaining htl to try another peer
if (rnf.htl < htl) htl = rnf.htl;
forwardSearch();
Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
2006-11-17 11:52:30 UTC (rev 10971)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
2006-11-17 13:51:32 UTC (rev 10972)
@@ -29,7 +29,6 @@
protected void handleDataNotFound (DataNotFound dnf)
{
if (searchState != ACCEPTED) node.log (dnf + " out of order");
- next.successNotOverload(); // Reset the backoff length
if (prev == null) node.log (this + " failed");
else prev.sendMessage (dnf); // Forward the message
finish();
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
2006-11-17 11:52:30 UTC (rev 10971)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
2006-11-17 13:51:32 UTC (rev 10972)
@@ -82,6 +82,7 @@
{
if (searchState != SENT) node.log (sa + " out of order");
searchState = ACCEPTED;
+ next.successNotOverload(); // Reset the backoff length
// Wait 60 seconds for a reply to the search
Event.schedule (this, 60.0, SEARCH_TIMEOUT, next);
// Send the public key if requested
@@ -91,7 +92,6 @@
private void handleCollision (SskDataFound sdf)
{
if (searchState != ACCEPTED) node.log (sdf + " out of order");
- // FIXME: should we reset the backoff length?
if (prev == null) node.log (this + " collided");
else prev.sendMessage (sdf); // Forward the message
data = sdf.data; // Is this safe?
@@ -100,7 +100,6 @@
private void handleInsertReply (InsertReply ir)
{
if (searchState != ACCEPTED) node.log (ir + " out of order");
- next.successNotOverload(); // Reset the backoff length
if (prev == null) node.searchSucceeded (this);
else prev.sendMessage (ir); // Forward the message
finish();
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java
2006-11-17 11:52:30 UTC (rev 10971)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java
2006-11-17 13:51:32 UTC (rev 10972)
@@ -42,7 +42,6 @@
private void handleSskDataFound (SskDataFound df)
{
if (searchState != ACCEPTED) node.log (df + " out of order");
- next.successNotOverload(); // Reset the backoff length
dataFound = df;
if (pubKey == null) return; // Keep waiting
if (prev == null) node.searchSucceeded (this);
@@ -58,7 +57,6 @@
private void handleSskPubKey (SskPubKey pk)
{
if (searchState != ACCEPTED) node.log (pk + " out of order");
- next.successNotOverload(); // Reset the backoff length
pubKey = pk;
if (dataFound == null) return; // Keep waiting
if (prev == null) node.searchSucceeded (this);