Author: mrogers
Date: 2006-08-25 11:48:51 +0000 (Fri, 25 Aug 2006)
New Revision: 10262

Modified:
   trunk/apps/load-balancing-sims/phase6/Node.java
   trunk/apps/load-balancing-sims/phase6/Peer.java
   trunk/apps/load-balancing-sims/phase6/RequestState.java
   trunk/apps/load-balancing-sims/phase6/Sim.java
   trunk/apps/load-balancing-sims/phase6/TokenBucket.java
Log:
Moved request handling into RequestState in preparation for a more complete FNP implementation.

Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java     2006-08-24 19:51:36 UTC (rev 10261)
+++ trunk/apps/load-balancing-sims/phase6/Node.java     2006-08-25 11:48:51 UTC (rev 10262)
@@ -83,89 +83,51 @@
        }

        // Called by Peer
-       public void handleMessage (Message m, Peer prev)
+       public void handleMessage (Message m, Peer src)
        {
                log ("received " + m);
-               if (m instanceof Request) handleRequest ((Request) m, prev);
+               if (m instanceof Request) {
+                       if (handleRequest ((Request) m, src))
+                               outstandingRequests.remove (m.id);
+               }
                else {
                        RequestState rs = outstandingRequests.get (m.id);
                        if (rs == null) log ("unexpected " + m);
-                       else if (m instanceof Response)
-                               handleResponse ((Response) m, rs);
-                       else if (m instanceof RouteNotFound)
-                               handleRouteNotFound ((RouteNotFound) m, rs);
+                       else if (rs.handleMessage (m, src))
+                               outstandingRequests.remove (m.id);
                }
        }

-       private void handleRequest (Request r, Peer prev)
+       private boolean handleRequest (Request r, Peer prev)
        {
                if (!recentlySeenRequests.add (r.id)) {
                        log ("rejecting recently seen " + r);
                        prev.sendMessage (new RouteNotFound (r.id));
-                       // Optimisation: prev has seen the request, so remove
-                       // it from the list of potential next hops
-                       RequestState rs = outstandingRequests.get (r.id);
-                       if (rs != null) rs.nexts.remove (prev);
-                       return;
+                       return false; // Request not completed
                }
                if (cache.get (r.key)) {
                        log ("key " + r.key + " found in cache");
                        if (prev == null) log (r + " succeeded locally");
                        else for (int i = 0; i < 32; i++)
                                prev.sendMessage (new Response (r.id, i));
-                       return;
+                       return true; // Request completed
                }
                log ("key " + r.key + " not found in cache");
-               forwardRequest (new RequestState (r, prev, shufflePeers()));
+               // Forward the request and store the request state
+               RequestState rs = new RequestState (r, this, prev, peers());
+               outstandingRequests.put (r.id, rs);
+               return rs.forwardRequest();
        }

-       private void handleResponse (Response r, RequestState rs)
-       {
-               rs.state = RequestState.TRANSFERRING;
-               if (rs.receivedBlock (r.index)) return; // Ignore duplicates
-               if (rs.receivedAll()) {
-                       cache.put (rs.key);
-                       if (rs.prev == null) log (rs + " succeeded");
-                       outstandingRequests.remove (rs.id);
-               }
-               // Forward the block
-               if (rs.prev != null) {
-                       log ("forwarding " + r);
-                       rs.prev.sendMessage (r);
-               }
-       }
-       
-       private void handleRouteNotFound (RouteNotFound r, RequestState rs)
-       {
-               forwardRequest (rs);
-       }
-       
-       private void forwardRequest (RequestState rs)
-       {
-               Peer next = rs.closestPeer();
-               if (next == null) {
-                       log ("route not found for " + rs);
-                       if (rs.prev == null) log (rs + " failed");
-                       else rs.prev.sendMessage (new RouteNotFound (rs.id));
-                       outstandingRequests.remove (rs.id);
-               }
-               else {
-                       log ("forwarding " + rs + " to " + next.address);
-                       next.sendMessage (new Request (rs.id, rs.key));
-                       rs.nexts.remove (next);
-                       outstandingRequests.put (rs.id, rs);
-               }
-       }
-       
        // Return the list of peers in a random order
-       private ArrayList<Peer> shufflePeers()
+       private ArrayList<Peer> peers()
        {
                ArrayList<Peer> copy = new ArrayList<Peer> (peers.values());
                Collections.shuffle (copy);
                return copy;
        }

-       private void log (String message)
+       public void log (String message)
        {
                Event.log (net.address + " " + message);
        }
