Author: mrogers
Date: 2006-11-02 18:50:28 +0000 (Thu, 02 Nov 2006)
New Revision: 10797
Added:
trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java
Modified:
trunk/apps/load-balancing-sims/phase7/sim/Node.java
trunk/apps/load-balancing-sims/phase7/sim/Peer.java
trunk/apps/load-balancing-sims/phase7/sim/Sim.java
trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
Log:
Token passing (not tested yet)
Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-02 15:00:48 UTC
(rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-02 18:50:28 UTC
(rev 10797)
@@ -11,6 +11,10 @@
// Coarse-grained retransmission timer
public final static double RETX_TIMER = 0.1; // Seconds
+ // Flow control
+ public final static int FLOW_TOKENS = 20; // Shared by all peers
+ public final static double TOKEN_DELAY = 1.0; // Allocate initial tokens
+
public double location; // Routing location
public NetworkInterface net;
private HashMap<Integer,Peer> peers; // Look up a peer by its address
@@ -25,6 +29,7 @@
private boolean decrementMinHtl = false;
public TokenBucket bandwidth; // Bandwidth limiter
private boolean timerRunning = false;
+ private int spareTokens = FLOW_TOKENS; // Tokens not allocated to a peer
public Node (double txSpeed, double rxSpeed)
{
@@ -45,7 +50,9 @@
pubKeyCache = new LruCache<Integer> (16000);
if (Math.random() < 0.5) decrementMaxHtl = true;
if (Math.random() < 0.25) decrementMinHtl = true;
- bandwidth = new TokenBucket (15000, 60000);
+ bandwidth = new TokenBucket (40000, 60000);
+ // Allocate flow control tokens after a short delay
+ Event.schedule (this, Math.random(), ALLOCATE_TOKENS, null);
}
// Return true if a connection was added, false if already connected
@@ -173,7 +180,9 @@
public void handleMessage (Message m, Peer src)
{
if (src != null) log ("received " + m + " from " + src);
- if (m instanceof ChkRequest)
+ if (m instanceof Token)
+ handleToken ((Token) m, src);
+ else if (m instanceof ChkRequest)
handleChkRequest ((ChkRequest) m, src);
else if (m instanceof ChkInsert)
handleChkInsert ((ChkInsert) m, src);
@@ -188,6 +197,11 @@
}
}
+ private void handleToken (Token t, Peer prev)
+ {
+ prev.tokensOut += t.id; // t.id is the number of tokens
+ }
+
private void handleChkRequest (ChkRequest r, Peer prev)
{
if (!recentlySeenRequests.add (r.id)) {
@@ -198,6 +212,7 @@
if (mh != null) mh.removeNextHop (prev);
return;
}
+ if (!getToken (prev)) return;
// Accept the search
if (prev != null) {
log ("accepting " + r);
@@ -212,6 +227,7 @@
for (int i = 0; i < 32; i++)
prev.sendMessage (new Block (r.id, i));
}
+ allocateToken (prev);
return;
}
log ("key " + r.key + " not found in CHK store");
@@ -224,6 +240,7 @@
for (int i = 0; i < 32; i++)
prev.sendMessage (new Block (r.id, i));
}
+ allocateToken (prev);
return;
}
log ("key " + r.key + " not found in CHK cache");
@@ -243,6 +260,7 @@
if (mh != null) mh.removeNextHop (prev);
return;
}
+ if (!getToken (prev)) return;
// Accept the search
if (prev != null) {
log ("accepting " + i);
@@ -264,6 +282,7 @@
if (mh != null) mh.removeNextHop (prev);
return;
}
+ if (!getToken (prev)) return;
// Look up the public key
boolean pub = pubKeyCache.get (r.key);
if (pub) log ("public key " + r.key + " found in cache");
@@ -284,6 +303,7 @@
prev.sendMessage
(new SskPubKey (r.id, r.key));
}
+ allocateToken (prev);
return;
}
log ("key " + r.key + " not found in SSK store");
@@ -298,6 +318,7 @@
prev.sendMessage
(new SskPubKey (r.id, r.key));
}
+ allocateToken (prev);
return;
}
log ("key " + r.key + " not found in SSK cache");
@@ -317,6 +338,7 @@
if (mh != null) mh.removeNextHop (prev);
return;
}
+ if (!getToken (prev)) return;
// Look up the public key
boolean pub = pubKeyCache.get (i.key);
if (pub) log ("public key " + i.key + " found in cache");
@@ -336,9 +358,45 @@
{
MessageHandler mh = messageHandlers.remove (id);
if (mh == null) log ("no message handler to remove for " + id);
- else log ("removing message handler for " + id);
+ else {
+ log ("removing message handler for " + id);
+ allocateToken (mh.prev);
+ }
}
+ // Check whether the peer sendng a request or insert has enough tokens
+ private boolean getToken (Peer p)
+ {
+ if (p == null) {
+ if (spareTokens == 0) {
+ // The client will have to wait
+ log ("not enough tokens");
+ return false;
+ }
+ spareTokens--;
+ return true;
+ }
+ else {
+ if (p.tokensIn == 0) {
+ // This indicates a misbehaving sender
+ log ("WARNING: not enough tokens");
+ return false;
+ }
+ p.tokensIn--;
+ return true;
+ }
+ }
+
+ // Give another token to the peer whose request/insert just completed
+ private void allocateToken (Peer p)
+ {
+ if (p == null) spareTokens++;
+ else {
+ p.tokensIn++;
+ p.sendMessage (new Token (1));
+ }
+ }
+
// Return the list of peers in a random order
public ArrayList<Peer> peers()
{
@@ -352,16 +410,14 @@
Event.log (net.address + " " + message);
}
- // Event callbacks
-
- private void generateChkRequest (int key)
+ public void generateChkRequest (int key)
{
ChkRequest cr = new ChkRequest (key, location);
log ("generating " + cr);
handleChkRequest (cr, null);
}
- private void generateChkInsert (int key)
+ public void generateChkInsert (int key)
{
ChkInsert ci = new ChkInsert (key, location);
log ("generating " + ci);
@@ -371,14 +427,14 @@
handleMessage (new Block (ci.id, i), null);
}
- private void generateSskRequest (int key)
+ public void generateSskRequest (int key)
{
SskRequest sr = new SskRequest (key, location, true);
log ("generating " + sr);
handleSskRequest (sr, null);
}
- private void generateSskInsert (int key, int value)
+ public void generateSskInsert (int key, int value)
{
SskInsert si = new SskInsert (key, value, location);
log ("generating " + si);
@@ -397,6 +453,18 @@
else Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
}
+ // Allocate all flow control tokens at startup
+ private void allocateTokens()
+ {
+ // Rounding error in your favour - collect 50 tokens
+ int tokensPerPeer = FLOW_TOKENS / (peers.size() + 1);
+ for (Peer p : peers.values()) {
+ p.tokensIn += tokensPerPeer;
+ spareTokens -= tokensPerPeer;
+ p.sendMessage (new Token (tokensPerPeer));
+ }
+ }
+
// EventTarget interface
public void handleEvent (int type, Object data)
{
@@ -424,6 +492,10 @@
case CHECK_TIMEOUTS:
checkTimeouts();
break;
+
+ case ALLOCATE_TOKENS:
+ allocateTokens();
+ break;
}
}
@@ -433,4 +505,5 @@
public final static int INSERT_SSK = 4;
public final static int SSK_COLLISION = 5;
private final static int CHECK_TIMEOUTS = 6;
+ private final static int ALLOCATE_TOKENS = 7;
}
Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-02 15:00:48 UTC
(rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-02 18:50:28 UTC
(rev 10797)
@@ -42,6 +42,10 @@
private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
private int rxSeq = 0; // Sequence number of next in-order incoming pkt
+ // Flow control
+ public int tokensOut = 0; // How many requests/inserts can we send?
+ public int tokensIn = 0; // How many requests/inserts should we accept?
+
public Peer (Node node, int address, double location, double latency)
{
this.node = node;
@@ -201,7 +205,7 @@
sendAck (p.seq);
}
// This indicates a misbehaving sender - discard the packet
- else log ("warning: sequence number out of range");
+ else log ("WARNING: sequence number out of range");
}
private void handleAck (Ack a)
Modified: trunk/apps/load-balancing-sims/phase7/sim/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Sim.java 2006-11-02 15:00:48 UTC
(rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/Sim.java 2006-11-02 18:50:28 UTC
(rev 10797)
@@ -5,7 +5,7 @@
{
private final int NODES = 100; // Number of nodes
private final int DEGREE = 5; // Average degree
- private final double SPEED = 40000; // Network speed, bytes per second
+ private final double SPEED = 15000; // Network speed, bytes per second
private final double LATENCY = 0.1; // Latency of all links in seconds
private final int INSERTS = 100; // Number of inserts per publisher
private Node[] nodes;
Modified:
trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java
2006-11-02 15:00:48 UTC (rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java
2006-11-02 18:50:28 UTC (rev 10797)
@@ -36,7 +36,7 @@
{
// Insert a random key
int key = Node.locationToKey (Math.random());
- Event.schedule (node, 0.0, Node.INSERT_CHK, key);
+ node.generateChkInsert (key);
// Inform each reader after an average of ten minutes
for (Node n : readers) {
double delay = 595.0 + Math.random() * 10.0;
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
2006-11-02 15:00:48 UTC (rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
2006-11-02 18:50:28 UTC (rev 10797)
@@ -62,7 +62,7 @@
// Start the search
forwardSearch();
// If we have all the blocks and the headers, consider finishing
- if (blocksReceived == 32 && inState == TRANSFERRING) {
+ if (blocksReceived == 32) {
inState = COMPLETED;
considerFinishing();
}
@@ -110,6 +110,7 @@
private void handleRejectedLoop (RejectedLoop rl)
{
if (searchState != SENT) node.log (rl + " out of order");
+ next.tokensOut++; // No token was consumed
forwardSearch();
}
@@ -158,6 +159,9 @@
> Node.distance (target, closest))
htl = node.decrementHtl (htl);
node.log (this + " has htl " + htl);
+ // Consume a token
+ next.tokensOut--;
+ // Forward the search
node.log ("forwarding " + this + " to " + next.address);
next.sendMessage (makeSearchMessage());
nexts.remove (next);
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
2006-11-02 15:00:48 UTC (rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
2006-11-02 18:50:28 UTC (rev 10797)
@@ -42,7 +42,7 @@
searchState = TRANSFERRING;
if (prev != null) prev.sendMessage (df); // Forward the message
// If we have all the blocks and the headers, cache the data
- if (blocksReceived == 32 && searchState == TRANSFERRING) {
+ if (blocksReceived == 32) {
node.cacheChk (key);
if (prev == null) node.log (this + " succeeded");
finish();
Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
2006-11-02 15:00:48 UTC (rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
2006-11-02 18:50:28 UTC (rev 10797)
@@ -21,8 +21,8 @@
protected double closest; // The closest location seen so far
protected int htl; // Hops to live for backtracking
- protected Node node; // The owner of this MessageHandler
- protected Peer prev; // The previous hop of the search
+ public final Node node; // The owner of this MessageHandler
+ public final Peer prev; // The previous hop of the search
protected Peer next = null; // The (current) next hop of the search
protected LinkedList<Peer> nexts; // Candidates for the next hop
protected int searchState = STARTED; // The state of the search
@@ -60,13 +60,17 @@
double closestDist = Double.POSITIVE_INFINITY;
Peer closestPeer = null;
for (Peer peer : nexts) {
+ if (peer.tokensOut == 0) {
+ node.log ("bypassing busy peer " + peer);
+ continue;
+ }
double dist = Node.distance (keyLoc, peer.location);
if (dist < closestDist) {
closestDist = dist;
closestPeer = peer;
}
}
- return closestPeer; // Null if the list was empty
+ return closestPeer; // Null if there are no suitable peers
}
public abstract void handleMessage (Message m, Peer src);
Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
2006-11-02 15:00:48 UTC (rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
2006-11-02 18:50:28 UTC (rev 10797)
@@ -28,6 +28,7 @@
protected void handleRejectedLoop (RejectedLoop rl)
{
if (searchState != SENT) node.log (rl + " out of order");
+ next.tokensOut++; // No token was consumed
forwardSearch();
}
@@ -73,6 +74,9 @@
> Node.distance (target, closest))
htl = node.decrementHtl (htl);
node.log (this + " has htl " + htl);
+ // Consume a token
+ next.tokensOut--;
+ // Forward the search
node.log ("forwarding " + this + " to " + next.address);
next.sendMessage (makeSearchMessage());
nexts.remove (next);
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
2006-11-02 15:00:48 UTC (rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
2006-11-02 18:50:28 UTC (rev 10797)
@@ -89,6 +89,7 @@
private void handleRejectedLoop (RejectedLoop rl)
{
if (searchState != SENT) node.log (rl + " out of order");
+ next.tokensOut++; // No token was consumed
forwardSearch();
}
@@ -142,6 +143,9 @@
> Node.distance (target, closest))
htl = node.decrementHtl (htl);
node.log (this + " has htl " + htl);
+ // Consume a token
+ next.tokensOut--;
+ // Forward the search
node.log ("forwarding " + this + " to " + next.address);
next.sendMessage (makeSearchMessage());
nexts.remove (next);
Added: trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java
2006-11-02 15:00:48 UTC (rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java
2006-11-02 18:50:28 UTC (rev 10797)
@@ -0,0 +1,14 @@
+package sim.messages;
+
+public class Token extends Message
+{
+ public Token (int tokens)
+ {
+ id = tokens; // Space-saving hack
+ }
+
+ public String toString()
+ {
+ return new String (id + " tokens");
+ }
+}