Author: mrogers
Date: 2006-10-30 18:06:44 +0000 (Mon, 30 Oct 2006)
New Revision: 10746

Removed:
   trunk/apps/load-balancing-sims/phase6/TokenBucket.java
Modified:
   trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java
   trunk/apps/load-balancing-sims/phase6/Node.java
   trunk/apps/load-balancing-sims/phase6/Peer.java
Log:
Bandwidth & congestion refactoring, part 1

Modified: trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java    2006-10-29 
23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java    2006-10-30 
18:06:44 UTC (rev 10746)
@@ -24,9 +24,8 @@

        public double deadline()
        {
-               Double deadline = deadlines.peek();
-               if (deadline == null) return Double.POSITIVE_INFINITY;
-               else return deadline;
+               if (deadlines.isEmpty()) return Double.POSITIVE_INFINITY;
+               else return deadlines.peek();
        }

        public MESSAGE pop()

Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java     2006-10-29 23:34:22 UTC 
(rev 10745)
+++ trunk/apps/load-balancing-sims/phase6/Node.java     2006-10-30 18:06:44 UTC 
(rev 10746)
@@ -6,12 +6,6 @@

 class Node implements EventTarget
 {
-       public final static double SHORT_SLEEP = 0.01; // Poll the bw limiter
-       
-       // Token bucket bandwidth limiter
-       public final static int BUCKET_RATE = 30000; // Bytes per second
-       public final static int BUCKET_SIZE = 60000; // Burst size in bytes
-       
        public double location; // Routing location
        public NetworkInterface net;
        private HashMap<Integer,Peer> peers; // Look up a peer by its address
@@ -24,7 +18,6 @@
        private LruCache<Integer> pubKeyCache; // SSK public keys
        private boolean decrementMaxHtl = false;
        private boolean decrementMinHtl = false;
-       public TokenBucket bandwidth; // Bandwidth limiter
        private boolean timerRunning = false; // Is the retx timer running?

        public Node (double txSpeed, double rxSpeed)
@@ -46,7 +39,6 @@
                pubKeyCache = new LruCache<Integer> (10);
                if (Math.random() < 0.5) decrementMaxHtl = true;
                if (Math.random() < 0.25) decrementMinHtl = true;
-               bandwidth = new TokenBucket (BUCKET_RATE, BUCKET_SIZE);
        }

        // Return true if a connection was added, false if already connected
@@ -54,7 +46,7 @@
        {
                if (n == this) return false;
                if (peers.containsKey (n.net.address)) return false;
-               log ("adding peer " + n.net.address);
+               // log ("adding peer " + n.net.address);
                Peer p = new Peer (this, n.net.address, n.location, latency);
                peers.put (n.net.address, p);
                return true;
@@ -398,8 +390,7 @@
                        timerRunning = false;
                }
                else {
-                       double sleep = deadline - Event.time(); // Can be < 0
-                       if (sleep < SHORT_SLEEP) sleep = SHORT_SLEEP;
+                       double sleep = Math.max (deadline - Event.time(), 0.0);
                        // log ("sleeping for " + sleep + " seconds");
                        Event.schedule (this, sleep, CHECK_TIMEOUTS, null);
                }

Modified: trunk/apps/load-balancing-sims/phase6/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Peer.java     2006-10-29 23:34:22 UTC 
(rev 10745)
+++ trunk/apps/load-balancing-sims/phase6/Peer.java     2006-10-30 18:06:44 UTC 
(rev 10746)
@@ -83,14 +83,16 @@

        // Try to send a packet, return true if a packet was sent
        private boolean send()
