Author: mrogers
Date: 2006-07-18 10:43:32 +0000 (Tue, 18 Jul 2006)
New Revision: 9651

Added:
   trunk/apps/load-balancing-sims/phase4-leaky/
   trunk/apps/load-balancing-sims/phase4-leaky/Event.java
   trunk/apps/load-balancing-sims/phase4-leaky/EventTarget.java
   trunk/apps/load-balancing-sims/phase4-leaky/Network.java
   trunk/apps/load-balancing-sims/phase4-leaky/NetworkInterface.java
   trunk/apps/load-balancing-sims/phase4-leaky/Node.java
   trunk/apps/load-balancing-sims/phase4-leaky/Packet.java
   trunk/apps/load-balancing-sims/phase4-leaky/Peer.java
   trunk/apps/load-balancing-sims/phase4-leaky/Sim.java
   trunk/apps/load-balancing-sims/phase5/
   trunk/apps/load-balancing-sims/phase5/Event.java
   trunk/apps/load-balancing-sims/phase5/EventTarget.java
   trunk/apps/load-balancing-sims/phase5/Message.java
   trunk/apps/load-balancing-sims/phase5/Network.java
   trunk/apps/load-balancing-sims/phase5/NetworkInterface.java
   trunk/apps/load-balancing-sims/phase5/Node.java
   trunk/apps/load-balancing-sims/phase5/Packet.java
   trunk/apps/load-balancing-sims/phase5/Peer.java
   trunk/apps/load-balancing-sims/phase5/Sim.java
Log:
Multi-packet messages, Nagle's algorithm

Added: trunk/apps/load-balancing-sims/phase4-leaky/Event.java
===================================================================
--- trunk/apps/load-balancing-sims/phase4-leaky/Event.java      2006-07-18 
01:05:10 UTC (rev 9650)
+++ trunk/apps/load-balancing-sims/phase4-leaky/Event.java      2006-07-18 
10:43:32 UTC (rev 9651)
@@ -0,0 +1,98 @@
+import java.util.TreeSet; // Gotta love the collections framework...
+
+// A scheduled simulation event. The static members implement the global
+// event queue and the simulation clock; the instance members describe one
+// event waiting in that queue. Events are dispatched in order of time, and
+// events with equal times are dispatched in the order they were created.
+class Event implements Comparable<Event>
+{
+       // Static variables and methods for the event queue
+
+       // Pending events, kept in dispatch order (see compareTo)
+       private static final TreeSet<Event> queue = new TreeSet<Event>();
+       private static double clockTime = 0.0; // Current simulation time
+       private static int nextId = 0; // ID given to the next event created
+       // Dispatching stops at the first event later than this time
+       public static double duration = Double.POSITIVE_INFINITY;
+
+       // Discard all pending events and restore the initial clock, ID
+       // counter and duration
+       public static void reset()
+       {
+               queue.clear();
+               clockTime = 0.0;
+               nextId = 0;
+               duration = Double.POSITIVE_INFINITY;
+       }
+
+       // Queue an event for the given target. The time argument is a delay
+       // relative to the current clock time; type and data are passed
+       // through unchanged to the target's handleEvent() method.
+       public static void schedule (EventTarget target, double time,
+                               int type, Object data)
+       {
+               queue.add (new Event (target, time + clockTime, type, data));
+       }
+
+       // Dispatch the earliest pending event. Returns true if an event was
+       // dispatched; returns false if the queue was empty or if the earliest
+       // event falls after the duration (in which case that event has still
+       // been removed and the clock has still been advanced to its time).
+       public static boolean nextEvent()
+       {
+               // Test for an empty queue explicitly instead of catching
+               // NoSuchElementException around the whole dispatch, so that
+               // an exception thrown by a handler is never mistaken for
+               // "no more events"
+               if (queue.isEmpty()) return false;
+               Event e = queue.first();
+               queue.remove (e);
+               // Update the clock
+               clockTime = e.time;
+               // Quit if the simulation's allotted time has run out
+               if (clockTime > duration) return false;
+               // Pass the event to the target's callback method
+               e.target.handleEvent (e.type, e.data);
+               return true;
+       }
+
+       // The current simulation time
+       public static double time()
+       {
+               return clockTime;
+       }
+
+       // Print a message prefixed with the current simulation time
+       public static void log (String message)
+       {
+               System.out.print (clockTime + " " + message + "\n");
+       }
+
+       // Run until the duration expires or there are no more events to process
+       public static void run()
+       {
+               while (nextEvent()) {}
+       }
+
+       // Non-static variables and methods for individual events
+
+       private final EventTarget target; // Receives the event
+       private final double time; // Absolute time of dispatch
+       private final int id; // Creation order, used to break ties
+       private final int type; // Event code defined by the target's class
+       private final Object data; // Payload passed to the target
+
+       // The time argument is an absolute clock time; schedule() converts
+       // relative delays before calling this constructor
+       public Event (EventTarget target, double time, int type, Object data)
+       {
+               this.target = target;
+               this.time = time;
+               this.type = type;
+               this.data = data;
+               id = nextId++;
+       }
+
+       // Must be consistent with compareTo() and hashCode(). Returns false
+       // for null and for objects that are not events.
+       @Override
+       public boolean equals (Object o)
+       {
+               if (this == o) return true;
+               if (!(o instanceof Event)) return false;
+               Event e = (Event) o;
+               return e.time == time && e.id == id;
+       }
+
+       // Must be consistent with equals(): equal events always share an ID
+       @Override
+       public int hashCode()
+       {
+               return id;
+       }
+
+       // Must be consistent with equals()
+       public int compareTo (Event e)
+       {
+               // Sort events by time (order of occurrence)
+               if (e.time > time) return -1;
+               if (e.time < time) return 1;
+               // Break ties by ID (order of scheduling)
+               if (e.id > id) return -1;
+               if (e.id < id) return 1;
+               return 0;
+       }
+}

Added: trunk/apps/load-balancing-sims/phase4-leaky/EventTarget.java
===================================================================
--- trunk/apps/load-balancing-sims/phase4-leaky/EventTarget.java        
2006-07-18 01:05:10 UTC (rev 9650)
+++ trunk/apps/load-balancing-sims/phase4-leaky/EventTarget.java        
2006-07-18 10:43:32 UTC (rev 9651)
@@ -0,0 +1,4 @@
+// Implemented by every class that can receive events from the event queue
+interface EventTarget
+{
+       // Called by the event queue when an event scheduled for this target
+       // is dispatched; the type and data are the values that were passed
+       // when the event was scheduled
+       void handleEvent (int type, Object data);
+}

Added: trunk/apps/load-balancing-sims/phase4-leaky/Network.java
===================================================================
--- trunk/apps/load-balancing-sims/phase4-leaky/Network.java    2006-07-18 
01:05:10 UTC (rev 9650)
+++ trunk/apps/load-balancing-sims/phase4-leaky/Network.java    2006-07-18 
10:43:32 UTC (rev 9651)
@@ -0,0 +1,34 @@
+import java.util.HashMap;
+
+// The simulated network: hands out addresses to interfaces and carries
+// packets between them, optionally jittering latency and losing packets
+class Network
+{
+       // Registered interfaces, looked up by address
+       private static HashMap<Integer,NetworkInterface> interfaces
+               = new HashMap<Integer,NetworkInterface>();
+       private static int nextAddress = 0; // Next address to hand out
+       public static boolean reorder = false; // Can packets be reordered?
+       public static double lossRate = 0.0; // Random packet loss
+       // FIXME: duplication
+
+       // Deliver a packet to the interface registered at its destination
+       // address; packets for unknown addresses are silently discarded
+       public static void deliver (Packet p)
+       {
+               NetworkInterface receiver = interfaces.get (p.dest);
+               // Node doesn't exist or is offline
+               if (receiver == null) return;
+               if (reorder) {
+                       // Randomise the latency a bit so packets can overtake
+                       double jitter = 0.95 + Math.random() * 0.1;
+                       p.latency *= jitter;
+               }
+               boolean lost = Math.random() < lossRate;
+               if (lost) {
+                       Event.log ("packet lost by network");
+               }
+               else {
+                       // Schedule the arrival of the packet at the destination
+                       Event.schedule (receiver, p.latency,
+                               NetworkInterface.RX_Q_ADD, p);
+               }
+       }
+
+       // Attach an interface to the network - returns the address
+       public static int register (NetworkInterface ni)
+       {
+               int assigned = nextAddress;
+               nextAddress++;
+               interfaces.put (assigned, ni);
+               return assigned;
+       }
+}

