Author: mrogers
Date: 2006-08-08 10:11:27 +0000 (Tue, 08 Aug 2006)
New Revision: 9961

Added:
   trunk/apps/load-balancing-sims/phase5-coalescing/CongestionWindow.java
   trunk/apps/load-balancing-sims/phase5-coalescing/TokenBucket.java
Modified:
   trunk/apps/load-balancing-sims/phase5-coalescing/Node.java
   trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java
   trunk/apps/load-balancing-sims/phase5-coalescing/RequestState.java
Log:
Token bucket bandwidth limiter, moved AIMD congestion window to separate class

Added: trunk/apps/load-balancing-sims/phase5-coalescing/CongestionWindow.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/CongestionWindow.java  2006-08-08 08:55:37 UTC (rev 9960)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/CongestionWindow.java  2006-08-08 10:11:27 UTC (rev 9961)
@@ -0,0 +1,67 @@
+// An AIMD (additive increase, multiplicative decrease) congestion window
+// with a slow start phase; tracks the window size and the bytes in flight
+class CongestionWindow
+{
+       public final static int MIN_CWIND = 3000; // Minimum (and initial) congestion window, bytes
+       public final static int MAX_CWIND = 100000; // Maximum congestion window, bytes
+       public final static double ALPHA = 0.1615; // AIMD increase parameter (see bytesAcked)
+       public final static double BETA = 0.9375; // AIMD decrease parameter: window multiplier on loss
+       public final static double GAMMA = 3.0; // Slow start divisor: window grows by acked bytes / GAMMA
+       
+       private double cwind = MIN_CWIND; // Size of window in bytes
+       private int inflight = 0; // Bytes sent but not acked
+       private boolean slowStart = true; // Are we in the slow start phase?
+       // Shrink the window to MIN_CWIND and re-enter slow start; inflight is left unchanged
+       public void reset()
+       {
+               Event.log ("returning to slow start");
+               cwind = MIN_CWIND;
+               slowStart = true;
+       }
+       // Bytes that may still be sent: truncated window size minus bytes in flight (can be negative)
+       public int available()
+       {
+               return (int) cwind - inflight; // The cast applies to cwind only
+       }
+       
+       // Put bytes in flight: add them to the count of unacked bytes
+       public void bytesSent (int bytes)
+       {
+               inflight += bytes;
+               Event.log (inflight + " bytes in flight");
+       }
+       
+       // Take bytes out of flight and grow the window, capped at MAX_CWIND
+       public void bytesAcked (int bytes)
+       {
+               inflight -= bytes;
+               Event.log (inflight + " bytes in flight");
+               // Increase the window: bytes/GAMMA in slow start, else bytes^2*ALPHA/cwind
+               if (slowStart) cwind += bytes / GAMMA;
+               else cwind += bytes * bytes * ALPHA / cwind;
+               if (cwind > MAX_CWIND) cwind = MAX_CWIND;
+               Event.log ("congestion window increased to " + cwind);
+       }
+       
+       // Decrease the window when a packet is fast retransmitted; 'now' is not used here
+       public void fastRetransmission (double now)
+       {
+               Event.log (inflight + " bytes in flight");
+               cwind *= BETA; // Multiplicative decrease
+               if (cwind < MIN_CWIND) cwind = MIN_CWIND; // Never drop below the minimum
+               Event.log ("congestion window decreased to " + cwind);
+               // The slow start phase ends when the first packet is lost
+               if (slowStart) {
+                       Event.log ("leaving slow start");
+                       slowStart = false;
+               }
+       }
+       
+       // Decrease the window when a packet is retransmitted due to a timeout:
+       // the first timeout only ends slow start, later ones reset the window
+       public void timeout (double now)
+       {
+               Event.log (inflight + " bytes in flight");
+               if (slowStart) fastRetransmission (now); // Leave slow start
+               else reset(); // Reset the window and return to slow start
+       }
+}

