Author: mrogers
Date: 2006-08-08 14:39:02 +0000 (Tue, 08 Aug 2006)
New Revision: 9966

Added:
   trunk/apps/load-balancing-sims/phase5-coalescing/Ack.java
Modified:
   trunk/apps/load-balancing-sims/phase5-coalescing/CongestionWindow.java
   trunk/apps/load-balancing-sims/phase5-coalescing/Node.java
   trunk/apps/load-balancing-sims/phase5-coalescing/Packet.java
   trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java
Log:
Subtract the amount of time the ack was held for coalescing from the RTT

Added: trunk/apps/load-balancing-sims/phase5-coalescing/Ack.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/Ack.java   2006-08-08 13:19:56 UTC (rev 9965)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/Ack.java   2006-08-08 14:39:02 UTC (rev 9966)
@@ -0,0 +1,13 @@
+// Tell the sender how long each ack was delayed so it can measure the RTT
+
+class Ack
+{
+       public final int seq; // Sequence number of an acked packet
+       public final double delay; // Seconds the ack was delayed for coalescing
+       
+       public Ack (int seq, double delay)
+       {
+               this.seq = seq;
+               this.delay = delay;
+       }
+}

Modified: trunk/apps/load-balancing-sims/phase5-coalescing/CongestionWindow.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/CongestionWindow.java      2006-08-08 13:19:56 UTC (rev 9965)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/CongestionWindow.java      2006-08-08 14:39:02 UTC (rev 9966)
@@ -1,21 +1,28 @@
-// An AIMD congestion window
+// AIMD congestion control

 class CongestionWindow
 {
        public final static int MIN_CWIND = 3000; // Minimum congestion window
-       public final static int MAX_CWIND = 100000; // Maximum congestion window
-       public final static double ALPHA = 0.1615; // AIMD increase parameter
-       public final static double BETA = 0.9375; // AIMD decrease parameter
+       public final static int MAX_CWIND = 1000000; // Max congestion window
+       public final static double ALPHA = 0.3125; // AIMD increase parameter
+       public final static double BETA = 0.875; // AIMD decrease parameter
        public final static double GAMMA = 3.0; // Slow start divisor

        private double cwind = MIN_CWIND; // Size of window in bytes
        private int inflight = 0; // Bytes sent but not acked
        private boolean slowStart = true; // Are we in the slow start phase?
+       private Peer peer; // The owner

+       public CongestionWindow (Peer peer)
+       {
+               this.peer = peer;
+       }
+       
        public void reset()
        {
-               Event.log ("returning to slow start");
+               peer.log ("congestion window decreased to " + MIN_CWIND);
                cwind = MIN_CWIND;
+               peer.log ("returning to slow start");
                slowStart = true;
        }

@@ -28,31 +35,31 @@
        public void bytesSent (int bytes)
        {
                inflight += bytes;
-               Event.log (inflight + " bytes in flight");
+               peer.log (inflight + " bytes in flight");
        }

        // Take bytes out of flight
        public void bytesAcked (int bytes)
        {
                inflight -= bytes;
-               Event.log (inflight + " bytes in flight");
+               peer.log (inflight + " bytes in flight");
                // Increase the window
                if (slowStart) cwind += bytes / GAMMA;
                else cwind += bytes * bytes * ALPHA / cwind;
                if (cwind > MAX_CWIND) cwind = MAX_CWIND;
-               Event.log ("congestion window increased to " + cwind);
+               peer.log ("congestion window increased to " + cwind);
        }

        // Decrease the window when a packet is fast retransmitted
        public void fastRetransmission (double now)
        {
-               Event.log (inflight + " bytes in flight");
+               peer.log (inflight + " bytes in flight");
                cwind *= BETA;
                if (cwind < MIN_CWIND) cwind = MIN_CWIND;
-               Event.log ("congestion window decreased to " + cwind);
+               peer.log ("congestion window decreased to " + cwind);
                // The slow start phase ends when the first packet is lost
                if (slowStart) {
-                       Event.log ("leaving slow start");
+                       peer.log ("leaving slow start");
                        slowStart = false;
                }
        }
@@ -60,7 +67,7 @@
        // Decrease the window when a packet is retransmitted due to a timeout
        public void timeout (double now)
        {
-               Event.log (inflight + " bytes in flight");
+               peer.log (inflight + " bytes in flight");
                if (slowStart) fastRetransmission (now); // Leave slow start
                else reset(); // Reset the window and return to slow start
        }

Modified: trunk/apps/load-balancing-sims/phase5-coalescing/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/Node.java  2006-08-08 13:19:56 UTC (rev 9965)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/Node.java  2006-08-08 14:39:02 UTC (rev 9966)
@@ -61,7 +61,7 @@
        {
                if (timerRunning) return;
                log ("starting retransmission/coalescing timer");
-               Event.schedule (this, Peer.COALESCE, CHECK_TIMEOUTS, null);
+               Event.schedule (this, Peer.MAX_DELAY, CHECK_TIMEOUTS, null);
                timerRunning = true;
        }

@@ -156,13 +156,13 @@
        // Event callback
        private void generateRequest()
        {
-               if (requestsGenerated++ > 1000) return;
+               if (requestsGenerated++ > 10000) return;
                // Send a request to a random location
                Request r = new Request (locationToKey (Math.random()));
                log ("generating request " + r.id);
                handleRequest (r, null);
                // Schedule the next request
-               Event.schedule (this, 0.049, GENERATE_REQUEST, null);
+               Event.schedule (this, 0.123, GENERATE_REQUEST, null);
        }

        // Event callback
@@ -176,7 +176,7 @@
                        timerRunning = false;
                }
                else {
-                       double sleep = deadline - Event.time();
+                       double sleep = deadline - Event.time(); // Can be < 0
                        if (sleep < MIN_SLEEP) sleep = MIN_SLEEP;
                        log ("sleeping for " + sleep + " seconds");
                        Event.schedule (this, sleep, CHECK_TIMEOUTS, null);

Modified: trunk/apps/load-balancing-sims/phase5-coalescing/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/Packet.java        2006-08-08 13:19:56 UTC (rev 9965)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/Packet.java        2006-08-08 14:39:02 UTC (rev 9966)
@@ -5,23 +5,23 @@
 class Packet
 {
        public final static int HEADER_SIZE = 80; // Including IP & UDP headers
-       public final static int ACK_SIZE = 4; // Size of a sequence num in bytes
+       public final static int ACK_SIZE = 8; // Size of an ack in bytes
        public final static int MAX_SIZE = 1450; // MTU including headers
-       public final static int SENSIBLE_PAYLOAD = 1000; // Nagle's algorithm
+       public final static int SENSIBLE_PAYLOAD = 1000; // Coalescing

        public int src, dest; // Network addresses
        public int size = HEADER_SIZE; // Size in bytes, including headers
        public int seq = -1; // Data sequence number (-1 if no data)
-       public ArrayList<Integer> acks = null; // Sequence numbers of acked pkts
-       public ArrayList<Message> messages = null; // Payload
+       public ArrayList<Ack> acks = null;
+       public ArrayList<Message> messages = null;

        public double sent; // Time at which the packet was (re) transmitted
        public double latency; // Link latency (stored here for convenience)

-       public void addAck (Integer seq)
+       public void addAck (Ack a)
        {
-               if (acks == null) acks = new ArrayList<Integer>();
-               acks.add (seq);
+               if (acks == null) acks = new ArrayList<Ack>();
+               acks.add (a);
                size += ACK_SIZE;
        }


Modified: trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java  2006-08-08 13:19:56 UTC (rev 9965)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java  2006-08-08 14:39:02 UTC (rev 9966)
@@ -15,17 +15,17 @@
        public final static double RTT_DECAY = 0.9; // Exp moving average

        // Coalescing
-       public final static double COALESCE = 0.1; // Max delay in seconds
+       public final static double MAX_DELAY = 0.1; // Coalescing delay, seconds

        // Out-of-order delivery with eventual detection of missing packets
        public final static int SEQ_RANGE = 1000;

        // Token bucket bandwidth limiter
-       public final static int BUCKET_RATE = 1000; // Bytes per second
-       public final static int BUCKET_SIZE = 2000; // Burst size in bytes
+       public final static int BUCKET_RATE = 2000; // Bytes per second
+       public final static int BUCKET_SIZE = 4000; // Burst size in bytes

        // Sender state
-       private double rtt = 3.0; // Estimated round-trip time in seconds
+       private double rtt = 5.0; // Estimated round-trip time in seconds
        private int txSeq = 0; // Sequence number of next outgoing data packet
        private int txMaxSeq = SEQ_RANGE - 1; // Highest sequence number
        private LinkedList<Packet> txBuffer; // Retransmission buffer
@@ -50,7 +50,7 @@
                txBuffer = new LinkedList<Packet>();
                msgQueue = new LinkedList<Deadline<Message>>();
                ackQueue = new LinkedList<Deadline<Integer>>();
-               window = new CongestionWindow();
+               window = new CongestionWindow (this);
                bandwidth = new TokenBucket (BUCKET_RATE, BUCKET_SIZE);
                rxDupe = new HashSet<Integer>();
        }
@@ -62,7 +62,7 @@
                // Warning: until token-passing is implemented the length of
                // the message queue is unlimited
                double now = Event.time();
-               msgQueue.add (new Deadline<Message> (m, now + COALESCE));
+               msgQueue.add (new Deadline<Message> (m, now + MAX_DELAY));
                msgQueueSize += m.size;
                log (msgQueue.size() + " messages in queue");
                // Start the node's timer if necessary
@@ -103,9 +103,13 @@
                        return false;
                }

