Author: mrogers
Date: 2006-11-16 15:15:27 +0000 (Thu, 16 Nov 2006)
New Revision: 10943

Modified:
   trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java
   trunk/apps/load-balancing-sims/phase7/sim/Node.java
   trunk/apps/load-balancing-sims/phase7/sim/Packet.java
   trunk/apps/load-balancing-sims/phase7/sim/Peer.java
   trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java
   trunk/apps/load-balancing-sims/phase7/sim/messages/Message.java
Log:
Measure the bandwidth delay - FIXME: the measured delay is large during 
transfers, but the delay is not actually being caused by the bandwidth limiter, 
which is set far above the available physical bandwidth

Modified: trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java     
2006-11-16 14:34:18 UTC (rev 10942)
+++ trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java     
2006-11-16 15:15:27 UTC (rev 10943)
@@ -28,12 +28,9 @@
                address = Network.register (this);
        }

-       // Called by Peer
-       public void send (Packet p, int dest, double latency)
+       // Called by Node
+       public void sendPacket (Packet p)
        {
-               p.src = address;
-               p.dest = dest;
-               p.latency = latency;
                if (txQueueSize + p.size > txQueueMaxSize) {
                        log ("no room in txQueue, " + p + " lost");
                        return;
@@ -75,7 +72,7 @@
                log ("finished receiving " + p);
                node.handlePacket (p);
                rxQueueSize -= p.size;
-               rxQueue.remove (p);
+               rxQueue.poll();
                // If there's another packet waiting, start to receive it
                if (!rxQueue.isEmpty()) rxStart (rxQueue.peek());
        }
@@ -94,7 +91,7 @@
                log ("finished transmitting " + p);
                Network.deliver (p);
                txQueueSize -= p.size;
-               txQueue.remove (p);
+               txQueue.poll();
                // If there's another packet waiting, start to transmit it
                if (!txQueue.isEmpty()) txStart (txQueue.peek());
        }

Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-16 14:34:18 UTC 
(rev 10942)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-16 15:15:27 UTC 
(rev 10943)
@@ -14,6 +14,7 @@
        // Flow control
        public final static int FLOW_TOKENS = 20; // Shared by all peers
        public final static double TOKEN_DELAY = 1.0; // Allocate initial tokens
+       public final static double DELAY_DECAY = 0.9; // Exp moving average

        public double location; // Routing location
        public NetworkInterface net;
@@ -30,6 +31,7 @@
        public TokenBucket bandwidth; // Bandwidth limiter
        private boolean timerRunning = false;
        private int spareTokens = FLOW_TOKENS; // Tokens not allocated to a peer
+       private double bwDelay = 0.0; // Average delay caused by b/w limiter

        public Node (double txSpeed, double rxSpeed)
        {
@@ -159,21 +161,50 @@
                pubKeyCache.put (key);
        }

-       // Called by Peer after transmitting a packet
+       // Called by Peer to start the retransmission timer
        public void startTimer()
        {
                if (timerRunning) return;
                timerRunning = true;
-               // log ("starting retransmission timer");
+               log ("starting retransmission timer");
                Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
        }

+       // Called by Peer to transmit a packet for the first time
+       public void sendPacket (Packet p)
+       {
+               // Update the bandwidth limiter
+               bandwidth.remove (p.size);
+               // Update the average bandwidth delay
+               if (p.messages != null) {
+                       double now = Event.time();
+                       for (Message m : p.messages) {
+                               double delay = now - m.deadline;
+                               log ("bandwidth delay " + delay);
+                               bwDelay *= DELAY_DECAY;
+                               bwDelay += delay * (1.0 - DELAY_DECAY);
+                       }
+                       log ("average bandwidth delay " + bwDelay);
+               }
+               // Send the packet
+               net.sendPacket (p);
+       }
+       
+       // Called by Peer to retransmit a packet
+       public void resendPacket (Packet p)
+       {
+               // Update the bandwidth limiter
+               bandwidth.remove (p.size);
+               // Send the packet
+               net.sendPacket (p);
+       }
+       
        // Called by NetworkInterface
-       public void handlePacket (Packet packet)
+       public void handlePacket (Packet p)
        {
-               Peer peer = peers.get (packet.src);
+               Peer peer = peers.get (p.src);
                if (peer == null) log ("received packet from unknown peer");
-               else peer.handlePacket (packet);
+               else peer.handlePacket (p);
        }

        // Called by Peer
@@ -447,7 +478,7 @@
                boolean stopTimer = true;
                for (Peer p : peers()) if (p.checkTimeouts()) stopTimer = false;
                if (stopTimer) {
-                       // log ("stopping retransmission timer");
+                       log ("stopping retransmission timer");
                        timerRunning = false;
                }
                else Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);

