Author: mrogers
Date: 2006-08-21 18:15:04 +0000 (Mon, 21 Aug 2006)
New Revision: 10226

Added:
   trunk/apps/load-balancing-sims/phase6/AckQueue.java
Removed:
   trunk/apps/load-balancing-sims/phase6/messages/Ack.java
Modified:
   trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java
   trunk/apps/load-balancing-sims/phase6/Node.java
   trunk/apps/load-balancing-sims/phase6/Packet.java
   trunk/apps/load-balancing-sims/phase6/Peer.java
   trunk/apps/load-balancing-sims/phase6/RequestState.java
   trunk/apps/load-balancing-sims/phase6/Sim.java
   trunk/apps/load-balancing-sims/phase6/messages/Block.java
   trunk/apps/load-balancing-sims/phase6/messages/Message.java
   trunk/apps/load-balancing-sims/phase6/messages/Request.java
   trunk/apps/load-balancing-sims/phase6/messages/Response.java
   trunk/apps/load-balancing-sims/phase6/messages/RouteNotFound.java
Log:
Multi-block transfers

Added: trunk/apps/load-balancing-sims/phase6/AckQueue.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/AckQueue.java 2006-08-21 18:13:33 UTC 
(rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/AckQueue.java 2006-08-21 18:15:04 UTC 
(rev 10226)
@@ -0,0 +1,32 @@
+// A queue storing outgoing acks and their coalescing deadlines
+
+import java.util.LinkedList;
+
+class AckQueue
+{
+       public int size = 0; // Size in bytes
+       private LinkedList<Integer> acks = new LinkedList<Integer>();
+       private LinkedList<Double> deadlines = new LinkedList<Double>();
+       
+       public void add (int ack, double deadline)
+       {
+               size += Packet.ACK_SIZE;
+               acks.add (ack);
+               deadlines.add (deadline);
+       }
+       
+       public double deadline()
+       {
+               Double deadline = deadlines.peek();
+               if (deadline == null) return Double.POSITIVE_INFINITY;
+               else return deadline;
+       }
+       
+       public Integer pop()
+       {
+               deadlines.poll();
+               Integer ack = acks.poll();
+               if (ack != null) size -= Packet.ACK_SIZE;
+               return ack;
+       }
+}

Modified: trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java    2006-08-21 
18:13:33 UTC (rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java    2006-08-21 
18:15:04 UTC (rev 10226)
@@ -1,19 +1,18 @@
-// A queue storing outgoing messages (including acks and transfers) and their
-// coalescing deadlines
+// A queue storing outgoing messages and their coalescing deadlines

 import java.util.LinkedList;
 import messages.Message;

-class DeadlineQueue<T extends Message>
+class DeadlineQueue
 {
        public int size = 0; // Size in bytes
-       private LinkedList<T> messages = new LinkedList<T>();
+       private LinkedList<Message> messages = new LinkedList<Message>();
        private LinkedList<Double> deadlines = new LinkedList<Double>();

-       public void add (T message, double deadline)
+       public void add (Message m, double deadline)
        {
-               size += message.size;
-               messages.add (message);
+               size += m.size;
+               messages.add (m);
                deadlines.add (deadline);
        }

@@ -25,16 +24,16 @@

        public double deadline()
        {
-               Double d = deadlines.peek();
-               if (d == null) return Double.POSITIVE_INFINITY;
-               else return d;
+               Double deadline = deadlines.peek();
+               if (deadline == null) return Double.POSITIVE_INFINITY;
+               else return deadline;
        }

-       public T pop()
+       public Message pop()
        {
                deadlines.poll();
-               T message = messages.poll();
-               size -= message.size;
-               return message;
+               Message m = messages.poll();
+               if (m != null) size -= m.size;
+               return m;
        }
 }

Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java     2006-08-21 18:13:33 UTC 
(rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/Node.java     2006-08-21 18:15:04 UTC 
(rev 10226)
@@ -17,9 +17,8 @@
        public double location; // Routing location
        public NetworkInterface net;
        private HashMap<Integer,Peer> peers; // Look up a peer by its address
-       private int requestsGenerated = 0;
        private HashSet<Integer> recentlySeenRequests; // Request IDs
-       private HashMap<Integer,RequestState> outstandingRequests;
+       private HashMap<Integer,RequestState> outstandingRequests; // By ID
        public LruCache<Integer> cache; // Datastore containing keys
        public TokenBucket bandwidth; // Bandwidth limiter
        private boolean timerRunning = false; // Is the timer running?
@@ -87,13 +86,15 @@
        public void handleMessage (Message m, Peer prev)
        {
                log ("received " + m);
-               // FIXME: ugly
-               if (m instanceof Request)
-                       handleRequest ((Request) m, prev);
-               else if (m instanceof Response)
-                       handleResponse ((Response) m);
-               else if (m instanceof RouteNotFound)
-                       handleRouteNotFound ((RouteNotFound) m);
+               if (m instanceof Request) handleRequest ((Request) m, prev);
+               else {
+                       RequestState rs = outstandingRequests.get (m.id);
+                       if (rs == null) log ("unexpected " + m);
+                       else if (m instanceof Response)
+                               handleResponse ((Response) m, rs);
+                       else if (m instanceof RouteNotFound)
+                               handleRouteNotFound ((RouteNotFound) m, rs);
+               }
        }

        private void handleRequest (Request r, Peer prev)
@@ -101,7 +102,8 @@
                if (!recentlySeenRequests.add (r.id)) {
                        log ("rejecting recently seen " + r);
                        prev.sendMessage (new RouteNotFound (r.id));
-                       // Don't forward the request to prev, it's seen it
+                       // Optimisation: prev has seen the request, so remove
+                       // it from the list of potential next hops
                        RequestState rs = outstandingRequests.get (r.id);
                        if (rs != null) rs.nexts.remove (prev);
                        return;
@@ -109,35 +111,32 @@
                if (cache.get (r.key)) {
                        log ("key " + r.key + " found in cache");
                        if (prev == null) log (r + " succeeded locally");
-                       else prev.sendMessage (new Response (r.id, r.key));
+                       else for (int i = 0; i < 32; i++)
+                               prev.sendBlock (new Response (r.id, i));
                        return;
                }
                log ("key " + r.key + " not found in cache");
-               forwardRequest (new RequestState (r, prev, peers.values()));
+               forwardRequest (new RequestState (r, prev, shufflePeers()));
        }

-       private void handleResponse (Response r)
+       private void handleResponse (Response r, RequestState rs)
        {
-               RequestState rs = outstandingRequests.remove (r.id);
-               if (rs == null) {
-                       log ("unexpected " + r);
-                       return;
+               rs.state = RequestState.TRANSFERRING;
+               if (rs.receivedBlock (r.index)) return; // Ignore duplicates
+               if (rs.receivedAll()) {
+                       cache.put (rs.key);
+                       if (rs.prev == null) log (rs + " succeeded");
+                       outstandingRequests.remove (rs.id);
                }
-               cache.put (r.key);
-               if (rs.prev == null) log (rs + " succeeded");
-               else {
+               // Forward the block
+               if (rs.prev != null) {
                        log ("forwarding " + r);
-                       rs.prev.sendMessage (r);
+                       rs.prev.sendBlock (r);
                }
        }

-       private void handleRouteNotFound (RouteNotFound r)
+       private void handleRouteNotFound (RouteNotFound r, RequestState rs)
        {
-               RequestState rs = outstandingRequests.remove (r.id);
-               if (rs == null) {
-                       log ("unexpected route not found " + r.id);
-                       return;
-               }
                forwardRequest (rs);
        }

@@ -148,39 +147,43 @@
                        log ("route not found for " + rs);
                        if (rs.prev == null) log (rs + " failed");
                        else rs.prev.sendMessage (new RouteNotFound (rs.id));
-                       return;
+                       outstandingRequests.remove (rs.id);
                }
-               log ("forwarding " + rs + " to " + next.address);
-               next.sendMessage (new Request (rs.id, rs.key));
-               rs.nexts.remove (next);
-               outstandingRequests.put (rs.id, rs);
+               else {
+                       log ("forwarding " + rs + " to " + next.address);
+                       next.sendMessage (new Request (rs.id, rs.key));
+                       rs.nexts.remove (next);
+                       outstandingRequests.put (rs.id, rs);
+               }
        }

+       // Return the list of peers in a random order
+       private ArrayList<Peer> shufflePeers()
+       {
+               ArrayList<Peer> copy = new ArrayList<Peer> (peers.values());
+               Collections.shuffle (copy);
+               return copy;
+       }
+       
        private void log (String message)
        {
                Event.log (net.address + " " + message);
        }

        // Event callback
-       private void generateRequest()
+       private void generateRequest (int key)
        {
-               for (int i = 0; i < 10000; i++) {
-                       // Send a request to a random location
-                       Request r = new Request (locationToKey (Math.random()));
-                       log ("generating request " + r.id);
-                       handleRequest (r, null);
-               }
+               Request r = new Request (key);
+               log ("generating request " + r.id);
+               handleRequest (r, null);
        }

        // Event callback
        private void checkTimeouts()
        {
                // Check the peers in a random order each time
-               ArrayList<Peer> shuffled = new ArrayList<Peer> (peers.values());
-               Collections.shuffle (shuffled);
-               
                double deadline = Double.POSITIVE_INFINITY;
-               for (Peer p : shuffled)
+               for (Peer p : shufflePeers())
                        deadline = Math.min (deadline, p.checkTimeouts());
                if (deadline == Double.POSITIVE_INFINITY) {
                        log ("stopping retransmission/coalescing timer");
@@ -197,7 +200,7 @@
        // EventTarget interface
        public void handleEvent (int type, Object data)
        {
-               if (type == GENERATE_REQUEST) generateRequest();
+               if (type == GENERATE_REQUEST) generateRequest ((Integer) data);
                else if (type == CHECK_TIMEOUTS) checkTimeouts();
        }


Modified: trunk/apps/load-balancing-sims/phase6/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Packet.java   2006-08-21 18:13:33 UTC 
(rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/Packet.java   2006-08-21 18:15:04 UTC 
(rev 10226)
@@ -2,28 +2,28 @@

 import java.util.ArrayList;
 import messages.Message;
-import messages.Ack;

 class Packet
 {
        public final static int HEADER_SIZE = 80; // Including IP & UDP headers
+       public final static int ACK_SIZE = 4; // Size of an ack in bytes
        public final static int MAX_SIZE = 1450; // MTU including headers
        public final static int SENSIBLE_PAYLOAD = 1000; // Coalescing

        public int src, dest; // Network addresses
        public int size = HEADER_SIZE; // Size in bytes, including headers
        public int seq = -1; // Data sequence number (-1 if no data)
-       public ArrayList<Ack> acks = null;
+       public ArrayList<Integer> acks = null;
        public ArrayList<Message> messages = null;

        public double sent; // Time at which the packet was (re) transmitted
        public double latency; // Link latency (stored here for convenience)

-       public void addAck (Ack a)
+       public void addAck (Integer ack)
        {
-               if (acks == null) acks = new ArrayList<Ack>();
-               acks.add (a);
-               size += a.size;
+               if (acks == null) acks = new ArrayList<Integer>();
+               acks.add (ack);
+               size += ACK_SIZE;
        }

        public void addMessage (Message m)

Modified: trunk/apps/load-balancing-sims/phase6/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Peer.java     2006-08-21 18:13:33 UTC 
(rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/Peer.java     2006-08-21 18:15:04 UTC 
(rev 10226)
@@ -1,7 +1,6 @@
 import java.util.LinkedList;
 import java.util.Iterator;
 import java.util.HashSet;
-import messages.Ack;
 import messages.Message;
 import messages.Block;

@@ -29,9 +28,9 @@
        private int txSeq = 0; // Sequence number of next outgoing data packet
        private int txMaxSeq = SEQ_RANGE - 1; // Highest sequence number
        private LinkedList<Packet> txBuffer; // Retransmission buffer
-       private DeadlineQueue<Ack> ackQueue; // Outgoing acks
-       private DeadlineQueue<Message> searchQueue; // Outgoing search messages
-       private DeadlineQueue<Block> transferQueue; // Outgoing transfers
+       private AckQueue ackQueue; // Outgoing acks
+       private DeadlineQueue searchQueue; // Outgoing search messages
+       private DeadlineQueue transferQueue; // Outgoing transfers
        private boolean tgif = false; // "Transfers go in first" toggle
        private CongestionWindow window; // AIMD congestion window
        private double lastTransmission = 0.0; // Clock time
@@ -47,9 +46,9 @@
                this.location = location;
                this.latency = latency;
                txBuffer = new LinkedList<Packet>();
-               ackQueue = new DeadlineQueue<Ack>();
-               searchQueue = new DeadlineQueue<Message>();
-               transferQueue = new DeadlineQueue<Block>();
+               ackQueue = new AckQueue();
+               searchQueue = new DeadlineQueue();
+               transferQueue = new DeadlineQueue();
                window = new CongestionWindow (this);
                rxDupe = new HashSet<Integer>();
        }
@@ -80,7 +79,7 @@
        private void sendAck (int seq)
        {
                log ("ack " + seq + " added to ack queue");
-               ackQueue.add (new Ack (seq), Event.time() + MAX_DELAY);
+               ackQueue.add (seq, Event.time() + MAX_DELAY);
                // Start the node's timer if necessary
                node.startTimer();
                // Send as many packets as possible
@@ -155,7 +154,7 @@
        public void handlePacket (Packet p)
        {
                if (p.messages != null) handleData (p);
-               if (p.acks != null) for (Ack a : p.acks) handleAck (a.seq);
+               if (p.acks != null) for (int ack : p.acks) handleAck (ack);
        }

        private void handleData (Packet p)

Modified: trunk/apps/load-balancing-sims/phase6/RequestState.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/RequestState.java     2006-08-21 
18:13:33 UTC (rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/RequestState.java     2006-08-21 
18:15:04 UTC (rev 10226)
@@ -6,10 +6,16 @@

 class RequestState
 {
+       // State machine
+       public final static int REQUEST_SENT = 1;
+       public final static int TRANSFERRING = 2;
+       
        public final int id; // The unique ID of the request
        public final int key; // The requested key
        public final Peer prev; // The previous hop of the request
        public final HashSet<Peer> nexts; // Possible next hops
+       public int state = REQUEST_SENT; // State machine
+       private int blockBitmap = 0; // Bitmap of received blocks

        public RequestState (Request r, Peer prev, Collection<Peer> peers)
        {
@@ -17,7 +23,7 @@
                key = r.key;
                this.prev = prev;
                nexts = new HashSet<Peer> (peers);
-               if (prev != null) nexts.remove (prev);
+               nexts.remove (prev);
        }

        // Find the closest peer to the requested key
@@ -36,6 +42,20 @@
                return bestPeer; // Null if list was empty
        }

+       // Mark a block as received, return true if it's a duplicate
+       public boolean receivedBlock (int index)
+       {
+               boolean duplicate = (blockBitmap & 1 << index) != 0;
+               blockBitmap |= 1 << index;
+               return duplicate;
+       }
+       
+       // Return true if all blocks have been received
+       public boolean receivedAll()
+       {
+               return blockBitmap == 0xFFFFFFFF;
+       }
+       
        public String toString()
        {
                return new String ("request (" + id + "," + key + ")");

Modified: trunk/apps/load-balancing-sims/phase6/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Sim.java      2006-08-21 18:13:33 UTC 
(rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/Sim.java      2006-08-21 18:15:04 UTC 
(rev 10226)
@@ -23,7 +23,11 @@
                n1.connectBothWays (n3, 0.001);
                n2.connectBothWays (n3, 0.001);

-               Event.schedule (n0, 0.0, Node.GENERATE_REQUEST, null);
+               for (int i = 0; i < 5; i++) {
+                       int key = Node.locationToKey (Math.random());
+                       n3.cache.put (key);
+                       Event.schedule (n0, 0.0, Node.GENERATE_REQUEST, key);
+               }

                // Run the simulation
                Event.run();

Deleted: trunk/apps/load-balancing-sims/phase6/messages/Ack.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Ack.java     2006-08-21 
18:13:33 UTC (rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/messages/Ack.java     2006-08-21 
18:15:04 UTC (rev 10226)
@@ -1,17 +0,0 @@
-// Note: acks are not FNP messages, they're only here because it makes the
-// implementation simpler
-
-package messages;
-
-public class Ack extends Message
-{
-       public final static int SIZE = 4; // Bytes
-       
-       public final int seq; // Sequence number of the acknowledged packet
-       
-       public Ack (int seq)
-       {
-               this.seq = seq;
-               size = SIZE;
-       }
-}

Modified: trunk/apps/load-balancing-sims/phase6/messages/Block.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Block.java   2006-08-21 
18:13:33 UTC (rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/messages/Block.java   2006-08-21 
18:15:04 UTC (rev 10226)
@@ -1,14 +1,15 @@
-// A single block of a data transfer (currently 32 blocks per transfer)
+// A single block of a multi-block transfer (currently 32 blocks per transfer)

 package messages;

 public class Block extends Message
 {
-       public final static int SIZE = 1024; // Bytes
+       public final int index; // Index of this block from 0-31

-       // FIXME: placeholder
-       public Block()
+       public Block (int id, int index)
        {
-               size = SIZE;
+               this.id = id;
+               this.index = index;
+               size = Message.HEADER_SIZE + Message.DATA_SIZE;
        }
 }

Modified: trunk/apps/load-balancing-sims/phase6/messages/Message.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Message.java 2006-08-21 
18:13:33 UTC (rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/messages/Message.java 2006-08-21 
18:15:04 UTC (rev 10226)
@@ -4,9 +4,10 @@

 public class Message
 {
-       public final static int HEADER_SIZE = 4; // Message type etc
-       public final static int ID_SIZE = 8; // Size of unique request ID
-       public final static int KEY_SIZE = 32; // Size of routing key
+       public final static int HEADER_SIZE = 12; // Bytes, including unique ID
+       public final static int KEY_SIZE = 32; // Size of routing key, bytes
+       public final static int DATA_SIZE = 1024; // Size of data block, bytes

        public int size; // Size in bytes
+       public int id; // Unique request ID
 }

Modified: trunk/apps/load-balancing-sims/phase6/messages/Request.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Request.java 2006-08-21 
18:13:33 UTC (rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/messages/Request.java 2006-08-21 
18:15:04 UTC (rev 10226)
@@ -4,7 +4,6 @@
 {
        private static int nextId = 0;

-       public final int id; // The unique ID of the request
        public final int key; // The requested key

        // Start a new request
@@ -12,7 +11,7 @@
        {
                id = nextId++;
                this.key = key;
-               size = Message.HEADER_SIZE + Message.ID_SIZE + Message.KEY_SIZE;
+               size = Message.HEADER_SIZE + Message.KEY_SIZE;
        }

        // Forward a request
@@ -20,7 +19,7 @@
        {
                this.id = id;
                this.key = key;
-               size = Message.HEADER_SIZE + Message.ID_SIZE + Message.KEY_SIZE;
+               size = Message.HEADER_SIZE + Message.KEY_SIZE;
        }

        public String toString()

Modified: trunk/apps/load-balancing-sims/phase6/messages/Response.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Response.java        
2006-08-21 18:13:33 UTC (rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/messages/Response.java        
2006-08-21 18:15:04 UTC (rev 10226)
@@ -1,20 +1,16 @@
+// A single block of a multi-block response
+
 package messages;

-public class Response extends Message
+public class Response extends Block
 {
-       public final int id; // The unique ID of the request
-       public final int key; // The requested key
-       
-       public Response (int id, int key)
+       public Response (int id, int index)
        {
-               this.id = id;
-               this.key = key;
-               size = Message.HEADER_SIZE + Message.ID_SIZE +
-                       Message.KEY_SIZE + Block.SIZE;
+               super (id, index);
        }

        public String toString()
        {
-               return new String ("response (" + id + "," + key + ")");
+               return new String ("response (" + id + "," + index + ")");
        }
 }

Modified: trunk/apps/load-balancing-sims/phase6/messages/RouteNotFound.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/RouteNotFound.java   
2006-08-21 18:13:33 UTC (rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/messages/RouteNotFound.java   
2006-08-21 18:15:04 UTC (rev 10226)
@@ -5,12 +5,10 @@

 public class RouteNotFound extends Message
 {
-       public final int id; // The unique ID of the request
-       
        public RouteNotFound (int id)
        {
                this.id = id;
-               size = Message.HEADER_SIZE + Message.ID_SIZE;
+               size = Message.HEADER_SIZE;
        }

        public String toString()


Reply via email to