+               Packet p = new Packet();
+               
                // Put all waiting acks in the packet
-               Packet p = new Packet();
-               for (Deadline<Integer> a : ackQueue) p.addAck (a.item);
+               for (Deadline<Integer> a : ackQueue) {
+                       double delay = now - (a.deadline - MAX_DELAY);
+                       p.addAck (new Ack (a.item, delay));
+               }
                ackQueue.clear();
                ackQueueSize = 0;

@@ -152,7 +156,7 @@
        {
                log ("ack " + seq + " added to ack queue");
                double now = Event.time();
-               ackQueue.add (new Deadline<Integer> (seq, now + COALESCE));
+               ackQueue.add (new Deadline<Integer> (seq, now + MAX_DELAY));
                ackQueueSize += Packet.ACK_SIZE;
                log (ackQueue.size() + " acks in queue");
                // Start the node's timer if necessary
@@ -165,7 +169,7 @@
        public void handlePacket (Packet p)
        {
                if (p.messages != null) handleData (p);
-               if (p.acks != null) for (int seq : p.acks) handleAck (seq);
+               if (p.acks != null) for (Ack a : p.acks) handleAck (a);
        }

        private void handleData (Packet p)
@@ -195,28 +199,30 @@
                else log ("warning: received " + p.seq + " before " + rxSeq);
        }

