Author: mrogers
Date: 2006-09-14 20:48:05 +0000 (Thu, 14 Sep 2006)
New Revision: 10474

Added:
   trunk/apps/load-balancing-sims/phase6/RequestHandler.java
   trunk/apps/load-balancing-sims/phase6/SskInsertHandler.java
   trunk/apps/load-balancing-sims/phase6/SskRequestHandler.java
   trunk/apps/load-balancing-sims/phase6/messages/SskAccepted.java
   trunk/apps/load-balancing-sims/phase6/messages/SskDataFound.java
   trunk/apps/load-balancing-sims/phase6/messages/SskInsert.java
   trunk/apps/load-balancing-sims/phase6/messages/SskPubKey.java
   trunk/apps/load-balancing-sims/phase6/messages/SskRequest.java
Modified:
   trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java
   trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java
   trunk/apps/load-balancing-sims/phase6/MessageHandler.java
   trunk/apps/load-balancing-sims/phase6/Node.java
   trunk/apps/load-balancing-sims/phase6/Sim.java
   trunk/apps/load-balancing-sims/phase6/messages/ChkInsert.java
   trunk/apps/load-balancing-sims/phase6/messages/ChkRequest.java
Log:
SSK inserts and requests (no collisions yet)

Modified: trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java 2006-09-14 
14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java 2006-09-14 
20:48:05 UTC (rev 10474)
@@ -95,7 +95,7 @@
                for (int i = 0; i < 32; i++)
                        if (blocks[i] != null) next.sendMessage (blocks[i]);
                // Wait for TransfersCompleted (FIXME: check real timeout)
-               Event.schedule (this, 120.0, TRANSFER_OUT_TIMEOUT, next);
+               Event.schedule (this, 240.0, TRANSFER_OUT_TIMEOUT, next);
        }

        private void handleRejectedLoop (RejectedLoop rl)
@@ -149,7 +149,7 @@
                        node.log (this + " has htl " + htl);
                }
                node.log ("forwarding " + this + " to " + next.address);
-               next.sendMessage (new ChkInsert (id, key, closest, htl));
+               next.sendMessage (makeSearchMessage());
                nexts.remove (next);
                searchState = SENT;
                // Wait 10 seconds for the next hop to accept the search
@@ -170,12 +170,17 @@
                searchState = COMPLETED;
                if (prev == null) node.log (this + " completed");
                else prev.sendMessage (new TransfersCompleted (id));
-               node.chkInsertCompleted (id);
+               node.removeMessageHandler (id);
        }

+       protected Search makeSearchMessage()
+       {
+               return new ChkInsert (id, key, closest, htl);
+       }
+       
        public String toString()
        {
-               return new String ("CHK insert (" +id+ "," +key+ "," +htl+ ")");
+               return new String ("CHK insert (" + id + "," + key + ")");
        }

        // Event callbacks

Modified: trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java        
2006-09-14 14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java        
2006-09-14 20:48:05 UTC (rev 10474)
@@ -2,9 +2,8 @@

 import messages.*;

-class ChkRequestHandler extends MessageHandler implements EventTarget
+class ChkRequestHandler extends RequestHandler
 {
-       private int state = STARTED; // State of search
        private boolean[] received; // Keep track of received blocks
        private int blocksReceived = 0;

@@ -12,6 +11,7 @@
        {
                super (r, node, prev);
                received = new boolean[32];
+               forwardSearch();
        }

        public void handleMessage (Message m, Peer src)
@@ -35,36 +35,6 @@
                else node.log ("unexpected type for " + m);
        }

-       private void handleAccepted (Accepted a)
-       {
-               if (state != SENT) node.log (a + " out of order");
-               state = ACCEPTED;
-               // Wait 60 seconds for a reply to the search
-               Event.schedule (this, 60.0, SEARCH_TIMEOUT, next);
-       }
-       
-       private void handleRejectedLoop (RejectedLoop rl)
-       {
-               if (state != SENT) node.log (rl + " out of order");
-               forwardSearch();
-       }
-       
-       private void handleRouteNotFound (RouteNotFound rnf)
-       {
-               if (state != ACCEPTED) node.log (rnf + " out of order");
-               if (rnf.htl < htl) htl = rnf.htl;
-               // Use the remaining htl to try another peer
-               forwardSearch();
-       }
-       
-       private void handleDataNotFound (DataNotFound dnf)
-       {
-               if (state != ACCEPTED) node.log (dnf + " out of order");
-               if (prev == null) node.log (this + " failed");
-               else prev.sendMessage (dnf); // Forward the message
-               finish();
-       }
-       
        private void handleChkDataFound (ChkDataFound df)
        {
                if (state != ACCEPTED) node.log (df + " out of order");
@@ -93,99 +63,13 @@
                }
        }

