Author: mrogers
Date: 2006-08-03 17:23:52 +0000 (Thu, 03 Aug 2006)
New Revision: 9866

Added:
   trunk/apps/load-balancing-sims/phase5/LruCache.java
Modified:
   trunk/apps/load-balancing-sims/phase5/NetworkInterface.java
   trunk/apps/load-balancing-sims/phase5/Node.java
   trunk/apps/load-balancing-sims/phase5/Peer.java
   trunk/apps/load-balancing-sims/phase5/Request.java
   trunk/apps/load-balancing-sims/phase5/RequestState.java
   trunk/apps/load-balancing-sims/phase5/Response.java
Log:
LRU cache, timeouts return to slow start, one retransmission timer per node

Added: trunk/apps/load-balancing-sims/phase5/LruCache.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/LruCache.java 2006-08-03 16:44:56 UTC 
(rev 9865)
+++ trunk/apps/load-balancing-sims/phase5/LruCache.java 2006-08-03 17:23:52 UTC 
(rev 9866)
@@ -0,0 +1,46 @@
+// Limited-capacity LRU cache
+
+import java.util.LinkedHashSet;
+
+class LruCache<Key>
+{
+       public int capacity;
+       public LinkedHashSet<Key> set;
+       
+       public LruCache (int capacity)
+       {
+               this.capacity = capacity;
+               set = new LinkedHashSet<Key> (capacity);
+       }
+       
+       public boolean get (Key key)
+       {
+               log ("searching cache for key " + key);
+               if (set.remove (key)) {
+                       set.add (key); // Move the key to the fresh end
+                       return true;
+               }
+               return false;
+       }
+       
+       public void put (Key key)
+       {
+               if (set.remove (key))
+                       log ("key " + key + " already in cache");
+               else {
+                       log ("adding key " + key + " to cache");
+                       if (set.size() == capacity) {
+                               // Discard the oldest element
+                               Key oldest = set.iterator().next();
+                               log ("discarding key " + oldest);
+                               set.remove (oldest);
+                       }
+               }
+               set.add (key); // Add or move the key to the fresh end
+       }
+       
+       private void log (String message)
+       {
+               Event.log (message);
+       }
+}

Modified: trunk/apps/load-balancing-sims/phase5/NetworkInterface.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/NetworkInterface.java 2006-08-03 
16:44:56 UTC (rev 9865)
+++ trunk/apps/load-balancing-sims/phase5/NetworkInterface.java 2006-08-03 
17:23:52 UTC (rev 9866)
@@ -1,5 +1,4 @@
 import java.util.LinkedList;
