So when we accept a request from a peer and that request completes, do we
then allocate the peer another token?

On Thu, Nov 02, 2006 at 06:50:37PM +0000, mrogers at freenetproject.org wrote:
> Author: mrogers
> Date: 2006-11-02 18:50:28 +0000 (Thu, 02 Nov 2006)
> New Revision: 10797
> 
> Added:
>    trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java
> Modified:
>    trunk/apps/load-balancing-sims/phase7/sim/Node.java
>    trunk/apps/load-balancing-sims/phase7/sim/Peer.java
>    trunk/apps/load-balancing-sims/phase7/sim/Sim.java
>    trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java
>    trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
>    trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
>    trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
>    trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
>    trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
> Log:
> Token passing (not tested yet)
> 
> Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/Node.java       2006-11-02 
> 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/Node.java       2006-11-02 
> 18:50:28 UTC (rev 10797)
> @@ -11,6 +11,10 @@
>       // Coarse-grained retransmission timer
>       public final static double RETX_TIMER = 0.1; // Seconds
>       
> +     // Flow control
> +     public final static int FLOW_TOKENS = 20; // Shared by all peers
> +     public final static double TOKEN_DELAY = 1.0; // Allocate initial tokens
> +     
>       public double location; // Routing location
>       public NetworkInterface net;
>       private HashMap<Integer,Peer> peers; // Look up a peer by its address
> @@ -25,6 +29,7 @@
>       private boolean decrementMinHtl = false;
>       public TokenBucket bandwidth; // Bandwidth limiter
>       private boolean timerRunning = false;
> +     private int spareTokens = FLOW_TOKENS; // Tokens not allocated to a peer
>       
>       public Node (double txSpeed, double rxSpeed)
>       {
> @@ -45,7 +50,9 @@
>               pubKeyCache = new LruCache<Integer> (16000);
>               if (Math.random() < 0.5) decrementMaxHtl = true;
>               if (Math.random() < 0.25) decrementMinHtl = true;
> -             bandwidth = new TokenBucket (15000, 60000);
> +             bandwidth = new TokenBucket (40000, 60000);
> +             // Allocate flow control tokens after a short delay
> +             Event.schedule (this, Math.random(), ALLOCATE_TOKENS, null);
>       }
>       
>       // Return true if a connection was added, false if already connected
> @@ -173,7 +180,9 @@
>       public void handleMessage (Message m, Peer src)
>       {
>               if (src != null) log ("received " + m + " from " + src);
> -             if (m instanceof ChkRequest)
> +             if (m instanceof Token)
> +                     handleToken ((Token) m, src);
> +             else if (m instanceof ChkRequest)
>                       handleChkRequest ((ChkRequest) m, src);
>               else if (m instanceof ChkInsert)
>                       handleChkInsert ((ChkInsert) m, src);
> @@ -188,6 +197,11 @@
>               }
>       }
>       
> +     private void handleToken (Token t, Peer prev)
> +     {
> +             prev.tokensOut += t.id; // t.id is the number of tokens
> +     }
> +     
>       private void handleChkRequest (ChkRequest r, Peer prev)
>       {
>               if (!recentlySeenRequests.add (r.id)) {
> @@ -198,6 +212,7 @@
>                       if (mh != null) mh.removeNextHop (prev);
>                       return;
>               }
> +             if (!getToken (prev)) return;
>               // Accept the search
>               if (prev != null) {
>                       log ("accepting " + r);
> @@ -212,6 +227,7 @@
>                               for (int i = 0; i < 32; i++)
>                                       prev.sendMessage (new Block (r.id, i));
>                       }
> +                     allocateToken (prev);
>                       return;
>               }
>               log ("key " + r.key + " not found in CHK store");
> @@ -224,6 +240,7 @@
>                               for (int i = 0; i < 32; i++)
>                                       prev.sendMessage (new Block (r.id, i));
>                       }
> +                     allocateToken (prev);
>                       return;
>               }
>               log ("key " + r.key + " not found in CHK cache");
> @@ -243,6 +260,7 @@
>                       if (mh != null) mh.removeNextHop (prev);
>                       return;
>               }
> +             if (!getToken (prev)) return;
>               // Accept the search
>               if (prev != null) {
>                       log ("accepting " + i);
> @@ -264,6 +282,7 @@
>                       if (mh != null) mh.removeNextHop (prev);
>                       return;
>               }
> +             if (!getToken (prev)) return;
>               // Look up the public key
>               boolean pub = pubKeyCache.get (r.key);
>               if (pub) log ("public key " + r.key + " found in cache");
> @@ -284,6 +303,7 @@
>                                       prev.sendMessage
>                                               (new SskPubKey (r.id, r.key));
>                       }
> +                     allocateToken (prev);
>                       return;
>               }
>               log ("key " + r.key + " not found in SSK store");
> @@ -298,6 +318,7 @@
>                                       prev.sendMessage
>                                               (new SskPubKey (r.id, r.key));
>                       }
> +                     allocateToken (prev);
>                       return;
>               }
>               log ("key " + r.key + " not found in SSK cache");
> @@ -317,6 +338,7 @@
>                       if (mh != null) mh.removeNextHop (prev);
>                       return;
>               }
> +             if (!getToken (prev)) return;
>               // Look up the public key
>               boolean pub = pubKeyCache.get (i.key);
>               if (pub) log ("public key " + i.key + " found in cache");
> @@ -336,9 +358,45 @@
>       {
>               MessageHandler mh = messageHandlers.remove (id);
>               if (mh == null) log ("no message handler to remove for " + id);
> -             else log ("removing message handler for " + id);
> +             else {
> +                     log ("removing message handler for " + id);
> +                     allocateToken (mh.prev);
> +             }
>       }
>       
> +     // Check whether the peer sending a request or insert has enough tokens
> +     private boolean getToken (Peer p)
> +     {
> +             if (p == null) {
> +                     if (spareTokens == 0) {
> +                             // The client will have to wait
> +                             log ("not enough tokens");
> +                             return false;
> +                     }
> +                     spareTokens--;
> +                     return true;
> +             }
> +             else {
> +                     if (p.tokensIn == 0) {
> +                             // This indicates a misbehaving sender
> +                             log ("WARNING: not enough tokens");
> +                             return false;
> +                     }
> +                     p.tokensIn--;
> +                     return true;
> +             }
> +     }
> +     
> +     // Give another token to the peer whose request/insert just completed
> +     private void allocateToken (Peer p)
> +     {
> +             if (p == null) spareTokens++;
> +             else {
> +                     p.tokensIn++;
> +                     p.sendMessage (new Token (1));
> +             }
> +     }
> +     
>       // Return the list of peers in a random order
>       public ArrayList<Peer> peers()
>       {
> @@ -352,16 +410,14 @@
>               Event.log (net.address + " " + message);
>       }
>       
> -     // Event callbacks
> -     
> -     private void generateChkRequest (int key)
> +     public void generateChkRequest (int key)
>       {
>               ChkRequest cr = new ChkRequest (key, location);
>               log ("generating " + cr);
>               handleChkRequest (cr, null);
>       }
>       
> -     private void generateChkInsert (int key)
> +     public void generateChkInsert (int key)
>       {
>               ChkInsert ci = new ChkInsert (key, location);
>               log ("generating " + ci);
> @@ -371,14 +427,14 @@
>                       handleMessage (new Block (ci.id, i), null);
>       }
>       
> -     private void generateSskRequest (int key)
> +     public void generateSskRequest (int key)
>       {
>               SskRequest sr = new SskRequest (key, location, true);
>               log ("generating " + sr);
>               handleSskRequest (sr, null);
>       }
>       
> -     private void generateSskInsert (int key, int value)
> +     public void generateSskInsert (int key, int value)
>       {
>               SskInsert si = new SskInsert (key, value, location);
>               log ("generating " + si);
> @@ -397,6 +453,18 @@
>               else Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
>       }
>       
> +     // Allocate all flow control tokens at startup
> +     private void allocateTokens()
> +     {
> +             // Rounding error in your favour - collect 50 tokens
> +             int tokensPerPeer = FLOW_TOKENS / (peers.size() + 1);
> +             for (Peer p : peers.values()) {
> +                     p.tokensIn += tokensPerPeer;
> +                     spareTokens -= tokensPerPeer;
> +                     p.sendMessage (new Token (tokensPerPeer));
> +             }
> +     }
> +     
>       // EventTarget interface
>       public void handleEvent (int type, Object data)
>       {
> @@ -424,6 +492,10 @@
>                       case CHECK_TIMEOUTS:
>                       checkTimeouts();
>                       break;
> +                     
> +                     case ALLOCATE_TOKENS:
> +                     allocateTokens();
> +                     break;
>               }
>       }
>       
> @@ -433,4 +505,5 @@
>       public final static int INSERT_SSK = 4;
>       public final static int SSK_COLLISION = 5;
>       private final static int CHECK_TIMEOUTS = 6;
> +     private final static int ALLOCATE_TOKENS = 7;
>  }
> 
> Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/Peer.java       2006-11-02 
> 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java       2006-11-02 
> 18:50:28 UTC (rev 10797)
> @@ -42,6 +42,10 @@
>       private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
>       private int rxSeq = 0; // Sequence number of next in-order incoming pkt
>       
> +     // Flow control
> +     public int tokensOut = 0; // How many requests/inserts can we send?
> +     public int tokensIn = 0; // How many requests/inserts should we accept?
> +     
>       public Peer (Node node, int address, double location, double latency)
>       {
>               this.node = node;
> @@ -201,7 +205,7 @@
>                       sendAck (p.seq);
>               }
>               // This indicates a misbehaving sender - discard the packet
> -             else log ("warning: sequence number out of range");
> +             else log ("WARNING: sequence number out of range");
>       }
>       
>       private void handleAck (Ack a)
> 
> Modified: trunk/apps/load-balancing-sims/phase7/sim/Sim.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/Sim.java        2006-11-02 
> 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/Sim.java        2006-11-02 
> 18:50:28 UTC (rev 10797)
> @@ -5,7 +5,7 @@
>  {
>       private final int NODES = 100; // Number of nodes
>       private final int DEGREE = 5; // Average degree
> -     private final double SPEED = 40000; // Network speed, bytes per second
> +     private final double SPEED = 15000; // Network speed, bytes per second
>       private final double LATENCY = 0.1; // Latency of all links in seconds
>       private final int INSERTS = 100; // Number of inserts per publisher
>       private Node[] nodes;
> 
> Modified: 
> trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java 
> 2006-11-02 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java 
> 2006-11-02 18:50:28 UTC (rev 10797)
> @@ -36,7 +36,7 @@
>       {
>               // Insert a random key
>               int key = Node.locationToKey (Math.random());
> -             Event.schedule (node, 0.0, Node.INSERT_CHK, key);
> +             node.generateChkInsert (key);
>               // Inform each reader after an average of ten minutes
>               for (Node n : readers) {
>                       double delay = 595.0 + Math.random() * 10.0;
> 
> Modified: 
> trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java  
> 2006-11-02 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java  
> 2006-11-02 18:50:28 UTC (rev 10797)
> @@ -62,7 +62,7 @@
>               // Start the search
>               forwardSearch();
>               // If we have all the blocks and the headers, consider finishing
> -             if (blocksReceived == 32 && inState == TRANSFERRING) {
> +             if (blocksReceived == 32) {
>                       inState = COMPLETED;
>                       considerFinishing();
>               }
> @@ -110,6 +110,7 @@
>       private void handleRejectedLoop (RejectedLoop rl)
>       {
>               if (searchState != SENT) node.log (rl + " out of order");
> +             next.tokensOut++; // No token was consumed
>               forwardSearch();
>       }
>       
> @@ -158,6 +159,9 @@
>               > Node.distance (target, closest))
>                       htl = node.decrementHtl (htl);
>               node.log (this + " has htl " + htl);
> +             // Consume a token
> +             next.tokensOut--;
> +             // Forward the search
>               node.log ("forwarding " + this + " to " + next.address);
>               next.sendMessage (makeSearchMessage());
>               nexts.remove (next);
> 
> Modified: 
> trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java 
> 2006-11-02 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java 
> 2006-11-02 18:50:28 UTC (rev 10797)
> @@ -42,7 +42,7 @@
>               searchState = TRANSFERRING;
>               if (prev != null) prev.sendMessage (df); // Forward the message
>               // If we have all the blocks and the headers, cache the data
> -             if (blocksReceived == 32 && searchState == TRANSFERRING) {
> +             if (blocksReceived == 32) {
>                       node.cacheChk (key);
>                       if (prev == null) node.log (this + " succeeded");
>                       finish();
> 
> Modified: 
> trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java    
> 2006-11-02 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java    
> 2006-11-02 18:50:28 UTC (rev 10797)
> @@ -21,8 +21,8 @@
>       protected double closest; // The closest location seen so far
>       protected int htl; // Hops to live for backtracking
>       
> -     protected Node node; // The owner of this MessageHandler
> -     protected Peer prev; // The previous hop of the search
> +     public final Node node; // The owner of this MessageHandler
> +     public final Peer prev; // The previous hop of the search
>       protected Peer next = null; // The (current) next hop of the search
>       protected LinkedList<Peer> nexts; // Candidates for the next hop
>       protected int searchState = STARTED; // The state of the search
> @@ -60,13 +60,17 @@
>               double closestDist = Double.POSITIVE_INFINITY;
>               Peer closestPeer = null;
>               for (Peer peer : nexts) {
> +                     if (peer.tokensOut == 0) {
> +                             node.log ("bypassing busy peer " + peer);
> +                             continue;
> +                     }
>                       double dist = Node.distance (keyLoc, peer.location);
>                       if (dist < closestDist) {
>                               closestDist = dist;
>                               closestPeer = peer;
>                       }
>               }
> -             return closestPeer; // Null if the list was empty
> +             return closestPeer; // Null if there are no suitable peers
>       }
>       
>       public abstract void handleMessage (Message m, Peer src);
> 
> Modified: 
> trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java    
> 2006-11-02 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java    
> 2006-11-02 18:50:28 UTC (rev 10797)
> @@ -28,6 +28,7 @@
>       protected void handleRejectedLoop (RejectedLoop rl)
>       {
>               if (searchState != SENT) node.log (rl + " out of order");
> +             next.tokensOut++; // No token was consumed
>               forwardSearch();
>       }
>       
> @@ -73,6 +74,9 @@
>               > Node.distance (target, closest))
>                       htl = node.decrementHtl (htl);
>               node.log (this + " has htl " + htl);
> +             // Consume a token
> +             next.tokensOut--;
> +             // Forward the search
>               node.log ("forwarding " + this + " to " + next.address);
>               next.sendMessage (makeSearchMessage());
>               nexts.remove (next);
> 
> Modified: 
> trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java  
> 2006-11-02 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java  
> 2006-11-02 18:50:28 UTC (rev 10797)
> @@ -89,6 +89,7 @@
>       private void handleRejectedLoop (RejectedLoop rl)
>       {
>               if (searchState != SENT) node.log (rl + " out of order");
> +             next.tokensOut++; // No token was consumed
>               forwardSearch();
>       }
>       
> @@ -142,6 +143,9 @@
>               > Node.distance (target, closest))
>                       htl = node.decrementHtl (htl);
>               node.log (this + " has htl " + htl);
> +             // Consume a token
> +             next.tokensOut--;
> +             // Forward the search
>               node.log ("forwarding " + this + " to " + next.address);
>               next.sendMessage (makeSearchMessage());
>               nexts.remove (next);
> 
> Added: trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java
> ===================================================================
> --- trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java     
> 2006-11-02 15:00:48 UTC (rev 10796)
> +++ trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java     
> 2006-11-02 18:50:28 UTC (rev 10797)
> @@ -0,0 +1,14 @@
> +package sim.messages;
> +
> +public class Token extends Message
> +{
> +     public Token (int tokens)
> +     {
> +             id = tokens; // Space-saving hack
> +     }
> +     
> +     public String toString()
> +     {
> +             return new String (id + " tokens");
> +     }
> +}
> 
> _______________________________________________
> cvs mailing list
> cvs at freenetproject.org
> http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs
> 
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 189 bytes
Desc: Digital signature
URL: 
<https://emu.freenetproject.org/pipermail/devl/attachments/20061102/20284431/attachment.pgp>

Reply via email to