Added: trunk/apps/load-balancing-sims/phase4-leaky/NetworkInterface.java
===================================================================
--- trunk/apps/load-balancing-sims/phase4-leaky/NetworkInterface.java   
2006-07-18 01:05:10 UTC (rev 9650)
+++ trunk/apps/load-balancing-sims/phase4-leaky/NetworkInterface.java   
2006-07-18 10:43:32 UTC (rev 9651)
@@ -0,0 +1,134 @@
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+
+class NetworkInterface implements EventTarget
+{
+       public int address; // Represents an IP address and port
+       private Node owner; // The owner of this network interface
+       private double txSpeed, rxSpeed; // Bytes per second
+       
+       private LinkedList<Packet> txQueue; // Queue of outgoing packets
+       private LinkedList<Packet> rxQueue; // Queue of incoming packets
+       private int txQueueSize, rxQueueSize; // Limited-size drop-tail queues
+       private int txQueueMaxSize, rxQueueMaxSize; // Bytes
+       
+       public NetworkInterface (Node owner, double txSpeed, double rxSpeed)
+       {
+               this.owner = owner;
+               this.txSpeed = txSpeed;
+               this.rxSpeed = rxSpeed;
+               txQueue = new LinkedList<Packet>();
+               rxQueue = new LinkedList<Packet>();
+               txQueueSize = rxQueueSize = 0; // Bytes
+               txQueueMaxSize = 10000;
+               rxQueueMaxSize = 20000;
+               // Attach the interface to the network
+               address = Network.register (this);
+       }
+               
+       // Called by Peer
+       public void send (Packet p, int dest, double latency)
+       {
+               p.src = address;
+               p.dest = dest;
+               p.latency = latency;
+               if (txQueueSize + p.size > txQueueMaxSize) {
+                       Event.log (address + " no room in txQueue");
+                       return; // Packet lost
+               }
+               txQueue.add (p);
+               txQueueSize += p.size;
+               Event.log (address + " " + txQueueSize + " bytes in txQueue");
+               // If there are no other packets in the queue, start to transmit
+               if (txQueue.size() == 1) txStart (p);
+       }
+       
+       // Event callbacks
+       
+       // Add a packet to the rx queue
+       private void rxQueueAdd (Packet p)
+       {
+               if (rxQueueSize + p.size > rxQueueMaxSize) {
+                       Event.log (address + " no room in rxQueue");
+                       return; // Packet lost
+               }
+               rxQueue.add (p);
+               rxQueueSize += p.size;
+               Event.log (address + " " + rxQueueSize + " bytes in rxQueue");
+               // If there are no other packets in the queue, start to receive
+               if (rxQueue.size() == 1) rxStart (p);
+       }
+       
+       // Start receiving a packet
+       private void rxStart (Packet p)
+       {
+               // Delay depends on rx speed
+               Event.schedule (this, p.size / rxSpeed, RX_END, p);
+       }
+       
+       // Finish receiving a packet, pass it to the node
+       private void rxEnd (Packet p)
+       {
+               owner.handlePacket (p);
+               // If there's another packet waiting, start to receive it
+               try {
+                       rxQueueSize -= p.size;
+                       rxQueue.remove (p);
+                       rxStart (rxQueue.getFirst());
+               }
+               catch (NoSuchElementException nse) {}
+       }
+       
+       // Start transmitting a packet
+       private void txStart (Packet p)
+       {
+               // Delay depends on tx speed
+               Event.schedule (this, p.size / txSpeed, TX_END, p);
+       }
+       
+       // Finish transmitting a packet
+       private void txEnd (Packet p)
+       {
+               Network.deliver (p);
+               // If there's another packet waiting, start to transmit it
+               try {
+                       txQueueSize -= p.size;
+                       txQueue.remove (p);
+                       txStart (txQueue.getFirst());
+               }
+               catch (NoSuchElementException nse) {}
+       }
+       
+       // EventTarget interface
+       public void handleEvent (int type, Object data)
+       {
+               switch (type) {
+                       case RX_Q_ADD:
+                       rxQueueAdd ((Packet) data);
+                       break;
+                       
+                       case RX_START:
+                       rxStart ((Packet) data);
+                       break;
+                       
+                       case RX_END:
+                       rxEnd ((Packet) data);
+                       break;
+                       
+                       case TX_START:
+                       txStart ((Packet) data);
+                       break;
+                       
+                       case TX_END:
+                       txEnd ((Packet) data);
+                       break;
+               }
+       }
+       
+       // Each EventTarget class has its own event codes
+       public final static int RX_Q_ADD = 1;
+       public final static int RX_START = 2;
+       public final static int RX_END = 3;
+       public final static int TX_START = 4;
+       public final static int TX_END = 5;
+}

Added: trunk/apps/load-balancing-sims/phase4-leaky/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase4-leaky/Node.java       2006-07-18 
01:05:10 UTC (rev 9650)
+++ trunk/apps/load-balancing-sims/phase4-leaky/Node.java       2006-07-18 
10:43:32 UTC (rev 9651)
@@ -0,0 +1,47 @@
+import java.util.HashMap;
+
+class Node implements EventTarget
+{
+       public final static int PACKET_SIZE = 500; // Excluding headers
+       
+       private NetworkInterface net;
+       private HashMap<Integer,Peer> peers; // Look up a peer by its address
+       
+       public Node (double txSpeed, double rxSpeed)
+       {
+               peers = new HashMap<Integer,Peer>();
+               net = new NetworkInterface (this, txSpeed, rxSpeed);
+       }
+       
+       public void connect (Node n, double latency)
+       {
+               Peer p = new Peer (n.net.address, latency, net);
+               peers.put (n.net.address, p);
+       }
+       
+       // Called by NetworkInterface
+       public void handlePacket (Packet packet)
+       {
+               Peer peer = peers.get (packet.src);
+               if (peer == null) Event.log (net.address + " unknown peer!");
+               else peer.handlePacket (packet);
+       }
+       
+       // Event callback
+       private void start()
+       {
+               // Give each peer some work to do
+               for (Peer p : peers.values())
+                       for (int i = 0; i < 1000000 / PACKET_SIZE; i++)
+                               p.write (new Packet (Packet.DATA, PACKET_SIZE));
+       }
+       
+       // EventTarget interface
+       public void handleEvent (int type, Object data)
+       {
+               if (type == START) start();
+       }
+       
+       // Each EventTarget class has its own event codes
+       public final static int START = 1;
+}

