Author: mrogers
Date: 2006-11-02 18:50:28 +0000 (Thu, 02 Nov 2006)
New Revision: 10797

Added:
   trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java
Modified:
   trunk/apps/load-balancing-sims/phase7/sim/Node.java
   trunk/apps/load-balancing-sims/phase7/sim/Peer.java
   trunk/apps/load-balancing-sims/phase7/sim/Sim.java
   trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
Log:
Token passing (not tested yet)

Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-02 15:00:48 UTC 
(rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-02 18:50:28 UTC 
(rev 10797)
@@ -11,6 +11,10 @@
        // Coarse-grained retransmission timer
        public final static double RETX_TIMER = 0.1; // Seconds

+       // Flow control
+       public final static int FLOW_TOKENS = 20; // Shared by all peers
+       public final static double TOKEN_DELAY = 1.0; // Allocate initial tokens
+       
        public double location; // Routing location
        public NetworkInterface net;
        private HashMap<Integer,Peer> peers; // Look up a peer by its address
@@ -25,6 +29,7 @@
        private boolean decrementMinHtl = false;
        public TokenBucket bandwidth; // Bandwidth limiter
        private boolean timerRunning = false;
+       private int spareTokens = FLOW_TOKENS; // Tokens not allocated to a peer

        public Node (double txSpeed, double rxSpeed)
        {
@@ -45,7 +50,9 @@
                pubKeyCache = new LruCache<Integer> (16000);
                if (Math.random() < 0.5) decrementMaxHtl = true;
                if (Math.random() < 0.25) decrementMinHtl = true;
-               bandwidth = new TokenBucket (15000, 60000);
+               bandwidth = new TokenBucket (40000, 60000);
+               // Allocate flow control tokens after a short delay
+               Event.schedule (this, Math.random(), ALLOCATE_TOKENS, null);
        }

        // Return true if a connection was added, false if already connected
@@ -173,7 +180,9 @@
        public void handleMessage (Message m, Peer src)
        {
                if (src != null) log ("received " + m + " from " + src);
-               if (m instanceof ChkRequest)
+               if (m instanceof Token)
+                       handleToken ((Token) m, src);
+               else if (m instanceof ChkRequest)
                        handleChkRequest ((ChkRequest) m, src);
                else if (m instanceof ChkInsert)
                        handleChkInsert ((ChkInsert) m, src);
@@ -188,6 +197,11 @@
                }
        }

+       private void handleToken (Token t, Peer prev)
+       {
+               prev.tokensOut += t.id; // t.id is the number of tokens
+       }
+       
        private void handleChkRequest (ChkRequest r, Peer prev)
        {
                if (!recentlySeenRequests.add (r.id)) {
@@ -198,6 +212,7 @@
                        if (mh != null) mh.removeNextHop (prev);
                        return;
                }
+               if (!getToken (prev)) return;
                // Accept the search
                if (prev != null) {
                        log ("accepting " + r);
@@ -212,6 +227,7 @@
                                for (int i = 0; i < 32; i++)
                                        prev.sendMessage (new Block (r.id, i));
                        }
+                       allocateToken (prev);
                        return;
                }
                log ("key " + r.key + " not found in CHK store");
@@ -224,6 +240,7 @@
                                for (int i = 0; i < 32; i++)
                                        prev.sendMessage (new Block (r.id, i));
                        }
+                       allocateToken (prev);
                        return;
                }
                log ("key " + r.key + " not found in CHK cache");
@@ -243,6 +260,7 @@
                        if (mh != null) mh.removeNextHop (prev);
                        return;
                }
+               if (!getToken (prev)) return;
                // Accept the search
                if (prev != null) {
                        log ("accepting " + i);
@@ -264,6 +282,7 @@
                        if (mh != null) mh.removeNextHop (prev);
                        return;
                }
+               if (!getToken (prev)) return;
                // Look up the public key
                boolean pub = pubKeyCache.get (r.key);
                if (pub) log ("public key " + r.key + " found in cache");
@@ -284,6 +303,7 @@
                                        prev.sendMessage
                                                (new SskPubKey (r.id, r.key));
                        }
+                       allocateToken (prev);
                        return;
                }
                log ("key " + r.key + " not found in SSK store");
@@ -298,6 +318,7 @@
                                        prev.sendMessage
                                                (new SskPubKey (r.id, r.key));
                        }
+                       allocateToken (prev);
                        return;
                }
                log ("key " + r.key + " not found in SSK cache");
@@ -317,6 +338,7 @@
                        if (mh != null) mh.removeNextHop (prev);
                        return;
                }
+               if (!getToken (prev)) return;
                // Look up the public key
                boolean pub = pubKeyCache.get (i.key);
                if (pub) log ("public key " + i.key + " found in cache");
@@ -336,9 +358,45 @@
        {
                MessageHandler mh = messageHandlers.remove (id);
                if (mh == null) log ("no message handler to remove for " + id);
-               else log ("removing message handler for " + id);
+               else {
+                       log ("removing message handler for " + id);
+                       allocateToken (mh.prev);
+               }
        }