-       public void forwardSearch()
+       protected Search makeSearchMessage()
        {
-               next = null;
-               // If the search has run out of hops, send DataNotFound
-               if (htl == 0) {
-                       node.log ("data not found for " + this);
-                       if (prev == null) node.log (this + " failed");
-                       else prev.sendMessage (new DataNotFound (id));
-                       finish();
-                       return;
-               }
-               // Forward the search to the closest remaining peer
-               next = closestPeer();
-               if (next == null) {
-                       node.log ("route not found for " + this);
-                       if (prev == null) node.log (this + " failed");
-                       else prev.sendMessage (new RouteNotFound (id, htl));
-                       finish();
-                       return;
-               }
-               // Decrement the htl if the next node is not the closest so far
-               double target = Node.keyToLocation (key);
-               if (Node.distance (target, next.location)
-               > Node.distance (target, closest)) {
-                       htl = node.decrementHtl (htl);
-                       node.log (this + " has htl " + htl);
-               }
-               node.log ("forwarding " + this + " to " + next.address);
-               next.sendMessage (new ChkRequest (id, key, closest, htl));
-               nexts.remove (next);
-               state = SENT;
-               // Wait 5 seconds for the next hop to accept the search
-               Event.schedule (this, 5.0, ACCEPTED_TIMEOUT, next);
+               return new ChkRequest (id, key, closest, htl);
        }

-       private void finish()
-       {
-               state = COMPLETED;
-               node.chkRequestCompleted (id);
-       }
-       
        public String toString()
        {
                return new String ("CHK request (" + id + "," + key + ")");
        }
-       
-       // Event callbacks
-       
-       private void acceptedTimeout (Peer p)
-       {
-               if (p != next) return; // We've already moved on to another peer
-               if (state != SENT) return;
-               node.log (this + " accepted timeout waiting for " + p);
-               forwardSearch(); // Try another peer
-       }
-       
-       private void searchTimeout (Peer p)
-       {
-               if (p != next) return; // We've already moved on to another peer
-               if (state != ACCEPTED) return;
-               node.log (this + " search timeout waiting for " + p);
-               if (prev == null) node.log (this + " failed");
-               finish();
-       }
-       
-       private void transferTimeout (Peer p)
-       {
-               if (state != TRANSFERRING) return;
-               node.log (this + " transfer timeout receiving from " + p);
-               if (prev == null) node.log (this + " failed");
-               finish();
-       }
-       
-       // EventTarget interface
-       public void handleEvent (int type, Object data)
-       {
-               switch (type) {
-                       case ACCEPTED_TIMEOUT:
-                       acceptedTimeout ((Peer) data);
-                       break;
-                       
-                       case SEARCH_TIMEOUT:
-                       searchTimeout ((Peer) data);
-                       break;
-                       
-                       case TRANSFER_TIMEOUT:
-                       transferTimeout ((Peer) data);
-                       break;
-               }
-       }
-       
-       // Each EventTarget class has its own event codes
-       private final static int ACCEPTED_TIMEOUT = 1;
-       private final static int SEARCH_TIMEOUT = 2;
-       private final static int TRANSFER_TIMEOUT = 3;
 }

Modified: trunk/apps/load-balancing-sims/phase6/MessageHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/MessageHandler.java   2006-09-14 
14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/MessageHandler.java   2006-09-14 
20:48:05 UTC (rev 10474)
@@ -65,5 +65,7 @@
                return closestPeer; // Null if the list was empty
        }

-       public abstract void handleMessage (Message m, Peer src);       
+       public abstract void handleMessage (Message m, Peer src);
+       
+       protected abstract Search makeSearchMessage();
 }

Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java     2006-09-14 14:14:20 UTC 
(rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/Node.java     2006-09-14 20:48:05 UTC 
(rev 10474)
@@ -6,10 +6,7 @@

 class Node implements EventTarget
 {
-       public final static int STORE_SIZE = 10; // Max number of keys in store
-       public final static int CACHE_SIZE = 10; // Max number of keys in cache
-       public final static double MIN_SLEEP = 0.01; // Seconds
-       public final static double SHORT_SLEEP = 0.05; // Poll the bw limiter
+       public final static double SHORT_SLEEP = 0.01; // Poll the bw limiter

        // Token bucket bandwidth limiter
        public final static int BUCKET_RATE = 30000; // Bytes per second
@@ -20,8 +17,11 @@
        private HashMap<Integer,Peer> peers; // Look up a peer by its address
        private HashSet<Integer> recentlySeenRequests; // Request IDs
        private HashMap<Integer,MessageHandler> messageHandlers; // By ID
-       private LruCache<Integer> chkStore; // CHK datastore
-       private LruCache<Integer> chkCache; // CHK datacache
+       private LruCache<Integer> chkStore;
+       private LruCache<Integer> chkCache;
+       private LruCache<Integer> sskStore;
+       private LruCache<Integer> sskCache;
+       private LruCache<Integer> pubKeyCache; // SSK public keys
        public TokenBucket bandwidth; // Bandwidth limiter
        private boolean timerRunning = false; // Is the timer running?

@@ -32,8 +32,11 @@
                peers = new HashMap<Integer,Peer>();
                recentlySeenRequests = new HashSet<Integer>();
                messageHandlers = new HashMap<Integer,MessageHandler>();
-               chkStore = new LruCache<Integer> (STORE_SIZE);
-               chkCache = new LruCache<Integer> (CACHE_SIZE);
+               chkStore = new LruCache<Integer> (10);
+               chkCache = new LruCache<Integer> (10);
+               sskStore = new LruCache<Integer> (10);
+               sskCache = new LruCache<Integer> (10);
+               pubKeyCache = new LruCache<Integer> (10);
                bandwidth = new TokenBucket (BUCKET_RATE, BUCKET_SIZE);
        }

@@ -103,6 +106,30 @@
                else log ("key " + key + " not added to CHK store");
        }

