Author: mrogers
Date: 2006-11-16 15:15:27 +0000 (Thu, 16 Nov 2006)
New Revision: 10943
Modified:
trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java
trunk/apps/load-balancing-sims/phase7/sim/Node.java
trunk/apps/load-balancing-sims/phase7/sim/Packet.java
trunk/apps/load-balancing-sims/phase7/sim/Peer.java
trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java
trunk/apps/load-balancing-sims/phase7/sim/messages/Message.java
Log:
Measure the bandwidth delay. FIXME: the measured delay is large during
transfers, but it is not actually caused by the bandwidth limiter, which is
set far above the available physical bandwidth.
Modified: trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java 2006-11-16 14:34:18 UTC (rev 10942)
+++ trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java 2006-11-16 15:15:27 UTC (rev 10943)
@@ -28,12 +28,9 @@
address = Network.register (this);
}
- // Called by Peer
- public void send (Packet p, int dest, double latency)
+ // Called by Node
+ public void sendPacket (Packet p)
{
- p.src = address;
- p.dest = dest;
- p.latency = latency;
if (txQueueSize + p.size > txQueueMaxSize) {
log ("no room in txQueue, " + p + " lost");
return;
@@ -75,7 +72,7 @@
log ("finished receiving " + p);
node.handlePacket (p);
rxQueueSize -= p.size;
- rxQueue.remove (p);
+ rxQueue.poll();
// If there's another packet waiting, start to receive it
if (!rxQueue.isEmpty()) rxStart (rxQueue.peek());
}
@@ -94,7 +91,7 @@
log ("finished transmitting " + p);
Network.deliver (p);
txQueueSize -= p.size;
- txQueue.remove (p);
+ txQueue.poll();
// If there's another packet waiting, start to transmit it
if (!txQueue.isEmpty()) txStart (txQueue.peek());
}
Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-16 14:34:18 UTC (rev 10942)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-16 15:15:27 UTC (rev 10943)
@@ -14,6 +14,7 @@
// Flow control
public final static int FLOW_TOKENS = 20; // Shared by all peers
public final static double TOKEN_DELAY = 1.0; // Allocate initial tokens
+ public final static double DELAY_DECAY = 0.9; // Exp moving average
public double location; // Routing location
public NetworkInterface net;
@@ -30,6 +31,7 @@
public TokenBucket bandwidth; // Bandwidth limiter
private boolean timerRunning = false;
private int spareTokens = FLOW_TOKENS; // Tokens not allocated to a peer
+ private double bwDelay = 0.0; // Average delay caused by b/w limiter
public Node (double txSpeed, double rxSpeed)
{
@@ -159,21 +161,50 @@
pubKeyCache.put (key);
}
- // Called by Peer after transmitting a packet
+ // Called by Peer to start the retransmission timer
public void startTimer()
{
if (timerRunning) return;
timerRunning = true;
- // log ("starting retransmission timer");
+ log ("starting retransmission timer");
Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
}
+ // Called by Peer to transmit a packet for the first time
+ public void sendPacket (Packet p)
+ {
+ // Update the bandwidth limiter
+ bandwidth.remove (p.size);
+ // Update the average bandwidth delay
+ if (p.messages != null) {
+ double now = Event.time();
+ for (Message m : p.messages) {
+ double delay = now - m.deadline;
+ log ("bandwidth delay " + delay);
+ bwDelay *= DELAY_DECAY;
+ bwDelay += delay * (1.0 - DELAY_DECAY);
+ }
+ log ("average bandwidth delay " + bwDelay);
+ }
+ // Send the packet
+ net.sendPacket (p);
+ }
+
+ // Called by Peer to retransmit a packet
+ public void resendPacket (Packet p)
+ {
+ // Update the bandwidth limiter
+ bandwidth.remove (p.size);
+ // Send the packet
+ net.sendPacket (p);
+ }
+
// Called by NetworkInterface
- public void handlePacket (Packet packet)
+ public void handlePacket (Packet p)
{
- Peer peer = peers.get (packet.src);
+ Peer peer = peers.get (p.src);
if (peer == null) log ("received packet from unknown peer");
- else peer.handlePacket (packet);
+ else peer.handlePacket (p);
}
// Called by Peer
@@ -447,7 +478,7 @@
boolean stopTimer = true;
for (Peer p : peers()) if (p.checkTimeouts()) stopTimer = false;
if (stopTimer) {
- // log ("stopping retransmission timer");
+ log ("stopping retransmission timer");
timerRunning = false;
}
else Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
Modified: trunk/apps/load-balancing-sims/phase7/sim/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Packet.java 2006-11-16 14:34:18 UTC (rev 10942)
+++ trunk/apps/load-balancing-sims/phase7/sim/Packet.java 2006-11-16 15:15:27 UTC (rev 10943)
@@ -12,15 +12,22 @@
public final static int MAX_SIZE = 1450; // MTU including headers
public final static int SENSIBLE_PAYLOAD = 1000; // Coalescing
- public int src, dest; // Network addresses
+ public final int src, dest; // Network addresses
public int size = HEADER_SIZE; // Size in bytes, including headers
public int seq = -1; // Data sequence number (-1 if no data)
public ArrayList<Ack> acks = null;
public ArrayList<Message> messages = null;
public double sent; // Time at which the packet was (re) transmitted
- public double latency; // Link latency (stored here for convenience)
+ public double latency; // Link latency, stored here for convenience
+ public Packet (int src, int dest, double latency)
+ {
+ this.src = src;
+ this.dest = dest;
+ this.latency = latency;
+ }
+
public void addAck (Ack a)
{
if (acks == null) acks = new ArrayList<Ack>();
Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-16 14:34:18 UTC (rev 10942)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-16 15:15:27 UTC (rev 10943)
@@ -18,11 +18,11 @@
public final static double LINK_IDLE = 8.0; // RTTs without transmitting
// Coalescing
- private final static double MAX_SLEEP = 0.1; // Max coalescing delay
- private final static double MIN_SLEEP = 0.01; // Forty winks
+ public final static double MAX_DELAY = 0.1; // Max coalescing delay
+ public final static double MIN_SLEEP = 0.01; // Forty winks
// Out-of-order delivery with duplicate detection
- public final static int SEQ_RANGE = 1000;
+ public final static int SEQ_RANGE = 65536;
// Sender state
private double rtt = 5.0; // Estimated round-trip time in seconds
@@ -36,7 +36,6 @@
private double lastTransmission = Double.POSITIVE_INFINITY; // Time
private boolean tgif = false; // "Transfers go in first" toggle
private boolean timerRunning = false; // Coalescing timer
- private double pollingInterval; // Poll the bandwidth limiter
// Receiver state
private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
@@ -58,16 +57,12 @@
transferQueue = new DeadlineQueue<Message>();
window = new CongestionWindow (this);
rxDupe = new HashSet<Integer>();
- // Poll the bandwidth limiter at reasonable intervals
- pollingInterval = Packet.SENSIBLE_PAYLOAD / node.bandwidth.rate;
- if (pollingInterval > MAX_SLEEP) pollingInterval = MAX_SLEEP;
- if (pollingInterval < MIN_SLEEP) pollingInterval = MIN_SLEEP;
}
// Queue a message for transmission
public void sendMessage (Message m)
{
- m.deadline = Event.time() + MAX_SLEEP;
+ m.deadline = Event.time() + MAX_DELAY;
if (m instanceof Block) {
log (m + " added to transfer queue");
transferQueue.add (m);
@@ -86,7 +81,7 @@
private void sendAck (int seq)
{
log ("ack " + seq + " added to ack queue");
- ackQueue.add (new Ack (seq, Event.time() + MAX_SLEEP));
+ ackQueue.add (new Ack (seq, Event.time() + MAX_DELAY));
// Start the coalescing timer
startTimer();
// Send as many packets as possible
@@ -99,7 +94,7 @@
if (timerRunning) return;
timerRunning = true;
log ("starting coalescing timer");
- Event.schedule (this, MAX_SLEEP, CHECK_DEADLINES, null);
+ Event.schedule (this, MAX_DELAY, CHECK_DEADLINES, null);
}
// Try to send a packet, return true if a packet was sent
@@ -138,9 +133,11 @@
private boolean sendPacket (int maxSize)
{
// Construct a packet
- Packet p = new Packet();
+ Packet p = new Packet (node.net.address, address, latency);
+ // Add all waiting acks to the packet
while (ackQueue.size > 0) p.addAck (ackQueue.pop());
log ((maxSize - p.size) + " bytes available for messages");
+ // Don't allow more than SEQ_RANGE payloads to be in flight
if (txSeq <= txMaxSeq) {
// Alternate priority between searches and transfers
if (tgif) {
@@ -161,8 +158,7 @@
if (p.acks == null && p.messages == null) return false;
// Transmit the packet
log ("sending packet " + p.seq + ", " + p.size + " bytes");
- node.net.send (p, address, latency);
- node.bandwidth.remove (p.size);
+ node.sendPacket (p);
// If the packet contains data, buffer it for retransmission
if (p.messages != null) {
p.sent = Event.time();
@@ -230,10 +226,10 @@
break;
}
// Fast retransmission
- if (p.seq < seq && age > FRTO * rtt + MAX_SLEEP) {
+ if (p.seq < seq && age > FRTO * rtt + MAX_DELAY) {
p.sent = now;
log ("fast retransmitting packet " + p.seq);
- node.net.send (p, address, latency);
+ node.resendPacket (p);
window.fastRetransmission (now);
}
}
@@ -254,11 +250,11 @@
double now = Event.time();
for (Packet p : txBuffer) {
- if (now - p.sent > RTO * rtt + MAX_SLEEP) {
+ if (now - p.sent > RTO * rtt + MAX_DELAY) {
// Retransmission timeout
log ("retransmitting packet " + p.seq);
p.sent = now;
- node.net.send (p, address, latency);
+ node.resendPacket (p);
window.timeout (now);
}
}
@@ -288,7 +284,7 @@
}
// Schedule the next check
double sleep = dl - Event.time();
- if (shouldPoll()) sleep = Math.max (sleep, pollingInterval);
+ if (shouldPoll()) sleep = Math.max (sleep, node.bandwidth.poll);
else sleep = Math.max (sleep, MIN_SLEEP);
timerRunning = true;
log ("sleeping for " + sleep + " seconds");
@@ -299,25 +295,26 @@
private boolean shouldPoll()
{
double now = Event.time();
- if (ackQueue.deadline() < now + pollingInterval) return false;
-
+ // Will we need to send an ack before the next polling interval?
+ if (ackQueue.deadline() < now + node.bandwidth.poll)
+ return false;
double bw = node.bandwidth.available();
double win = window.available();
-
+ // Is there an overdue search that's waiting for bandwidth?
if (searchQueue.headSize() > bw
&& searchQueue.headSize() <= win
&& searchQueue.deadline() <= now) return true;
-
+ // Is there an overdue transfer that's waiting for bandwidth?
if (transferQueue.headSize() > bw
&& transferQueue.headSize() <= win
&& transferQueue.deadline() <= now) return true;
-
+ // We're waiting for something other than bandwidth
return false;
}
public void log (String message)
{
- // Event.log (node.net.address + ":" + address + " " + message);
+ Event.log (node.net.address + ":" + address + " " + message);
}
public String toString()
Modified: trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java 2006-11-16 14:34:18 UTC (rev 10942)
+++ trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java 2006-11-16 15:15:27 UTC (rev 10943)
@@ -2,13 +2,17 @@
class TokenBucket
{
- public final double rate, size;
+ public final double rate, size, poll;
private double tokens, lastUpdated;
public TokenBucket (double rate, double size)
{
this.rate = rate;
this.size = size;
+ double poll = Packet.MAX_SIZE / rate;
+ if (poll < Peer.MIN_SLEEP) poll = Peer.MIN_SLEEP;
+ if (poll > Peer.MAX_DELAY) poll = Peer.MAX_DELAY;
+ this.poll = poll;
tokens = size;
lastUpdated = 0.0; // Clock time
}
Modified: trunk/apps/load-balancing-sims/phase7/sim/messages/Message.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/messages/Message.java 2006-11-16 14:34:18 UTC (rev 10942)
+++ trunk/apps/load-balancing-sims/phase7/sim/messages/Message.java 2006-11-16 15:15:27 UTC (rev 10943)
@@ -4,16 +4,16 @@
public class Message
{
- public final static int HEADER_SIZE = 12; // Bytes, including unique ID
+ public final static int HEADER_SIZE = 12; // Bytes, including search ID
public final static int KEY_SIZE = 32; // Size of a routing key, bytes
public final static int PUB_KEY_SIZE = 1024; // Size of a pub key, bytes
public final static int DATA_SIZE = 1024; // Size of a data block, bytes
public final static int ACK_SIZE = 4; // Size of a sequence num, bytes
- public static int nextId = 0; // Each request and insert has a unique ID
+ public static int nextId = 0; // Each search has a unique ID
- public int id; // Unique request ID
- public double deadline = 0.0; // Coalescing, stored here for convenience
+ public int id; // Search ID
+ public double deadline = 0.0; // Coalescing deadline
// Override this
public int size()