+       // Check whether the peer sendng a request or insert has enough tokens
+       private boolean getToken (Peer p)
+       {
+               if (p == null) {
+                       if (spareTokens == 0) {
+                               // The client will have to wait
+                               log ("not enough tokens");
+                               return false;
+                       }
+                       spareTokens--;
+                       return true;
+               }
+               else {
+                       if (p.tokensIn == 0) {
+                               // This indicates a misbehaving sender
+                               log ("WARNING: not enough tokens");
+                               return false;
+                       }
+                       p.tokensIn--;
+                       return true;
+               }
+       }
+       
+       // Give another token to the peer whose request/insert just completed
+       private void allocateToken (Peer p)
+       {
+               if (p == null) spareTokens++;
+               else {
+                       p.tokensIn++;
+                       p.sendMessage (new Token (1));
+               }
+       }
+       
        // Return the list of peers in a random order
        public ArrayList<Peer> peers()
        {
@@ -352,16 +410,14 @@
                Event.log (net.address + " " + message);
        }

-       // Event callbacks
-       
-       private void generateChkRequest (int key)
+       public void generateChkRequest (int key)
        {
                ChkRequest cr = new ChkRequest (key, location);
                log ("generating " + cr);
                handleChkRequest (cr, null);
        }

-       private void generateChkInsert (int key)
+       public void generateChkInsert (int key)
        {
                ChkInsert ci = new ChkInsert (key, location);
                log ("generating " + ci);
@@ -371,14 +427,14 @@
                        handleMessage (new Block (ci.id, i), null);
        }

-       private void generateSskRequest (int key)
+       public void generateSskRequest (int key)
        {
                SskRequest sr = new SskRequest (key, location, true);
                log ("generating " + sr);
                handleSskRequest (sr, null);
        }

-       private void generateSskInsert (int key, int value)
+       public void generateSskInsert (int key, int value)
        {
                SskInsert si = new SskInsert (key, value, location);
                log ("generating " + si);
@@ -397,6 +453,18 @@
                else Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
        }

+       // Allocate all flow control tokens at startup
+       private void allocateTokens()
+       {
+               // Rounding error in your favour - collect 50 tokens
+               int tokensPerPeer = FLOW_TOKENS / (peers.size() + 1);
+               for (Peer p : peers.values()) {
+                       p.tokensIn += tokensPerPeer;
+                       spareTokens -= tokensPerPeer;
+                       p.sendMessage (new Token (tokensPerPeer));
+               }
+       }
+       
        // EventTarget interface
        public void handleEvent (int type, Object data)
        {
@@ -424,6 +492,10 @@
                        case CHECK_TIMEOUTS:
                        checkTimeouts();
                        break;
+                       
+                       case ALLOCATE_TOKENS:
+                       allocateTokens();
+                       break;
                }
        }

@@ -433,4 +505,5 @@
        public final static int INSERT_SSK = 4;
        public final static int SSK_COLLISION = 5;
        private final static int CHECK_TIMEOUTS = 6;
+       private final static int ALLOCATE_TOKENS = 7;
 }

Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-02 15:00:48 UTC 
(rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-02 18:50:28 UTC 
(rev 10797)
@@ -42,6 +42,10 @@
        private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
        private int rxSeq = 0; // Sequence number of next in-order incoming pkt

+       // Flow control
+       public int tokensOut = 0; // How many requests/inserts can we send?
+       public int tokensIn = 0; // How many requests/inserts should we accept?
+       
        public Peer (Node node, int address, double location, double latency)
        {
                this.node = node;
@@ -201,7 +205,7 @@
                        sendAck (p.seq);
                }
                // This indicates a misbehaving sender - discard the packet
-               else log ("warning: sequence number out of range");
+               else log ("WARNING: sequence number out of range");
        }

        private void handleAck (Ack a)

