Author: mrogers
Date: 2007-02-25 19:10:41 +0000 (Sun, 25 Feb 2007)
New Revision: 11920

Modified:
   trunk/apps/load-balancing-sims/phase7/sim/Node.java
   trunk/apps/load-balancing-sims/phase7/sim/Peer.java
Log:
One timer per node (should lead to fairer bandwidth sharing between peers when throttled)

Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2007-02-25 18:08:47 UTC (rev 11919)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2007-02-25 19:10:41 UTC (rev 11920)
@@ -12,6 +12,9 @@
 {
        public final static boolean LOG = false;

+       // Retransmission/coalescing timer
+       public final static double TICK = 0.1; // Timer granularity, seconds
+       
        // Flow control
        public static boolean useTokens = false;
        public static boolean useBackoff = false;
@@ -40,6 +43,7 @@
        private boolean decrementMaxHtl = false;
        private boolean decrementMinHtl = false;
        public TokenBucket bandwidth; // Bandwidth limiter
+       private boolean timerRunning = false; // Coalescing/retransmission timer
        private int spareTokens = FLOW_TOKENS; // Tokens not allocated to a peer
        private double delay = 0.0; // Delay caused by congestion or b/w limiter
        private LinkedList<Search> searchQueue;
@@ -66,7 +70,7 @@
                pubKeyCache = new LruCache<Integer> (16000);
                if (Math.random() < 0.5) decrementMaxHtl = true;
                if (Math.random() < 0.25) decrementMinHtl = true;
-               bandwidth = new TokenBucket (40000, 400000);
+               bandwidth = new TokenBucket (40000, 80000);
                searchQueue = new LinkedList<Search>();
                if (useTokens) {
                        // Allocate flow control tokens after a short delay
@@ -501,6 +505,28 @@
                return copy;
        }

+       // Called by Peer to start the retransmission/coalescing timer
+       public void startTimer()
+       {
+               if (timerRunning) return;
+               timerRunning = true;
+               if (LOG) log ("starting timer");
+               Event.schedule (this, TICK, TIMER, null);
+       }
+       
+       // Event callback - check retransmission/coalescing deadlines
+       private void timer()
+       {
+               boolean stopTimer = true;
+               // Check the peers in a random order for fair bandwidth sharing
+               for (Peer p : peers()) if (p.timer()) stopTimer = false;
+               if (stopTimer && timerRunning) {
+                       timerRunning = false;
+                       if (LOG) log ("stopping timer");
+               }
+               else Event.schedule (this, TICK, TIMER, null);
+       }
+       
        public void log (String message)
        {
                Event.log (net.address + " " + message);
@@ -640,6 +666,10 @@
                        case SEND_SEARCH:
                        sendSearch();
                        break;
+                       
+                       case TIMER:
+                       timer();
+                       break;
                }
        }

@@ -650,4 +680,5 @@
        public final static int SSK_COLLISION = 5;
        private final static int ALLOCATE_TOKENS = 6;
        private final static int SEND_SEARCH = 7;
+       private final static int TIMER = 8;
 }

Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2007-02-25 18:08:47 UTC (rev 11919)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2007-02-25 19:10:41 UTC (rev 11920)
@@ -4,7 +4,7 @@
 import java.util.Iterator;
 import java.util.HashSet;

-public class Peer implements EventTarget
+public class Peer
 {
        public final static boolean LOG = false;

@@ -18,10 +18,8 @@
        public final static double FRTO = 1.5; // Fast retx timeout in RTTs
        public final static double RTT_DECAY = 0.9; // Exp moving average
        public final static double LINK_IDLE = 8.0; // RTTs without transmitting
+       public final static double MAX_DELAY = 0.1; // Coalescing delay, seconds

-       // Retransmission/coalescing timer
-       public final static double TICK = 0.1; // Timer granularity, seconds
-       
        // Backoff
        public final static double INITIAL_BACKOFF = 1.0; // Seconds
        public final static double BACKOFF_MULTIPLIER = 2.0;
@@ -40,7 +38,6 @@
        private CongestionWindow window; // AIMD congestion window
        private double lastTransmission = Double.POSITIVE_INFINITY; // Abs. time
        private boolean tgif = false; // "Transfers go in first" toggle
-       private boolean timerRunning = false; // Retransmission/coalescing timer

        // Receiver state
        private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
@@ -68,7 +65,7 @@
        // Queue a message for transmission
        public void sendMessage (Message m)
        {
-               m.deadline = Event.time() + TICK;
+               m.deadline = Event.time() + MAX_DELAY;
                if (m instanceof Block) {
                        if (LOG) log (m + " added to transfer queue");
                        transferQueue.add (m);
@@ -78,20 +75,11 @@
                        searchQueue.add (m);
                }
                // Start the coalescing timer
-               startTimer();
+               node.startTimer();
                // Send as many packets as possible
                while (send (-1));
        }

-       // Start the retransmission/coalescing timer
-       private void startTimer()
-       {
-               if (timerRunning) return;
-               timerRunning = true;
-               if (LOG) log ("starting timer");
-               Event.schedule (this, TICK, TIMER, null);
-       }
-       
        // Try to send a packet, return true if a packet was sent
        private boolean send (int ack)
        {
@@ -164,7 +152,7 @@
                if (p.messages != null) {
                        p.sent = Event.time();
                        txBuffer.add (p);
-                       startTimer(); // Start the retransmission timer
+                       node.startTimer(); // Start the retransmission timer
                        window.bytesSent (p.size);
                }
                return true;
@@ -303,18 +291,14 @@
                return tokensIn;
        }

-       // Event callback: wake up, send packets, go back to sleep
-       private void timer()
+       // Called by Node - return true if there are messages outstanding
+       public boolean timer()
        {
-               // Send as many packets as possible
-               while (send (-1));
                // Stop the timer if there's nothing to wait for
                if (searchQueue.size + transferQueue.size == 0
-               && txBuffer.isEmpty()) {
-                       if (LOG) log ("stopping timer");
-                       timerRunning = false;
-                       return;
-               }
+               && txBuffer.isEmpty()) return false;
+               // Send as many packets as possible
+               while (send (-1));
                // Check the retransmission timeouts
                double now = Event.time();
                for (Packet p : txBuffer) {
@@ -326,8 +310,7 @@
                                window.timeout (now);
                        }
                }
-               // Schedule the next check
-               Event.schedule (this, TICK, TIMER, null);
+               return true;
        }

        public void log (String message)
@@ -339,12 +322,4 @@
        {
                return Integer.toString (address);
        }
-       
-       // EventTarget interface
-       public void handleEvent (int type, Object data)
-       {
-               if (type == TIMER) timer();
-       }
-       
-       private final static int TIMER = 1;
 }


Reply via email to