-       {               
-               if (ackQueue.size + searchQueue.size + transferQueue.size == 0){
+       {
+               int waiting = ackQueue.size + searchQueue.size
+                               + transferQueue.size;
+               if (waiting == 0) {
                        log ("nothing to send");
                        return false;
                }
-               log (ackQueue.size + " bytes of acks in queue");
-               log (searchQueue.size + " bytes of searches in queue");
-               log (transferQueue.size + " bytes of transfers in queue");
+               log (ackQueue.size + " bytes in ack queue");
+               log (searchQueue.size + " bytes in search queue");
+               log (transferQueue.size + " bytes in transfer queue");

                // Return to slow start when the link is idle
                double now = Event.time();
@@ -98,9 +100,8 @@
                lastTransmission = now;

                // Delay small packets for coalescing
-               if (now < deadline (now)) {
-                       int payload = searchQueue.size + transferQueue.size;
-                       log ("delaying transmission of " + payload + " bytes");
+               if (now < deadline()) {
+                       log ("delaying transmission of " + waiting + " bytes");
                        return false;
                }

@@ -119,8 +120,6 @@
                        log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
                else if (window.available() <= 0)
                        log ("no room in congestion window for messages");
-               else if (node.bandwidth.available() <= 0)
-                       log ("no bandwidth available for messages");
                else pack (p); // OK to send data

                // Don't send empty packets
@@ -137,7 +136,6 @@
                // Send the packet
                log ("sending packet " + p.seq + ", " + p.size + " bytes");
                node.net.send (p, address, latency);
-               node.bandwidth.remove (p.size);
                return true;
        }

@@ -250,13 +248,13 @@
        // Called by Node, returns the next coalescing or retx deadline
        public double checkTimeouts()
        {
-               log ("checking timeouts");
                // Send as many packets as possible
                while (send());

-               log (txBuffer.size() + " packets in flight");
+               log ("checking timeouts");
                double now = Event.time();
-               if (txBuffer.isEmpty()) return deadline (now);
+               if (txBuffer.isEmpty()) return deadline();
+               log (txBuffer.size() + " packets in flight");

                for (Packet p : txBuffer) {
                        if (now - p.sent > RTO * rtt + MAX_DELAY) {
@@ -269,40 +267,34 @@
                }

                // Sleep for up to MAX_DELAY seconds until the next deadline
-               return Math.min (now + MAX_DELAY, deadline (now));
+               return Math.min (now + MAX_DELAY, deadline());
        }

        // Work out when the first ack or search or transfer needs to be sent
-       private double deadline (double now)
+       private double deadline()
        {
-               return Math.min (ackQueue.deadline(), dataDeadline (now));
+               return Math.min (ackQueue.deadline(), dataDeadline());
        }

        // Work out when the first search or transfer needs to be sent
-       private double dataDeadline (double now)
+       private double dataDeadline()
        {
+               int waiting = searchQueue.size + transferQueue.size;
+               
                // If there's no data waiting, use the ack deadline
-               if (searchQueue.size + transferQueue.size == 0)
-                       return Double.POSITIVE_INFINITY;
+               if (waiting == 0) return Double.POSITIVE_INFINITY;

-               double deadline = Math.min (searchQueue.deadline(),
-                                               transferQueue.deadline());
-               
                // Delay small packets until the coalescing deadline
-               if (searchQueue.size + transferQueue.size
-               < Packet.SENSIBLE_PAYLOAD)
-                       return deadline;
+               if (waiting < Packet.SENSIBLE_PAYLOAD)
+                       return Math.min (searchQueue.deadline(),
+                                       transferQueue.deadline());

                // If there's not enough room in the window, wait for an ack
-               if (window.available() <= 0)
+               if (window.available() < Packet.SENSIBLE_PAYLOAD)
                        return Double.POSITIVE_INFINITY;

-               // If there's not enough bandwidth, try again shortly
-               if (node.bandwidth.available() <= 0)
-                       return Math.max (deadline, now + Node.SHORT_SLEEP);
-               
                // Send a packet immediately
-               return now;
+               return 0.0;
        }

        public void log (String message)

Deleted: trunk/apps/load-balancing-sims/phase6/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/TokenBucket.java      2006-10-29 
23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase6/TokenBucket.java      2006-10-30 
18:06:44 UTC (rev 10746)
@@ -1,29 +0,0 @@
-class TokenBucket
-{
-       private double tokens, rate, size, lastUpdated;
-       
-       public TokenBucket (double rate, double size)
-       {
-               tokens = size;
-               this.rate = rate;
-               this.size = size;
-               lastUpdated = 0.0; // Clock time
-       }
-       
-       public int available()
-       {
-               double now = Event.time();
-               double elapsed = now - lastUpdated;
-               lastUpdated = now;
-               tokens += elapsed * rate;
-               if (tokens > size) tokens = size;
-               // Event.log (tokens + " tokens available");
-               return (int) tokens;
-       }
-       
-       public void remove (int t)
-       {
-               tokens -= t; // Counter can go negative
-               // Event.log (t + " tokens removed, " + tokens + " available");
-       }
-}


Reply via email to