Author: mrogers
Date: 2006-09-14 20:48:05 +0000 (Thu, 14 Sep 2006)
New Revision: 10474
Added:
trunk/apps/load-balancing-sims/phase6/RequestHandler.java
trunk/apps/load-balancing-sims/phase6/SskInsertHandler.java
trunk/apps/load-balancing-sims/phase6/SskRequestHandler.java
trunk/apps/load-balancing-sims/phase6/messages/SskAccepted.java
trunk/apps/load-balancing-sims/phase6/messages/SskDataFound.java
trunk/apps/load-balancing-sims/phase6/messages/SskInsert.java
trunk/apps/load-balancing-sims/phase6/messages/SskPubKey.java
trunk/apps/load-balancing-sims/phase6/messages/SskRequest.java
Modified:
trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java
trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java
trunk/apps/load-balancing-sims/phase6/MessageHandler.java
trunk/apps/load-balancing-sims/phase6/Node.java
trunk/apps/load-balancing-sims/phase6/Sim.java
trunk/apps/load-balancing-sims/phase6/messages/ChkInsert.java
trunk/apps/load-balancing-sims/phase6/messages/ChkRequest.java
Log:
SSK inserts and requests (no collisions yet)
Modified: trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java 2006-09-14
14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java 2006-09-14
20:48:05 UTC (rev 10474)
@@ -95,7 +95,7 @@
for (int i = 0; i < 32; i++)
if (blocks[i] != null) next.sendMessage (blocks[i]);
// Wait for TransfersCompleted (FIXME: check real timeout)
- Event.schedule (this, 120.0, TRANSFER_OUT_TIMEOUT, next);
+ Event.schedule (this, 240.0, TRANSFER_OUT_TIMEOUT, next);
}
private void handleRejectedLoop (RejectedLoop rl)
@@ -149,7 +149,7 @@
node.log (this + " has htl " + htl);
}
node.log ("forwarding " + this + " to " + next.address);
- next.sendMessage (new ChkInsert (id, key, closest, htl));
+ next.sendMessage (makeSearchMessage());
nexts.remove (next);
searchState = SENT;
// Wait 10 seconds for the next hop to accept the search
@@ -170,12 +170,17 @@
searchState = COMPLETED;
if (prev == null) node.log (this + " completed");
else prev.sendMessage (new TransfersCompleted (id));
- node.chkInsertCompleted (id);
+ node.removeMessageHandler (id);
}
+ protected Search makeSearchMessage()
+ {
+ return new ChkInsert (id, key, closest, htl);
+ }
+
public String toString()
{
- return new String ("CHK insert (" +id+ "," +key+ "," +htl+ ")");
+ return new String ("CHK insert (" + id + "," + key + ")");
}
// Event callbacks
Modified: trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java
2006-09-14 14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java
2006-09-14 20:48:05 UTC (rev 10474)
@@ -2,9 +2,8 @@
import messages.*;
-class ChkRequestHandler extends MessageHandler implements EventTarget
+class ChkRequestHandler extends RequestHandler
{
- private int state = STARTED; // State of search
private boolean[] received; // Keep track of received blocks
private int blocksReceived = 0;
@@ -12,6 +11,7 @@
{
super (r, node, prev);
received = new boolean[32];
+ forwardSearch();
}
public void handleMessage (Message m, Peer src)
@@ -35,36 +35,6 @@
else node.log ("unexpected type for " + m);
}
- private void handleAccepted (Accepted a)
- {
- if (state != SENT) node.log (a + " out of order");
- state = ACCEPTED;
- // Wait 60 seconds for a reply to the search
- Event.schedule (this, 60.0, SEARCH_TIMEOUT, next);
- }
-
- private void handleRejectedLoop (RejectedLoop rl)
- {
- if (state != SENT) node.log (rl + " out of order");
- forwardSearch();
- }
-
- private void handleRouteNotFound (RouteNotFound rnf)
- {
- if (state != ACCEPTED) node.log (rnf + " out of order");
- if (rnf.htl < htl) htl = rnf.htl;
- // Use the remaining htl to try another peer
- forwardSearch();
- }
-
- private void handleDataNotFound (DataNotFound dnf)
- {
- if (state != ACCEPTED) node.log (dnf + " out of order");
- if (prev == null) node.log (this + " failed");
- else prev.sendMessage (dnf); // Forward the message
- finish();
- }
-
private void handleChkDataFound (ChkDataFound df)
{
if (state != ACCEPTED) node.log (df + " out of order");
@@ -93,99 +63,13 @@
}
}
- public void forwardSearch()
+ protected Search makeSearchMessage()
{
- next = null;
- // If the search has run out of hops, send DataNotFound
- if (htl == 0) {
- node.log ("data not found for " + this);
- if (prev == null) node.log (this + " failed");
- else prev.sendMessage (new DataNotFound (id));
- finish();
- return;
- }
- // Forward the search to the closest remaining peer
- next = closestPeer();
- if (next == null) {
- node.log ("route not found for " + this);
- if (prev == null) node.log (this + " failed");
- else prev.sendMessage (new RouteNotFound (id, htl));
- finish();
- return;
- }
- // Decrement the htl if the next node is not the closest so far
- double target = Node.keyToLocation (key);
- if (Node.distance (target, next.location)
- > Node.distance (target, closest)) {
- htl = node.decrementHtl (htl);
- node.log (this + " has htl " + htl);
- }
- node.log ("forwarding " + this + " to " + next.address);
- next.sendMessage (new ChkRequest (id, key, closest, htl));
- nexts.remove (next);
- state = SENT;
- // Wait 5 seconds for the next hop to accept the search
- Event.schedule (this, 5.0, ACCEPTED_TIMEOUT, next);
+ return new ChkRequest (id, key, closest, htl);
}
- private void finish()
- {
- state = COMPLETED;
- node.chkRequestCompleted (id);
- }
-
public String toString()
{
return new String ("CHK request (" + id + "," + key + ")");
}
-
- // Event callbacks
-
- private void acceptedTimeout (Peer p)
- {
- if (p != next) return; // We've already moved on to another peer
- if (state != SENT) return;
- node.log (this + " accepted timeout waiting for " + p);
- forwardSearch(); // Try another peer
- }
-
- private void searchTimeout (Peer p)
- {
- if (p != next) return; // We've already moved on to another peer
- if (state != ACCEPTED) return;
- node.log (this + " search timeout waiting for " + p);
- if (prev == null) node.log (this + " failed");
- finish();
- }
-
- private void transferTimeout (Peer p)
- {
- if (state != TRANSFERRING) return;
- node.log (this + " transfer timeout receiving from " + p);
- if (prev == null) node.log (this + " failed");
- finish();
- }
-
- // EventTarget interface
- public void handleEvent (int type, Object data)
- {
- switch (type) {
- case ACCEPTED_TIMEOUT:
- acceptedTimeout ((Peer) data);
- break;
-
- case SEARCH_TIMEOUT:
- searchTimeout ((Peer) data);
- break;
-
- case TRANSFER_TIMEOUT:
- transferTimeout ((Peer) data);
- break;
- }
- }
-
- // Each EventTarget class has its own event codes
- private final static int ACCEPTED_TIMEOUT = 1;
- private final static int SEARCH_TIMEOUT = 2;
- private final static int TRANSFER_TIMEOUT = 3;
}
Modified: trunk/apps/load-balancing-sims/phase6/MessageHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/MessageHandler.java 2006-09-14
14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/MessageHandler.java 2006-09-14
20:48:05 UTC (rev 10474)
@@ -65,5 +65,7 @@
return closestPeer; // Null if the list was empty
}
- public abstract void handleMessage (Message m, Peer src);
+ public abstract void handleMessage (Message m, Peer src);
+
+ protected abstract Search makeSearchMessage();
}
Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java 2006-09-14 14:14:20 UTC
(rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/Node.java 2006-09-14 20:48:05 UTC
(rev 10474)
@@ -6,10 +6,7 @@
class Node implements EventTarget
{
- public final static int STORE_SIZE = 10; // Max number of keys in store
- public final static int CACHE_SIZE = 10; // Max number of keys in cache
- public final static double MIN_SLEEP = 0.01; // Seconds
- public final static double SHORT_SLEEP = 0.05; // Poll the bw limiter
+ public final static double SHORT_SLEEP = 0.01; // Poll the bw limiter
// Token bucket bandwidth limiter
public final static int BUCKET_RATE = 30000; // Bytes per second
@@ -20,8 +17,11 @@
private HashMap<Integer,Peer> peers; // Look up a peer by its address
private HashSet<Integer> recentlySeenRequests; // Request IDs
private HashMap<Integer,MessageHandler> messageHandlers; // By ID
- private LruCache<Integer> chkStore; // CHK datastore
- private LruCache<Integer> chkCache; // CHK datacache
+ private LruCache<Integer> chkStore;
+ private LruCache<Integer> chkCache;
+ private LruCache<Integer> sskStore;
+ private LruCache<Integer> sskCache;
+ private LruCache<Integer> pubKeyCache; // SSK public keys
public TokenBucket bandwidth; // Bandwidth limiter
private boolean timerRunning = false; // Is the timer running?
@@ -32,8 +32,11 @@
peers = new HashMap<Integer,Peer>();
recentlySeenRequests = new HashSet<Integer>();
messageHandlers = new HashMap<Integer,MessageHandler>();
- chkStore = new LruCache<Integer> (STORE_SIZE);
- chkCache = new LruCache<Integer> (CACHE_SIZE);
+ chkStore = new LruCache<Integer> (10);
+ chkCache = new LruCache<Integer> (10);
+ sskStore = new LruCache<Integer> (10);
+ sskCache = new LruCache<Integer> (10);
+ pubKeyCache = new LruCache<Integer> (10);
bandwidth = new TokenBucket (BUCKET_RATE, BUCKET_SIZE);
}
@@ -103,6 +106,30 @@
else log ("key " + key + " not added to CHK store");
}
+ // Add an SSK to the cache
+ public void cacheSsk (int key)
+ {
+ log ("key " + key + " added to SSK cache");
+ sskCache.put (key);
+ }
+
+ // Consider adding an SSK to the store
+ public void storeSsk (int key)
+ {
+ if (closerThanPeers (keyToLocation (key))) {
+ log ("key " + key + " added to SSK store");
+ sskStore.put (key);
+ }
+ else log ("key " + key + " not added to SSK store");
+ }
+
+ // Add a public key to the cache
+ public void cachePubKey (int key)
+ {
+ log ("public key " + key + " added to cache");
+ pubKeyCache.put (key);
+ }
+
// Called by Peer
public void startTimer()
{
@@ -128,6 +155,10 @@
handleChkRequest ((ChkRequest) m, src);
else if (m instanceof ChkInsert)
handleChkInsert ((ChkInsert) m, src);
+ else if (m instanceof SskRequest)
+ handleSskRequest ((SskRequest) m, src);
+ else if (m instanceof SskInsert)
+ handleSskInsert ((SskInsert) m, src);
else {
MessageHandler mh = messageHandlers.get (m.id);
if (mh == null) log ("no message handler for " + m);
@@ -150,28 +181,33 @@
log ("accepting " + r);
prev.sendMessage (new Accepted (r.id));
}
- // If the key is in the store, return it
+ // If the data is in the store, return it
if (chkStore.get (r.key)) {
log ("key " + r.key + " found in CHK store");
if (prev == null) log (r + " succeeded locally");
- else prev.sendMessage (new ChkDataFound (r.id));
- chkRequestCompleted (r.id);
+ else {
+ prev.sendMessage (new ChkDataFound (r.id));
+ for (int i = 0; i < 32; i++)
+ prev.sendMessage (new Block (r.id, i));
+ }
return;
}
log ("key " + r.key + " not found in CHK store");
- // If the key is in the cache, return it
+ // If the data is in the cache, return it
if (chkCache.get (r.key)) {
log ("key " + r.key + " found in CHK cache");
if (prev == null) log (r + " succeeded locally");
- else prev.sendMessage (new ChkDataFound (r.id));
- chkRequestCompleted (r.id);
+ else {
+ prev.sendMessage (new ChkDataFound (r.id));
+ for (int i = 0; i < 32; i++)
+ prev.sendMessage (new Block (r.id, i));
+ }
return;
}
log ("key " + r.key + " not found in CHK cache");
// Store the request handler and forward the search
ChkRequestHandler rh = new ChkRequestHandler (r, this, prev);
messageHandlers.put (r.id, rh);
- rh.forwardSearch();
}
private void handleChkInsert (ChkInsert i, Peer prev)
@@ -194,18 +230,87 @@
messageHandlers.put (i.id, ih);
}
- // Remove a completed request from the list of pending requests
- public void chkRequestCompleted (int id)
+ private void handleSskRequest (SskRequest r, Peer prev)
{
- messageHandlers.remove (id);
+ if (!recentlySeenRequests.add (r.id)) {
+ log ("rejecting recently seen " + r);
+ prev.sendMessage (new RejectedLoop (r.id));
+ // Don't forward the same search back to prev
+ MessageHandler mh = messageHandlers.get (r.id);
+ if (mh != null) mh.removeNextHop (prev);
+ return;
+ }
+ // Look up the public key
+ boolean pub = pubKeyCache.get (r.key);
+ if (pub) log ("public key " + r.key + " found in cache");
+ else log ("public key " + r.key + " not found in cache");
+ // Accept the search
+ if (prev != null) {
+ log ("accepting " + r);
+ prev.sendMessage (new Accepted (r.id));
+ }
+ // If the data is in the store, return it
+ if (pub && sskStore.get (r.key)) {
+ log ("key " + r.key + " found in SSK store");
+ if (prev == null) log (r + " succeeded locally");
+ else {
+ prev.sendMessage (new SskDataFound (r.id));
+ if (r.needPubKey)
+ prev.sendMessage
+ (new SskPubKey (r.id, r.key));
+ }
+ return;
+ }
+ log ("key " + r.key + " not found in SSK store");
+ // If the data is in the cache, return it
+ if (pub && sskCache.get (r.key)) {
+ log ("key " + r.key + " found in SSK cache");
+ if (prev == null) log (r + " succeeded locally");
+ else {
+ prev.sendMessage (new SskDataFound (r.id));
+ if (r.needPubKey)
+ prev.sendMessage
+ (new SskPubKey (r.id, r.key));
+ }
+ return;
+ }
+ log ("key " + r.key + " not found in SSK cache");
+ // Store the request handler and forward the search
+ SskRequestHandler rh = new SskRequestHandler (r,this,prev,!pub);
+ messageHandlers.put (r.id, rh);
}
- // Remove a completed insert from the list of pending inserts
- public void chkInsertCompleted (int id)
+ private void handleSskInsert (SskInsert i, Peer prev)
{
- messageHandlers.remove (id);
+ if (!recentlySeenRequests.add (i.id)) {
+ log ("rejecting recently seen " + i);
+ prev.sendMessage (new RejectedLoop (i.id));
+ // Don't forward the same search back to prev
+ MessageHandler mh = messageHandlers.get (i.id);
+ if (mh != null) mh.removeNextHop (prev);
+ return;
+ }
+ // Look up the public key
+ boolean pub = pubKeyCache.get (i.key);
+ if (pub) log ("public key " + i.key + " found in cache");
+ else log ("public key " + i.key + " not found in cache");
+ // Accept the search
+ if (prev != null) {
+ log ("accepting " + i);
+ prev.sendMessage (new SskAccepted (i.id, !pub));
+ }
+ // Store the insert handler and possibly wait for the pub key
+ SskInsertHandler ih = new SskInsertHandler (i,this,prev,!pub);
+ messageHandlers.put (i.id, ih);
}
+ public void removeMessageHandler (int id)
+ {
+ MessageHandler mh = messageHandlers.remove (id);
+ if (mh == null) log ("no message handler to remove for " + id);
+ else log ("removing message handler for " + id);
+ }
+
// Return the list of peers in a random order
public ArrayList<Peer> peers()
{
@@ -219,16 +324,16 @@
Event.log (net.address + " " + message);
}
- // Event callback
- private void generateRequest (int key)
+ // Event callbacks
+
+ private void generateChkRequest (int key)
{
ChkRequest cr = new ChkRequest (key, location);
log ("generating " + cr);
handleChkRequest (cr, null);
}
- // Event callback
- private void generateInsert (int key)
+ private void generateChkInsert (int key)
{
ChkInsert ci = new ChkInsert (key, location);
log ("generating " + ci);
@@ -238,7 +343,21 @@
handleMessage (new Block (ci.id, i), null);
}
- // Event callback
+ private void generateSskRequest (int key)
+ {
+ SskRequest sr = new SskRequest (key, location, true);
+ log ("generating " + sr);
+ handleSskRequest (sr, null);
+ }
+
+ private void generateSskInsert (int key)
+ {
+ SskInsert si = new SskInsert (key, location);
+ log ("generating " + si);
+ pubKeyCache.put (key);
+ handleSskInsert (si, null);
+ }
+
private void checkTimeouts()
{
// Check the peers in a random order each time
@@ -251,7 +370,7 @@
}
else {
double sleep = deadline - Event.time(); // Can be < 0
- if (sleep < MIN_SLEEP) sleep = MIN_SLEEP;
+ if (sleep < SHORT_SLEEP) sleep = SHORT_SLEEP;
// log ("sleeping for " + sleep + " seconds");
Event.schedule (this, sleep, CHECK_TIMEOUTS, null);
}
@@ -261,14 +380,22 @@
public void handleEvent (int type, Object data)
{
switch (type) {
- case GENERATE_REQUEST:
- generateRequest ((Integer) data);
+ case GENERATE_CHK_REQUEST:
+ generateChkRequest ((Integer) data);
break;
- case GENERATE_INSERT:
- generateInsert ((Integer)data);
+ case GENERATE_CHK_INSERT:
+ generateChkInsert ((Integer) data);
break;
+ case GENERATE_SSK_REQUEST:
+ generateSskRequest ((Integer) data);
+ break;
+
+ case GENERATE_SSK_INSERT:
+ generateSskInsert ((Integer) data);
+ break;
+
case CHECK_TIMEOUTS:
checkTimeouts();
break;
@@ -276,7 +403,9 @@
}
// Each EventTarget class has its own event codes
- public final static int GENERATE_REQUEST = 1;
- public final static int GENERATE_INSERT = 2;
- private final static int CHECK_TIMEOUTS = 3;
+ public final static int GENERATE_CHK_REQUEST = 1;
+ public final static int GENERATE_CHK_INSERT = 2;
+ public final static int GENERATE_SSK_REQUEST = 3;
+ public final static int GENERATE_SSK_INSERT = 4;
+ private final static int CHECK_TIMEOUTS = 5;
}
Added: trunk/apps/load-balancing-sims/phase6/RequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/RequestHandler.java 2006-09-14
14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/RequestHandler.java 2006-09-14
20:48:05 UTC (rev 10474)
@@ -0,0 +1,134 @@
+// The parent class of ChkRequestHandler and SskRequestHandler
+
+import messages.*;
+
+// Search logic shared by the request handlers. It forwards the search to one
+// peer at a time, reacts to the Accepted / RejectedLoop / RouteNotFound /
+// DataNotFound replies, and handles the accepted, search and transfer
+// timeouts. The message that is actually forwarded comes from
+// makeSearchMessage().
+abstract class RequestHandler extends MessageHandler implements EventTarget
+{
+ protected int state = STARTED; // State of search
+
+ // Passes the search, the local node and the previous hop to MessageHandler.
+ // Does not start forwarding by itself.
+ public RequestHandler (Search s, Node node, Peer prev)
+ {
+ super (s, node, prev);
+ }
+
+ // The next hop accepted the search. An out-of-order message is logged but
+ // still processed.
+ protected void handleAccepted (Accepted a)
+ {
+ if (state != SENT) node.log (a + " out of order");
+ state = ACCEPTED;
+ // Wait 60 seconds for a reply to the search
+ Event.schedule (this, 60.0, SEARCH_TIMEOUT, next);
+ }
+
+ // The next hop rejected the search as a loop: try another peer
+ protected void handleRejectedLoop (RejectedLoop rl)
+ {
+ if (state != SENT) node.log (rl + " out of order");
+ forwardSearch();
+ }
+
+ // The next hop could not route the search any further. Its htl replaces
+ // ours only if it is lower.
+ protected void handleRouteNotFound (RouteNotFound rnf)
+ {
+ if (state != ACCEPTED) node.log (rnf + " out of order");
+ if (rnf.htl < htl) htl = rnf.htl;
+ // Use the remaining htl to try another peer
+ forwardSearch();
+ }
+
+ // The search failed downstream: pass the failure back (or log it if the
+ // search started here, i.e. prev is null) and finish
+ protected void handleDataNotFound (DataNotFound dnf)
+ {
+ if (state != ACCEPTED) node.log (dnf + " out of order");
+ if (prev == null) node.log (this + " failed");
+ else prev.sendMessage (dnf); // Forward the message
+ finish();
+ }
+
+ // Send the search to the closest remaining peer, or report failure to the
+ // previous hop if the htl is exhausted or no peer is left. On success the
+ // chosen peer is removed from nexts, the state becomes SENT and the
+ // accepted timer is started.
+ protected void forwardSearch()
+ {
+ next = null;
+ // If the search has run out of hops, send DataNotFound
+ if (htl == 0) {
+ node.log ("data not found for " + this);
+ if (prev == null) node.log (this + " failed");
+ else prev.sendMessage (new DataNotFound (id));
+ finish();
+ return;
+ }
+ // Forward the search to the closest remaining peer
+ next = closestPeer();
+ if (next == null) {
+ node.log ("route not found for " + this);
+ if (prev == null) node.log (this + " failed");
+ else prev.sendMessage (new RouteNotFound (id, htl));
+ finish();
+ return;
+ }
+ // Decrement the htl if the next node is not the closest so far
+ double target = Node.keyToLocation (key);
+ if (Node.distance (target, next.location)
+ > Node.distance (target, closest)) {
+ htl = node.decrementHtl (htl);
+ node.log (this + " has htl " + htl);
+ }
+ node.log ("forwarding " + this + " to " + next.address);
+ next.sendMessage (makeSearchMessage());
+ nexts.remove (next);
+ state = SENT;
+ // Wait 5 seconds for the next hop to accept the search
+ Event.schedule (this, 5.0, ACCEPTED_TIMEOUT, next);
+ }
+
+ // Mark the search as completed and unregister this handler from the node
+ protected void finish()
+ {
+ state = COMPLETED;
+ node.removeMessageHandler (id);
+ }
+
+ // Event callbacks
+
+ // The peer did not accept the search in time. Stale timers (for a peer we
+ // no longer wait for, or after the state has moved on) are ignored.
+ protected void acceptedTimeout (Peer p)
+ {
+ if (p != next) return; // We've already moved on to another peer
+ if (state != SENT) return;
+ node.log (this + " accepted timeout waiting for " + p);
+ forwardSearch(); // Try another peer
+ }
+
+ // The peer accepted the search but never answered it. The handler just
+ // finishes; nothing is sent to prev from here.
+ protected void searchTimeout (Peer p)
+ {
+ if (p != next) return; // We've already moved on to another peer
+ if (state != ACCEPTED) return;
+ node.log (this + " search timeout waiting for " + p);
+ if (prev == null) node.log (this + " failed");
+ finish();
+ }
+
+ // A transfer did not complete in time. Only acts in the TRANSFERRING
+ // state; nothing is sent to prev from here.
+ protected void transferTimeout (Peer p)
+ {
+ if (state != TRANSFERRING) return;
+ node.log (this + " transfer timeout receiving from " + p);
+ if (prev == null) node.log (this + " failed");
+ finish();
+ }
+
+ // EventTarget interface
+ // Dispatches a timer event to its callback; data is the peer the timer
+ // was scheduled for. Unknown event types are silently ignored.
+ public void handleEvent (int type, Object data)
+ {
+ switch (type) {
+ case ACCEPTED_TIMEOUT:
+ acceptedTimeout ((Peer) data);
+ break;
+
+ case SEARCH_TIMEOUT:
+ searchTimeout ((Peer) data);
+ break;
+
+ case TRANSFER_TIMEOUT:
+ transferTimeout ((Peer) data);
+ break;
+ }
+ }
+
+ // Each EventTarget class has its own event codes
+ protected final static int ACCEPTED_TIMEOUT = 1;
+ protected final static int SEARCH_TIMEOUT = 2;
+ protected final static int TRANSFER_TIMEOUT = 3;
+}
Modified: trunk/apps/load-balancing-sims/phase6/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Sim.java 2006-09-14 14:14:20 UTC
(rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/Sim.java 2006-09-14 20:48:05 UTC
(rev 10474)
@@ -26,8 +26,8 @@
n3.connectBothWays (n4, 0.1);
int key = Node.locationToKey (Math.random());
- Event.schedule (n0, 0.0, Node.GENERATE_INSERT, key);
- Event.schedule (n4, 30.0, Node.GENERATE_REQUEST, key);
+ Event.schedule (n0, 0.0, Node.GENERATE_SSK_INSERT, key);
+ Event.schedule (n4, 30.0, Node.GENERATE_SSK_REQUEST, key);
// Run the simulation
Event.run();
Added: trunk/apps/load-balancing-sims/phase6/SskInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/SskInsertHandler.java 2006-09-14
14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/SskInsertHandler.java 2006-09-14
20:48:05 UTC (rev 10474)
@@ -0,0 +1,187 @@
+// The state of an SSK insert as stored at each node along the path
+
+import java.util.HashSet;
+import messages.*;
+
+// Handles one SSK insert at one node: optionally waits for the public key
+// from the previous hop, caches and (if appropriate) stores the key locally,
+// then forwards the insert to one peer at a time until an InsertReply comes
+// back, the htl runs out or no peer is left.
+class SskInsertHandler extends MessageHandler implements EventTarget
+{
+ private int state = STARTED; // State of search
+ private SskPubKey pubKey = null; // Null until the public key is known
+
+ // needPubKey is true when this node must wait for the previous hop to
+ // send the public key. Otherwise the insert is cached, offered to the
+ // store and forwarded straight away.
+ public SskInsertHandler (SskInsert i, Node node,
+ Peer prev, boolean needPubKey)
+ {
+ super (i, node, prev);
+ // Wait 10 seconds for the previous hop to send the public key
+ if (needPubKey) Event.schedule (this, 10.0, KEY_TIMEOUT, null);
+ else {
+ pubKey = new SskPubKey (id, key);
+ node.cacheSsk (key);
+ node.storeSsk (key);
+ forwardSearch();
+ }
+ }
+
+ // Dispatch on the sender first (previous hop or next hop), then on the
+ // message type. Anything else is logged and dropped.
+ public void handleMessage (Message m, Peer src)
+ {
+ if (src == prev) {
+ if (m instanceof SskPubKey)
+ handleSskPubKey ((SskPubKey) m);
+ else node.log ("unexpected type for " + m);
+ }
+ else if (src == next) {
+ if (m instanceof SskAccepted)
+ handleSskAccepted ((SskAccepted) m);
+ else if (m instanceof RejectedLoop)
+ handleRejectedLoop ((RejectedLoop) m);
+ else if (m instanceof RouteNotFound)
+ handleRouteNotFound ((RouteNotFound) m);
+ else if (m instanceof InsertReply)
+ handleInsertReply ((InsertReply) m);
+ else node.log ("unexpected type for " + m);
+ }
+ else node.log ("unexpected source for " + m);
+ }
+
+ // The previous hop sent the public key: remember it, cache the key and
+ // the data locally and start forwarding. An out-of-order message is
+ // logged but still processed.
+ private void handleSskPubKey (SskPubKey pk)
+ {
+ if (state != STARTED) node.log (pk + " out of order");
+ pubKey = pk;
+ node.cachePubKey (key);
+ node.cacheSsk (key);
+ node.storeSsk (key);
+ forwardSearch();
+ }
+
+ // The next hop accepted the insert and told us whether it needs the
+ // public key
+ private void handleSskAccepted (SskAccepted sa)
+ {
+ if (state != SENT) node.log (sa + " out of order");
+ state = ACCEPTED;
+ // Wait 60 seconds for a reply to the search
+ Event.schedule (this, 60.0, SEARCH_TIMEOUT, next);
+ // Send the public key if requested
+ if (sa.needPubKey) next.sendMessage (pubKey);
+ }
+
+ // The next hop rejected the insert as a loop: try another peer
+ private void handleRejectedLoop (RejectedLoop rl)
+ {
+ if (state != SENT) node.log (rl + " out of order");
+ forwardSearch();
+ }
+
+ // The next hop could not route the insert any further. Its htl replaces
+ // ours only if it is lower.
+ private void handleRouteNotFound (RouteNotFound rnf)
+ {
+ if (state != ACCEPTED) node.log (rnf + " out of order");
+ if (rnf.htl < htl) htl = rnf.htl;
+ // Use the remaining htl to try another peer
+ forwardSearch();
+ }
+
+ // The insert completed downstream: pass the reply back (or log success
+ // if the insert started here, i.e. prev is null) and finish
+ private void handleInsertReply (InsertReply ir)
+ {
+ if (state != ACCEPTED) node.log (ir + " out of order");
+ if (prev == null) node.log (this + " succeeded");
+ else prev.sendMessage (ir); // Forward the message
+ finish();
+ }
+
+ // Send the insert to the closest remaining peer. Running out of hops
+ // counts as success (InsertReply); running out of peers is reported as
+ // RouteNotFound.
+ public void forwardSearch()
+ {
+ next = null;
+ // If the search has run out of hops, send InsertReply
+ if (htl == 0) {
+ node.log (this + " has no hops left");
+ if (prev == null) node.log (this + " succeeded");
+ else prev.sendMessage (new InsertReply (id));
+ finish();
+ return;
+ }
+ // Forward the search to the closest remaining peer
+ next = closestPeer();
+ if (next == null) {
+ node.log ("route not found for " + this);
+ if (prev == null) node.log (this + " failed");
+ else prev.sendMessage (new RouteNotFound (id, htl));
+ finish();
+ return;
+ }
+ // Decrement the htl if the next node is not the closest so far
+ double target = Node.keyToLocation (key);
+ if (Node.distance (target, next.location)
+ > Node.distance (target, closest)) {
+ htl = node.decrementHtl (htl);
+ node.log (this + " has htl " + htl);
+ }
+ node.log ("forwarding " + this + " to " + next.address);
+ next.sendMessage (makeSearchMessage());
+ nexts.remove (next);
+ state = SENT;
+ // Wait 10 seconds for the next hop to accept the search
+ Event.schedule (this, 10.0, ACCEPTED_TIMEOUT, next);
+ }
+
+ // Mark the insert as completed and unregister this handler from the node
+ private void finish()
+ {
+ state = COMPLETED;
+ node.removeMessageHandler (id);
+ }
+
+ // Build the message that is forwarded to the next hop, carrying the
+ // current closest location and htl
+ protected Search makeSearchMessage()
+ {
+ return new SskInsert (id, key, closest, htl);
+ }
+
+ public String toString()
+ {
+ return new String ("SSK insert (" + id + "," + key + ")");
+ }
+
+ // Event callbacks
+
+ // The previous hop never sent the public key. Ignored once the state has
+ // left STARTED; nothing is sent to prev from here.
+ private void keyTimeout()
+ {
+ if (state != STARTED) return;
+ node.log (this + " key timeout waiting for " + prev);
+ finish();
+ }
+
+ // The peer did not accept the insert in time; stale timers are ignored
+ private void acceptedTimeout (Peer p)
+ {
+ if (p != next) return; // We've already moved on to another peer
+ if (state != SENT) return;
+ node.log (this + " accepted timeout waiting for " + p);
+ forwardSearch(); // Try another peer
+ }
+
+ // The peer accepted the insert but never replied. The handler just
+ // finishes; nothing is sent to prev from here.
+ private void searchTimeout (Peer p)
+ {
+ if (p != next) return; // We've already moved on to another peer
+ if (state != ACCEPTED) return;
+ node.log (this + " search timeout waiting for " + p);
+ if (prev == null) node.log (this + " failed");
+ finish();
+ }
+
+ // EventTarget interface
+ // Dispatches a timer event to its callback. Unknown event types are
+ // silently ignored.
+ public void handleEvent (int type, Object data)
+ {
+ switch (type) {
+ case KEY_TIMEOUT:
+ keyTimeout();
+ break;
+
+ case ACCEPTED_TIMEOUT:
+ acceptedTimeout ((Peer) data);
+ break;
+
+ case SEARCH_TIMEOUT:
+ searchTimeout ((Peer) data);
+ break;
+ }
+ }
+
+ // Each EventTarget class has its own event codes
+ private final static int KEY_TIMEOUT = 1;
+ private final static int ACCEPTED_TIMEOUT = 2;
+ private final static int SEARCH_TIMEOUT = 3;
+}
Added: trunk/apps/load-balancing-sims/phase6/SskRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/SskRequestHandler.java
2006-09-14 14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/SskRequestHandler.java
2006-09-14 20:48:05 UTC (rev 10474)
@@ -0,0 +1,80 @@
+// The state of an SSK request as stored at each node along the path
+
+import messages.*;
+
+// Handles one SSK request at one node. The request succeeds once both the
+// data and the public key are available; the key is either already known
+// locally or supplied by the next hop.
+class SskRequestHandler extends RequestHandler
+{
+	// Did the previous hop ask for the public key to be returned with the
+	// data? This is independent of whether this node itself has the key.
+	private final boolean prevNeedsPubKey;
+	private SskPubKey pubKey = null; // Null until the public key is known
+	private SskDataFound data = null; // Null until the data has arrived
+
+	// needPubKey is true when this node does not hold the public key and
+	// therefore has to obtain it from the next hop. The search is forwarded
+	// immediately.
+	public SskRequestHandler (SskRequest r, Node node,
+				Peer prev, boolean needPubKey)
+	{
+		super (r, node, prev);
+		prevNeedsPubKey = r.needPubKey;
+		if (!needPubKey) pubKey = new SskPubKey (id, key);
+		forwardSearch();
+	}
+
+	// Only the current next hop may talk to this handler; dispatch on the
+	// message type
+	@Override
+	public void handleMessage (Message m, Peer src)
+	{
+		if (src != next) {
+			node.log ("unexpected source for " + m);
+			return;
+		}
+		if (m instanceof Accepted)
+			handleAccepted ((Accepted) m);
+		else if (m instanceof RejectedLoop)
+			handleRejectedLoop ((RejectedLoop) m);
+		else if (m instanceof RouteNotFound)
+			handleRouteNotFound ((RouteNotFound) m);
+		else if (m instanceof DataNotFound)
+			handleDataNotFound ((DataNotFound) m);
+		else if (m instanceof SskDataFound)
+			handleSskDataFound ((SskDataFound) m);
+		else if (m instanceof SskPubKey)
+			handleSskPubKey ((SskPubKey) m);
+		else node.log ("unexpected type for " + m);
+	}
+
+	// The data arrived; complete the request if the public key is known too
+	private void handleSskDataFound (SskDataFound df)
+	{
+		if (state != ACCEPTED) node.log (df + " out of order");
+		data = df;
+		completeIfReady();
+	}
+
+	// The public key arrived; complete the request if the data is here too
+	private void handleSskPubKey (SskPubKey pk)
+	{
+		if (state != ACCEPTED) node.log (pk + " out of order");
+		pubKey = pk;
+		completeIfReady();
+	}
+
+	// Once both the data and the public key are present, return them to the
+	// previous hop (or log success for a local request), cache them and
+	// finish. The key is sent back only if the previous hop asked for it:
+	// using this node's own need instead would leave a previous hop that
+	// lacks the key waiting until its search timeout.
+	private void completeIfReady()
+	{
+		if (data == null || pubKey == null) return; // Keep waiting
+		if (prev == null) node.log (this + " succeeded");
+		else {
+			prev.sendMessage (data);
+			if (prevNeedsPubKey) prev.sendMessage (pubKey);
+		}
+		node.cachePubKey (key);
+		node.cacheSsk (key);
+		finish();
+	}
+
+	// Ask the next hop for the public key only while we still lack it
+	@Override
+	protected Search makeSearchMessage()
+	{
+		return new SskRequest (id, key, closest, htl, pubKey == null);
+	}
+
+	@Override
+	public String toString()
+	{
+		return "SSK request (" + id + "," + key + ")";
+	}
+}
Modified: trunk/apps/load-balancing-sims/phase6/messages/ChkInsert.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/ChkInsert.java
2006-09-14 14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/messages/ChkInsert.java
2006-09-14 20:48:05 UTC (rev 10474)
@@ -16,6 +16,6 @@
public String toString()
{
- return new String ("CHK insert (" +id+ "," +key+ "," +htl+ ")");
+ return new String ("CHK insert (" + id + "," + key + ")");
}
}
Modified: trunk/apps/load-balancing-sims/phase6/messages/ChkRequest.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/ChkRequest.java
2006-09-14 14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/messages/ChkRequest.java
2006-09-14 20:48:05 UTC (rev 10474)
@@ -16,6 +16,6 @@
public String toString()
{
- return new String ("CHK request (" +id+ "," +key+ "," +htl+")");
+ return new String ("CHK request (" + id + "," + key + ")");
}
}
Added: trunk/apps/load-balancing-sims/phase6/messages/SskAccepted.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/SskAccepted.java
2006-09-14 14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/messages/SskAccepted.java
2006-09-14 20:48:05 UTC (rev 10474)
@@ -0,0 +1,18 @@
+package messages;
+
+// Sent to the previous hop to accept an SSK insert; needPubKey tells the
+// previous hop whether it should follow up with the public key.
+public class SskAccepted extends Message
+{
+	public final boolean needPubKey;
+
+	// A header-only message for the search with the given id
+	public SskAccepted (int id, boolean needPubKey)
+	{
+		this.id = id;
+		this.needPubKey = needPubKey;
+		size = Message.HEADER_SIZE;
+	}
+
+	@Override
+	public String toString()
+	{
+		return "SSK accepted (" + id + "," + needPubKey + ")";
+	}
+}
Added: trunk/apps/load-balancing-sims/phase6/messages/SskDataFound.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/SskDataFound.java
2006-09-14 14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/messages/SskDataFound.java
2006-09-14 20:48:05 UTC (rev 10474)
@@ -0,0 +1,15 @@
+package messages;
+
+// Reply to an SSK request announcing that the data was found. The message is
+// accounted as header-only.
+public class SskDataFound extends Message
+{
+	public SskDataFound (int id)
+	{
+		this.id = id;
+		size = Message.HEADER_SIZE;
+	}
+
+	@Override
+	public String toString()
+	{
+		return "SSK data found (" + id + ")";
+	}
+}
Added: trunk/apps/load-balancing-sims/phase6/messages/SskInsert.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/SskInsert.java
2006-09-14 14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/messages/SskInsert.java
2006-09-14 20:48:05 UTC (rev 10474)
@@ -0,0 +1,21 @@
+package messages;
+
+// Search message for an SSK insert. All routing state lives in Search.
+public class SskInsert extends Search
+{
+	// Start a new insert
+	public SskInsert (int key, double location)
+	{
+		super (key, location);
+	}
+
+	// Forward an insert
+	public SskInsert (int id, int key, double closest, int htl)
+	{
+		super (id, key, closest, htl);
+	}
+
+	@Override
+	public String toString()
+	{
+		return "SSK insert (" + id + "," + key + ")";
+	}
+}
Added: trunk/apps/load-balancing-sims/phase6/messages/SskPubKey.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/SskPubKey.java
2006-09-14 14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/messages/SskPubKey.java
2006-09-14 20:48:05 UTC (rev 10474)
@@ -0,0 +1,18 @@
+package messages;
+
+// Carries the public key for an SSK; sized as a header plus one key.
+public class SskPubKey extends Message
+{
+	public final int key;
+
+	// id is the search this key belongs to, key identifies the SSK
+	public SskPubKey (int id, int key)
+	{
+		this.id = id;
+		this.key = key;
+		size = Message.HEADER_SIZE + Message.KEY_SIZE;
+	}
+
+	@Override
+	public String toString()
+	{
+		return "SSK public key (" + id + "," + key + ")";
+	}
+}
Added: trunk/apps/load-balancing-sims/phase6/messages/SskRequest.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/SskRequest.java
2006-09-14 14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/messages/SskRequest.java
2006-09-14 20:48:05 UTC (rev 10474)
@@ -0,0 +1,27 @@
+package messages;
+
+// Search message for an SSK request. needPubKey tells the receiver whether
+// the sender wants the public key returned along with the data.
+public class SskRequest extends Search
+{
+	public final boolean needPubKey;
+
+	// Start a new request
+	public SskRequest (int key, double location, boolean needPubKey)
+	{
+		super (key, location);
+		this.needPubKey = needPubKey;
+	}
+
+	// Forward a request
+	public SskRequest (int id, int key, double closest,
+				int htl, boolean needPubKey)
+	{
+		super (id, key, closest, htl);
+		this.needPubKey = needPubKey;
+	}
+
+	@Override
+	public String toString()
+	{
+		return "SSK request (" + id + "," + key
+			+ "," + needPubKey + ")";
+	}
+}