Author: mrogers
Date: 2006-08-27 17:02:33 +0000 (Sun, 27 Aug 2006)
New Revision: 10283
Added:
trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java
trunk/apps/load-balancing-sims/phase6/messages/Accepted.java
trunk/apps/load-balancing-sims/phase6/messages/ChkDataFound.java
trunk/apps/load-balancing-sims/phase6/messages/ChkRequest.java
trunk/apps/load-balancing-sims/phase6/messages/RejectedLoop.java
Removed:
trunk/apps/load-balancing-sims/phase6/RequestState.java
trunk/apps/load-balancing-sims/phase6/messages/Request.java
trunk/apps/load-balancing-sims/phase6/messages/Response.java
Modified:
trunk/apps/load-balancing-sims/phase6/Node.java
trunk/apps/load-balancing-sims/phase6/Sim.java
trunk/apps/load-balancing-sims/phase6/messages/Block.java
trunk/apps/load-balancing-sims/phase6/messages/RouteNotFound.java
Log:
Multi-stage CHK requests (no timeouts yet)
Copied: trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java (from rev
10262, trunk/apps/load-balancing-sims/phase6/RequestState.java)
===================================================================
--- trunk/apps/load-balancing-sims/phase6/RequestState.java 2006-08-25
11:48:51 UTC (rev 10262)
+++ trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java
2006-08-27 17:02:33 UTC (rev 10283)
@@ -0,0 +1,140 @@
+// The state of an outstanding CHK request, stored at each node along the path
+
+import java.util.LinkedList;
+import messages.*;
+
+class ChkRequestHandler
+{
+ // State machine
+ public final static int REQUEST_SENT = 1;
+ public final static int ACCEPTED = 2;
+ public final static int TRANSFERRING = 3;
+
+ public final int id; // The unique ID of the request
+ public final int key; // The requested key
+ private Node node; // The owner of this RequestHandler
+ private Peer prev; // The previous hop of the request
+ private Peer next = null; // The (current) next hop of the request
+ private LinkedList<Peer> nexts; // Candidates for the next hop
+ private int state = REQUEST_SENT; // State machine
+ private int blockBitmap = 0; // Bitmap of received blocks
+
+ public ChkRequestHandler (ChkRequest r, Node node, Peer prev)
+ {
+ id = r.id;
+ key = r.key;
+ this.node = node;
+ this.prev = prev;
+ nexts = new LinkedList<Peer> (node.peers());
+ }
+
+ // Remove a peer from the list of candidates for the next hop
+ public void removeNextHop (Peer p)
+ {
+ nexts.remove (p);
+ }
+
+ public boolean handleMessage (Message m, Peer src)
+ {
+ if (src != next) {
+ node.log ("unexpected source for " + m);
+ return false; // Request not completed
+ }
+ if (m instanceof Accepted) return handleAccepted ((Accepted) m);
+ if (m instanceof ChkDataFound)
+ return handleChkDataFound ((ChkDataFound) m);
+ if (m instanceof Block) return handleBlock ((Block) m);
+ if (m instanceof RouteNotFound) return forwardRequest();
+ if (m instanceof RejectedLoop) return forwardRequest();
+ // Unrecognised message type
+ node.log ("unexpected type for " + m);
+ return false; // Request not completed
+ }
+
+ private boolean handleAccepted (Accepted a)
+ {
+ if (state != REQUEST_SENT)
+ node.log (a + " received out of order");
+ state = ACCEPTED;
+ return false; // Request not completed
+ }
+
+ private boolean handleChkDataFound (ChkDataFound df)
+ {
+ if (state != ACCEPTED)
+ node.log (df + " received out of order");
+ state = TRANSFERRING;
+ if (prev != null) prev.sendMessage (df);
+ return false; // Request not completed
+ }
+
+ private boolean handleBlock (Block b)
+ {
+ if (state != TRANSFERRING)
+ node.log (b + " received out of order");
+ if (receivedBlock (b.index)) return false; // Ignore duplicates
+ // Forward the block
+ if (prev != null) {
+ node.log ("forwarding " + b);
+ prev.sendMessage (b);
+ }
+ if (receivedAll()) {
+ node.storeChk (key);
+ if (prev == null) node.log (this + " succeeded");
+ return true; // Request completed
+ }
+ else return false; // Request not completed
+ }
+
+ public boolean forwardRequest()
+ {
+ next = closestPeer();
+ if (next == null) {
+ node.log ("route not found for " + this);
+ if (prev == null) node.log (this + " failed");
+ else prev.sendMessage (new RouteNotFound (id));
+ return true; // Request completed
+ }
+ else {
+ node.log ("forwarding " + this + " to " + next.address);
+ next.sendMessage (new ChkRequest (id, key));
+ nexts.remove (next);
+ return false; // Request not completed
+ }
+ }
+
+ // Find the closest peer to the requested key
+ private Peer closestPeer ()
+ {
+ double keyLoc = Node.keyToLocation (key);
+ double bestDist = Double.POSITIVE_INFINITY;
+ Peer bestPeer = null;
+ for (Peer peer : nexts) {
+ double dist = Node.distance (keyLoc, peer.location);
+ if (dist < bestDist) {
+ bestDist = dist;
+ bestPeer = peer;
+ }
+ }
+ return bestPeer; // Null if the list was empty
+ }
+
+ // Mark a block as received, return true if it's a duplicate
+ private boolean receivedBlock (int index)
+ {
+ boolean duplicate = (blockBitmap & 1 << index) != 0;
+ blockBitmap |= 1 << index;
+ return duplicate;
+ }
+
+ // Return true if all blocks have been received
+ private boolean receivedAll()
+ {
+ return blockBitmap == 0xFFFFFFFF;
+ }
+
+ public String toString()
+ {
+ return new String ("CHK request (" + id + "," + key + ")");
+ }
+}
Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java 2006-08-27 13:45:23 UTC
(rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/Node.java 2006-08-27 17:02:33 UTC
(rev 10283)
@@ -7,19 +7,21 @@
class Node implements EventTarget
{
public final static int STORE_SIZE = 10; // Max number of keys in store
+ public final static int CACHE_SIZE = 10; // Max number of keys in cache
public final static double MIN_SLEEP = 0.01; // Seconds
public final static double SHORT_SLEEP = 0.05; // Poll the bw limiter
// Token bucket bandwidth limiter
- public final static int BUCKET_RATE = 20000; // Bytes per second
- public final static int BUCKET_SIZE = 40000; // Burst size in bytes
+ public final static int BUCKET_RATE = 15000; // Bytes per second
+ public final static int BUCKET_SIZE = 60000; // Burst size in bytes
public double location; // Routing location
public NetworkInterface net;
private HashMap<Integer,Peer> peers; // Look up a peer by its address
private HashSet<Integer> recentlySeenRequests; // Request IDs
- private HashMap<Integer,RequestState> outstandingRequests; // By ID
- public LruCache<Integer> cache; // Datastore containing keys
+ private HashMap<Integer,ChkRequestHandler> chkRequests; // By ID
+ private LruCache<Integer> chkStore; // CHK datastore
+ private LruCache<Integer> chkCache; // CHK datacache
public TokenBucket bandwidth; // Bandwidth limiter
private boolean timerRunning = false; // Is the timer running?
@@ -29,8 +31,9 @@
net = new NetworkInterface (this, txSpeed, rxSpeed);
peers = new HashMap<Integer,Peer>();
recentlySeenRequests = new HashSet<Integer>();
- outstandingRequests = new HashMap<Integer,RequestState>();
- cache = new LruCache<Integer> (STORE_SIZE);
+ chkRequests = new HashMap<Integer,ChkRequestHandler>();
+ chkStore = new LruCache<Integer> (STORE_SIZE);
+ chkCache = new LruCache<Integer> (CACHE_SIZE);
bandwidth = new TokenBucket (BUCKET_RATE, BUCKET_SIZE);
}
@@ -65,6 +68,30 @@
return (int) (location * Integer.MAX_VALUE);
}
+ // Add a CHK to the cache and consider adding it to the store
+ public void storeChk (int key)
+ {
+ log ("key " + key + " added to CHK cache");
+ chkCache.put (key);
+ // Add the key to the store if this node is as close to the
+ // key's location as any of its peers
+ if (isClosest (keyToLocation (key))) {
+ log ("key " + key + " added to CHK store");
+ chkStore.put (key);
+ }
+ }
+
+ // Return true if this node is as close to the target as any peer
+ private boolean isClosest (double target)
+ {
+ double bestDist = Double.POSITIVE_INFINITY;
+ for (Peer peer : peers.values()) {
+ double dist = distance (target, peer.location);
+ if (dist < bestDist) bestDist = dist;
+ }
+ return distance (target, location) <= bestDist;
+ }
+
// Called by Peer
public void startTimer()
{
@@ -86,41 +113,63 @@
public void handleMessage (Message m, Peer src)
{
log ("received " + m);
- if (m instanceof Request) {
- if (handleRequest ((Request) m, src))
- outstandingRequests.remove (m.id);
+ if (m instanceof ChkRequest) {
+ if (handleChkRequest ((ChkRequest) m, src))
+ chkRequests.remove (m.id); // Completed
}
else {
- RequestState rs = outstandingRequests.get (m.id);
- if (rs == null) log ("unexpected " + m);
- else if (rs.handleMessage (m, src))
- outstandingRequests.remove (m.id);
+ ChkRequestHandler rh = chkRequests.get (m.id);
+ if (rh == null) log ("no request handler for " + m);
+ else if (rh.handleMessage (m, src))
+ chkRequests.remove (m.id); // Completed
}
}
- private boolean handleRequest (Request r, Peer prev)
+ private boolean handleChkRequest (ChkRequest r, Peer prev)
{
if (!recentlySeenRequests.add (r.id)) {
log ("rejecting recently seen " + r);
- prev.sendMessage (new RouteNotFound (r.id));
+ prev.sendMessage (new RejectedLoop (r.id));
+ // Optimisation: the previous hop has already seen
+ // this request, so don't ask it in the future
+ ChkRequestHandler rh = chkRequests.get (r.id);
+ if (rh != null) rh.removeNextHop (prev);
return false; // Request not completed
}
- if (cache.get (r.key)) {
- log ("key " + r.key + " found in cache");
+ // Accept the request
+ if (prev != null) prev.sendMessage (new Accepted (r.id));
+ // If the key is in the store, return it
+ if (chkStore.get (r.key)) {
+ log ("key " + r.key + " found in CHK store");
if (prev == null) log (r + " succeeded locally");
- else for (int i = 0; i < 32; i++)
- prev.sendMessage (new Response (r.id, i));
+ else {
+ prev.sendMessage (new ChkDataFound (r.id));
+ for (int i = 0; i < 32; i++)
+ prev.sendMessage (new Block (r.id, i));
+ }
return true; // Request completed
}
- log ("key " + r.key + " not found in cache");
+ log ("key " + r.key + " not found in CHK store");
+ // If the key is in the cache, return it
+ if (chkCache.get (r.key)) {
+ log ("key " + r.key + " found in CHK cache");
+ if (prev == null) log (r + " succeeded locally");
+ else {
+ prev.sendMessage (new ChkDataFound (r.id));
+ for (int i = 0; i < 32; i++)
+ prev.sendMessage (new Block (r.id, i));
+ }
+ return true; // Request completed
+ }
+ log ("key " + r.key + " not found in CHK cache");
// Forward the request and store the request state
- RequestState rs = new RequestState (r, this, prev, peers());
- outstandingRequests.put (r.id, rs);
- return rs.forwardRequest();
+ ChkRequestHandler rh = new ChkRequestHandler (r, this, prev);
+ chkRequests.put (r.id, rh);
+ return rh.forwardRequest();
}
// Return the list of peers in a random order
- private ArrayList<Peer> peers()
+ public ArrayList<Peer> peers()
{
ArrayList<Peer> copy = new ArrayList<Peer> (peers.values());
Collections.shuffle (copy);
@@ -135,9 +184,9 @@
// Event callback
private void generateRequest (int key)
{
- Request r = new Request (key);
- log ("generating request " + r.id);
- handleRequest (r, null);
+ ChkRequest r = new ChkRequest (key);
+ log ("generating " + r);
+ handleChkRequest (r, null);
}
// Event callback
Deleted: trunk/apps/load-balancing-sims/phase6/RequestState.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/RequestState.java 2006-08-27
13:45:23 UTC (rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/RequestState.java 2006-08-27
17:02:33 UTC (rev 10283)
@@ -1,115 +0,0 @@
-// The state of an outstanding request, stored at each node along the path
-
-import java.util.HashSet;
-import java.util.Collection;
-import messages.*;
-
-class RequestState
-{
- // State machine
- public final static int REQUEST_SENT = 1;
- public final static int TRANSFERRING = 2;
-
- public final int id; // The unique ID of the request
- public final int key; // The requested key
- private Node node; // The owner of this RequestState
- private Peer prev; // The previous hop of the request
- private Peer next; // The (current) next hop of the request
- private HashSet<Peer> nexts; // Possible next hops
- private int state = REQUEST_SENT; // State machine
- private int blockBitmap = 0; // Bitmap of received blocks
-
- public RequestState (Request r, Node node, Peer prev,
- Collection<Peer> peers)
- {
- id = r.id;
- key = r.key;
- this.node = node;
- this.prev = prev;
- next = null;
- nexts = new HashSet<Peer> (peers);
- nexts.remove (prev);
- }
-
- public boolean handleMessage (Message m, Peer src)
- {
- if (src != next) {
- node.log ("unexpected source for " + m);
- return false; // Request not completed
- }
- if (m instanceof Response) return handleResponse ((Response) m);
- else if (m instanceof RouteNotFound) return forwardRequest();
- // Unrecognised message type
- node.log ("unrecognised " + m);
- return false; // Request not completed
- }
-
- private boolean handleResponse (Response r)
- {
- state = TRANSFERRING;
- if (receivedBlock (r.index)) return false; // Ignore duplicates
- // Forward the block
- if (prev != null) {
- node.log ("forwarding " + r);
- prev.sendMessage (r);
- }
- if (receivedAll()) {
- node.cache.put (key);
- if (prev == null) node.log (this + " succeeded");
- return true; // Request completed
- }
- else return false; // Request not completed
- }
-
- public boolean forwardRequest()
- {
- next = closestPeer();
- if (next == null) {
- node.log ("route not found for " + this);
- if (prev == null) node.log (this + " failed");
- else prev.sendMessage (new RouteNotFound (id));
- return true; // Request completed
- }
- else {
- node.log ("forwarding " + this + " to " + next.address);
- next.sendMessage (new Request (id, key));
- nexts.remove (next);
- return false; // Request not completed
- }
- }
-
- // Find the closest peer to the requested key
- private Peer closestPeer()
- {
- double keyLoc = Node.keyToLocation (key);
- double bestDist = Double.POSITIVE_INFINITY;
- Peer bestPeer = null;
- for (Peer peer : nexts) {
- double dist = Node.distance (keyLoc, peer.location);
- if (dist < bestDist) {
- bestDist = dist;
- bestPeer = peer;
- }
- }
- return bestPeer; // Null if the list was empty
- }
-
- // Mark a block as received, return true if it's a duplicate
- private boolean receivedBlock (int index)
- {
- boolean duplicate = (blockBitmap & 1 << index) != 0;
- blockBitmap |= 1 << index;
- return duplicate;
- }
-
- // Return true if all blocks have been received
- private boolean receivedAll()
- {
- return blockBitmap == 0xFFFFFFFF;
- }
-
- public String toString()
- {
- return new String ("request (" + id + "," + key + ")");
- }
-}
Modified: trunk/apps/load-balancing-sims/phase6/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Sim.java 2006-08-27 13:45:23 UTC
(rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/Sim.java 2006-08-27 17:02:33 UTC
(rev 10283)
@@ -6,7 +6,7 @@
{
public static void main (String[] args)
{
- double txSpeed = 15000, rxSpeed = 15000; // Bytes per second
+ double txSpeed = 20000, rxSpeed = 20000; // Bytes per second
// rxSpeed = Math.exp (rand.nextGaussian() + 11.74);
// txSpeed = rxSpeed / 5.0;
@@ -26,7 +26,7 @@
for (int i = 0; i < 10; i++) {
int key = Node.locationToKey (Math.random());
// Half the requests will succeed, half will fail
- if (i % 2 == 0) n3.cache.put (key);
+ if (i % 2 == 0) n3.storeChk (key);
Event.schedule (n0, 0.0, Node.GENERATE_REQUEST, key);
}
Added: trunk/apps/load-balancing-sims/phase6/messages/Accepted.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Accepted.java
2006-08-27 13:45:23 UTC (rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/messages/Accepted.java
2006-08-27 17:02:33 UTC (rev 10283)
@@ -0,0 +1,15 @@
+package messages;
+
+public class Accepted extends Message
+{
+ public Accepted (int id)
+ {
+ this.id = id;
+ size = Message.HEADER_SIZE;
+ }
+
+ public String toString()
+ {
+ return new String ("accepted (" + id + ")");
+ }
+}
Modified: trunk/apps/load-balancing-sims/phase6/messages/Block.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Block.java 2006-08-27
13:45:23 UTC (rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/messages/Block.java 2006-08-27
17:02:33 UTC (rev 10283)
@@ -12,4 +12,9 @@
this.index = index;
size = Message.HEADER_SIZE + Message.DATA_SIZE;
}
+
+ public String toString()
+ {
+ return new String ("block (" + id + "," + index + ")");
+ }
}
Added: trunk/apps/load-balancing-sims/phase6/messages/ChkDataFound.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/ChkDataFound.java
2006-08-27 13:45:23 UTC (rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/messages/ChkDataFound.java
2006-08-27 17:02:33 UTC (rev 10283)
@@ -0,0 +1,15 @@
+package messages;
+
+public class ChkDataFound extends Message
+{
+ public ChkDataFound (int id)
+ {
+ this.id = id;
+ size = Message.HEADER_SIZE;
+ }
+
+ public String toString()
+ {
+ return new String ("CHK data found (" + id + ")");
+ }
+}
Copied: trunk/apps/load-balancing-sims/phase6/messages/ChkRequest.java (from
rev 10226, trunk/apps/load-balancing-sims/phase6/messages/Request.java)
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Request.java 2006-08-21
18:15:04 UTC (rev 10226)
+++ trunk/apps/load-balancing-sims/phase6/messages/ChkRequest.java
2006-08-27 17:02:33 UTC (rev 10283)
@@ -0,0 +1,29 @@
+package messages;
+
+public class ChkRequest extends Message
+{
+ private static int nextId = 0;
+
+ public final int key; // The requested key
+
+ // Start a new request
+ public ChkRequest (int key)
+ {
+ id = nextId++;
+ this.key = key;
+ size = Message.HEADER_SIZE + Message.KEY_SIZE;
+ }
+
+ // Forward a request
+ public ChkRequest (int id, int key)
+ {
+ this.id = id;
+ this.key = key;
+ size = Message.HEADER_SIZE + Message.KEY_SIZE;
+ }
+
+ public String toString()
+ {
+ return new String ("CHK request (" + id + "," + key + ")");
+ }
+}
Added: trunk/apps/load-balancing-sims/phase6/messages/RejectedLoop.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/RejectedLoop.java
2006-08-27 13:45:23 UTC (rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/messages/RejectedLoop.java
2006-08-27 17:02:33 UTC (rev 10283)
@@ -0,0 +1,15 @@
+package messages;
+
+public class RejectedLoop extends Message
+{
+ public RejectedLoop (int id)
+ {
+ this.id = id;
+ size = Message.HEADER_SIZE;
+ }
+
+ public String toString()
+ {
+ return new String ("rejected loop (" + id + ")");
+ }
+}
Deleted: trunk/apps/load-balancing-sims/phase6/messages/Request.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Request.java 2006-08-27
13:45:23 UTC (rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/messages/Request.java 2006-08-27
17:02:33 UTC (rev 10283)
@@ -1,29 +0,0 @@
-package messages;
-
-public class Request extends Message
-{
- private static int nextId = 0;
-
- public final int key; // The requested key
-
- // Start a new request
- public Request (int key)
- {
- id = nextId++;
- this.key = key;
- size = Message.HEADER_SIZE + Message.KEY_SIZE;
- }
-
- // Forward a request
- public Request (int id, int key)
- {
- this.id = id;
- this.key = key;
- size = Message.HEADER_SIZE + Message.KEY_SIZE;
- }
-
- public String toString()
- {
- return new String ("request (" + id + "," + key + ")");
- }
-}
Deleted: trunk/apps/load-balancing-sims/phase6/messages/Response.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Response.java
2006-08-27 13:45:23 UTC (rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/messages/Response.java
2006-08-27 17:02:33 UTC (rev 10283)
@@ -1,16 +0,0 @@
-// A single block of a multi-block response
-
-package messages;
-
-public class Response extends Block
-{
- public Response (int id, int index)
- {
- super (id, index);
- }
-
- public String toString()
- {
- return new String ("response (" + id + "," + index + ")");
- }
-}
Modified: trunk/apps/load-balancing-sims/phase6/messages/RouteNotFound.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/RouteNotFound.java
2006-08-27 13:45:23 UTC (rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/messages/RouteNotFound.java
2006-08-27 17:02:33 UTC (rev 10283)
@@ -1,6 +1,3 @@
-// Note: for the purposes of this simulation, RejectedLoop and RouteNotFound
-// are equivalent
-
package messages;
public class RouteNotFound extends Message