Modified: trunk/apps/load-balancing-sims/phase5-coalescing/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/Node.java  2006-08-08 08:55:37 UTC (rev 9960)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/Node.java  2006-08-08 10:11:27 UTC (rev 9961)
@@ -37,7 +37,7 @@
                n.connect (this, latency);
        }

-       // Returns the circular distance between two locations
+       // Calculate the circular distance between two locations
        public static double distance (double a, double b)
        {
                if (a > b) return Math.min (a - b, b - a + 1.0);

Modified: trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java  2006-08-08 08:55:37 UTC (rev 9960)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java  2006-08-08 10:11:27 UTC (rev 9961)
@@ -14,26 +14,18 @@
        public final static double FRTO = 1.5; // Fast retx timeout in RTTs
        public final static double RTT_DECAY = 0.9; // Exp moving average

-       // Congestion control parameters
-       public final static int MIN_CWIND = 3000; // Minimum congestion window
-       public final static int MAX_CWIND = 100000; // Maximum congestion window
-       public final static double ALPHA = 0.1615; // AIMD increase parameter
-       public final static double BETA = 0.9375; // AIMD decrease parameter
-       public final static double GAMMA = 3.0; // Slow start divisor
-       
        // Coalescing
        public final static double COALESCE = 0.1; // Max delay in seconds

        // Out-of-order delivery with eventual detection of missing packets
        public final static int SEQ_RANGE = 1000; // Packets

+       // Token bucket bandwidth limiter
+       public final static int BUCKET_RATE = 1000; // Bytes per second
+       public final static int BUCKET_SIZE = 2000; // Burst size in bytes
+       
        // Sender state
-       private double cwind = MIN_CWIND; // Congestion window in bytes
-       private boolean slowStart = true; // Are we in the slow start phase?
        private double rtt = 3.0; // Estimated round-trip time in seconds
-       private double lastTransmission = 0.0; // Clock time
-       private double lastLeftSlowStart = 0.0; // Clock time
-       private int inflight = 0; // Bytes sent but not acked
        private int txSeq = 0; // Sequence number of next outgoing data packet
        private int txMaxSeq = SEQ_RANGE - 1; // Highest sequence number
        private LinkedList<Packet> txBuffer; // Retransmission buffer
@@ -41,6 +33,9 @@
        private int msgQueueSize = 0; // Size of message queue in bytes
        private LinkedList<Deadline<Integer>> ackQueue; // Outgoing acks
        private int ackQueueSize = 0; // Size of ack queue in bytes
+       private CongestionWindow window; // AIMD congestion window
+       private double lastTransmission = 0.0; // Clock time
+       private TokenBucket bandwidth; // Token bucket bandwidth limiter

        // Receiver state
        private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
@@ -55,6 +50,8 @@
                txBuffer = new LinkedList<Packet>();
                msgQueue = new LinkedList<Deadline<Message>>();
                ackQueue = new LinkedList<Deadline<Integer>>();
+               window = new CongestionWindow();
+               bandwidth = new TokenBucket (BUCKET_RATE, BUCKET_SIZE);
                rxDupe = new HashSet<Integer>();
        }

@@ -76,31 +73,31 @@

        // Try to send a packet, return true if a packet was sent
        private boolean send()
