Author: mrogers
Date: 2006-08-06 12:33:47 +0000 (Sun, 06 Aug 2006)
New Revision: 9938
Modified:
trunk/apps/load-balancing-sims/phase5-coalescing/Node.java
trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java
Log:
Sleep until the next coalescing deadline or retx timer
Modified: trunk/apps/load-balancing-sims/phase5-coalescing/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/Node.java 2006-08-06
12:22:50 UTC (rev 9937)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/Node.java 2006-08-06
12:33:47 UTC (rev 9938)
@@ -3,8 +3,8 @@
class Node implements EventTarget
{
- public final static double RETX_TIMER = 0.1; // Coarse-grained timer
public final static int STORE_SIZE = 10; // Max number of keys in store
+ public final static double MIN_SLEEP = 0.01; // Seconds
public double location; // Routing location
public NetworkInterface net;
@@ -13,7 +13,7 @@
private HashSet<Integer> recentlySeenRequests; // Request IDs
private HashMap<Integer,RequestState> outstandingRequests;
public LruCache<Integer> cache; // Datastore containing keys
- private boolean timerRunning = false; // Is the retx timer running?
+ private boolean timerRunning = false; // Is the timer running?
public Node (double txSpeed, double rxSpeed)
{
@@ -60,8 +60,8 @@
public void startTimer()
{
if (timerRunning) return;
- log ("starting retransmission timer");
- Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
+ log ("starting retransmission/coalescing timer");
+ Event.schedule (this, Peer.COALESCE, CHECK_TIMEOUTS, null);
timerRunning = true;
}
@@ -162,21 +162,24 @@
log ("generating request " + r.id);
handleRequest (r, null);
// Schedule the next request
- Event.schedule (this, 0.05, GENERATE_REQUEST, null);
+ Event.schedule (this, 0.049, GENERATE_REQUEST, null);
}
// Event callback
private void checkTimeouts()
{
- boolean stopTimer = true;
+ double deadline = Double.POSITIVE_INFINITY;
for (Peer p : peers.values())
- if (p.checkTimeouts()) stopTimer = false;
- if (stopTimer) {
- log ("stopping retransmission timer");
+ deadline = Math.min (deadline, p.checkTimeouts());
+ if (deadline == Double.POSITIVE_INFINITY) {
+ log ("stopping retransmission/coalescing timer");
timerRunning = false;
}
else {
- Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
+ double sleep = deadline - Event.time();
+ if (sleep < MIN_SLEEP) sleep = MIN_SLEEP;
+ log ("sleeping for " + sleep + " seconds");
+ Event.schedule (this, sleep, CHECK_TIMEOUTS, null);
timerRunning = true;
}
}
Modified: trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java 2006-08-06
12:22:50 UTC (rev 9937)
+++ trunk/apps/load-balancing-sims/phase5-coalescing/Peer.java 2006-08-06
12:33:47 UTC (rev 9938)
@@ -21,9 +21,8 @@
public final static double BETA = 0.9375; // AIMD decrease parameter
public final static double GAMMA = 3.0; // Slow start divisor
- // Coalescing parameters
- public final static double COALESCE_DATA = 0.1; // Max delay in seconds
- public final static double COALESCE_ACK = 0.1;
+ // Coalescing
+ public final static double COALESCE = 0.1; // Max delay in seconds
// Out-of-order delivery with eventual detection of missing packets
public final static int SEQ_RANGE = 1000; // Packets
@@ -66,9 +65,11 @@
// Warning: until token-passing is implemented the length of
// the transmission queue is unlimited
double now = Event.time();
- msgQueue.add (new Deadline<Message> (m, now + COALESCE_DATA));
+ msgQueue.add (new Deadline<Message> (m, now + COALESCE));
msgQueueSize += m.size;
log (msgQueue.size() + " messages in transmission queue");
+ // Start the node's timer if necessary
+ node.startTimer();
// Send as many packets as possible
while (send());
}
@@ -100,15 +101,8 @@
if (payload > msgQueueSize) payload = msgQueueSize;
if (payload > window) payload = window;
- // Work out when the first ack or message needs to be sent
- double deadline = Double.POSITIVE_INFINITY;
- Deadline<Integer> ack = ackQueue.peek();
- if (ack != null) deadline = ack.deadline;
- Deadline<Message> msg = msgQueue.peek();
- if (msg != null) deadline = Math.min (deadline, msg.deadline);
-
// Delay small packets for coalescing
- if (payload < Packet.SENSIBLE_PAYLOAD && now < deadline) {
+ if (payload < Packet.SENSIBLE_PAYLOAD && now < deadline()) {
log ("delaying transmission of " + payload + " bytes");
return false;
}
@@ -148,8 +142,6 @@
inflight += p.size; // Acks aren't congestion-controlled
log (inflight + " bytes in flight");
txBuffer.add (p);
- // Start the node's retransmission timer if necessary
- node.startTimer();
}
// Send the packet
@@ -161,9 +153,12 @@
private void sendAck (int seq)
{
double now = Event.time();
- ackQueue.add (new Deadline<Integer> (seq, now + COALESCE_ACK));
+ ackQueue.add (new Deadline<Integer> (seq, now + COALESCE));
ackQueueSize += Packet.ACK_SIZE;
- send();
+ // Start the node's timer if necessary
+ node.startTimer();
+ // Send as many packets as possible
+ while (send());
}
// Called by Node when a packet arrives
@@ -262,19 +257,31 @@
for (Message m : p.messages) node.handleMessage (m, this);
}
+ // Work out when the first ack or message needs to be sent
+ private double deadline()
+ {
+ double deadline = Double.POSITIVE_INFINITY;
+ Deadline<Integer> ack = ackQueue.peek();
+ if (ack != null) deadline = ack.deadline;
+ Deadline<Message> msg = msgQueue.peek();
+ if (msg != null) deadline = Math.min (deadline, msg.deadline);
+ return deadline;
+ }
+
private void log (String message)
{
Event.log (node.net.address + ":" + address + " " + message);
}
- // Called by Node
- public boolean checkTimeouts()
+ // Called by Node, returns the next coalescing or retx deadline
+ public double checkTimeouts()
{
log ("checking timeouts");
- send(); // Consider sending delayed packets
+ // Send as many packets as possible
+ while (send());
if (txBuffer.isEmpty()) {
log ("no packets in flight");
- return false;
+ return deadline();
}
double now = Event.time();
for (Packet p : txBuffer) {
@@ -297,6 +304,6 @@
}
}
}
- return true;
+ return Math.min (now + COALESCE, deadline());
}
}