Added: trunk/apps/load-balancing-sims/phase4-leaky/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase4-leaky/Packet.java     2006-07-18 
01:05:10 UTC (rev 9650)
+++ trunk/apps/load-balancing-sims/phase4-leaky/Packet.java     2006-07-18 
10:43:32 UTC (rev 9651)
@@ -0,0 +1,20 @@
+// A simulated packet. All fields are public and are filled in by whichever
+// class handles the packet; only the type and size are set on construction.
+class Packet
+{
+       // Packet types
+       public final static int DATA = 1;
+       public final static int ACK = 2;
+       // Added to the data size of every packet
+       public final static int HEADER_SIZE = 50;
+
+       public int src; // Network address of the sender
+       public int dest; // Network address of the recipient
+       public int type; // Data, ack, etc
+       public int size; // Packet size in bytes, including headers
+       public int seq; // Sequence number or explicit ack
+
+       public double latency; // Link latency (stored here for convenience)
+       public double sent; // Time at which the packet was last transmitted
+
+       // The dataSize argument excludes headers; HEADER_SIZE is added to it
+       public Packet (int type, int dataSize)
+       {
+               this.size = HEADER_SIZE + dataSize;
+               this.type = type;
+       }
+}

Added: trunk/apps/load-balancing-sims/phase4-leaky/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase4-leaky/Peer.java       2006-07-18 
01:05:10 UTC (rev 9650)
+++ trunk/apps/load-balancing-sims/phase4-leaky/Peer.java       2006-07-18 
10:43:32 UTC (rev 9651)
@@ -0,0 +1,307 @@
+import java.util.LinkedList;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+// One end of a connection to a remote node. The sending side numbers
+// packets, paces them with a leaky bucket whose rate is cwind / rtt, keeps
+// them for retransmission until they are explicitly acked, and adjusts the
+// congestion window on acks and losses. The receiving side acks every data
+// packet and reorders packets into sequence before queueing them for
+// collection with read().
+class Peer implements EventTarget
+{
+       public final static double TIMER = 0.5; // Coarse-grained timer, seconds
+       public final static double RTO = 4.0; // Retransmission timeout in RTTs
+       public final static double FRTO = 1.5; // Fast retx timeout in RTTs
+       public final static double RTT_DECAY = 0.8; // Exp moving average
+       public final static int MIN_CWIND = 3000; // Minimum congestion window
+       public final static int MAX_CWIND = 100000; // Maximum congestion window
+       public final static int RWIND = 100000; // Maximum bytes buffered at rx
+       public final static double ALPHA = 0.1615; // AIMD increase parameter
+       public final static double BETA = 0.9375; // AIMD decrease parameter
+       public final static double GAMMA = 3.0; // Slow start divisor
+
+       public int address; // The remote node's address
+       private final double latency; // Latency of the connection in seconds
+       private final NetworkInterface net; // The local node's network interface
+
+       // Sender
+       private double rtt = 3.0; // Estimated round-trip time in seconds
+       private double cwind = MIN_CWIND; // Congestion window in bytes
+       private boolean slowStart = true; // Are we in the slow start phase?
+
+       // Leaky bucket
+       private double txRate = cwind / rtt; // Average tx rate in bytes/second
+       private boolean blocked = false; // Are we waiting for the leaky bucket?
+
+       private double lastTx = Double.NEGATIVE_INFINITY; // Clock time
+       private double lastDecrease = Double.NEGATIVE_INFINITY; // Clock time
+       private boolean timerRunning = false; // Is the retx timer running?
+
+       private int inflight = 0; // Bytes sent but not acked
+       private int txSeq = 0; // Sequence number of next outgoing packet
+       private final LinkedList<Packet> txBuffer; // Retransmission buffer
+       private final LinkedList<Packet> txQueue; // Packets waiting to be sent
+
+       // Receiver
+       private int buffered = 0; // Bytes buffered for reassembly
+       private int rxSeq = 0; // Sequence number of next incoming packet
+       private final LinkedList<Packet> rxBuffer; // Reassembly buffer
+       private final LinkedList<Packet> rxQueue; // Packets waiting to be collected
+
+       public Peer (int address, double latency, NetworkInterface net)
+       {
+               this.address = address;
+               this.latency = latency;
+               this.net = net;
+               txBuffer = new LinkedList<Packet>();
+               txQueue = new LinkedList<Packet>();
+               rxBuffer = new LinkedList<Packet>();
+               rxQueue = new LinkedList<Packet>();
+       }
+
+       // Queues a packet for transmission
+       public void write (Packet p)
+       {
+               // Length of transmission queue is unlimited - be careful!
+               txQueue.add (p);
+               // If other packets are already queued, this one is sent when
+               // a later ack or SEND_DATA event drains the queue
+               if (txQueue.size() == 1) sendData();
+       }
+
+       // Returns a reassembled packet, or null if there are none waiting
+       public Packet read()
+       {
+               // poll() returns null on an empty queue, so no exception
+               // handling is needed
+               return rxQueue.poll();
+       }
+
+       // Send queued packets in order, stopping at the first one that can't
+       // be sent yet (see sendData (Packet))
+       private void sendData()
+       {
+               Iterator<Packet> i = txQueue.iterator();
+               while (i.hasNext()) {
+                       if (sendData (i.next())) i.remove();
+                       else break;
+               }
+       }
+
+       // Try to send a packet, return true if it was sent
+       private boolean sendData (Packet p)
+       {
+               // Leaky bucket - space packets evenly across each RTT
+               if (blocked) return false;
+               double now = Event.time();
+               double nextTx = lastTx + p.size / txRate;
+               if (nextTx > now) {
+                       blocked = true;
+                       log ("blocked for " + (nextTx - now) + " seconds");
+                       // Sleep until it's time to transmit
+                       Event.schedule (this, nextTx - now, SEND_DATA, null);
+                       return false;
+               }
+               // Return to slow start when the link is idle
+               if (now - lastTx > RTO * rtt) {
+                       log ("entering slow start");
+                       slowStart = true;
+                       cwind = MIN_CWIND;
+                       updateTransmissionRate();
+               }
+               lastTx = now;
+               // Send the packet
+               p.seq = txSeq++;
+               log ("sending data " + p.seq);
+               net.send (p, address, latency);
+               // Buffer the packet for retransmission
+               p.sent = now;
+               inflight += p.size;
+               log (inflight + " bytes in flight");
+               txBuffer.add (p);
+               // Start the coarse-grained retransmission timer if necessary
+               if (!timerRunning) {
+                       log ("starting retransmission timer");
+                       Event.schedule (this, TIMER, CHECK_TIMEOUTS, null);
+                       timerRunning = true;
+               }
+               return true;
+       }
+
+       // Send an explicit ack for the given sequence number; acks carry no
+       // data and are not paced or buffered for retransmission
+       private void sendAck (int seq)
+       {
+               Packet p = new Packet (Packet.ACK, 0);
+               p.seq = seq; // Explicit ack
+               log ("sending ack " + seq);
+               net.send (p, address, latency);
+       }
+
+       // Called by Node when a packet arrives
+       public void handlePacket (Packet p)
+       {
+               switch (p.type) {
+                       case Packet.DATA:
+                       handleData (p);
+                       break;
+
+                       case Packet.ACK:
+                       handleAck (p);
+                       break;
+               }
+       }
+
+       // Receiver side: queue in-order data for collection, buffer
+       // out-of-order data until the gap is filled, and ack everything
+       // except packets dropped for lack of buffer space
+       private void handleData (Packet p)
+       {
+               log ("received data " + p.seq);
+               // Is this the packet we've been waiting for?
+               if (p.seq == rxSeq) {
+                       log ("data in order");
+                       rxSeq++;
+                       rxQueue.add (p);
+                       // Reassemble contiguous packets
+                       Iterator<Packet> i = rxBuffer.iterator();
+                       while (i.hasNext()) {
+                               Packet q = i.next();
+                               if (q.seq == rxSeq) {
+                                       log ("adding contiguous data " + q.seq);
+                                       i.remove();
+                                       buffered -= q.size;
+                                       rxQueue.add (q);
+                                       rxSeq++;
+                               }
+                               else break;
+                       }
+                       log (buffered + " bytes buffered");
+                       log ("expecting data " + rxSeq);
+                       // FIXME: notify the node that there are packets waiting
+               }
+               else if (p.seq > rxSeq) {
+                       log ("data out of order, expected " + rxSeq);
+                       // Buffer the packet until all previous packets arrive.
+                       // The buffer is kept sorted by sequence number, so find
+                       // the position of the first packet with a higher one.
+                       int index;
+                       Iterator<Packet> i = rxBuffer.iterator();
+                       for (index = 0; i.hasNext(); index++) {
+                               Packet q = i.next();
+                               if (q.seq == p.seq) {
+                                       log ("duplicate data " + p.seq);
+                                       sendAck (p.seq);
+                                       return;
+                               }
+                               if (q.seq > p.seq) break;
+                       }
+                       if (buffered + p.size > RWIND) {
+                               // This shouldn't happen under normal conditions
+                               log ("no space in buffer - packet dropped");
+                               return;
+                       }
+                       buffered += p.size;
+                       log (buffered + " bytes buffered");
+                       rxBuffer.add (index, p);
+                       // DEBUG
+                       if (!rxBuffer.isEmpty()) {
+                               for (Packet z : rxBuffer)
+                                       System.out.print (z.seq + " ");
+                               System.out.println();
+                       }
+               }
+               else log ("duplicate data " + p.seq); // Ack may have been lost
+               sendAck (p.seq);
+       }
+
+       // Sender side: release the acked packet, grow the congestion window
+       // and update the RTT estimate; older unacked packets that have been
+       // outstanding for more than FRTO round-trip times are retransmitted
+       private void handleAck (Packet p)
+       {
+               log ("received ack " + p.seq);
+               double now = Event.time();
+               boolean windowIncreased = false;
+               Iterator<Packet> i = txBuffer.iterator();
+               while (i.hasNext()) {
+                       Packet q = i.next();
+                       double age = now - q.sent;
+                       // Explicit ack
+                       if (q.seq == p.seq) {
+                               log ("data " + q.seq + " acknowledged");
+                               i.remove();
+                               inflight -= q.size;
+                               log (inflight + " bytes in flight");
+                               // Increase the congestion window
+                               if (slowStart) cwind += q.size / GAMMA;
+                               else cwind += q.size * q.size * ALPHA / cwind;
+                               if (cwind > MAX_CWIND) cwind = MAX_CWIND;
+                               log ("congestion window increased to " + cwind);
+                               // Update the average round-trip time
+                               rtt = rtt * RTT_DECAY + age * (1.0 - RTT_DECAY);
+                               log ("round-trip time " + age);
+                               log ("average round-trip time " + rtt);
+                               updateTransmissionRate();
+                               windowIncreased = true;
+                               break;
+                       }
+                       // Fast retransmission
+                       if (q.seq < p.seq && age > FRTO * rtt) {
+                               q.sent = now;
+                               log ("fast retransmitting data " + q.seq);
+                               log (inflight + " bytes in flight");
+                               net.send (q, address, latency);
+                               decreaseCongestionWindow (now);
+                       }
+               }
+               if (windowIncreased) sendData();
+       }
+
+       // Recalculate the leaky bucket rate from the window and RTT estimate
+       private void updateTransmissionRate()
+       {
+               txRate = cwind / rtt;
+               log (txRate + " bytes per second");
+       }
+
+       private void decreaseCongestionWindow (double now)
+       {
+               // The congestion window should only be decreased once per RTT
+               if (now - lastDecrease < rtt) return;
+               lastDecrease = now;
+               cwind *= BETA;
+               if (cwind < MIN_CWIND) cwind = MIN_CWIND;
+               log ("congestion window decreased to " + cwind);
+               // The slow start phase ends when the first packet is lost
+               if (slowStart) {
+                       log ("leaving slow start");
+                       slowStart = false;
+               }
+               updateTransmissionRate();
+       }
+
+       // Log a message prefixed with the local and remote addresses
+       private void log (String message)
+       {
+               Event.log (net.address + ":" + address + " " + message);
+       }
+
+       // Event callback
+       private void checkTimeouts()
+       {
+               log ("checking timeouts");
+               // If there are no packets in flight, stop the timer
+               if (txBuffer.isEmpty()) {
+                       log ("stopping retransmission timer");
+                       timerRunning = false;
+                       return;
+               }
+               double now = Event.time();
+               for (Packet p : txBuffer) {
+                       // Slow retransmission
+                       if (now - p.sent > RTO * rtt) {
+                               p.sent = now;
+                               log ("retransmitting data " + p.seq);
+                               log (inflight + " bytes in flight");
+                               net.send (p, address, latency);
+                               decreaseCongestionWindow (now);
+                       }
+               }
+               // Reset the timer
+               Event.schedule (this, TIMER, CHECK_TIMEOUTS, null);
+       }
+
+       // EventTarget interface
+       public void handleEvent (int type, Object data)
+       {
+               if (type == CHECK_TIMEOUTS) checkTimeouts();
+               else if (type == SEND_DATA) {
+                       // The leaky bucket delay has elapsed
+                       blocked = false;
+                       sendData();
+               }
+       }
+
+       // Each EventTarget class has its own event codes
+       private final static int CHECK_TIMEOUTS = 1;
+       private final static int SEND_DATA = 2;
+}