+       // Add an SSK to the cache
+       public void cacheSsk (int key)
+       {
+               log ("key " + key + " added to SSK cache");
+               sskCache.put (key);
+       }
+       
+       // Consider adding an SSK to the store
+       public void storeSsk (int key)
+       {
+               if (closerThanPeers (keyToLocation (key))) {
+                       log ("key " + key + " added to SSK store");
+                       sskStore.put (key);
+               }
+               else log ("key " + key + " not added to SSK store");
+       }
+       
+       // Add a public key to the cache
+       public void cachePubKey (int key)
+       {
+               log ("public key " + key + " added to cache");
+               pubKeyCache.put (key);
+       }
+       
        // Called by Peer
        public void startTimer()
        {
@@ -128,6 +155,10 @@
                        handleChkRequest ((ChkRequest) m, src);
                else if (m instanceof ChkInsert)
                        handleChkInsert ((ChkInsert) m, src);
+               else if (m instanceof SskRequest)
+                       handleSskRequest ((SskRequest) m, src);
+               else if (m instanceof SskInsert)
+                       handleSskInsert ((SskInsert) m, src);
                else {
                        MessageHandler mh = messageHandlers.get (m.id);
                        if (mh == null) log ("no message handler for " + m);
@@ -150,28 +181,33 @@
                        log ("accepting " + r);
                        prev.sendMessage (new Accepted (r.id));
                }
-               // If the key is in the store, return it
+               // If the data is in the store, return it
                if (chkStore.get (r.key)) {
                        log ("key " + r.key + " found in CHK store");
                        if (prev == null) log (r + " succeeded locally");
-                       else prev.sendMessage (new ChkDataFound (r.id));
-                       chkRequestCompleted (r.id);
+                       else {
+                               prev.sendMessage (new ChkDataFound (r.id));
+                               for (int i = 0; i < 32; i++)
+                                       prev.sendMessage (new Block (r.id, i));
+                       }
                        return;
                }
                log ("key " + r.key + " not found in CHK store");
-               // If the key is in the cache, return it
+               // If the data is in the cache, return it
                if (chkCache.get (r.key)) {
                        log ("key " + r.key + " found in CHK cache");
                        if (prev == null) log (r + " succeeded locally");
-                       else prev.sendMessage (new ChkDataFound (r.id));
-                       chkRequestCompleted (r.id);
+                       else {
+                               prev.sendMessage (new ChkDataFound (r.id));
+                               for (int i = 0; i < 32; i++)
+                                       prev.sendMessage (new Block (r.id, i));
+                       }
                        return;
                }
                log ("key " + r.key + " not found in CHK cache");
                // Store the request handler and forward the search
                ChkRequestHandler rh = new ChkRequestHandler (r, this, prev);
                messageHandlers.put (r.id, rh);
-               rh.forwardSearch();
        }

        private void handleChkInsert (ChkInsert i, Peer prev)
@@ -194,18 +230,87 @@
                messageHandlers.put (i.id, ih);
        }

-       // Remove a completed request from the list of pending requests
-       public void chkRequestCompleted (int id)
+       private void handleSskRequest (SskRequest r, Peer prev)
        {
-               messageHandlers.remove (id);
+               if (!recentlySeenRequests.add (r.id)) {
+                       log ("rejecting recently seen " + r);
+                       prev.sendMessage (new RejectedLoop (r.id));
+                       // Don't forward the same search back to prev
+                       MessageHandler mh = messageHandlers.get (r.id);
+                       if (mh != null) mh.removeNextHop (prev);
+                       return;
+               }
+               // Look up the public key
+               boolean pub = pubKeyCache.get (r.key);
+               if (pub) log ("public key " + r.key + " found in cache");
+               else log ("public key " + r.key + " not found in cache");
+               // Accept the search
+               if (prev != null) {
+                       log ("accepting " + r);
+                       prev.sendMessage (new Accepted (r.id));
+               }
+               // If the data is in the store, return it
+               if (pub && sskStore.get (r.key)) {
+                       log ("key " + r.key + " found in SSK store");
+                       if (prev == null) log (r + " succeeded locally");
+                       else {
+                               prev.sendMessage (new SskDataFound (r.id));
+                               if (r.needPubKey)
+                                       prev.sendMessage
+                                               (new SskPubKey (r.id, r.key));
+                       }
+                       return;
+               }
+               log ("key " + r.key + " not found in SSK store");
+               // If the data is in the cache, return it
+               if (pub && sskCache.get (r.key)) {
+                       log ("key " + r.key + " found in SSK cache");
+                       if (prev == null) log (r + " succeeded locally");
+                       else {
+                               prev.sendMessage (new SskDataFound (r.id));
+                               if (r.needPubKey)
+                                       prev.sendMessage
+                                               (new SskPubKey (r.id, r.key));
+                       }
+                       return;
+               }
+               log ("key " + r.key + " not found in SSK cache");
+               // Store the request handler and forward the search
+               SskRequestHandler rh = new SskRequestHandler (r,this,prev,!pub);
+               messageHandlers.put (r.id, rh);
        }

