Author: mrogers
Date: 2006-10-30 20:23:12 +0000 (Mon, 30 Oct 2006)
New Revision: 10748

Added:
   trunk/apps/load-balancing-sims/phase6/TokenBucket.java
Modified:
   trunk/apps/load-balancing-sims/phase6/Node.java
   trunk/apps/load-balancing-sims/phase6/Peer.java
Log:
Forget it, more urgent things to do...

Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java     2006-10-30 18:17:39 UTC (rev 10747)
+++ trunk/apps/load-balancing-sims/phase6/Node.java     2006-10-30 20:23:12 UTC (rev 10748)
@@ -6,6 +6,12 @@

 class Node implements EventTarget
 {
+       public final static double SHORT_SLEEP = 0.01; // Poll the bw limiter
+       
+       // Token bucket bandwidth limiter
+       public final static int BUCKET_RATE = 30000; // Bytes per second
+       public final static int BUCKET_SIZE = 60000; // Burst size in bytes
+       
        public double location; // Routing location
        public NetworkInterface net;
        private HashMap<Integer,Peer> peers; // Look up a peer by its address
@@ -18,6 +24,7 @@
        private LruCache<Integer> pubKeyCache; // SSK public keys
        private boolean decrementMaxHtl = false;
        private boolean decrementMinHtl = false;
+       public TokenBucket bandwidth; // Bandwidth limiter
        private boolean timerRunning = false; // Is the retx timer running?

        public Node (double txSpeed, double rxSpeed)
@@ -39,6 +46,7 @@
                pubKeyCache = new LruCache<Integer> (10);
                if (Math.random() < 0.5) decrementMaxHtl = true;
                if (Math.random() < 0.25) decrementMinHtl = true;
+               bandwidth = new TokenBucket (BUCKET_RATE, BUCKET_SIZE);
        }

        // Return true if a connection was added, false if already connected
@@ -390,7 +398,8 @@
                        timerRunning = false;
                }
                else {
-                       double sleep = Math.max (deadline - Event.time(), 0.0);
+                       double sleep = deadline - Event.time(); // Can be < 0
+                       if (sleep < SHORT_SLEEP) sleep = SHORT_SLEEP;
                        // log ("sleeping for " + sleep + " seconds");
                        Event.schedule (this, sleep, CHECK_TIMEOUTS, null);
                }

Modified: trunk/apps/load-balancing-sims/phase6/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Peer.java     2006-10-30 18:17:39 UTC (rev 10747)
+++ trunk/apps/load-balancing-sims/phase6/Peer.java     2006-10-30 20:23:12 UTC (rev 10748)
@@ -55,7 +55,7 @@
        // Queue a message for transmission
        public void sendMessage (Message m)
        {
-               m.deadline = Event.time() + MAX_DELAY; // FIXME
+               m.deadline = Event.time() + MAX_DELAY;
                if (m instanceof Block || m instanceof DataInsert
                || m instanceof ChkDataFound) {
                        log (m + " added to transfer queue");
@@ -84,16 +84,14 @@

        // Try to send a packet, return true if a packet was sent
        private boolean send()
-       {
-               int waiting = ackQueue.size + searchQueue.size
-                               + transferQueue.size;
-               if (waiting == 0) {
+       {               
+               if (ackQueue.size + searchQueue.size + transferQueue.size == 0){
                        log ("nothing to send");
                        return false;
                }
-               log (ackQueue.size + " bytes in ack queue");
-               log (searchQueue.size + " bytes in search queue");
-               log (transferQueue.size + " bytes in transfer queue");
+               log (ackQueue.size + " bytes of acks in queue");
+               log (searchQueue.size + " bytes of searches in queue");
+               log (transferQueue.size + " bytes of transfers in queue");

                // Return to slow start when the link is idle
                double now = Event.time();
@@ -101,8 +99,9 @@
                lastTransmission = now;

                // Delay small packets for coalescing
-               if (now < deadline()) {
-                       log ("delaying transmission of " + waiting + " bytes");
+               if (now < deadline (now)) {
+                       int payload = searchQueue.size + transferQueue.size;
+                       log ("delaying transmission of " + payload + " bytes");
                        return false;
                }

@@ -121,6 +120,8 @@
                        log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
                else if (window.available() <= 0)
                        log ("no room in congestion window for messages");
+               else if (node.bandwidth.available() <= 0)
+                       log ("no bandwidth available for messages");
                else pack (p); // OK to send data

                // Don't send empty packets
@@ -137,6 +138,7 @@
                // Send the packet
                log ("sending packet " + p.seq + ", " + p.size + " bytes");
                node.net.send (p, address, latency);
+               node.bandwidth.remove (p.size);
                return true;
        }

@@ -249,13 +251,13 @@
        // Called by Node, returns the next coalescing or retx deadline
        public double checkTimeouts()
        {
+               log ("checking timeouts");
                // Send as many packets as possible
                while (send());

-               log ("checking timeouts");
+               log (txBuffer.size() + " packets in flight");
                double now = Event.time();
-               if (txBuffer.isEmpty()) return deadline();
-               log (txBuffer.size() + " packets in flight");
+               if (txBuffer.isEmpty()) return deadline (now);

                for (Packet p : txBuffer) {
                        if (now - p.sent > RTO * rtt + MAX_DELAY) {
@@ -268,34 +270,40 @@
                }

                // Sleep for up to MAX_DELAY seconds until the next deadline
-               return Math.min (now + MAX_DELAY, deadline());
+               return Math.min (now + MAX_DELAY, deadline (now));
        }

        // Work out when the first ack or search or transfer needs to be sent
-       private double deadline()
+       private double deadline (double now)
        {
-               return Math.min (ackQueue.deadline(), dataDeadline());
+               return Math.min (ackQueue.deadline(), dataDeadline (now));
        }

        // Work out when the first search or transfer needs to be sent
-       private double dataDeadline()
+       private double dataDeadline (double now)
        {
-               int waiting = searchQueue.size + transferQueue.size;
-               
                // If there's no data waiting, use the ack deadline
-               if (waiting == 0) return Double.POSITIVE_INFINITY;
+               if (searchQueue.size + transferQueue.size == 0)
+                       return Double.POSITIVE_INFINITY;

+               double deadline = Math.min (searchQueue.deadline(),
+                                               transferQueue.deadline());
+               
                // Delay small packets until the coalescing deadline
-               if (waiting < Packet.SENSIBLE_PAYLOAD)
-                       return Math.min (searchQueue.deadline(),
-                                       transferQueue.deadline());
+               if (searchQueue.size + transferQueue.size
+               < Packet.SENSIBLE_PAYLOAD)
+                       return deadline;

                // If there's not enough room in the window, wait for an ack
-               if (window.available() < Packet.SENSIBLE_PAYLOAD)
+               if (window.available() <= 0)
                        return Double.POSITIVE_INFINITY;

+               // If there's not enough bandwidth, try again shortly
+               if (node.bandwidth.available() <= 0)
+                       return Math.max (deadline, now + Node.SHORT_SLEEP);
+               
                // Send a packet immediately
-               return 0.0;
+               return now;
        }

        public void log (String message)

Copied: trunk/apps/load-balancing-sims/phase6/TokenBucket.java (from rev 10742, trunk/apps/load-balancing-sims/phase6/TokenBucket.java)


Reply via email to