Added: trunk/apps/load-balancing-sims/phase4-leaky/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase4-leaky/Sim.java        2006-07-18 
01:05:10 UTC (rev 9650)
+++ trunk/apps/load-balancing-sims/phase4-leaky/Sim.java        2006-07-18 
10:43:32 UTC (rev 9651)
@@ -0,0 +1,34 @@
+// Interesting parameters to play with: txSpeed and rxSpeed, retransmission
+// timeout, window size, AIMD increase and decrease (Peer.java), queue size
+// (NetworkInterface.java), packet size (Node.java).
+
+// Entry point: builds three fully connected nodes on a lossy, reordering
+// network, starts each node at a random time and runs the event loop
+class Sim
+{
+       public static void main (String[] args)
+       {
+               double txSpeed = 15000; // Bytes per second
+               double rxSpeed = 15000; // Bytes per second
+               // rxSpeed = Math.exp (rand.nextGaussian() + 11.74);
+               // txSpeed = rxSpeed / 5.0;
+
+               Network.reorder = true;
+               Network.lossRate = 0.001;
+
+               Node[] nodes = new Node[3];
+               for (int i = 0; i < nodes.length; i++) {
+                       nodes[i] = new Node (txSpeed, rxSpeed);
+               }
+
+               // Connect every node to every other node, in both directions
+               for (int from = 0; from < nodes.length; from++) {
+                       for (int to = 0; to < nodes.length; to++) {
+                               if (from == to) continue;
+                               nodes[from].connect (nodes[to], 0.1);
+                       }
+               }
+
+               // Start each node at a random time
+               for (Node n : nodes) {
+                       Event.schedule (n, Math.random(), Node.START, null);
+               }
+
+               // Run the simulation
+               Event.run();
+       }
+}