Modified: trunk/apps/load-balancing-sims/phase7/sim/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Packet.java       2006-11-16 
14:34:18 UTC (rev 10942)
+++ trunk/apps/load-balancing-sims/phase7/sim/Packet.java       2006-11-16 
15:15:27 UTC (rev 10943)
@@ -12,15 +12,22 @@
        public final static int MAX_SIZE = 1450; // MTU including headers
        public final static int SENSIBLE_PAYLOAD = 1000; // Coalescing

-       public int src, dest; // Network addresses
+       public final int src, dest; // Network addresses
        public int size = HEADER_SIZE; // Size in bytes, including headers
        public int seq = -1; // Data sequence number (-1 if no data)
        public ArrayList<Ack> acks = null;
        public ArrayList<Message> messages = null;

        public double sent; // Time at which the packet was (re) transmitted
-       public double latency; // Link latency (stored here for convenience)
+       public double latency; // Link latency, stored here for convenience

+       public Packet (int src, int dest, double latency)
+       {
+               this.src = src;
+               this.dest = dest;
+               this.latency = latency;
+       }
+       
        public void addAck (Ack a)
        {
                if (acks == null) acks = new ArrayList<Ack>();

Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-16 14:34:18 UTC 
(rev 10942)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-16 15:15:27 UTC 
(rev 10943)
@@ -18,11 +18,11 @@
        public final static double LINK_IDLE = 8.0; // RTTs without transmitting

        // Coalescing
-       private final static double MAX_SLEEP = 0.1; // Max coalescing delay
-       private final static double MIN_SLEEP = 0.01; // Forty winks
+       public final static double MAX_DELAY = 0.1; // Max coalescing delay
+       public final static double MIN_SLEEP = 0.01; // Forty winks

        // Out-of-order delivery with duplicate detection
-       public final static int SEQ_RANGE = 1000;
+       public final static int SEQ_RANGE = 65536;

        // Sender state
        private double rtt = 5.0; // Estimated round-trip time in seconds
@@ -36,7 +36,6 @@
        private double lastTransmission = Double.POSITIVE_INFINITY; // Time
        private boolean tgif = false; // "Transfers go in first" toggle
        private boolean timerRunning = false; // Coalescing timer
-       private double pollingInterval; // Poll the bandwidth limiter

        // Receiver state
        private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
@@ -58,16 +57,12 @@
                transferQueue = new DeadlineQueue<Message>();
                window = new CongestionWindow (this);
                rxDupe = new HashSet<Integer>();
-               // Poll the bandwidth limiter at reasonable intervals
-               pollingInterval = Packet.SENSIBLE_PAYLOAD / node.bandwidth.rate;
-               if (pollingInterval > MAX_SLEEP) pollingInterval = MAX_SLEEP;
-               if (pollingInterval < MIN_SLEEP) pollingInterval = MIN_SLEEP;
        }

        // Queue a message for transmission
        public void sendMessage (Message m)
        {
-               m.deadline = Event.time() + MAX_SLEEP;
+               m.deadline = Event.time() + MAX_DELAY;
                if (m instanceof Block) {
                        log (m + " added to transfer queue");
                        transferQueue.add (m);
@@ -86,7 +81,7 @@
        private void sendAck (int seq)
        {
                log ("ack " + seq + " added to ack queue");
-               ackQueue.add (new Ack (seq, Event.time() + MAX_SLEEP));
+               ackQueue.add (new Ack (seq, Event.time() + MAX_DELAY));
                // Start the coalescing timer
                startTimer();
                // Send as many packets as possible
@@ -99,7 +94,7 @@
                if (timerRunning) return;
                timerRunning = true;
                log ("starting coalescing timer");
-               Event.schedule (this, MAX_SLEEP, CHECK_DEADLINES, null);
+               Event.schedule (this, MAX_DELAY, CHECK_DEADLINES, null);
        }

        // Try to send a packet, return true if a packet was sent
@@ -138,9 +133,11 @@
        private boolean sendPacket (int maxSize)
        {
                // Construct a packet
-               Packet p = new Packet();
+               Packet p = new Packet (node.net.address, address, latency);
+               // Add all waiting acks to the packet
                while (ackQueue.size > 0) p.addAck (ackQueue.pop());
                log ((maxSize - p.size) + " bytes available for messages");
+               // Don't allow more than SEQ_RANGE payloads to be in flight
                if (txSeq <= txMaxSeq) {
                        // Alternate priority between searches and transfers
                        if (tgif) {
@@ -161,8 +158,7 @@
                if (p.acks == null && p.messages == null) return false;
                // Transmit the packet
                log ("sending packet " + p.seq + ", " + p.size + " bytes");
-               node.net.send (p, address, latency);
-               node.bandwidth.remove (p.size);
+               node.sendPacket (p);
                // If the packet contains data, buffer it for retransmission
                if (p.messages != null) {
                        p.sent = Event.time();
@@ -230,10 +226,10 @@
                                break;
                        }
                        // Fast retransmission
-                       if (p.seq < seq && age > FRTO * rtt + MAX_SLEEP) {
+                       if (p.seq < seq && age > FRTO * rtt + MAX_DELAY) {
                                p.sent = now;
                                log ("fast retransmitting packet " + p.seq);
-                               node.net.send (p, address, latency);
+                               node.resendPacket (p);
                                window.fastRetransmission (now);
                        }
                }
@@ -254,11 +250,11 @@

                double now = Event.time();
                for (Packet p : txBuffer) {
-                       if (now - p.sent > RTO * rtt + MAX_SLEEP) {
+                       if (now - p.sent > RTO * rtt + MAX_DELAY) {
                                // Retransmission timeout
                                log ("retransmitting packet " + p.seq);
                                p.sent = now;
-                               node.net.send (p, address, latency);
+                               node.resendPacket (p);
                                window.timeout (now);
                        }
                }
@@ -288,7 +284,7 @@
                }
                // Schedule the next check
                double sleep = dl - Event.time();
-               if (shouldPoll()) sleep = Math.max (sleep, pollingInterval);
+               if (shouldPoll()) sleep = Math.max (sleep, node.bandwidth.poll);
                else sleep = Math.max (sleep, MIN_SLEEP);
                timerRunning = true;
                log ("sleeping for " + sleep + " seconds");
@@ -299,25 +295,26 @@
        private boolean shouldPoll()
        {
                double now = Event.time();
-               if (ackQueue.deadline() < now + pollingInterval) return false;
-               
+               // Will we need to send an ack before the next polling interval?
+               if (ackQueue.deadline() < now + node.bandwidth.poll)
+                       return false;
                double bw = node.bandwidth.available();
                double win = window.available();
-               
+               // Is there an overdue search that's waiting for bandwidth?
                if (searchQueue.headSize() > bw
                && searchQueue.headSize() <= win
                && searchQueue.deadline() <= now) return true;
-               
+               // Is there an overdue transfer that's waiting for bandwidth?
                if (transferQueue.headSize() > bw
                && transferQueue.headSize() <= win
                && transferQueue.deadline() <= now) return true;
-               
+               // We're waiting for something other than bandwidth
                return false;
        }

        public void log (String message)
        {
-               // Event.log (node.net.address + ":" + address + " " + message);
+               Event.log (node.net.address + ":" + address + " " + message);
        }

        public String toString()

Modified: trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java  2006-11-16 
14:34:18 UTC (rev 10942)
+++ trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java  2006-11-16 
15:15:27 UTC (rev 10943)
@@ -2,13 +2,17 @@

 class TokenBucket
 {
-       public final double rate, size;
+       public final double rate, size, poll;
        private double tokens, lastUpdated;

        public TokenBucket (double rate, double size)
        {
                this.rate = rate;
                this.size = size;
+               double poll = Packet.MAX_SIZE / rate;
+               if (poll < Peer.MIN_SLEEP) poll = Peer.MIN_SLEEP;
+               if (poll > Peer.MAX_DELAY) poll = Peer.MAX_DELAY;
+               this.poll = poll;
                tokens = size;
                lastUpdated = 0.0; // Clock time
        }

Modified: trunk/apps/load-balancing-sims/phase7/sim/messages/Message.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/messages/Message.java     
2006-11-16 14:34:18 UTC (rev 10942)
+++ trunk/apps/load-balancing-sims/phase7/sim/messages/Message.java     
2006-11-16 15:15:27 UTC (rev 10943)
@@ -4,16 +4,16 @@

 public class Message
 {
-       public final static int HEADER_SIZE = 12; // Bytes, including unique ID
+       public final static int HEADER_SIZE = 12; // Bytes, including search ID
        public final static int KEY_SIZE = 32; // Size of a routing key, bytes
        public final static int PUB_KEY_SIZE = 1024; // Size of a pub key, bytes
        public final static int DATA_SIZE = 1024; // Size of a data block, bytes
        public final static int ACK_SIZE = 4; // Size of a sequence num, bytes

-       public static int nextId = 0; // Each request and insert has a unique ID
+       public static int nextId = 0; // Each search has a unique ID

-       public int id; // Unique request ID
-       public double deadline = 0.0; // Coalescing, stored here for convenience
+       public int id; // Search ID
+       public double deadline = 0.0; // Coalescing deadline

        // Override this
        public int size()


Reply via email to