Author: mrogers
Date: 2006-11-01 22:38:27 +0000 (Wed, 01 Nov 2006)
New Revision: 10791

Modified:
   trunk/apps/load-balancing-sims/phase7/sim/Peer.java
   trunk/apps/load-balancing-sims/phase7/sim/Sim.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
Log:
Slightly less frantic polling of the bandwidth limiter

Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-01 22:02:35 UTC (rev 10790)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-01 22:38:27 UTC (rev 10791)
@@ -18,8 +18,8 @@
        public final static double LINK_IDLE = 8.0; // RTTs without transmitting

        // Coalescing
-       private final static double MAX_DELAY = 0.1; // Max coalescing delay
-       private final static double MIN_SLEEP = 0.01; // Poll the b/w limiter
+       private final static double MAX_SLEEP = 0.1; // Max coalescing delay
+       private final static double MIN_SLEEP = 0.01; // Forty winks

        // Out-of-order delivery with duplicate detection
        public final static int SEQ_RANGE = 1000;
@@ -36,6 +36,7 @@
        private double lastTransmission = Double.POSITIVE_INFINITY; // Time
        private boolean tgif = false; // "Transfers go in first" toggle
        private boolean timerRunning = false; // Coalescing timer
+       private double pollingInterval; // Poll the bandwidth limiter

        // Receiver state
        private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
@@ -53,12 +54,16 @@
                transferQueue = new DeadlineQueue<Message>();
                window = new CongestionWindow (this);
                rxDupe = new HashSet<Integer>();