-       // Remove a completed insert from the list of pending inserts
-       public void chkInsertCompleted (int id)
+       private void handleSskInsert (SskInsert i, Peer prev)
        {
-               messageHandlers.remove (id);
+               if (!recentlySeenRequests.add (i.id)) {
+                       log ("rejecting recently seen " + i);
+                       prev.sendMessage (new RejectedLoop (i.id));
+                       // Don't forward the same search back to prev
+                       MessageHandler mh = messageHandlers.get (i.id);
+                       if (mh != null) mh.removeNextHop (prev);
+                       return;
+               }
+               // Look up the public key
+               boolean pub = pubKeyCache.get (i.key);
+               if (pub) log ("public key " + i.key + " found in cache");
+               else log ("public key " + i.key + " not found in cache");
+               // Accept the search
+               if (prev != null) {
+                       log ("accepting " + i);
+                       prev.sendMessage (new SskAccepted (i.id, !pub));
+               }
+               // Store the insert handler and possibly wait for the pub key
+               SskInsertHandler ih = new SskInsertHandler (i,this,prev,!pub);
+               messageHandlers.put (i.id, ih);
        }

+       public void removeMessageHandler (int id)
+       {
+               MessageHandler mh = messageHandlers.remove (id);
+               if (mh == null) log ("no message handler to remove for " + id);
+               else log ("removing message handler for " + id);
+       }
+       
        // Return the list of peers in a random order
        public ArrayList<Peer> peers()
        {
@@ -219,16 +324,16 @@
                Event.log (net.address + " " + message);
        }

-       // Event callback
-       private void generateRequest (int key)
+       // Event callbacks
+       
+       private void generateChkRequest (int key)
        {
                ChkRequest cr = new ChkRequest (key, location);
                log ("generating " + cr);
                handleChkRequest (cr, null);
        }

-       // Event callback
-       private void generateInsert (int key)
+       private void generateChkInsert (int key)
        {
                ChkInsert ci = new ChkInsert (key, location);
                log ("generating " + ci);
@@ -238,7 +343,21 @@
                        handleMessage (new Block (ci.id, i), null);
        }

-       // Event callback
+       private void generateSskRequest (int key)
+       {
+               SskRequest sr = new SskRequest (key, location, true);
+               log ("generating " + sr);
+               handleSskRequest (sr, null);
+       }
+       
+       private void generateSskInsert (int key)
+       {
+               SskInsert si = new SskInsert (key, location);
+               log ("generating " + si);
+               pubKeyCache.put (key);
+               handleSskInsert (si, null);
+       }
+       
        private void checkTimeouts()
        {
                // Check the peers in a random order each time
@@ -251,7 +370,7 @@
                }
                else {
                        double sleep = deadline - Event.time(); // Can be < 0
-                       if (sleep < MIN_SLEEP) sleep = MIN_SLEEP;
+                       if (sleep < SHORT_SLEEP) sleep = SHORT_SLEEP;
                        // log ("sleeping for " + sleep + " seconds");
                        Event.schedule (this, sleep, CHECK_TIMEOUTS, null);
                }
@@ -261,14 +380,22 @@
        public void handleEvent (int type, Object data)
        {
                switch (type) {
-                       case GENERATE_REQUEST:
-                       generateRequest ((Integer) data);
+                       case GENERATE_CHK_REQUEST:
+                       generateChkRequest ((Integer) data);
                        break;

-                       case GENERATE_INSERT:
-                       generateInsert ((Integer)data);
+                       case GENERATE_CHK_INSERT:
+                       generateChkInsert ((Integer) data);
                        break;

+                       case GENERATE_SSK_REQUEST:
+                       generateSskRequest ((Integer) data);
+                       break;
+                       
+                       case GENERATE_SSK_INSERT:
+                       generateSskInsert ((Integer) data);
+                       break;
+                       
                        case CHECK_TIMEOUTS:
                        checkTimeouts();
                        break;
@@ -276,7 +403,9 @@
        }

        // Each EventTarget class has its own event codes
-       public final static int GENERATE_REQUEST = 1;
-       public final static int GENERATE_INSERT = 2;
-       private final static int CHECK_TIMEOUTS = 3;
+       public final static int GENERATE_CHK_REQUEST = 1;
+       public final static int GENERATE_CHK_INSERT = 2;
+       public final static int GENERATE_SSK_REQUEST = 3;
+       public final static int GENERATE_SSK_INSERT = 4;
+       private final static int CHECK_TIMEOUTS = 5;
 }

