Author: mrogers
Date: 2006-08-27 19:56:53 +0000 (Sun, 27 Aug 2006)
New Revision: 10288
Added:
trunk/apps/load-balancing-sims/phase6/messages/DataNotFound.java
Modified:
trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java
trunk/apps/load-balancing-sims/phase6/LruCache.java
trunk/apps/load-balancing-sims/phase6/Node.java
trunk/apps/load-balancing-sims/phase6/Packet.java
trunk/apps/load-balancing-sims/phase6/Peer.java
trunk/apps/load-balancing-sims/phase6/Sim.java
trunk/apps/load-balancing-sims/phase6/TokenBucket.java
Log:
Search and transfer timeouts
Modified: trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java 2006-08-27 19:02:28 UTC (rev 10287)
+++ trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java 2006-08-27 19:56:53 UTC (rev 10288)
@@ -3,12 +3,14 @@
import java.util.LinkedList;
import messages.*;
-class ChkRequestHandler
+class ChkRequestHandler implements EventTarget
{
// State machine
+ public final static int STARTED = 0;
public final static int REQUEST_SENT = 1;
public final static int ACCEPTED = 2;
public final static int TRANSFERRING = 3;
+ public final static int FAILED = 4;
public final int id; // The unique ID of the request
public final int key; // The requested key
@@ -16,7 +18,7 @@
private Peer prev; // The previous hop of the request
private Peer next = null; // The (current) next hop of the request
private LinkedList<Peer> nexts; // Candidates for the next hop
- private int state = REQUEST_SENT; // State machine
+ private int state = STARTED; // State machine
private int blockBitmap = 0; // Bitmap of received blocks
public ChkRequestHandler (ChkRequest r, Node node, Peer prev)
@@ -26,6 +28,7 @@
this.node = node;
this.prev = prev;
nexts = new LinkedList<Peer> (node.peers());
+ nexts.remove (prev);
}
// Remove a peer from the list of candidates for the next hop
@@ -34,72 +37,80 @@
nexts.remove (p);
}
- public boolean handleMessage (Message m, Peer src)
+ public void handleMessage (Message m, Peer src)
{
if (src != next) {
node.log ("unexpected source for " + m);
- return false; // Request not completed
+ return;
}
- if (m instanceof Accepted) return handleAccepted ((Accepted) m);
- if (m instanceof ChkDataFound)
- return handleChkDataFound ((ChkDataFound) m);
- if (m instanceof Block) return handleBlock ((Block) m);
- if (m instanceof RouteNotFound) return forwardRequest();
- if (m instanceof RejectedLoop) return forwardRequest();
- // Unrecognised message type
- node.log ("unexpected type for " + m);
- return false; // Request not completed
+ if (m instanceof Accepted) handleAccepted ((Accepted) m);
+ else if (m instanceof ChkDataFound)
+ handleChkDataFound ((ChkDataFound) m);
+ else if (m instanceof DataNotFound)
+ handleDataNotFound ((DataNotFound) m);
+ else if (m instanceof Block) handleBlock ((Block) m);
+ else if (m instanceof RouteNotFound) forwardRequest();
+ else if (m instanceof RejectedLoop) forwardRequest();
+ else node.log ("unexpected type for " + m);
}
- private boolean handleAccepted (Accepted a)
+ private void handleAccepted (Accepted a)
{
- if (state != REQUEST_SENT)
- node.log (a + " received out of order");
+ if (state != REQUEST_SENT) node.log (a + " out of order");
state = ACCEPTED;
- return false; // Request not completed
+ // Wait 60 seconds for the next hop to start sending the data
+ Event.schedule (this, 60.0, FETCH_TIMEOUT, next);
}
- private boolean handleChkDataFound (ChkDataFound df)
+ private void handleChkDataFound (ChkDataFound df)
{
- if (state != ACCEPTED)
- node.log (df + " received out of order");
+ if (state != ACCEPTED) node.log (df + " out of order");
state = TRANSFERRING;
- if (prev != null) prev.sendMessage (df);
- return false; // Request not completed
+ if (prev != null) prev.sendMessage (df); // Forward the message
}
- private boolean handleBlock (Block b)
+ private void handleDataNotFound (DataNotFound dnf)
{
- if (state != TRANSFERRING)
- node.log (b + " received out of order");
- if (receivedBlock (b.index)) return false; // Ignore duplicates
+ if (state != ACCEPTED) node.log (dnf + " out of order");
+ if (prev == null) node.log (this + " failed");
+ else prev.sendMessage (dnf); // Forward the message
+ node.chkRequestCompleted (id);
+ }
+
+ private void handleBlock (Block b)
+ {
+ if (state != TRANSFERRING) node.log (b + " out of order");
+ if (receivedBlock (b.index)) return; // Ignore duplicates
// Forward the block
if (prev != null) {
node.log ("forwarding " + b);
prev.sendMessage (b);
}
+ // If the transfer is complete, store the data
if (receivedAll()) {
node.storeChk (key);
if (prev == null) node.log (this + " succeeded");
- return true; // Request completed
+ node.chkRequestCompleted (id);
}
- else return false; // Request not completed
}
- public boolean forwardRequest()
+ public void forwardRequest()
{
next = closestPeer();
if (next == null) {
node.log ("route not found for " + this);
if (prev == null) node.log (this + " failed");
else prev.sendMessage (new RouteNotFound (id));
- return true; // Request completed
+ node.chkRequestCompleted (id);
+ state = FAILED;
}
else {
node.log ("forwarding " + this + " to " + next.address);
next.sendMessage (new ChkRequest (id, key));
nexts.remove (next);
- return false; // Request not completed
+ state = REQUEST_SENT;
+ // Wait 5 seconds for the next hop to accept the request
+ Event.schedule (this, 5.0, ACCEPTED_TIMEOUT, next);
}
}
@@ -137,4 +148,34 @@
{
return new String ("CHK request (" + id + "," + key + ")");
}
+
+ // Event callback
+ private void acceptedTimeout (Peer p)
+ {
+ if (p != next) return; // We've already moved on to another peer
+ if (state != REQUEST_SENT) return; // Peer has already answered
+ node.log (this + " search timed out waiting for " + p);
+ forwardRequest(); // Try another peer
+ }
+
+ // Event callback
+ private void fetchTimeout (Peer p)
+ {
+ if (state != ACCEPTED) return; // Peer has already answered
+ node.log (this + " transfer timed out waiting for " + p);
+ if (prev == null) node.log (this + " failed");
+ else prev.sendMessage (new DataNotFound (id));
+ node.chkRequestCompleted (id);
+ }
+
+ // EventTarget interface
+ public void handleEvent (int type, Object data)
+ {
+ if (type == ACCEPTED_TIMEOUT) acceptedTimeout ((Peer) data);
+ else if (type == FETCH_TIMEOUT) fetchTimeout ((Peer) data);
+ }
+
+ // Each EventTarget class has its own event codes
+ private final static int ACCEPTED_TIMEOUT = 1;
+ private final static int FETCH_TIMEOUT = 2;
}
Modified: trunk/apps/load-balancing-sims/phase6/LruCache.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/LruCache.java 2006-08-27 19:02:28 UTC (rev 10287)
+++ trunk/apps/load-balancing-sims/phase6/LruCache.java 2006-08-27 19:56:53 UTC (rev 10288)
@@ -41,6 +41,6 @@
private void log (String message)
{
- Event.log (message);
+ // Event.log (message);
}
}
Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java 2006-08-27 19:02:28 UTC (rev 10287)
+++ trunk/apps/load-balancing-sims/phase6/Node.java 2006-08-27 19:56:53 UTC (rev 10288)
@@ -25,6 +25,8 @@
public TokenBucket bandwidth; // Bandwidth limiter
private boolean timerRunning = false; // Is the timer running?
+ public boolean faulty = false; // DEBUG
+
public Node (double txSpeed, double rxSpeed)
{
location = Math.random();
@@ -96,7 +98,7 @@
public void startTimer()
{
if (timerRunning) return;
- log ("starting retransmission/coalescing timer");
+ // log ("starting retransmission/coalescing timer");
Event.schedule (this, Peer.MAX_DELAY, CHECK_TIMEOUTS, null);
timerRunning = true;
}
@@ -113,31 +115,35 @@
public void handleMessage (Message m, Peer src)
{
log ("received " + m);
- if (m instanceof ChkRequest) {
- if (handleChkRequest ((ChkRequest) m, src))
- chkRequests.remove (m.id); // Completed
- }
+ if (m instanceof ChkRequest)
+ handleChkRequest ((ChkRequest) m, src);
else {
ChkRequestHandler rh = chkRequests.get (m.id);
if (rh == null) log ("no request handler for " + m);
- else if (rh.handleMessage (m, src))
- chkRequests.remove (m.id); // Completed
+ else rh.handleMessage (m, src);
}
}
- private boolean handleChkRequest (ChkRequest r, Peer prev)
+ private void handleChkRequest (ChkRequest r, Peer prev)
{
if (!recentlySeenRequests.add (r.id)) {
log ("rejecting recently seen " + r);
prev.sendMessage (new RejectedLoop (r.id));
- // Optimisation: the previous hop has already seen
- // this request, so don't ask it in the future
+ // Don't forward the same request back to prev
ChkRequestHandler rh = chkRequests.get (r.id);
if (rh != null) rh.removeNextHop (prev);
- return false; // Request not completed
+ return;
}
// Accept the request
- if (prev != null) prev.sendMessage (new Accepted (r.id));
+ if (prev != null) {
+ log ("accepting " + r);
+ prev.sendMessage (new Accepted (r.id));
+ }
+ // DEBUG
+ if (faulty) {
+ log ("DEBUG: dropping " + r);
+ return;
+ }
// If the key is in the store, return it
if (chkStore.get (r.key)) {
log ("key " + r.key + " found in CHK store");
@@ -147,7 +153,8 @@
for (int i = 0; i < 32; i++)
prev.sendMessage (new Block (r.id, i));
}
- return true; // Request completed
+ chkRequestCompleted (r.id);
+ return;
}
log ("key " + r.key + " not found in CHK store");
// If the key is in the cache, return it
@@ -159,15 +166,22 @@
for (int i = 0; i < 32; i++)
prev.sendMessage (new Block (r.id, i));
}
- return true; // Request completed
+ chkRequestCompleted (r.id);
+ return;
}
log ("key " + r.key + " not found in CHK cache");
// Forward the request and store the request state
ChkRequestHandler rh = new ChkRequestHandler (r, this, prev);
chkRequests.put (r.id, rh);
- return rh.forwardRequest();
+ rh.forwardRequest();
}
+ // Remove a completed request from the list of pending requests
+ public void chkRequestCompleted (int id)
+ {
+ chkRequests.remove (id);
+ }
+
// Return the list of peers in a random order
public ArrayList<Peer> peers()
{
@@ -197,7 +211,7 @@
for (Peer p : peers())
deadline = Math.min (deadline, p.checkTimeouts());
if (deadline == Double.POSITIVE_INFINITY) {
- log ("stopping retransmission/coalescing timer");
+ // log ("stopping retransmission/coalescing timer");
timerRunning = false;
}
else {
Modified: trunk/apps/load-balancing-sims/phase6/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Packet.java 2006-08-27 19:02:28 UTC (rev 10287)
+++ trunk/apps/load-balancing-sims/phase6/Packet.java 2006-08-27 19:56:53 UTC (rev 10288)
@@ -32,4 +32,9 @@
messages.add (m);
size += m.size;
}
+
+ public String toString()
+ {
+ return new String (src + ":" + dest + ":" + seq);
+ }
}
Modified: trunk/apps/load-balancing-sims/phase6/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Peer.java 2006-08-27 19:02:28 UTC (rev 10287)
+++ trunk/apps/load-balancing-sims/phase6/Peer.java 2006-08-27 19:56:53 UTC (rev 10288)
@@ -293,6 +293,11 @@
public void log (String message)
{
- Event.log (node.net.address + ":" + address + " " + message);
+ // Event.log (node.net.address + ":" + address + " " + message);
}
+
+ public String toString()
+ {
+ return Integer.toString (address);
+ }
}
Modified: trunk/apps/load-balancing-sims/phase6/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Sim.java 2006-08-27 19:02:28 UTC (rev 10287)
+++ trunk/apps/load-balancing-sims/phase6/Sim.java 2006-08-27 19:56:53 UTC (rev 10288)
@@ -23,7 +23,10 @@
n1.connectBothWays (n3, 0.001);
n2.connectBothWays (n3, 0.001);
- for (int i = 0; i < 10; i++) {
+ // DEBUG
+ n2.faulty = true;
+
+ for (int i = 0; i < 4; i++) {
int key = Node.locationToKey (Math.random());
// Half the requests will succeed, half will fail
if (i % 2 == 0) n3.storeChk (key);
Modified: trunk/apps/load-balancing-sims/phase6/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/TokenBucket.java 2006-08-27 19:02:28 UTC (rev 10287)
+++ trunk/apps/load-balancing-sims/phase6/TokenBucket.java 2006-08-27 19:56:53 UTC (rev 10288)
@@ -24,6 +24,6 @@
public void remove (int t)
{
tokens -= t; // Counter can go negative
- Event.log (t + " tokens removed, " + tokens + " available");
+ // Event.log (t + " tokens removed, " + tokens + " available");
}
}
Added: trunk/apps/load-balancing-sims/phase6/messages/DataNotFound.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/DataNotFound.java 2006-08-27 19:02:28 UTC (rev 10287)
+++ trunk/apps/load-balancing-sims/phase6/messages/DataNotFound.java 2006-08-27 19:56:53 UTC (rev 10288)
@@ -0,0 +1,15 @@
+package messages;
+
+public class DataNotFound extends Message
+{
+ public DataNotFound (int id)
+ {
+ this.id = id;
+ size = Message.HEADER_SIZE;
+ }
+
+ public String toString()
+ {
+ return new String ("data not found (" + id + ")");
+ }
+}