Author: mrogers
Date: 2006-09-27 19:10:44 +0000 (Wed, 27 Sep 2006)
New Revision: 10520

Added:
   trunk/apps/load-balancing-sims/phase6/LruMap.java
Modified:
   trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java
   trunk/apps/load-balancing-sims/phase6/LruCache.java
   trunk/apps/load-balancing-sims/phase6/Node.java
   trunk/apps/load-balancing-sims/phase6/Sim.java
   trunk/apps/load-balancing-sims/phase6/SskInsertHandler.java
   trunk/apps/load-balancing-sims/phase6/SskRequestHandler.java
   trunk/apps/load-balancing-sims/phase6/messages/SskDataFound.java
   trunk/apps/load-balancing-sims/phase6/messages/SskInsert.java
Log:
SSK collisions (not properly tested)

Modified: trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java 2006-09-27 
17:32:07 UTC (rev 10519)
+++ trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java 2006-09-27 
19:10:44 UTC (rev 10520)
@@ -65,10 +65,8 @@
                blocksReceived++;
                // Forward the block to all receivers
                for (Peer p : receivers) p.sendMessage (b);
-               // If the transfer is complete, cache and maybe store the data
+               // If the transfer is complete, consider finishing
                if (blocksReceived == 32) {
-                       node.cacheChk (key);
-                       node.storeChk (key);
                        inState = COMPLETED;
                        considerFinishing();
                }
