Author: mrogers
Date: 2006-08-03 17:23:52 +0000 (Thu, 03 Aug 2006)
New Revision: 9866
Added:
trunk/apps/load-balancing-sims/phase5/LruCache.java
Modified:
trunk/apps/load-balancing-sims/phase5/NetworkInterface.java
trunk/apps/load-balancing-sims/phase5/Node.java
trunk/apps/load-balancing-sims/phase5/Peer.java
trunk/apps/load-balancing-sims/phase5/Request.java
trunk/apps/load-balancing-sims/phase5/RequestState.java
trunk/apps/load-balancing-sims/phase5/Response.java
Log:
LRU cache, timeouts return to slow start, one retransmission timer per node
Added: trunk/apps/load-balancing-sims/phase5/LruCache.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/LruCache.java 2006-08-03 16:44:56 UTC
(rev 9865)
+++ trunk/apps/load-balancing-sims/phase5/LruCache.java 2006-08-03 17:23:52 UTC
(rev 9866)
@@ -0,0 +1,46 @@
+// Limited-capacity LRU cache
+
+import java.util.LinkedHashSet;
+
+class LruCache<Key>
+{
+ public int capacity;
+ public LinkedHashSet<Key> set;
+
+ public LruCache (int capacity)
+ {
+ this.capacity = capacity;
+ set = new LinkedHashSet<Key> (capacity);
+ }
+
+ public boolean get (Key key)
+ {
+ log ("searching cache for key " + key);
+ if (set.remove (key)) {
+ set.add (key); // Move the key to the fresh end
+ return true;
+ }
+ return false;
+ }
+
+ public void put (Key key)
+ {
+ if (set.remove (key))
+ log ("key " + key + " already in cache");
+ else {
+ log ("adding key " + key + " to cache");
+ if (set.size() == capacity) {
+ // Discard the oldest element
+ Key oldest = set.iterator().next();
+ log ("discarding key " + oldest);
+ set.remove (oldest);
+ }
+ }
+ set.add (key); // Add or move the key to the fresh end
+ }
+
+ private void log (String message)
+ {
+ Event.log (message);
+ }
+}
Modified: trunk/apps/load-balancing-sims/phase5/NetworkInterface.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/NetworkInterface.java 2006-08-03
16:44:56 UTC (rev 9865)
+++ trunk/apps/load-balancing-sims/phase5/NetworkInterface.java 2006-08-03
17:23:52 UTC (rev 9866)
@@ -1,5 +1,4 @@
import java.util.LinkedList;
-import java.util.NoSuchElementException;
class NetworkInterface implements EventTarget
{
@@ -72,13 +71,10 @@
{
log ("finished receiving " + p);
node.handlePacket (p);
+ rxQueueSize -= p.size;
+ rxQueue.remove (p);
// If there's another packet waiting, start to receive it
- try {
- rxQueueSize -= p.size;
- rxQueue.remove (p);
- rxStart (rxQueue.getFirst());
- }
- catch (NoSuchElementException nse) {}
+ if (!rxQueue.isEmpty()) rxStart (rxQueue.peek());
}
// Start transmitting a packet
@@ -94,13 +90,10 @@
{
log ("finished transmitting " + p);
Network.deliver (p);
+ txQueueSize -= p.size;
+ txQueue.remove (p);
// If there's another packet waiting, start to transmit it
- try {
- txQueueSize -= p.size;
- txQueue.remove (p);
- txStart (txQueue.getFirst());
- }
- catch (NoSuchElementException nse) {}
+ if (!txQueue.isEmpty()) txStart (txQueue.peek());
}
private void log (String message)
Modified: trunk/apps/load-balancing-sims/phase5/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Node.java 2006-08-03 16:44:56 UTC
(rev 9865)
+++ trunk/apps/load-balancing-sims/phase5/Node.java 2006-08-03 17:23:52 UTC
(rev 9866)
@@ -3,13 +3,17 @@
class Node implements EventTarget
{
+ public final static double RETX_TIMER = 0.1; // Coarse-grained timer
+ public final static int STORE_SIZE = 10; // Max number of keys in store
+
public double location; // Routing location
public NetworkInterface net;
private HashMap<Integer,Peer> peers; // Look up a peer by its address
private int requestsGenerated = 0;
private HashSet<Integer> recentlySeenRequests; // Request IDs
private HashMap<Integer,RequestState> outstandingRequests;
- public HashSet<Double> cache; // Datastore containing keys
+ public LruCache<Integer> cache; // Datastore containing keys
+ private boolean timerRunning = false; // Is the retx timer running?
public Node (double txSpeed, double rxSpeed)
{
@@ -18,7 +22,7 @@
peers = new HashMap<Integer,Peer>();
recentlySeenRequests = new HashSet<Integer>();
outstandingRequests = new HashMap<Integer,RequestState>();
- cache = new HashSet<Double>();
+ cache = new LruCache<Integer> (STORE_SIZE);
}
public void connect (Node n, double latency)
@@ -40,6 +44,27 @@
else return Math.min (b - a, a - b + 1.0);
}
+ // Convert an integer key to a routing location
+ public static double keyToLocation (int key)
+ {
+ return key / (double) Integer.MAX_VALUE;
+ }
+
+ // Convert a routing location to an integer key
+ public static int locationToKey (double location)
+ {
+ return (int) (location * Integer.MAX_VALUE);
+ }
+
+ // Called by Peer
+ public void startTimer()
+ {
+ if (timerRunning) return;
+ log ("starting retransmission timer");
+ Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
+ timerRunning = true;
+ }
+
// Called by NetworkInterface
public void handlePacket (Packet packet)
{
@@ -71,7 +96,7 @@
if (rs != null) rs.nexts.remove (prev);
return;
}
- if (cache.contains (r.key)) {
+ if (cache.get (r.key)) {
log ("key " + r.key + " found in cache");
if (prev == null)
log (r + " succeeded locally");
@@ -89,7 +114,7 @@
log ("unexpected " + r);
return;
}
- cache.add (r.key);
+ cache.put (r.key);
if (rs.prev == null) log (rs + " succeeded");
else {
log ("forwarding " + r);
@@ -133,19 +158,37 @@
{
if (requestsGenerated++ > 1000) return;
// Send a request to a random location
- Request r = new Request (0.1);
+ Request r = new Request (123456);
log ("generating request " + r.id);
handleRequest (r, null);
// Schedule the next request
- // Event.schedule (this, Math.random(), GENERATE_REQUEST, null);
+ Event.schedule (this, 0.01, GENERATE_REQUEST, null);
}
+ // Event callback
+ private void checkTimeouts()
+ {
+ boolean stopTimer = true;
+ for (Peer p : peers.values())
+ if (p.checkTimeouts()) stopTimer = false;
+ if (stopTimer) {
+ log ("stopping retransmission timer");
+ timerRunning = false;
+ }
+ else {
+ Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null);
+ timerRunning = true;
+ }
+ }
+
// EventTarget interface
public void handleEvent (int type, Object data)
{
if (type == GENERATE_REQUEST) generateRequest();
+ else if (type == CHECK_TIMEOUTS) checkTimeouts();
}
// Each EventTarget class has its own event codes
public final static int GENERATE_REQUEST = 1;
+ public final static int CHECK_TIMEOUTS = 2;
}
Modified: trunk/apps/load-balancing-sims/phase5/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Peer.java 2006-08-03 16:44:56 UTC
(rev 9865)
+++ trunk/apps/load-balancing-sims/phase5/Peer.java 2006-08-03 17:23:52 UTC
(rev 9866)
@@ -2,7 +2,7 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
-class Peer implements EventTarget
+class Peer
{
private Node node; // The local node
public int address; // The remote node's address
@@ -10,7 +10,6 @@
private double latency; // The latency of the connection in seconds
// Retransmission parameters
- public final static double TIMER = 0.5; // Coarse-grained timer, seconds
public final static double RTO = 4.0; // Retransmission timeout in RTTs
public final static double FRTO = 1.5; // Fast retx timeout in RTTs
public final static double RTT_DECAY = 0.9; // Exp moving average
@@ -29,7 +28,7 @@
private boolean slowStart = true; // Are we in the slow start phase?
private double rtt = 3.0; // Estimated round-trip time in seconds
private double lastTransmission = 0.0; // Clock time
- private boolean timerRunning = false; // Is the retx timer running?
+ private double lastLeftSlowStart = 0.0; // Clock time
private int inflight = 0; // Bytes sent but not acked
private int txSeq = 0; // Sequence number of next outgoing packet
private LinkedList<DataPacket> txBuffer; // Retransmission buffer
@@ -126,12 +125,8 @@
inflight += p.size;
log (inflight + " bytes in flight");
txBuffer.add (p);
- // Start the coarse-grained retransmission timer if necessary
- if (!timerRunning) {
- log ("starting retransmission timer");
- Event.schedule (this, TIMER, CHECK_TIMEOUTS, null);
- timerRunning = true;
- }
+ // Start the node's retransmission timer if necessary
+ node.startTimer();
return true;
}
@@ -248,6 +243,7 @@
if (slowStart) {
log ("leaving slow start");
slowStart = false;
+ lastLeftSlowStart = now;
}
}
@@ -263,15 +259,13 @@
Event.log (node.net.address + ":" + address + " " + message);
}
- // Event callback
- private void checkTimeouts()
+ // Called by Node
+ public boolean checkTimeouts()
{
log ("checking timeouts");
- // If there are no packets in flight, stop the timer
if (txBuffer.isEmpty()) {
- log ("stopping retransmission timer");
- timerRunning = false;
- return;
+ log ("no packets in flight");
+ return false;
}
double now = Event.time();
for (DataPacket p : txBuffer) {
@@ -281,20 +275,19 @@
log ("retransmitting packet " + p.seq);
log (inflight + " bytes in flight");
node.net.send (p, address, latency);
- // Note: TCP would return to slow start
- decreaseCongestionWindow (now);
+ // Return to slow start
+ if (!slowStart &&
+ now - lastLeftSlowStart > RTO * rtt) {
+ log ("returning to slow start");
+ cwind = MIN_CWIND;
+ slowStart = true;
+ }
+ else {
+ log ("not returning to slow start");
+ decreaseCongestionWindow (now);
+ }
}
}
- // Reset the timer
- Event.schedule (this, TIMER, CHECK_TIMEOUTS, null);
+ return true;
}
-
- // EventTarget interface
- public void handleEvent (int type, Object data)
- {
- if (type == CHECK_TIMEOUTS) checkTimeouts();
- }
-
- // Each EventTarget class has its own event codes
- private final static int CHECK_TIMEOUTS = 1;
}
Modified: trunk/apps/load-balancing-sims/phase5/Request.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Request.java 2006-08-03 16:44:56 UTC
(rev 9865)
+++ trunk/apps/load-balancing-sims/phase5/Request.java 2006-08-03 17:23:52 UTC
(rev 9866)
@@ -3,10 +3,10 @@
private static int nextId = 0;
public final int id; // The unique ID of the request
- public final double key; // The requested key (as a routing location)
+ public final int key; // The requested key
// Start a new request
- public Request (double key)
+ public Request (int key)
{
id = nextId++;
this.key = key;
@@ -14,7 +14,7 @@
}
// Forward a request
- public Request (int id, double key)
+ public Request (int id, int key)
{
this.id = id;
this.key = key;
Modified: trunk/apps/load-balancing-sims/phase5/RequestState.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/RequestState.java 2006-08-03
16:44:56 UTC (rev 9865)
+++ trunk/apps/load-balancing-sims/phase5/RequestState.java 2006-08-03
17:23:52 UTC (rev 9866)
@@ -6,7 +6,7 @@
class RequestState
{
public final int id; // The unique ID of the request
- public final double key; // The requested key (as a routing location)
+ public final int key; // The requested key
public final Peer prev; // The previous hop of the request
public final HashSet<Peer> nexts; // Possible next hops
@@ -22,10 +22,11 @@
// Returns the closest peer to the requested key
public Peer closestPeer()
{
+ double keyLoc = Node.keyToLocation (key);
double bestDist = Double.POSITIVE_INFINITY;
Peer bestPeer = null;
for (Peer peer : nexts) {
- double dist = Node.distance (key, peer.location);
+ double dist = Node.distance (keyLoc, peer.location);
if (dist < bestDist) {
bestDist = dist;
bestPeer = peer;
Modified: trunk/apps/load-balancing-sims/phase5/Response.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Response.java 2006-08-03 16:44:56 UTC
(rev 9865)
+++ trunk/apps/load-balancing-sims/phase5/Response.java 2006-08-03 17:23:52 UTC
(rev 9866)
@@ -1,9 +1,9 @@
class Response extends Message
{
public final int id; // The unique ID of the request
- public final double key; // The requested key (as a routing location)
+ public final int key; // The requested key
- public Response (int id, double key)
+ public Response (int id, int key)
{
this.id = id;
this.key = key;