Author: mrogers
Date: 2006-11-01 17:11:27 +0000 (Wed, 01 Nov 2006)
New Revision: 10775

Modified:
   trunk/apps/load-balancing-sims/phase7/sim/Event.java
   trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java
   trunk/apps/load-balancing-sims/phase7/sim/Node.java
   trunk/apps/load-balancing-sims/phase7/sim/Peer.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
Log:
Refactored interleaving, coalescing, congestion control and bandwidth limiter

Modified: trunk/apps/load-balancing-sims/phase7/sim/Event.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Event.java        2006-11-01 
10:00:25 UTC (rev 10774)
+++ trunk/apps/load-balancing-sims/phase7/sim/Event.java        2006-11-01 
17:11:27 UTC (rev 10775)
@@ -7,6 +7,7 @@

        private static TreeSet<Event> queue = new TreeSet<Event>();
        private static double clockTime = 0.0;
+       private static double lastLogTime = Double.POSITIVE_INFINITY;
        private static int nextId = 0;
        public static double duration = Double.POSITIVE_INFINITY;

@@ -14,12 +15,13 @@
        {
                queue.clear();
                clockTime = 0.0;
+               lastLogTime = Double.POSITIVE_INFINITY;
                nextId = 0;
                duration = Double.POSITIVE_INFINITY;
        }

        public static void schedule (EventTarget target, double time,
-                               int type, Object data)
+                                       int type, Object data)
        {
                queue.add (new Event (target, time + clockTime, type, data));
        }
@@ -50,6 +52,9 @@

        public static void log (String message)
        {
+               // Print a blank line between events
+               if (clockTime > lastLogTime) System.out.println();
+               lastLogTime = clockTime;
                System.out.print (clockTime + " " + message + "\n");
        }


Modified: trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java     
2006-11-01 10:00:25 UTC (rev 10774)
+++ trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java     
2006-11-01 17:11:27 UTC (rev 10775)
@@ -122,7 +122,6 @@
                }
        }

-       // Each EventTarget class has its own event codes
        public final static int RX_QUEUE = 1;
        private final static int RX_END = 2;
        private final static int TX_END = 3;

Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-01 10:00:25 UTC 
(rev 10774)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-01 17:11:27 UTC 
(rev 10775)
@@ -8,6 +8,9 @@

 public class Node implements EventTarget
 {
+       // Coarse-grained retransmission timer
+       public final static double RETX_TIMER = 0.1; // Seconds
+       
        public double location; // Routing location
        public NetworkInterface net;
        private HashMap<Integer,Peer> peers; // Look up a peer by its address
@@ -21,7 +24,7 @@
        private boolean decrementMaxHtl = false;
        private boolean decrementMinHtl = false;
        public TokenBucket bandwidth; // Bandwidth limiter
-       private boolean timerRunning = false; // Is the retx timer running?
+       private boolean timerRunning = false;

        public Node (double txSpeed, double rxSpeed)
        {
@@ -149,13 +152,13 @@
                pubKeyCache.put (key);
        }

-       // Called by Peer
+       // Called by Peer after transmitting a packet
        public void startTimer()
        {
                if (timerRunning) return;
-               // log ("starting retransmission/coalescing timer");
-               Event.schedule (this, Peer.MAX_DELAY, CHECK_TIMEOUTS, null);
                timerRunning = true;
+               log ("starting retransmission timer");
+               Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
        }

        // Called by NetworkInterface
@@ -385,20 +388,13 @@

        private void checkTimeouts()
        {
-               // Check the peers in a random order each time
-               double deadline = Double.POSITIVE_INFINITY;
-               for (Peer p : peers())
-                       deadline = Math.min (deadline, p.checkTimeouts());
-               if (deadline == Double.POSITIVE_INFINITY) {
-                       // log ("stopping retransmission/coalescing timer");
+               boolean stopTimer = true;
+               for (Peer p : peers()) if (p.checkTimeouts()) stopTimer = false;
+               if (stopTimer) {
+                       log ("stopping retransmission timer");
                        timerRunning = false;
                }
-               else {
-                       double sleep = deadline - Event.time();
-                       if (sleep < Peer.MIN_SLEEP) sleep = Peer.MIN_SLEEP;
-                       // log ("sleeping for " + sleep + " seconds");
-                       Event.schedule (this, sleep, CHECK_TIMEOUTS, null);
-               }
+               else Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
        }

        // EventTarget interface
@@ -423,6 +419,7 @@

                        case SSK_COLLISION:
                        generateSskInsert ((Integer) data, 1);
+                       break;

                        case CHECK_TIMEOUTS:
                        checkTimeouts();
@@ -430,7 +427,6 @@
                }
        }

