Author: mrogers
Date: 2006-08-25 11:48:51 +0000 (Fri, 25 Aug 2006)
New Revision: 10262
Modified:
trunk/apps/load-balancing-sims/phase6/Node.java
trunk/apps/load-balancing-sims/phase6/Peer.java
trunk/apps/load-balancing-sims/phase6/RequestState.java
trunk/apps/load-balancing-sims/phase6/Sim.java
trunk/apps/load-balancing-sims/phase6/TokenBucket.java
Log:
Moved request handling from Node into RequestState in preparation for a more
complete FNP implementation.
Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java 2006-08-24 19:51:36 UTC (rev 10261)
+++ trunk/apps/load-balancing-sims/phase6/Node.java 2006-08-25 11:48:51 UTC (rev 10262)
@@ -83,89 +83,51 @@
}
// Called by Peer
- public void handleMessage (Message m, Peer prev)
+ public void handleMessage (Message m, Peer src)
{
log ("received " + m);
- if (m instanceof Request) handleRequest ((Request) m, prev);
+ if (m instanceof Request) {
+ if (handleRequest ((Request) m, src))
+ outstandingRequests.remove (m.id);
+ }
else {
RequestState rs = outstandingRequests.get (m.id);
if (rs == null) log ("unexpected " + m);
- else if (m instanceof Response)
- handleResponse ((Response) m, rs);
- else if (m instanceof RouteNotFound)
- handleRouteNotFound ((RouteNotFound) m, rs);
+ else if (rs.handleMessage (m, src))
+ outstandingRequests.remove (m.id);
}
}
- private void handleRequest (Request r, Peer prev)
+ private boolean handleRequest (Request r, Peer prev)
{
if (!recentlySeenRequests.add (r.id)) {
log ("rejecting recently seen " + r);
prev.sendMessage (new RouteNotFound (r.id));
- // Optimisation: prev has seen the request, so remove
- // it from the list of potential next hops
- RequestState rs = outstandingRequests.get (r.id);
- if (rs != null) rs.nexts.remove (prev);
- return;
+ return false; // Request not completed
}
if (cache.get (r.key)) {
log ("key " + r.key + " found in cache");
if (prev == null) log (r + " succeeded locally");
else for (int i = 0; i < 32; i++)
prev.sendMessage (new Response (r.id, i));
- return;
+ return true; // Request completed
}
log ("key " + r.key + " not found in cache");
- forwardRequest (new RequestState (r, prev, shufflePeers()));
+ // Forward the request and store the request state
+ RequestState rs = new RequestState (r, this, prev, peers());
+ outstandingRequests.put (r.id, rs);
+ return rs.forwardRequest();
}
- private void handleResponse (Response r, RequestState rs)
- {
- rs.state = RequestState.TRANSFERRING;
- if (rs.receivedBlock (r.index)) return; // Ignore duplicates
- if (rs.receivedAll()) {
- cache.put (rs.key);
- if (rs.prev == null) log (rs + " succeeded");
- outstandingRequests.remove (rs.id);
- }
- // Forward the block
- if (rs.prev != null) {
- log ("forwarding " + r);
- rs.prev.sendMessage (r);
- }
- }
-
- private void handleRouteNotFound (RouteNotFound r, RequestState rs)
- {
- forwardRequest (rs);
- }
-
- private void forwardRequest (RequestState rs)
- {
- Peer next = rs.closestPeer();
- if (next == null) {
- log ("route not found for " + rs);
- if (rs.prev == null) log (rs + " failed");
- else rs.prev.sendMessage (new RouteNotFound (rs.id));
- outstandingRequests.remove (rs.id);
- }
- else {
- log ("forwarding " + rs + " to " + next.address);
- next.sendMessage (new Request (rs.id, rs.key));
- rs.nexts.remove (next);
- outstandingRequests.put (rs.id, rs);
- }
- }
-
// Return the list of peers in a random order
- private ArrayList<Peer> shufflePeers()
+ private ArrayList<Peer> peers()
{
ArrayList<Peer> copy = new ArrayList<Peer> (peers.values());
Collections.shuffle (copy);
return copy;
}
- private void log (String message)
+ public void log (String message)
{
Event.log (net.address + " " + message);
}
@@ -183,7 +145,7 @@
{
// Check the peers in a random order each time
double deadline = Double.POSITIVE_INFINITY;
- for (Peer p : shufflePeers())
+ for (Peer p : peers())
deadline = Math.min (deadline, p.checkTimeouts());
if (deadline == Double.POSITIVE_INFINITY) {
log ("stopping retransmission/coalescing timer");
Modified: trunk/apps/load-balancing-sims/phase6/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Peer.java 2006-08-24 19:51:36 UTC (rev 10261)
+++ trunk/apps/load-balancing-sims/phase6/Peer.java 2006-08-25 11:48:51 UTC (rev 10262)
@@ -83,7 +83,7 @@
// Try to send a packet, return true if a packet was sent
private boolean send()
{
- if (ackQueue.size + searchQueue.size + transferQueue.size ==0) {
+ if (ackQueue.size + searchQueue.size + transferQueue.size == 0){
log ("nothing to send");
return false;
}
@@ -240,11 +240,10 @@
// Send as many packets as possible
while (send());
+ log (txBuffer.size() + " packets in flight");
double now = Event.time();
- if (txBuffer.isEmpty()) {
- log ("no packets in flight");
- return deadline (now); // Sleep until the next deadline
- }
+ if (txBuffer.isEmpty()) return deadline (now);
+
for (Packet p : txBuffer) {
if (now - p.sent > RTO * rtt + MAX_DELAY) {
// Retransmission timeout
Modified: trunk/apps/load-balancing-sims/phase6/RequestState.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/RequestState.java 2006-08-24 19:51:36 UTC (rev 10261)
+++ trunk/apps/load-balancing-sims/phase6/RequestState.java 2006-08-25 11:48:51 UTC (rev 10262)
@@ -2,7 +2,7 @@
import java.util.HashSet;
import java.util.Collection;
-import messages.Request;
+import messages.*;
class RequestState
{
@@ -12,22 +12,74 @@
public final int id; // The unique ID of the request
public final int key; // The requested key
- public final Peer prev; // The previous hop of the request
- public final HashSet<Peer> nexts; // Possible next hops
- public int state = REQUEST_SENT; // State machine
+ private Node node; // The owner of this RequestState
+ private Peer prev; // The previous hop of the request
+ private Peer next; // The (current) next hop of the request
+ private HashSet<Peer> nexts; // Possible next hops
+ private int state = REQUEST_SENT; // State machine
private int blockBitmap = 0; // Bitmap of received blocks
- public RequestState (Request r, Peer prev, Collection<Peer> peers)
+ public RequestState (Request r, Node node, Peer prev,
+ Collection<Peer> peers)
{
id = r.id;
key = r.key;
+ this.node = node;
this.prev = prev;
+ next = null;
nexts = new HashSet<Peer> (peers);
nexts.remove (prev);
}
+ public boolean handleMessage (Message m, Peer src)
+ {
+ if (src != next) {
+ node.log ("unexpected source for " + m);
+ return false; // Request not completed
+ }
+ if (m instanceof Response) return handleResponse ((Response) m);
+ else if (m instanceof RouteNotFound) return forwardRequest();
+ // Unrecognised message type
+ node.log ("unrecognised " + m);
+ return false; // Request not completed
+ }
+
+ private boolean handleResponse (Response r)
+ {
+ state = TRANSFERRING;
+ if (receivedBlock (r.index)) return false; // Ignore duplicates
+ // Forward the block
+ if (prev != null) {
+ node.log ("forwarding " + r);
+ prev.sendMessage (r);
+ }
+ if (receivedAll()) {
+ node.cache.put (key);
+ if (prev == null) node.log (this + " succeeded");
+ return true; // Request completed
+ }
+ else return false; // Request not completed
+ }
+
+ public boolean forwardRequest()
+ {
+ next = closestPeer();
+ if (next == null) {
+ node.log ("route not found for " + this);
+ if (prev == null) node.log (this + " failed");
+ else prev.sendMessage (new RouteNotFound (id));
+ return true; // Request completed
+ }
+ else {
+ node.log ("forwarding " + this + " to " + next.address);
+ next.sendMessage (new Request (id, key));
+ nexts.remove (next);
+ return false; // Request not completed
+ }
+ }
+
// Find the closest peer to the requested key
- public Peer closestPeer()
+ private Peer closestPeer()
{
double keyLoc = Node.keyToLocation (key);
double bestDist = Double.POSITIVE_INFINITY;
@@ -39,11 +91,11 @@
bestPeer = peer;
}
}
- return bestPeer; // Null if list was empty
+ return bestPeer; // Null if the list was empty
}
// Mark a block as received, return true if it's a duplicate
- public boolean receivedBlock (int index)
+ private boolean receivedBlock (int index)
{
boolean duplicate = (blockBitmap & 1 << index) != 0;
blockBitmap |= 1 << index;
@@ -51,7 +103,7 @@
}
// Return true if all blocks have been received
- public boolean receivedAll()
+ private boolean receivedAll()
{
return blockBitmap == 0xFFFFFFFF;
}
Modified: trunk/apps/load-balancing-sims/phase6/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Sim.java 2006-08-24 19:51:36 UTC (rev 10261)
+++ trunk/apps/load-balancing-sims/phase6/Sim.java 2006-08-25 11:48:51 UTC (rev 10262)
@@ -23,9 +23,10 @@
n1.connectBothWays (n3, 0.001);
n2.connectBothWays (n3, 0.001);
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < 10; i++) {
int key = Node.locationToKey (Math.random());
- n3.cache.put (key);
+ // Half the requests will succeed, half will fail
+ if (i % 2 == 0) n3.cache.put (key);
Event.schedule (n0, 0.0, Node.GENERATE_REQUEST, key);
}
Modified: trunk/apps/load-balancing-sims/phase6/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/TokenBucket.java 2006-08-24 19:51:36 UTC (rev 10261)
+++ trunk/apps/load-balancing-sims/phase6/TokenBucket.java 2006-08-25 11:48:51 UTC (rev 10262)
@@ -17,7 +17,7 @@
lastUpdated = now;
tokens += elapsed * rate;
if (tokens > size) tokens = size;
- Event.log (tokens + " tokens available");
+ // Event.log (tokens + " tokens available");
return (int) tokens;
}