Author: mrogers
Date: 2006-11-01 22:38:27 +0000 (Wed, 01 Nov 2006)
New Revision: 10791
Modified:
trunk/apps/load-balancing-sims/phase7/sim/Peer.java
trunk/apps/load-balancing-sims/phase7/sim/Sim.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
Log:
Slightly less frantic polling of the bandwidth limiter
Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-01 22:02:35 UTC
(rev 10790)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-01 22:38:27 UTC
(rev 10791)
@@ -18,8 +18,8 @@
public final static double LINK_IDLE = 8.0; // RTTs without transmitting
// Coalescing
- private final static double MAX_DELAY = 0.1; // Max coalescing delay
- private final static double MIN_SLEEP = 0.01; // Poll the b/w limiter
+ private final static double MAX_SLEEP = 0.1; // Max coalescing delay
+ private final static double MIN_SLEEP = 0.01; // Forty winks
// Out-of-order delivery with duplicate detection
public final static int SEQ_RANGE = 1000;
@@ -36,6 +36,7 @@
private double lastTransmission = Double.POSITIVE_INFINITY; // Time
private boolean tgif = false; // "Transfers go in first" toggle
private boolean timerRunning = false; // Coalescing timer
+ private double pollingInterval; // Poll the bandwidth limiter
// Receiver state
private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
@@ -53,12 +54,16 @@
transferQueue = new DeadlineQueue<Message>();
window = new CongestionWindow (this);
rxDupe = new HashSet<Integer>();
+ // Poll the bandwidth limiter at reasonable intervals
+ pollingInterval = Packet.SENSIBLE_PAYLOAD / node.bandwidth.rate;
+ if (pollingInterval > MAX_SLEEP) pollingInterval = MAX_SLEEP;
+ if (pollingInterval < MIN_SLEEP) pollingInterval = MIN_SLEEP;
}
// Queue a message for transmission
public void sendMessage (Message m)
{
- m.deadline = Event.time() + MAX_DELAY;
+ m.deadline = Event.time() + MAX_SLEEP;
if (m instanceof Block) {
log (m + " added to transfer queue");
transferQueue.add (m);
@@ -77,7 +82,7 @@
private void sendAck (int seq)
{
log ("ack " + seq + " added to ack queue");
- ackQueue.add (new Ack (seq, Event.time() + MAX_DELAY));
+ ackQueue.add (new Ack (seq, Event.time() + MAX_SLEEP));
// Start the coalescing timer
startTimer();
// Send as many packets as possible
@@ -90,7 +95,7 @@
if (timerRunning) return;
timerRunning = true;
log ("starting coalescing timer");
- Event.schedule (this, MAX_DELAY, CHECK_DEADLINES, null);
+ Event.schedule (this, MAX_SLEEP, CHECK_DEADLINES, null);
}
// Try to send a packet, return true if a packet was sent
@@ -221,7 +226,7 @@
break;
}
// Fast retransmission
- if (p.seq < seq && age > FRTO * rtt + MAX_DELAY) {
+ if (p.seq < seq && age > FRTO * rtt + MAX_SLEEP) {
p.sent = now;
log ("fast retransmitting packet " + p.seq);
node.net.send (p, address, latency);
@@ -245,7 +250,7 @@
double now = Event.time();
for (Packet p : txBuffer) {
- if (now - p.sent > RTO * rtt + MAX_DELAY) {
+ if (now - p.sent > RTO * rtt + MAX_SLEEP) {
// Retransmission timeout
log ("retransmitting packet " + p.seq);
p.sent = now;
@@ -264,9 +269,10 @@
// Find the next coalescing deadline - ignore message deadlines
// if there isn't room in the congestion window to send them
double dl = ackQueue.deadline();
- if (searchQueue.headSize() <= window.available())
+ int win = window.available() -Packet.HEADER_SIZE -ackQueue.size;
+ if (searchQueue.headSize() <= win)
dl = Math.min (dl, searchQueue.deadline());
- if (transferQueue.headSize() <= window.available())
+ if (transferQueue.headSize() <= win)
dl = Math.min (dl, transferQueue.deadline());
// If there's no deadline, stop the timer
if (dl == Double.POSITIVE_INFINITY) {
@@ -277,12 +283,34 @@
return;
}
// Schedule the next check
- double sleep = Math.max (dl - Event.time(), MIN_SLEEP);
+ double sleep = dl - Event.time();
+ if (shouldPoll()) sleep = Math.max (sleep, pollingInterval);
+ else sleep = Math.max (sleep, MIN_SLEEP);
timerRunning = true;
log ("sleeping for " + sleep + " seconds");
Event.schedule (this, sleep, CHECK_DEADLINES, null);
}
+ // Are we waiting for the bandwidth limiter?
+ private boolean shouldPoll()
+ {
+ double now = Event.time();
+ if (ackQueue.deadline() < now + pollingInterval) return false;
+
+ double bw = node.bandwidth.available();
+ double win = window.available();
+
+ if (searchQueue.headSize() > bw
+ && searchQueue.headSize() <= win
+ && searchQueue.deadline() <= now) return true;
+
+ if (transferQueue.headSize() > bw
+ && transferQueue.headSize() <= win
+ && transferQueue.deadline() <= now) return true;
+
+ return false;
+ }
+
public void log (String message)
{
Event.log (node.net.address + ":" + address + " " + message);
Modified: trunk/apps/load-balancing-sims/phase7/sim/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Sim.java 2006-11-01 22:02:35 UTC
(rev 10790)
+++ trunk/apps/load-balancing-sims/phase7/sim/Sim.java 2006-11-01 22:38:27 UTC
(rev 10791)
@@ -5,9 +5,9 @@
{
private final int NODES = 100; // Number of nodes
private final int DEGREE = 4; // Average degree
- private final double SPEED = 20000; // Bytes per second
+ private final double SPEED = 40000; // Bytes per second
private final double LATENCY = 0.1; // Latency of all links in seconds
- private final double RATE = 0.01; // Inserts per second
+ private final double RATE = 0.005; // Inserts per second
private final int INSERTS = 50;
private Node[] nodes;
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
2006-11-01 22:02:35 UTC (rev 10790)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
2006-11-01 22:38:27 UTC (rev 10791)
@@ -71,6 +71,7 @@
if (blocks[b.index] != null) return; // Ignore duplicates
blocks[b.index] = b;
blocksReceived++;
+ if (inState != TRANSFERRING) return; // Forward it later
// Forward the block to all receivers
for (Peer p : receivers) p.sendMessage (b);
// If the transfer is complete, consider finishing
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
2006-11-01 22:02:35 UTC (rev 10790)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
2006-11-01 22:38:27 UTC (rev 10791)
@@ -6,13 +6,13 @@
public class ChkRequestHandler extends RequestHandler
{
- private boolean[] received; // Keep track of received blocks
+ private Block[] blocks; // Store incoming blocks for forwarding
private int blocksReceived = 0;
public ChkRequestHandler (ChkRequest r, Node node, Peer prev)
{
super (r, node, prev);
- received = new boolean[32];
+ blocks = new Block[32];
}
public void handleMessage (Message m, Peer src)
@@ -40,7 +40,13 @@
{
if (searchState != ACCEPTED) node.log (df + " out of order");
searchState = TRANSFERRING;
- if (prev != null) prev.sendMessage (df); // Forward the message
+ if (prev != null) {
+ // Forward the message & all previously received blocks
+ prev.sendMessage (df);
+ for (int i = 0; i < 32; i++)
+ if (blocks[i] != null)
+ prev.sendMessage (blocks[i]);
+ }
// Wait for the transfer to complete (FIXME: check real timeout)
Event.schedule (this, 120.0, TRANSFER_TIMEOUT, next);
}
@@ -48,9 +54,10 @@
private void handleBlock (Block b)
{
if (searchState != TRANSFERRING) node.log (b + " out of order");
- if (received[b.index]) return; // Ignore duplicates
- received[b.index] = true;
+ if (blocks[b.index] != null) return; // Ignore duplicates
+ blocks[b.index] = b;
blocksReceived++;
+ if (searchState == TRANSFERRING) return; // Forward it later
// Forward the block
if (prev != null) {
node.log ("forwarding " + b);