Author: mrogers
Date: 2006-11-01 17:11:27 +0000 (Wed, 01 Nov 2006)
New Revision: 10775
Modified:
trunk/apps/load-balancing-sims/phase7/sim/Event.java
trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java
trunk/apps/load-balancing-sims/phase7/sim/Node.java
trunk/apps/load-balancing-sims/phase7/sim/Peer.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
Log:
Refactored interleaving, coalescing, congestion control and bandwidth limiter
Modified: trunk/apps/load-balancing-sims/phase7/sim/Event.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Event.java 2006-11-01
10:00:25 UTC (rev 10774)
+++ trunk/apps/load-balancing-sims/phase7/sim/Event.java 2006-11-01
17:11:27 UTC (rev 10775)
@@ -7,6 +7,7 @@
private static TreeSet<Event> queue = new TreeSet<Event>();
private static double clockTime = 0.0;
+ private static double lastLogTime = Double.POSITIVE_INFINITY;
private static int nextId = 0;
public static double duration = Double.POSITIVE_INFINITY;
@@ -14,12 +15,13 @@
{
queue.clear();
clockTime = 0.0;
+ lastLogTime = Double.POSITIVE_INFINITY;
nextId = 0;
duration = Double.POSITIVE_INFINITY;
}
public static void schedule (EventTarget target, double time,
- int type, Object data)
+ int type, Object data)
{
queue.add (new Event (target, time + clockTime, type, data));
}
@@ -50,6 +52,9 @@
public static void log (String message)
{
+ // Print a blank line between events
+ if (clockTime > lastLogTime) System.out.println();
+ lastLogTime = clockTime;
System.out.print (clockTime + " " + message + "\n");
}
Modified: trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java
2006-11-01 10:00:25 UTC (rev 10774)
+++ trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java
2006-11-01 17:11:27 UTC (rev 10775)
@@ -122,7 +122,6 @@
}
}
- // Each EventTarget class has its own event codes
public final static int RX_QUEUE = 1;
private final static int RX_END = 2;
private final static int TX_END = 3;
Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-01 10:00:25 UTC
(rev 10774)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-01 17:11:27 UTC
(rev 10775)
@@ -8,6 +8,9 @@
public class Node implements EventTarget
{
+ // Coarse-grained retransmission timer
+ public final static double RETX_TIMER = 0.1; // Seconds
+
public double location; // Routing location
public NetworkInterface net;
private HashMap<Integer,Peer> peers; // Look up a peer by its address
@@ -21,7 +24,7 @@
private boolean decrementMaxHtl = false;
private boolean decrementMinHtl = false;
public TokenBucket bandwidth; // Bandwidth limiter
- private boolean timerRunning = false; // Is the retx timer running?
+ private boolean timerRunning = false;
public Node (double txSpeed, double rxSpeed)
{
@@ -149,13 +152,13 @@
pubKeyCache.put (key);
}
- // Called by Peer
+ // Called by Peer after transmitting a packet
public void startTimer()
{
if (timerRunning) return;
- // log ("starting retransmission/coalescing timer");
- Event.schedule (this, Peer.MAX_DELAY, CHECK_TIMEOUTS, null);
timerRunning = true;
+ log ("starting retransmission timer");
+ Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
}
// Called by NetworkInterface
@@ -385,20 +388,13 @@
private void checkTimeouts()
{
- // Check the peers in a random order each time
- double deadline = Double.POSITIVE_INFINITY;
- for (Peer p : peers())
- deadline = Math.min (deadline, p.checkTimeouts());
- if (deadline == Double.POSITIVE_INFINITY) {
- // log ("stopping retransmission/coalescing timer");
+ boolean stopTimer = true;
+ for (Peer p : peers()) if (p.checkTimeouts()) stopTimer = false;
+ if (stopTimer) {
+ log ("stopping retransmission timer");
timerRunning = false;
}
- else {
- double sleep = deadline - Event.time();
- if (sleep < Peer.MIN_SLEEP) sleep = Peer.MIN_SLEEP;
- // log ("sleeping for " + sleep + " seconds");
- Event.schedule (this, sleep, CHECK_TIMEOUTS, null);
- }
+ else Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
}
// EventTarget interface
@@ -423,6 +419,7 @@
case SSK_COLLISION:
generateSskInsert ((Integer) data, 1);
+ break;
case CHECK_TIMEOUTS:
checkTimeouts();
@@ -430,7 +427,6 @@
}
}
- // Each EventTarget class has its own event codes
public final static int REQUEST_CHK = 1;
public final static int INSERT_CHK = 2;
public final static int REQUEST_SSK = 3;
Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-01 10:00:25 UTC
(rev 10774)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-01 17:11:27 UTC
(rev 10775)
@@ -4,7 +4,7 @@
import java.util.Iterator;
import java.util.HashSet;
-public class Peer
+public class Peer implements EventTarget
{
private Node node; // The local node
public int address; // The remote node's address
@@ -18,10 +18,10 @@
public final static double LINK_IDLE = 8.0; // RTTs without transmitting
// Coalescing
- public final static double MIN_SLEEP = 0.01; // Poll the b/w limiter
- public final static double MAX_DELAY = 0.1; // Coalescing delay in secs
+ private final static double MAX_DELAY = 0.1; // Max coalescing delay
+ private final static double MIN_SLEEP = 0.01; // Poll the b/w limiter
- // Out-of-order delivery with eventual detection of missing packets
+ // Out-of-order delivery with duplicate detection
public final static int SEQ_RANGE = 1000;
// Sender state
@@ -33,8 +33,9 @@
private DeadlineQueue<Message> searchQueue; // Outgoing search messages
private DeadlineQueue<Message> transferQueue; // Outgoing transfers
private CongestionWindow window; // AIMD congestion window
- private double lastTransmission = 0.0; // Clock time
- private boolean tgif = false; // "Transfers go in first" toggle
+ private double lastTransmission = Double.POSITIVE_INFINITY; // Time
+ private int searchBytesSent = 0, transferBytesSent = 0;
+ private boolean timerRunning = false; // Coalescing timer
// Receiver state
private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
@@ -58,8 +59,7 @@
public void sendMessage (Message m)
{
m.deadline = Event.time() + MAX_DELAY;
- if (m instanceof Block || m instanceof DataInsert
- || m instanceof ChkDataFound) {
+ if (m instanceof Block) {
log (m + " added to transfer queue");
transferQueue.add (m);
}
@@ -67,8 +67,8 @@
log (m + " added to search queue");
searchQueue.add (m);
}
- // Start the node's timer if necessary
- node.startTimer();
+ // Start the coalescing timer
+ startTimer();
// Send as many packets as possible
while (send());
}
@@ -78,72 +78,115 @@
{
log ("ack " + seq + " added to ack queue");
ackQueue.add (new Ack (seq, Event.time() + MAX_DELAY));
- // Start the node's timer if necessary
- node.startTimer();
+ // Start the coalescing timer
+ startTimer();
// Send as many packets as possible
while (send());
}
+ // Start the coalescing timer
+ private void startTimer()
+ {
+ if (timerRunning) return;
+ timerRunning = true;
+ log ("starting coalescing timer");
+ Event.schedule (this, MAX_DELAY, CHECK_DEADLINES, null);
+ }
+
// Try to send a packet, return true if a packet was sent
private boolean send()
- {
- if (ackQueue.size + searchQueue.size + transferQueue.size == 0){
- log ("nothing to send");
- return false;
- }
- log (ackQueue.size + " bytes of acks in queue");
- log (searchQueue.size + " bytes of searches in queue");
- log (transferQueue.size + " bytes of transfers in queue");
-
+ {
+ int waiting = ackQueue.size+searchQueue.size+transferQueue.size;
+ log (waiting + " bytes waiting");
+ if (waiting == 0) return false;
// Return to slow start when the link is idle
double now = Event.time();
if (now - lastTransmission > LINK_IDLE * rtt) window.reset();
lastTransmission = now;
-
- // Delay small packets for coalescing
- if (now < deadline (now)) {
- int payload = searchQueue.size + transferQueue.size;
- log ("delaying transmission of " + payload + " bytes");
+ // How many bytes of messages can we send?
+ int available = Math.min (window.available(),
+ node.bandwidth.available());
+ log (available + " bytes available for packet");
+ // If there are no urgent acks, and no urgent messages or no
+ // room to send them, and not enough messages for a large
+ // packet or no room to send a large packet, give up!
+ if (ackQueue.deadline() > now
+ && (searchQueue.deadline() > now
+ || searchQueue.headSize() > available)
+ && (transferQueue.deadline() > now
+ || transferQueue.headSize() > available)
+ && (waiting < Packet.SENSIBLE_PAYLOAD
+ || available < Packet.SENSIBLE_PAYLOAD)) {
+ log ("not sending a packet");
return false;
}
-
+ // Construct a packet
Packet p = new Packet();
-
- // Put all waiting acks in the packet
while (ackQueue.size > 0) p.addAck (ackQueue.pop());
-
- // Don't send sequence number n+SEQ_RANGE until sequence
- // number n has been acked - this limits the number of
- // sequence numbers the receiver must store for replay
- // detection. We must still be allowed to send acks,
- // otherwise the connection could deadlock.
-
- if (txSeq > txMaxSeq)
- log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
- else if (window.available() <= 0)
- log ("no room in congestion window for messages");
- else if (node.bandwidth.available() <= 0)
- log ("no bandwidth available for messages");
- else pack (p); // OK to send data
-
+ int space = Math.min (available, Packet.MAX_SIZE - p.size);
+ addPayload (p, space);
// Don't send empty packets
if (p.acks == null && p.messages == null) return false;
-
+ // Transmit the packet
+ log ("sending packet " + p.seq + ", " + p.size + " bytes");
+ node.net.send (p, address, latency);
+ node.bandwidth.remove (p.size);
// If the packet contains data, buffer it for retransmission
if (p.messages != null) {
- p.seq = txSeq++;
p.sent = now;
txBuffer.add (p);
+ node.startTimer(); // Start the retransmission timer
window.bytesSent (p.size);
}
-
- // Send the packet
- log ("sending packet " + p.seq + ", " + p.size + " bytes");
- node.net.send (p, address, latency);
- node.bandwidth.remove (p.size);
return true;
}
+ // Allocate a payload number and add messages to a packet
+ private void addPayload (Packet p, int space)
+ {
+ log (space + " bytes available for messages");
+ if (txSeq > txMaxSeq) {
+ log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
+ return;
+ }
+ p.seq = txSeq++;
+ // Searches get priority unless transfers are starving
+ if (searchBytesSent < transferBytesSent) {
+ while (searchQueue.size > 0
+ && searchQueue.headSize() <= space) {
+ Message m = searchQueue.pop();
+ searchBytesSent += m.size();
+ space -= m.size();
+ p.addMessage (m);
+ }
+ while (transferQueue.size > 0
+ && transferQueue.headSize() <= space) {
+ Message m = transferQueue.pop();
+ transferBytesSent += m.size();
+ space -= m.size();
+ p.addMessage (m);
+ }
+ }
+ else {
+ while (transferQueue.size > 0
+ && transferQueue.headSize() <= space) {
+ Message m = transferQueue.pop();
+ transferBytesSent += m.size();
+ space -= m.size();
+ p.addMessage (m);
+ }
+ while (searchQueue.size > 0
+ && searchQueue.headSize() <= space) {
+ Message m = searchQueue.pop();
+ searchBytesSent += m.size();
+ space -= m.size();
+ p.addMessage (m);
+ }
+ }
+ if (p.messages == null) log ("no messages added");
+ else log (p.messages.size() + " messages added");
+ }
+
// Called by Node when a packet arrives
public void handlePacket (Packet p)
{
@@ -153,26 +196,24 @@
private void handleData (Packet p)
{
- log ("received packet " + p.seq + ", " + p.size + " bytes");
+ log ("received " + p + ", " + p.size + " bytes");
+ sendAck (p.seq);
if (p.seq < rxSeq || rxDupe.contains (p.seq)) {
- log ("duplicate packet");
- sendAck (p.seq); // Original ack may have been lost
+ log (p + " is a duplicate");
}
else if (p.seq == rxSeq) {
- log ("packet in order");
+ log (p + " is in order");
// Find the sequence number of the next missing packet
int was = rxSeq;
while (rxDupe.remove (++rxSeq));
log ("rxSeq was " + was + ", now " + rxSeq);
// Deliver the packet
unpack (p);
- sendAck (p.seq);
}
- else if (p.seq < rxSeq + SEQ_RANGE) {
- log ("packet out of order - expected " + rxSeq);
+ else if (p.seq < rxSeq + SEQ_RANGE * 2) {
+ log (p + " is out of order - expected " + rxSeq);
if (rxDupe.add (p.seq)) unpack (p);
- else log ("duplicate packet");
- sendAck (p.seq); // Original ack may have been lost
+ else log (p + " is a duplicate");
}
// This indicates a misbehaving sender - discard the packet
else log ("warning: received " + p.seq + " before " + rxSeq);
@@ -211,38 +252,11 @@
if (txBuffer.isEmpty()) txMaxSeq = txSeq + SEQ_RANGE - 1;
else txMaxSeq = txBuffer.peek().seq + SEQ_RANGE - 1;
log ("maximum sequence number " + txMaxSeq);
- // Send as many packets as possible
- while (send());
+ // Send as many packets as possible
+ if (timerRunning) while (send());
+ else checkDeadlines();
}
- // Add messages to a packet
- private void pack (Packet p)
- {
- // Alternate between giving searches and transfers priority
- if (tgif) {
- // Transfers go in first
- while (transferQueue.size > 0
- && p.size + transferQueue.headSize() <= Packet.MAX_SIZE)
- p.addMessage (transferQueue.pop());
- // Fill any remaining space with searches
- while (searchQueue.size > 0
- && p.size + searchQueue.headSize() <= Packet.MAX_SIZE)
- p.addMessage (searchQueue.pop());
- tgif = false;
- }
- else {
- // Searches go in first
- while (searchQueue.size > 0
- && p.size + searchQueue.headSize() <= Packet.MAX_SIZE)
- p.addMessage (searchQueue.pop());
- // Fill any remaining space with transfers
- while (transferQueue.size > 0
- && p.size + transferQueue.headSize() <= Packet.MAX_SIZE)
- p.addMessage (transferQueue.pop());
- tgif = true;
- }
- }
-
// Remove messages from a packet and deliver them to the node
private void unpack (Packet p)
{
@@ -250,17 +264,13 @@
for (Message m : p.messages) node.handleMessage (m, this);
}
- // Called by Node, returns the next coalescing or retx deadline
- public double checkTimeouts()
+ // Check retx timeouts, return true if there are packets in flight
+ public boolean checkTimeouts()
{
- log ("checking timeouts");
- // Send as many packets as possible
- while (send());
+ log (txBuffer.size() + " packets in flight");
+ if (txBuffer.isEmpty()) return false;
double now = Event.time();
- if (txBuffer.isEmpty()) return deadline (now);
- log (txBuffer.size() + " packets in flight");
-
for (Packet p : txBuffer) {
if (now - p.sent > RTO * rtt + MAX_DELAY) {
// Retransmission timeout
@@ -270,51 +280,68 @@
window.timeout (now);
}
}
-
- // Sleep for up to MAX_DELAY seconds until the next deadline
- return Math.min (now + MAX_DELAY, deadline (now));
+ return true;
}
- // Work out when the first ack or search or transfer needs to be sent
- private double deadline (double now)
+ // Event callback: wake up, send packets, go back to sleep
+ private void checkDeadlines()
{
- return Math.min (ackQueue.deadline(), dataDeadline (now));
+ // Send as many packets as possible
+ while (send());
+ // Find the next coalescing deadline - ignore message
+ // deadlines if there isn't room in the congestion window
+ // (we have to wait for an ack before sending them)
+ double dl = ackQueue.deadline();
+ if (searchQueue.headSize() <= window.available())
+ dl = Math.min (dl, searchQueue.deadline());
+ if (transferQueue.headSize() <= window.available())
+ dl = Math.min (dl, transferQueue.deadline());
+ // If there's no deadline, stop the timer
+ if (dl == Double.POSITIVE_INFINITY) {
+ if (timerRunning) {
+ log ("stopping coalescing timer");
+ timerRunning = false;
+ }
+ return;
+ }
+ // Schedule the next check
+ double sleep = Math.max (dl - Event.time(), MIN_SLEEP);
+ if (waitingForBandwidth()) {
+ log ("waiting for bandwidth");
+ sleep = MIN_SLEEP; // Poll the bandwidth limiter
+ }
+ timerRunning = true;
+ log ("sleeping for " + sleep + " seconds");
+ Event.schedule (this, sleep, CHECK_DEADLINES, null);
}
- // Work out when the first search or transfer needs to be sent
- private double dataDeadline (double now)
+ // Are there any messages blocked by the bandwidth limiter?
+ private boolean waitingForBandwidth()
{
- // If there's no data waiting, use the ack deadline
- if (searchQueue.size + transferQueue.size == 0)
- return Double.POSITIVE_INFINITY;
-
- double deadline = Math.min (searchQueue.deadline(),
- transferQueue.deadline());
-
- // Delay small packets until the coalescing deadline
- if (searchQueue.size + transferQueue.size
- < Packet.SENSIBLE_PAYLOAD)
- return deadline;
-
- // If there's not enough room in the window, wait for an ack
- if (window.available() <= 0)
- return Double.POSITIVE_INFINITY;
-
- // If there's not enough bandwidth, try again shortly
- if (node.bandwidth.available() <= 0)
- return Math.max (now + MIN_SLEEP, deadline);
-
- // Send a packet immediately
- return now;
+ int bandwidth = node.bandwidth.available();
+ double now = Event.time();
+ if (searchQueue.headSize() > bandwidth
+ && searchQueue.deadline() <= now) return true;
+ if (transferQueue.headSize() > bandwidth
+ && transferQueue.deadline() <= now) return true;
+ return false;
}
public void log (String message)
{
- // Event.log (node.net.address + ":" + address + " " + message);
+ Event.log (node.net.address + ":" + address + " " + message);
}
public String toString()
{
return Integer.toString (address);
}
+
+ // EventTarget interface
+ public void handleEvent (int type, Object data)
+ {
+ if (type == CHECK_DEADLINES) checkDeadlines();
+ }
+
+ private final static int CHECK_DEADLINES = 1;
}
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
2006-11-01 10:00:25 UTC (rev 10774)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
2006-11-01 17:11:27 UTC (rev 10775)
@@ -261,7 +261,6 @@
}
}
- // Each EventTarget class has its own event codes
private final static int ACCEPTED_TIMEOUT = 1;
private final static int SEARCH_TIMEOUT = 2;
private final static int DATA_TIMEOUT = 3;
Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
2006-11-01 10:00:25 UTC (rev 10774)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
2006-11-01 17:11:27 UTC (rev 10775)
@@ -132,7 +132,6 @@
}
}
- // Each EventTarget class has its own event codes
protected final static int ACCEPTED_TIMEOUT = 1;
protected final static int SEARCH_TIMEOUT = 2;
protected final static int TRANSFER_TIMEOUT = 3;
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
2006-11-01 10:00:25 UTC (rev 10774)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
2006-11-01 17:11:27 UTC (rev 10775)
@@ -213,7 +213,6 @@
}
}
- // Each EventTarget class has its own event codes
private final static int KEY_TIMEOUT = 1;
private final static int ACCEPTED_TIMEOUT = 2;
private final static int SEARCH_TIMEOUT = 3;