Author: mrogers
Date: 2006-08-08 14:39:02 +0000 (Tue, 08 Aug 2006)
New Revision: 9966
Added:
trunk/apps/load-balancing-sims/phase5-coalescing/Ack.java
Modified:
trunk/apps/load-balancing-sims/phase5-coalescing/CongestionWindow.java
trunk/apps/load-balancing-sims/phase5-coalescing/Node.java
trunk/apps/load-balancing-sims/phase5-coalescing/Packet.java
trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java
Log:
Subtract the amount of time the ack was held for coalescing from the RTT
Added: trunk/apps/load-balancing-sims/phase5-coalescing/Ack.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/Ack.java 2006-08-08
13:19:56 UTC (rev 9965)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/Ack.java 2006-08-08
14:39:02 UTC (rev 9966)
@@ -0,0 +1,13 @@
+// Tell the sender how long each ack was delayed so it can measure the RTT
+
+class Ack
+{
+ public final int seq; // Sequence number of an acked packet
+ public final double delay; // Seconds the ack was delayed for coalescing
+
+ public Ack (int seq, double delay)
+ {
+ this.seq = seq;
+ this.delay = delay;
+ }
+}
Modified: trunk/apps/load-balancing-sims/phase5-coalescing/CongestionWindow.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/CongestionWindow.java
2006-08-08 13:19:56 UTC (rev 9965)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/CongestionWindow.java
2006-08-08 14:39:02 UTC (rev 9966)
@@ -1,21 +1,28 @@
-// An AIMD congestion window
+// AIMD congestion control
class CongestionWindow
{
public final static int MIN_CWIND = 3000; // Minimum congestion window
- public final static int MAX_CWIND = 100000; // Maximum congestion window
- public final static double ALPHA = 0.1615; // AIMD increase parameter
- public final static double BETA = 0.9375; // AIMD decrease parameter
+ public final static int MAX_CWIND = 1000000; // Max congestion window
+ public final static double ALPHA = 0.3125; // AIMD increase parameter
+ public final static double BETA = 0.875; // AIMD decrease parameter
public final static double GAMMA = 3.0; // Slow start divisor
private double cwind = MIN_CWIND; // Size of window in bytes
private int inflight = 0; // Bytes sent but not acked
private boolean slowStart = true; // Are we in the slow start phase?
+ private Peer peer; // The owner
+ public CongestionWindow (Peer peer)
+ {
+ this.peer = peer;
+ }
+
public void reset()
{
- Event.log ("returning to slow start");
+ peer.log ("congestion window decreased to " + MIN_CWIND);
cwind = MIN_CWIND;
+ peer.log ("returning to slow start");
slowStart = true;
}
@@ -28,31 +35,31 @@
public void bytesSent (int bytes)
{
inflight += bytes;
- Event.log (inflight + " bytes in flight");
+ peer.log (inflight + " bytes in flight");
}
// Take bytes out of flight
public void bytesAcked (int bytes)
{
inflight -= bytes;
- Event.log (inflight + " bytes in flight");
+ peer.log (inflight + " bytes in flight");
// Increase the window
if (slowStart) cwind += bytes / GAMMA;
else cwind += bytes * bytes * ALPHA / cwind;
if (cwind > MAX_CWIND) cwind = MAX_CWIND;
- Event.log ("congestion window increased to " + cwind);
+ peer.log ("congestion window increased to " + cwind);
}
// Decrease the window when a packet is fast retransmitted
public void fastRetransmission (double now)
{
- Event.log (inflight + " bytes in flight");
+ peer.log (inflight + " bytes in flight");
cwind *= BETA;
if (cwind < MIN_CWIND) cwind = MIN_CWIND;
- Event.log ("congestion window decreased to " + cwind);
+ peer.log ("congestion window decreased to " + cwind);
// The slow start phase ends when the first packet is lost
if (slowStart) {
- Event.log ("leaving slow start");
+ peer.log ("leaving slow start");
slowStart = false;
}
}
@@ -60,7 +67,7 @@
// Decrease the window when a packet is retransmitted due to a timeout
public void timeout (double now)
{
- Event.log (inflight + " bytes in flight");
+ peer.log (inflight + " bytes in flight");
if (slowStart) fastRetransmission (now); // Leave slow start
else reset(); // Reset the window and return to slow start
}
Modified: trunk/apps/load-balancing-sims/phase5-coalescing/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/Node.java 2006-08-08
13:19:56 UTC (rev 9965)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/Node.java 2006-08-08
14:39:02 UTC (rev 9966)
@@ -61,7 +61,7 @@
{
if (timerRunning) return;
log ("starting retransmission/coalescing timer");
- Event.schedule (this, Peer.COALESCE, CHECK_TIMEOUTS, null);
+ Event.schedule (this, Peer.MAX_DELAY, CHECK_TIMEOUTS, null);
timerRunning = true;
}
@@ -156,13 +156,13 @@
// Event callback
private void generateRequest()
{
- if (requestsGenerated++ > 1000) return;
+ if (requestsGenerated++ > 10000) return;
// Send a request to a random location
Request r = new Request (locationToKey (Math.random()));
log ("generating request " + r.id);
handleRequest (r, null);
// Schedule the next request
- Event.schedule (this, 0.049, GENERATE_REQUEST, null);
+ Event.schedule (this, 0.123, GENERATE_REQUEST, null);
}
// Event callback
@@ -176,7 +176,7 @@
timerRunning = false;
}
else {
- double sleep = deadline - Event.time();
+ double sleep = deadline - Event.time(); // Can be < 0
if (sleep < MIN_SLEEP) sleep = MIN_SLEEP;
log ("sleeping for " + sleep + " seconds");
Event.schedule (this, sleep, CHECK_TIMEOUTS, null);
Modified: trunk/apps/load-balancing-sims/phase5-coalescing/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/Packet.java
2006-08-08 13:19:56 UTC (rev 9965)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/Packet.java
2006-08-08 14:39:02 UTC (rev 9966)
@@ -5,23 +5,23 @@
class Packet
{
public final static int HEADER_SIZE = 80; // Including IP & UDP headers
- public final static int ACK_SIZE = 4; // Size of a sequence num in bytes
+ public final static int ACK_SIZE = 8; // Size of an ack in bytes
public final static int MAX_SIZE = 1450; // MTU including headers
- public final static int SENSIBLE_PAYLOAD = 1000; // Nagle's algorithm
+ public final static int SENSIBLE_PAYLOAD = 1000; // Coalescing
public int src, dest; // Network addresses
public int size = HEADER_SIZE; // Size in bytes, including headers
public int seq = -1; // Data sequence number (-1 if no data)
- public ArrayList<Integer> acks = null; // Sequence numbers of acked pkts
- public ArrayList<Message> messages = null; // Payload
+ public ArrayList<Ack> acks = null;
+ public ArrayList<Message> messages = null;
public double sent; // Time at which the packet was (re) transmitted
public double latency; // Link latency (stored here for convenience)
- public void addAck (Integer seq)
+ public void addAck (Ack a)
{
- if (acks == null) acks = new ArrayList<Integer>();
- acks.add (seq);
+ if (acks == null) acks = new ArrayList<Ack>();
+ acks.add (a);
size += ACK_SIZE;
}
Modified: trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java 2006-08-08
13:19:56 UTC (rev 9965)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java 2006-08-08
14:39:02 UTC (rev 9966)
@@ -15,17 +15,17 @@
public final static double RTT_DECAY = 0.9; // Exp moving average
// Coalescing
- public final static double COALESCE = 0.1; // Max delay in seconds
+ public final static double MAX_DELAY = 0.1; // Coalescing delay, seconds
// Out-of-order delivery with eventual detection of missing packets
public final static int SEQ_RANGE = 1000;
// Token bucket bandwidth limiter
- public final static int BUCKET_RATE = 1000; // Bytes per second
- public final static int BUCKET_SIZE = 2000; // Burst size in bytes
+ public final static int BUCKET_RATE = 2000; // Bytes per second
+ public final static int BUCKET_SIZE = 4000; // Burst size in bytes
// Sender state
- private double rtt = 3.0; // Estimated round-trip time in seconds
+ private double rtt = 5.0; // Estimated round-trip time in seconds
private int txSeq = 0; // Sequence number of next outgoing data packet
private int txMaxSeq = SEQ_RANGE - 1; // Highest sequence number
private LinkedList<Packet> txBuffer; // Retransmission buffer
@@ -50,7 +50,7 @@
txBuffer = new LinkedList<Packet>();
msgQueue = new LinkedList<Deadline<Message>>();
ackQueue = new LinkedList<Deadline<Integer>>();
- window = new CongestionWindow();
+ window = new CongestionWindow (this);
bandwidth = new TokenBucket (BUCKET_RATE, BUCKET_SIZE);
rxDupe = new HashSet<Integer>();
}
@@ -62,7 +62,7 @@
// Warning: until token-passing is implemented the length of
// the message queue is unlimited
double now = Event.time();
- msgQueue.add (new Deadline<Message> (m, now + COALESCE));
+ msgQueue.add (new Deadline<Message> (m, now + MAX_DELAY));
msgQueueSize += m.size;
log (msgQueue.size() + " messages in queue");
// Start the node's timer if necessary
@@ -103,9 +103,13 @@
return false;
}
+ Packet p = new Packet();
+
// Put all waiting acks in the packet
- Packet p = new Packet();
- for (Deadline<Integer> a : ackQueue) p.addAck (a.item);
+ for (Deadline<Integer> a : ackQueue) {
+ double delay = now - (a.deadline - MAX_DELAY);
+ p.addAck (new Ack (a.item, delay));
+ }
ackQueue.clear();
ackQueueSize = 0;
@@ -152,7 +156,7 @@
{
log ("ack " + seq + " added to ack queue");
double now = Event.time();
- ackQueue.add (new Deadline<Integer> (seq, now + COALESCE));
+ ackQueue.add (new Deadline<Integer> (seq, now + MAX_DELAY));
ackQueueSize += Packet.ACK_SIZE;
log (ackQueue.size() + " acks in queue");
// Start the node's timer if necessary
@@ -165,7 +169,7 @@
public void handlePacket (Packet p)
{
if (p.messages != null) handleData (p);
- if (p.acks != null) for (int seq : p.acks) handleAck (seq);
+ if (p.acks != null) for (Ack a : p.acks) handleAck (a);
}
private void handleData (Packet p)
@@ -195,28 +199,30 @@
else log ("warning: received " + p.seq + " before " + rxSeq);
}
- private void handleAck (int seq)
+ private void handleAck (Ack a)
{
- log ("received ack " + seq);
+ log ("received ack " + a.seq);
double now = Event.time();
Iterator<Packet> i = txBuffer.iterator();
while (i.hasNext()) {
Packet p = i.next();
double age = now - p.sent;
// Explicit ack
- if (p.seq == seq) {
+ if (p.seq == a.seq) {
log ("packet " + p.seq + " acknowledged");
i.remove();
// Update the congestion window
window.bytesAcked (p.size);
// Update the average round-trip time
- rtt = rtt * RTT_DECAY + age * (1.0 - RTT_DECAY);
- log ("round-trip time " + age);
+ rtt *= RTT_DECAY;
+ rtt += (age - a.delay) * (1.0 - RTT_DECAY);
+ log ("ack delay " + a.delay);
+ log ("round-trip time " + (age - a.delay));
log ("average round-trip time " + rtt);
break;
}
// Fast retransmission
- if (p.seq < seq && age > FRTO * rtt) {
+ if (p.seq < a.seq && age > FRTO * rtt) {
p.sent = now;
log ("fast retransmitting packet " + p.seq);
node.net.send (p, address, latency);
@@ -258,7 +264,7 @@
window.timeout (now);
}
}
- return Math.min (now + COALESCE, deadline());
+ return Math.min (now + MAX_DELAY, deadline());
}
// Work out when the first ack or message needs to be sent
@@ -272,7 +278,7 @@
return deadline;
}
- private void log (String message)
+ public void log (String message)
{
Event.log (node.net.address + ":" + address + " " + message);
}