Added: trunk/apps/load-balancing-sims/phase6/RequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/RequestHandler.java   2006-09-14 
14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/RequestHandler.java   2006-09-14 
20:48:05 UTC (rev 10474)
@@ -0,0 +1,134 @@
+// The parent class of ChkRequestHandler and SskRequestHandler
+
+import messages.*;
+
+abstract class RequestHandler extends MessageHandler implements EventTarget
+{
+       protected int state = STARTED; // State of search
+       
+       public RequestHandler (Search s, Node node, Peer prev)
+       {
+               super (s, node, prev);
+       }
+       
+       protected void handleAccepted (Accepted a)
+       {
+               if (state != SENT) node.log (a + " out of order");
+               state = ACCEPTED;
+               // Wait 60 seconds for a reply to the search
+               Event.schedule (this, 60.0, SEARCH_TIMEOUT, next);
+       }
+       
+       protected void handleRejectedLoop (RejectedLoop rl)
+       {
+               if (state != SENT) node.log (rl + " out of order");
+               forwardSearch();
+       }
+       
+       protected void handleRouteNotFound (RouteNotFound rnf)
+       {
+               if (state != ACCEPTED) node.log (rnf + " out of order");
+               if (rnf.htl < htl) htl = rnf.htl;
+               // Use the remaining htl to try another peer
+               forwardSearch();
+       }
+       
+       protected void handleDataNotFound (DataNotFound dnf)
+       {
+               if (state != ACCEPTED) node.log (dnf + " out of order");
+               if (prev == null) node.log (this + " failed");
+               else prev.sendMessage (dnf); // Forward the message
+               finish();
+       }
+       
+       protected void forwardSearch()
+       {
+               next = null;
+               // If the search has run out of hops, send DataNotFound
+               if (htl == 0) {
+                       node.log ("data not found for " + this);
+                       if (prev == null) node.log (this + " failed");
+                       else prev.sendMessage (new DataNotFound (id));
+                       finish();
+                       return;
+               }
+               // Forward the search to the closest remaining peer
+               next = closestPeer();
+               if (next == null) {
+                       node.log ("route not found for " + this);
+                       if (prev == null) node.log (this + " failed");
+                       else prev.sendMessage (new RouteNotFound (id, htl));
+                       finish();
+                       return;
+               }
+               // Decrement the htl if the next node is not the closest so far
+               double target = Node.keyToLocation (key);
+               if (Node.distance (target, next.location)
+               > Node.distance (target, closest)) {
+                       htl = node.decrementHtl (htl);
+                       node.log (this + " has htl " + htl);
+               }
+               node.log ("forwarding " + this + " to " + next.address);
+               next.sendMessage (makeSearchMessage());
+               nexts.remove (next);
+               state = SENT;
+               // Wait 5 seconds for the next hop to accept the search
+               Event.schedule (this, 5.0, ACCEPTED_TIMEOUT, next);
+       }
+       
+       protected void finish()
+       {
+               state = COMPLETED;
+               node.removeMessageHandler (id);
+       }
+       
+       // Event callbacks
+       
+       protected void acceptedTimeout (Peer p)
+       {
+               if (p != next) return; // We've already moved on to another peer
+               if (state != SENT) return;
+               node.log (this + " accepted timeout waiting for " + p);
+               forwardSearch(); // Try another peer
+       }
+       
+       protected void searchTimeout (Peer p)
+       {
+               if (p != next) return; // We've already moved on to another peer
+               if (state != ACCEPTED) return;
+               node.log (this + " search timeout waiting for " + p);
+               if (prev == null) node.log (this + " failed");
+               finish();
+       }
+       
+       protected void transferTimeout (Peer p)
+       {
+               if (state != TRANSFERRING) return;
+               node.log (this + " transfer timeout receiving from " + p);
+               if (prev == null) node.log (this + " failed");
+               finish();
+       }
+       
+       // EventTarget interface
+       public void handleEvent (int type, Object data)
+       {
+               switch (type) {
+                       case ACCEPTED_TIMEOUT:
+                       acceptedTimeout ((Peer) data);
+                       break;
+                       
+                       case SEARCH_TIMEOUT:
+                       searchTimeout ((Peer) data);
+                       break;
+                       
+                       case TRANSFER_TIMEOUT:
+                       transferTimeout ((Peer) data);
+                       break;
+               }
+       }
+       
+       // Each EventTarget class has its own event codes
+       protected final static int ACCEPTED_TIMEOUT = 1;
+       protected final static int SEARCH_TIMEOUT = 2;
+       protected final static int TRANSFER_TIMEOUT = 3;
+}