+               // Poll the bandwidth limiter at reasonable intervals
+               pollingInterval = Packet.SENSIBLE_PAYLOAD / node.bandwidth.rate;
+               if (pollingInterval > MAX_SLEEP) pollingInterval = MAX_SLEEP;
+               if (pollingInterval < MIN_SLEEP) pollingInterval = MIN_SLEEP;
        }

        // Queue a message for transmission
        public void sendMessage (Message m)
        {
-               m.deadline = Event.time() + MAX_DELAY;
+               m.deadline = Event.time() + MAX_SLEEP;
                if (m instanceof Block) {
                        log (m + " added to transfer queue");
                        transferQueue.add (m);
@@ -77,7 +82,7 @@
        private void sendAck (int seq)
        {
                log ("ack " + seq + " added to ack queue");
-               ackQueue.add (new Ack (seq, Event.time() + MAX_DELAY));
+               ackQueue.add (new Ack (seq, Event.time() + MAX_SLEEP));
                // Start the coalescing timer
                startTimer();
                // Send as many packets as possible
@@ -90,7 +95,7 @@
                if (timerRunning) return;
                timerRunning = true;
                log ("starting coalescing timer");
-               Event.schedule (this, MAX_DELAY, CHECK_DEADLINES, null);
+               Event.schedule (this, MAX_SLEEP, CHECK_DEADLINES, null);
        }

        // Try to send a packet, return true if a packet was sent
@@ -221,7 +226,7 @@
                                break;
                        }
                        // Fast retransmission
-                       if (p.seq < seq && age > FRTO * rtt + MAX_DELAY) {
+                       if (p.seq < seq && age > FRTO * rtt + MAX_SLEEP) {
                                p.sent = now;
                                log ("fast retransmitting packet " + p.seq);
                                node.net.send (p, address, latency);
@@ -245,7 +250,7 @@

                double now = Event.time();
                for (Packet p : txBuffer) {
-                       if (now - p.sent > RTO * rtt + MAX_DELAY) {
+                       if (now - p.sent > RTO * rtt + MAX_SLEEP) {
                                // Retransmission timeout
                                log ("retransmitting packet " + p.seq);
                                p.sent = now;
@@ -264,9 +269,10 @@
                // Find the next coalescing deadline - ignore message deadlines
                // if there isn't room in the congestion window to send them
                double dl = ackQueue.deadline();
-               if (searchQueue.headSize() <= window.available())
+               int win = window.available() -Packet.HEADER_SIZE -ackQueue.size;
+               if (searchQueue.headSize() <= win)
                        dl = Math.min (dl, searchQueue.deadline());
-               if (transferQueue.headSize() <= window.available())
+               if (transferQueue.headSize() <= win)
                        dl = Math.min (dl, transferQueue.deadline());
                // If there's no deadline, stop the timer
                if (dl == Double.POSITIVE_INFINITY) {
@@ -277,12 +283,34 @@
                        return;
                }
                // Schedule the next check
-               double sleep = Math.max (dl - Event.time(), MIN_SLEEP);
+               double sleep = dl - Event.time();
+               if (shouldPoll()) sleep = Math.max (sleep, pollingInterval);
+               else sleep = Math.max (sleep, MIN_SLEEP);
                timerRunning = true;
                log ("sleeping for " + sleep + " seconds");
                Event.schedule (this, sleep, CHECK_DEADLINES, null);
        }

+       // Are we waiting for the bandwidth limiter?
+       private boolean shouldPoll()
+       {
+               double now = Event.time();
+               if (ackQueue.deadline() < now + pollingInterval) return false;
+               
+               double bw = node.bandwidth.available();
+               double win = window.available();
+               
+               if (searchQueue.headSize() > bw
+               && searchQueue.headSize() <= win
+               && searchQueue.deadline() <= now) return true;
+               
+               if (transferQueue.headSize() > bw
+               && transferQueue.headSize() <= win
+               && transferQueue.deadline() <= now) return true;
+               
+               return false;
+       }
+       
        public void log (String message)
        {
                Event.log (node.net.address + ":" + address + " " + message);

Modified: trunk/apps/load-balancing-sims/phase7/sim/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Sim.java  2006-11-01 22:02:35 UTC (rev 10790)
+++ trunk/apps/load-balancing-sims/phase7/sim/Sim.java  2006-11-01 22:38:27 UTC (rev 10791)
@@ -5,9 +5,9 @@
 {
        private final int NODES = 100; // Number of nodes
        private final int DEGREE = 4; // Average degree
-       private final double SPEED = 20000; // Bytes per second
+       private final double SPEED = 40000; // Bytes per second
        private final double LATENCY = 0.1; // Latency of all links in seconds
-       private final double RATE = 0.01; // Inserts per second
+       private final double RATE = 0.005; // Inserts per second
        private final int INSERTS = 50;
        private Node[] nodes;


Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java    2006-11-01 22:02:35 UTC (rev 10790)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java    2006-11-01 22:38:27 UTC (rev 10791)
@@ -71,6 +71,7 @@
                if (blocks[b.index] != null) return; // Ignore duplicates
                blocks[b.index] = b;
                blocksReceived++;
+               if (inState != TRANSFERRING) return; // Forward it later
                // Forward the block to all receivers
                for (Peer p : receivers) p.sendMessage (b);
                // If the transfer is complete, consider finishing

Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java   2006-11-01 22:02:35 UTC (rev 10790)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java   2006-11-01 22:38:27 UTC (rev 10791)
@@ -6,13 +6,13 @@

 public class ChkRequestHandler extends RequestHandler
 {
-       private boolean[] received; // Keep track of received blocks
+       private Block[] blocks; // Store incoming blocks for forwarding
        private int blocksReceived = 0;

        public ChkRequestHandler (ChkRequest r, Node node, Peer prev)
        {
                super (r, node, prev);
-               received = new boolean[32];
+               blocks = new Block[32];
        }

        public void handleMessage (Message m, Peer src)
@@ -40,7 +40,13 @@
        {
                if (searchState != ACCEPTED) node.log (df + " out of order");
                searchState = TRANSFERRING;
-               if (prev != null) prev.sendMessage (df); // Forward the message
+               if (prev != null) {
+                       // Forward the message & all previously received blocks
+                       prev.sendMessage (df);
+                       for (int i = 0; i < 32; i++)
+                               if (blocks[i] != null)
+                                       prev.sendMessage (blocks[i]);
+               }
                // Wait for the transfer to complete (FIXME: check real timeout)
                Event.schedule (this, 120.0, TRANSFER_TIMEOUT, next);
        }
@@ -48,9 +54,10 @@
        private void handleBlock (Block b)
        {
                if (searchState != TRANSFERRING) node.log (b + " out of order");
-               if (received[b.index]) return; // Ignore duplicates
-               received[b.index] = true;
+               if (blocks[b.index] != null) return; // Ignore duplicates
+               blocks[b.index] = b;
                blocksReceived++;
+               if (searchState == TRANSFERRING) return; // Forward it later
                // Forward the block
                if (prev != null) {
                        node.log ("forwarding " + b);


Reply via email to