@@ -167,6 +165,8 @@
        {
                inState = COMPLETED;
                searchState = COMPLETED;
+               node.cacheChk (key);
+               node.storeChk (key);
                if (prev == null) node.log (this + " completed");
                else prev.sendMessage (new TransfersCompleted (id));
                node.removeMessageHandler (id);

Modified: trunk/apps/load-balancing-sims/phase6/LruCache.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/LruCache.java 2006-09-27 17:32:07 UTC 
(rev 10519)
+++ trunk/apps/load-balancing-sims/phase6/LruCache.java 2006-09-27 19:10:44 UTC 
(rev 10520)
@@ -4,8 +4,8 @@

 class LruCache<Key>
 {
-       public int capacity;
-       public LinkedHashSet<Key> set;
+       public final int capacity;
+       private LinkedHashSet<Key> set;

        public LruCache (int capacity)
        {

Added: trunk/apps/load-balancing-sims/phase6/LruMap.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/LruMap.java   2006-09-27 17:32:07 UTC 
(rev 10519)
+++ trunk/apps/load-balancing-sims/phase6/LruMap.java   2006-09-27 19:10:44 UTC 
(rev 10520)
@@ -0,0 +1,59 @@
+// Limited-capacity LRU cache that stores a value for each key
+
+import java.util.LinkedHashSet;
+import java.util.HashMap;
+
+class LruMap<Key,Value>
+{
+       public final int capacity;
+       private LinkedHashSet<Key> set;
+       private HashMap<Key,Value> map;
+       
+       public LruMap (int capacity)
+       {
+               this.capacity = capacity;
+               set = new LinkedHashSet<Key> (capacity);
+               map = new HashMap<Key,Value> (capacity);
+       }
+       
+       public Value get (Key key)
+       {
+               log ("searching cache for key " + key);
+               Value value = map.get (key);
+               if (value != null) {
+                       // Move the key to the fresh end
+                       set.remove (key);
+                       set.add (key);
+               }
+               return value;
+       }
+       
+       // Return the existing value (which is not replaced), or null
+       public Value put (Key key, Value value)
+       {
+               Value existing = map.get (key);
+               if (existing == null) {
+                       log ("adding key " + key + " to cache");
+                       if (set.size() == capacity) {
+                               // Discard the oldest element
+                               Key oldest = set.iterator().next();
+                               log ("discarding key " + oldest);
+                               set.remove (oldest);
+                       }
+                       map.put (key, value);
+                       return value;
+               }
+               else {
+                       log ("key " + key + " already in cache");
+                       // Move the key to the fresh end
+                       set.remove (key);
+                       set.add (key);
+                       return existing;
+               }
+       }
+       
+       private void log (String message)
+       {
+               // Event.log (message);
+       }
+}

Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java     2006-09-27 17:32:07 UTC 
(rev 10519)
+++ trunk/apps/load-balancing-sims/phase6/Node.java     2006-09-27 19:10:44 UTC 
(rev 10520)
@@ -19,8 +19,8 @@
        private HashMap<Integer,MessageHandler> messageHandlers; // By ID
        private LruCache<Integer> chkStore;
        private LruCache<Integer> chkCache;
-       private LruCache<Integer> sskStore;
-       private LruCache<Integer> sskCache;
+       private LruMap<Integer,Integer> sskStore; // SSKs can collide
+       private LruMap<Integer,Integer> sskCache;
        private LruCache<Integer> pubKeyCache; // SSK public keys
        private boolean decrementMaxHtl = false;
        private boolean decrementMinHtl = false;
@@ -41,8 +41,8 @@
                messageHandlers = new HashMap<Integer,MessageHandler>();
                chkStore = new LruCache<Integer> (10);
                chkCache = new LruCache<Integer> (10);
-               sskStore = new LruCache<Integer> (10);
-               sskCache = new LruCache<Integer> (10);
+               sskStore = new LruMap<Integer,Integer> (10);
+               sskCache = new LruMap<Integer,Integer> (10);
                pubKeyCache = new LruCache<Integer> (10);
                if (Math.random() < 0.5) decrementMaxHtl = true;
                if (Math.random() < 0.25) decrementMinHtl = true;
@@ -116,19 +116,27 @@
                else log ("key " + key + " not added to CHK store");
        }

+       // Retrieve an SSK from the cache or the store
+       public Integer fetchSsk (int key)
+       {
+               Integer data = sskStore.get (key);
+               if (data == null) return sskCache.get (key);
+               else return data;
+       }
+       
        // Add an SSK to the cache
-       public void cacheSsk (int key)
+       public void cacheSsk (int key, int value)
        {
                log ("key " + key + " added to SSK cache");
-               sskCache.put (key);
+               sskCache.put (key, value);
        }

        // Consider adding an SSK to the store
-       public void storeSsk (int key)
+       public void storeSsk (int key, int value)
        {
                if (closerThanPeers (keyToLocation (key))) {
                        log ("key " + key + " added to SSK store");
-                       sskStore.put (key);
+                       sskStore.put (key, value);
                }
                else log ("key " + key + " not added to SSK store");
        }
@@ -260,11 +268,12 @@
                        prev.sendMessage (new Accepted (r.id));
                }
                // If the data is in the store, return it
-               if (pub && sskStore.get (r.key)) {
+               Integer data = sskStore.get (r.key);
+               if (pub && data != null) {
                        log ("key " + r.key + " found in SSK store");
                        if (prev == null) log (r + " succeeded locally");
                        else {
-                               prev.sendMessage (new SskDataFound (r.id));
+                               prev.sendMessage (new SskDataFound (r.id,data));
                                if (r.needPubKey)
                                        prev.sendMessage
                                                (new SskPubKey (r.id, r.key));
@@ -273,11 +282,12 @@
                }
                log ("key " + r.key + " not found in SSK store");
                // If the data is in the cache, return it
-               if (pub && sskCache.get (r.key)) {
+               data = sskCache.get (r.key);
+               if (pub && data != null) {
                        log ("key " + r.key + " found in SSK cache");
                        if (prev == null) log (r + " succeeded locally");
                        else {
-                               prev.sendMessage (new SskDataFound (r.id));
+                               prev.sendMessage (new SskDataFound (r.id,data));
                                if (r.needPubKey)
                                        prev.sendMessage
                                                (new SskPubKey (r.id, r.key));
@@ -360,9 +370,9 @@
                handleSskRequest (sr, null);
        }

-       private void generateSskInsert (int key)
+       private void generateSskInsert (int key, int value)
        {
-               SskInsert si = new SskInsert (key, location);
+               SskInsert si = new SskInsert (key, value, location);
                log ("generating " + si);
                pubKeyCache.put (key);
                handleSskInsert (si, null);
@@ -403,9 +413,12 @@
                        break;

                        case GENERATE_SSK_INSERT:
-                       generateSskInsert ((Integer) data);
+                       generateSskInsert ((Integer) data, 0);
                        break;

+                       case GENERATE_SSK_COLLISION:
+                       generateSskInsert ((Integer) data, 1);
+                       
                        case CHECK_TIMEOUTS:
                        checkTimeouts();
                        break;
@@ -417,5 +430,6 @@
        public final static int GENERATE_CHK_INSERT = 2;
        public final static int GENERATE_SSK_REQUEST = 3;
        public final static int GENERATE_SSK_INSERT = 4;
-       private final static int CHECK_TIMEOUTS = 5;
+       public final static int GENERATE_SSK_COLLISION = 5;
+       private final static int CHECK_TIMEOUTS = 6;
 }

Modified: trunk/apps/load-balancing-sims/phase6/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Sim.java      2006-09-27 17:32:07 UTC 
(rev 10519)
+++ trunk/apps/load-balancing-sims/phase6/Sim.java      2006-09-27 19:10:44 UTC 
(rev 10520)
@@ -22,11 +22,14 @@
                }

                int key = Node.locationToKey (Math.random());
-               Event.schedule (nodes[0], 0.0, Node.GENERATE_SSK_INSERT, key);
-               Event.schedule (nodes[10], 30.0, Node.GENERATE_SSK_REQUEST,key);
-               key = Node.locationToKey (Math.random());
-               Event.schedule (nodes[5], 60.0, Node.GENERATE_CHK_INSERT, key);
-               Event.schedule (nodes[15], 90.0, Node.GENERATE_CHK_REQUEST,key);
+               Event.schedule (nodes[0], 0.0,
+                       Node.GENERATE_SSK_INSERT, key);
+               Event.schedule (nodes[5], 30.0,
+                       Node.GENERATE_SSK_REQUEST, key);
+               Event.schedule (nodes[10], 60.0,
+                       Node.GENERATE_SSK_COLLISION, key);
+               Event.schedule (nodes[15], 90.0,
+                       Node.GENERATE_SSK_REQUEST, key);

                // Run the simulation
                Event.run();

Modified: trunk/apps/load-balancing-sims/phase6/SskInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/SskInsertHandler.java 2006-09-27 
17:32:07 UTC (rev 10519)
+++ trunk/apps/load-balancing-sims/phase6/SskInsertHandler.java 2006-09-27 
19:10:44 UTC (rev 10520)
@@ -7,21 +7,36 @@
 {
        private int searchState = STARTED; // searchState of search
        private SskPubKey pubKey = null; 
+       private int data; // The data being inserted

        public SskInsertHandler (SskInsert i, Node node,
                                Peer prev, boolean needPubKey)
        {
                super (i, node, prev);
+               data = i.data;
                // Wait 10 seconds for the previous hop to send the public key
                if (needPubKey) Event.schedule (this, 10.0, KEY_TIMEOUT, null);
                else {
                        pubKey = new SskPubKey (id, key);
-                       node.cacheSsk (key);
-                       node.storeSsk (key);
+                       checkCollision();
                        forwardSearch();
                }
        }

+       // Check whether an older version of the data is already stored
+       private void checkCollision()
+       {
+               Integer old = node.fetchSsk (key);
+               if (old != null && old != data) {
+                       node.log (this + " collided");
+                       if (prev == null) node.log (this + " collided locally");
+                       else prev.sendMessage (new SskDataFound (id, old));
+                       // Continue inserting the old data
+                       data = old;
+                       return;
+               }
+       }
+       
        public void handleMessage (Message m, Peer src)
        {
                if (src == prev) {
@@ -36,6 +51,8 @@
                                handleRejectedLoop ((RejectedLoop) m);
                        else if (m instanceof RouteNotFound)
                                handleRouteNotFound ((RouteNotFound) m);
+                       else if (m instanceof SskDataFound)
+                               handleCollision ((SskDataFound) m);
                        else if (m instanceof InsertReply)
                                handleInsertReply ((InsertReply) m);
                        else node.log ("unexpected type for " + m);
@@ -47,9 +64,7 @@
        {
                if (searchState != STARTED) node.log (pk + " out of order");
                pubKey = pk;
-               node.cachePubKey (key);
-               node.cacheSsk (key);
-               node.storeSsk (key);
+               checkCollision();
                forwardSearch();
        }

@@ -77,6 +92,14 @@
                forwardSearch();
        }

+       private void handleCollision (SskDataFound sdf)
+       {
+               if (searchState != ACCEPTED) node.log (sdf + " out of order");
+               if (prev == null) node.log (this + " collided");
+               else prev.sendMessage (sdf); // Forward the message
+               data = sdf.data; // Is this safe?
+       }
+       
        private void handleInsertReply (InsertReply ir)
        {
                if (searchState != ACCEPTED) node.log (ir + " out of order");
@@ -122,17 +145,20 @@
        private void finish()
        {
                searchState = COMPLETED;
+               node.cachePubKey (key);
+               node.cacheSsk (key, data);
+               node.storeSsk (key, data);
                node.removeMessageHandler (id);
        }

        protected Search makeSearchMessage()
        {
-               return new SskInsert (id, key, closest, htl);
+               return new SskInsert (id, key, data, closest, htl);
        }

        public String toString()
        {
-               return new String ("SSK insert (" + id + "," + key + ")");
+               return new String ("SSK insert (" +id+ "," +key+ "," +data+")");
        }

        // Event callbacks

Modified: trunk/apps/load-balancing-sims/phase6/SskRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/SskRequestHandler.java        
2006-09-27 17:32:07 UTC (rev 10519)
+++ trunk/apps/load-balancing-sims/phase6/SskRequestHandler.java        
2006-09-27 19:10:44 UTC (rev 10520)
@@ -6,7 +6,7 @@
 {
        private boolean needPubKey; // Ask the next hop for the public key?
        private SskPubKey pubKey = null;
-       private SskDataFound data = null;
+       private SskDataFound dataFound = null;

        public SskRequestHandler (SskRequest r, Node node,
                                Peer prev, boolean needPubKey)
@@ -41,15 +41,15 @@
        private void handleSskDataFound (SskDataFound df)
        {
                if (searchState != ACCEPTED) node.log (df + " out of order");
-               data = df;
+               dataFound = df;
                if (pubKey == null) return; // Keep waiting
                if (prev == null) node.log (this + " succeeded");
                else {
-                       prev.sendMessage (data);
+                       prev.sendMessage (dataFound);
                        if (needPubKey) prev.sendMessage (pubKey);
                }
                node.cachePubKey (key);
-               node.cacheSsk (key);
+               node.cacheSsk (key, dataFound.data);
                finish();
        }

@@ -57,14 +57,14 @@
        {
                if (searchState != ACCEPTED) node.log (pk + " out of order");
                pubKey = pk;
-               if (data == null) return; // Keep waiting
+               if (dataFound == null) return; // Keep waiting
                if (prev == null) node.log (this + " succeeded");
                else {
-                       prev.sendMessage (data);
+                       prev.sendMessage (dataFound);
                        if (needPubKey) prev.sendMessage (pubKey);
                }
                node.cachePubKey (key);
-               node.cacheSsk (key);
+               node.cacheSsk (key, dataFound.data);
                finish();
        }


Modified: trunk/apps/load-balancing-sims/phase6/messages/SskDataFound.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/SskDataFound.java    
2006-09-27 17:32:07 UTC (rev 10519)
+++ trunk/apps/load-balancing-sims/phase6/messages/SskDataFound.java    
2006-09-27 19:10:44 UTC (rev 10520)
@@ -2,14 +2,17 @@

 public class SskDataFound extends Message
 {
-       public SskDataFound (int id)
+       public final int data;
+       
+       public SskDataFound (int id, int data)
        {
                this.id = id;
+               this.data = data;
                size = Message.HEADER_SIZE + Message.DATA_SIZE;
        }

        public String toString()
        {
-               return new String ("SSK data found (" + id + ")");
+               return new String ("SSK data found (" + id + "," + data + ")");
        }
 }

Modified: trunk/apps/load-balancing-sims/phase6/messages/SskInsert.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/SskInsert.java       
2006-09-27 17:32:07 UTC (rev 10519)
+++ trunk/apps/load-balancing-sims/phase6/messages/SskInsert.java       
2006-09-27 19:10:44 UTC (rev 10520)
@@ -2,22 +2,26 @@

 public class SskInsert extends Search
 {
+       public final int data;
+       
        // Start a new insert
-       public SskInsert (int key, double location)
+       public SskInsert (int key, int data, double location)
        {
                super (key, location);
+               this.data = data;
                size += DATA_SIZE;
        }

        // Forward an insert
-       public SskInsert (int id, int key, double closest, int htl)
+       public SskInsert (int id, int key, int data, double closest, int htl)
        {
                super (id, key, closest, htl);
+               this.data = data;
                size += DATA_SIZE;
        }

        public String toString()
        {
-               return new String ("SSK insert (" + id + "," + key + ")");
+               return new String ("SSK insert (" +id+ "," +key+ "," +data+")");
        }
 }


Reply via email to