So when we accept a request from a peer, and that request completes, we then allocate the peer another token?
On Thu, Nov 02, 2006 at 06:50:37PM +0000, mrogers at freenetproject.org wrote: > Author: mrogers > Date: 2006-11-02 18:50:28 +0000 (Thu, 02 Nov 2006) > New Revision: 10797 > > Added: > trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java > Modified: > trunk/apps/load-balancing-sims/phase7/sim/Node.java > trunk/apps/load-balancing-sims/phase7/sim/Peer.java > trunk/apps/load-balancing-sims/phase7/sim/Sim.java > trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java > trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java > trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java > trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java > trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java > trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java > Log: > Token passing (not tested yet) > > Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-02 > 15:00:48 UTC (rev 10796) > +++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-02 > 18:50:28 UTC (rev 10797) > @@ -11,6 +11,10 @@ > // Coarse-grained retransmission timer > public final static double RETX_TIMER = 0.1; // Seconds > > + // Flow control > + public final static int FLOW_TOKENS = 20; // Shared by all peers > + public final static double TOKEN_DELAY = 1.0; // Allocate initial tokens > + > public double location; // Routing location > public NetworkInterface net; > private HashMap<Integer,Peer> peers; // Look up a peer by its address > @@ -25,6 +29,7 @@ > private boolean decrementMinHtl = false; > public TokenBucket bandwidth; // Bandwidth limiter > private boolean timerRunning = false; > + private int spareTokens = FLOW_TOKENS; // Tokens not allocated to a peer > > public Node (double txSpeed, double rxSpeed) > { > @@ -45,7 +50,9 @@ > pubKeyCache = 
new LruCache<Integer> (16000); > if (Math.random() < 0.5) decrementMaxHtl = true; > if (Math.random() < 0.25) decrementMinHtl = true; > - bandwidth = new TokenBucket (15000, 60000); > + bandwidth = new TokenBucket (40000, 60000); > + // Allocate flow control tokens after a short delay > + Event.schedule (this, Math.random(), ALLOCATE_TOKENS, null); > } > > // Return true if a connection was added, false if already connected > @@ -173,7 +180,9 @@ > public void handleMessage (Message m, Peer src) > { > if (src != null) log ("received " + m + " from " + src); > - if (m instanceof ChkRequest) > + if (m instanceof Token) > + handleToken ((Token) m, src); > + else if (m instanceof ChkRequest) > handleChkRequest ((ChkRequest) m, src); > else if (m instanceof ChkInsert) > handleChkInsert ((ChkInsert) m, src); > @@ -188,6 +197,11 @@ > } > } > > + private void handleToken (Token t, Peer prev) > + { > + prev.tokensOut += t.id; // t.id is the number of tokens > + } > + > private void handleChkRequest (ChkRequest r, Peer prev) > { > if (!recentlySeenRequests.add (r.id)) { > @@ -198,6 +212,7 @@ > if (mh != null) mh.removeNextHop (prev); > return; > } > + if (!getToken (prev)) return; > // Accept the search > if (prev != null) { > log ("accepting " + r); > @@ -212,6 +227,7 @@ > for (int i = 0; i < 32; i++) > prev.sendMessage (new Block (r.id, i)); > } > + allocateToken (prev); > return; > } > log ("key " + r.key + " not found in CHK store"); > @@ -224,6 +240,7 @@ > for (int i = 0; i < 32; i++) > prev.sendMessage (new Block (r.id, i)); > } > + allocateToken (prev); > return; > } > log ("key " + r.key + " not found in CHK cache"); > @@ -243,6 +260,7 @@ > if (mh != null) mh.removeNextHop (prev); > return; > } > + if (!getToken (prev)) return; > // Accept the search > if (prev != null) { > log ("accepting " + i); > @@ -264,6 +282,7 @@ > if (mh != null) mh.removeNextHop (prev); > return; > } > + if (!getToken (prev)) return; > // Look up the public key > boolean pub = pubKeyCache.get 
(r.key); > if (pub) log ("public key " + r.key + " found in cache"); > @@ -284,6 +303,7 @@ > prev.sendMessage > (new SskPubKey (r.id, r.key)); > } > + allocateToken (prev); > return; > } > log ("key " + r.key + " not found in SSK store"); > @@ -298,6 +318,7 @@ > prev.sendMessage > (new SskPubKey (r.id, r.key)); > } > + allocateToken (prev); > return; > } > log ("key " + r.key + " not found in SSK cache"); > @@ -317,6 +338,7 @@ > if (mh != null) mh.removeNextHop (prev); > return; > } > + if (!getToken (prev)) return; > // Look up the public key > boolean pub = pubKeyCache.get (i.key); > if (pub) log ("public key " + i.key + " found in cache"); > @@ -336,9 +358,45 @@ > { > MessageHandler mh = messageHandlers.remove (id); > if (mh == null) log ("no message handler to remove for " + id); > - else log ("removing message handler for " + id); > + else { > + log ("removing message handler for " + id); > + allocateToken (mh.prev); > + } > } > > + // Check whether the peer sendng a request or insert has enough tokens > + private boolean getToken (Peer p) > + { > + if (p == null) { > + if (spareTokens == 0) { > + // The client will have to wait > + log ("not enough tokens"); > + return false; > + } > + spareTokens--; > + return true; > + } > + else { > + if (p.tokensIn == 0) { > + // This indicates a misbehaving sender > + log ("WARNING: not enough tokens"); > + return false; > + } > + p.tokensIn--; > + return true; > + } > + } > + > + // Give another token to the peer whose request/insert just completed > + private void allocateToken (Peer p) > + { > + if (p == null) spareTokens++; > + else { > + p.tokensIn++; > + p.sendMessage (new Token (1)); > + } > + } > + > // Return the list of peers in a random order > public ArrayList<Peer> peers() > { > @@ -352,16 +410,14 @@ > Event.log (net.address + " " + message); > } > > - // Event callbacks > - > - private void generateChkRequest (int key) > + public void generateChkRequest (int key) > { > ChkRequest cr = new ChkRequest (key, 
location); > log ("generating " + cr); > handleChkRequest (cr, null); > } > > - private void generateChkInsert (int key) > + public void generateChkInsert (int key) > { > ChkInsert ci = new ChkInsert (key, location); > log ("generating " + ci); > @@ -371,14 +427,14 @@ > handleMessage (new Block (ci.id, i), null); > } > > - private void generateSskRequest (int key) > + public void generateSskRequest (int key) > { > SskRequest sr = new SskRequest (key, location, true); > log ("generating " + sr); > handleSskRequest (sr, null); > } > > - private void generateSskInsert (int key, int value) > + public void generateSskInsert (int key, int value) > { > SskInsert si = new SskInsert (key, value, location); > log ("generating " + si); > @@ -397,6 +453,18 @@ > else Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null); > } > > + // Allocate all flow control tokens at startup > + private void allocateTokens() > + { > + // Rounding error in your favour - collect 50 tokens > + int tokensPerPeer = FLOW_TOKENS / (peers.size() + 1); > + for (Peer p : peers.values()) { > + p.tokensIn += tokensPerPeer; > + spareTokens -= tokensPerPeer; > + p.sendMessage (new Token (tokensPerPeer)); > + } > + } > + > // EventTarget interface > public void handleEvent (int type, Object data) > { > @@ -424,6 +492,10 @@ > case CHECK_TIMEOUTS: > checkTimeouts(); > break; > + > + case ALLOCATE_TOKENS: > + allocateTokens(); > + break; > } > } > > @@ -433,4 +505,5 @@ > public final static int INSERT_SSK = 4; > public final static int SSK_COLLISION = 5; > private final static int CHECK_TIMEOUTS = 6; > + private final static int ALLOCATE_TOKENS = 7; > } > > Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-02 > 15:00:48 UTC (rev 10796) > +++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-02 > 18:50:28 UTC (rev 10797) > @@ -42,6 +42,10 @@ > private 
HashSet<Integer> rxDupe; // Detect duplicates by sequence number > private int rxSeq = 0; // Sequence number of next in-order incoming pkt > > + // Flow control > + public int tokensOut = 0; // How many requests/inserts can we send? > + public int tokensIn = 0; // How many requests/inserts should we accept? > + > public Peer (Node node, int address, double location, double latency) > { > this.node = node; > @@ -201,7 +205,7 @@ > sendAck (p.seq); > } > // This indicates a misbehaving sender - discard the packet > - else log ("warning: sequence number out of range"); > + else log ("WARNING: sequence number out of range"); > } > > private void handleAck (Ack a) > > Modified: trunk/apps/load-balancing-sims/phase7/sim/Sim.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/Sim.java 2006-11-02 > 15:00:48 UTC (rev 10796) > +++ trunk/apps/load-balancing-sims/phase7/sim/Sim.java 2006-11-02 > 18:50:28 UTC (rev 10797) > @@ -5,7 +5,7 @@ > { > private final int NODES = 100; // Number of nodes > private final int DEGREE = 5; // Average degree > - private final double SPEED = 40000; // Network speed, bytes per second > + private final double SPEED = 15000; // Network speed, bytes per second > private final double LATENCY = 0.1; // Latency of all links in seconds > private final int INSERTS = 100; // Number of inserts per publisher > private Node[] nodes; > > Modified: > trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java > 2006-11-02 15:00:48 UTC (rev 10796) > +++ trunk/apps/load-balancing-sims/phase7/sim/generators/SimplePublisher.java > 2006-11-02 18:50:28 UTC (rev 10797) > @@ -36,7 +36,7 @@ > { > // Insert a random key > int key = Node.locationToKey (Math.random()); > - Event.schedule (node, 0.0, Node.INSERT_CHK, key); > + 
node.generateChkInsert (key); > // Inform each reader after an average of ten minutes > for (Node n : readers) { > double delay = 595.0 + Math.random() * 10.0; > > Modified: > trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java > 2006-11-02 15:00:48 UTC (rev 10796) > +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java > 2006-11-02 18:50:28 UTC (rev 10797) > @@ -62,7 +62,7 @@ > // Start the search > forwardSearch(); > // If we have all the blocks and the headers, consider finishing > - if (blocksReceived == 32 && inState == TRANSFERRING) { > + if (blocksReceived == 32) { > inState = COMPLETED; > considerFinishing(); > } > @@ -110,6 +110,7 @@ > private void handleRejectedLoop (RejectedLoop rl) > { > if (searchState != SENT) node.log (rl + " out of order"); > + next.tokensOut++; // No token was consumed > forwardSearch(); > } > > @@ -158,6 +159,9 @@ > > Node.distance (target, closest)) > htl = node.decrementHtl (htl); > node.log (this + " has htl " + htl); > + // Consume a token > + next.tokensOut--; > + // Forward the search > node.log ("forwarding " + this + " to " + next.address); > next.sendMessage (makeSearchMessage()); > nexts.remove (next); > > Modified: > trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java > 2006-11-02 15:00:48 UTC (rev 10796) > +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java > 2006-11-02 18:50:28 UTC (rev 10797) > @@ -42,7 +42,7 @@ > searchState = TRANSFERRING; > if (prev != null) prev.sendMessage (df); // Forward the message > // If we have all the blocks and the headers, cache the data > - if (blocksReceived == 32 && searchState == TRANSFERRING) { > + 
if (blocksReceived == 32) { > node.cacheChk (key); > if (prev == null) node.log (this + " succeeded"); > finish(); > > Modified: > trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java > 2006-11-02 15:00:48 UTC (rev 10796) > +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java > 2006-11-02 18:50:28 UTC (rev 10797) > @@ -21,8 +21,8 @@ > protected double closest; // The closest location seen so far > protected int htl; // Hops to live for backtracking > > - protected Node node; // The owner of this MessageHandler > - protected Peer prev; // The previous hop of the search > + public final Node node; // The owner of this MessageHandler > + public final Peer prev; // The previous hop of the search > protected Peer next = null; // The (current) next hop of the search > protected LinkedList<Peer> nexts; // Candidates for the next hop > protected int searchState = STARTED; // The state of the search > @@ -60,13 +60,17 @@ > double closestDist = Double.POSITIVE_INFINITY; > Peer closestPeer = null; > for (Peer peer : nexts) { > + if (peer.tokensOut == 0) { > + node.log ("bypassing busy peer " + peer); > + continue; > + } > double dist = Node.distance (keyLoc, peer.location); > if (dist < closestDist) { > closestDist = dist; > closestPeer = peer; > } > } > - return closestPeer; // Null if the list was empty > + return closestPeer; // Null if there are no suitable peers > } > > public abstract void handleMessage (Message m, Peer src); > > Modified: > trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java > 2006-11-02 15:00:48 UTC (rev 10796) > +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java > 2006-11-02 
18:50:28 UTC (rev 10797) > @@ -28,6 +28,7 @@ > protected void handleRejectedLoop (RejectedLoop rl) > { > if (searchState != SENT) node.log (rl + " out of order"); > + next.tokensOut++; // No token was consumed > forwardSearch(); > } > > @@ -73,6 +74,9 @@ > > Node.distance (target, closest)) > htl = node.decrementHtl (htl); > node.log (this + " has htl " + htl); > + // Consume a token > + next.tokensOut--; > + // Forward the search > node.log ("forwarding " + this + " to " + next.address); > next.sendMessage (makeSearchMessage()); > nexts.remove (next); > > Modified: > trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java > 2006-11-02 15:00:48 UTC (rev 10796) > +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java > 2006-11-02 18:50:28 UTC (rev 10797) > @@ -89,6 +89,7 @@ > private void handleRejectedLoop (RejectedLoop rl) > { > if (searchState != SENT) node.log (rl + " out of order"); > + next.tokensOut++; // No token was consumed > forwardSearch(); > } > > @@ -142,6 +143,9 @@ > > Node.distance (target, closest)) > htl = node.decrementHtl (htl); > node.log (this + " has htl " + htl); > + // Consume a token > + next.tokensOut--; > + // Forward the search > node.log ("forwarding " + this + " to " + next.address); > next.sendMessage (makeSearchMessage()); > nexts.remove (next); > > Added: trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java > 2006-11-02 15:00:48 UTC (rev 10796) > +++ trunk/apps/load-balancing-sims/phase7/sim/messages/Token.java > 2006-11-02 18:50:28 UTC (rev 10797) > @@ -0,0 +1,14 @@ > +package sim.messages; > + > +public class Token extends Message > +{ > + public Token (int tokens) > + { > + id = tokens; // 
Space-saving hack > + } > + > + public String toString() > + { > + return new String (id + " tokens"); > + } > +} > > _______________________________________________ > cvs mailing list > cvs at freenetproject.org > http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs > -------------- next part -------------- A non-text attachment was scrubbed... Name: signature.asc Type: application/pgp-signature Size: 189 bytes Desc: Digital signature URL: <https://emu.freenetproject.org/pipermail/devl/attachments/20061102/20284431/attachment.pgp>