Author: mrogers
Date: 2006-08-06 12:33:47 +0000 (Sun, 06 Aug 2006)
New Revision: 9938

Modified:
   trunk/apps/load-balancing-sims/phase5-coalescing/Node.java
   trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java
Log:
Sleep until the next coalescing deadline or retx timer

Modified: trunk/apps/load-balancing-sims/phase5-coalescing/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/Node.java  2006-08-06 
12:22:50 UTC (rev 9937)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/Node.java  2006-08-06 
12:33:47 UTC (rev 9938)
@@ -3,8 +3,8 @@

 class Node implements EventTarget
 {
-       public final static double RETX_TIMER = 0.1; // Coarse-grained timer
        public final static int STORE_SIZE = 10; // Max number of keys in store
+       public final static double MIN_SLEEP = 0.01; // Seconds

        public double location; // Routing location
        public NetworkInterface net;
@@ -13,7 +13,7 @@
        private HashSet<Integer> recentlySeenRequests; // Request IDs
        private HashMap<Integer,RequestState> outstandingRequests;
        public LruCache<Integer> cache; // Datastore containing keys
-       private boolean timerRunning = false; // Is the retx timer running?
+       private boolean timerRunning = false; // Is the timer running?

        public Node (double txSpeed, double rxSpeed)
        {
@@ -60,8 +60,8 @@
        public void startTimer()
        {
                if (timerRunning) return;
-               log ("starting retransmission timer");
-               Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
+               log ("starting retransmission/coalescing timer");
+               Event.schedule (this, Peer.COALESCE, CHECK_TIMEOUTS, null);
                timerRunning = true;
        }

@@ -162,21 +162,24 @@
                log ("generating request " + r.id);
                handleRequest (r, null);
                // Schedule the next request
-               Event.schedule (this, 0.05, GENERATE_REQUEST, null);
+               Event.schedule (this, 0.049, GENERATE_REQUEST, null);
        }

        // Event callback
        private void checkTimeouts()
        {
-               boolean stopTimer = true;
+               double deadline = Double.POSITIVE_INFINITY;
                for (Peer p : peers.values())
-                       if (p.checkTimeouts()) stopTimer = false;
-               if (stopTimer) {
-                       log ("stopping retransmission timer");
+                       deadline = Math.min (deadline, p.checkTimeouts());
+               if (deadline == Double.POSITIVE_INFINITY) {
+                       log ("stopping retransmission/coalescing timer");
                        timerRunning = false;
                }
                else {
-                       Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
+                       double sleep = deadline - Event.time();
+                       if (sleep < MIN_SLEEP) sleep = MIN_SLEEP;
+                       log ("sleeping for " + sleep + " seconds");
+                       Event.schedule (this, sleep, CHECK_TIMEOUTS, null);
                        timerRunning = true;
                }
        }

Modified: trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java  2006-08-06 
12:22:50 UTC (rev 9937)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java  2006-08-06 
12:33:47 UTC (rev 9938)
@@ -21,9 +21,8 @@
        public final static double BETA = 0.9375; // AIMD decrease parameter
        public final static double GAMMA = 3.0; // Slow start divisor

-       // Coalescing parameters
-       public final static double COALESCE_DATA = 0.1; // Max delay in seconds
-       public final static double COALESCE_ACK = 0.1;
+       // Coalescing
+       public final static double COALESCE = 0.1; // Max delay in seconds

        // Out-of-order delivery with eventual detection of missing packets
        public final static int SEQ_RANGE = 1000; // Packets
@@ -66,9 +65,11 @@
                // Warning: until token-passing is implemented the length of
                // the transmission queue is unlimited
                double now = Event.time();
-               msgQueue.add (new Deadline<Message> (m, now + COALESCE_DATA));
+               msgQueue.add (new Deadline<Message> (m, now + COALESCE));
                msgQueueSize += m.size;
                log (msgQueue.size() + " messages in transmission queue");
+               // Start the node's timer if necessary
+               node.startTimer();
                // Send as many packets as possible
                while (send());
        }
@@ -100,15 +101,8 @@
                if (payload > msgQueueSize) payload = msgQueueSize;
                if (payload > window) payload = window;

-               // Work out when the first ack or message needs to be sent
-               double deadline = Double.POSITIVE_INFINITY;
-               Deadline<Integer> ack = ackQueue.peek();
-               if (ack != null) deadline = ack.deadline;
-               Deadline<Message> msg = msgQueue.peek();
-               if (msg != null) deadline = Math.min (deadline, msg.deadline);
-               
                // Delay small packets for coalescing
-               if (payload < Packet.SENSIBLE_PAYLOAD && now < deadline) {
+               if (payload < Packet.SENSIBLE_PAYLOAD && now < deadline()) {
                        log ("delaying transmission of " + payload + " bytes");
                        return false;
                }
@@ -148,8 +142,6 @@
                        inflight += p.size; // Acks aren't congestion-controlled
                        log (inflight + " bytes in flight");
                        txBuffer.add (p);
-                       // Start the node's retransmission timer if necessary
-                       node.startTimer();
                }

                // Send the packet
@@ -161,9 +153,12 @@
        private void sendAck (int seq)
        {
                double now = Event.time();
-               ackQueue.add (new Deadline<Integer> (seq, now + COALESCE_ACK));
+               ackQueue.add (new Deadline<Integer> (seq, now + COALESCE));
                ackQueueSize += Packet.ACK_SIZE;
-               send();
+               // Start the node's timer if necessary
+               node.startTimer();
+               // Send as many packets as possible
+               while (send());
        }

        // Called by Node when a packet arrives
@@ -262,19 +257,31 @@
                for (Message m : p.messages) node.handleMessage (m, this);
        }

+       // Work out when the first ack or message needs to be sent
+       private double deadline()
+       {
+               double deadline = Double.POSITIVE_INFINITY;
+               Deadline<Integer> ack = ackQueue.peek();
+               if (ack != null) deadline = ack.deadline;
+               Deadline<Message> msg = msgQueue.peek();
+               if (msg != null) deadline = Math.min (deadline, msg.deadline);
+               return deadline;
+       }
+       
        private void log (String message)
        {
                Event.log (node.net.address + ":" + address + " " + message);
        }

-       // Called by Node
-       public boolean checkTimeouts()
+       // Called by Node, returns the next coalescing or retx deadline
+       public double checkTimeouts()
        {
                log ("checking timeouts");
-               send(); // Consider sending delayed packets
+               // Send as many packets as possible
+               while (send());
                if (txBuffer.isEmpty()) {
                        log ("no packets in flight");
-                       return false;
+                       return deadline();
                }
                double now = Event.time();
                for (Packet p : txBuffer) {
@@ -297,6 +304,6 @@
                                }
                        }
                }
-               return true;
+               return Math.min (now + COALESCE, deadline());
        }
 }


Reply via email to