Author: mrogers
Date: 2006-11-01 20:16:48 +0000 (Wed, 01 Nov 2006)
New Revision: 10787
Modified:
trunk/apps/load-balancing-sims/phase7/sim/Node.java
trunk/apps/load-balancing-sims/phase7/sim/Packet.java
trunk/apps/load-balancing-sims/phase7/sim/Peer.java
trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java
Log:
Refactored interleaving, coalescing, congestion control and bandwidth limiter
Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-01 20:04:40 UTC (rev 10786)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-01 20:16:48 UTC (rev 10787)
@@ -45,7 +45,7 @@
pubKeyCache = new LruCache<Integer> (1000);
if (Math.random() < 0.5) decrementMaxHtl = true;
if (Math.random() < 0.25) decrementMinHtl = true;
- bandwidth = new TokenBucket (30000, 60000);
+ bandwidth = new TokenBucket (15000, 30000);
}
// Return true if a connection was added, false if already connected
Modified: trunk/apps/load-balancing-sims/phase7/sim/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Packet.java 2006-11-01 20:04:40 UTC (rev 10786)
+++ trunk/apps/load-balancing-sims/phase7/sim/Packet.java 2006-11-01 20:16:48 UTC (rev 10787)
@@ -35,6 +35,12 @@
size += m.size();
}
+ public void addMessages (DeadlineQueue q, int maxSize)
+ {
+ while (q.size > 0 && size + q.headSize() <= maxSize)
+ addMessage (q.pop());
+ }
+
public String toString()
{
return new String ("packet " + src + ":" + dest + ":" + seq);
Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-01 20:04:40 UTC (rev 10786)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-01 20:16:48 UTC (rev 10787)
@@ -34,7 +34,7 @@
private DeadlineQueue<Message> transferQueue; // Outgoing transfers
private CongestionWindow window; // AIMD congestion window
private double lastTransmission = Double.POSITIVE_INFINITY; // Time
- private int searchBytesSent = 0, transferBytesSent = 0;
+ private boolean tgif = false; // "Transfers go in first" toggle
private boolean timerRunning = false; // Coalescing timer
// Receiver state
@@ -99,41 +99,64 @@
int waiting = ackQueue.size+searchQueue.size+transferQueue.size;
log (waiting + " bytes waiting");
if (waiting == 0) return false;
+
// Return to slow start when the link is idle
double now = Event.time();
if (now - lastTransmission > LINK_IDLE * rtt) window.reset();
lastTransmission = now;
- // How many bytes of messages can we send?
- int available = Math.min (window.available(),
- node.bandwidth.available());
- log (available + " bytes available for packet");
- // If there are no urgent acks, and no urgent messages or no
- // room to send them, and not enough messages for a large
- // packet or no room to send a large packet, give up!
- if (ackQueue.deadline() > now
- && (searchQueue.deadline() > now
- || searchQueue.headSize() > available)
- && (transferQueue.deadline() > now
- || transferQueue.headSize() > available)
- && (waiting < Packet.SENSIBLE_PAYLOAD
- || available < Packet.SENSIBLE_PAYLOAD)) {
- log ("not sending a packet");
- return false;
- }
+
+ // How many bytes can we send?
+ int size = Math.min (Packet.MAX_SIZE, window.available());
+ size = Math.min (size, node.bandwidth.available());
+ log (size + " bytes available for packet");
+
+ // Urgent acks to send?
+ if (ackQueue.deadline() <= now) return sendPacket (size);
+ // Urgent searches and room to send them?
+ if (searchQueue.deadline() <= now
+ && searchQueue.headSize() <= size) return sendPacket (size);
+ // Urgent transfers and room to send them?
+ if (transferQueue.deadline() <= now
+ && transferQueue.headSize() <= size) return sendPacket (size);
+ // Enough non-urgent messages for a large packet?
+ if (waiting >= Packet.SENSIBLE_PAYLOAD
+ && size >= Packet.SENSIBLE_PAYLOAD) return sendPacket (size);
+
+ log ("not sending a packet");
+ return false;
+ }
+
+ private boolean sendPacket (int maxSize)
+ {
// Construct a packet
Packet p = new Packet();
while (ackQueue.size > 0) p.addAck (ackQueue.pop());
- int space = Math.min (available, Packet.MAX_SIZE - p.size);
- addPayload (p, space);
+ log ((maxSize - p.size) + " bytes available for messages");
+ if (txSeq <= txMaxSeq) {
+ // Alternate priority between searches and transfers
+ if (tgif) {
+ p.addMessages (transferQueue, maxSize);
+ p.addMessages (searchQueue, maxSize);
+ tgif = false;
+ }
+ else {
+ p.addMessages (searchQueue, maxSize);
+ p.addMessages (transferQueue, maxSize);
+ tgif = true;
+ }
+ if (p.messages == null) log ("no messages added");
+ else p.seq = txSeq++;
+ }
+ else log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
// Don't send empty packets
if (p.acks == null && p.messages == null) return false;
// Transmit the packet
- log ("sending " + p + ", " + p.size + " bytes");
+ log ("sending packet " + p.seq + ", " + p.size + " bytes");
node.net.send (p, address, latency);
node.bandwidth.remove (p.size);
// If the packet contains data, buffer it for retransmission
if (p.messages != null) {
- p.sent = now;
+ p.sent = Event.time();
txBuffer.add (p);
node.startTimer(); // Start the retransmission timer
window.bytesSent (p.size);
@@ -141,52 +164,6 @@
return true;
}
- // Allocate a payload number and add messages to a packet
- private void addPayload (Packet p, int space)
- {
- log (space + " bytes available for messages");
- if (txSeq > txMaxSeq) {
- log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
- return;
- }
- p.seq = txSeq++;
- // Searches get priority unless transfers are starving
- if (searchBytesSent < transferBytesSent) {
- while (searchQueue.size > 0
- && searchQueue.headSize() <= space) {
- Message m = searchQueue.pop();
- searchBytesSent += m.size();
- space -= m.size();
- p.addMessage (m);
- }
- while (transferQueue.size > 0
- && transferQueue.headSize() <= space) {
- Message m = transferQueue.pop();
- transferBytesSent += m.size();
- space -= m.size();
- p.addMessage (m);
- }
- }
- else {
- while (transferQueue.size > 0
- && transferQueue.headSize() <= space) {
- Message m = transferQueue.pop();
- transferBytesSent += m.size();
- space -= m.size();
- p.addMessage (m);
- }
- while (searchQueue.size > 0
- && searchQueue.headSize() <= space) {
- Message m = searchQueue.pop();
- searchBytesSent += m.size();
- space -= m.size();
- p.addMessage (m);
- }
- }
- if (p.messages == null) log ("no messages added");
- else log (p.messages.size() + " messages added");
- }
-
// Called by Node when a packet arrives
public void handlePacket (Packet p)
{
@@ -196,27 +173,30 @@
private void handleData (Packet p)
{
- log ("received " + p + ", " + p.size + " bytes");
- sendAck (p.seq);
+ log ("received packet " + p.seq + ", expected " + rxSeq);
if (p.seq < rxSeq || rxDupe.contains (p.seq)) {
- log (p + " is a duplicate");
+ log ("duplicate packet");
+ sendAck (p.seq); // Original ack may have been lost
}
else if (p.seq == rxSeq) {
- log (p + " is in order");
// Find the sequence number of the next missing packet
- int was = rxSeq;
while (rxDupe.remove (++rxSeq));
- log ("rxSeq was " + was + ", now " + rxSeq);
- // Deliver the packet
- unpack (p);
+ log ("packet in order, now expecting " + rxSeq);
+ // Deliver the messages to the node
+ for (Message m : p.messages)
+ node.handleMessage (m, this);
+ sendAck (p.seq);
}
else if (p.seq < rxSeq + SEQ_RANGE * 2) {
- log (p + " is out of order - expected " + rxSeq);
- if (rxDupe.add (p.seq)) unpack (p);
- else log (p + " is a duplicate");
+ log ("packet out of order");
+ rxDupe.add (p.seq);
+ // Deliver the messages to the node
+ for (Message m : p.messages)
+ node.handleMessage (m, this);
+ sendAck (p.seq);
}
// This indicates a misbehaving sender - discard the packet
- else log ("warning: received " + p.seq + " before " + rxSeq);
+ else log ("warning: sequence number out of range");
}
private void handleAck (Ack a)
@@ -230,7 +210,7 @@
double age = now - p.sent;
// Explicit ack
if (p.seq == seq) {
- log (p + " acknowledged");
+ log ("packet " + p.seq + " acknowledged");
i.remove();
// Update the congestion window
window.bytesAcked (p.size);
@@ -243,7 +223,7 @@
// Fast retransmission
if (p.seq < seq && age > FRTO * rtt + MAX_DELAY) {
p.sent = now;
- log ("fast retransmitting " + p);
+ log ("fast retransmitting packet " + p.seq);
node.net.send (p, address, latency);
window.fastRetransmission (now);
}
@@ -257,13 +237,6 @@
else checkDeadlines();
}
- // Remove messages from a packet and deliver them to the node
- private void unpack (Packet p)
- {
- if (p.messages == null) return;
- for (Message m : p.messages) node.handleMessage (m, this);
- }
-
// Check retx timeouts, return true if there are packets in flight
public boolean checkTimeouts()
{
@@ -274,7 +247,7 @@
for (Packet p : txBuffer) {
if (now - p.sent > RTO * rtt + MAX_DELAY) {
// Retransmission timeout
- log ("retransmitting " + p);
+ log ("retransmitting packet " + p.seq);
p.sent = now;
node.net.send (p, address, latency);
window.timeout (now);
@@ -288,9 +261,8 @@
{
// Send as many packets as possible
while (send());
- // Find the next coalescing deadline - ignore message
- // deadlines if there isn't room in the congestion window
- // (we have to wait for an ack before sending them)
+ // Find the next coalescing deadline - ignore message deadlines
+ // if there isn't room in the congestion window to send them
double dl = ackQueue.deadline();
if (searchQueue.headSize() <= window.available())
dl = Math.min (dl, searchQueue.deadline());
@@ -306,27 +278,11 @@
}
// Schedule the next check
double sleep = Math.max (dl - Event.time(), MIN_SLEEP);
- if (waitingForBandwidth()) {
- log ("waiting for bandwidth");
- sleep = MIN_SLEEP; // Poll the bandwidth limiter
- }
timerRunning = true;
log ("sleeping for " + sleep + " seconds");
Event.schedule (this, sleep, CHECK_DEADLINES, null);
}
- // Are there any messages blocked by the bandwidth limiter?
- private boolean waitingForBandwidth()
- {
- int bandwidth = node.bandwidth.available();
- double now = Event.time();
- if (searchQueue.headSize() > bandwidth
- && searchQueue.deadline() <= now) return true;
- if (transferQueue.headSize() > bandwidth
- && transferQueue.deadline() <= now) return true;
- return false;
- }
-
public void log (String message)
{
Event.log (node.net.address + ":" + address + " " + message);
Modified: trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java 2006-11-01 20:04:40 UTC (rev 10786)
+++ trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java 2006-11-01 20:16:48 UTC (rev 10787)
@@ -2,13 +2,14 @@
class TokenBucket
{
- private double tokens, rate, size, lastUpdated;
+ public final double rate, size;
+ private double tokens, lastUpdated;
public TokenBucket (double rate, double size)
{
- tokens = size;
this.rate = rate;
this.size = size;
+ tokens = size;
lastUpdated = 0.0; // Clock time
}