Author: mrogers
Date: 2006-08-27 17:02:33 +0000 (Sun, 27 Aug 2006)
New Revision: 10283

Added:
   trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java
   trunk/apps/load-balancing-sims/phase6/messages/Accepted.java
   trunk/apps/load-balancing-sims/phase6/messages/ChkDataFound.java
   trunk/apps/load-balancing-sims/phase6/messages/ChkRequest.java
   trunk/apps/load-balancing-sims/phase6/messages/RejectedLoop.java
Removed:
   trunk/apps/load-balancing-sims/phase6/RequestState.java
   trunk/apps/load-balancing-sims/phase6/messages/Request.java
   trunk/apps/load-balancing-sims/phase6/messages/Response.java
Modified:
   trunk/apps/load-balancing-sims/phase6/Node.java
   trunk/apps/load-balancing-sims/phase6/Sim.java
   trunk/apps/load-balancing-sims/phase6/messages/Block.java
   trunk/apps/load-balancing-sims/phase6/messages/RouteNotFound.java
Log:
Multi-stage CHK requests (no timeouts yet)

Copied: trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java (from rev 
10262, trunk/apps/load-balancing-sims/phase6/RequestState.java)
===================================================================
--- trunk/apps/load-balancing-sims/phase6/RequestState.java     2006-08-25 
11:48:51 UTC (rev 10262)
+++ trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java        
2006-08-27 17:02:33 UTC (rev 10283)
@@ -0,0 +1,140 @@
+// The state of an outstanding CHK request, stored at each node along the path
+
+import java.util.LinkedList;
+import messages.*;
+
+class ChkRequestHandler // Handles one CHK request at one node: forwards it onward and relays replies to prev
+{
+       // State machine: REQUEST_SENT -> ACCEPTED -> TRANSFERRING
+       public final static int REQUEST_SENT = 1; // Initial state
+       public final static int ACCEPTED = 2; // Set by handleAccepted
+       public final static int TRANSFERRING = 3; // Set by handleChkDataFound
+       
+       public final int id; // The unique ID of the request
+       public final int key; // The requested key
+       private Node node; // The owner of this RequestHandler
+       private Peer prev; // The previous hop of the request (null for a locally generated request)
+       private Peer next = null; // The (current) next hop of the request
+       private LinkedList<Peer> nexts; // Candidates for the next hop
+       private int state = REQUEST_SENT; // State machine
+       private int blockBitmap = 0; // Bitmap of received blocks (bit i set = block i received)
+       
+       public ChkRequestHandler (ChkRequest r, Node node, Peer prev) // Takes the request ID and key from r
+       {
+               id = r.id;
+               key = r.key;
+               this.node = node;
+               this.prev = prev;
+               nexts = new LinkedList<Peer> (node.peers()); // NOTE(review): prev is not removed from the candidates here - confirm intended
+       }
+       
+       // Remove a peer from the list of candidates for the next hop
+       public void removeNextHop (Peer p)
+       {
+               nexts.remove (p);
+       }
+       
+       public boolean handleMessage (Message m, Peer src) // Dispatches a reply by type; returns true once the request has completed
+       {
+               if (src != next) { // Only the current next hop is accepted as a source
+                       node.log ("unexpected source for " + m);
+                       return false; // Request not completed
+               }
+               if (m instanceof Accepted) return handleAccepted ((Accepted) m);
+               if (m instanceof ChkDataFound)
+                       return handleChkDataFound ((ChkDataFound) m);
+               if (m instanceof Block) return handleBlock ((Block) m);
+               if (m instanceof RouteNotFound) return forwardRequest(); // Try the next candidate
+               if (m instanceof RejectedLoop) return forwardRequest(); // Handled the same way as RouteNotFound
+               // Unrecognised message type
+               node.log ("unexpected type for " + m);
+               return false; // Request not completed
+       }
+       
+       private boolean handleAccepted (Accepted a)
+       {
+               if (state != REQUEST_SENT) // Logged only; the state is updated regardless
+                       node.log (a + " received out of order");
+               state = ACCEPTED;
+               return false; // Request not completed
+       }
+       
+       private boolean handleChkDataFound (ChkDataFound df)
+       {
+               if (state != ACCEPTED) // Logged only; the state is updated regardless
+                       node.log (df + " received out of order");
+               state = TRANSFERRING;
+               if (prev != null) prev.sendMessage (df); // Relay to the previous hop, if there is one
+               return false; // Request not completed
+       }
+       
+       private boolean handleBlock (Block b)
+       {
+               if (state != TRANSFERRING) // Logged only; the block is still processed
+                       node.log (b + " received out of order");
+               if (receivedBlock (b.index)) return false; // Ignore duplicates (receivedBlock also marks the block)
+               // Forward the block
+               if (prev != null) {
+                       node.log ("forwarding " + b);
+                       prev.sendMessage (b);
+               }
+               if (receivedAll()) {
+                       node.storeChk (key); // Hand the completed key to the owning node
+                       if (prev == null) node.log (this + " succeeded");
+                       return true; // Request completed
+               }
+               else return false; // Request not completed
+       }
+       
+       public boolean forwardRequest() // Sends the request to the closest remaining candidate; returns true if none is left
+       {
+               next = closestPeer(); // NOTE(review): state is not reset to REQUEST_SENT when retrying with a new next hop - confirm intended
+               if (next == null) {
+                       node.log ("route not found for " + this);
+                       if (prev == null) node.log (this + " failed");
+                       else prev.sendMessage (new RouteNotFound (id));
+                       return true; // Request completed
+               }
+               else {
+                       node.log ("forwarding " + this + " to " + next.address);
+                       next.sendMessage (new ChkRequest (id, key)); // Forwarded under the same request ID
+                       nexts.remove (next); // A peer that has been tried is not a candidate again
+                       return false; // Request not completed
+               }
+       }
+       
+       // Find the closest peer to the requested key
+       private Peer closestPeer ()
+       {
+               double keyLoc = Node.keyToLocation (key);
+               double bestDist = Double.POSITIVE_INFINITY;
+               Peer bestPeer = null;
+               for (Peer peer : nexts) {
+                       double dist = Node.distance (keyLoc, peer.location);
+                       if (dist < bestDist) { // Strict comparison: the first of equally close peers wins
+                               bestDist = dist;
+                               bestPeer = peer;
+                       }
+               }
+               return bestPeer; // Null if the list was empty
+       }
+       
+       // Mark a block as received, return true if it's a duplicate
+       private boolean receivedBlock (int index)
+       {
+               boolean duplicate = (blockBitmap & 1 << index) != 0; // << binds tighter than &, so this tests bit 'index'
+               blockBitmap |= 1 << index;
+               return duplicate;
+       }
+       
+       // Return true if all blocks have been received
+       private boolean receivedAll()
+       {
+               return blockBitmap == 0xFFFFFFFF; // All 32 bits set
+       }
+       
+       public String toString()
+       {
+               return new String ("CHK request (" + id + "," + key + ")"); // Same text as ChkRequest.toString()
+       }
+}

Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java     2006-08-27 13:45:23 UTC 
(rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/Node.java     2006-08-27 17:02:33 UTC 
(rev 10283)
@@ -7,19 +7,21 @@
 class Node implements EventTarget
 {
        public final static int STORE_SIZE = 10; // Max number of keys in store
+       public final static int CACHE_SIZE = 10; // Max number of keys in cache
        public final static double MIN_SLEEP = 0.01; // Seconds
        public final static double SHORT_SLEEP = 0.05; // Poll the bw limiter

        // Token bucket bandwidth limiter
-       public final static int BUCKET_RATE = 20000; // Bytes per second
-       public final static int BUCKET_SIZE = 40000; // Burst size in bytes
+       public final static int BUCKET_RATE = 15000; // Bytes per second
+       public final static int BUCKET_SIZE = 60000; // Burst size in bytes

        public double location; // Routing location
        public NetworkInterface net;
        private HashMap<Integer,Peer> peers; // Look up a peer by its address
        private HashSet<Integer> recentlySeenRequests; // Request IDs
-       private HashMap<Integer,RequestState> outstandingRequests; // By ID
-       public LruCache<Integer> cache; // Datastore containing keys
+       private HashMap<Integer,ChkRequestHandler> chkRequests; // By ID
+       private LruCache<Integer> chkStore; // CHK datastore
+       private LruCache<Integer> chkCache; // CHK datacache
        public TokenBucket bandwidth; // Bandwidth limiter
        private boolean timerRunning = false; // Is the timer running?

@@ -29,8 +31,9 @@
                net = new NetworkInterface (this, txSpeed, rxSpeed);
                peers = new HashMap<Integer,Peer>();
                recentlySeenRequests = new HashSet<Integer>();
-               outstandingRequests = new HashMap<Integer,RequestState>();
-               cache = new LruCache<Integer> (STORE_SIZE);
+               chkRequests = new HashMap<Integer,ChkRequestHandler>();
+               chkStore = new LruCache<Integer> (STORE_SIZE);
+               chkCache = new LruCache<Integer> (CACHE_SIZE);
                bandwidth = new TokenBucket (BUCKET_RATE, BUCKET_SIZE);
        }

@@ -65,6 +68,30 @@
                return (int) (location * Integer.MAX_VALUE);
        }