Added: trunk/apps/load-balancing-sims/phase5/Event.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Event.java    2006-07-18 01:05:10 UTC 
(rev 9650)
+++ trunk/apps/load-balancing-sims/phase5/Event.java    2006-07-18 10:43:32 UTC 
(rev 9651)
@@ -0,0 +1,98 @@
+import java.util.TreeSet; // Gotta love the collections framework...
+
+// A scheduled event, plus the static event queue and clock that dispatch
+// events to their targets in order of time
+class Event implements Comparable<Event>
+{
+       // Static variables and methods for the event queue
+
+       // Pending events, sorted by compareTo()
+       private static TreeSet<Event> queue = new TreeSet<Event>();
+       // Time of the most recently dequeued event
+       private static double clockTime = 0.0;
+       // ID given to the next event that is created
+       private static int nextId = 0;
+       // nextEvent() stops once the clock passes this time
+       public static double duration = Double.POSITIVE_INFINITY;
+
+       // Empty the queue and restore the clock, IDs and duration to their
+       // initial values
+       public static void reset()
+       {
+               queue.clear();
+               clockTime = 0.0;
+               nextId = 0;
+               duration = Double.POSITIVE_INFINITY;
+       }
+
+       // Queue an event for the target; 'time' is a delay relative to the
+       // current clock time
+       public static void schedule (EventTarget target, double time,
+                               int type, Object data)
+       {
+               queue.add (new Event (target, time + clockTime, type, data));
+       }
+
+       // Dispatch the earliest event. Returns false if the queue is empty
+       // or the duration has expired, true if an event was dispatched.
+       public static boolean nextEvent()
+       {
+               // No more events to dispatch. This is checked explicitly
+               // rather than by catching NoSuchElementException, so that
+               // an exception thrown by a callback is not mistaken for an
+               // empty queue
+               if (queue.isEmpty()) return false;
+               Event e = queue.first();
+               queue.remove (e);
+               // Update the clock
+               clockTime = e.time;
+               // Quit if the simulation's allotted time has run out
+               if (clockTime > duration) return false;
+               // Pass the event to the target's callback method
+               e.target.handleEvent (e.type, e.data);
+               return true;
+       }
+
+       // Returns the current clock time
+       public static double time()
+       {
+               return clockTime;
+       }
+
+       // Print a message prefixed with the current clock time
+       public static void log (String message)
+       {
+               System.out.print (clockTime + " " + message + "\n");
+       }
+
+       // Run until the duration expires or there are no more events to process
+       public static void run()
+       {
+               while (nextEvent()) {}
+       }
+
+       // Non-static variables and methods for individual events
+
+       private EventTarget target; // Receiver of the callback
+       private double time; // Absolute time at which the event occurs
+       private int id; // Order of creation, used to break ties
+       private int type; // Event code, interpreted by the target
+       private Object data; // Payload, interpreted by the target
+
+       public Event (EventTarget target, double time, int type, Object data)
+       {
+               this.target = target;
+               this.time = time;
+               this.type = type;
+               this.data = data;
+               id = nextId++;
+       }
+
+       // Must be consistent with compareTo() and hashCode()
+       public boolean equals (Object o)
+       {
+               if (this == o) return true;
+               // Also rejects null
+               if (!(o instanceof Event)) return false;
+               Event e = (Event) o;
+               return e.time == time && e.id == id;
+       }
+
+       // Must be consistent with equals(): equal events have equal IDs
+       public int hashCode()
+       {
+               return id;
+       }
+
+       // Must be consistent with equals()
+       public int compareTo (Event e)
+       {
+               // Sort events by time (order of occurrence)
+               if (e.time > time) return -1;
+               if (e.time < time) return 1;
+               // Break ties by ID (order of scheduling)
+               if (e.id > id) return -1;
+               if (e.id < id) return 1;
+               return 0;
+       }
+}

Added: trunk/apps/load-balancing-sims/phase5/EventTarget.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/EventTarget.java      2006-07-18 01:05:10 UTC (rev 9650)
+++ trunk/apps/load-balancing-sims/phase5/EventTarget.java      2006-07-18 10:43:32 UTC (rev 9651)
@@ -0,0 +1,4 @@
+// Implemented by every class that receives callbacks from the event queue
+interface EventTarget
+{
+       // Handle an event; 'type' is an event code defined by the
+       // implementing class and 'data' is the payload that accompanies it
+       void handleEvent (int type, Object data);
+}

Added: trunk/apps/load-balancing-sims/phase5/Message.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Message.java  2006-07-18 01:05:10 UTC (rev 9650)
+++ trunk/apps/load-balancing-sims/phase5/Message.java  2006-07-18 10:43:32 UTC (rev 9651)
@@ -0,0 +1,13 @@
+// A high-level message (as opposed to a low-level packet)
+
+class Message
+{
+       // Sequence number
+       public int seq;
+       // Size in bytes
+       public int size;
+
+       // Create a message with the given sequence number and size in bytes
+       public Message (int seq, int size)
+       {
+               this.size = size;
+               this.seq = seq;
+       }
+}

Added: trunk/apps/load-balancing-sims/phase5/Network.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Network.java  2006-07-18 01:05:10 UTC (rev 9650)
+++ trunk/apps/load-balancing-sims/phase5/Network.java  2006-07-18 10:43:32 UTC (rev 9651)
@@ -0,0 +1,34 @@
+import java.util.HashMap;
+
+class Network
+{
+       // Attached interfaces, keyed by the address returned by register()
+       private static HashMap<Integer,NetworkInterface> interfaces
+               = new HashMap<Integer,NetworkInterface>();
+       private static int nextAddress = 0; // Next address to hand out
+       public static boolean reorder = false; // Can packets be reordered?
+       public static double lossRate = 0.0; // Random packet loss
+       // FIXME: duplication
+
+       // Deliver a packet to an address
+       public static void deliver (Packet p)
+       {
+               NetworkInterface target = interfaces.get (p.dest);
+               // Node doesn't exist or is offline
+               if (target == null) return;
+               // If the network allows reordering, scale the latency by a
+               // random factor between 0.95 and 1.05
+               if (reorder) {
+                       double jitter = 0.95 + Math.random() * 0.1;
+                       p.latency *= jitter;
+               }
+               boolean lost = Math.random() < lossRate;
+               if (lost) Event.log ("packet lost by network");
+               // Otherwise schedule the arrival of the packet at the
+               // destination
+               else Event.schedule (target, p.latency,
+                               NetworkInterface.RX_Q_ADD, p);
+       }
+
+       // Attach an interface to the network - returns the address
+       public static int register (NetworkInterface ni)
+       {
+               int assigned = nextAddress;
+               nextAddress++;
+               interfaces.put (assigned, ni);
+               return assigned;
+       }
+}