Modified: trunk/apps/load-balancing-sims/phase7/sim/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Sim.java  2006-11-02 15:00:48 UTC 
(rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/Sim.java  2006-11-02 18:50:28 UTC 
(rev 10797)
@@ -5,7 +5,7 @@
 {
        private final int NODES = 100; // Number of nodes
        private final int DEGREE = 5; // Average degree
-       private final double SPEED = 40000; // Network speed, bytes per second
+       private final double SPEED = 15000; // Network speed, bytes per second
        private final double LATENCY = 0.1; // Latency of all links in seconds
        private final int INSERTS = 100; // Number of inserts per publisher
        private Node[] nodes;

Modified: 
trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java   
2006-11-02 15:00:48 UTC (rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java   
2006-11-02 18:50:28 UTC (rev 10797)
@@ -36,7 +36,7 @@
        {
                // Insert a random key
                int key = Node.locationToKey (Math.random());
-               Event.schedule (node, 0.0, Node.INSERT_CHK, key);
+               node.generateChkInsert (key);
                // Inform each reader after an average of ten minutes
                for (Node n : readers) {
                        double delay = 595.0 + Math.random() * 10.0;

Modified: 
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java    
2006-11-02 15:00:48 UTC (rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java    
2006-11-02 18:50:28 UTC (rev 10797)
@@ -62,7 +62,7 @@
                // Start the search
                forwardSearch();
                // If we have all the blocks and the headers, consider finishing
-               if (blocksReceived == 32 && inState == TRANSFERRING) {
+               if (blocksReceived == 32) {
                        inState = COMPLETED;
                        considerFinishing();
                }
@@ -110,6 +110,7 @@
        private void handleRejectedLoop (RejectedLoop rl)
        {
                if (searchState != SENT) node.log (rl + " out of order");
+               next.tokensOut++; // No token was consumed
                forwardSearch();
        }

@@ -158,6 +159,9 @@
                > Node.distance (target, closest))
                        htl = node.decrementHtl (htl);
                node.log (this + " has htl " + htl);
+               // Consume a token
+               next.tokensOut--;
+               // Forward the search
                node.log ("forwarding " + this + " to " + next.address);
                next.sendMessage (makeSearchMessage());
                nexts.remove (next);

Modified: 
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java   
2006-11-02 15:00:48 UTC (rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java   
2006-11-02 18:50:28 UTC (rev 10797)
@@ -42,7 +42,7 @@
                searchState = TRANSFERRING;
                if (prev != null) prev.sendMessage (df); // Forward the message
                // If we have all the blocks and the headers, cache the data
-               if (blocksReceived == 32 && searchState == TRANSFERRING) {
+               if (blocksReceived == 32) {
                        node.cacheChk (key);
                        if (prev == null) node.log (this + " succeeded");
                        finish();

Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java      
2006-11-02 15:00:48 UTC (rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java      
2006-11-02 18:50:28 UTC (rev 10797)
@@ -21,8 +21,8 @@
        protected double closest; // The closest location seen so far
        protected int htl; // Hops to live for backtracking

-       protected Node node; // The owner of this MessageHandler
-       protected Peer prev; // The previous hop of the search
+       public final Node node; // The owner of this MessageHandler
+       public final Peer prev; // The previous hop of the search
        protected Peer next = null; // The (current) next hop of the search
        protected LinkedList<Peer> nexts; // Candidates for the next hop
        protected int searchState = STARTED; // The state of the search
@@ -60,13 +60,17 @@
                double closestDist = Double.POSITIVE_INFINITY;
                Peer closestPeer = null;
                for (Peer peer : nexts) {
+                       if (peer.tokensOut == 0) {
+                               node.log ("bypassing busy peer " + peer);
+                               continue;
+                       }
                        double dist = Node.distance (keyLoc, peer.location);
                        if (dist < closestDist) {
                                closestDist = dist;
                                closestPeer = peer;
                        }
                }
-               return closestPeer; // Null if the list was empty
+               return closestPeer; // Null if there are no suitable peers
        }

        public abstract void handleMessage (Message m, Peer src);

Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java      
2006-11-02 15:00:48 UTC (rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java      
2006-11-02 18:50:28 UTC (rev 10797)
@@ -28,6 +28,7 @@
        protected void handleRejectedLoop (RejectedLoop rl)
        {
                if (searchState != SENT) node.log (rl + " out of order");
+               next.tokensOut++; // No token was consumed
                forwardSearch();
        }

@@ -73,6 +74,9 @@
                > Node.distance (target, closest))
                        htl = node.decrementHtl (htl);
                node.log (this + " has htl " + htl);
+               // Consume a token
+               next.tokensOut--;
+               // Forward the search
                node.log ("forwarding " + this + " to " + next.address);
                next.sendMessage (makeSearchMessage());
                nexts.remove (next);

Modified: 
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java    
2006-11-02 15:00:48 UTC (rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java    
2006-11-02 18:50:28 UTC (rev 10797)
@@ -89,6 +89,7 @@
        private void handleRejectedLoop (RejectedLoop rl)
        {
                if (searchState != SENT) node.log (rl + " out of order");
+               next.tokensOut++; // No token was consumed
                forwardSearch();
        }

@@ -142,6 +143,9 @@
                > Node.distance (target, closest))
                        htl = node.decrementHtl (htl);
                node.log (this + " has htl " + htl);
+               // Consume a token
+               next.tokensOut--;
+               // Forward the search
                node.log ("forwarding " + this + " to " + next.address);
                next.sendMessage (makeSearchMessage());
                nexts.remove (next);

Added: trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java       
2006-11-02 15:00:48 UTC (rev 10796)
+++ trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java       
2006-11-02 18:50:28 UTC (rev 10797)
@@ -0,0 +1,14 @@
+package sim.messages;
+
+public class Token extends Message
+{
+       public Token (int tokens)
+       {
+               id = tokens; // Space-saving hack
+       }
+       
+       public String toString()
+       {
+               return new String (id + " tokens");
+       }
+}


Reply via email to