Author: mrogers
Date: 2007-02-23 20:30:06 +0000 (Fri, 23 Feb 2007)
New Revision: 11903

Removed:
   trunk/apps/load-balancing-sims/phase7/sim/messages/Ack.java
Modified:
   trunk/apps/load-balancing-sims/phase7/sim/Node.java
   trunk/apps/load-balancing-sims/phase7/sim/Peer.java
   trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java
Log:
Cleaner (saner?) coalescing and retransmission code

Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2007-02-23 18:11:16 UTC (rev 11902)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2007-02-23 20:30:06 UTC (rev 11903)
@@ -12,9 +12,6 @@
 {
        public final static boolean LOG = false;

-       // Coarse-grained retransmission timer
-       public final static double RETX_TIMER = 0.1; // Seconds
-       
        // Flow control
        public static boolean useTokens = false;
        public static boolean useBackoff = false;
@@ -43,7 +40,6 @@
        private boolean decrementMaxHtl = false;
        private boolean decrementMinHtl = false;
        public TokenBucket bandwidth; // Bandwidth limiter
-       private boolean timerRunning = false;
        private int spareTokens = FLOW_TOKENS; // Tokens not allocated to a peer
        private double delay = 0.0; // Delay caused by congestion or b/w limiter
        private LinkedList<Search> searchQueue;
@@ -70,7 +66,7 @@
                pubKeyCache = new LruCache<Integer> (16000);
                if (Math.random() < 0.5) decrementMaxHtl = true;
                if (Math.random() < 0.25) decrementMinHtl = true;
-               bandwidth = new TokenBucket (40000, 60000);
+               bandwidth = new TokenBucket (40000, 400000);
                searchQueue = new LinkedList<Search>();
                if (useTokens) {
                        // Allocate flow control tokens after a short delay
@@ -230,15 +226,6 @@
                else if (LOG) log ("public key " + key + " not added to store");
        }

-       // Called by Peer to start the retransmission timer
-       public void startTimer()
-       {
-               if (timerRunning) return;
-               timerRunning = true;
-               if (LOG) log ("starting retransmission timer");
-               Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
-       }
-       
        // Called by Peer to transmit a packet for the first time
        public void sendPacket (Packet p)
        {
@@ -611,17 +598,6 @@
                addToSearchQueue (si);
        }

-       private void checkTimeouts()
-       {
-               boolean stopTimer = true;
-               for (Peer p : peers()) if (p.checkTimeouts()) stopTimer = false;
-               if (stopTimer) {
-                       if (LOG) log ("stopping retransmission timer");
-                       timerRunning = false;
-               }
-               else Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
-       }
-       
        // Allocate all flow control tokens at startup
        private void allocateTokens()
        {
@@ -657,10 +633,6 @@
                        generateSskInsert ((Integer) data, 1, null);
                        break;

-                       case CHECK_TIMEOUTS:
-                       checkTimeouts();
-                       break;
-                       
                        case ALLOCATE_TOKENS:
                        allocateTokens();
                        break;
@@ -676,7 +648,6 @@
        public final static int REQUEST_SSK = 3;
        public final static int INSERT_SSK = 4;
        public final static int SSK_COLLISION = 5;
-       private final static int CHECK_TIMEOUTS = 6;
-       private final static int ALLOCATE_TOKENS = 7;
-       private final static int SEND_SEARCH = 8;
+       private final static int ALLOCATE_TOKENS = 6;
+       private final static int SEND_SEARCH = 7;
 }

Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2007-02-23 18:11:16 UTC (rev 11902)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2007-02-23 20:30:06 UTC (rev 11903)
@@ -19,9 +19,8 @@
        public final static double RTT_DECAY = 0.9; // Exp moving average
        public final static double LINK_IDLE = 8.0; // RTTs without transmitting

-       // Coalescing
-       public final static double MAX_DELAY = 0.1; // Max coalescing delay
-       public final static double MIN_SLEEP = 0.01; // Forty winks
+       // Retransmission/coalescing timer
+       public final static double TICK = 0.1; // Timer granularity, seconds

        // Backoff
        public final static double INITIAL_BACKOFF = 1.0; // Seconds
@@ -39,9 +38,9 @@
        private DeadlineQueue<Message> searchQueue; // Outgoing search messages
        private DeadlineQueue<Message> transferQueue; // Outgoing transfers
        private CongestionWindow window; // AIMD congestion window
-       private double lastTransmission = Double.POSITIVE_INFINITY; // Time
+       private double lastTransmission = Double.POSITIVE_INFINITY; // Abs. time
        private boolean tgif = false; // "Transfers go in first" toggle
-       private boolean timerRunning = false; // Coalescing timer
+       private boolean timerRunning = false; // Retransmission/coalescing timer

        // Receiver state
        private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
@@ -50,8 +49,8 @@
        // Flow control
        private int tokensOut = 0; // How many searches can we send?
        private int tokensIn = 0; // How many searches should we accept?
-       public double backoffUntil = 0.0; // Time
-       public double backoffLength = INITIAL_BACKOFF; // Seconds
+       public double backoffUntil = 0.0; // Absolute time, seconds
+       public double backoffLength = INITIAL_BACKOFF; // Relative time, seconds

        public Peer (Node node, int address, double location, double latency)
        {
@@ -69,7 +68,7 @@
        // Queue a message for transmission
        public void sendMessage (Message m)
        {
-               m.deadline = Event.time() + MAX_DELAY;
+               m.deadline = Event.time() + TICK;
                if (m instanceof Block) {
                        if (LOG) log (m + " added to transfer queue");
                        transferQueue.add (m);
@@ -84,13 +83,13 @@
                while (send (-1));
        }

-       // Start the coalescing timer
+       // Start the retransmission/coalescing timer
        private void startTimer()
        {
                if (timerRunning) return;
                timerRunning = true;
-               if (LOG) log ("starting coalescing timer");
-               Event.schedule (this, MAX_DELAY, CHECK_DEADLINES, null);
+               if (LOG) log ("starting timer");
+               Event.schedule (this, TICK, TIMER, null);
        }

        // Try to send a packet, return true if a packet was sent
@@ -165,7 +164,7 @@
                if (p.messages != null) {
                        p.sent = Event.time();
                        txBuffer.add (p);
-                       node.startTimer(); // Start the retransmission timer
+                       startTimer(); // Start the retransmission timer
                        window.bytesSent (p.size);
                }
                return true;
@@ -241,8 +240,7 @@
                else txMaxSeq = txBuffer.peek().seq + SEQ_RANGE - 1;
                if (LOG) log ("maximum sequence number " + txMaxSeq);
                // Send as many packets as possible
-               if (timerRunning) while (send (-1));
-               else checkDeadlines();
+               while (send (-1));
        }

        // When a local RejectedOverload is received, back off unless backed off
@@ -305,12 +303,19 @@
                return tokensIn;
        }

-       // Check retx timeouts, return true if there are packets in flight
-       public boolean checkTimeouts()
+       // Event callback: wake up, send packets, go back to sleep
+       private void timer()
        {
-               if (LOG) log (txBuffer.size() + " packets in flight");
-               if (txBuffer.isEmpty()) return false;
-               
+               // Send as many packets as possible
+               while (send (-1));
+               // Stop the timer if there's nothing to wait for
+               if (searchQueue.size + transferQueue.size == 0
+               && txBuffer.isEmpty()) {
+                       if (LOG) log ("stopping timer");
+                       timerRunning = false;
+                       return;
+               }
+               // Check the retransmission timeouts
                double now = Event.time();
                for (Packet p : txBuffer) {
                        if (now - p.sent > RTO * rtt) {
@@ -321,57 +326,10 @@
                                window.timeout (now);
                        }
                }
-               return true;
-       }
-       
-       // Event callback: wake up, send packets, go back to sleep
-       private void checkDeadlines()
-       {
-               // Send as many packets as possible
-               while (send (-1));
-               // Find the next coalescing deadline - ignore message deadlines
-               // if there isn't room in the congestion window to send them
-               double dl = Double.POSITIVE_INFINITY;
-               int win = window.available() - Packet.HEADER_SIZE;
-               if (searchQueue.headSize() <= win)
-                       dl = Math.min (dl, searchQueue.deadline());
-               if (transferQueue.headSize() <= win)
-                       dl = Math.min (dl, transferQueue.deadline());
-               // If there's no deadline, stop the timer
-               if (dl == Double.POSITIVE_INFINITY) {
-                       if (timerRunning) {
-                               if (LOG) log ("stopping coalescing timer");
-                               timerRunning = false;
-                       }
-                       return;
-               }
                // Schedule the next check
-               double sleep = dl - Event.time();
-               if (shouldPoll()) sleep = Math.max (sleep, node.bandwidth.poll);
-               else sleep = Math.max (sleep, MIN_SLEEP);
-               timerRunning = true;
-               if (LOG) log ("sleeping for " + sleep + " seconds");
-               Event.schedule (this, sleep, CHECK_DEADLINES, null);
+               Event.schedule (this, TICK, TIMER, null);
        }

-       // Are we waiting for the bandwidth limiter?
-       private boolean shouldPoll()
-       {
-               double bw = node.bandwidth.available();
-               double win = window.available();
-               double now = Event.time();
-               // Is there an overdue search that's waiting for bandwidth?
-               if (searchQueue.headSize() > bw
-               && searchQueue.headSize() <= win
-               && searchQueue.deadline() <= now) return true;
-               // Is there an overdue transfer that's waiting for bandwidth?
-               if (transferQueue.headSize() > bw
-               && transferQueue.headSize() <= win
-               && transferQueue.deadline() <= now) return true;
-               // We're waiting for something other than bandwidth
-               return false;
-       }
-       
        public void log (String message)
        {
                Event.log (node.net.address + ":" + address + " " + message);
@@ -385,8 +343,8 @@
        // EventTarget interface
        public void handleEvent (int type, Object data)
        {
-               if (type == CHECK_DEADLINES) checkDeadlines();
+               if (type == TIMER) timer();
        }

-       private final static int CHECK_DEADLINES = 1;
+       private final static int TIMER = 1;
 }

Modified: trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java  2007-02-23 18:11:16 UTC (rev 11902)
+++ trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java  2007-02-23 20:30:06 UTC (rev 11903)
@@ -2,17 +2,13 @@

 class TokenBucket
 {
-       public final double rate, size, poll;
+       public final double rate, size;
        private double tokens, lastUpdated;

        public TokenBucket (double rate, double size)
        {
                this.rate = rate; // Bandwidth limit in bytes per second
                this.size = size; // Size of maximum burst in bytes
-               double poll = Packet.MAX_SIZE / rate;
-               if (poll < Peer.MIN_SLEEP) poll = Peer.MIN_SLEEP;
-               if (poll > Peer.MAX_DELAY) poll = Peer.MAX_DELAY;
-               this.poll = poll; // Polling interval in seconds
                tokens = size;
                lastUpdated = 0.0; // Time
        }

Deleted: trunk/apps/load-balancing-sims/phase7/sim/messages/Ack.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/messages/Ack.java 2007-02-23 18:11:16 UTC (rev 11902)
+++ trunk/apps/load-balancing-sims/phase7/sim/messages/Ack.java 2007-02-23 20:30:06 UTC (rev 11903)
@@ -1,15 +0,0 @@
-package sim.messages;
-
-public class Ack extends Message
-{
-       public Ack (int seq, double deadline)
-       {
-               id = seq; // Space-saving hack
-               this.deadline = deadline;
-       }
-       
-       public int size()
-       {
-               return ACK_SIZE;
-       }
-}


Reply via email to