-       {
-               // Return to slow start when the link is idle
-               double now = Event.time();
-               if (now - lastTransmission > RTO * rtt) {
-                       log ("returning to slow start");
-                       cwind = MIN_CWIND;
-                       slowStart = true;
-               }
-               lastTransmission = now;
-               
+       {               
                if (ackQueueSize == 0 && msgQueueSize == 0) {
                        log ("no messages or acks to send");
                        return false;
                }

+               // Return to slow start when the link is idle
+               double now = Event.time();
+               if (now - lastTransmission > RTO * rtt) window.reset();
+               lastTransmission = now;
+               
                Packet p = new Packet();

-               int window = (int) cwind - inflight - p.size - ackQueueSize;
-               if (window <= 0) log ("no room in congestion window");
-               
                // Work out how large a packet we can send
                int payload = Packet.MAX_SIZE - p.size - ackQueueSize;
                if (payload > msgQueueSize) payload = msgQueueSize;
-               if (payload > window) payload = window;

+               int win = window.available() - p.size - ackQueueSize;
+               if (win <= 0) log ("no room in congestion window for messages");
+               if (payload > win) payload = win;
+               
+               int bw = bandwidth.available() - p.size - ackQueueSize;
+               if (bw <= 0) log ("no bandwidth available for messages");
+               if (payload > bw) payload = bw;
+               
                // Delay small packets for coalescing
                if (payload < Packet.SENSIBLE_PAYLOAD && now < deadline()) {
                        log ("delaying transmission of " + payload + " bytes");
@@ -125,10 +122,11 @@
                        Iterator<Deadline<Message>> i = msgQueue.iterator();
                        while (i.hasNext()) {
                                Message m = i.next().item;
-                               if (p.size + m.size > Packet.MAX_SIZE) break;
+                               if (payload - m.size < 0) break;
                                i.remove();
                                msgQueueSize -= m.size;
                                p.addMessage (m);
+                               payload -= m.size;
                        }
                }

@@ -139,14 +137,14 @@
                if (p.messages != null) {
                        p.seq = txSeq++;
                        p.sent = now;
-                       inflight += p.size; // Acks aren't congestion-controlled
-                       log (inflight + " bytes in flight");
+                       window.bytesSent (p.size);
                        txBuffer.add (p);
                }

                // Send the packet
                log ("sending packet " + p.seq + ", " + p.size + " bytes");
                node.net.send (p, address, latency);
+               bandwidth.remove (p.size);
                return true;
        }

@@ -155,6 +153,7 @@
                double now = Event.time();
                ackQueue.add (new Deadline<Integer> (seq, now + COALESCE));
                ackQueueSize += Packet.ACK_SIZE;
+               log (ackQueue.size() + " acks in ack queue");
                // Start the node's timer if necessary
                node.startTimer();
                // Send as many packets as possible
@@ -207,13 +206,8 @@
                        if (p.seq == seq) {
                                log ("packet " + p.seq + " acknowledged");
                                i.remove();
-                               inflight -= p.size;
-                               log (inflight + " bytes in flight");
-                               // Increase the congestion window
-                               if (slowStart) cwind += p.size / GAMMA;
-                               else cwind += p.size * p.size * ALPHA / cwind;
-                               if (cwind > MAX_CWIND) cwind = MAX_CWIND;
-                               log ("congestion window increased to " + cwind);
+                               // Update the congestion window
+                               window.bytesAcked (p.size);
                                // Update the average round-trip time
                                rtt = rtt * RTT_DECAY + age * (1.0 - RTT_DECAY);
                                log ("round-trip time " + age);
@@ -224,9 +218,8 @@
                        if (p.seq < seq && age > FRTO * rtt) {
                                p.sent = now;
                                log ("fast retransmitting packet " + p.seq);
-                               log (inflight + " bytes in flight");
                                node.net.send (p, address, latency);
-                               decreaseCongestionWindow (now);
+                               window.fastRetransmission (now);
                        }
                }
                // Recalculate the maximum sequence number
@@ -237,19 +230,6 @@
                while (send());
        }

-       private void decreaseCongestionWindow (double now)
-       {
-               cwind *= BETA;
-               if (cwind < MIN_CWIND) cwind = MIN_CWIND;
-               log ("congestion window decreased to " + cwind);
-               // The slow start phase ends when the first packet is lost
-               if (slowStart) {
-                       log ("leaving slow start");
-                       slowStart = false;
-                       lastLeftSlowStart = now;
-               }
-       }
-       
        // Remove messages from a packet and deliver them to the node
        private void unpack (Packet p)
        {
@@ -257,22 +237,6 @@
                for (Message m : p.messages) node.handleMessage (m, this);
        }