Added: trunk/apps/load-balancing-sims/phase5/NetworkInterface.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/NetworkInterface.java 2006-07-18 01:05:10 UTC (rev 9650)
+++ trunk/apps/load-balancing-sims/phase5/NetworkInterface.java 2006-07-18 10:43:32 UTC (rev 9651)
@@ -0,0 +1,134 @@
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+
+class NetworkInterface implements EventTarget
+{
+       public int address; // Represents an IP address and port
+       private Node owner; // The owner of this network interface
+       private double txSpeed, rxSpeed; // Bytes per second
+       
+       private LinkedList<Packet> txQueue; // Queue of outgoing packets
+       private LinkedList<Packet> rxQueue; // Queue of incoming packets
+       private int txQueueSize, rxQueueSize; // Limited-size drop-tail queues
+       private int txQueueMaxSize, rxQueueMaxSize; // Bytes
+       
+       public NetworkInterface (Node owner, double txSpeed, double rxSpeed)
+       {
+               this.owner = owner;
+               this.txSpeed = txSpeed;
+               this.rxSpeed = rxSpeed;
+               txQueue = new LinkedList<Packet>();
+               rxQueue = new LinkedList<Packet>();
+               txQueueSize = rxQueueSize = 0; // Bytes
+               txQueueMaxSize = 10000;
+               rxQueueMaxSize = 20000;
+               // Attach the interface to the network
+               address = Network.register (this);
+       }
+               
+       // Called by Peer
+       public void send (Packet p, int dest, double latency)
+       {
+               p.src = address;
+               p.dest = dest;
+               p.latency = latency;
+               if (txQueueSize + p.size > txQueueMaxSize) {
+                       Event.log (address + " no room in txQueue");
+                       return; // Packet lost
+               }
+               txQueue.add (p);
+               txQueueSize += p.size;
+               Event.log (address + " " + txQueueSize + " bytes in txQueue");
+               // If there are no other packets in the queue, start to transmit
+               if (txQueue.size() == 1) txStart (p);
+       }
+       
+       // Event callbacks
+       
+       // Add a packet to the rx queue
+       private void rxQueueAdd (Packet p)
+       {
+               if (rxQueueSize + p.size > rxQueueMaxSize) {
+                       Event.log (address + " no room in rxQueue");
+                       return; // Packet lost
+               }
+               rxQueue.add (p);
+               rxQueueSize += p.size;
+               Event.log (address + " " + rxQueueSize + " bytes in rxQueue");
+               // If there are no other packets in the queue, start to receive
+               if (rxQueue.size() == 1) rxStart (p);
+       }
+       
+       // Start receiving a packet
+       private void rxStart (Packet p)
+       {
+               // Delay depends on rx speed
+               Event.schedule (this, p.size / rxSpeed, RX_END, p);
+       }
+       
+       // Finish receiving a packet, pass it to the node
+       private void rxEnd (Packet p)
+       {
+               owner.handlePacket (p);
+               // If there's another packet waiting, start to receive it
+               try {
+                       rxQueueSize -= p.size;
+                       rxQueue.remove (p);
+                       rxStart (rxQueue.getFirst());
+               }
+               catch (NoSuchElementException nse) {}
+       }
+       
+       // Start transmitting a packet
+       private void txStart (Packet p)
+       {
+               // Delay depends on tx speed
+               Event.schedule (this, p.size / txSpeed, TX_END, p);
+       }
+       
+       // Finish transmitting a packet
+       private void txEnd (Packet p)
+       {
+               Network.deliver (p);
+               // If there's another packet waiting, start to transmit it
+               try {
+                       txQueueSize -= p.size;
+                       txQueue.remove (p);
+                       txStart (txQueue.getFirst());
+               }
+               catch (NoSuchElementException nse) {}
+       }
+       
+       // EventTarget interface
+       public void handleEvent (int type, Object data)
+       {
+               switch (type) {
+                       case RX_Q_ADD:
+                       rxQueueAdd ((Packet) data);
+                       break;
+                       
+                       case RX_START:
+                       rxStart ((Packet) data);
+                       break;
+                       
+                       case RX_END:
+                       rxEnd ((Packet) data);
+                       break;
+                       
+                       case TX_START:
+                       txStart ((Packet) data);
+                       break;
+                       
+                       case TX_END:
+                       txEnd ((Packet) data);
+                       break;
+               }
+       }
+       
+       // Each EventTarget class has its own event codes
+       public final static int RX_Q_ADD = 1;
+       public final static int RX_START = 2;
+       public final static int RX_END = 3;
+       public final static int TX_START = 4;
+       public final static int TX_END = 5;
+}

Added: trunk/apps/load-balancing-sims/phase5/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Node.java     2006-07-18 01:05:10 UTC (rev 9650)
+++ trunk/apps/load-balancing-sims/phase5/Node.java     2006-07-18 10:43:32 UTC (rev 9651)
@@ -0,0 +1,65 @@
+import java.util.HashMap;
+
+// A simulated node: owns one network interface and one Peer per connected
+// node, and periodically sends a random-sized message to every peer
+class Node implements EventTarget
+{
+       public NetworkInterface net;
+       private HashMap<Integer,Peer> peers; // Look up a peer by its address
+       private int messagesSent = 0; // Rounds of messages sent so far
+
+       public Node (double txSpeed, double rxSpeed)
+       {
+               peers = new HashMap<Integer,Peer>();
+               net = new NetworkInterface (this, txSpeed, rxSpeed);
+       }
+
+       // Create a one-way connection from this node to n; the remote node
+       // must call connect() as well to recognise packets from this node
+       public void connect (Node n, double latency)
+       {
+               Peer p = new Peer (n.net.address, latency, this);
+               peers.put (n.net.address, p);
+       }
+
+       // Called by NetworkInterface: pass the packet to the peer it came
+       // from, or log a warning if the source address is unknown
+       public void handlePacket (Packet packet)
+       {
+               Peer peer = peers.get (packet.src);
+               if (peer == null) log ("unknown peer!");
+               else peer.handlePacket (packet);
+       }
+
+       // Called by Peer: collect and log every message that is waiting
+       public void messagesWaiting (Peer p)
+       {
+               // Statements are kept short so that they are not split
+               // across lines when the patch is mailed
+               Message m;
+               while ((m = p.receiveMessage()) != null)
+                       log ("received message " + m.seq + ", "
+                               + m.size + " bytes");
+       }
+
+       private void log (String message)
+       {
+               Event.log (net.address + " " + message);
+       }
+
+       // Event callback
+       private void sendMessages()
+       {
+               // Send a message to each peer
+               for (Peer p : peers.values()) {
+                       int size = (int) (Math.random() * 2500);
+                       Message m = new Message (messagesSent, size);
+                       log ("sending message " + m.seq + ", "
+                               + m.size + " bytes");
+                       p.sendMessage (m);
+               }
+               // Send a total of 1000 messages to each peer
+               messagesSent++;
+               if (messagesSent < 1000)
+                       Event.schedule (this, 0.1, SEND_MESSAGES, null);
+       }
+
+       // EventTarget interface
+       public void handleEvent (int type, Object data)
+       {
+               if (type == SEND_MESSAGES) sendMessages();
+       }
+
+       // Each EventTarget class has its own event codes
+       public final static int SEND_MESSAGES = 1;
+}

Added: trunk/apps/load-balancing-sims/phase5/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Packet.java   2006-07-18 01:05:10 UTC (rev 9650)
+++ trunk/apps/load-balancing-sims/phase5/Packet.java   2006-07-18 10:43:32 UTC (rev 9651)
@@ -0,0 +1,42 @@
+// A low-level packet containing one or more complete or incomplete messages
+
+// In real life the payload would be an array of bytes, but in the sim the
+// payload is represented by an ArrayList of Messages. Large messages can be
+// split across more than one packet, in which case the message only appears
+// in the payload of the *last* packet. This means it's possible for a full
+// packet to have an apparently empty payload.
+
+import java.util.ArrayList;
+
+// Fields common to every kind of packet
+abstract class Packet
+{
+       // Header overhead in bytes
+       public final static int HEADER_SIZE = 50;
+       // Largest payload in bytes
+       public final static int MAX_PAYLOAD = 1400;
+
+       // Network addresses
+       public int src, dest;
+       // Data, ack, etc
+       public int type;
+       // Packet size in bytes, including headers
+       public int size;
+       // Sequence number or explicit ack
+       public int seq;
+       // Link latency (stored here for convenience)
+       public double latency;
+}
+
+// A packet that carries message data
+class DataPacket extends Packet
+{
+       // Payload: the messages carried by this packet
+       public ArrayList<Message> messages;
+       public double sent; // Time at which the packet was (re)transmitted
+
+       // Create a packet with an empty message list; dataSize is the
+       // payload size in bytes, to which the header size is added
+       public DataPacket (int dataSize)
+       {
+               size = dataSize + HEADER_SIZE;
+               messages = new ArrayList<Message>();
+       }
+}
+
+// An acknowledgement of the data packet with the given sequence number
+class Ack extends Packet
+{
+       // An ack has no payload, so its size is the header size
+       public Ack (int seq)
+       {
+               this.seq = seq;
+               this.size = HEADER_SIZE;
+       }
+}

