Author: mrogers
Date: 2006-11-27 18:21:24 +0000 (Mon, 27 Nov 2006)
New Revision: 11071
Modified:
trunk/apps/load-balancing-sims/phase7/sim/Node.java
trunk/apps/load-balancing-sims/phase7/sim/Peer.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
Log:
Queue searches when there are no available tokens (token policies not
implemented yet)
Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-27 15:27:37 UTC (rev 11070)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-27 18:21:24 UTC (rev 11071)
@@ -18,8 +18,7 @@
public static boolean useTokens = false;
public static boolean useBackoff = false;
public static boolean useThrottle = false;
- public final static int FLOW_TOKENS = 20; // Shared by all peers
- public final static double TOKEN_DELAY = 1.0; // Allocate initial tokens
+ public final static int FLOW_TOKENS = 50; // Shared by all peers
public final static double DELAY_DECAY = 0.99; // Exp moving average
public final static double MAX_DELAY = 2.0; // Reject all, seconds
public final static double HIGH_DELAY = 1.0; // Reject some, seconds
@@ -43,6 +42,7 @@
private double delay = 0.0; // Delay caused by congestion or b/w limiter
private LinkedList<Search> searchQueue;
private SearchThrottle searchThrottle;
+ private HashSet<Peer> availablePeers; // Peers with outgoing tokens
public Node (double txSpeed, double rxSpeed)
{
@@ -65,10 +65,13 @@
if (Math.random() < 0.5) decrementMaxHtl = true;
if (Math.random() < 0.25) decrementMinHtl = true;
bandwidth = new TokenBucket (40000, 60000);
- // Allocate flow control tokens after a short delay
- if (useTokens) Event.schedule (this, Math.random() * 0.1,
- ALLOCATE_TOKENS, null);
searchQueue = new LinkedList<Search>();
+ if (useTokens) {
+ // Allocate flow control tokens after a short delay
+ Event.schedule (this, Math.random() * 0.1,
+ ALLOCATE_TOKENS, null);
+ availablePeers = new HashSet<Peer>();
+ }
if (useThrottle) searchThrottle = new SearchThrottle();
}
@@ -294,7 +297,7 @@
private void handleToken (Token t, Peer prev)
{
- prev.tokensOut += t.id; // t.id is the number of tokens
+ prev.addTokensOut (t.id); // t.id is the number of tokens
}
private void handleChkRequest (ChkRequest r, Peer prev)
@@ -468,12 +471,12 @@
return true;
}
else {
- if (p.tokensIn == 0) {
+ if (p.getTokensIn() == 0) {
// This indicates a misbehaving sender
if (LOG) log ("WARNING: not enough tokens");
return false;
}
- p.tokensIn--;
+ p.removeTokensIn (1);
return true;
}
}
@@ -482,10 +485,7 @@
private void allocateToken (Peer p)
{
if (p == null) spareTokens++;
- else {
- p.tokensIn++;
- p.sendMessage (new Token (1));
- }
+ else p.addTokensIn (1);
}
// Return the list of peers in a random order
@@ -522,6 +522,10 @@
// Remove the first search from the queue and send it
private void sendSearch()
{
+ if (useTokens && availablePeers.isEmpty()) {
+ if (LOG) log ("blocked");
+ return;
+ }
Search s = searchQueue.poll();
if (s instanceof ChkRequest)
handleChkRequest ((ChkRequest) s, null);
@@ -547,6 +551,18 @@
}
}
+ public void addAvailablePeer (Peer p)
+ {
+ boolean blocked = availablePeers.isEmpty();
+ availablePeers.add (p);
+ if (blocked && !searchQueue.isEmpty()) sendSearch();
+ }
+
+ public void removeAvailablePeer (Peer p)
+ {
+ availablePeers.remove (p);
+ }
+
public void generateChkRequest (int key)
{
ChkRequest cr = new ChkRequest (key, location);
@@ -592,9 +608,8 @@
// Rounding error in your favour - collect 50 tokens
int tokensPerPeer = FLOW_TOKENS / (peers.size() + 1);
for (Peer p : peers.values()) {
- p.tokensIn += tokensPerPeer;
spareTokens -= tokensPerPeer;
- p.sendMessage (new Token (tokensPerPeer));
+ p.addTokensIn (tokensPerPeer);
}
}
Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-27 15:27:37 UTC (rev 11070)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-27 18:21:24 UTC (rev 11071)
@@ -49,8 +49,8 @@
private int rxSeq = 0; // Sequence number of next in-order incoming pkt
// Flow control
- public int tokensOut = 0; // How many requests/inserts can we send?
- public int tokensIn = 0; // How many requests/inserts should we accept?
+ private int tokensOut = 0; // How many searches can we send?
+ private int tokensIn = 0; // How many searches should we accept?
public double backoffUntil = 0.0; // Time
public double backoffLength = INITIAL_BACKOFF; // Seconds
@@ -279,6 +279,45 @@
if (LOG) log ("resetting backoff length");
}
+ // Add outgoing tokens
+ public void addTokensOut (int tokens)
+ {
+ tokensOut += tokens;
+ if (tokensOut > 0) node.addAvailablePeer (this);
+ }
+
+ // Remove outgoing tokens
+ public void removeTokensOut (int tokens)
+ {
+ tokensOut -= tokens;
+ if (tokensOut <= 0) node.removeAvailablePeer (this);
+ }
+
+ // Return the number of outgoing tokens
+ public int getTokensOut()
+ {
+ return tokensOut;
+ }
+
+ // Add incoming tokens
+ public void addTokensIn (int tokens)
+ {
+ tokensIn += tokens;
+ sendMessage (new Token (tokens)); // Inform the other side
+ }
+
+ // Remove incoming tokens
+ public void removeTokensIn (int tokens)
+ {
+ tokensIn -= tokens;
+ }
+
+ // Return the number of incoming tokens
+ public int getTokensIn()
+ {
+ return tokensIn;
+ }
+
// Check retx timeouts, return true if there are packets in flight
public boolean checkTimeouts()
{
Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java 2006-11-27 15:27:37 UTC (rev 11070)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java 2006-11-27 18:21:24 UTC (rev 11071)
@@ -83,7 +83,7 @@
htl = node.decrementHtl (htl);
if (LOG) node.log (this + " has htl " + htl);
// Consume a token
- if (Node.useTokens) next.tokensOut--;
+ if (Node.useTokens) next.removeTokensOut (1);
// Forward the search
if (LOG) node.log ("forwarding " +this+ " to " + next.address);
next.sendMessage (makeSearchMessage());
@@ -112,7 +112,7 @@
double closestDist = Double.POSITIVE_INFINITY;
Peer closestPeer = null;
for (Peer peer : nexts) {
- if (Node.useTokens && peer.tokensOut == 0) {
+ if (Node.useTokens && peer.getTokensOut() == 0) {
if (LOG) node.log ("no tokens for " + peer);
continue;
}