-       // Work out when the first ack or message needs to be sent
-       private double deadline()
-       {
-               double deadline = Double.POSITIVE_INFINITY;
-               Deadline<Integer> ack = ackQueue.peek();
-               if (ack != null) deadline = ack.deadline;
-               Deadline<Message> msg = msgQueue.peek();
-               if (msg != null) deadline = Math.min (deadline, msg.deadline);
-               return deadline;
-       }
-       
-       private void log (String message)
-       {
-               Event.log (node.net.address + ":" + address + " " + message);
-       }
-       
        // Called by Node, returns the next coalescing or retx deadline
        public double checkTimeouts()
        {
@@ -287,23 +251,28 @@
                for (Packet p : txBuffer) {
                        if (now - p.sent > RTO * rtt) {
                                // Retransmission timeout
-                               p.sent = now;
                                log ("retransmitting packet " + p.seq);
-                               log (inflight + " bytes in flight");
+                               p.sent = now;
                                node.net.send (p, address, latency);
-                               // Return to slow start
-                               if (!slowStart &&
-                               now - lastLeftSlowStart > RTO * rtt) {
-                                       log ("returning to slow start");
-                                       cwind = MIN_CWIND;
-                                       slowStart = true;
-                               }
-                               else {
-                                       log ("not returning to slow start");
-                                       decreaseCongestionWindow (now);
-                               }
+                               window.timeout (now);
                        }
                }
                return Math.min (now + COALESCE, deadline());
        }
+       
+       // Work out when the first ack or message needs to be sent
+       private double deadline()
+       {
+               double deadline = Double.POSITIVE_INFINITY;
+               Deadline<Integer> ack = ackQueue.peek();
+               if (ack != null) deadline = ack.deadline;
+               Deadline<Message> msg = msgQueue.peek();
+               if (msg != null) deadline = Math.min (deadline, msg.deadline);
+               return deadline;
+       }
+       
+       private void log (String message)
+       {
+               Event.log (node.net.address + ":" + address + " " + message);
+       }
 }

Modified: trunk/apps/load-balancing-sims/phase5-coalescing/RequestState.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/RequestState.java  2006-08-08 08:55:37 UTC (rev 9960)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/RequestState.java  2006-08-08 10:11:27 UTC (rev 9961)
@@ -19,7 +19,7 @@
                if (prev != null) nexts.remove (prev);
        }

-       // Returns the closest peer to the requested key
+       // Find the closest peer to the requested key
        public Peer closestPeer()
        {
                double keyLoc = Node.keyToLocation (key);

Added: trunk/apps/load-balancing-sims/phase5-coalescing/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/TokenBucket.java  2006-08-08 08:55:37 UTC (rev 9960)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/TokenBucket.java  2006-08-08 10:11:27 UTC (rev 9961)
@@ -0,0 +1,27 @@
+class TokenBucket // Rate limiter: tokens refill with elapsed Event.time() up to a fixed cap
+{
+       private double tokens, rate, size, lastUpdated; // Balance, refill per unit time, cap, last refill time
+       // Create a bucket that starts full; rate is the number of tokens added per unit of clock time
+       public TokenBucket (double rate, double size)
+       {
+               tokens = size;
+               this.rate = rate;
+               this.size = size;
+               lastUpdated = 0.0; // Clock time
+       }
+       // Refill for the time elapsed since the last call, then return the balance truncated to an int
+       public int available()
+       {
+               double now = Event.time();
+               double elapsed = now - lastUpdated;
+               lastUpdated = now; // Not a pure query: each call advances the refill clock
+               tokens += elapsed * rate;
+               if (tokens > size) tokens = size; // Cap the burst at the bucket size
+               return (int) tokens; // May be negative if remove() overdrew the bucket
+       }
+       // Take t tokens from the bucket without checking that enough are available
+       public void remove (int t)
+       {
+               tokens -= t; // Counter can go negative
+       }
+}


Reply via email to