Author: mrogers
Date: 2006-07-26 20:14:47 +0000 (Wed, 26 Jul 2006)
New Revision: 9782

Added:
   trunk/apps/load-balancing-sims/phase5/Request.java
   trunk/apps/load-balancing-sims/phase5/RequestState.java
   trunk/apps/load-balancing-sims/phase5/Response.java
   trunk/apps/load-balancing-sims/phase5/RouteNotFound.java
Modified:
   trunk/apps/load-balancing-sims/phase5/Message.java
   trunk/apps/load-balancing-sims/phase5/Network.java
   trunk/apps/load-balancing-sims/phase5/NetworkInterface.java
   trunk/apps/load-balancing-sims/phase5/Node.java
   trunk/apps/load-balancing-sims/phase5/Packet.java
   trunk/apps/load-balancing-sims/phase5/Peer.java
   trunk/apps/load-balancing-sims/phase5/Sim.java
Log:
Multiple messages per packet

Modified: trunk/apps/load-balancing-sims/phase5/Message.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Message.java  2006-07-26 20:06:51 UTC 
(rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/Message.java  2006-07-26 20:14:47 UTC 
(rev 9782)
@@ -2,12 +2,10 @@

 class Message
 {
-       public int seq; // Sequence number
-       public int size; // Size in bytes
+       public final static int HEADER_SIZE = 30; // Sequence number, MAC, etc
+       public final static int ID_SIZE = 16; // Size of unique request ID
+       public final static int KEY_SIZE = 32; // Size of routing key
+       public final static int DATA_SIZE = 1024; // Size of data block

-       public Message (int seq, int size)
-       {
-               this.seq = seq;
-               this.size = size;
-       }
+       public int size; // Size in bytes
 }

Modified: trunk/apps/load-balancing-sims/phase5/Network.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Network.java  2006-07-26 20:06:51 UTC 
(rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/Network.java  2006-07-26 20:14:47 UTC 
(rev 9782)
@@ -7,7 +7,7 @@
        private static int nextAddress = 0;
        public static boolean reorder = false; // Can packets be reordered?
        public static double lossRate = 0.0; // Random packet loss
-       // FIXME: duplication
+       // FIXME: random packet duplication

        // Deliver a packet to an address
        public static void deliver (Packet p)
@@ -17,13 +17,13 @@
                // If the network allows reordering, randomise the latency a bit
                if (reorder) p.latency *= (0.95 + Math.random() * 0.1);
                if (Math.random() < lossRate) {
-                       Event.log ("packet lost by network");
+                       Event.log (p + " lost by network");
                        return;
                }
                // Schedule the arrival of the packet at the destination
-               Event.schedule (ni, p.latency, NetworkInterface.RX_Q_ADD, p);
+               Event.schedule (ni, p.latency, NetworkInterface.RX_QUEUE, p);
        }
-
+       
        // Attach an interface to the network - returns the address
        public static int register (NetworkInterface ni)
        {

Modified: trunk/apps/load-balancing-sims/phase5/NetworkInterface.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/NetworkInterface.java 2006-07-26 
20:06:51 UTC (rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/NetworkInterface.java 2006-07-26 
20:14:47 UTC (rev 9782)
@@ -4,7 +4,7 @@
 class NetworkInterface implements EventTarget
 {
        public int address; // Represents an IP address and port
-       private Node owner; // The owner of this network interface
+       private Node node; // The owner of this network interface
        private double txSpeed, rxSpeed; // Bytes per second

        private LinkedList<Packet> txQueue; // Queue of outgoing packets
@@ -12,14 +12,14 @@
        private int txQueueSize, rxQueueSize; // Limited-size drop-tail queues
        private int txQueueMaxSize, rxQueueMaxSize; // Bytes

-       public NetworkInterface (Node owner, double txSpeed, double rxSpeed)
+       public NetworkInterface (Node node, double txSpeed, double rxSpeed)
        {
-               this.owner = owner;
+               this.node = node;
                this.txSpeed = txSpeed;
                this.rxSpeed = rxSpeed;
                txQueue = new LinkedList<Packet>();
                rxQueue = new LinkedList<Packet>();
-               txQueueSize = rxQueueSize = 0; // Bytes
+               txQueueSize = rxQueueSize = 0; // Bytes currently queued
                txQueueMaxSize = 10000;
                rxQueueMaxSize = 20000;
                // Attach the interface to the network
@@ -33,12 +33,12 @@
                p.dest = dest;
                p.latency = latency;
                if (txQueueSize + p.size > txQueueMaxSize) {
-                       Event.log (address + " no room in txQueue");
-                       return; // Packet lost
+                       log ("no room in txQueue, " + p + " lost");
+                       return;
                }
                txQueue.add (p);
                txQueueSize += p.size;
-               Event.log (address + " " + txQueueSize + " bytes in txQueue");
+               log (txQueueSize + " bytes in txQueue");
                // If there are no other packets in the queue, start to transmit
                if (txQueue.size() == 1) txStart (p);
        }
@@ -49,12 +49,12 @@
        private void rxQueueAdd (Packet p)
        {
                if (rxQueueSize + p.size > rxQueueMaxSize) {
-                       Event.log (address + " no room in rxQueue");
-                       return; // Packet lost
+                       log ("no room in rxQueue, " + p + " lost");
+                       return;
                }
                rxQueue.add (p);
                rxQueueSize += p.size;
-               Event.log (address + " " + rxQueueSize + " bytes in rxQueue");
+               log (rxQueueSize + " bytes in rxQueue");
                // If there are no other packets in the queue, start to receive
                if (rxQueue.size() == 1) rxStart (p);
        }
@@ -62,6 +62,7 @@
        // Start receiving a packet
        private void rxStart (Packet p)
        {
+               log ("starting to receive " + p);
                // Delay depends on rx speed
                Event.schedule (this, p.size / rxSpeed, RX_END, p);
        }
@@ -69,7 +70,8 @@
        // Finish receiving a packet, pass it to the node
        private void rxEnd (Packet p)
        {
-               owner.handlePacket (p);
+               log ("finished receiving " + p);
+               node.handlePacket (p);
                // If there's another packet waiting, start to receive it
                try {
                        rxQueueSize -= p.size;
@@ -82,6 +84,7 @@
        // Start transmitting a packet
        private void txStart (Packet p)
        {
+               log ("starting to transmit " + p);
                // Delay depends on tx speed
                Event.schedule (this, p.size / txSpeed, TX_END, p);
        }
@@ -89,6 +92,7 @@
        // Finish transmitting a packet
        private void txEnd (Packet p)
        {
+               log ("finished transmitting " + p);
                Network.deliver (p);
                // If there's another packet waiting, start to transmit it
                try {
@@ -99,26 +103,23 @@
                catch (NoSuchElementException nse) {}
        }

+       private void log (String message)
+       {
+               // Event.log (address + " " + message);
+       }
+       
        // EventTarget interface
        public void handleEvent (int type, Object data)
        {
                switch (type) {
-                       case RX_Q_ADD:
+                       case RX_QUEUE:
                        rxQueueAdd ((Packet) data);
                        break;

-                       case RX_START:
-                       rxStart ((Packet) data);
-                       break;
-                       
                        case RX_END:
                        rxEnd ((Packet) data);
                        break;

-                       case TX_START:
-                       txStart ((Packet) data);
-                       break;
-                       
                        case TX_END:
                        txEnd ((Packet) data);
                        break;
@@ -126,9 +127,7 @@
        }

        // Each EventTarget class has its own event codes
-       public final static int RX_Q_ADD = 1;
-       public final static int RX_START = 2;
-       public final static int RX_END = 3;
-       public final static int TX_START = 4;
-       public final static int TX_END = 5;
+       public final static int RX_QUEUE = 1;
+       private final static int RX_END = 2;
+       private final static int TX_END = 3;
 }

Modified: trunk/apps/load-balancing-sims/phase5/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Node.java     2006-07-26 20:06:51 UTC 
(rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/Node.java     2006-07-26 20:14:47 UTC 
(rev 9782)
@@ -1,23 +1,45 @@
 import java.util.HashMap;
+import java.util.HashSet;

 class Node implements EventTarget
 {
+       public double location; // Routing location
        public NetworkInterface net;
        private HashMap<Integer,Peer> peers; // Look up a peer by its address
-       private int messagesSent = 0;
+       private int requestsGenerated = 0;
+       private HashSet<Integer> recentlySeenRequests; // Request IDs
+       private HashMap<Integer,RequestState> outstandingRequests;
+       public HashSet<Double> cache; // Datastore containing keys

        public Node (double txSpeed, double rxSpeed)
        {
+               location = Math.random();
+               net = new NetworkInterface (this, txSpeed, rxSpeed);
                peers = new HashMap<Integer,Peer>();
-               net = new NetworkInterface (this, txSpeed, rxSpeed);
+               recentlySeenRequests = new HashSet<Integer>();
+               outstandingRequests = new HashMap<Integer,RequestState>();
+               cache = new HashSet<Double>();
        }

        public void connect (Node n, double latency)
        {
-               Peer p = new Peer (n.net.address, latency, this);
+               Peer p = new Peer (this, n.net.address, n.location, latency);
                peers.put (n.net.address, p);
        }

+       public void connectBothWays (Node n, double latency)
+       {
+               connect (n, latency);
+               n.connect (this, latency);
+       }
+       
+       // Distance between two locations on a ring of circumference 1.0
+       public static double distance (double a, double b)
+       {
+               double gap = Math.abs (a - b); // Distance without wrapping
+               return Math.min (gap, 1.0 - gap); // Shorter way round wins
+       }
+       
        // Called by NetworkInterface
        public void handlePacket (Packet packet)
        {
@@ -27,39 +49,103 @@
        }

        // Called by Peer
-       public void messagesWaiting (Peer p)
+       public void handleMessage (Message m, Peer prev)
        {
-               for (Message m = p.receiveMessage(); m != null; m = 
p.receiveMessage())
-                       log ("received message " + m.seq + ", " + m.size + " 
bytes");
+               log ("received " + m);
+               // FIXME: ugly
+               if (m instanceof Request)
+                       handleRequest ((Request) m, prev);
+               else if (m instanceof Response)
+                       handleResponse ((Response) m);
+               else if (m instanceof RouteNotFound)
+                       handleRouteNotFound ((RouteNotFound) m);
        }

+       private void handleRequest (Request r, Peer prev)
+       {
+               if (!recentlySeenRequests.add (r.id)) {
+                       log ("rejecting recently seen " + r);
+                       prev.sendMessage (new RouteNotFound (r.id));
+                       // Don't forward the request to prev, it's seen it
+                       RequestState rs = outstandingRequests.get (r.id);
+                       if (rs != null) rs.nexts.remove (prev);
+                       return;
+               }
+               if (cache.contains (r.key)) {
+                       log ("key " + r.key + " found in cache");
+                       if (prev == null)
+                               log (r + " succeeded locally");
+                       else prev.sendMessage (new Response (r.id, r.key));
+                       return;
+               }
+               log ("key " + r.key + " not found in cache");
+               forwardRequest (new RequestState (r, prev, peers.values()));
+       }
+       
+       private void handleResponse (Response r)
+       {
+               RequestState rs = outstandingRequests.remove (r.id);
+               if (rs == null) {
+                       log ("unexpected " + r);
+                       return;
+               }
+               cache.add (r.key);
+               if (rs.prev == null) log (rs + " succeeded");
+               else {
+                       log ("forwarding " + r);
+                       rs.prev.sendMessage (r);
+               }
+       }
+       
+       private void handleRouteNotFound (RouteNotFound r)
+       {
+               RequestState rs = outstandingRequests.remove (r.id);
+               if (rs == null) {
+                       log ("unexpected route not found " + r.id);
+                       return;
+               }
+               forwardRequest (rs);
+       }
+       
+       private void forwardRequest (RequestState rs)
+       {
+               Peer next = rs.closestPeer();
+               if (next == null) {
+                       log ("route not found for " + rs);
+                       if (rs.prev == null)
+                               log (rs + " failed");
+                       else rs.prev.sendMessage (new RouteNotFound (rs.id));
+                       return;
+               }
+               log ("forwarding " + rs + " to " + next.address);
+               next.sendMessage (new Request (rs.id, rs.key));
+               rs.nexts.remove (next);
+               outstandingRequests.put (rs.id, rs);
+       }
+       
        private void log (String message)
        {
                Event.log (net.address + " " + message);
        }

        // Event callback
-       private void sendMessages()
+       private void generateRequest()
        {
-               // Send a message to each peer
-               for (Peer p : peers.values()) {
-                       int size = (int) (Math.random() * 2500);
-                       Message m = new Message (messagesSent, size);
-                       log ("sending message " + m.seq + ", " + m.size + " 
bytes");
-                       p.sendMessage (m);
-               }
-               // Send a total of 1000 messages to each peer
-               messagesSent++;
-               if (messagesSent < 1000)
-                       Event.schedule (this, 0.1, SEND_MESSAGES, null);
+               if (requestsGenerated++ > 1000) return;
+               // Send a request to a random location
+               Request r = new Request (0.1);
+               log ("generating request " + r.id);
+               handleRequest (r, null);
+               // Schedule the next request
+               // Event.schedule (this, Math.random(), GENERATE_REQUEST, null);
        }

        // EventTarget interface
        public void handleEvent (int type, Object data)
        {
-               if (type == SEND_MESSAGES) sendMessages();
+               if (type == GENERATE_REQUEST) generateRequest();
        }

        // Each EventTarget class has its own event codes
-       public final static int SEND_MESSAGES = 1;
+       public final static int GENERATE_REQUEST = 1;
 }

Modified: trunk/apps/load-balancing-sims/phase5/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Packet.java   2006-07-26 20:06:51 UTC 
(rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/Packet.java   2006-07-26 20:14:47 UTC 
(rev 9782)
@@ -1,11 +1,5 @@
-// A low-level packet containing one or more complete or incomplete messages
+// A low-level packet (as opposed to a high-level message)

-// In real life the payload would be an array of bytes, but in the sim the
-// payload is represented by an ArrayList of Messages. Large messages can be
-// split across more than one packet, in which case the message only appears
-// in the payload of the *last* packet. This means it's possible for a full
-// packet to have an apparently empty payload.
-
 import java.util.ArrayList;

 abstract class Packet
@@ -14,7 +8,6 @@
        public final static int MAX_PAYLOAD = 1400;

        public int src, dest; // Network addresses
-       public int type; // Data, ack, etc
        public int size; // Packet size in bytes, including headers
        public int seq; // Sequence number or explicit ack
        public double latency; // Link latency (stored here for convenience)
@@ -22,14 +15,27 @@

 class DataPacket extends Packet
 {
-       public ArrayList messages; // Payload   
+       public ArrayList<Message> messages = null; // Payload   
        public double sent; // Time at which the packet was (re)transmitted

        public DataPacket (int dataSize)
        {
                size = dataSize + HEADER_SIZE;
-               messages = new ArrayList();
        }
+       
+       /*
+       In real life the payload would be an array of bytes, but here the 
+       payload is represented by an ArrayList of Messages. A large message can
+       be split across more than one packet, in which case the message only
+       appears in the payload of the *last* packet. This means it's possible
+       for a full packet to have an apparently empty payload.
+       */
+       
+       public void addMessage (Message m)
+       {
+               if (messages == null) messages = new ArrayList<Message>();
+               messages.add (m);
+       }
 }

 class Ack extends Packet

Modified: trunk/apps/load-balancing-sims/phase5/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Peer.java     2006-07-26 20:06:51 UTC 
(rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/Peer.java     2006-07-26 20:14:47 UTC 
(rev 9782)
@@ -4,10 +4,14 @@

 class Peer implements EventTarget
 {
+       private Node node; // The local node
        public int address; // The remote node's address
-       private double latency; // Latency of the connection in seconds
-       private Node owner; // The local node
+       public double location; // The remote node's routing location
+       private double latency; // The latency of the connection in seconds

+       // Nagle's algorithm
+       public final static int SENSIBLE_PAYLOAD = 1000; // Minimum packet size
+       
        // Retransmission parameters
        public final static double TIMER = 0.5; // Coarse-grained timer, seconds
        public final static double RTO = 4.0; // Retransmission timeout in RTTs
@@ -28,48 +32,40 @@
        private boolean slowStart = true; // Are we in the slow start phase?
        private double rtt = 3.0; // Estimated round-trip time in seconds
        private double lastTransmission = 0.0; // Clock time
-       private double lastCongestionDecrease = 0.0; // Clock time
        private boolean timerRunning = false; // Is the retx timer running?
        private int inflight = 0; // Bytes sent but not acked
        private int txSeq = 0; // Sequence number of next outgoing packet
        private LinkedList<DataPacket> txBuffer; // Retransmission buffer
        private LinkedList<Message> txQueue; // Messages waiting to be sent
        private int txQueueSize = 0; // Size of transmission queue in bytes
-       private int txRemaining = 0; // Bytes of current message unsent
+       private int txHeadSize = 0; // Size of first message in transmission q

        // Receiver state
        private int rxSeq = 0; // Sequence number of next in-order packet
        private LinkedList<DataPacket> rxBuffer; // Reassembly buffer
        private int rxBufferSize = 0; // Size of reassembly buffer in bytes
-       private LinkedList<Message> rxQueue; // Messages waiting to be collected

-       public Peer (int address, double latency, Node owner)
+       public Peer (Node node, int address, double location, double latency)
        {
+               this.node = node;
                this.address = address;
+               this.location = location;
                this.latency = latency;
-               this.owner = owner;
                txBuffer = new LinkedList<DataPacket>();
                txQueue = new LinkedList<Message>();
                rxBuffer = new LinkedList<DataPacket>();
-               rxQueue = new LinkedList<Message>();
        }

-       // Returns the first message in the queue or null if the queue is empty
-       public Message receiveMessage()
-       {
-               try { return rxQueue.removeFirst(); }
-               catch (NoSuchElementException nse) { return null; }
-       }
-       
        // Queue a message for transmission
        public void sendMessage (Message m)
        {
+               log (m + " added to transmission queue");
                // Warning: until token-passing is implemented the length of
                // the transmission queue is unlimited
-               if (txQueue.isEmpty()) txRemaining = m.size;
+               if (txQueue.isEmpty()) txHeadSize = m.size;
                txQueue.add (m);
                txQueueSize += m.size;
-               log (txQueue.size() + " messages waiting to be sent");
+               log (txQueue.size() + " messages in transmission queue");
                // Send as many packets as possible
                while (send());
        }
@@ -82,7 +78,7 @@
                        return false;
                }

-               if (inflight == cwind) {
+               if (cwind - inflight <= Packet.HEADER_SIZE) {
                        log ("no room in congestion window");
                        return false;
                }
@@ -97,42 +93,39 @@
                lastTransmission = now;

                // Work out how large a packet we can send
-               int size = Packet.MAX_PAYLOAD;
-               if (size > txQueueSize) size = txQueueSize;
-               if (size > cwind - inflight) size = (int) cwind - inflight;
+               int payload = Packet.MAX_PAYLOAD;
+               if (payload > txQueueSize) payload = txQueueSize;
+               if (payload > cwind - inflight - Packet.HEADER_SIZE)
+                       payload = (int) cwind - inflight - Packet.HEADER_SIZE;

                // Nagle's algorithm - try to coalesce small packets
-               if (size < Packet.MAX_PAYLOAD && inflight > 0) {
-                       log ("delaying transmission of " + size + " bytes");
+               if (payload < SENSIBLE_PAYLOAD && inflight > 0) {
+                       log ("delaying transmission of " + payload + " bytes");
                        return false;
                }

-               DataPacket p = new DataPacket (size);
                // Put as many messages as possible in the packet
-               while (txRemaining <= size) {
+               DataPacket p = new DataPacket (payload);
+               while (payload >= txHeadSize) {
                        try {
                                Message m = txQueue.removeFirst();
-                               p.messages.add (m);
-                               size -= txRemaining;
-                               txQueueSize -= txRemaining;
+                               p.addMessage (m);
+                               payload -= txHeadSize;
+                               txQueueSize -= txHeadSize;
                                // Move on to the next message
-                               txRemaining = txQueue.getFirst().size;
+                               txHeadSize = txQueue.getFirst().size;
                        }
                        catch (NoSuchElementException nse) {
                                // No more messages in the txQueue
-                               txRemaining = 0;
+                               txHeadSize = 0;
                                break;
                        }
                }
-               // Fill the rest of the packet with part of the current message
-               if (txRemaining > 0) {
-                       txRemaining -= size;
-                       txQueueSize -= size;
-               }
+               
                // Send the packet
                p.seq = txSeq++;
                log ("sending packet " + p.seq + ", " + p.size + " bytes");
-               owner.net.send (p, address, latency);
+               node.net.send (p, address, latency);
                // Buffer the packet for retransmission
                p.sent = now;
                inflight += p.size;
@@ -140,7 +133,7 @@
                txBuffer.add (p);
                // Start the coarse-grained retransmission timer if necessary
                if (!timerRunning) {
-                       log ("starting timer");
+                       log ("starting retransmission timer");
                        Event.schedule (this, TIMER, CHECK_TIMEOUTS, null);
                        timerRunning = true;
                }
@@ -151,7 +144,7 @@
        {
                Ack a = new Ack (seq);
                log ("sending ack " + seq);
-               owner.net.send (a, address, latency);
+               node.net.send (a, address, latency);
        }

        // Called by Node when a packet arrives
@@ -167,8 +160,8 @@
                // Is this the packet we've been waiting for?
                if (p.seq == rxSeq) {
                        log ("packet in order");
+                       unpack (p);
                        rxSeq++;
-                       rxQueue.addAll (p.messages);
                        // Reassemble contiguous packets
                        Iterator<DataPacket> i = rxBuffer.iterator();
                        while (i.hasNext()) {
@@ -177,15 +170,13 @@
                                        log ("adding packet " + q.seq);
                                        i.remove();
                                        rxBufferSize -= q.size;
-                                       rxQueue.addAll (p.messages);
+                                       unpack (q);
                                        rxSeq++;
                                }
                                else break;
                        }
                        log (rxBufferSize + " bytes buffered");
                        log ("expecting packet " + rxSeq);
-                       // Tell the node there are messages to be collected
-                       owner.messagesWaiting (this);
                }
                else if (p.seq > rxSeq) {
                        log ("packet out of order, expected " + rxSeq);
@@ -210,14 +201,8 @@
                        rxBuffer.add (index, p);
                        rxBufferSize += p.size;
                        log (rxBufferSize + " bytes buffered");
-                       // DEBUG
-                       if (!rxBuffer.isEmpty()) {
-                               for (Packet z : rxBuffer)
-                                       System.out.print (z.seq + " ");
-                               System.out.println();
-                       }
                }
-               else log ("duplicate packet " + p.seq);
+               else log ("duplicate packet " + p.seq); // p.seq < rxSeq
                sendAck (p.seq); // Ack may have been lost
        }

@@ -225,7 +210,6 @@
        {
                log ("received ack " + a.seq);
                double now = Event.time();
-               boolean windowIncreased = false;
                Iterator<DataPacket> i = txBuffer.iterator();
                while (i.hasNext()) {
                        DataPacket p = i.next();
@@ -245,7 +229,6 @@
                                rtt = rtt * RTT_DECAY + age * (1.0 - RTT_DECAY);
                                log ("round-trip time " + age);
                                log ("average round-trip time " + rtt);
-                               windowIncreased = true;
                                break;
                        }
                        // Fast retransmission
@@ -253,18 +236,16 @@
                                p.sent = now;
                                log ("fast retransmitting packet " + p.seq);
                                log (inflight + " bytes in flight");
-                               owner.net.send (p, address, latency);
+                               node.net.send (p, address, latency);
                                decreaseCongestionWindow (now);
                        }
                }
-               if (windowIncreased) while (send());
+               // Send as many packets as possible
+               while (send());
        }

        private void decreaseCongestionWindow (double now)
        {
-               // The congestion window should only be decreased once per RTT
-               if (now - lastCongestionDecrease < rtt) return;
-               lastCongestionDecrease = now;
                cwind *= BETA;
                if (cwind < MIN_CWIND) cwind = MIN_CWIND;
                log ("congestion window decreased to " + cwind);
@@ -275,9 +256,16 @@
                }
        }

+       // Hand each message carried by the packet to the node, in order
+       private void unpack (DataPacket p)
+       {
+               if (p.messages != null) // Nothing to deliver when null
+                       for (Message m : p.messages) node.handleMessage (m, this);
+       }
+       
        private void log (String message)
        {
-               Event.log (owner.net.address + ":" + address + " " + message);
+               // Event.log (node.net.address + ":" + address + " " + message);
        }

        // Event callback
@@ -286,18 +274,19 @@
                log ("checking timeouts");
                // If there are no packets in flight, stop the timer
                if (txBuffer.isEmpty()) {
-                       log ("stopping timer");
+                       log ("stopping retransmission timer");
                        timerRunning = false;
                        return;
                }
                double now = Event.time();
                for (DataPacket p : txBuffer) {
-                       // Slow retransmission
                        if (now - p.sent > RTO * rtt) {
+                               // Retransmission timeout
                                p.sent = now;
                                log ("retransmitting packet " + p.seq);
                                log (inflight + " bytes in flight");
-                               owner.net.send (p, address, latency);
+                               node.net.send (p, address, latency);
+                               // Note: TCP would return to slow start
                                decreaseCongestionWindow (now);
                        }
                }

Added: trunk/apps/load-balancing-sims/phase5/Request.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Request.java  2006-07-26 20:06:51 UTC 
(rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/Request.java  2006-07-26 20:14:47 UTC 
(rev 9782)
@@ -0,0 +1,31 @@
+// A request for a key, either newly started or forwarded
+
+class Request extends Message
+{
+       private static int nextId = 0; // ID handed to the next new request
+       
+       public final int id; // The unique ID of the request
+       public final double key; // The requested key (as a routing location)
+       
+       // Start a new request: takes the next unused ID from the counter
+       public Request (double key)
+       {
+               this (nextId++, key);
+       }
+       
+       // Forward a request: keeps the ID that is passed in
+       public Request (int id, double key)
+       {
+               this.id = id;
+               this.key = key;
+               // Size in bytes: header plus the request ID and the routing key
+               size = Message.HEADER_SIZE + Message.ID_SIZE + Message.KEY_SIZE;
+       }
+       
+       // Returns "request (id,key)"
+       @Override
+       public String toString()
+       {
+               return "request (" + id + "," + key + ")";
+       }
+}

Added: trunk/apps/load-balancing-sims/phase5/RequestState.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/RequestState.java     2006-07-26 
20:06:51 UTC (rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/RequestState.java     2006-07-26 
20:14:47 UTC (rev 9782)
@@ -0,0 +1,47 @@
+// The state of an outstanding request, stored at each node along the path
+
+import java.util.HashSet;
+import java.util.Collection;
+
+class RequestState
+{
+       public final int id; // The unique ID of the request
+       public final double key; // The requested key (as a routing location)
+       public final Peer prev; // The previous hop of the request
+       public final HashSet<Peer> nexts; // Possible next hops
+       
+       // Record the request's ID and key; every peer except prev (which may
+       // be null, in which case none is excluded) is a possible next hop
+       public RequestState (Request r, Peer prev, Collection<Peer> peers)
+       {
+               id = r.id;
+               key = r.key;
+               this.prev = prev;
+               // Work on a copy so the caller's collection is left untouched
+               nexts = new HashSet<Peer> (peers);
+               if (prev != null) nexts.remove (prev);
+       }
+       
+       // Returns the closest peer to the requested key; on a tie the peer
+       // reached first while iterating over nexts is kept
+       public Peer closestPeer()
+       {
+               double bestDist = Double.POSITIVE_INFINITY;
+               Peer bestPeer = null;
+               for (Peer peer : nexts) {
+                       double dist = Node.distance (key, peer.location);
+                       if (dist < bestDist) {
+                               bestDist = dist;
+                               bestPeer = peer;
+                       }
+               }
+               return bestPeer; // Null if nexts was empty
+       }
+       
+       // Returns "request (id,key)"
+       @Override
+       public String toString()
+       {
+               return "request (" + id + "," + key + ")";
+       }
+}

Added: trunk/apps/load-balancing-sims/phase5/Response.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Response.java 2006-07-26 20:06:51 UTC 
(rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/Response.java 2006-07-26 20:14:47 UTC 
(rev 9782)
@@ -0,0 +1,23 @@
+// A response to a request, sized to include one data block
+
+class Response extends Message
+{
+       public final int id; // The unique ID of the request
+       public final double key; // The requested key (as a routing location)
+       
+       public Response (int id, double key)
+       {
+               this.id = id;
+               this.key = key;
+               // Size in bytes: header, request ID, routing key and data block
+               size = Message.HEADER_SIZE + Message.ID_SIZE +
+                       Message.KEY_SIZE + Message.DATA_SIZE;
+       }
+       
+       // Returns "response (id,key)"
+       @Override
+       public String toString()
+       {
+               return "response (" + id + "," + key + ")";
+       }
+}

Added: trunk/apps/load-balancing-sims/phase5/RouteNotFound.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/RouteNotFound.java    2006-07-26 
20:06:51 UTC (rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/RouteNotFound.java    2006-07-26 
20:14:47 UTC (rev 9782)
@@ -0,0 +1,21 @@
+// Note: for the purposes of this simulation, RejectedLoop and RouteNotFound
+// are equivalent
+
+class RouteNotFound extends Message
+{
+       public final int id; // The unique ID of the request
+       
+       public RouteNotFound (int id)
+       {
+               this.id = id;
+               // Size in bytes: header plus the request ID (no key, no data)
+               size = Message.HEADER_SIZE + Message.ID_SIZE;
+       }
+       
+       // Returns "route not found (id)"
+       @Override
+       public String toString()
+       {
+               return "route not found (" + id + ")";
+       }
+}

Modified: trunk/apps/load-balancing-sims/phase5/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Sim.java      2006-07-26 20:06:51 UTC 
(rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/Sim.java      2006-07-26 20:14:47 UTC 
(rev 9782)
@@ -1,6 +1,6 @@
 // Interesting parameters to play with: txSpeed and rxSpeed, retransmission
-// timeout, window size, AIMD increase and decrease (Peer.java), queue size
-// (NetworkInterface.java), packet size (Node.java).
+// timeout, window sizes, AIMD increase and decrease (Peer.java), queue sizes
+// (NetworkInterface.java), packet size (Packet.java).

 class Sim
 {
@@ -15,18 +15,15 @@

                Node n0 = new Node (txSpeed, rxSpeed);
                Node n1 = new Node (txSpeed, rxSpeed);
-               // Node n2 = new Node (txSpeed, rxSpeed);
+               Node n2 = new Node (txSpeed, rxSpeed);
+               Node n3 = new Node (txSpeed, rxSpeed);

-               n0.connect (n1, 0.1);
-               // n0.connect (n2, 0.1);
-               n1.connect (n0, 0.1);
-               // n1.connect (n2, 0.1);
-               // n2.connect (n0, 0.1);
-               // n2.connect (n1, 0.1);
+               n0.connectBothWays (n1, 0.1);
+               n1.connectBothWays (n2, 0.1);
+               n1.connectBothWays (n3, 0.1);
+               n2.connectBothWays (n3, 0.1);

-               Event.schedule (n0, Math.random(), Node.SEND_MESSAGES, null);
-               // Event.schedule (n1, Math.random(), Node.SEND_MESSAGES, null);
-               // Event.schedule (n2, Math.random(), Node.SEND_MESSAGES, null);
+               Event.schedule (n0, 0.0, Node.GENERATE_REQUEST, null);

                // Run the simulation
                Event.run();


Reply via email to