@@ -183,7 +145,7 @@
        {
                // Check the peers in a random order each time
                double deadline = Double.POSITIVE_INFINITY;
-               for (Peer p : shufflePeers())
+               for (Peer p : peers())
                        deadline = Math.min (deadline, p.checkTimeouts());
                if (deadline == Double.POSITIVE_INFINITY) {
                        log ("stopping retransmission/coalescing timer");

Modified: trunk/apps/load-balancing-sims/phase6/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Peer.java     2006-08-24 19:51:36 UTC (rev 10261)
+++ trunk/apps/load-balancing-sims/phase6/Peer.java     2006-08-25 11:48:51 UTC (rev 10262)
@@ -83,7 +83,7 @@
        // Try to send a packet, return true if a packet was sent
        private boolean send()
        {               
-               if (ackQueue.size + searchQueue.size + transferQueue.size ==0) {
+               if (ackQueue.size + searchQueue.size + transferQueue.size == 0){
                        log ("nothing to send");
                        return false;
                }
@@ -240,11 +240,10 @@
                // Send as many packets as possible
                while (send());

+               log (txBuffer.size() + " packets in flight");
                double now = Event.time();
-               if (txBuffer.isEmpty()) {
-                       log ("no packets in flight");
-                       return deadline (now); // Sleep until the next deadline
-               }
+               if (txBuffer.isEmpty()) return deadline (now);
+               
                for (Packet p : txBuffer) {
                        if (now - p.sent > RTO * rtt + MAX_DELAY) {
                                // Retransmission timeout

Modified: trunk/apps/load-balancing-sims/phase6/RequestState.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/RequestState.java     2006-08-24 19:51:36 UTC (rev 10261)
+++ trunk/apps/load-balancing-sims/phase6/RequestState.java     2006-08-25 11:48:51 UTC (rev 10262)
@@ -2,7 +2,7 @@

 import java.util.HashSet;
 import java.util.Collection;
-import messages.Request;
+import messages.*;

 class RequestState
 {
@@ -12,22 +12,74 @@

        public final int id; // The unique ID of the request
        public final int key; // The requested key
-       public final Peer prev; // The previous hop of the request
-       public final HashSet<Peer> nexts; // Possible next hops
-       public int state = REQUEST_SENT; // State machine
+       private Node node; // The owner of this RequestState
+       private Peer prev; // The previous hop of the request
+       private Peer next; // The (current) next hop of the request
+       private HashSet<Peer> nexts; // Possible next hops
+       private int state = REQUEST_SENT; // State machine
        private int blockBitmap = 0; // Bitmap of received blocks

-       public RequestState (Request r, Peer prev, Collection<Peer> peers)
+       public RequestState (Request r, Node node, Peer prev,
+                               Collection<Peer> peers)
        {
                id = r.id;
                key = r.key;
+               this.node = node;
                this.prev = prev;
+               next = null;
                nexts = new HashSet<Peer> (peers);
                nexts.remove (prev);
        }

+       public boolean handleMessage (Message m, Peer src)
+       {
+               if (src != next) {
+                       node.log ("unexpected source for " + m);
+                       return false; // Request not completed
+               }
+               if (m instanceof Response) return handleResponse ((Response) m);
+               else if (m instanceof RouteNotFound) return forwardRequest();
+               // Unrecognised message type
+               node.log ("unrecognised " + m);
+               return false; // Request not completed
+       }
+       
+       private boolean handleResponse (Response r)
+       {
+               state = TRANSFERRING;
+               if (receivedBlock (r.index)) return false; // Ignore duplicates
+               // Forward the block
+               if (prev != null) {
+                       node.log ("forwarding " + r);
+                       prev.sendMessage (r);
+               }
+               if (receivedAll()) {
+                       node.cache.put (key);
+                       if (prev == null) node.log (this + " succeeded");
+                       return true; // Request completed
+               }
+               else return false; // Request not completed
+       }
+       
+       public boolean forwardRequest()
+       {
+               next = closestPeer();
+               if (next == null) {
+                       node.log ("route not found for " + this);
+                       if (prev == null) node.log (this + " failed");
+                       else prev.sendMessage (new RouteNotFound (id));
+                       return true; // Request completed
+               }
+               else {
+                       node.log ("forwarding " + this + " to " + next.address);
+                       next.sendMessage (new Request (id, key));
+                       nexts.remove (next);
+                       return false; // Request not completed
+               }
+       }
+       
        // Find the closest peer to the requested key
-       public Peer closestPeer()
+       private Peer closestPeer()
        {
                double keyLoc = Node.keyToLocation (key);
                double bestDist = Double.POSITIVE_INFINITY;
@@ -39,11 +91,11 @@
                                bestPeer = peer;
                        }
                }
-               return bestPeer; // Null if list was empty
+               return bestPeer; // Null if the list was empty
        }

        // Mark a block as received, return true if it's a duplicate
-       public boolean receivedBlock (int index)
+       private boolean receivedBlock (int index)
        {
                boolean duplicate = (blockBitmap & 1 << index) != 0;
                blockBitmap |= 1 << index;
@@ -51,7 +103,7 @@
        }

        // Return true if all blocks have been received
-       public boolean receivedAll()
+       private boolean receivedAll()
        {
                return blockBitmap == 0xFFFFFFFF;
        }

Modified: trunk/apps/load-balancing-sims/phase6/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Sim.java      2006-08-24 19:51:36 UTC (rev 10261)
+++ trunk/apps/load-balancing-sims/phase6/Sim.java      2006-08-25 11:48:51 UTC (rev 10262)
@@ -23,9 +23,10 @@
                n1.connectBothWays (n3, 0.001);
                n2.connectBothWays (n3, 0.001);

-               for (int i = 0; i < 5; i++) {
+               for (int i = 0; i < 10; i++) {
                        int key = Node.locationToKey (Math.random());
-                       n3.cache.put (key);
+                       // Half the requests will succeed, half will fail
+                       if (i % 2 == 0) n3.cache.put (key);
                        Event.schedule (n0, 0.0, Node.GENERATE_REQUEST, key);
                }


Modified: trunk/apps/load-balancing-sims/phase6/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/TokenBucket.java      2006-08-24 19:51:36 UTC (rev 10261)
+++ trunk/apps/load-balancing-sims/phase6/TokenBucket.java      2006-08-25 11:48:51 UTC (rev 10262)
@@ -17,7 +17,7 @@
                lastUpdated = now;
                tokens += elapsed * rate;
                if (tokens > size) tokens = size;
-               Event.log (tokens + " tokens available");
+               // Event.log (tokens + " tokens available");
                return (int) tokens;
        }



Reply via email to