Author: mrogers
Date: 2006-11-27 18:21:24 +0000 (Mon, 27 Nov 2006)
New Revision: 11071

Modified:
   trunk/apps/load-balancing-sims/phase7/sim/Node.java
   trunk/apps/load-balancing-sims/phase7/sim/Peer.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
Log:
Queue searches when there are no available tokens (token policies not 
implemented yet)

Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-27 15:27:37 UTC (rev 11070)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-27 18:21:24 UTC (rev 11071)
@@ -18,8 +18,7 @@
        public static boolean useTokens = false;
        public static boolean useBackoff = false;
        public static boolean useThrottle = false;
-       public final static int FLOW_TOKENS = 20; // Shared by all peers
-       public final static double TOKEN_DELAY = 1.0; // Allocate initial tokens
+       public final static int FLOW_TOKENS = 50; // Shared by all peers
        public final static double DELAY_DECAY = 0.99; // Exp moving average
        public final static double MAX_DELAY = 2.0; // Reject all, seconds
        public final static double HIGH_DELAY = 1.0; // Reject some, seconds
@@ -43,6 +42,7 @@
        private double delay = 0.0; // Delay caused by congestion or b/w limiter
        private LinkedList<Search> searchQueue;
        private SearchThrottle searchThrottle;
+       private HashSet<Peer> availablePeers; // Peers with outgoing tokens

        public Node (double txSpeed, double rxSpeed)
        {
@@ -65,10 +65,13 @@
                if (Math.random() < 0.5) decrementMaxHtl = true;
                if (Math.random() < 0.25) decrementMinHtl = true;
                bandwidth = new TokenBucket (40000, 60000);
-               // Allocate flow control tokens after a short delay
-               if (useTokens) Event.schedule (this, Math.random() * 0.1,
-                                               ALLOCATE_TOKENS, null);
                searchQueue = new LinkedList<Search>();
+               if (useTokens) {
+                       // Allocate flow control tokens after a short delay
+                       Event.schedule (this, Math.random() * 0.1,
+                                       ALLOCATE_TOKENS, null);
+                       availablePeers = new HashSet<Peer>();
+               }
                if (useThrottle) searchThrottle = new SearchThrottle();
        }

@@ -294,7 +297,7 @@

        private void handleToken (Token t, Peer prev)
        {
-               prev.tokensOut += t.id; // t.id is the number of tokens
+               prev.addTokensOut (t.id); // t.id is the number of tokens
        }

        private void handleChkRequest (ChkRequest r, Peer prev)
@@ -468,12 +471,12 @@
                        return true;
                }
                else {
-                       if (p.tokensIn == 0) {
+                       if (p.getTokensIn() == 0) {
                                // This indicates a misbehaving sender
                                if (LOG) log ("WARNING: not enough tokens");
                                return false;
                        }
-                       p.tokensIn--;
+                       p.removeTokensIn (1);
                        return true;
                }
        }
@@ -482,10 +485,7 @@
        private void allocateToken (Peer p)
        {
                if (p == null) spareTokens++;
-               else {
-                       p.tokensIn++;
-                       p.sendMessage (new Token (1));
-               }
+               else p.addTokensIn (1);
        }

        // Return the list of peers in a random order
@@ -522,6 +522,10 @@
        // Remove the first search from the queue and send it
        private void sendSearch()
        {
+               if (useTokens && availablePeers.isEmpty()) {
+                       if (LOG) log ("blocked");
+                       return;
+               }
                Search s = searchQueue.poll();
                if (s instanceof ChkRequest)
                        handleChkRequest ((ChkRequest) s, null);
@@ -547,6 +551,18 @@
                }
        }

+       public void addAvailablePeer (Peer p)
+       {
+               boolean blocked = availablePeers.isEmpty();
+               availablePeers.add (p);
+               if (blocked && !searchQueue.isEmpty()) sendSearch();
+       }
+       
+       public void removeAvailablePeer (Peer p)
+       {
+               availablePeers.remove (p);
+       }
+       
        public void generateChkRequest (int key)
        {
                ChkRequest cr = new ChkRequest (key, location);
@@ -592,9 +608,8 @@
                // Rounding error in your favour - collect 50 tokens
                int tokensPerPeer = FLOW_TOKENS / (peers.size() + 1);
                for (Peer p : peers.values()) {
-                       p.tokensIn += tokensPerPeer;
                        spareTokens -= tokensPerPeer;
-                       p.sendMessage (new Token (tokensPerPeer));
+                       p.addTokensIn (tokensPerPeer);
                }
        }


Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-27 15:27:37 UTC (rev 11070)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-27 18:21:24 UTC (rev 11071)
@@ -49,8 +49,8 @@
        private int rxSeq = 0; // Sequence number of next in-order incoming pkt

        // Flow control
-       public int tokensOut = 0; // How many requests/inserts can we send?
-       public int tokensIn = 0; // How many requests/inserts should we accept?
+       private int tokensOut = 0; // How many searches can we send?
+       private int tokensIn = 0; // How many searches should we accept?
        public double backoffUntil = 0.0; // Time
        public double backoffLength = INITIAL_BACKOFF; // Seconds

@@ -279,6 +279,45 @@
                if (LOG) log ("resetting backoff length");
        }

+       // Add outgoing tokens
+       public void addTokensOut (int tokens)
+       {
+               tokensOut += tokens;
+               if (tokensOut > 0) node.addAvailablePeer (this);
+       }
+       
+       // Remove outgoing tokens
+       public void removeTokensOut (int tokens)
+       {
+               tokensOut -= tokens;
+               if (tokensOut <= 0) node.removeAvailablePeer (this);
+       }
+       
+       // Return the number of outgoing tokens
+       public int getTokensOut()
+       {
+               return tokensOut;
+       }
+       
+       // Add incoming tokens
+       public void addTokensIn (int tokens)
+       {
+               tokensIn += tokens;
+               sendMessage (new Token (tokens)); // Inform the other side
+       }
+       
+       // Remove incoming tokens
+       public void removeTokensIn (int tokens)
+       {
+               tokensIn -= tokens;
+       }
+       
+       // Return the number of incoming tokens
+       public int getTokensIn()
+       {
+               return tokensIn;
+       }
+       
        // Check retx timeouts, return true if there are packets in flight
        public boolean checkTimeouts()
        {

Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java      2006-11-27 15:27:37 UTC (rev 11070)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java      2006-11-27 18:21:24 UTC (rev 11071)
@@ -83,7 +83,7 @@
                        htl = node.decrementHtl (htl);
                if (LOG) node.log (this + " has htl " + htl);
                // Consume a token
-               if (Node.useTokens) next.tokensOut--;
+               if (Node.useTokens) next.removeTokensOut (1);
                // Forward the search
                if (LOG) node.log ("forwarding " +this+ " to " + next.address);
                next.sendMessage (makeSearchMessage());
@@ -112,7 +112,7 @@
                double closestDist = Double.POSITIVE_INFINITY;
                Peer closestPeer = null;
                for (Peer peer : nexts) {
-                       if (Node.useTokens && peer.tokensOut == 0) {
+                       if (Node.useTokens && peer.getTokensOut() == 0) {
                                if (LOG) node.log ("no tokens for " + peer);
                                continue;
                        }


Reply via email to