Added: trunk/apps/load-balancing-sims/phase5/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Peer.java     2006-07-18 01:05:10 UTC (rev 9650)
+++ trunk/apps/load-balancing-sims/phase5/Peer.java     2006-07-18 10:43:32 UTC (rev 9651)
@@ -0,0 +1,316 @@
+import java.util.LinkedList;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+// One end of a connection to a remote node. Splits queued messages into
+// packets, sends them subject to a congestion window, retransmits packets
+// that are not acknowledged, and reassembles incoming packets in sequence
+// order before handing their messages to the owning node.
+class Peer implements EventTarget
+{
+       public int address; // The remote node's address
+       private double latency; // Latency of the connection in seconds
+       private Node owner; // The local node
+
+       // Retransmission parameters
+       public final static double TIMER = 0.5; // Coarse-grained timer, seconds
+       public final static double RTO = 4.0; // Retransmission timeout in RTTs
+       public final static double FRTO = 1.5; // Fast retx timeout in RTTs
+       public final static double RTT_DECAY = 0.9; // Exp moving average
+
+       // Congestion control parameters
+       public final static int MIN_CWIND = 3000; // Minimum congestion window
+       public final static int MAX_CWIND = 100000; // Maximum congestion window
+       // Note: RWIND must be at least 2 * FRTO * MAX_CWIND
+       public final static int RWIND = 400000; // Maximum bytes buffered at rx
+       public final static double ALPHA = 0.1615; // AIMD increase parameter
+       public final static double BETA = 0.9375; // AIMD decrease parameter
+       public final static double GAMMA = 3.0; // Slow start divisor
+
+       // Sender state
+       private double cwind = MIN_CWIND; // Congestion window in bytes
+       private boolean slowStart = true; // Are we in the slow start phase?
+       private double rtt = 3.0; // Estimated round-trip time in seconds
+       private double lastTransmission = 0.0; // Clock time
+       private double lastCongestionDecrease = 0.0; // Clock time
+       private boolean timerRunning = false; // Is the retx timer running?
+       private int inflight = 0; // Bytes sent but not acked
+       private int txSeq = 0; // Sequence number of next outgoing packet
+       private LinkedList<DataPacket> txBuffer; // Retransmission buffer
+       private LinkedList<Message> txQueue; // Messages waiting to be sent
+       private int txQueueSize = 0; // Size of transmission queue in bytes
+       private int txRemaining = 0; // Bytes of current message unsent
+
+       // Receiver state
+       private int rxSeq = 0; // Sequence number of next in-order packet
+       private LinkedList<DataPacket> rxBuffer; // Reassembly buffer
+       private int rxBufferSize = 0; // Size of reassembly buffer in bytes
+       private LinkedList<Message> rxQueue; // Messages waiting to be collected
+
+       public Peer (int address, double latency, Node owner)
+       {
+               this.address = address;
+               this.latency = latency;
+               this.owner = owner;
+               txBuffer = new LinkedList<DataPacket>();
+               txQueue = new LinkedList<Message>();
+               rxBuffer = new LinkedList<DataPacket>();
+               rxQueue = new LinkedList<Message>();
+       }
+
+       // Returns the first message in the queue or null if the queue is empty
+       public Message receiveMessage()
+       {
+               if (rxQueue.isEmpty()) return null;
+               return rxQueue.removeFirst();
+       }
+
+       // Queue a message for transmission
+       public void sendMessage (Message m)
+       {
+               // Warning: until token-passing is implemented the length of
+               // the transmission queue is unlimited
+               if (txQueue.isEmpty()) txRemaining = m.size;
+               txQueue.add (m);
+               txQueueSize += m.size;
+               log (txQueue.size() + " messages waiting to be sent");
+               // Send as many packets as possible
+               while (send());
+       }
+
+       // Try to send a packet, return true if a packet was sent
+       private boolean send()
+       {
+               if (txQueueSize == 0) {
+                       log ("no messages to send");
+                       return false;
+               }
+
+               // cwind is fractional and may have been decreased below the
+               // number of bytes in flight, so test for >= rather than ==
+               if (inflight >= cwind) {
+                       log ("no room in congestion window");
+                       return false;
+               }
+
+               // Return to slow start when the link is idle
+               double now = Event.time();
+               if (now - lastTransmission > RTO * rtt) {
+                       log ("returning to slow start");
+                       cwind = MIN_CWIND;
+                       slowStart = true;
+               }
+               lastTransmission = now;
+
+               // Work out how large a packet we can send
+               int size = Packet.MAX_PAYLOAD;
+               if (size > txQueueSize) size = txQueueSize;
+               if (size > cwind - inflight) size = (int) cwind - inflight;
+
+               // Nagle's algorithm - try to coalesce small packets
+               if (size < Packet.MAX_PAYLOAD && inflight > 0) {
+                       log ("delaying transmission of " + size + " bytes");
+                       return false;
+               }
+
+               DataPacket p = new DataPacket (size);
+               // Put as many messages as possible in the packet. A message
+               // is added to the packet that carries its last byte
+               while (txRemaining <= size) {
+                       if (txQueue.isEmpty()) {
+                               // No more messages in the txQueue
+                               txRemaining = 0;
+                               break;
+                       }
+                       Message m = txQueue.removeFirst();
+                       p.messages.add (m);
+                       size -= txRemaining;
+                       txQueueSize -= txRemaining;
+                       if (txQueue.isEmpty()) {
+                               // No more messages in the txQueue
+                               txRemaining = 0;
+                               break;
+                       }
+                       // Move on to the next message
+                       txRemaining = txQueue.getFirst().size;
+               }
+               // Fill the rest of the packet with part of the current message
+               if (txRemaining > 0) {
+                       txRemaining -= size;
+                       txQueueSize -= size;
+               }
+               // Send the packet
+               p.seq = txSeq++;
+               log ("sending packet " + p.seq + ", " + p.size + " bytes");
+               owner.net.send (p, address, latency);
+               // Buffer the packet for retransmission
+               p.sent = now;
+               inflight += p.size;
+               log (inflight + " bytes in flight");
+               txBuffer.add (p);
+               // Start the coarse-grained retransmission timer if necessary
+               if (!timerRunning) {
+                       log ("starting timer");
+                       Event.schedule (this, TIMER, CHECK_TIMEOUTS, null);
+                       timerRunning = true;
+               }
+               return true;
+       }
+
+       // Acknowledge the packet with the given sequence number
+       private void sendAck (int seq)
+       {
+               Ack a = new Ack (seq);
+               log ("sending ack " + seq);
+               owner.net.send (a, address, latency);
+       }
+
+       // Called by Node when a packet arrives
+       public void handlePacket (Packet p)
+       {
+               if (p instanceof DataPacket) handleData ((DataPacket) p);
+               else if (p instanceof Ack) handleAck ((Ack) p);
+       }
+
+       // Deliver, buffer or discard an incoming data packet depending on
+       // its sequence number, then acknowledge it. The only packet that is
+       // not acknowledged is one dropped because the buffer is full
+       private void handleData (DataPacket p)
+       {
+               log ("received packet " + p.seq + ", " + p.size + " bytes");
+               // Is this the packet we've been waiting for?
+               if (p.seq == rxSeq) {
+                       log ("packet in order");
+                       rxSeq++;
+                       rxQueue.addAll (p.messages);
+                       // Reassemble contiguous packets
+                       Iterator<DataPacket> i = rxBuffer.iterator();
+                       while (i.hasNext()) {
+                               DataPacket q = i.next();
+                               if (q.seq == rxSeq) {
+                                       log ("adding packet " + q.seq);
+                                       i.remove();
+                                       rxBufferSize -= q.size;
+                                       // Deliver the buffered packet's own
+                                       // messages, not those of p again
+                                       rxQueue.addAll (q.messages);
+                                       rxSeq++;
+                               }
+                               else break;
+                       }
+                       log (rxBufferSize + " bytes buffered");
+                       log ("expecting packet " + rxSeq);
+                       // Tell the node there are messages to be collected
+                       owner.messagesWaiting (this);
+               }
+               else if (p.seq > rxSeq) {
+                       log ("packet out of order, expected " + rxSeq);
+                       // Buffer the packet until all previous packets arrive.
+                       // The buffer is kept sorted by sequence number, so
+                       // find the index at which to insert the packet
+                       int index;
+                       Iterator<DataPacket> i = rxBuffer.iterator();
+                       for (index = 0; i.hasNext(); index++) {
+                               DataPacket q = i.next();
+                               if (q.seq == p.seq) {
+                                       // Already buffered
+                                       log ("duplicate packet " + p.seq);
+                                       sendAck (p.seq);
+                                       return;
+                               }
+                               if (q.seq > p.seq) break;
+                       }
+                       if (rxBufferSize + p.size > RWIND) {
+                               // This shouldn't happen under normal conditions
+                               log ("no space in buffer - packet dropped");
+                               return;
+                       }
+                       rxBuffer.add (index, p);
+                       rxBufferSize += p.size;
+                       log (rxBufferSize + " bytes buffered");
+                       // DEBUG
+                       if (!rxBuffer.isEmpty()) {
+                               for (Packet z : rxBuffer)
+                                       System.out.print (z.seq + " ");
+                               System.out.println();
+                       }
+               }
+               else log ("duplicate packet " + p.seq);
+               sendAck (p.seq); // Ack may have been lost
+       }
+
+       // Remove the acknowledged packet from the retransmission buffer and
+       // fast retransmit older packets that have been waiting too long
+       private void handleAck (Ack a)
+       {
+               log ("received ack " + a.seq);
+               double now = Event.time();
+               boolean windowIncreased = false;
+               Iterator<DataPacket> i = txBuffer.iterator();
+               while (i.hasNext()) {
+                       DataPacket p = i.next();
+                       double age = now - p.sent;
+                       // Explicit ack
+                       if (p.seq == a.seq) {
+                               log ("packet " + p.seq + " acknowledged");
+                               i.remove();
+                               inflight -= p.size;
+                               log (inflight + " bytes in flight");
+                               // Increase the congestion window
+                               if (slowStart) cwind += p.size / GAMMA;
+                               else cwind += p.size * p.size * ALPHA / cwind;
+                               if (cwind > MAX_CWIND) cwind = MAX_CWIND;
+                               log ("congestion window increased to " + cwind);
+                               // Update the average round-trip time
+                               rtt = rtt * RTT_DECAY + age * (1.0 - RTT_DECAY);
+                               log ("round-trip time " + age);
+                               log ("average round-trip time " + rtt);
+                               windowIncreased = true;
+                               break;
+                       }
+                       // Fast retransmission
+                       if (p.seq < a.seq && age > FRTO * rtt) {
+                               p.sent = now;
+                               log ("fast retransmitting packet " + p.seq);
+                               log (inflight + " bytes in flight");
+                               owner.net.send (p, address, latency);
+                               decreaseCongestionWindow (now);
+                       }
+               }
+               // The ack may have made room for more packets
+               if (windowIncreased) while (send());
+       }
+
+       // Multiply the congestion window by BETA, subject to MIN_CWIND
+       private void decreaseCongestionWindow (double now)
+       {
+               // The congestion window should only be decreased once per RTT
+               if (now - lastCongestionDecrease < rtt) return;
+               lastCongestionDecrease = now;
+               cwind *= BETA;
+               if (cwind < MIN_CWIND) cwind = MIN_CWIND;
+               log ("congestion window decreased to " + cwind);
+               // The slow start phase ends when the first packet is lost
+               if (slowStart) {
+                       log ("leaving slow start");
+                       slowStart = false;
+               }
+       }
+
+       // Log a message prefixed with the local and remote addresses
+       private void log (String message)
+       {
+               Event.log (owner.net.address + ":" + address + " " + message);
+       }
+
+       // Event callback
+       private void checkTimeouts()
+       {
+               log ("checking timeouts");
+               // If there are no packets in flight, stop the timer
+               if (txBuffer.isEmpty()) {
+                       log ("stopping timer");
+                       timerRunning = false;
+                       return;
+               }
+               double now = Event.time();
+               for (DataPacket p : txBuffer) {
+                       // Slow retransmission
+                       if (now - p.sent > RTO * rtt) {
+                               p.sent = now;
+                               log ("retransmitting packet " + p.seq);
+                               log (inflight + " bytes in flight");
+                               owner.net.send (p, address, latency);
+                               decreaseCongestionWindow (now);
+                       }
+               }
+               // Reset the timer
+               Event.schedule (this, TIMER, CHECK_TIMEOUTS, null);
+       }
+
+       // EventTarget interface
+       public void handleEvent (int type, Object data)
+       {
+               if (type == CHECK_TIMEOUTS) checkTimeouts();
+       }
+
+       // Each EventTarget class has its own event codes
+       private final static int CHECK_TIMEOUTS = 1;
+}

