Author: mrogers
Date: 2006-11-01 20:16:48 +0000 (Wed, 01 Nov 2006)
New Revision: 10787

Modified:
   trunk/apps/load-balancing-sims/phase7/sim/Node.java
   trunk/apps/load-balancing-sims/phase7/sim/Packet.java
   trunk/apps/load-balancing-sims/phase7/sim/Peer.java
   trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java
Log:
Refactored interleaving, coalescing, congestion control and bandwidth limiter

Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-01 20:04:40 UTC (rev 10786)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-01 20:16:48 UTC (rev 10787)
@@ -45,7 +45,7 @@
                pubKeyCache = new LruCache<Integer> (1000);
                if (Math.random() < 0.5) decrementMaxHtl = true;
                if (Math.random() < 0.25) decrementMinHtl = true;
-               bandwidth = new TokenBucket (30000, 60000);
+               bandwidth = new TokenBucket (15000, 30000);
        }

        // Return true if a connection was added, false if already connected

Modified: trunk/apps/load-balancing-sims/phase7/sim/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Packet.java       2006-11-01 20:04:40 UTC (rev 10786)
+++ trunk/apps/load-balancing-sims/phase7/sim/Packet.java       2006-11-01 20:16:48 UTC (rev 10787)
@@ -35,6 +35,12 @@
                size += m.size();
        }