-       private void handleAck (int seq)
+       private void handleAck (Ack a)
        {
-               log ("received ack " + seq);
+               log ("received ack " + a.seq);
                double now = Event.time();
                Iterator<Packet> i = txBuffer.iterator();
                while (i.hasNext()) {
                        Packet p = i.next();
                        double age = now - p.sent;
                        // Explicit ack
-                       if (p.seq == seq) {
+                       if (p.seq == a.seq) {
                                log ("packet " + p.seq + " acknowledged");
                                i.remove();
                                // Update the congestion window
                                window.bytesAcked (p.size);
                                // Update the average round-trip time
-                               rtt = rtt * RTT_DECAY + age * (1.0 - RTT_DECAY);
-                               log ("round-trip time " + age);
+                               rtt *= RTT_DECAY;
+                               rtt += (age - a.delay) * (1.0 - RTT_DECAY);
+                               log ("ack delay " + a.delay);
+                               log ("round-trip time " + (age - a.delay));
                                log ("average round-trip time " + rtt);
                                break;
                        }
                        // Fast retransmission
-                       if (p.seq < seq && age > FRTO * rtt) {
+                       if (p.seq < a.seq && age > FRTO * rtt) {
                                p.sent = now;
                                log ("fast retransmitting packet " + p.seq);
                                node.net.send (p, address, latency);
@@ -258,7 +264,7 @@
                                window.timeout (now);
                        }
                }
-               return Math.min (now + COALESCE, deadline());
+               return Math.min (now + MAX_DELAY, deadline());
        }

        // Work out when the first ack or message needs to be sent
@@ -272,7 +278,7 @@
                return deadline;
        }

-       private void log (String message)
+       public void log (String message)
        {
                Event.log (node.net.address + ":" + address + " " + message);
        }


Reply via email to