Author: mrogers
Date: 2006-08-08 10:11:27 +0000 (Tue, 08 Aug 2006)
New Revision: 9961
Added:
trunk/apps/load-balancing-sims/phase5-coalescing/CongestionWindow.java
trunk/apps/load-balancing-sims/phase5-coalescing/TokenBucket.java
Modified:
trunk/apps/load-balancing-sims/phase5-coalescing/Node.java
trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java
trunk/apps/load-balancing-sims/phase5-coalescing/RequestState.java
Log:
Token bucket bandwidth limiter, moved AIMD congestion window to separate class
Added: trunk/apps/load-balancing-sims/phase5-coalescing/CongestionWindow.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/CongestionWindow.java
2006-08-08 08:55:37 UTC (rev 9960)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/CongestionWindow.java
2006-08-08 10:11:27 UTC (rev 9961)
@@ -0,0 +1,67 @@
+// An AIMD congestion window
+
+class CongestionWindow
+{
+ public final static int MIN_CWIND = 3000; // Minimum congestion window
+ public final static int MAX_CWIND = 100000; // Maximum congestion window
+ public final static double ALPHA = 0.1615; // AIMD increase parameter
+ public final static double BETA = 0.9375; // AIMD decrease parameter
+ public final static double GAMMA = 3.0; // Slow start divisor
+
+ private double cwind = MIN_CWIND; // Size of window in bytes
+ private int inflight = 0; // Bytes sent but not acked
+ private boolean slowStart = true; // Are we in the slow start phase?
+
+ public void reset()
+ {
+ Event.log ("returning to slow start");
+ cwind = MIN_CWIND;
+ slowStart = true;
+ }
+
+ public int available()
+ {
+ return (int) cwind - inflight;
+ }
+
+ // Put bytes in flight
+ public void bytesSent (int bytes)
+ {
+ inflight += bytes;
+ Event.log (inflight + " bytes in flight");
+ }
+
+ // Take bytes out of flight
+ public void bytesAcked (int bytes)
+ {
+ inflight -= bytes;
+ Event.log (inflight + " bytes in flight");
+ // Increase the window
+ if (slowStart) cwind += bytes / GAMMA;
+ else cwind += bytes * bytes * ALPHA / cwind;
+ if (cwind > MAX_CWIND) cwind = MAX_CWIND;
+ Event.log ("congestion window increased to " + cwind);
+ }
+
+ // Decrease the window when a packet is fast retransmitted
+ public void fastRetransmission (double now)
+ {
+ Event.log (inflight + " bytes in flight");
+ cwind *= BETA;
+ if (cwind < MIN_CWIND) cwind = MIN_CWIND;
+ Event.log ("congestion window decreased to " + cwind);
+ // The slow start phase ends when the first packet is lost
+ if (slowStart) {
+ Event.log ("leaving slow start");
+ slowStart = false;
+ }
+ }
+
+ // Decrease the window when a packet is retransmitted due to a timeout
+ public void timeout (double now)
+ {
+ Event.log (inflight + " bytes in flight");
+ if (slowStart) fastRetransmission (now); // Leave slow start
+ else reset(); // Reset the window and return to slow start
+ }
+}
Modified: trunk/apps/load-balancing-sims/phase5-coalescing/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/Node.java 2006-08-08
08:55:37 UTC (rev 9960)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/Node.java 2006-08-08
10:11:27 UTC (rev 9961)
@@ -37,7 +37,7 @@
n.connect (this, latency);
}
- // Returns the circular distance between two locations
+ // Calculate the circular distance between two locations
public static double distance (double a, double b)
{
if (a > b) return Math.min (a - b, b - a + 1.0);
Modified: trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java 2006-08-08
08:55:37 UTC (rev 9960)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java 2006-08-08
10:11:27 UTC (rev 9961)
@@ -14,26 +14,18 @@
public final static double FRTO = 1.5; // Fast retx timeout in RTTs
public final static double RTT_DECAY = 0.9; // Exp moving average
- // Congestion control parameters
- public final static int MIN_CWIND = 3000; // Minimum congestion window
- public final static int MAX_CWIND = 100000; // Maximum congestion window
- public final static double ALPHA = 0.1615; // AIMD increase parameter
- public final static double BETA = 0.9375; // AIMD decrease parameter
- public final static double GAMMA = 3.0; // Slow start divisor
-
// Coalescing
public final static double COALESCE = 0.1; // Max delay in seconds
// Out-of-order delivery with eventual detection of missing packets
public final static int SEQ_RANGE = 1000; // Packets
+ // Token bucket bandwidth limiter
+ public final static int BUCKET_RATE = 1000; // Bytes per second
+ public final static int BUCKET_SIZE = 2000; // Burst size in bytes
+
// Sender state
- private double cwind = MIN_CWIND; // Congestion window in bytes
- private boolean slowStart = true; // Are we in the slow start phase?
private double rtt = 3.0; // Estimated round-trip time in seconds
- private double lastTransmission = 0.0; // Clock time
- private double lastLeftSlowStart = 0.0; // Clock time
- private int inflight = 0; // Bytes sent but not acked
private int txSeq = 0; // Sequence number of next outgoing data packet
private int txMaxSeq = SEQ_RANGE - 1; // Highest sequence number
private LinkedList<Packet> txBuffer; // Retransmission buffer
@@ -41,6 +33,9 @@
private int msgQueueSize = 0; // Size of message queue in bytes
private LinkedList<Deadline<Integer>> ackQueue; // Outgoing acks
private int ackQueueSize = 0; // Size of ack queue in bytes
+ private CongestionWindow window; // AIMD congestion window
+ private double lastTransmission = 0.0; // Clock time
+ private TokenBucket bandwidth; // Token bucket bandwidth limiter
// Receiver state
private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
@@ -55,6 +50,8 @@
txBuffer = new LinkedList<Packet>();
msgQueue = new LinkedList<Deadline<Message>>();
ackQueue = new LinkedList<Deadline<Integer>>();
+ window = new CongestionWindow();
+ bandwidth = new TokenBucket (BUCKET_RATE, BUCKET_SIZE);
rxDupe = new HashSet<Integer>();
}
@@ -76,31 +73,31 @@
// Try to send a packet, return true if a packet was sent
private boolean send()
- {
- // Return to slow start when the link is idle
- double now = Event.time();
- if (now - lastTransmission > RTO * rtt) {
- log ("returning to slow start");
- cwind = MIN_CWIND;
- slowStart = true;
- }
- lastTransmission = now;
-
+ {
if (ackQueueSize == 0 && msgQueueSize == 0) {
log ("no messages or acks to send");
return false;
}
+ // Return to slow start when the link is idle
+ double now = Event.time();
+ if (now - lastTransmission > RTO * rtt) window.reset();
+ lastTransmission = now;
+
Packet p = new Packet();
- int window = (int) cwind - inflight - p.size - ackQueueSize;
- if (window <= 0) log ("no room in congestion window");
-
// Work out how large a packet we can send
int payload = Packet.MAX_SIZE - p.size - ackQueueSize;
if (payload > msgQueueSize) payload = msgQueueSize;
- if (payload > window) payload = window;
+ int win = window.available() - p.size - ackQueueSize;
+ if (win <= 0) log ("no room in congestion window for messages");
+ if (payload > win) payload = win;
+
+ int bw = bandwidth.available() - p.size - ackQueueSize;
+ if (bw <= 0) log ("no bandwidth available for messages");
+ if (payload > bw) payload = bw;
+
// Delay small packets for coalescing
if (payload < Packet.SENSIBLE_PAYLOAD && now < deadline()) {
log ("delaying transmission of " + payload + " bytes");
@@ -125,10 +122,11 @@
Iterator<Deadline<Message>> i = msgQueue.iterator();
while (i.hasNext()) {
Message m = i.next().item;
- if (p.size + m.size > Packet.MAX_SIZE) break;
+ if (payload - m.size < 0) break;
i.remove();
msgQueueSize -= m.size;
p.addMessage (m);
+ payload -= m.size;
}
}
@@ -139,14 +137,14 @@
if (p.messages != null) {
p.seq = txSeq++;
p.sent = now;
- inflight += p.size; // Acks aren't congestion-controlled
- log (inflight + " bytes in flight");
+ window.bytesSent (p.size);
txBuffer.add (p);
}
// Send the packet
log ("sending packet " + p.seq + ", " + p.size + " bytes");
node.net.send (p, address, latency);
+ bandwidth.remove (p.size);
return true;
}
@@ -155,6 +153,7 @@
double now = Event.time();
ackQueue.add (new Deadline<Integer> (seq, now + COALESCE));
ackQueueSize += Packet.ACK_SIZE;
+ log (ackQueue.size() + " acks in ack queue");
// Start the node's timer if necessary
node.startTimer();
// Send as many packets as possible
@@ -207,13 +206,8 @@
if (p.seq == seq) {
log ("packet " + p.seq + " acknowledged");
i.remove();
- inflight -= p.size;
- log (inflight + " bytes in flight");
- // Increase the congestion window
- if (slowStart) cwind += p.size / GAMMA;
- else cwind += p.size * p.size * ALPHA / cwind;
- if (cwind > MAX_CWIND) cwind = MAX_CWIND;
- log ("congestion window increased to " + cwind);
+ // Update the congestion window
+ window.bytesAcked (p.size);
// Update the average round-trip time
rtt = rtt * RTT_DECAY + age * (1.0 - RTT_DECAY);
log ("round-trip time " + age);
@@ -224,9 +218,8 @@
if (p.seq < seq && age > FRTO * rtt) {
p.sent = now;
log ("fast retransmitting packet " + p.seq);
- log (inflight + " bytes in flight");
node.net.send (p, address, latency);
- decreaseCongestionWindow (now);
+ window.fastRetransmission (now);
}
}
// Recalculate the maximum sequence number
@@ -237,19 +230,6 @@
while (send());
}
- private void decreaseCongestionWindow (double now)
- {
- cwind *= BETA;
- if (cwind < MIN_CWIND) cwind = MIN_CWIND;
- log ("congestion window decreased to " + cwind);
- // The slow start phase ends when the first packet is lost
- if (slowStart) {
- log ("leaving slow start");
- slowStart = false;
- lastLeftSlowStart = now;
- }
- }
-
// Remove messages from a packet and deliver them to the node
private void unpack (Packet p)
{
@@ -257,22 +237,6 @@
for (Message m : p.messages) node.handleMessage (m, this);
}
- // Work out when the first ack or message needs to be sent
- private double deadline()
- {
- double deadline = Double.POSITIVE_INFINITY;
- Deadline<Integer> ack = ackQueue.peek();
- if (ack != null) deadline = ack.deadline;
- Deadline<Message> msg = msgQueue.peek();
- if (msg != null) deadline = Math.min (deadline, msg.deadline);
- return deadline;
- }
-
- private void log (String message)
- {
- Event.log (node.net.address + ":" + address + " " + message);
- }
-
// Called by Node, returns the next coalescing or retx deadline
public double checkTimeouts()
{
@@ -287,23 +251,28 @@
for (Packet p : txBuffer) {
if (now - p.sent > RTO * rtt) {
// Retransmission timeout
- p.sent = now;
log ("retransmitting packet " + p.seq);
- log (inflight + " bytes in flight");
+ p.sent = now;
node.net.send (p, address, latency);
- // Return to slow start
- if (!slowStart &&
- now - lastLeftSlowStart > RTO * rtt) {
- log ("returning to slow start");
- cwind = MIN_CWIND;
- slowStart = true;
- }
- else {
- log ("not returning to slow start");
- decreaseCongestionWindow (now);
- }
+ window.timeout (now);
}
}
return Math.min (now + COALESCE, deadline());
}
+
+ // Earliest deadline among the heads of the ack and message queues,
+ // or positive infinity when both queues are empty
+ private double deadline()
+ {
+     Deadline<Integer> firstAck = ackQueue.peek();
+     Deadline<Message> firstMsg = msgQueue.peek();
+     double due = (firstAck == null) ? Double.POSITIVE_INFINITY : firstAck.deadline;
+     if (firstMsg == null) return due;
+     return Math.min (due, firstMsg.deadline);
+ }
+
+ private void log (String message) // Passes "<node.net.address>:<address> <message>" to Event.log
+ {
+ Event.log (node.net.address + ":" + address + " " + message);
+ }
}
Modified: trunk/apps/load-balancing-sims/phase5-coalescing/RequestState.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/RequestState.java
2006-08-08 08:55:37 UTC (rev 9960)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/RequestState.java
2006-08-08 10:11:27 UTC (rev 9961)
@@ -19,7 +19,7 @@
if (prev != null) nexts.remove (prev);
}
- // Returns the closest peer to the requested key
+ // Find the closest peer to the requested key
public Peer closestPeer()
{
double keyLoc = Node.keyToLocation (key);
Added: trunk/apps/load-balancing-sims/phase5-coalescing/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/TokenBucket.java
2006-08-08 08:55:37 UTC (rev 9960)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/TokenBucket.java
2006-08-08 10:11:27 UTC (rev 9961)
@@ -0,0 +1,27 @@
+class TokenBucket
+{
+    private double tokens, rate, size, lastUpdated; // Count, refill rate, capacity, last refill time
+
+    public TokenBucket (double rate, double size)
+    {
+        this.rate = rate;
+        this.size = size;
+        tokens = size; // The bucket starts full
+        lastUpdated = 0.0; // Clock time
+    }
+
+    // Refill for the time elapsed since the last call, capped at size; return whole tokens
+    public int available()
+    {
+        double now = Event.time();
+        tokens = Math.min (tokens + (now - lastUpdated) * rate, size);
+        lastUpdated = now;
+        return (int) tokens;
+    }
+
+    // Spend t tokens; the count is allowed to go negative
+    public void remove (int t)
+    {
+        tokens = tokens - t;
+    }
+}