-import java.util.NoSuchElementException;

 class NetworkInterface implements EventTarget
 {
@@ -72,13 +71,10 @@
        {
                log ("finished receiving " + p);
                node.handlePacket (p);
+               rxQueueSize -= p.size;
+               rxQueue.remove (p);
                // If there's another packet waiting, start to receive it
-               try {
-                       rxQueueSize -= p.size;
-                       rxQueue.remove (p);
-                       rxStart (rxQueue.getFirst());
-               }
-               catch (NoSuchElementException nse) {}
+               if (!rxQueue.isEmpty()) rxStart (rxQueue.peek());
        }

        // Start transmitting a packet
@@ -94,13 +90,10 @@
        {
                log ("finished transmitting " + p);
                Network.deliver (p);
+               txQueueSize -= p.size;
+               txQueue.remove (p);
                // If there's another packet waiting, start to transmit it
-               try {
-                       txQueueSize -= p.size;
-                       txQueue.remove (p);
-                       txStart (txQueue.getFirst());
-               }
-               catch (NoSuchElementException nse) {}
+               if (!txQueue.isEmpty()) txStart (txQueue.peek());
        }

        private void log (String message)

Modified: trunk/apps/load-balancing-sims/phase5/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Node.java     2006-08-03 16:44:56 UTC 
(rev 9865)
+++ trunk/apps/load-balancing-sims/phase5/Node.java     2006-08-03 17:23:52 UTC 
(rev 9866)
@@ -3,13 +3,17 @@

 class Node implements EventTarget
 {
+       public final static double RETX_TIMER = 0.1; // Coarse-grained timer
+       public final static int STORE_SIZE = 10; // Max number of keys in store
+       
        public double location; // Routing location
        public NetworkInterface net;
        private HashMap<Integer,Peer> peers; // Look up a peer by its address
        private int requestsGenerated = 0;
        private HashSet<Integer> recentlySeenRequests; // Request IDs
        private HashMap<Integer,RequestState> outstandingRequests;
-       public HashSet<Double> cache; // Datastore containing keys
+       public LruCache<Integer> cache; // Datastore containing keys
+       private boolean timerRunning = false; // Is the retx timer running?

        public Node (double txSpeed, double rxSpeed)
        {
@@ -18,7 +22,7 @@
                peers = new HashMap<Integer,Peer>();
                recentlySeenRequests = new HashSet<Integer>();
                outstandingRequests = new HashMap<Integer,RequestState>();
-               cache = new HashSet<Double>();
+               cache = new LruCache<Integer> (STORE_SIZE);
        }

        public void connect (Node n, double latency)
@@ -40,6 +44,27 @@
                else return Math.min (b - a, a - b + 1.0);
        }

+       // Convert an integer key to a routing location
+       public static double keyToLocation (int key)
+       {
+               return key / (double) Integer.MAX_VALUE;
+       }
+       
+       // Convert a routing location to an integer key
+       public static int locationToKey (double location)
+       {
+               return (int) (location * Integer.MAX_VALUE);
+       }
+       
+       // Called by Peer
+       public void startTimer()
+       {
+               if (timerRunning) return;
+               log ("starting retransmission timer");
+               Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
+               timerRunning = true;
+       }
+       
        // Called by NetworkInterface
        public void handlePacket (Packet packet)
        {
@@ -71,7 +96,7 @@
                        if (rs != null) rs.nexts.remove (prev);
                        return;
                }
-               if (cache.contains (r.key)) {
+               if (cache.get (r.key)) {
                        log ("key " + r.key + " found in cache");
                        if (prev == null)
                                log (r + " succeeded locally");
@@ -89,7 +114,7 @@
                        log ("unexpected " + r);
                        return;
                }
-               cache.add (r.key);
+               cache.put (r.key);
                if (rs.prev == null) log (rs + " succeeded");
                else {
                        log ("forwarding " + r);
@@ -133,19 +158,37 @@
        {
                if (requestsGenerated++ > 1000) return;
                // Send a request to a random location
-               Request r = new Request (0.1);
+               Request r = new Request (123456);
                log ("generating request " + r.id);
                handleRequest (r, null);
                // Schedule the next request
-               // Event.schedule (this, Math.random(), GENERATE_REQUEST, null);
+               Event.schedule (this, 0.01, GENERATE_REQUEST, null);
        }

+       // Event callback
+       private void checkTimeouts()
+       {
+               boolean stopTimer = true;
+               for (Peer p : peers.values())
+                       if (p.checkTimeouts()) stopTimer = false;
+               if (stopTimer) {
+                       log ("stopping retransmission timer");
+                       timerRunning = false;
+               }
+               else {
+                       Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
+                       timerRunning = true;
+               }
+       }
+       
        // EventTarget interface
        public void handleEvent (int type, Object data)
        {
                if (type == GENERATE_REQUEST) generateRequest();
+               else if (type == CHECK_TIMEOUTS) checkTimeouts();
        }

        // Each EventTarget class has its own event codes
        public final static int GENERATE_REQUEST = 1;
+       public final static int CHECK_TIMEOUTS = 2;
 }

Modified: trunk/apps/load-balancing-sims/phase5/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Peer.java     2006-08-03 16:44:56 UTC 
(rev 9865)
+++ trunk/apps/load-balancing-sims/phase5/Peer.java     2006-08-03 17:23:52 UTC 
(rev 9866)
@@ -2,7 +2,7 @@
 import java.util.Iterator;
 import java.util.NoSuchElementException;

-class Peer implements EventTarget
+class Peer
 {
        private Node node; // The local node
        public int address; // The remote node's address
@@ -10,7 +10,6 @@
        private double latency; // The latency of the connection in seconds

        // Retransmission parameters
-       public final static double TIMER = 0.5; // Coarse-grained timer, seconds
        public final static double RTO = 4.0; // Retransmission timeout in RTTs
        public final static double FRTO = 1.5; // Fast retx timeout in RTTs
        public final static double RTT_DECAY = 0.9; // Exp moving average
@@ -29,7 +28,7 @@
        private boolean slowStart = true; // Are we in the slow start phase?
        private double rtt = 3.0; // Estimated round-trip time in seconds
        private double lastTransmission = 0.0; // Clock time
-       private boolean timerRunning = false; // Is the retx timer running?
+       private double lastLeftSlowStart = 0.0; // Clock time
        private int inflight = 0; // Bytes sent but not acked
        private int txSeq = 0; // Sequence number of next outgoing packet
        private LinkedList<DataPacket> txBuffer; // Retransmission buffer
@@ -126,12 +125,8 @@
                inflight += p.size;
                log (inflight + " bytes in flight");
                txBuffer.add (p);
-               // Start the coarse-grained retransmission timer if necessary
-               if (!timerRunning) {
-                       log ("starting retransmission timer");
-                       Event.schedule (this, TIMER, CHECK_TIMEOUTS, null);
-                       timerRunning = true;
-               }
+               // Start the node's retransmission timer if necessary
+               node.startTimer();
                return true;
        }

@@ -248,6 +243,7 @@
                if (slowStart) {
                        log ("leaving slow start");
                        slowStart = false;
+                       lastLeftSlowStart = now;
                }
        }

@@ -263,15 +259,13 @@
                Event.log (node.net.address + ":" + address + " " + message);
        }

