Author: mrogers
Date: 2007-02-23 20:30:06 +0000 (Fri, 23 Feb 2007)
New Revision: 11903
Removed:
trunk/apps/load-balancing-sims/phase7/sim/messages/Ack.java
Modified:
trunk/apps/load-balancing-sims/phase7/sim/Node.java
trunk/apps/load-balancing-sims/phase7/sim/Peer.java
trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java
Log:
Cleaner (saner?) coalescing and retransmission code
Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2007-02-23 18:11:16 UTC (rev 11902)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2007-02-23 20:30:06 UTC (rev 11903)
@@ -12,9 +12,6 @@
{
public final static boolean LOG = false;
- // Coarse-grained retransmission timer
- public final static double RETX_TIMER = 0.1; // Seconds
-
// Flow control
public static boolean useTokens = false;
public static boolean useBackoff = false;
@@ -43,7 +40,6 @@
private boolean decrementMaxHtl = false;
private boolean decrementMinHtl = false;
public TokenBucket bandwidth; // Bandwidth limiter
- private boolean timerRunning = false;
private int spareTokens = FLOW_TOKENS; // Tokens not allocated to a peer
private double delay = 0.0; // Delay caused by congestion or b/w limiter
private LinkedList<Search> searchQueue;
@@ -70,7 +66,7 @@
pubKeyCache = new LruCache<Integer> (16000);
if (Math.random() < 0.5) decrementMaxHtl = true;
if (Math.random() < 0.25) decrementMinHtl = true;
- bandwidth = new TokenBucket (40000, 60000);
+ bandwidth = new TokenBucket (40000, 400000);
searchQueue = new LinkedList<Search>();
if (useTokens) {
// Allocate flow control tokens after a short delay
@@ -230,15 +226,6 @@
else if (LOG) log ("public key " + key + " not added to store");
}
- // Called by Peer to start the retransmission timer
- public void startTimer()
- {
- if (timerRunning) return;
- timerRunning = true;
- if (LOG) log ("starting retransmission timer");
- Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
- }
-
// Called by Peer to transmit a packet for the first time
public void sendPacket (Packet p)
{
@@ -611,17 +598,6 @@
addToSearchQueue (si);
}
- private void checkTimeouts()
- {
- boolean stopTimer = true;
- for (Peer p : peers()) if (p.checkTimeouts()) stopTimer = false;
- if (stopTimer) {
- if (LOG) log ("stopping retransmission timer");
- timerRunning = false;
- }
- else Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
- }
-
// Allocate all flow control tokens at startup
private void allocateTokens()
{
@@ -657,10 +633,6 @@
generateSskInsert ((Integer) data, 1, null);
break;
- case CHECK_TIMEOUTS:
- checkTimeouts();
- break;
-
case ALLOCATE_TOKENS:
allocateTokens();
break;
@@ -676,7 +648,6 @@
public final static int REQUEST_SSK = 3;
public final static int INSERT_SSK = 4;
public final static int SSK_COLLISION = 5;
- private final static int CHECK_TIMEOUTS = 6;
- private final static int ALLOCATE_TOKENS = 7;
- private final static int SEND_SEARCH = 8;
+ private final static int ALLOCATE_TOKENS = 6;
+ private final static int SEND_SEARCH = 7;
}
Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2007-02-23 18:11:16 UTC (rev 11902)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2007-02-23 20:30:06 UTC (rev 11903)
@@ -19,9 +19,8 @@
public final static double RTT_DECAY = 0.9; // Exp moving average
public final static double LINK_IDLE = 8.0; // RTTs without transmitting
- // Coalescing
- public final static double MAX_DELAY = 0.1; // Max coalescing delay
- public final static double MIN_SLEEP = 0.01; // Forty winks
+ // Retransmission/coalescing timer
+ public final static double TICK = 0.1; // Timer granularity, seconds
// Backoff
public final static double INITIAL_BACKOFF = 1.0; // Seconds
@@ -39,9 +38,9 @@
private DeadlineQueue<Message> searchQueue; // Outgoing search messages
private DeadlineQueue<Message> transferQueue; // Outgoing transfers
private CongestionWindow window; // AIMD congestion window
- private double lastTransmission = Double.POSITIVE_INFINITY; // Time
+ private double lastTransmission = Double.POSITIVE_INFINITY; // Abs. time
private boolean tgif = false; // "Transfers go in first" toggle
- private boolean timerRunning = false; // Coalescing timer
+ private boolean timerRunning = false; // Retransmission/coalescing timer
// Receiver state
private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
@@ -50,8 +49,8 @@
// Flow control
private int tokensOut = 0; // How many searches can we send?
private int tokensIn = 0; // How many searches should we accept?
- public double backoffUntil = 0.0; // Time
- public double backoffLength = INITIAL_BACKOFF; // Seconds
+ public double backoffUntil = 0.0; // Absolute time, seconds
+ public double backoffLength = INITIAL_BACKOFF; // Relative time, seconds
public Peer (Node node, int address, double location, double latency)
{
@@ -69,7 +68,7 @@
// Queue a message for transmission
public void sendMessage (Message m)
{
- m.deadline = Event.time() + MAX_DELAY;
+ m.deadline = Event.time() + TICK;
if (m instanceof Block) {
if (LOG) log (m + " added to transfer queue");
transferQueue.add (m);
@@ -84,13 +83,13 @@
while (send (-1));
}
- // Start the coalescing timer
+ // Start the retransmission/coalescing timer
private void startTimer()
{
if (timerRunning) return;
timerRunning = true;
- if (LOG) log ("starting coalescing timer");
- Event.schedule (this, MAX_DELAY, CHECK_DEADLINES, null);
+ if (LOG) log ("starting timer");
+ Event.schedule (this, TICK, TIMER, null);
}
// Try to send a packet, return true if a packet was sent
@@ -165,7 +164,7 @@
if (p.messages != null) {
p.sent = Event.time();
txBuffer.add (p);
- node.startTimer(); // Start the retransmission timer
+ startTimer(); // Start the retransmission timer
window.bytesSent (p.size);
}
return true;
@@ -241,8 +240,7 @@
else txMaxSeq = txBuffer.peek().seq + SEQ_RANGE - 1;
if (LOG) log ("maximum sequence number " + txMaxSeq);
// Send as many packets as possible
- if (timerRunning) while (send (-1));
- else checkDeadlines();
+ while (send (-1));
}
// When a local RejectedOverload is received, back off unless backed off
@@ -305,12 +303,19 @@
return tokensIn;
}
- // Check retx timeouts, return true if there are packets in flight
- public boolean checkTimeouts()
+ // Event callback: wake up, send packets, go back to sleep
+ private void timer()
{
- if (LOG) log (txBuffer.size() + " packets in flight");
- if (txBuffer.isEmpty()) return false;
-
+ // Send as many packets as possible
+ while (send (-1));
+ // Stop the timer if there's nothing to wait for
+ if (searchQueue.size + transferQueue.size == 0
+ && txBuffer.isEmpty()) {
+ if (LOG) log ("stopping timer");
+ timerRunning = false;
+ return;
+ }
+ // Check the retransmission timeouts
double now = Event.time();
for (Packet p : txBuffer) {
if (now - p.sent > RTO * rtt) {
@@ -321,57 +326,10 @@
window.timeout (now);
}
}
- return true;
- }
-
- // Event callback: wake up, send packets, go back to sleep
- private void checkDeadlines()
- {
- // Send as many packets as possible
- while (send (-1));
- // Find the next coalescing deadline - ignore message deadlines
- // if there isn't room in the congestion window to send them
- double dl = Double.POSITIVE_INFINITY;
- int win = window.available() - Packet.HEADER_SIZE;
- if (searchQueue.headSize() <= win)
- dl = Math.min (dl, searchQueue.deadline());
- if (transferQueue.headSize() <= win)
- dl = Math.min (dl, transferQueue.deadline());
- // If there's no deadline, stop the timer
- if (dl == Double.POSITIVE_INFINITY) {
- if (timerRunning) {
- if (LOG) log ("stopping coalescing timer");
- timerRunning = false;
- }
- return;
- }
// Schedule the next check
- double sleep = dl - Event.time();
- if (shouldPoll()) sleep = Math.max (sleep, node.bandwidth.poll);
- else sleep = Math.max (sleep, MIN_SLEEP);
- timerRunning = true;
- if (LOG) log ("sleeping for " + sleep + " seconds");
- Event.schedule (this, sleep, CHECK_DEADLINES, null);
+ Event.schedule (this, TICK, TIMER, null);
}
- // Are we waiting for the bandwidth limiter?
- private boolean shouldPoll()
- {
- double bw = node.bandwidth.available();
- double win = window.available();
- double now = Event.time();
- // Is there an overdue search that's waiting for bandwidth?
- if (searchQueue.headSize() > bw
- && searchQueue.headSize() <= win
- && searchQueue.deadline() <= now) return true;
- // Is there an overdue transfer that's waiting for bandwidth?
- if (transferQueue.headSize() > bw
- && transferQueue.headSize() <= win
- && transferQueue.deadline() <= now) return true;
- // We're waiting for something other than bandwidth
- return false;
- }
-
public void log (String message)
{
Event.log (node.net.address + ":" + address + " " + message);
@@ -385,8 +343,8 @@
// EventTarget interface
public void handleEvent (int type, Object data)
{
- if (type == CHECK_DEADLINES) checkDeadlines();
+ if (type == TIMER) timer();
}
- private final static int CHECK_DEADLINES = 1;
+ private final static int TIMER = 1;
}
Modified: trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java 2007-02-23 18:11:16 UTC (rev 11902)
+++ trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java 2007-02-23 20:30:06 UTC (rev 11903)
@@ -2,17 +2,13 @@
class TokenBucket
{
- public final double rate, size, poll;
+ public final double rate, size;
private double tokens, lastUpdated;
public TokenBucket (double rate, double size)
{
this.rate = rate; // Bandwidth limit in bytes per second
this.size = size; // Size of maximum burst in bytes
- double poll = Packet.MAX_SIZE / rate;
- if (poll < Peer.MIN_SLEEP) poll = Peer.MIN_SLEEP;
- if (poll > Peer.MAX_DELAY) poll = Peer.MAX_DELAY;
- this.poll = poll; // Polling interval in seconds
tokens = size;
lastUpdated = 0.0; // Time
}
Deleted: trunk/apps/load-balancing-sims/phase7/sim/messages/Ack.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/messages/Ack.java 2007-02-23 18:11:16 UTC (rev 11902)
+++ trunk/apps/load-balancing-sims/phase7/sim/messages/Ack.java 2007-02-23 20:30:06 UTC (rev 11903)
@@ -1,15 +0,0 @@
-package sim.messages;
-
-public class Ack extends Message
-{
- public Ack (int seq, double deadline)
- {
- id = seq; // Space-saving hack
- this.deadline = deadline;
- }
-
- public int size()
- {
- return ACK_SIZE;
- }
-}