Author: mrogers
Date: 2006-10-30 20:23:12 +0000 (Mon, 30 Oct 2006)
New Revision: 10748
Added:
trunk/apps/load-balancing-sims/phase6/TokenBucket.java
Modified:
trunk/apps/load-balancing-sims/phase6/Node.java
trunk/apps/load-balancing-sims/phase6/Peer.java
Log:
Forget it, more urgent things to do...
Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java 2006-10-30 18:17:39 UTC
(rev 10747)
+++ trunk/apps/load-balancing-sims/phase6/Node.java 2006-10-30 20:23:12 UTC
(rev 10748)
@@ -6,6 +6,12 @@
class Node implements EventTarget
{
+ public final static double SHORT_SLEEP = 0.01; // Poll the bw limiter
+
+ // Token bucket bandwidth limiter
+ public final static int BUCKET_RATE = 30000; // Bytes per second
+ public final static int BUCKET_SIZE = 60000; // Burst size in bytes
+
public double location; // Routing location
public NetworkInterface net;
private HashMap<Integer,Peer> peers; // Look up a peer by its address
@@ -18,6 +24,7 @@
private LruCache<Integer> pubKeyCache; // SSK public keys
private boolean decrementMaxHtl = false;
private boolean decrementMinHtl = false;
+ public TokenBucket bandwidth; // Bandwidth limiter
private boolean timerRunning = false; // Is the retx timer running?
public Node (double txSpeed, double rxSpeed)
@@ -39,6 +46,7 @@
pubKeyCache = new LruCache<Integer> (10);
if (Math.random() < 0.5) decrementMaxHtl = true;
if (Math.random() < 0.25) decrementMinHtl = true;
+ bandwidth = new TokenBucket (BUCKET_RATE, BUCKET_SIZE);
}
// Return true if a connection was added, false if already connected
@@ -390,7 +398,8 @@
timerRunning = false;
}
else {
- double sleep = Math.max (deadline - Event.time(), 0.0);
+ double sleep = deadline - Event.time(); // Can be < 0
+ if (sleep < SHORT_SLEEP) sleep = SHORT_SLEEP;
// log ("sleeping for " + sleep + " seconds");
Event.schedule (this, sleep, CHECK_TIMEOUTS, null);
}
Modified: trunk/apps/load-balancing-sims/phase6/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Peer.java 2006-10-30 18:17:39 UTC
(rev 10747)
+++ trunk/apps/load-balancing-sims/phase6/Peer.java 2006-10-30 20:23:12 UTC
(rev 10748)
@@ -55,7 +55,7 @@
// Queue a message for transmission
public void sendMessage (Message m)
{
- m.deadline = Event.time() + MAX_DELAY; // FIXME
+ m.deadline = Event.time() + MAX_DELAY;
if (m instanceof Block || m instanceof DataInsert
|| m instanceof ChkDataFound) {
log (m + " added to transfer queue");
@@ -84,16 +84,14 @@
// Try to send a packet, return true if a packet was sent
private boolean send()
- {
- int waiting = ackQueue.size + searchQueue.size
- + transferQueue.size;
- if (waiting == 0) {
+ {
+ if (ackQueue.size + searchQueue.size + transferQueue.size == 0){
log ("nothing to send");
return false;
}
- log (ackQueue.size + " bytes in ack queue");
- log (searchQueue.size + " bytes in search queue");
- log (transferQueue.size + " bytes in transfer queue");
+ log (ackQueue.size + " bytes of acks in queue");
+ log (searchQueue.size + " bytes of searches in queue");
+ log (transferQueue.size + " bytes of transfers in queue");
// Return to slow start when the link is idle
double now = Event.time();
@@ -101,8 +99,9 @@
lastTransmission = now;
// Delay small packets for coalescing
- if (now < deadline()) {
- log ("delaying transmission of " + waiting + " bytes");
+ if (now < deadline (now)) {
+ int payload = searchQueue.size + transferQueue.size;
+ log ("delaying transmission of " + payload + " bytes");
return false;
}
@@ -121,6 +120,8 @@
log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
else if (window.available() <= 0)
log ("no room in congestion window for messages");
+ else if (node.bandwidth.available() <= 0)
+ log ("no bandwidth available for messages");
else pack (p); // OK to send data
// Don't send empty packets
@@ -137,6 +138,7 @@
// Send the packet
log ("sending packet " + p.seq + ", " + p.size + " bytes");
node.net.send (p, address, latency);
+ node.bandwidth.remove (p.size);
return true;
}
@@ -249,13 +251,13 @@
// Called by Node, returns the next coalescing or retx deadline
public double checkTimeouts()
{
+ log ("checking timeouts");
// Send as many packets as possible
while (send());
- log ("checking timeouts");
+ log (txBuffer.size() + " packets in flight");
double now = Event.time();
- if (txBuffer.isEmpty()) return deadline();
- log (txBuffer.size() + " packets in flight");
+ if (txBuffer.isEmpty()) return deadline (now);
for (Packet p : txBuffer) {
if (now - p.sent > RTO * rtt + MAX_DELAY) {
@@ -268,34 +270,40 @@
}
// Sleep for up to MAX_DELAY seconds until the next deadline
- return Math.min (now + MAX_DELAY, deadline());
+ return Math.min (now + MAX_DELAY, deadline (now));
}
// Work out when the first ack or search or transfer needs to be sent
- private double deadline()
+ private double deadline (double now)
{
- return Math.min (ackQueue.deadline(), dataDeadline());
+ return Math.min (ackQueue.deadline(), dataDeadline (now));
}
// Work out when the first search or transfer needs to be sent
- private double dataDeadline()
+ private double dataDeadline (double now)
{
- int waiting = searchQueue.size + transferQueue.size;
-
// If there's no data waiting, use the ack deadline
- if (waiting == 0) return Double.POSITIVE_INFINITY;
+ if (searchQueue.size + transferQueue.size == 0)
+ return Double.POSITIVE_INFINITY;
+ double deadline = Math.min (searchQueue.deadline(),
+ transferQueue.deadline());
+
// Delay small packets until the coalescing deadline
- if (waiting < Packet.SENSIBLE_PAYLOAD)
- return Math.min (searchQueue.deadline(),
- transferQueue.deadline());
+ if (searchQueue.size + transferQueue.size
+ < Packet.SENSIBLE_PAYLOAD)
+ return deadline;
// If there's not enough room in the window, wait for an ack
- if (window.available() < Packet.SENSIBLE_PAYLOAD)
+ if (window.available() <= 0)
return Double.POSITIVE_INFINITY;
+ // If there's not enough bandwidth, try again shortly
+ if (node.bandwidth.available() <= 0)
+ return Math.max (deadline, now + Node.SHORT_SLEEP);
+
// Send a packet immediately
- return 0.0;
+ return now;
}
public void log (String message)
Copied: trunk/apps/load-balancing-sims/phase6/TokenBucket.java (from rev 10742,
trunk/apps/load-balancing-sims/phase6/TokenBucket.java)