-       // Each EventTarget class has its own event codes
        public final static int REQUEST_CHK = 1;
        public final static int INSERT_CHK = 2;
        public final static int REQUEST_SSK = 3;

Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-01 10:00:25 UTC 
(rev 10774)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-01 17:11:27 UTC 
(rev 10775)
@@ -4,7 +4,7 @@
 import java.util.Iterator;
 import java.util.HashSet;

-public class Peer
+public class Peer implements EventTarget
 {
        private Node node; // The local node
        public int address; // The remote node's address
@@ -18,10 +18,10 @@
        public final static double LINK_IDLE = 8.0; // RTTs without transmitting

        // Coalescing
-       public final static double MIN_SLEEP = 0.01; // Poll the b/w limiter
-       public final static double MAX_DELAY = 0.1; // Coalescing delay in secs
+       private final static double MAX_DELAY = 0.1; // Max coalescing delay
+       private final static double MIN_SLEEP = 0.01; // Poll the b/w limiter

-       // Out-of-order delivery with eventual detection of missing packets
+       // Out-of-order delivery with duplicate detection
        public final static int SEQ_RANGE = 1000;

        // Sender state
@@ -33,8 +33,9 @@
        private DeadlineQueue<Message> searchQueue; // Outgoing search messages
        private DeadlineQueue<Message> transferQueue; // Outgoing transfers
        private CongestionWindow window; // AIMD congestion window
-       private double lastTransmission = 0.0; // Clock time
-       private boolean tgif = false; // "Transfers go in first" toggle
+       private double lastTransmission = Double.POSITIVE_INFINITY; // Time
+       private int searchBytesSent = 0, transferBytesSent = 0;
+       private boolean timerRunning = false; // Coalescing timer

        // Receiver state
        private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
@@ -58,8 +59,7 @@
        public void sendMessage (Message m)
        {
                m.deadline = Event.time() + MAX_DELAY;
-               if (m instanceof Block || m instanceof DataInsert
-               || m instanceof ChkDataFound) {
+               if (m instanceof Block) {
                        log (m + " added to transfer queue");
                        transferQueue.add (m);
                }
@@ -67,8 +67,8 @@
                        log (m + " added to search queue");
                        searchQueue.add (m);
                }
-               // Start the node's timer if necessary
-               node.startTimer();
+               // Start the coalescing timer
+               startTimer();
                // Send as many packets as possible
                while (send());
        }
@@ -78,72 +78,115 @@
        {
                log ("ack " + seq + " added to ack queue");
                ackQueue.add (new Ack (seq, Event.time() + MAX_DELAY));
-               // Start the node's timer if necessary
-               node.startTimer();
+               // Start the coalescing timer
+               startTimer();
                // Send as many packets as possible
                while (send());
        }

+       // Start the coalescing timer
+       private void startTimer()
+       {
+               if (timerRunning) return;
+               timerRunning = true;
+               log ("starting coalescing timer");
+               Event.schedule (this, MAX_DELAY, CHECK_DEADLINES, null);
+       }
+       
        // Try to send a packet, return true if a packet was sent
        private boolean send()
-       {               
-               if (ackQueue.size + searchQueue.size + transferQueue.size == 0){
-                       log ("nothing to send");
-                       return false;
-               }
-               log (ackQueue.size + " bytes of acks in queue");
-               log (searchQueue.size + " bytes of searches in queue");
-               log (transferQueue.size + " bytes of transfers in queue");
-               
+       {
+               int waiting = ackQueue.size+searchQueue.size+transferQueue.size;
+               log (waiting + " bytes waiting");
+               if (waiting == 0) return false;
                // Return to slow start when the link is idle
                double now = Event.time();
                if (now - lastTransmission > LINK_IDLE * rtt) window.reset();
                lastTransmission = now;
-               
-               // Delay small packets for coalescing
-               if (now < deadline (now)) {
-                       int payload = searchQueue.size + transferQueue.size;
-                       log ("delaying transmission of " + payload + " bytes");
+               // How many bytes of messages can we send?
+               int available = Math.min (window.available(),
+                                       node.bandwidth.available());
+               log (available + " bytes available for packet");
+               // If there are no urgent acks, and no urgent messages or no
+               // room to send them, and not enough messages for a large
+               // packet or no room to send a large packet, give up!
+               if (ackQueue.deadline() > now
+               && (searchQueue.deadline() > now
+               || searchQueue.headSize() > available)
+               && (transferQueue.deadline() > now
+               || transferQueue.headSize() > available)
+               && (waiting < Packet.SENSIBLE_PAYLOAD
+               || available < Packet.SENSIBLE_PAYLOAD)) {
+                       log ("not sending a packet");
                        return false;
                }
-               
+               // Construct a packet
                Packet p = new Packet();
-               
-               // Put all waiting acks in the packet
                while (ackQueue.size > 0) p.addAck (ackQueue.pop());
-               
-               // Don't send sequence number n+SEQ_RANGE until sequence
-               // number n has been acked - this limits the number of
-               // sequence numbers the receiver must store for replay
-               // detection. We must still be allowed to send acks,
-               // otherwise the connection could deadlock.
-               
-               if (txSeq > txMaxSeq)
-                       log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
-               else if (window.available() <= 0)
-                       log ("no room in congestion window for messages");
-               else if (node.bandwidth.available() <= 0)
-                       log ("no bandwidth available for messages");
-               else pack (p); // OK to send data
-               
+               int space = Math.min (available, Packet.MAX_SIZE - p.size);
+               addPayload (p, space);
                // Don't send empty packets
                if (p.acks == null && p.messages == null) return false;
-               
+               // Transmit the packet
+               log ("sending packet " + p.seq + ", " + p.size + " bytes");
+               node.net.send (p, address, latency);
+               node.bandwidth.remove (p.size);
                // If the packet contains data, buffer it for retransmission
                if (p.messages != null) {
-                       p.seq = txSeq++;
                        p.sent = now;
                        txBuffer.add (p);
+                       node.startTimer(); // Start the retransmission timer
                        window.bytesSent (p.size);
                }
-               
-               // Send the packet
-               log ("sending packet " + p.seq + ", " + p.size + " bytes");
-               node.net.send (p, address, latency);
-               node.bandwidth.remove (p.size);
                return true;
        }

+       // Allocate a payload number and add messages to a packet
+       private void addPayload (Packet p, int space)
+       {
+               log (space + " bytes available for messages");
+               if (txSeq > txMaxSeq) {
+                       log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
+                       return;
+               }
+               p.seq = txSeq++;
+               // Searches get priority unless transfers are starving
+               if (searchBytesSent < transferBytesSent) {
+                       while (searchQueue.size > 0
+                       && searchQueue.headSize() <= space) {
+                               Message m = searchQueue.pop();
+                               searchBytesSent += m.size();
+                               space -= m.size();
+                               p.addMessage (m);
+                       }
+                       while (transferQueue.size > 0
+                       && transferQueue.headSize() <= space) {
+                               Message m = transferQueue.pop();
+                               transferBytesSent += m.size();
+                               space -= m.size();
+                               p.addMessage (m);
+                       }
+               }
+               else {
+                       while (transferQueue.size > 0
+                       && transferQueue.headSize() <= space) {
+                               Message m = transferQueue.pop();
+                               transferBytesSent += m.size();
+                               space -= m.size();
+                               p.addMessage (m);
+                       }
+                       while (searchQueue.size > 0
+                       && searchQueue.headSize() <= space) {
+                               Message m = searchQueue.pop();
+                               searchBytesSent += m.size();
+                               space -= m.size();
+                               p.addMessage (m);
+                       }
+               }
+               if (p.messages == null) log ("no messages added");
+               else log (p.messages.size() + " messages added");
+       }
+       
        // Called by Node when a packet arrives
        public void handlePacket (Packet p)
        {
@@ -153,26 +196,24 @@

        private void handleData (Packet p)
        {
-               log ("received packet " + p.seq + ", " + p.size + " bytes");
+               log ("received " + p + ", " + p.size + " bytes");
+               sendAck (p.seq);
                if (p.seq < rxSeq || rxDupe.contains (p.seq)) {
-                       log ("duplicate packet");
-                       sendAck (p.seq); // Original ack may have been lost
+                       log (p + " is a duplicate");
                }
                else if (p.seq == rxSeq) {
-                       log ("packet in order");
+                       log (p + " is in order");
                        // Find the sequence number of the next missing packet
                        int was = rxSeq;
                        while (rxDupe.remove (++rxSeq));
                        log ("rxSeq was " + was + ", now " + rxSeq);
                        // Deliver the packet
                        unpack (p);
-                       sendAck (p.seq);
                }
-               else if (p.seq < rxSeq + SEQ_RANGE) {
-                       log ("packet out of order - expected " + rxSeq);
+               else if (p.seq < rxSeq + SEQ_RANGE * 2) {
+                       log (p + " is out of order - expected " + rxSeq);
                        if (rxDupe.add (p.seq)) unpack (p);
-                       else log ("duplicate packet");
-                       sendAck (p.seq); // Original ack may have been lost
+                       else log (p + " is a duplicate");
                }
                // This indicates a misbehaving sender - discard the packet
                else log ("warning: received " + p.seq + " before " + rxSeq);
@@ -211,38 +252,11 @@
                if (txBuffer.isEmpty()) txMaxSeq = txSeq + SEQ_RANGE - 1;
                else txMaxSeq = txBuffer.peek().seq + SEQ_RANGE - 1;
                log ("maximum sequence number " + txMaxSeq);
-               // Send as many packets as possible
-               while (send());
+               // Send as many packets a possible
+               if (timerRunning) while (send());
+               else checkDeadlines();
        }

-       // Add messages to a packet
-       private void pack (Packet p)
-       {
-               // Alternate between giving searches and transfers priority
-               if (tgif) {
-                       // Transfers go in first
-                       while (transferQueue.size > 0
-                       && p.size + transferQueue.headSize() <= Packet.MAX_SIZE)
-                               p.addMessage (transferQueue.pop());
-                       // Fill any remaining space with searches
-                       while (searchQueue.size > 0
-                       && p.size + searchQueue.headSize() <= Packet.MAX_SIZE)
-                               p.addMessage (searchQueue.pop());
-                       tgif = false;
-               }
-               else {
-                       // Searches go in first
-                       while (searchQueue.size > 0
-                       && p.size + searchQueue.headSize() <= Packet.MAX_SIZE)
-                               p.addMessage (searchQueue.pop());
-                       // Fill any remaining space with transfers
-                       while (transferQueue.size > 0
-                       && p.size + transferQueue.headSize() <= Packet.MAX_SIZE)
-                               p.addMessage (transferQueue.pop());
-                       tgif = true;
-               }
-       }
-       
        // Remove messages from a packet and deliver them to the node
        private void unpack (Packet p)
        {
@@ -250,17 +264,13 @@
                for (Message m : p.messages) node.handleMessage (m, this);
        }

-       // Called by Node, returns the next coalescing or retx deadline
-       public double checkTimeouts()
+       // Check retx timeouts, return true if there are packets in flight
+       public boolean checkTimeouts()
        {
-               log ("checking timeouts");
-               // Send as many packets as possible
-               while (send());
+               log (txBuffer.size() + " packets in flight");
+               if (txBuffer.isEmpty()) return false;

                double now = Event.time();
-               if (txBuffer.isEmpty()) return deadline (now);
-               log (txBuffer.size() + " packets in flight");
-               
                for (Packet p : txBuffer) {
                        if (now - p.sent > RTO * rtt + MAX_DELAY) {
                                // Retransmission timeout
@@ -270,51 +280,68 @@
                                window.timeout (now);
                        }
                }
-               
-               // Sleep for up to MAX_DELAY seconds until the next deadline
-               return Math.min (now + MAX_DELAY, deadline (now));
+               return true;
        }

-       // Work out when the first ack or search or transfer needs to be sent
-       private double deadline (double now)
+       // Event callback: wake up, send packets, go back to sleep
+       private void checkDeadlines()
        {
-               return Math.min (ackQueue.deadline(), dataDeadline (now));
+               // Send as many packets as possible
+               while (send());
+               // Find the next coalescing deadline - ignore message
+               // deadlines if there isn't room in the congestion window
+               // (we have to wait for an ack before sending them)
+               double dl = ackQueue.deadline();
+               if (searchQueue.headSize() <= window.available())
+                       dl = Math.min (dl, searchQueue.deadline());
+               if (transferQueue.headSize() <= window.available())
+                       dl = Math.min (dl, transferQueue.deadline());
+               // If there's no deadline, stop the timer
+               if (dl == Double.POSITIVE_INFINITY) {
+                       if (timerRunning) {
+                               log ("stopping coalescing timer");
+                               timerRunning = false;
+                       }
+                       return;
+               }
+               // Schedule the next check
+               double sleep = Math.max (dl - Event.time(), MIN_SLEEP);
+               if (waitingForBandwidth()) {
+                       log ("waiting for bandwidth");
+                       sleep = MIN_SLEEP; // Poll the bandwidth limiter
+               }
+               timerRunning = true;
+               log ("sleeping for " + sleep + " seconds");
+               Event.schedule (this, sleep, CHECK_DEADLINES, null);
        }

-       // Work out when the first search or transfer needs to be sent
-       private double dataDeadline (double now)
+       // Are there any messages blocked by the bandwidth limiter?
+       private boolean waitingForBandwidth()
        {
-               // If there's no data waiting, use the ack deadline
-               if (searchQueue.size + transferQueue.size == 0)
-                       return Double.POSITIVE_INFINITY;
-               
-               double deadline = Math.min (searchQueue.deadline(),
-                                               transferQueue.deadline());
-               
-               // Delay small packets until the coalescing deadline
-               if (searchQueue.size + transferQueue.size
-               < Packet.SENSIBLE_PAYLOAD)
-                       return deadline;
-               
-               // If there's not enough room in the window, wait for an ack
-               if (window.available() <= 0)
-                       return Double.POSITIVE_INFINITY;
-               
-               // If there's not enough bandwidth, try again shortly
-               if (node.bandwidth.available() <= 0)
-                       return Math.max (now + MIN_SLEEP, deadline);
-               
-               // Send a packet immediately
-               return now;
+               int bandwidth = node.bandwidth.available();
+               double now = Event.time();
+               if (searchQueue.headSize() > bandwidth
+               && searchQueue.deadline() <= now) return true;
+               if (transferQueue.headSize() > bandwidth
+               && transferQueue.deadline() <= now) return true;
+               return false;
        }

        public void log (String message)
        {
-               // Event.log (node.net.address + ":" + address + " " + message);
+               Event.log (node.net.address + ":" + address + " " + message);
        }

        public String toString()
        {
                return Integer.toString (address);
        }
+       
+       // EventTarget interface
+       public void handleEvent (int type, Object data)
+       {
+               if (type == CHECK_DEADLINES) checkDeadlines();
+       }
+       
+       private final static int CHECK_DEADLINES = 1;
 }

Modified: 
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java    
2006-11-01 10:00:25 UTC (rev 10774)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java    
2006-11-01 17:11:27 UTC (rev 10775)
@@ -261,7 +261,6 @@
                }
        }

-       // Each EventTarget class has its own event codes
        private final static int ACCEPTED_TIMEOUT = 1;
        private final static int SEARCH_TIMEOUT = 2;
        private final static int DATA_TIMEOUT = 3;

Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java      
2006-11-01 10:00:25 UTC (rev 10774)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java      
2006-11-01 17:11:27 UTC (rev 10775)
@@ -132,7 +132,6 @@
                }
        }

-       // Each EventTarget class has its own event codes
        protected final static int ACCEPTED_TIMEOUT = 1;
        protected final static int SEARCH_TIMEOUT = 2;
        protected final static int TRANSFER_TIMEOUT = 3;

Modified: 
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java    
2006-11-01 10:00:25 UTC (rev 10774)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java    
2006-11-01 17:11:27 UTC (rev 10775)
@@ -213,7 +213,6 @@
                }
        }

-       // Each EventTarget class has its own event codes
        private final static int KEY_TIMEOUT = 1;
        private final static int ACCEPTED_TIMEOUT = 2;
        private final static int SEARCH_TIMEOUT = 3;


Reply via email to