Modified: trunk/apps/load-balancing-sims/phase6/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Sim.java      2006-09-14 14:14:20 UTC 
(rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/Sim.java      2006-09-14 20:48:05 UTC 
(rev 10474)
@@ -26,8 +26,8 @@
                n3.connectBothWays (n4, 0.1);

                int key = Node.locationToKey (Math.random());
-               Event.schedule (n0, 0.0, Node.GENERATE_INSERT, key);
-               Event.schedule (n4, 30.0, Node.GENERATE_REQUEST, key);
+               Event.schedule (n0, 0.0, Node.GENERATE_SSK_INSERT, key);
+               Event.schedule (n4, 30.0, Node.GENERATE_SSK_REQUEST, key);

                // Run the simulation
                Event.run();

Added: trunk/apps/load-balancing-sims/phase6/SskInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/SskInsertHandler.java 2006-09-14 
14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/SskInsertHandler.java 2006-09-14 
20:48:05 UTC (rev 10474)
@@ -0,0 +1,187 @@
+// The state of an SSK insert as stored at each node along the path
+
+import java.util.HashSet;
+import messages.*;
+
+class SskInsertHandler extends MessageHandler implements EventTarget
+{
+       private int state = STARTED; // State of search
+       private SskPubKey pubKey = null; 
+       
+       public SskInsertHandler (SskInsert i, Node node,
+                               Peer prev, boolean needPubKey)
+       {
+               super (i, node, prev);
+               // Wait 10 seconds for the previous hop to send the public key
+               if (needPubKey) Event.schedule (this, 10.0, KEY_TIMEOUT, null);
+               else {
+                       pubKey = new SskPubKey (id, key);
+                       node.cacheSsk (key);
+                       node.storeSsk (key);
+                       forwardSearch();
+               }
+       }
+       
+       public void handleMessage (Message m, Peer src)
+       {
+               if (src == prev) {
+                       if (m instanceof SskPubKey)
+                               handleSskPubKey ((SskPubKey) m);
+                       else node.log ("unexpected type for " + m);
+               }
+               else if (src == next) {
+                       if (m instanceof SskAccepted)
+                               handleSskAccepted ((SskAccepted) m);
+                       else if (m instanceof RejectedLoop)
+                               handleRejectedLoop ((RejectedLoop) m);
+                       else if (m instanceof RouteNotFound)
+                               handleRouteNotFound ((RouteNotFound) m);
+                       else if (m instanceof InsertReply)
+                               handleInsertReply ((InsertReply) m);
+                       else node.log ("unexpected type for " + m);
+               }
+               else node.log ("unexpected source for " + m);
+       }
+       
+       private void handleSskPubKey (SskPubKey pk)
+       {
+               if (state != STARTED) node.log (pk + " out of order");
+               pubKey = pk;
+               node.cachePubKey (key);
+               node.cacheSsk (key);
+               node.storeSsk (key);
+               forwardSearch();
+       }
+       
+       private void handleSskAccepted (SskAccepted sa)
+       {
+               if (state != SENT) node.log (sa + " out of order");
+               state = ACCEPTED;
+               // Wait 60 seconds for a reply to the search
+               Event.schedule (this, 60.0, SEARCH_TIMEOUT, next);
+               // Send the public key if requested
+               if (sa.needPubKey) next.sendMessage (pubKey);
+       }
+       
+       private void handleRejectedLoop (RejectedLoop rl)
+       {
+               if (state != SENT) node.log (rl + " out of order");
+               forwardSearch();
+       }
+       
+       private void handleRouteNotFound (RouteNotFound rnf)
+       {
+               if (state != ACCEPTED) node.log (rnf + " out of order");
+               if (rnf.htl < htl) htl = rnf.htl;
+               // Use the remaining htl to try another peer
+               forwardSearch();
+       }
+       
+       private void handleInsertReply (InsertReply ir)
+       {
+               if (state != ACCEPTED) node.log (ir + " out of order");
+               if (prev == null) node.log (this + " succeeded");
+               else prev.sendMessage (ir); // Forward the message
+               finish();
+       }
+       
+       public void forwardSearch()
+       {
+               next = null;
+               // If the search has run out of hops, send InsertReply
+               if (htl == 0) {
+                       node.log (this + " has no hops left");
+                       if (prev == null) node.log (this + " succeeded");
+                       else prev.sendMessage (new InsertReply (id));
+                       finish();
+                       return;
+               }
+               // Forward the search to the closest remaining peer
+               next = closestPeer();
+               if (next == null) {
+                       node.log ("route not found for " + this);
+                       if (prev == null) node.log (this + " failed");
+                       else prev.sendMessage (new RouteNotFound (id, htl));
+                       finish();
+                       return;
+               }
+               // Decrement the htl if the next node is not the closest so far
+               double target = Node.keyToLocation (key);
+               if (Node.distance (target, next.location)
+               > Node.distance (target, closest)) {
+                       htl = node.decrementHtl (htl);
+                       node.log (this + " has htl " + htl);
+               }
+               node.log ("forwarding " + this + " to " + next.address);
+               next.sendMessage (makeSearchMessage());
+               nexts.remove (next);
+               state = SENT;
+               // Wait 10 seconds for the next hop to accept the search
+               Event.schedule (this, 10.0, ACCEPTED_TIMEOUT, next);
+       }
+       
+       private void finish()
+       {
+               state = COMPLETED;
+               node.removeMessageHandler (id);
+       }
+       
+       protected Search makeSearchMessage()
+       {
+               return new SskInsert (id, key, closest, htl);
+       }
+       
+       public String toString()
+       {
+               return new String ("SSK insert (" + id + "," + key + ")");
+       }
+       
+       // Event callbacks
+       
+       private void keyTimeout()
+       {
+               if (state != STARTED) return;
+               node.log (this + " key timeout waiting for " + prev);
+               finish();
+       }
+       
+       private void acceptedTimeout (Peer p)
+       {
+               if (p != next) return; // We've already moved on to another peer
+               if (state != SENT) return;
+               node.log (this + " accepted timeout waiting for " + p);
+               forwardSearch(); // Try another peer
+       }
+       
+       private void searchTimeout (Peer p)
+       {
+               if (p != next) return; // We've already moved on to another peer
+               if (state != ACCEPTED) return;
+               node.log (this + " search timeout waiting for " + p);
+               if (prev == null) node.log (this + " failed");
+               finish();
+       }
+       
+       // EventTarget interface
+       public void handleEvent (int type, Object data)
+       {
+               switch (type) {
+                       case KEY_TIMEOUT:
+                       keyTimeout();
+                       break;
+                       
+                       case ACCEPTED_TIMEOUT:
+                       acceptedTimeout ((Peer) data);
+                       break;
+                       
+                       case SEARCH_TIMEOUT:
+                       searchTimeout ((Peer) data);
+                       break;                  
+               }
+       }
+       
+       // Each EventTarget class has its own event codes
+       private final static int KEY_TIMEOUT = 1;
+       private final static int ACCEPTED_TIMEOUT = 2;
+       private final static int SEARCH_TIMEOUT = 3;
+}

Added: trunk/apps/load-balancing-sims/phase6/SskRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/SskRequestHandler.java        
2006-09-14 14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/SskRequestHandler.java        
2006-09-14 20:48:05 UTC (rev 10474)
@@ -0,0 +1,80 @@
+// The state of an SSK request as stored at each node along the path
+
+import messages.*;
+
+class SskRequestHandler extends RequestHandler
+{
+       private boolean needPubKey; // Ask the next hop for the public key?
+       private SskPubKey pubKey = null;
+       private SskDataFound data = null;
+       
+       public SskRequestHandler (SskRequest r, Node node,
+                               Peer prev, boolean needPubKey)
+       {
+               super (r, node, prev);
+               this.needPubKey = needPubKey;
+               if (!needPubKey) pubKey = new SskPubKey (id, key);
+               forwardSearch();
+       }
+       
+       public void handleMessage (Message m, Peer src)
+       {
+               if (src != next) {
+                       node.log ("unexpected source for " + m);
+                       return;
+               }
+               if (m instanceof Accepted)
+                       handleAccepted ((Accepted) m);
+               else if (m instanceof RejectedLoop)
+                       handleRejectedLoop ((RejectedLoop) m);
+               else if (m instanceof RouteNotFound)
+                       handleRouteNotFound ((RouteNotFound) m);
+               else if (m instanceof DataNotFound)
+                       handleDataNotFound ((DataNotFound) m);
+               else if (m instanceof SskDataFound)
+                       handleSskDataFound ((SskDataFound) m);
+               else if (m instanceof SskPubKey)
+                       handleSskPubKey ((SskPubKey) m);
+               else node.log ("unexpected type for " + m);
+       }
+       
+       private void handleSskDataFound (SskDataFound df)
+       {
+               if (state != ACCEPTED) node.log (df + " out of order");
+               data = df;
+               if (pubKey == null) return; // Keep waiting
+               if (prev == null) node.log (this + " succeeded");
+               else {
+                       prev.sendMessage (data);
+                       if (needPubKey) prev.sendMessage (pubKey);
+               }
+               node.cachePubKey (key);
+               node.cacheSsk (key);
+               finish();
+       }
+       
+       private void handleSskPubKey (SskPubKey pk)
+       {
+               if (state != ACCEPTED) node.log (pk + " out of order");
+               pubKey = pk;
+               if (data == null) return; // Keep waiting
+               if (prev == null) node.log (this + " succeeded");
+               else {
+                       prev.sendMessage (data);
+                       if (needPubKey) prev.sendMessage (pubKey);
+               }
+               node.cachePubKey (key);
+               node.cacheSsk (key);
+               finish();
+       }
+       
+       protected Search makeSearchMessage()
+       {
+               return new SskRequest (id, key, closest, htl, pubKey == null);
+       }
+       
+       public String toString()
+       {
+               return new String ("SSK request (" + id + "," + key + ")");
+       }
+}

Modified: trunk/apps/load-balancing-sims/phase6/messages/ChkInsert.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/ChkInsert.java       
2006-09-14 14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/messages/ChkInsert.java       
2006-09-14 20:48:05 UTC (rev 10474)
@@ -16,6 +16,6 @@

        public String toString()
        {
-               return new String ("CHK insert (" +id+ "," +key+ "," +htl+ ")");
+               return new String ("CHK insert (" + id + "," + key + ")");
        }
 }