-       // Event callback
-       private void checkTimeouts()
+       // Called by Node
+       public boolean checkTimeouts()
        {
                log ("checking timeouts");
-               // If there are no packets in flight, stop the timer
                if (txBuffer.isEmpty()) {
-                       log ("stopping retransmission timer");
-                       timerRunning = false;
-                       return;
+                       log ("no packets in flight");
+                       return false;
                }
                double now = Event.time();
                for (DataPacket p : txBuffer) {
@@ -281,20 +275,19 @@
                                log ("retransmitting packet " + p.seq);
                                log (inflight + " bytes in flight");
                                node.net.send (p, address, latency);
-                               // Note: TCP would return to slow start
-                               decreaseCongestionWindow (now);
+                               // Return to slow start
+                               if (!slowStart &&
+                               now - lastLeftSlowStart > RTO * rtt) {
+                                       log ("returning to slow start");
+                                       cwind = MIN_CWIND;
+                                       slowStart = true;
+                               }
+                               else {
+                                       log ("not returning to slow start");
+                                       decreaseCongestionWindow (now);
+                               }
                        }
                }
-               // Reset the timer
-               Event.schedule (this, TIMER, CHECK_TIMEOUTS, null);
+               return true;
        }
-       
-       // EventTarget interface
-       public void handleEvent (int type, Object data)
-       {
-               if (type == CHECK_TIMEOUTS) checkTimeouts();
-       }
-       
-       // Each EventTarget class has its own event codes
-       private final static int CHECK_TIMEOUTS = 1;
 }

Modified: trunk/apps/load-balancing-sims/phase5/Request.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Request.java  2006-08-03 16:44:56 UTC 
(rev 9865)
+++ trunk/apps/load-balancing-sims/phase5/Request.java  2006-08-03 17:23:52 UTC 
(rev 9866)
@@ -3,10 +3,10 @@
        private static int nextId = 0;

        public final int id; // The unique ID of the request
-       public final double key; // The requested key (as a routing location)
+       public final int key; // The requested key

        // Start a new request
-       public Request (double key)
+       public Request (int key)
        {
                id = nextId++;
                this.key = key;
@@ -14,7 +14,7 @@
        }

        // Forward a request
-       public Request (int id, double key)
+       public Request (int id, int key)
        {
                this.id = id;
                this.key = key;

Modified: trunk/apps/load-balancing-sims/phase5/RequestState.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/RequestState.java     2006-08-03 
16:44:56 UTC (rev 9865)
+++ trunk/apps/load-balancing-sims/phase5/RequestState.java     2006-08-03 
17:23:52 UTC (rev 9866)
@@ -6,7 +6,7 @@
 class RequestState
 {
        public final int id; // The unique ID of the request
-       public final double key; // The requested key (as a routing location)
+       public final int key; // The requested key
        public final Peer prev; // The previous hop of the request
        public final HashSet<Peer> nexts; // Possible next hops

@@ -22,10 +22,11 @@
        // Returns the closest peer to the requested key
        public Peer closestPeer()
        {
+               double keyLoc = Node.keyToLocation (key);
                double bestDist = Double.POSITIVE_INFINITY;
                Peer bestPeer = null;
                for (Peer peer : nexts) {
-                       double dist = Node.distance (key, peer.location);
+                       double dist = Node.distance (keyLoc, peer.location);
                        if (dist < bestDist) {
                                bestDist = dist;
                                bestPeer = peer;

Modified: trunk/apps/load-balancing-sims/phase5/Response.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Response.java 2006-08-03 16:44:56 UTC 
(rev 9865)
+++ trunk/apps/load-balancing-sims/phase5/Response.java 2006-08-03 17:23:52 UTC 
(rev 9866)
@@ -1,9 +1,9 @@
 class Response extends Message
 {
        public final int id; // The unique ID of the request
-       public final double key; // The requested key (as a routing location)
+       public final int key; // The requested key

-       public Response (int id, double key)
+       public Response (int id, int key)
        {
                this.id = id;
                this.key = key;


Reply via email to