+       // Add a CHK to the cache and consider adding it to the store
+       public void storeChk (int key) // Always caches the key; stores it only when isClosest() holds
+       {
+               log ("key " + key + " added to CHK cache");
+               chkCache.put (key);
+               // Add the key to the store if this node is as close to the
+               // key's location as any of its peers
+               if (isClosest (keyToLocation (key))) {
+                       log ("key " + key + " added to CHK store");
+                       chkStore.put (key);
+               }
+       }
+       
+       // Return true if this node is as close to the target as any peer
+       private boolean isClosest (double target)
+       {
+               double bestDist = Double.POSITIVE_INFINITY; // Stays infinite if there are no peers, so the result is then true
+               for (Peer peer : peers.values()) {
+                       double dist = distance (target, peer.location);
+                       if (dist < bestDist) bestDist = dist; // Track the smallest peer distance
+               }
+               return distance (target, location) <= bestDist; // A tie with the best peer counts as closest
+       }
+       
        // Called by Peer
        public void startTimer()
        {
@@ -86,41 +113,63 @@
        public void handleMessage (Message m, Peer src)
        {
                log ("received " + m);
-               if (m instanceof Request) {
-                       if (handleRequest ((Request) m, src))
-                               outstandingRequests.remove (m.id);
+               if (m instanceof ChkRequest) {
+                       if (handleChkRequest ((ChkRequest) m, src))
+                               chkRequests.remove (m.id); // Completed
                }
                else {
-                       RequestState rs = outstandingRequests.get (m.id);
-                       if (rs == null) log ("unexpected " + m);
-                       else if (rs.handleMessage (m, src))
-                               outstandingRequests.remove (m.id);
+                       ChkRequestHandler rh = chkRequests.get (m.id);
+                       if (rh == null) log ("no request handler for " + m);
+                       else if (rh.handleMessage (m, src))
+                               chkRequests.remove (m.id); // Completed
                }
        }

-       private boolean handleRequest (Request r, Peer prev)
+       private boolean handleChkRequest (ChkRequest r, Peer prev)
        {
                if (!recentlySeenRequests.add (r.id)) {
                        log ("rejecting recently seen " + r);
-                       prev.sendMessage (new RouteNotFound (r.id));
+                       prev.sendMessage (new RejectedLoop (r.id));
+                       // Optimisation: the previous hop has already seen
+                       // this request, so don't ask it in the future
+                       ChkRequestHandler rh = chkRequests.get (r.id);
+                       if (rh != null) rh.removeNextHop (prev);
                        return false; // Request not completed
                }
-               if (cache.get (r.key)) {
-                       log ("key " + r.key + " found in cache");
+               // Accept the request
+               if (prev != null) prev.sendMessage (new Accepted (r.id));
+               // If the key is in the store, return it
+               if (chkStore.get (r.key)) {
+                       log ("key " + r.key + " found in CHK store");
                        if (prev == null) log (r + " succeeded locally");
-                       else for (int i = 0; i < 32; i++)
-                               prev.sendMessage (new Response (r.id, i));
+                       else {
+                               prev.sendMessage (new ChkDataFound (r.id));
+                               for (int i = 0; i < 32; i++)
+                                       prev.sendMessage (new Block (r.id, i));
+                       }
                        return true; // Request completed
                }
-               log ("key " + r.key + " not found in cache");
+               log ("key " + r.key + " not found in CHK store");
+               // If the key is in the cache, return it
+               if (chkCache.get (r.key)) {
+                       log ("key " + r.key + " found in CHK cache");
+                       if (prev == null) log (r + " succeeded locally");
+                       else {
+                               prev.sendMessage (new ChkDataFound (r.id));
+                               for (int i = 0; i < 32; i++)
+                                       prev.sendMessage (new Block (r.id, i));
+                       }
+                       return true; // Request completed
+               }
+               log ("key " + r.key + " not found in CHK cache");
                // Forward the request and store the request state
-               RequestState rs = new RequestState (r, this, prev, peers());
-               outstandingRequests.put (r.id, rs);
-               return rs.forwardRequest();
+               ChkRequestHandler rh = new ChkRequestHandler (r, this, prev);
+               chkRequests.put (r.id, rh);
+               return rh.forwardRequest();
        }

        // Return the list of peers in a random order
-       private ArrayList<Peer> peers()
+       public ArrayList<Peer> peers()
        {
                ArrayList<Peer> copy = new ArrayList<Peer> (peers.values());
                Collections.shuffle (copy);
@@ -135,9 +184,9 @@
        // Event callback
        private void generateRequest (int key)
        {
-               Request r = new Request (key);
-               log ("generating request " + r.id);
-               handleRequest (r, null);
+               ChkRequest r = new ChkRequest (key);
+               log ("generating " + r);
+               handleChkRequest (r, null);
        }

        // Event callback

Deleted: trunk/apps/load-balancing-sims/phase6/RequestState.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/RequestState.java     2006-08-27 
13:45:23 UTC (rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/RequestState.java     2006-08-27 
17:02:33 UTC (rev 10283)
@@ -1,115 +0,0 @@
-// The state of an outstanding request, stored at each node along the path
-
-import java.util.HashSet;
-import java.util.Collection;
-import messages.*;
-
-class RequestState
-{
-       // State machine
-       public final static int REQUEST_SENT = 1;
-       public final static int TRANSFERRING = 2;
-       
-       public final int id; // The unique ID of the request
-       public final int key; // The requested key
-       private Node node; // The owner of this RequestState
-       private Peer prev; // The previous hop of the request
-       private Peer next; // The (current) next hop of the request
-       private HashSet<Peer> nexts; // Possible next hops
-       private int state = REQUEST_SENT; // State machine
-       private int blockBitmap = 0; // Bitmap of received blocks
-       
-       public RequestState (Request r, Node node, Peer prev,
-                               Collection<Peer> peers)
-       {
-               id = r.id;
-               key = r.key;
-               this.node = node;
-               this.prev = prev;
-               next = null;
-               nexts = new HashSet<Peer> (peers);
-               nexts.remove (prev);
-       }
-       
-       public boolean handleMessage (Message m, Peer src)
-       {
-               if (src != next) {
-                       node.log ("unexpected source for " + m);
-                       return false; // Request not completed
-               }
-               if (m instanceof Response) return handleResponse ((Response) m);
-               else if (m instanceof RouteNotFound) return forwardRequest();
-               // Unrecognised message type
-               node.log ("unrecognised " + m);
-               return false; // Request not completed
-       }
-       
-       private boolean handleResponse (Response r)
-       {
-               state = TRANSFERRING;
-               if (receivedBlock (r.index)) return false; // Ignore duplicates
-               // Forward the block
-               if (prev != null) {
-                       node.log ("forwarding " + r);
-                       prev.sendMessage (r);
-               }
-               if (receivedAll()) {
-                       node.cache.put (key);
-                       if (prev == null) node.log (this + " succeeded");
-                       return true; // Request completed
-               }
-               else return false; // Request not completed
-       }
-       
-       public boolean forwardRequest()
-       {
-               next = closestPeer();
-               if (next == null) {
-                       node.log ("route not found for " + this);
-                       if (prev == null) node.log (this + " failed");
-                       else prev.sendMessage (new RouteNotFound (id));
-                       return true; // Request completed
-               }
-               else {
-                       node.log ("forwarding " + this + " to " + next.address);
-                       next.sendMessage (new Request (id, key));
-                       nexts.remove (next);
-                       return false; // Request not completed
-               }
-       }
-       
-       // Find the closest peer to the requested key
-       private Peer closestPeer()
-       {
-               double keyLoc = Node.keyToLocation (key);
-               double bestDist = Double.POSITIVE_INFINITY;
-               Peer bestPeer = null;
-               for (Peer peer : nexts) {
-                       double dist = Node.distance (keyLoc, peer.location);
-                       if (dist < bestDist) {
-                               bestDist = dist;
-                               bestPeer = peer;
-                       }
-               }
-               return bestPeer; // Null if the list was empty
-       }
-       
-       // Mark a block as received, return true if it's a duplicate
-       private boolean receivedBlock (int index)
-       {
-               boolean duplicate = (blockBitmap & 1 << index) != 0;
-               blockBitmap |= 1 << index;
-               return duplicate;
-       }
-       
-       // Return true if all blocks have been received
-       private boolean receivedAll()
-       {
-               return blockBitmap == 0xFFFFFFFF;
-       }
-       
-       public String toString()
-       {
-               return new String ("request (" + id + "," + key + ")");
-       }
-}

Modified: trunk/apps/load-balancing-sims/phase6/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Sim.java      2006-08-27 13:45:23 UTC 
(rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/Sim.java      2006-08-27 17:02:33 UTC 
(rev 10283)
@@ -6,7 +6,7 @@
 {
        public static void main (String[] args)
        {               
-               double txSpeed = 15000, rxSpeed = 15000; // Bytes per second
+               double txSpeed = 20000, rxSpeed = 20000; // Bytes per second
                // rxSpeed = Math.exp (rand.nextGaussian() + 11.74);
                // txSpeed = rxSpeed / 5.0;

@@ -26,7 +26,7 @@
                for (int i = 0; i < 10; i++) {
                        int key = Node.locationToKey (Math.random());
                        // Half the requests will succeed, half will fail
-                       if (i % 2 == 0) n3.cache.put (key);
+                       if (i % 2 == 0) n3.storeChk (key);
                        Event.schedule (n0, 0.0, Node.GENERATE_REQUEST, key);
                }


Added: trunk/apps/load-balancing-sims/phase6/messages/Accepted.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Accepted.java        
2006-08-27 13:45:23 UTC (rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/messages/Accepted.java        
2006-08-27 17:02:33 UTC (rev 10283)
@@ -0,0 +1,15 @@
+package messages;
+
+public class Accepted extends Message // Sent to the previous hop when a node accepts a request
+{
+       public Accepted (int id) // id: ID of the request being accepted
+       {
+               this.id = id;
+               size = Message.HEADER_SIZE; // Header only
+       }
+       
+       public String toString()
+       {
+               return new String ("accepted (" + id + ")");
+       }
+}

Modified: trunk/apps/load-balancing-sims/phase6/messages/Block.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Block.java   2006-08-27 
13:45:23 UTC (rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/messages/Block.java   2006-08-27 
17:02:33 UTC (rev 10283)
@@ -12,4 +12,9 @@
                this.index = index;
                size = Message.HEADER_SIZE + Message.DATA_SIZE;
        }
+       
+       public String toString() // Text form used when a block is concatenated into a log message
+       {
+               return new String ("block (" + id + "," + index + ")");
+       }
 }

Added: trunk/apps/load-balancing-sims/phase6/messages/ChkDataFound.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/ChkDataFound.java    
2006-08-27 13:45:23 UTC (rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/messages/ChkDataFound.java    
2006-08-27 17:02:33 UTC (rev 10283)
@@ -0,0 +1,15 @@
+package messages;
+
+public class ChkDataFound extends Message // Sent to the previous hop ahead of the blocks when the key is found
+{
+       public ChkDataFound (int id) // id: ID of the request that found its data
+       {
+               this.id = id;
+               size = Message.HEADER_SIZE; // Header only
+       }
+       
+       public String toString()
+       {
+               return new String ("CHK data found (" + id + ")");
+       }
+}

Copied: trunk/apps/load-balancing-sims/phase6/messages/ChkRequest.java (from 
rev 10226, trunk/apps/load-balancing-sims/phase6/messages/Request.java)
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Request.java 2006-08-21 
18:15:04 UTC (rev 10226)
+++ trunk/apps/load-balancing-sims/phase6/messages/ChkRequest.java      
2006-08-27 17:02:33 UTC (rev 10283)
@@ -0,0 +1,29 @@
+package messages;
+
+public class ChkRequest extends Message // Request for a CHK key, identified by a request ID
+{
+       private static int nextId = 0; // ID given to the next newly started request
+       
+       public final int key; // The requested key
+       
+       // Start a new request
+       public ChkRequest (int key)
+       {
+               id = nextId++; // Take the next ID from the shared counter
+               this.key = key;
+               size = Message.HEADER_SIZE + Message.KEY_SIZE;
+       }
+       
+       // Forward a request
+       public ChkRequest (int id, int key)
+       {
+               this.id = id; // Reuse the existing request's ID
+               this.key = key;
+               size = Message.HEADER_SIZE + Message.KEY_SIZE;
+       }
+       
+       public String toString()
+       {
+               return new String ("CHK request (" + id + "," + key + ")");
+       }
+}

Added: trunk/apps/load-balancing-sims/phase6/messages/RejectedLoop.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/RejectedLoop.java    
2006-08-27 13:45:23 UTC (rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/messages/RejectedLoop.java    
2006-08-27 17:02:33 UTC (rev 10283)
@@ -0,0 +1,15 @@
+package messages;
+
+public class RejectedLoop extends Message // Sent to the previous hop when a request ID has been seen recently
+{
+       public RejectedLoop (int id) // id: ID of the rejected request
+       {
+               this.id = id;
+               size = Message.HEADER_SIZE; // Header only
+       }
+       
+       public String toString()
+       {
+               return new String ("rejected loop (" + id + ")");
+       }
+}

Deleted: trunk/apps/load-balancing-sims/phase6/messages/Request.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Request.java 2006-08-27 
13:45:23 UTC (rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/messages/Request.java 2006-08-27 
17:02:33 UTC (rev 10283)
@@ -1,29 +0,0 @@
-package messages;
-
-public class Request extends Message
-{
-       private static int nextId = 0;
-       
-       public final int key; // The requested key
-       
-       // Start a new request
-       public Request (int key)
-       {
-               id = nextId++;
-               this.key = key;
-               size = Message.HEADER_SIZE + Message.KEY_SIZE;
-       }
-       
-       // Forward a request
-       public Request (int id, int key)
-       {
-               this.id = id;
-               this.key = key;
-               size = Message.HEADER_SIZE + Message.KEY_SIZE;
-       }
-       
-       public String toString()
-       {
-               return new String ("request (" + id + "," + key + ")");
-       }
-}

Deleted: trunk/apps/load-balancing-sims/phase6/messages/Response.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Response.java        
2006-08-27 13:45:23 UTC (rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/messages/Response.java        
2006-08-27 17:02:33 UTC (rev 10283)
@@ -1,16 +0,0 @@
-// A single block of a multi-block response
-
-package messages;
-
-public class Response extends Block
-{
-       public Response (int id, int index)
-       {
-               super (id, index);
-       }
-       
-       public String toString()
-       {
-               return new String ("response (" + id + "," + index + ")");
-       }
-}

Modified: trunk/apps/load-balancing-sims/phase6/messages/RouteNotFound.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/RouteNotFound.java   
2006-08-27 13:45:23 UTC (rev 10282)
+++ trunk/apps/load-balancing-sims/phase6/messages/RouteNotFound.java   
2006-08-27 17:02:33 UTC (rev 10283)
@@ -1,6 +1,3 @@
-// Note: for the purposes of this simulation, RejectedLoop and RouteNotFound
-// are equivalent
-
 package messages;

 public class RouteNotFound extends Message


Reply via email to