+       public void addMessages (DeadlineQueue q, int maxSize)
+       {
+               while (q.size > 0 && size + q.headSize() <= maxSize)
+                       addMessage (q.pop());
+       }
+       
        public String toString()
        {
                return new String ("packet " + src + ":" + dest + ":" + seq);

Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-01 20:04:40 UTC (rev 10786)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-01 20:16:48 UTC (rev 10787)
@@ -34,7 +34,7 @@
        private DeadlineQueue<Message> transferQueue; // Outgoing transfers
        private CongestionWindow window; // AIMD congestion window
        private double lastTransmission = Double.POSITIVE_INFINITY; // Time
-       private int searchBytesSent = 0, transferBytesSent = 0;
+       private boolean tgif = false; // "Transfers go in first" toggle
        private boolean timerRunning = false; // Coalescing timer

        // Receiver state
@@ -99,41 +99,64 @@
                int waiting = ackQueue.size+searchQueue.size+transferQueue.size;
                log (waiting + " bytes waiting");
                if (waiting == 0) return false;
+               
                // Return to slow start when the link is idle
                double now = Event.time();
                if (now - lastTransmission > LINK_IDLE * rtt) window.reset();
                lastTransmission = now;
-               // How many bytes of messages can we send?
-               int available = Math.min (window.available(),
-                                       node.bandwidth.available());
-               log (available + " bytes available for packet");
-               // If there are no urgent acks, and no urgent messages or no
-               // room to send them, and not enough messages for a large
-               // packet or no room to send a large packet, give up!
-               if (ackQueue.deadline() > now
-               && (searchQueue.deadline() > now
-               || searchQueue.headSize() > available)
-               && (transferQueue.deadline() > now
-               || transferQueue.headSize() > available)
-               && (waiting < Packet.SENSIBLE_PAYLOAD
-               || available < Packet.SENSIBLE_PAYLOAD)) {
-                       log ("not sending a packet");
-                       return false;
-               }
+               
+               // How many bytes can we send?
+               int size = Math.min (Packet.MAX_SIZE, window.available());
+               size = Math.min (size, node.bandwidth.available());
+               log (size + " bytes available for packet");
+               
+               // Urgent acks to send?
+               if (ackQueue.deadline() <= now) return sendPacket (size);
+               // Urgent searches and room to send them?
+               if (searchQueue.deadline() <= now
+               && searchQueue.headSize() <= size) return sendPacket (size);
+               // Urgent transfers and room to send them?
+               if (transferQueue.deadline() <= now
+               && transferQueue.headSize() <= size) return sendPacket (size);
+               // Enough non-urgent messages for a large packet?
+               if (waiting >= Packet.SENSIBLE_PAYLOAD
+               && size >= Packet.SENSIBLE_PAYLOAD) return sendPacket (size);
+               
+               log ("not sending a packet");
+               return false;
+       }
+       
+       private boolean sendPacket (int maxSize)
+       {
                // Construct a packet
                Packet p = new Packet();
                while (ackQueue.size > 0) p.addAck (ackQueue.pop());
-               int space = Math.min (available, Packet.MAX_SIZE - p.size);
-               addPayload (p, space);
+               log ((maxSize - p.size) + " bytes available for messages");
+               if (txSeq <= txMaxSeq) {
+                       // Alternate priority between searches and transfers
+                       if (tgif) {
+                               p.addMessages (transferQueue, maxSize);
+                               p.addMessages (searchQueue, maxSize);
+                               tgif = false;
+                       }
+                       else {
+                               p.addMessages (searchQueue, maxSize);
+                               p.addMessages (transferQueue, maxSize);
+                               tgif = true;
+                       }
+                       if (p.messages == null) log ("no messages added");
+                       else p.seq = txSeq++;
+               }
+               else log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
                // Don't send empty packets
                if (p.acks == null && p.messages == null) return false;
                // Transmit the packet
-               log ("sending " + p + ", " + p.size + " bytes");
+               log ("sending packet " + p.seq + ", " + p.size + " bytes");
                node.net.send (p, address, latency);
                node.bandwidth.remove (p.size);
                // If the packet contains data, buffer it for retransmission
                if (p.messages != null) {
-                       p.sent = now;
+                       p.sent = Event.time();
                        txBuffer.add (p);
                        node.startTimer(); // Start the retransmission timer
                        window.bytesSent (p.size);
@@ -141,52 +164,6 @@
                return true;
        }

-       // Allocate a payload number and add messages to a packet
-       private void addPayload (Packet p, int space)
-       {
-               log (space + " bytes available for messages");
-               if (txSeq > txMaxSeq) {
-                       log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
-                       return;
-               }
-               p.seq = txSeq++;
-               // Searches get priority unless transfers are starving
-               if (searchBytesSent < transferBytesSent) {
-                       while (searchQueue.size > 0
-                       && searchQueue.headSize() <= space) {
-                               Message m = searchQueue.pop();
-                               searchBytesSent += m.size();
-                               space -= m.size();
-                               p.addMessage (m);
-                       }
-                       while (transferQueue.size > 0
-                       && transferQueue.headSize() <= space) {
-                               Message m = transferQueue.pop();
-                               transferBytesSent += m.size();
-                               space -= m.size();
-                               p.addMessage (m);
-                       }
-               }
-               else {
-                       while (transferQueue.size > 0
-                       && transferQueue.headSize() <= space) {
-                               Message m = transferQueue.pop();
-                               transferBytesSent += m.size();
-                               space -= m.size();
-                               p.addMessage (m);
-                       }
-                       while (searchQueue.size > 0
-                       && searchQueue.headSize() <= space) {
-                               Message m = searchQueue.pop();
-                               searchBytesSent += m.size();
-                               space -= m.size();
-                               p.addMessage (m);
-                       }
-               }
-               if (p.messages == null) log ("no messages added");
-               else log (p.messages.size() + " messages added");
-       }
-       
        // Called by Node when a packet arrives
        public void handlePacket (Packet p)
        {
@@ -196,27 +173,30 @@

        private void handleData (Packet p)
        {
-               log ("received " + p + ", " + p.size + " bytes");
-               sendAck (p.seq);
+               log ("received packet " + p.seq + ", expected " + rxSeq);
                if (p.seq < rxSeq || rxDupe.contains (p.seq)) {
-                       log (p + " is a duplicate");
+                       log ("duplicate packet");
+                       sendAck (p.seq); // Original ack may have been lost
                }
                else if (p.seq == rxSeq) {
-                       log (p + " is in order");
                        // Find the sequence number of the next missing packet
-                       int was = rxSeq;
                        while (rxDupe.remove (++rxSeq));
-                       log ("rxSeq was " + was + ", now " + rxSeq);
-                       // Deliver the packet
-                       unpack (p);
+                       log ("packet in order, now expecting " + rxSeq);
+                       // Deliver the messages to the node
+                       for (Message m : p.messages)
+                               node.handleMessage (m, this);
+                       sendAck (p.seq);
                }
                else if (p.seq < rxSeq + SEQ_RANGE * 2) {
-                       log (p + " is out of order - expected " + rxSeq);
-                       if (rxDupe.add (p.seq)) unpack (p);
-                       else log (p + " is a duplicate");
+                       log ("packet out of order");
+                       rxDupe.add (p.seq);
+                       // Deliver the messages to the node
+                       for (Message m : p.messages)
+                               node.handleMessage (m, this);
+                       sendAck (p.seq);
                }
                // This indicates a misbehaving sender - discard the packet
-               else log ("warning: received " + p.seq + " before " + rxSeq);
+               else log ("warning: sequence number out of range");
        }

        private void handleAck (Ack a)
@@ -230,7 +210,7 @@
                        double age = now - p.sent;
                        // Explicit ack
                        if (p.seq == seq) {
-                               log (p + " acknowledged");
+                               log ("packet " + p.seq + " acknowledged");
                                i.remove();
                                // Update the congestion window
                                window.bytesAcked (p.size);
@@ -243,7 +223,7 @@
                        // Fast retransmission
                        if (p.seq < seq && age > FRTO * rtt + MAX_DELAY) {
                                p.sent = now;
-                               log ("fast retransmitting " + p);
+                               log ("fast retransmitting packet " + p.seq);
                                node.net.send (p, address, latency);
                                window.fastRetransmission (now);
                        }
@@ -257,13 +237,6 @@
                else checkDeadlines();
        }

-       // Remove messages from a packet and deliver them to the node
-       private void unpack (Packet p)
-       {
-               if (p.messages == null) return;
-               for (Message m : p.messages) node.handleMessage (m, this);
-       }
-       
        // Check retx timeouts, return true if there are packets in flight
        public boolean checkTimeouts()
        {
@@ -274,7 +247,7 @@
                for (Packet p : txBuffer) {
                        if (now - p.sent > RTO * rtt + MAX_DELAY) {
                                // Retransmission timeout
-                               log ("retransmitting " + p);
+                               log ("retransmitting packet " + p.seq);
                                p.sent = now;
                                node.net.send (p, address, latency);
                                window.timeout (now);
@@ -288,9 +261,8 @@
        {
                // Send as many packets as possible
                while (send());
-               // Find the next coalescing deadline - ignore message
-               // deadlines if there isn't room in the congestion window
-               // (we have to wait for an ack before sending them)
+               // Find the next coalescing deadline - ignore message deadlines
+               // if there isn't room in the congestion window to send them
                double dl = ackQueue.deadline();
                if (searchQueue.headSize() <= window.available())
                        dl = Math.min (dl, searchQueue.deadline());
@@ -306,27 +278,11 @@
                }
                // Schedule the next check
                double sleep = Math.max (dl - Event.time(), MIN_SLEEP);
-               if (waitingForBandwidth()) {
-                       log ("waiting for bandwidth");
-                       sleep = MIN_SLEEP; // Poll the bandwidth limiter
-               }
                timerRunning = true;
                log ("sleeping for " + sleep + " seconds");
                Event.schedule (this, sleep, CHECK_DEADLINES, null);
        }

-       // Are there any messages blocked by the bandwidth limiter?
-       private boolean waitingForBandwidth()
-       {
-               int bandwidth = node.bandwidth.available();
-               double now = Event.time();
-               if (searchQueue.headSize() > bandwidth
-               && searchQueue.deadline() <= now) return true;
-               if (transferQueue.headSize() > bandwidth
-               && transferQueue.deadline() <= now) return true;
-               return false;
-       }
-       
        public void log (String message)
        {
                Event.log (node.net.address + ":" + address + " " + message);

Modified: trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java  2006-11-01 20:04:40 UTC (rev 10786)
+++ trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java  2006-11-01 20:16:48 UTC (rev 10787)
@@ -2,13 +2,14 @@

 class TokenBucket
 {
-       private double tokens, rate, size, lastUpdated;
+       public final double rate, size;
+       private double tokens, lastUpdated;

        public TokenBucket (double rate, double size)
        {
-               tokens = size;
                this.rate = rate;
                this.size = size;
+               tokens = size;
                lastUpdated = 0.0; // Clock time
        }



Reply via email to