Author: mrogers
Date: 2006-08-21 18:15:04 +0000 (Mon, 21 Aug 2006)
New Revision: 10226
Added:
trunk/apps/load-balancing-sims/phase6/AckQueue.java
Removed:
trunk/apps/load-balancing-sims/phase6/messages/Ack.java
Modified:
trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java
trunk/apps/load-balancing-sims/phase6/Node.java
trunk/apps/load-balancing-sims/phase6/Packet.java
trunk/apps/load-balancing-sims/phase6/Peer.java
trunk/apps/load-balancing-sims/phase6/RequestState.java
trunk/apps/load-balancing-sims/phase6/Sim.java
trunk/apps/load-balancing-sims/phase6/messages/Block.java
trunk/apps/load-balancing-sims/phase6/messages/Message.java
trunk/apps/load-balancing-sims/phase6/messages/Request.java
trunk/apps/load-balancing-sims/phase6/messages/Response.java
trunk/apps/load-balancing-sims/phase6/messages/RouteNotFound.java
Log:
Multi-block transfers
Added: trunk/apps/load-balancing-sims/phase6/AckQueue.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/AckQueue.java 2006-08-21 18:13:33 UTC
(rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/AckQueue.java 2006-08-21 18:15:04 UTC
(rev 10226)
@@ -0,0 +1,32 @@
+// A queue storing outgoing acks and their coalescing deadlines
+
+import java.util.LinkedList;
+
+class AckQueue
+{
+ public int size = 0; // Size in bytes
+ private LinkedList<Integer> acks = new LinkedList<Integer>();
+ private LinkedList<Double> deadlines = new LinkedList<Double>();
+
+ public void add (int ack, double deadline)
+ {
+ size += Packet.ACK_SIZE;
+ acks.add (ack);
+ deadlines.add (deadline);
+ }
+
+ public double deadline()
+ {
+ Double deadline = deadlines.peek();
+ if (deadline == null) return Double.POSITIVE_INFINITY;
+ else return deadline;
+ }
+
+ public Integer pop()
+ {
+ deadlines.poll();
+ Integer ack = acks.poll();
+ if (ack != null) size -= Packet.ACK_SIZE;
+ return ack;
+ }
+}
Modified: trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java 2006-08-21
18:13:33 UTC (rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java 2006-08-21
18:15:04 UTC (rev 10226)
@@ -1,19 +1,18 @@
-// A queue storing outgoing messages (including acks and transfers) and their
-// coalescing deadlines
+// A queue storing outgoing messages and their coalescing deadlines
import java.util.LinkedList;
import messages.Message;
-class DeadlineQueue<T extends Message>
+class DeadlineQueue
{
public int size = 0; // Size in bytes
- private LinkedList<T> messages = new LinkedList<T>();
+ private LinkedList<Message> messages = new LinkedList<Message>();
private LinkedList<Double> deadlines = new LinkedList<Double>();
- public void add (T message, double deadline)
+ public void add (Message m, double deadline)
{
- size += message.size;
- messages.add (message);
+ size += m.size;
+ messages.add (m);
deadlines.add (deadline);
}
@@ -25,16 +24,16 @@
public double deadline()
{
- Double d = deadlines.peek();
- if (d == null) return Double.POSITIVE_INFINITY;
- else return d;
+ Double deadline = deadlines.peek();
+ if (deadline == null) return Double.POSITIVE_INFINITY;
+ else return deadline;
}
- public T pop()
+ public Message pop()
{
deadlines.poll();
- T message = messages.poll();
- size -= message.size;
- return message;
+ Message m = messages.poll();
+ if (m != null) size -= m.size;
+ return m;
}
}
Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java 2006-08-21 18:13:33 UTC
(rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/Node.java 2006-08-21 18:15:04 UTC
(rev 10226)
@@ -17,9 +17,8 @@
public double location; // Routing location
public NetworkInterface net;
private HashMap<Integer,Peer> peers; // Look up a peer by its address
- private int requestsGenerated = 0;
private HashSet<Integer> recentlySeenRequests; // Request IDs
- private HashMap<Integer,RequestState> outstandingRequests;
+ private HashMap<Integer,RequestState> outstandingRequests; // By ID
public LruCache<Integer> cache; // Datastore containing keys
public TokenBucket bandwidth; // Bandwidth limiter
private boolean timerRunning = false; // Is the timer running?
@@ -87,13 +86,15 @@
public void handleMessage (Message m, Peer prev)
{
log ("received " + m);
- // FIXME: ugly
- if (m instanceof Request)
- handleRequest ((Request) m, prev);
- else if (m instanceof Response)
- handleResponse ((Response) m);
- else if (m instanceof RouteNotFound)
- handleRouteNotFound ((RouteNotFound) m);
+ if (m instanceof Request) handleRequest ((Request) m, prev);
+ else {
+ RequestState rs = outstandingRequests.get (m.id);
+ if (rs == null) log ("unexpected " + m);
+ else if (m instanceof Response)
+ handleResponse ((Response) m, rs);
+ else if (m instanceof RouteNotFound)
+ handleRouteNotFound ((RouteNotFound) m, rs);
+ }
}
private void handleRequest (Request r, Peer prev)
@@ -101,7 +102,8 @@
if (!recentlySeenRequests.add (r.id)) {
log ("rejecting recently seen " + r);
prev.sendMessage (new RouteNotFound (r.id));
- // Don't forward the request to prev, it's seen it
+ // Optimisation: prev has seen the request, so remove
+ // it from the list of potential next hops
RequestState rs = outstandingRequests.get (r.id);
if (rs != null) rs.nexts.remove (prev);
return;
@@ -109,35 +111,32 @@
if (cache.get (r.key)) {
log ("key " + r.key + " found in cache");
if (prev == null) log (r + " succeeded locally");
- else prev.sendMessage (new Response (r.id, r.key));
+ else for (int i = 0; i < 32; i++)
+ prev.sendBlock (new Response (r.id, i));
return;
}
log ("key " + r.key + " not found in cache");
- forwardRequest (new RequestState (r, prev, peers.values()));
+ forwardRequest (new RequestState (r, prev, shufflePeers()));
}
- private void handleResponse (Response r)
+ private void handleResponse (Response r, RequestState rs)
{
- RequestState rs = outstandingRequests.remove (r.id);
- if (rs == null) {
- log ("unexpected " + r);
- return;
+ rs.state = RequestState.TRANSFERRING;
+ if (rs.receivedBlock (r.index)) return; // Ignore duplicates
+ if (rs.receivedAll()) {
+ cache.put (rs.key);
+ if (rs.prev == null) log (rs + " succeeded");
+ outstandingRequests.remove (rs.id);
}
- cache.put (r.key);
- if (rs.prev == null) log (rs + " succeeded");
- else {
+ // Forward the block
+ if (rs.prev != null) {
log ("forwarding " + r);
- rs.prev.sendMessage (r);
+ rs.prev.sendBlock (r);
}
}
- private void handleRouteNotFound (RouteNotFound r)
+ private void handleRouteNotFound (RouteNotFound r, RequestState rs)
{
- RequestState rs = outstandingRequests.remove (r.id);
- if (rs == null) {
- log ("unexpected route not found " + r.id);
- return;
- }
forwardRequest (rs);
}
@@ -148,39 +147,43 @@
log ("route not found for " + rs);
if (rs.prev == null) log (rs + " failed");
else rs.prev.sendMessage (new RouteNotFound (rs.id));
- return;
+ outstandingRequests.remove (rs.id);
}
- log ("forwarding " + rs + " to " + next.address);
- next.sendMessage (new Request (rs.id, rs.key));
- rs.nexts.remove (next);
- outstandingRequests.put (rs.id, rs);
+ else {
+ log ("forwarding " + rs + " to " + next.address);
+ next.sendMessage (new Request (rs.id, rs.key));
+ rs.nexts.remove (next);
+ outstandingRequests.put (rs.id, rs);
+ }
}
+ // Return the list of peers in a random order
+ private ArrayList<Peer> shufflePeers()
+ {
+ ArrayList<Peer> copy = new ArrayList<Peer> (peers.values());
+ Collections.shuffle (copy);
+ return copy;
+ }
+
private void log (String message)
{
Event.log (net.address + " " + message);
}
// Event callback
- private void generateRequest()
+ private void generateRequest (int key)
{
- for (int i = 0; i < 10000; i++) {
- // Send a request to a random location
- Request r = new Request (locationToKey (Math.random()));
- log ("generating request " + r.id);
- handleRequest (r, null);
- }
+ Request r = new Request (key);
+ log ("generating request " + r.id);
+ handleRequest (r, null);
}
// Event callback
private void checkTimeouts()
{
// Check the peers in a random order each time
- ArrayList<Peer> shuffled = new ArrayList<Peer> (peers.values());
- Collections.shuffle (shuffled);
-
double deadline = Double.POSITIVE_INFINITY;
- for (Peer p : shuffled)
+ for (Peer p : shufflePeers())
deadline = Math.min (deadline, p.checkTimeouts());
if (deadline == Double.POSITIVE_INFINITY) {
log ("stopping retransmission/coalescing timer");
@@ -197,7 +200,7 @@
// EventTarget interface
public void handleEvent (int type, Object data)
{
- if (type == GENERATE_REQUEST) generateRequest();
+ if (type == GENERATE_REQUEST) generateRequest ((Integer) data);
else if (type == CHECK_TIMEOUTS) checkTimeouts();
}
Modified: trunk/apps/load-balancing-sims/phase6/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Packet.java 2006-08-21 18:13:33 UTC
(rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/Packet.java 2006-08-21 18:15:04 UTC
(rev 10226)
@@ -2,28 +2,28 @@
import java.util.ArrayList;
import messages.Message;
-import messages.Ack;
class Packet
{
public final static int HEADER_SIZE = 80; // Including IP & UDP headers
+ public final static int ACK_SIZE = 4; // Size of an ack in bytes
public final static int MAX_SIZE = 1450; // MTU including headers
public final static int SENSIBLE_PAYLOAD = 1000; // Coalescing
public int src, dest; // Network addresses
public int size = HEADER_SIZE; // Size in bytes, including headers
public int seq = -1; // Data sequence number (-1 if no data)
- public ArrayList<Ack> acks = null;
+ public ArrayList<Integer> acks = null;
public ArrayList<Message> messages = null;
public double sent; // Time at which the packet was (re) transmitted
public double latency; // Link latency (stored here for convenience)
- public void addAck (Ack a)
+ public void addAck (Integer ack)
{
- if (acks == null) acks = new ArrayList<Ack>();
- acks.add (a);
- size += a.size;
+ if (acks == null) acks = new ArrayList<Integer>();
+ acks.add (ack);
+ size += ACK_SIZE;
}
public void addMessage (Message m)
Modified: trunk/apps/load-balancing-sims/phase6/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Peer.java 2006-08-21 18:13:33 UTC
(rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/Peer.java 2006-08-21 18:15:04 UTC
(rev 10226)
@@ -1,7 +1,6 @@
import java.util.LinkedList;
import java.util.Iterator;
import java.util.HashSet;
-import messages.Ack;
import messages.Message;
import messages.Block;
@@ -29,9 +28,9 @@
private int txSeq = 0; // Sequence number of next outgoing data packet
private int txMaxSeq = SEQ_RANGE - 1; // Highest sequence number
private LinkedList<Packet> txBuffer; // Retransmission buffer
- private DeadlineQueue<Ack> ackQueue; // Outgoing acks
- private DeadlineQueue<Message> searchQueue; // Outgoing search messages
- private DeadlineQueue<Block> transferQueue; // Outgoing transfers
+ private AckQueue ackQueue; // Outgoing acks
+ private DeadlineQueue searchQueue; // Outgoing search messages
+ private DeadlineQueue transferQueue; // Outgoing transfers
private boolean tgif = false; // "Transfers go in first" toggle
private CongestionWindow window; // AIMD congestion window
private double lastTransmission = 0.0; // Clock time
@@ -47,9 +46,9 @@
this.location = location;
this.latency = latency;
txBuffer = new LinkedList<Packet>();
- ackQueue = new DeadlineQueue<Ack>();
- searchQueue = new DeadlineQueue<Message>();
- transferQueue = new DeadlineQueue<Block>();
+ ackQueue = new AckQueue();
+ searchQueue = new DeadlineQueue();
+ transferQueue = new DeadlineQueue();
window = new CongestionWindow (this);
rxDupe = new HashSet<Integer>();
}
@@ -80,7 +79,7 @@
private void sendAck (int seq)
{
log ("ack " + seq + " added to ack queue");
- ackQueue.add (new Ack (seq), Event.time() + MAX_DELAY);
+ ackQueue.add (seq, Event.time() + MAX_DELAY);
// Start the node's timer if necessary
node.startTimer();
// Send as many packets as possible
@@ -155,7 +154,7 @@
public void handlePacket (Packet p)
{
if (p.messages != null) handleData (p);
- if (p.acks != null) for (Ack a : p.acks) handleAck (a.seq);
+ if (p.acks != null) for (int ack : p.acks) handleAck (ack);
}
private void handleData (Packet p)
Modified: trunk/apps/load-balancing-sims/phase6/RequestState.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/RequestState.java 2006-08-21
18:13:33 UTC (rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/RequestState.java 2006-08-21
18:15:04 UTC (rev 10226)
@@ -6,10 +6,16 @@
class RequestState
{
+ // State machine
+ public final static int REQUEST_SENT = 1;
+ public final static int TRANSFERRING = 2;
+
public final int id; // The unique ID of the request
public final int key; // The requested key
public final Peer prev; // The previous hop of the request
public final HashSet<Peer> nexts; // Possible next hops
+ public int state = REQUEST_SENT; // State machine
+ private int blockBitmap = 0; // Bitmap of received blocks
public RequestState (Request r, Peer prev, Collection<Peer> peers)
{
@@ -17,7 +23,7 @@
key = r.key;
this.prev = prev;
nexts = new HashSet<Peer> (peers);
- if (prev != null) nexts.remove (prev);
+ nexts.remove (prev);
}
// Find the closest peer to the requested key
@@ -36,6 +42,20 @@
return bestPeer; // Null if list was empty
}
+ // Mark a block as received, return true if it's a duplicate
+ public boolean receivedBlock (int index)
+ {
+ boolean duplicate = (blockBitmap & 1 << index) != 0;
+ blockBitmap |= 1 << index;
+ return duplicate;
+ }
+
+ // Return true if all blocks have been received
+ public boolean receivedAll()
+ {
+ return blockBitmap == 0xFFFFFFFF;
+ }
+
public String toString()
{
return new String ("request (" + id + "," + key + ")");
Modified: trunk/apps/load-balancing-sims/phase6/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Sim.java 2006-08-21 18:13:33 UTC
(rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/Sim.java 2006-08-21 18:15:04 UTC
(rev 10226)
@@ -23,7 +23,11 @@
n1.connectBothWays (n3, 0.001);
n2.connectBothWays (n3, 0.001);
- Event.schedule (n0, 0.0, Node.GENERATE_REQUEST, null);
+ for (int i = 0; i < 5; i++) {
+ int key = Node.locationToKey (Math.random());
+ n3.cache.put (key);
+ Event.schedule (n0, 0.0, Node.GENERATE_REQUEST, key);
+ }
// Run the simulation
Event.run();
Deleted: trunk/apps/load-balancing-sims/phase6/messages/Ack.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Ack.java 2006-08-21
18:13:33 UTC (rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/messages/Ack.java 2006-08-21
18:15:04 UTC (rev 10226)
@@ -1,17 +0,0 @@
-// Note: acks are not FNP messages, they're only here because it makes the
-// implementation simpler
-
-package messages;
-
-public class Ack extends Message
-{
- public final static int SIZE = 4; // Bytes
-
- public final int seq; // Sequence number of the acknowledged packet
-
- public Ack (int seq)
- {
- this.seq = seq;
- size = SIZE;
- }
-}
Modified: trunk/apps/load-balancing-sims/phase6/messages/Block.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Block.java 2006-08-21
18:13:33 UTC (rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/messages/Block.java 2006-08-21
18:15:04 UTC (rev 10226)
@@ -1,14 +1,15 @@
-// A single block of a data transfer (currently 32 blocks per transfer)
+// A single block of a multi-block transfer (currently 32 blocks per transfer)
package messages;
public class Block extends Message
{
- public final static int SIZE = 1024; // Bytes
+ public final int index; // Index of this block from 0-31
- // FIXME: placeholder
- public Block()
+ public Block (int id, int index)
{
- size = SIZE;
+ this.id = id;
+ this.index = index;
+ size = Message.HEADER_SIZE + Message.DATA_SIZE;
}
}
Modified: trunk/apps/load-balancing-sims/phase6/messages/Message.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Message.java 2006-08-21
18:13:33 UTC (rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/messages/Message.java 2006-08-21
18:15:04 UTC (rev 10226)
@@ -4,9 +4,10 @@
public class Message
{
- public final static int HEADER_SIZE = 4; // Message type etc
- public final static int ID_SIZE = 8; // Size of unique request ID
- public final static int KEY_SIZE = 32; // Size of routing key
+ public final static int HEADER_SIZE = 12; // Bytes, including unique ID
+ public final static int KEY_SIZE = 32; // Size of routing key, bytes
+ public final static int DATA_SIZE = 1024; // Size of data block, bytes
public int size; // Size in bytes
+ public int id; // Unique request ID
}
Modified: trunk/apps/load-balancing-sims/phase6/messages/Request.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Request.java 2006-08-21
18:13:33 UTC (rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/messages/Request.java 2006-08-21
18:15:04 UTC (rev 10226)
@@ -4,7 +4,6 @@
{
private static int nextId = 0;
- public final int id; // The unique ID of the request
public final int key; // The requested key
// Start a new request
@@ -12,7 +11,7 @@
{
id = nextId++;
this.key = key;
- size = Message.HEADER_SIZE + Message.ID_SIZE + Message.KEY_SIZE;
+ size = Message.HEADER_SIZE + Message.KEY_SIZE;
}
// Forward a request
@@ -20,7 +19,7 @@
{
this.id = id;
this.key = key;
- size = Message.HEADER_SIZE + Message.ID_SIZE + Message.KEY_SIZE;
+ size = Message.HEADER_SIZE + Message.KEY_SIZE;
}
public String toString()
Modified: trunk/apps/load-balancing-sims/phase6/messages/Response.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Response.java
2006-08-21 18:13:33 UTC (rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/messages/Response.java
2006-08-21 18:15:04 UTC (rev 10226)
@@ -1,20 +1,16 @@
+// A single block of a multi-block response
+
package messages;
-public class Response extends Message
+public class Response extends Block
{
- public final int id; // The unique ID of the request
- public final int key; // The requested key
-
- public Response (int id, int key)
+ public Response (int id, int index)
{
- this.id = id;
- this.key = key;
- size = Message.HEADER_SIZE + Message.ID_SIZE +
- Message.KEY_SIZE + Block.SIZE;
+ super (id, index);
}
public String toString()
{
- return new String ("response (" + id + "," + key + ")");
+ return new String ("response (" + id + "," + index + ")");
}
}
Modified: trunk/apps/load-balancing-sims/phase6/messages/RouteNotFound.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/RouteNotFound.java
2006-08-21 18:13:33 UTC (rev 10225)
+++ trunk/apps/load-balancing-sims/phase6/messages/RouteNotFound.java
2006-08-21 18:15:04 UTC (rev 10226)
@@ -5,12 +5,10 @@
public class RouteNotFound extends Message
{
- public final int id; // The unique ID of the request
-
public RouteNotFound (int id)
{
this.id = id;
- size = Message.HEADER_SIZE + Message.ID_SIZE;
+ size = Message.HEADER_SIZE;
}
public String toString()