Author: mrogers
Date: 2006-10-30 18:06:44 +0000 (Mon, 30 Oct 2006)
New Revision: 10746
Removed:
trunk/apps/load-balancing-sims/phase6/TokenBucket.java
Modified:
trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java
trunk/apps/load-balancing-sims/phase6/Node.java
trunk/apps/load-balancing-sims/phase6/Peer.java
Log:
Bandwidth & congestion refactoring, part 1
Modified: trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java 2006-10-29 23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java 2006-10-30 18:06:44 UTC (rev 10746)
@@ -24,9 +24,8 @@
public double deadline()
{
- Double deadline = deadlines.peek();
- if (deadline == null) return Double.POSITIVE_INFINITY;
- else return deadline;
+ if (deadlines.isEmpty()) return Double.POSITIVE_INFINITY;
+ else return deadlines.peek();
}
public MESSAGE pop()
Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java 2006-10-29 23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase6/Node.java 2006-10-30 18:06:44 UTC (rev 10746)
@@ -6,12 +6,6 @@
class Node implements EventTarget
{
- public final static double SHORT_SLEEP = 0.01; // Poll the bw limiter
-
- // Token bucket bandwidth limiter
- public final static int BUCKET_RATE = 30000; // Bytes per second
- public final static int BUCKET_SIZE = 60000; // Burst size in bytes
-
public double location; // Routing location
public NetworkInterface net;
private HashMap<Integer,Peer> peers; // Look up a peer by its address
@@ -24,7 +18,6 @@
private LruCache<Integer> pubKeyCache; // SSK public keys
private boolean decrementMaxHtl = false;
private boolean decrementMinHtl = false;
- public TokenBucket bandwidth; // Bandwidth limiter
private boolean timerRunning = false; // Is the retx timer running?
public Node (double txSpeed, double rxSpeed)
@@ -46,7 +39,6 @@
pubKeyCache = new LruCache<Integer> (10);
if (Math.random() < 0.5) decrementMaxHtl = true;
if (Math.random() < 0.25) decrementMinHtl = true;
- bandwidth = new TokenBucket (BUCKET_RATE, BUCKET_SIZE);
}
// Return true if a connection was added, false if already connected
@@ -54,7 +46,7 @@
{
if (n == this) return false;
if (peers.containsKey (n.net.address)) return false;
- log ("adding peer " + n.net.address);
+ // log ("adding peer " + n.net.address);
Peer p = new Peer (this, n.net.address, n.location, latency);
peers.put (n.net.address, p);
return true;
@@ -398,8 +390,7 @@
timerRunning = false;
}
else {
- double sleep = deadline - Event.time(); // Can be < 0
- if (sleep < SHORT_SLEEP) sleep = SHORT_SLEEP;
+ double sleep = Math.max (deadline - Event.time(), 0.0);
// log ("sleeping for " + sleep + " seconds");
Event.schedule (this, sleep, CHECK_TIMEOUTS, null);
}
Modified: trunk/apps/load-balancing-sims/phase6/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Peer.java 2006-10-29 23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase6/Peer.java 2006-10-30 18:06:44 UTC (rev 10746)
@@ -83,14 +83,16 @@
// Try to send a packet, return true if a packet was sent
private boolean send()
- {
- if (ackQueue.size + searchQueue.size + transferQueue.size == 0){
+ {
+ int waiting = ackQueue.size + searchQueue.size
+ + transferQueue.size;
+ if (waiting == 0) {
log ("nothing to send");
return false;
}
- log (ackQueue.size + " bytes of acks in queue");
- log (searchQueue.size + " bytes of searches in queue");
- log (transferQueue.size + " bytes of transfers in queue");
+ log (ackQueue.size + " bytes in ack queue");
+ log (searchQueue.size + " bytes in search queue");
+ log (transferQueue.size + " bytes in transfer queue");
// Return to slow start when the link is idle
double now = Event.time();
@@ -98,9 +100,8 @@
lastTransmission = now;
// Delay small packets for coalescing
- if (now < deadline (now)) {
- int payload = searchQueue.size + transferQueue.size;
- log ("delaying transmission of " + payload + " bytes");
+ if (now < deadline()) {
+ log ("delaying transmission of " + waiting + " bytes");
return false;
}
@@ -119,8 +120,6 @@
log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
else if (window.available() <= 0)
log ("no room in congestion window for messages");
- else if (node.bandwidth.available() <= 0)
- log ("no bandwidth available for messages");
else pack (p); // OK to send data
// Don't send empty packets
@@ -137,7 +136,6 @@
// Send the packet
log ("sending packet " + p.seq + ", " + p.size + " bytes");
node.net.send (p, address, latency);
- node.bandwidth.remove (p.size);
return true;
}
@@ -250,13 +248,13 @@
// Called by Node, returns the next coalescing or retx deadline
public double checkTimeouts()
{
- log ("checking timeouts");
// Send as many packets as possible
while (send());
- log (txBuffer.size() + " packets in flight");
+ log ("checking timeouts");
double now = Event.time();
- if (txBuffer.isEmpty()) return deadline (now);
+ if (txBuffer.isEmpty()) return deadline();
+ log (txBuffer.size() + " packets in flight");
for (Packet p : txBuffer) {
if (now - p.sent > RTO * rtt + MAX_DELAY) {
@@ -269,40 +267,34 @@
}
// Sleep for up to MAX_DELAY seconds until the next deadline
- return Math.min (now + MAX_DELAY, deadline (now));
+ return Math.min (now + MAX_DELAY, deadline());
}
// Work out when the first ack or search or transfer needs to be sent
- private double deadline (double now)
+ private double deadline()
{
- return Math.min (ackQueue.deadline(), dataDeadline (now));
+ return Math.min (ackQueue.deadline(), dataDeadline());
}
// Work out when the first search or transfer needs to be sent
- private double dataDeadline (double now)
+ private double dataDeadline()
{
+ int waiting = searchQueue.size + transferQueue.size;
+
// If there's no data waiting, use the ack deadline
- if (searchQueue.size + transferQueue.size == 0)
- return Double.POSITIVE_INFINITY;
+ if (waiting == 0) return Double.POSITIVE_INFINITY;
- double deadline = Math.min (searchQueue.deadline(),
- transferQueue.deadline());
-
// Delay small packets until the coalescing deadline
- if (searchQueue.size + transferQueue.size
- < Packet.SENSIBLE_PAYLOAD)
- return deadline;
+ if (waiting < Packet.SENSIBLE_PAYLOAD)
+ return Math.min (searchQueue.deadline(),
+ transferQueue.deadline());
// If there's not enough room in the window, wait for an ack
- if (window.available() <= 0)
+ if (window.available() < Packet.SENSIBLE_PAYLOAD)
return Double.POSITIVE_INFINITY;
- // If there's not enough bandwidth, try again shortly
- if (node.bandwidth.available() <= 0)
- return Math.max (deadline, now + Node.SHORT_SLEEP);
-
// Send a packet immediately
- return now;
+ return 0.0;
}
public void log (String message)
Deleted: trunk/apps/load-balancing-sims/phase6/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/TokenBucket.java 2006-10-29 23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase6/TokenBucket.java 2006-10-30 18:06:44 UTC (rev 10746)
@@ -1,29 +0,0 @@
-class TokenBucket
-{
- private double tokens, rate, size, lastUpdated;
-
- public TokenBucket (double rate, double size)
- {
- tokens = size;
- this.rate = rate;
- this.size = size;
- lastUpdated = 0.0; // Clock time
- }
-
- public int available()
- {
- double now = Event.time();
- double elapsed = now - lastUpdated;
- lastUpdated = now;
- tokens += elapsed * rate;
- if (tokens > size) tokens = size;
- // Event.log (tokens + " tokens available");
- return (int) tokens;
- }
-
- public void remove (int t)
- {
- tokens -= t; // Counter can go negative
- // Event.log (t + " tokens removed, " + tokens + " available");
- }
-}