Modified: trunk/apps/load-balancing-sims/phase6/messages/ChkRequest.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/ChkRequest.java      
2006-09-14 14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/messages/ChkRequest.java      
2006-09-14 20:48:05 UTC (rev 10474)
@@ -16,6 +16,6 @@

        public String toString()
        {
-               return new String ("CHK request (" +id+ "," +key+ "," +htl+")");
+               return new String ("CHK request (" + id + "," + key + ")");
        }
 }

Added: trunk/apps/load-balancing-sims/phase6/messages/SskAccepted.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/SskAccepted.java     
2006-09-14 14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/messages/SskAccepted.java     
2006-09-14 20:48:05 UTC (rev 10474)
@@ -0,0 +1,18 @@
+package messages;
+
+public class SskAccepted extends Message
+{
+       public final boolean needPubKey;
+       
+       public SskAccepted (int id, boolean needPubKey)
+       {
+               this.id = id;
+               this.needPubKey = needPubKey;
+               size = Message.HEADER_SIZE;
+       }
+       
+       public String toString()
+       {
+               return new String ("SSK accepted (" +id+ "," +needPubKey+ ")");
+       }
+}

Added: trunk/apps/load-balancing-sims/phase6/messages/SskDataFound.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/SskDataFound.java    
2006-09-14 14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/messages/SskDataFound.java    
2006-09-14 20:48:05 UTC (rev 10474)
@@ -0,0 +1,15 @@
+package messages;
+
+public class SskDataFound extends Message
+{
+       public SskDataFound (int id)
+       {
+               this.id = id;
+               size = Message.HEADER_SIZE;
+       }
+       
+       public String toString()
+       {
+               return new String ("SSK data found (" + id + ")");
+       }
+}