Added: trunk/apps/load-balancing-sims/phase5/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Sim.java      2006-07-18 01:05:10 UTC (rev 9650)
+++ trunk/apps/load-balancing-sims/phase5/Sim.java      2006-07-18 10:43:32 UTC (rev 9651)
@@ -0,0 +1,34 @@
+// Interesting parameters to play with: txSpeed and rxSpeed, retransmission
+// timeout, window size, AIMD increase and decrease (Peer.java), queue size
+// (NetworkInterface.java), packet size (Node.java).
+
+class Sim
+{
+       // Builds two nodes connected in both directions, schedules the
+       // first node to start sending messages and runs the event queue
+       public static void main (String[] args)
+       {
+               double txSpeed = 15000, rxSpeed = 15000; // Bytes per second
+               // Alternative speeds that were tried:
+               // rxSpeed = Math.exp (rand.nextGaussian() + 11.74);
+               // txSpeed = rxSpeed / 5.0;
+               double latency = 0.1; // Used for both directions
+
+               Network.reorder = true;
+               Network.lossRate = 0.001;
+
+               // Only two nodes are simulated, and only the first one
+               // sends messages
+               Node sender = new Node (txSpeed, rxSpeed);
+               Node receiver = new Node (txSpeed, rxSpeed);
+
+               sender.connect (receiver, latency);
+               receiver.connect (sender, latency);
+
+               double startDelay = Math.random();
+               Event.schedule (sender, startDelay, Node.SEND_MESSAGES, null);
+
+               // Run the simulation
+               Event.run();
+       }
+}


Reply via email to