Author: mrogers
Date: 2006-08-10 12:44:18 +0000 (Thu, 10 Aug 2006)
New Revision: 10022
Added:
trunk/apps/load-balancing-sims/phase5-single-ack-delay/
trunk/apps/load-balancing-sims/phase5-single-ack-delay/CongestionWindow.java
trunk/apps/load-balancing-sims/phase5-single-ack-delay/Deadline.java
trunk/apps/load-balancing-sims/phase5-single-ack-delay/Event.java
trunk/apps/load-balancing-sims/phase5-single-ack-delay/EventTarget.java
trunk/apps/load-balancing-sims/phase5-single-ack-delay/LruCache.java
trunk/apps/load-balancing-sims/phase5-single-ack-delay/Message.java
trunk/apps/load-balancing-sims/phase5-single-ack-delay/Network.java
trunk/apps/load-balancing-sims/phase5-single-ack-delay/NetworkInterface.java
trunk/apps/load-balancing-sims/phase5-single-ack-delay/Node.java
trunk/apps/load-balancing-sims/phase5-single-ack-delay/Packet.java
trunk/apps/load-balancing-sims/phase5-single-ack-delay/Peer.java
trunk/apps/load-balancing-sims/phase5-single-ack-delay/Request.java
trunk/apps/load-balancing-sims/phase5-single-ack-delay/RequestState.java
trunk/apps/load-balancing-sims/phase5-single-ack-delay/Response.java
trunk/apps/load-balancing-sims/phase5-single-ack-delay/RouteNotFound.java
trunk/apps/load-balancing-sims/phase5-single-ack-delay/Sim.java
trunk/apps/load-balancing-sims/phase5-single-ack-delay/TokenBucket.java
Log:
Recording one ack delay per packet seems to be adequate
Added:
trunk/apps/load-balancing-sims/phase5-single-ack-delay/CongestionWindow.java
===================================================================
---
trunk/apps/load-balancing-sims/phase5-single-ack-delay/CongestionWindow.java
2006-08-10 12:31:23 UTC (rev 10021)
+++
trunk/apps/load-balancing-sims/phase5-single-ack-delay/CongestionWindow.java
2006-08-10 12:44:18 UTC (rev 10022)
@@ -0,0 +1,74 @@
+// AIMD (additive increase, multiplicative decrease) congestion control
+// Tracks a window in bytes and the bytes in flight on behalf of one Peer
+class CongestionWindow
+{
+ public final static int MIN_CWIND = 3000; // Minimum congestion window
+ public final static int MAX_CWIND = 1000000; // Max congestion window
+ public final static double ALPHA = 0.3125; // AIMD increase parameter
+ public final static double BETA = 0.875; // AIMD decrease parameter
+ public final static double GAMMA = 3.0; // Slow start divisor
+
+ private double cwind = MIN_CWIND; // Size of window in bytes
+ private int inflight = 0; // Bytes sent but not acked
+ private boolean slowStart = true; // Are we in the slow start phase?
+ private Peer peer; // The owner; used in this class only for logging
+
+ public CongestionWindow (Peer peer)
+ {
+ this.peer = peer;
+ }
+
+ public void reset() // Shrink the window to MIN_CWIND and re-enter slow start
+ {
+ peer.log ("congestion window decreased to " + MIN_CWIND);
+ cwind = MIN_CWIND;
+ peer.log ("returning to slow start");
+ slowStart = true;
+ }
+
+ public int available() // Window (truncated to int) minus bytes in flight
+ {
+ return (int) cwind - inflight; // Can be <= 0 if the window shrank below inflight
+ }
+
+ // Put bytes in flight
+ public void bytesSent (int bytes)
+ {
+ inflight += bytes;
+ peer.log (inflight + " bytes in flight");
+ }
+
+ // Take bytes out of flight
+ public void bytesAcked (int bytes)
+ {
+ inflight -= bytes;
+ peer.log (inflight + " bytes in flight");
+ // Increase the window
+ if (slowStart) cwind += bytes / GAMMA; // Slow start: grow by a fixed fraction of the acked bytes
+ else cwind += bytes * bytes * ALPHA / cwind; // Otherwise growth shrinks as the window gets larger
+ if (cwind > MAX_CWIND) cwind = MAX_CWIND; // Clamp to the maximum
+ peer.log ("congestion window increased to " + cwind);
+ }
+
+ // Decrease the window when a packet is fast retransmitted
+ public void fastRetransmission (double now) // NOTE(review): now is not used in the body
+ {
+ peer.log (inflight + " bytes in flight");
+ cwind *= BETA; // Multiplicative decrease
+ if (cwind < MIN_CWIND) cwind = MIN_CWIND; // Clamp to the minimum
+ peer.log ("congestion window decreased to " + cwind);
+ // The slow start phase ends when the first packet is lost
+ if (slowStart) {
+ peer.log ("leaving slow start");
+ slowStart = false;
+ }
+ }
+
+ // Decrease the window when a packet is retransmitted due to a timeout
+ public void timeout (double now) // now is only passed through to fastRetransmission()
+ {
+ peer.log (inflight + " bytes in flight");
+ if (slowStart) fastRetransmission (now); // Leave slow start
+ else reset(); // Reset the window and return to slow start
+ }
+}
Added: trunk/apps/load-balancing-sims/phase5-single-ack-delay/Deadline.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-single-ack-delay/Deadline.java
2006-08-10 12:31:23 UTC (rev 10021)
+++ trunk/apps/load-balancing-sims/phase5-single-ack-delay/Deadline.java
2006-08-10 12:44:18 UTC (rev 10022)
@@ -0,0 +1,13 @@
+// A queued item and the time at which it must be sent
+// Both fields are final and are set once by the constructor
+class Deadline<Item>
+{
+ public final Item item; // The queued object (Peer queues Messages and Integer acks)
+ public final double deadline; // Send-by time (Peer uses Event.time() + MAX_DELAY)
+
+ public Deadline (Item item, double deadline)
+ {
+ this.item = item;
+ this.deadline = deadline;
+ }
+}
Added: trunk/apps/load-balancing-sims/phase5-single-ack-delay/Event.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-single-ack-delay/Event.java
2006-08-10 12:31:23 UTC (rev 10021)
+++ trunk/apps/load-balancing-sims/phase5-single-ack-delay/Event.java
2006-08-10 12:44:18 UTC (rev 10022)
@@ -0,0 +1,98 @@
+import java.util.TreeSet; // Gotta love the collections framework...
+
+class Event implements Comparable // NOTE(review): raw Comparable, so compareTo() takes Object and casts
+{
+ // Static variables and methods for the event queue
+
+ private static TreeSet<Event> queue = new TreeSet<Event>(); // Sorted by compareTo(): time, then id
+ private static double clockTime = 0.0; // Current simulation time
+ private static int nextId = 0; // ID assigned to the next Event constructed
+ public static double duration = Double.POSITIVE_INFINITY; // nextEvent() stops once the clock passes this
+
+ public static void reset() // Empty the queue and restore clock, ID counter and duration to their initial values
+ {
+ queue.clear();
+ clockTime = 0.0;
+ nextId = 0;
+ duration = Double.POSITIVE_INFINITY;
+ }
+
+ public static void schedule (EventTarget target, double time, // time is a delay relative to the current clock
+ int type, Object data)
+ {
+ queue.add (new Event (target, time + clockTime, type, data)); // Stored as an absolute time
+ }
+
+ public static boolean nextEvent() // Dispatch the earliest event; false if the queue is empty or time has run out
+ {
+ try {
+ Event e = queue.first(); // Throws NoSuchElementException if the queue is empty
+ queue.remove (e);
+ // Update the clock
+ clockTime = e.time;
+ // Quit if the simulation's allotted time has run out
+ if (clockTime > duration) return false; // The removed event is discarded without being dispatched
+ // Pass the event's type and data to the target's callback method
+ e.target.handleEvent (e.type, e.data);
+ return true;
+ }
+ catch (java.util.NoSuchElementException x) {
+ // No more events to dispatch
+ return false;
+ }
+ }
+
+ public static double time() // Current simulation clock
+ {
+ return clockTime;
+ }
+
+ public static void log (String message) // Print the message to stdout, prefixed with the current clock time
+ {
+ System.out.print (clockTime + " " + message + "\n");
+ }
+
+ // Run until the duration expires or there are no more events to process
+ public static void run()
+ {
+ while (nextEvent()) {}
+ }
+
+ // Non-static variables and methods for individual events
+
+ private EventTarget target; // Receiver of the callback
+ private double time; // Absolute time at which the event fires
+ private int id; // Creation order; breaks ties between events with equal times
+ private int type; // Event code, interpreted by the target
+ private Object data; // Opaque payload handed to the target
+
+ public Event (EventTarget target, double time, int type, Object data)
+ {
+ this.target = target;
+ this.time = time;
+ this.type = type;
+ this.data = data;
+ id = nextId++;
+ }
+
+ // Must be consistent with compareTo()
+ public boolean equals (Object o) // NOTE(review): no matching hashCode() override in this class
+ {
+ Event e = (Event) o; // Throws ClassCastException for a non-Event, NullPointerException below for null
+ if (e.time == time && e.id == id) return true;
+ return false;
+ }
+
+ // Must be consistent with equals()
+ public int compareTo (Object o)
+ {
+ Event e = (Event) o; // Unchecked cast forced by the raw Comparable interface
+ // Sort events by time (order of occurrence)
+ if (e.time > time) return -1;
+ if (e.time < time) return 1;
+ // Break ties by ID (order of scheduling)
+ if (e.id > id) return -1;
+ if (e.id < id) return 1;
+ return 0;
+ }
+}
Added: trunk/apps/load-balancing-sims/phase5-single-ack-delay/EventTarget.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-single-ack-delay/EventTarget.java
2006-08-10 12:31:23 UTC (rev 10021)
+++ trunk/apps/load-balancing-sims/phase5-single-ack-delay/EventTarget.java
2006-08-10 12:44:18 UTC (rev 10022)
@@ -0,0 +1,4 @@
+interface EventTarget // Implemented by objects that can receive events from the Event queue
+{
+ public void handleEvent (int type, Object data); // type and data are the values given to Event.schedule()
+}
Added: trunk/apps/load-balancing-sims/phase5-single-ack-delay/LruCache.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-single-ack-delay/LruCache.java
2006-08-10 12:31:23 UTC (rev 10021)
+++ trunk/apps/load-balancing-sims/phase5-single-ack-delay/LruCache.java
2006-08-10 12:44:18 UTC (rev 10022)
@@ -0,0 +1,46 @@
+// Limited-capacity LRU cache
+
+import java.util.LinkedHashSet;
+
+class LruCache<Key> // A set of keys held in least-recently-used order, oldest first
+{
+ public int capacity; // Maximum number of keys held; enforced by put()
+ public LinkedHashSet<Key> set; // Insertion-ordered: the iterator yields the oldest key first
+
+ public LruCache (int capacity)
+ {
+ this.capacity = capacity;
+ set = new LinkedHashSet<Key> (capacity); // Argument is only the initial hash table capacity, not a size limit
+ }
+
+ public boolean get (Key key) // True if the key is cached; a hit also makes it the most recently used
+ {
+ log ("searching cache for key " + key);
+ if (set.remove (key)) {
+ set.add (key); // Move the key to the fresh end
+ return true;
+ }
+ return false;
+ }
+
+ public void put (Key key) // Insert or refresh a key, evicting the oldest key if the cache is full
+ {
+ if (set.remove (key))
+ log ("key " + key + " already in cache");
+ else {
+ log ("adding key " + key + " to cache");
+ if (set.size() == capacity) {
+ // Discard the oldest element
+ Key oldest = set.iterator().next(); // NOTE(review): throws NoSuchElementException if capacity is 0
+ log ("discarding key " + oldest);
+ set.remove (oldest);
+ }
+ }
+ set.add (key); // Add or move the key to the fresh end
+ }
+
+ private void log (String message) // Forward to the simulation-wide log
+ {
+ Event.log (message);
+ }
+}
Added: trunk/apps/load-balancing-sims/phase5-single-ack-delay/Message.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-single-ack-delay/Message.java
2006-08-10 12:31:23 UTC (rev 10021)
+++ trunk/apps/load-balancing-sims/phase5-single-ack-delay/Message.java
2006-08-10 12:44:18 UTC (rev 10022)
@@ -0,0 +1,11 @@
+// A high-level message (as opposed to a low-level packet)
+// Subclasses such as Request set size from the *_SIZE constants below
+class Message
+{
+ public final static int HEADER_SIZE = 30; // Sequence number, MAC, etc
+ public final static int ID_SIZE = 16; // Size of unique request ID
+ public final static int KEY_SIZE = 32; // Size of routing key
+ public final static int DATA_SIZE = 1024; // Size of data block
+
+ public int size; // Size in bytes
+}
Added: trunk/apps/load-balancing-sims/phase5-single-ack-delay/Network.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-single-ack-delay/Network.java
2006-08-10 12:31:23 UTC (rev 10021)
+++ trunk/apps/load-balancing-sims/phase5-single-ack-delay/Network.java
2006-08-10 12:44:18 UTC (rev 10022)
@@ -0,0 +1,34 @@
+import java.util.HashMap;
+
+class Network // Static registry of interfaces, plus delivery of packets between them
+{
+ private static HashMap<Integer,NetworkInterface> interfaces // Look up an interface by its address
+ = new HashMap<Integer,NetworkInterface>();
+ private static int nextAddress = 0; // Address given to the next registered interface
+ public static boolean reorder = false; // Can packets be reordered?
+ public static double lossRate = 0.0; // Random packet loss
+ // FIXME: random packet duplication
+
+ // Deliver a packet to an address
+ public static void deliver (Packet p)
+ {
+ NetworkInterface ni = interfaces.get (p.dest);
+ if (ni == null) return; // Node doesn't exist or is offline
+ // If the network allows reordering, randomise the latency a bit
+ if (reorder) p.latency *= (0.95 + Math.random() * 0.1); // Scale factor lies in [0.95, 1.05)
+ if (Math.random() < lossRate) { // Drop with probability lossRate
+ Event.log (p + " lost by network");
+ return;
+ }
+ // Schedule the arrival of the packet at the destination
+ Event.schedule (ni, p.latency, NetworkInterface.RX_QUEUE, p);
+ }
+
+ // Attach an interface to the network - returns the address
+ public static int register (NetworkInterface ni)
+ {
+ int address = nextAddress++; // Addresses are handed out sequentially from 0
+ interfaces.put (address, ni);
+ return address;
+ }
+}
Added:
trunk/apps/load-balancing-sims/phase5-single-ack-delay/NetworkInterface.java
===================================================================
---
trunk/apps/load-balancing-sims/phase5-single-ack-delay/NetworkInterface.java
2006-08-10 12:31:23 UTC (rev 10021)
+++
trunk/apps/load-balancing-sims/phase5-single-ack-delay/NetworkInterface.java
2006-08-10 12:44:18 UTC (rev 10022)
@@ -0,0 +1,126 @@
+import java.util.LinkedList;
+
+class NetworkInterface implements EventTarget // One node's network link, with size-limited tx and rx queues
+{
+ public int address; // Represents an IP address and port
+ private Node node; // The owner of this network interface
+ private double txSpeed, rxSpeed; // Bytes per second
+
+ private LinkedList<Packet> txQueue; // Queue of outgoing packets
+ private LinkedList<Packet> rxQueue; // Queue of incoming packets
+ private int txQueueSize, rxQueueSize; // Limited-size drop-tail queues
+ private int txQueueMaxSize, rxQueueMaxSize; // Bytes
+
+ public NetworkInterface (Node node, double txSpeed, double rxSpeed)
+ {
+ this.node = node;
+ this.txSpeed = txSpeed;
+ this.rxSpeed = rxSpeed;
+ txQueue = new LinkedList<Packet>();
+ rxQueue = new LinkedList<Packet>();
+ txQueueSize = rxQueueSize = 0; // Bytes currently queued
+ txQueueMaxSize = 10000;
+ rxQueueMaxSize = 20000;
+ // Attach the interface to the network
+ address = Network.register (this);
+ }
+
+ // Called by Peer
+ public void send (Packet p, int dest, double latency) // Stamp the packet and queue it; dropped if the tx queue is full
+ {
+ p.src = address;
+ p.dest = dest;
+ p.latency = latency;
+ if (txQueueSize + p.size > txQueueMaxSize) {
+ log ("no room in txQueue, " + p + " lost");
+ return;
+ }
+ txQueue.add (p);
+ txQueueSize += p.size;
+ log (txQueueSize + " bytes in txQueue");
+ // If there are no other packets in the queue, start to transmit
+ if (txQueue.size() == 1) txStart (p);
+ }
+
+ // Event callbacks
+
+ // Add a packet to the rx queue
+ private void rxQueueAdd (Packet p) // The packet is dropped if the rx queue is full
+ {
+ if (rxQueueSize + p.size > rxQueueMaxSize) {
+ log ("no room in rxQueue, " + p + " lost");
+ return;
+ }
+ rxQueue.add (p);
+ rxQueueSize += p.size;
+ log (rxQueueSize + " bytes in rxQueue");
+ // If there are no other packets in the queue, start to receive
+ if (rxQueue.size() == 1) rxStart (p);
+ }
+
+ // Start receiving a packet
+ private void rxStart (Packet p)
+ {
+ log ("starting to receive " + p);
+ // Delay depends on rx speed
+ Event.schedule (this, p.size / rxSpeed, RX_END, p);
+ }
+
+ // Finish receiving a packet, pass it to the node
+ private void rxEnd (Packet p)
+ {
+ log ("finished receiving " + p);
+ node.handlePacket (p); // The packet is still in rxQueue while the node handles it
+ rxQueueSize -= p.size;
+ rxQueue.remove (p);
+ // If there's another packet waiting, start to receive it
+ if (!rxQueue.isEmpty()) rxStart (rxQueue.peek());
+ }
+
+ // Start transmitting a packet
+ private void txStart (Packet p)
+ {
+ log ("starting to transmit " + p);
+ // Delay depends on tx speed
+ Event.schedule (this, p.size / txSpeed, TX_END, p);
+ }
+
+ // Finish transmitting a packet
+ private void txEnd (Packet p)
+ {
+ log ("finished transmitting " + p);
+ Network.deliver (p); // Hand the packet to the network, which applies latency and loss
+ txQueueSize -= p.size;
+ txQueue.remove (p);
+ // If there's another packet waiting, start to transmit it
+ if (!txQueue.isEmpty()) txStart (txQueue.peek());
+ }
+
+ private void log (String message) // Currently a no-op: the logging call below is commented out
+ {
+ // Event.log (address + " " + message);
+ }
+
+ // EventTarget interface
+ public void handleEvent (int type, Object data) // data is cast to Packet; unknown types are ignored
+ {
+ switch (type) {
+ case RX_QUEUE:
+ rxQueueAdd ((Packet) data);
+ break;
+
+ case RX_END:
+ rxEnd ((Packet) data);
+ break;
+
+ case TX_END:
+ txEnd ((Packet) data);
+ break;
+ }
+ }
+
+ // Each EventTarget class has its own event codes
+ public final static int RX_QUEUE = 1; // Scheduled by Network.deliver() when a packet arrives
+ private final static int RX_END = 2; // Scheduled by rxStart()
+ private final static int TX_END = 3; // Scheduled by txStart()
+}
Added: trunk/apps/load-balancing-sims/phase5-single-ack-delay/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-single-ack-delay/Node.java
2006-08-10 12:31:23 UTC (rev 10021)
+++ trunk/apps/load-balancing-sims/phase5-single-ack-delay/Node.java
2006-08-10 12:44:18 UTC (rev 10022)
@@ -0,0 +1,207 @@
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.Collections;
+
+class Node implements EventTarget // A simulated node: routes requests between its peers and keeps a key cache
+{
+ public final static int STORE_SIZE = 10; // Max number of keys in store
+ public final static double MIN_SLEEP = 0.01; // Seconds
+
+ // Token bucket bandwidth limiter
+ public final static int BUCKET_RATE = 10000; // Bytes per second
+ public final static int BUCKET_SIZE = 40000; // Burst size in bytes
+
+ public double location; // Routing location
+ public NetworkInterface net;
+ private HashMap<Integer,Peer> peers; // Look up a peer by its address
+ private int requestsGenerated = 0; // Counter checked by generateRequest()
+ private HashSet<Integer> recentlySeenRequests; // Request IDs
+ private HashMap<Integer,RequestState> outstandingRequests; // Forwarded requests awaiting a reply, by request ID
+ public LruCache<Integer> cache; // Datastore containing keys
+ public TokenBucket bandwidth; // Bandwidth limiter
+ private boolean timerRunning = false; // Is the timer running?
+
+ public Node (double txSpeed, double rxSpeed)
+ {
+ location = Math.random(); // Random location in [0, 1)
+ net = new NetworkInterface (this, txSpeed, rxSpeed);
+ peers = new HashMap<Integer,Peer>();
+ recentlySeenRequests = new HashSet<Integer>();
+ outstandingRequests = new HashMap<Integer,RequestState>();
+ cache = new LruCache<Integer> (STORE_SIZE);
+ bandwidth = new TokenBucket (BUCKET_RATE, BUCKET_SIZE);
+ }
+
+ public void connect (Node n, double latency) // One-way: adds n as a peer of this node only
+ {
+ Peer p = new Peer (this, n.net.address, n.location, latency);
+ peers.put (n.net.address, p);
+ }
+
+ public void connectBothWays (Node n, double latency) // Adds each node as a peer of the other
+ {
+ connect (n, latency);
+ n.connect (this, latency);
+ }
+
+ // Calculate the circular distance between two locations
+ public static double distance (double a, double b) // Shorter way round a circle of circumference 1.0
+ {
+ if (a > b) return Math.min (a - b, b - a + 1.0);
+ else return Math.min (b - a, a - b + 1.0);
+ }
+
+ // Convert an integer key to a routing location
+ public static double keyToLocation (int key) // NOTE(review): a negative key gives a negative location
+ {
+ return key / (double) Integer.MAX_VALUE;
+ }
+
+ // Convert a routing location to an integer key
+ public static int locationToKey (double location)
+ {
+ return (int) (location * Integer.MAX_VALUE);
+ }
+
+ // Called by Peer
+ public void startTimer() // Does nothing if the timer is already running
+ {
+ if (timerRunning) return;
+ log ("starting retransmission/coalescing timer");
+ Event.schedule (this, Peer.MAX_DELAY, CHECK_TIMEOUTS, null);
+ timerRunning = true;
+ }
+
+ // Called by NetworkInterface
+ public void handlePacket (Packet packet) // Dispatch to the Peer matching the packet's source address
+ {
+ Peer peer = peers.get (packet.src);
+ if (peer == null) log ("unknown peer!");
+ else peer.handlePacket (packet);
+ }
+
+ // Called by Peer
+ public void handleMessage (Message m, Peer prev) // Dispatch on message type; other types are only logged
+ {
+ log ("received " + m);
+ // FIXME: ugly
+ if (m instanceof Request)
+ handleRequest ((Request) m, prev);
+ else if (m instanceof Response)
+ handleResponse ((Response) m);
+ else if (m instanceof RouteNotFound)
+ handleRouteNotFound ((RouteNotFound) m);
+ }
+
+ private void handleRequest (Request r, Peer prev) // prev is null for requests generated locally
+ {
+ if (!recentlySeenRequests.add (r.id)) { // add() returns false if the ID was already present
+ log ("rejecting recently seen " + r);
+ prev.sendMessage (new RouteNotFound (r.id)); // NOTE(review): would throw if prev were null; presumably local IDs are never repeats - confirm
+ // Don't forward the request to prev, it's seen it
+ RequestState rs = outstandingRequests.get (r.id);
+ if (rs != null) rs.nexts.remove (prev);
+ return;
+ }
+ if (cache.get (r.key)) {
+ log ("key " + r.key + " found in cache");
+ if (prev == null) log (r + " succeeded locally");
+ else prev.sendMessage (new Response (r.id, r.key));
+ return;
+ }
+ log ("key " + r.key + " not found in cache");
+ forwardRequest (new RequestState (r, prev, peers.values())); // Candidate next hops start as all peers
+ }
+
+ private void handleResponse (Response r) // Cache the key and pass the response back towards the requester
+ {
+ RequestState rs = outstandingRequests.remove (r.id);
+ if (rs == null) {
+ log ("unexpected " + r);
+ return;
+ }
+ cache.put (r.key);
+ if (rs.prev == null) log (rs + " succeeded"); // This node originated the request
+ else {
+ log ("forwarding " + r);
+ rs.prev.sendMessage (r);
+ }
+ }
+
+ private void handleRouteNotFound (RouteNotFound r) // Try the next candidate peer for the request
+ {
+ RequestState rs = outstandingRequests.remove (r.id); // forwardRequest() re-adds it if a next hop exists
+ if (rs == null) {
+ log ("unexpected route not found " + r.id);
+ return;
+ }
+ forwardRequest (rs);
+ }
+
+ private void forwardRequest (RequestState rs) // Send to the closest untried peer, or report failure to prev
+ {
+ Peer next = rs.closestPeer();
+ if (next == null) {
+ log ("route not found for " + rs);
+ if (rs.prev == null) log (rs + " failed");
+ else rs.prev.sendMessage (new RouteNotFound (rs.id));
+ return;
+ }
+ log ("forwarding " + rs + " to " + next.address);
+ next.sendMessage (new Request (rs.id, rs.key));
+ rs.nexts.remove (next); // Don't try this peer again for the same request
+ outstandingRequests.put (rs.id, rs);
+ }
+
+ private void log (String message) // Prefix the message with this node's address
+ {
+ Event.log (net.address + " " + message);
+ }
+
+ // Event callback
+ private void generateRequest()
+ {
+ if (requestsGenerated++ > 10000) return; // Stops without rescheduling; NOTE(review): allows 10001 requests, not 10000
+ // Send a request to a random location
+ Request r = new Request (locationToKey (Math.random()));
+ log ("generating request " + r.id);
+ handleRequest (r, null);
+ // Schedule the next request
+ Event.schedule (this, 0.0123, GENERATE_REQUEST, null);
+ }
+
+ // Event callback
+ private void checkTimeouts()
+ {
+ // Check the peers in a random order each time
+ ArrayList<Peer> shuffled = new ArrayList<Peer> (peers.values());
+ Collections.shuffle (shuffled);
+
+ double deadline = Double.POSITIVE_INFINITY; // Earliest deadline reported by any peer
+ for (Peer p : shuffled)
+ deadline = Math.min (deadline, p.checkTimeouts());
+ if (deadline == Double.POSITIVE_INFINITY) { // No peer has anything pending
+ log ("stopping retransmission/coalescing timer");
+ timerRunning = false;
+ }
+ else {
+ double sleep = deadline - Event.time(); // Can be < 0
+ if (sleep < MIN_SLEEP) sleep = MIN_SLEEP; // Never reschedule sooner than MIN_SLEEP
+ log ("sleeping for " + sleep + " seconds");
+ Event.schedule (this, sleep, CHECK_TIMEOUTS, null);
+ timerRunning = true;
+ }
+ }
+
+ // EventTarget interface
+ public void handleEvent (int type, Object data) // data is unused; unknown types are ignored
+ {
+ if (type == GENERATE_REQUEST) generateRequest();
+ else if (type == CHECK_TIMEOUTS) checkTimeouts();
+ }
+
+ // Each EventTarget class has its own event codes
+ public final static int GENERATE_REQUEST = 1;
+ public final static int CHECK_TIMEOUTS = 2;
+}
Added: trunk/apps/load-balancing-sims/phase5-single-ack-delay/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-single-ack-delay/Packet.java
2006-08-10 12:31:23 UTC (rev 10021)
+++ trunk/apps/load-balancing-sims/phase5-single-ack-delay/Packet.java
2006-08-10 12:44:18 UTC (rev 10022)
@@ -0,0 +1,39 @@
+// A low-level packet (as opposed to a high-level message)
+
+import java.util.ArrayList;
+
+class Packet // Carries optional acks and optional messages; size grows as they are added
+{
+ public final static int HEADER_SIZE = 82; // Including IP & UDP headers
+ public final static int ACK_SIZE = 4; // Size of an ack in bytes
+ public final static int MAX_SIZE = 1450; // MTU including headers
+ public final static int SENSIBLE_PAYLOAD = 1000; // Coalescing
+
+ public int src, dest; // Network addresses
+ public int size = HEADER_SIZE; // Size in bytes, including headers
+ public int seq = -1; // Data sequence number (-1 for pure acks)
+ public double ackDelay; // How many seconds the first ack was delayed
+ public ArrayList<Integer> acks = null; // Null until the first addAck()
+ public ArrayList<Message> messages = null; // Null until the first addMessage()
+
+ public double sent; // Time at which the packet was (re) transmitted
+ public double latency; // Link latency (stored here for convenience)
+
+ public void addAck (int ack, double delay) // Only the first ack's delay is recorded in ackDelay
+ {
+ if (acks == null) {
+ acks = new ArrayList<Integer>();
+ ackDelay = delay;
+ }
+ acks.add (ack);
+ size += ACK_SIZE;
+ }
+
+ // In real life the payload would be an array of bytes
+ public void addMessage (Message m) // Grows the packet by m.size; no check against MAX_SIZE is made here
+ {
+ if (messages == null) messages = new ArrayList<Message>();
+ messages.add (m);
+ size += m.size;
+ }
+}
Added: trunk/apps/load-balancing-sims/phase5-single-ack-delay/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-single-ack-delay/Peer.java
2006-08-10 12:31:23 UTC (rev 10021)
+++ trunk/apps/load-balancing-sims/phase5-single-ack-delay/Peer.java
2006-08-10 12:44:18 UTC (rev 10022)
@@ -0,0 +1,289 @@
+import java.util.LinkedList;
+import java.util.Iterator;
+import java.util.HashSet;
+
+class Peer // One end of a connection to a remote node: coalescing, acks, retransmission and congestion control
+{
+ private Node node; // The local node
+ public int address; // The remote node's address
+ public double location; // The remote node's routing location
+ private double latency; // The latency of the connection in seconds
+
+ // Retransmission parameters
+ public final static double RTO = 4.0; // Retransmission timeout in RTTs
+ public final static double FRTO = 1.5; // Fast retx timeout in RTTs
+ public final static double RTT_DECAY = 0.9; // Exp moving average
+
+ // Coalescing
+ public final static double MAX_DELAY = 0.1; // Coalescing delay, seconds
+
+ // Out-of-order delivery with eventual detection of missing packets
+ public final static int SEQ_RANGE = 1000;
+
+ // Sender state
+ private double rtt = 5.0; // Estimated round-trip time in seconds
+ private int txSeq = 0; // Sequence number of next outgoing data packet
+ private int txMaxSeq = SEQ_RANGE - 1; // Highest sequence number
+ private LinkedList<Packet> txBuffer; // Retransmission buffer
+ private LinkedList<Deadline<Message>> msgQueue; // Outgoing messages
+ private int msgQueueSize = 0; // Size of message queue in bytes
+ private LinkedList<Deadline<Integer>> ackQueue; // Outgoing acks
+ private int ackQueueSize = 0; // Size of ack queue in bytes
+ private CongestionWindow window; // AIMD congestion window
+ private double lastTransmission = 0.0; // Clock time
+
+ // Receiver state
+ private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
+ private int rxSeq = 0; // Sequence number of next in-order incoming pkt
+
+ public Peer (Node node, int address, double location, double latency)
+ {
+ this.node = node;
+ this.address = address;
+ this.location = location;
+ this.latency = latency;
+ txBuffer = new LinkedList<Packet>();
+ msgQueue = new LinkedList<Deadline<Message>>();
+ ackQueue = new LinkedList<Deadline<Integer>>();
+ window = new CongestionWindow (this);
+ rxDupe = new HashSet<Integer>();
+ }
+
+ // Queue a message for transmission
+ public void sendMessage (Message m)
+ {
+ log (m + " added to message queue");
+ // Warning: until token-passing is implemented the length of
+ // the message queue is unlimited
+ double now = Event.time();
+ msgQueue.add (new Deadline<Message> (m, now + MAX_DELAY)); // May be held for up to MAX_DELAY for coalescing
+ msgQueueSize += m.size;
+ log (msgQueue.size() + " messages in queue");
+ // Start the node's timer if necessary
+ node.startTimer();
+ // Send as many packets as possible
+ while (send());
+ }
+
+ // Try to send a packet, return true if a packet was sent
+ private boolean send()
+ {
+ if (ackQueueSize == 0 && msgQueueSize == 0) {
+ log ("no messages or acks to send");
+ return false;
+ }
+
+ // Return to slow start when the link is idle
+ double now = Event.time();
+ if (now - lastTransmission > RTO * rtt) window.reset();
+ lastTransmission = now; // NOTE(review): updated even if this call ends up sending nothing
+
+ // Work out how large a packet we can send
+ int headersAndAcks = Packet.HEADER_SIZE + ackQueueSize;
+ int payload = Packet.MAX_SIZE - headersAndAcks;
+ if (payload > msgQueueSize) payload = msgQueueSize;
+
+ int win = window.available() - headersAndAcks; // Room left in the congestion window for messages
+ if (win <= 0) log ("no room in congestion window for messages");
+ if (payload > win) payload = win;
+
+ int bw = node.bandwidth.available() - headersAndAcks; // Room left in the node's bandwidth limiter for messages
+ if (bw <= 0) log ("no bandwidth available for messages");
+ if (payload > bw) payload = bw;
+
+ // Delay small packets for coalescing
+ if (payload < Packet.SENSIBLE_PAYLOAD && now < deadline()) {
+ log ("delaying transmission of " + payload + " bytes");
+ return false;
+ }
+
+ Packet p = new Packet();
+
+ // Put all waiting acks in the packet
+ for (Deadline<Integer> a : ackQueue) {
+ double delay = now - (a.deadline - MAX_DELAY); // Time since the ack was queued by sendAck()
+ p.addAck (a.item, delay); // The packet keeps only the first ack's delay
+ }
+ ackQueue.clear();
+ ackQueueSize = 0;
+
+ // Don't send sequence number n+SEQ_RANGE until sequence
+ // number n has been acked - this limits the number of
+ // sequence numbers the receiver must store for replay
+ // detection. We must still be allowed to send acks,
+ // otherwise the connection could deadlock.
+
+ if (txSeq > txMaxSeq)
+ log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
+ else {
+ // Put as many messages as possible in the packet
+ Iterator<Deadline<Message>> i = msgQueue.iterator();
+ while (i.hasNext()) {
+ Message m = i.next().item;
+ if (m.size > payload) break; // Stop at the first message that doesn't fit, keeping queue order
+ i.remove();
+ msgQueueSize -= m.size;
+ p.addMessage (m);
+ payload -= m.size;
+ }
+ }
+
+ // Don't send empty packets
+ if (p.acks == null && p.messages == null) return false;
+
+ // If the packet contains data, buffer it for retransmission
+ if (p.messages != null) {
+ p.seq = txSeq++;
+ p.sent = now;
+ txBuffer.add (p);
+ window.bytesSent (p.size);
+ }
+
+ // Send the packet
+ log ("sending packet " + p.seq + ", " + p.size + " bytes");
+ node.net.send (p, address, latency);
+ node.bandwidth.remove (p.size); // Charge the whole packet to the bandwidth limiter
+ return true;
+ }
+
+ private void sendAck (int seq) // Queue an ack for the given sequence number, then try to send
+ {
+ log ("ack " + seq + " added to ack queue");
+ double now = Event.time();
+ ackQueue.add (new Deadline<Integer> (seq, now + MAX_DELAY));
+ ackQueueSize += Packet.ACK_SIZE;
+ log (ackQueue.size() + " acks in queue");
+ // Start the node's timer if necessary
+ node.startTimer();
+ // Send as many packets as possible
+ while (send());
+ }
+
+ // Called by Node when a packet arrives
+ public void handlePacket (Packet p)
+ {
+ if (p.messages != null) handleData (p);
+ if (p.acks != null) {
+ updateRtt (p.acks.get (0), p.ackDelay); // One RTT sample per packet, taken from its first ack
+ for (int a : p.acks) handleAck (a);
+ }
+ }
+
+ private void handleData (Packet p) // Deliver new data to the node and ack it; duplicates are re-acked only
+ {
+ log ("received packet " + p.seq + ", " + p.size + " bytes");
+ if (p.seq < rxSeq || rxDupe.contains (p.seq)) {
+ log ("duplicate packet");
+ sendAck (p.seq); // Original ack may have been lost
+ }
+ else if (p.seq == rxSeq) {
+ log ("packet in order");
+ // Find the sequence number of the next missing packet
+ int was = rxSeq;
+ while (rxDupe.remove (++rxSeq)); // Skip past packets that already arrived out of order
+ log ("rxSeq was " + was + ", now " + rxSeq);
+ // Deliver the packet
+ unpack (p);
+ sendAck (p.seq);
+ }
+ else if (p.seq < rxSeq + SEQ_RANGE) {
+ log ("packet out of order - expected " + rxSeq);
+ if (rxDupe.add (p.seq)) unpack (p); // Remember it so a later copy is detected as a duplicate
+ else log ("duplicate packet");
+ sendAck (p.seq); // Original ack may have been lost
+ }
+ // This indicates a misbehaving sender - discard the packet
+ else log ("warning: received " + p.seq + " before " + rxSeq);
+ }
+
+ private void updateRtt (int ack, double delay) // Fold one sample into the average if the acked packet is still buffered
+ {
+ for (Packet p : txBuffer) {
+ if (p.seq == ack) {
+ double r = Event.time() - p.sent - delay; // Exclude the time the receiver held the ack
+ rtt = rtt * RTT_DECAY + r * (1.0 - RTT_DECAY); // Exponential moving average
+ log ("ack delay " + delay);
+ log ("round-trip time " + r);
+ log ("average round-trip time " + rtt);
+ return;
+ }
+ }
+ }
+
+ private void handleAck (int ack) // Release the acked packet and fast retransmit older unacked ones
+ {
+ log ("received ack " + ack);
+ double now = Event.time();
+ Iterator<Packet> i = txBuffer.iterator();
+ while (i.hasNext()) {
+ Packet p = i.next();
+ double age = now - p.sent;
+ // Explicit ack
+ if (p.seq == ack) {
+ log ("packet " + p.seq + " acknowledged");
+ i.remove();
+ window.bytesAcked (p.size);
+ break;
+ }
+ // Fast retransmission
+ if (p.seq < ack && age > FRTO * rtt) {
+ p.sent = now;
+ log ("fast retransmitting packet " + p.seq);
+ node.net.send (p, address, latency); // NOTE(review): unlike send(), no bandwidth tokens are removed here
+ window.fastRetransmission (now);
+ }
+ }
+ // Recalculate the maximum sequence number
+ if (txBuffer.isEmpty()) txMaxSeq = txSeq + SEQ_RANGE - 1;
+ else txMaxSeq = txBuffer.peek().seq + SEQ_RANGE - 1; // Based on the packet at the head of the buffer
+ log ("maximum sequence number " + txMaxSeq);
+ // Send as many packets as possible
+ while (send());
+ }
+
+ // Remove messages from a packet and deliver them to the node
+ private void unpack (Packet p)
+ {
+ if (p.messages == null) return;
+ for (Message m : p.messages) node.handleMessage (m, this);
+ }
+
+ // Called by Node, returns the next coalescing or retx deadline
+ public double checkTimeouts()
+ {
+ log ("checking timeouts");
+ // Send as many packets as possible
+ while (send());
+ if (txBuffer.isEmpty()) {
+ log ("no packets in flight");
+ return deadline(); // POSITIVE_INFINITY if nothing is queued either
+ }
+ double now = Event.time();
+ for (Packet p : txBuffer) {
+ if (now - p.sent > RTO * rtt) {
+ // Retransmission timeout
+ log ("retransmitting packet " + p.seq);
+ p.sent = now;
+ node.net.send (p, address, latency); // NOTE(review): unlike send(), no bandwidth tokens are removed here
+ window.timeout (now);
+ }
+ }
+ return Math.min (now + MAX_DELAY, deadline()); // Check again within MAX_DELAY while packets are in flight
+ }
+
+ // Work out when the first ack or message needs to be sent
+ private double deadline() // POSITIVE_INFINITY if both queues are empty
+ {
+ double deadline = Double.POSITIVE_INFINITY;
+ Deadline<Integer> ack = ackQueue.peek();
+ if (ack != null) deadline = ack.deadline;
+ Deadline<Message> msg = msgQueue.peek();
+ if (msg != null) deadline = Math.min (deadline, msg.deadline);
+ return deadline;
+ }
+
+ public void log (String message) // Prefix the message with local:remote addresses
+ {
+ Event.log (node.net.address + ":" + address + " " + message);
+ }
+}
Added: trunk/apps/load-balancing-sims/phase5-single-ack-delay/Request.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-single-ack-delay/Request.java
2006-08-10 12:31:23 UTC (rev 10021)
+++ trunk/apps/load-balancing-sims/phase5-single-ack-delay/Request.java
2006-08-10 12:44:18 UTC (rev 10022)
@@ -0,0 +1,28 @@
+ // A request for a key; new requests take the next unused ID
+ class Request extends Message
+ {
+     private static int nextId = 0; // ID that the next new request will get
+
+     public final int id; // The unique ID of the request
+     public final int key; // The requested key
+
+     // Start a new request with a freshly allocated ID
+     public Request (int key)
+     {
+         this (nextId++, key);
+     }
+
+     // Forward a request, reusing the ID supplied by the caller
+     public Request (int id, int key)
+     {
+         this.id = id;
+         this.key = key;
+         size = Message.HEADER_SIZE + Message.ID_SIZE + Message.KEY_SIZE;
+     }
+
+     // Describes the request as "request (id,key)"
+     @Override public String toString()
+     {
+         return "request (" + id + "," + key + ")";
+     }
+ }
Added: trunk/apps/load-balancing-sims/phase5-single-ack-delay/RequestState.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-single-ack-delay/RequestState.java
2006-08-10 12:31:23 UTC (rev 10021)
+++ trunk/apps/load-balancing-sims/phase5-single-ack-delay/RequestState.java
2006-08-10 12:44:18 UTC (rev 10022)
@@ -0,0 +1,42 @@
+// The state of an outstanding request, stored at each node along the path
+
+import java.util.HashSet;
+import java.util.Collection;
+
+ class RequestState
+ {
+     public final int id; // The unique ID of the request
+     public final int key; // The requested key
+     public final Peer prev; // The previous hop of the request (may be null)
+     public final HashSet<Peer> nexts; // Possible next hops
+
+     public RequestState (Request r, Peer prev, Collection<Peer> peers)
+     {
+         id = r.id;
+         key = r.key;
+         this.prev = prev;
+         nexts = new HashSet<Peer> (peers); // Own copy; the caller's collection is untouched
+         if (prev != null) nexts.remove (prev); // The previous hop is not a candidate
+     }
+
+     // Find the candidate next hop closest to the requested key
+     public Peer closestPeer()
+     {
+         double keyLoc = Node.keyToLocation (key);
+         double bestDist = Double.POSITIVE_INFINITY;
+         Peer bestPeer = null;
+         for (Peer peer : nexts) {
+             double dist = Node.distance (keyLoc, peer.location);
+             if (dist < bestDist) {
+                 bestDist = dist;
+                 bestPeer = peer;
+             }
+         }
+         return bestPeer; // Null if no candidate beat the initial infinite distance
+     }
+
+     @Override public String toString() // Describes the state as "request (id,key)"
+     {
+         return "request (" + id + "," + key + ")";
+     }
+ }
Added: trunk/apps/load-balancing-sims/phase5-single-ack-delay/Response.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-single-ack-delay/Response.java
2006-08-10 12:31:23 UTC (rev 10021)
+++ trunk/apps/load-balancing-sims/phase5-single-ack-delay/Response.java
2006-08-10 12:44:18 UTC (rev 10022)
@@ -0,0 +1,18 @@
+ class Response extends Message // A response message; its size includes Message.DATA_SIZE
+ {
+     public final int id; // The unique ID of the request
+     public final int key; // The requested key
+
+     public Response (int id, int key)
+     {
+         this.id = id;
+         this.key = key;
+         size = Message.HEADER_SIZE + Message.ID_SIZE +
+             Message.KEY_SIZE + Message.DATA_SIZE; // Header, ID and key, plus the data
+     }
+
+     @Override public String toString() // Describes the response as "response (id,key)"
+     {
+         return "response (" + id + "," + key + ")";
+     }
+ }
Added: trunk/apps/load-balancing-sims/phase5-single-ack-delay/RouteNotFound.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-single-ack-delay/RouteNotFound.java
2006-08-10 12:31:23 UTC (rev 10021)
+++ trunk/apps/load-balancing-sims/phase5-single-ack-delay/RouteNotFound.java
2006-08-10 12:44:18 UTC (rev 10022)
@@ -0,0 +1,18 @@
+// Note: for the purposes of this simulation, RejectedLoop and RouteNotFound
+// are equivalent
+
+ class RouteNotFound extends Message
+ {
+     public final int id; // The unique ID of the request
+
+     public RouteNotFound (int id)
+     {
+         this.id = id;
+         size = Message.HEADER_SIZE + Message.ID_SIZE; // Header and ID only, no key
+     }
+
+     @Override public String toString() // Describes the message as "route not found (id)"
+     {
+         return "route not found (" + id + ")";
+     }
+ }
Added: trunk/apps/load-balancing-sims/phase5-single-ack-delay/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-single-ack-delay/Sim.java
2006-08-10 12:31:23 UTC (rev 10021)
+++ trunk/apps/load-balancing-sims/phase5-single-ack-delay/Sim.java
2006-08-10 12:44:18 UTC (rev 10022)
@@ -0,0 +1,31 @@
+ // Interesting parameters: txSpeed and rxSpeed, retransmission timeout
+ // (Peer.java), window sizes, AIMD increase and decrease (CongestionWindow.java),
+ // queue sizes (NetworkInterface.java), packet size (Packet.java).
+
+ class Sim
+ {
+     public static void main (String[] args)
+     {
+         double txSpeed = 15000, rxSpeed = 15000; // Bytes per second
+         // rxSpeed = Math.exp (rand.nextGaussian() + 11.74);
+         // txSpeed = rxSpeed / 5.0;
+
+         // Network-wide settings, applied before any node is created
+         Network.reorder = true;
+         Network.lossRate = 0.001;
+
+         // Create the nodes in index order, all with the same speeds
+         Node[] nodes = new Node[4];
+         for (int i = 0; i < nodes.length; i++) nodes[i] = new Node (txSpeed, rxSpeed);
+
+         // Links as {from, to} index pairs: 0-1, then the triangle 1-2, 1-3, 2-3
+         int[][] links = { {0, 1}, {1, 2}, {1, 3}, {2, 3} };
+         for (int[] link : links) nodes[link[0]].connectBothWays (nodes[link[1]], 0.1);
+
+         // Schedule GENERATE_REQUEST for node 0 at 0.0
+         Event.schedule (nodes[0], 0.0, Node.GENERATE_REQUEST, null);
+
+         // Run the simulation
+         Event.run();
+     }
+ }
Added: trunk/apps/load-balancing-sims/phase5-single-ack-delay/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-single-ack-delay/TokenBucket.java
2006-08-10 12:31:23 UTC (rev 10021)
+++ trunk/apps/load-balancing-sims/phase5-single-ack-delay/TokenBucket.java
2006-08-10 12:44:18 UTC (rev 10022)
@@ -0,0 +1,29 @@
+ class TokenBucket
+ {
+     private final double rate, size; // Tokens added per unit of clock time; capacity
+     private double tokens, lastUpdated = 0.0; // Current level; clock time of last refill
+
+     public TokenBucket (double rate, double size)
+     {
+         this.rate = rate;
+         this.size = size;
+         tokens = size; // The bucket starts full
+     }
+
+     // Refill for the time since the last call, cap at size, return whole tokens
+     public int available()
+     {
+         double now = Event.time();
+         double refilled = tokens + (now - lastUpdated) * rate;
+         tokens = (refilled > size) ? size : refilled;
+         lastUpdated = now;
+         Event.log (tokens + " tokens available");
+         return (int) tokens;
+     }
+
+     public void remove (int t)
+     {
+         tokens -= t; // Counter can go negative
+         Event.log (t + " tokens removed, " + tokens + " available");
+     }
+ }