Added: trunk/apps/load-balancing-sims/phase6/messages/SskInsert.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/SskInsert.java       
2006-09-14 14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/messages/SskInsert.java       
2006-09-14 20:48:05 UTC (rev 10474)
@@ -0,0 +1,21 @@
+package messages;
+
+public class SskInsert extends Search
+{
+       // Start a new insert
+       public SskInsert (int key, double location)
+       {
+               super (key, location);
+       }
+       
+       // Forward an insert
+       public SskInsert (int id, int key, double closest, int htl)
+       {
+               super (id, key, closest, htl);
+       }
+       
+       public String toString()
+       {
+               return new String ("SSK insert (" + id + "," + key + ")");
+       }
+}

Added: trunk/apps/load-balancing-sims/phase6/messages/SskPubKey.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/SskPubKey.java       
2006-09-14 14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/messages/SskPubKey.java       
2006-09-14 20:48:05 UTC (rev 10474)
@@ -0,0 +1,18 @@
+package messages;
+
+public class SskPubKey extends Message
+{
+       public final int key;
+       
+       public SskPubKey (int id, int key)
+       {
+               this.id = id;
+               this.key = key;
+               size = Message.HEADER_SIZE + Message.KEY_SIZE;
+       }
+       
+       public String toString()
+       {
+               return new String ("SSK public key (" + id + "," + key + ")");
+       }
+}

Added: trunk/apps/load-balancing-sims/phase6/messages/SskRequest.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/SskRequest.java      
2006-09-14 14:14:20 UTC (rev 10473)
+++ trunk/apps/load-balancing-sims/phase6/messages/SskRequest.java      
2006-09-14 20:48:05 UTC (rev 10474)
@@ -0,0 +1,27 @@
+package messages;
+
+public class SskRequest extends Search
+{
+       public final boolean needPubKey;
+       
+       // Start a new request
+       public SskRequest (int key, double location, boolean needPubKey)
+       {
+               super (key, location);
+               this.needPubKey = needPubKey;
+       }
+       
+       // Forward a request
+       public SskRequest (int id, int key, double closest,
+                               int htl, boolean needPubKey)
+       {
+               super (id, key, closest, htl);
+               this.needPubKey = needPubKey;
+       }
+       
+       public String toString()
+       {
+               return new String ("SSK request (" + id + "," + key
+                                       + "," + needPubKey + ")");
+       }
+}


Reply via email to