Author: mrogers
Date: 2006-09-27 19:10:44 +0000 (Wed, 27 Sep 2006)
New Revision: 10520
Added:
trunk/apps/load-balancing-sims/phase6/LruMap.java
Modified:
trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java
trunk/apps/load-balancing-sims/phase6/LruCache.java
trunk/apps/load-balancing-sims/phase6/Node.java
trunk/apps/load-balancing-sims/phase6/Sim.java
trunk/apps/load-balancing-sims/phase6/SskInsertHandler.java
trunk/apps/load-balancing-sims/phase6/SskRequestHandler.java
trunk/apps/load-balancing-sims/phase6/messages/SskDataFound.java
trunk/apps/load-balancing-sims/phase6/messages/SskInsert.java
Log:
SSK collisions (not properly tested)
Modified: trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java 2006-09-27
17:32:07 UTC (rev 10519)
+++ trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java 2006-09-27
19:10:44 UTC (rev 10520)
@@ -65,10 +65,8 @@
blocksReceived++;
// Forward the block to all receivers
for (Peer p : receivers) p.sendMessage (b);
- // If the transfer is complete, cache and maybe store the data
+ // If the transfer is complete, consider finishing
if (blocksReceived == 32) {
- node.cacheChk (key);
- node.storeChk (key);
inState = COMPLETED;
considerFinishing();
}
@@ -167,6 +165,8 @@
{
inState = COMPLETED;
searchState = COMPLETED;
+ node.cacheChk (key);
+ node.storeChk (key);
if (prev == null) node.log (this + " completed");
else prev.sendMessage (new TransfersCompleted (id));
node.removeMessageHandler (id);
Modified: trunk/apps/load-balancing-sims/phase6/LruCache.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/LruCache.java 2006-09-27 17:32:07 UTC
(rev 10519)
+++ trunk/apps/load-balancing-sims/phase6/LruCache.java 2006-09-27 19:10:44 UTC
(rev 10520)
@@ -4,8 +4,8 @@
class LruCache<Key>
{
- public int capacity;
- public LinkedHashSet<Key> set;
+ public final int capacity;
+ private LinkedHashSet<Key> set;
public LruCache (int capacity)
{
Added: trunk/apps/load-balancing-sims/phase6/LruMap.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/LruMap.java 2006-09-27 17:32:07 UTC
(rev 10519)
+++ trunk/apps/load-balancing-sims/phase6/LruMap.java 2006-09-27 19:10:44 UTC
(rev 10520)
@@ -0,0 +1,59 @@
+// Limited-capacity LRU cache that stores a value for each key
+
+import java.util.LinkedHashSet;
+import java.util.HashMap;
+
+class LruMap<Key,Value>
+{
+ public final int capacity;
+ private LinkedHashSet<Key> set;
+ private HashMap<Key,Value> map;
+
+ public LruMap (int capacity)
+ {
+ this.capacity = capacity;
+ set = new LinkedHashSet<Key> (capacity);
+ map = new HashMap<Key,Value> (capacity);
+ }
+
+ public Value get (Key key)
+ {
+ log ("searching cache for key " + key);
+ Value value = map.get (key);
+ if (value != null) {
+ // Move the key to the fresh end
+ set.remove (key);
+ set.add (key);
+ }
+ return value;
+ }
+
+ // Return the existing value (which is not replaced), or null
+ public Value put (Key key, Value value)
+ {
+ Value existing = map.get (key);
+ if (existing == null) {
+ log ("adding key " + key + " to cache");
+ if (set.size() == capacity) {
+ // Discard the oldest element
+ Key oldest = set.iterator().next();
+ log ("discarding key " + oldest);
+ set.remove (oldest);
+ }
+ map.put (key, value);
+ return value;
+ }
+ else {
+ log ("key " + key + " already in cache");
+ // Move the key to the fresh end
+ set.remove (key);
+ set.add (key);
+ return existing;
+ }
+ }
+
+ private void log (String message)
+ {
+ // Event.log (message);
+ }
+}
Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java 2006-09-27 17:32:07 UTC
(rev 10519)
+++ trunk/apps/load-balancing-sims/phase6/Node.java 2006-09-27 19:10:44 UTC
(rev 10520)
@@ -19,8 +19,8 @@
private HashMap<Integer,MessageHandler> messageHandlers; // By ID
private LruCache<Integer> chkStore;
private LruCache<Integer> chkCache;
- private LruCache<Integer> sskStore;
- private LruCache<Integer> sskCache;
+ private LruMap<Integer,Integer> sskStore; // SSKs can collide
+ private LruMap<Integer,Integer> sskCache;
private LruCache<Integer> pubKeyCache; // SSK public keys
private boolean decrementMaxHtl = false;
private boolean decrementMinHtl = false;
@@ -41,8 +41,8 @@
messageHandlers = new HashMap<Integer,MessageHandler>();
chkStore = new LruCache<Integer> (10);
chkCache = new LruCache<Integer> (10);
- sskStore = new LruCache<Integer> (10);
- sskCache = new LruCache<Integer> (10);
+ sskStore = new LruMap<Integer,Integer> (10);
+ sskCache = new LruMap<Integer,Integer> (10);
pubKeyCache = new LruCache<Integer> (10);
if (Math.random() < 0.5) decrementMaxHtl = true;
if (Math.random() < 0.25) decrementMinHtl = true;
@@ -116,19 +116,27 @@
else log ("key " + key + " not added to CHK store");
}
+ // Retrieve an SSK from the cache or the store
+ public Integer fetchSsk (int key)
+ {
+ Integer data = sskStore.get (key);
+ if (data == null) return sskCache.get (key);
+ else return data;
+ }
+
// Add an SSK to the cache
- public void cacheSsk (int key)
+ public void cacheSsk (int key, int value)
{
log ("key " + key + " added to SSK cache");
- sskCache.put (key);
+ sskCache.put (key, value);
}
// Consider adding an SSK to the store
- public void storeSsk (int key)
+ public void storeSsk (int key, int value)
{
if (closerThanPeers (keyToLocation (key))) {
log ("key " + key + " added to SSK store");
- sskStore.put (key);
+ sskStore.put (key, value);
}
else log ("key " + key + " not added to SSK store");
}
@@ -260,11 +268,12 @@
prev.sendMessage (new Accepted (r.id));
}
// If the data is in the store, return it
- if (pub && sskStore.get (r.key)) {
+ Integer data = sskStore.get (r.key);
+ if (pub && data != null) {
log ("key " + r.key + " found in SSK store");
if (prev == null) log (r + " succeeded locally");
else {
- prev.sendMessage (new SskDataFound (r.id));
+ prev.sendMessage (new SskDataFound (r.id,data));
if (r.needPubKey)
prev.sendMessage
(new SskPubKey (r.id, r.key));
@@ -273,11 +282,12 @@
}
log ("key " + r.key + " not found in SSK store");
// If the data is in the cache, return it
- if (pub && sskCache.get (r.key)) {
+ data = sskCache.get (r.key);
+ if (pub && data != null) {
log ("key " + r.key + " found in SSK cache");
if (prev == null) log (r + " succeeded locally");
else {
- prev.sendMessage (new SskDataFound (r.id));
+ prev.sendMessage (new SskDataFound (r.id,data));
if (r.needPubKey)
prev.sendMessage
(new SskPubKey (r.id, r.key));
@@ -360,9 +370,9 @@
handleSskRequest (sr, null);
}
- private void generateSskInsert (int key)
+ private void generateSskInsert (int key, int value)
{
- SskInsert si = new SskInsert (key, location);
+ SskInsert si = new SskInsert (key, value, location);
log ("generating " + si);
pubKeyCache.put (key);
handleSskInsert (si, null);
@@ -403,9 +413,12 @@
break;
case GENERATE_SSK_INSERT:
- generateSskInsert ((Integer) data);
+ generateSskInsert ((Integer) data, 0);
break;
+ case GENERATE_SSK_COLLISION:
+ generateSskInsert ((Integer) data, 1);
+
case CHECK_TIMEOUTS:
checkTimeouts();
break;
@@ -417,5 +430,6 @@
public final static int GENERATE_CHK_INSERT = 2;
public final static int GENERATE_SSK_REQUEST = 3;
public final static int GENERATE_SSK_INSERT = 4;
- private final static int CHECK_TIMEOUTS = 5;
+ public final static int GENERATE_SSK_COLLISION = 5;
+ private final static int CHECK_TIMEOUTS = 6;
}
Modified: trunk/apps/load-balancing-sims/phase6/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Sim.java 2006-09-27 17:32:07 UTC
(rev 10519)
+++ trunk/apps/load-balancing-sims/phase6/Sim.java 2006-09-27 19:10:44 UTC
(rev 10520)
@@ -22,11 +22,14 @@
}
int key = Node.locationToKey (Math.random());
- Event.schedule (nodes[0], 0.0, Node.GENERATE_SSK_INSERT, key);
- Event.schedule (nodes[10], 30.0, Node.GENERATE_SSK_REQUEST,key);
- key = Node.locationToKey (Math.random());
- Event.schedule (nodes[5], 60.0, Node.GENERATE_CHK_INSERT, key);
- Event.schedule (nodes[15], 90.0, Node.GENERATE_CHK_REQUEST,key);
+ Event.schedule (nodes[0], 0.0,
+ Node.GENERATE_SSK_INSERT, key);
+ Event.schedule (nodes[5], 30.0,
+ Node.GENERATE_SSK_REQUEST, key);
+ Event.schedule (nodes[10], 60.0,
+ Node.GENERATE_SSK_COLLISION, key);
+ Event.schedule (nodes[15], 90.0,
+ Node.GENERATE_SSK_REQUEST, key);
// Run the simulation
Event.run();
Modified: trunk/apps/load-balancing-sims/phase6/SskInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/SskInsertHandler.java 2006-09-27
17:32:07 UTC (rev 10519)
+++ trunk/apps/load-balancing-sims/phase6/SskInsertHandler.java 2006-09-27
19:10:44 UTC (rev 10520)
@@ -7,21 +7,36 @@
{
private int searchState = STARTED; // searchState of search
private SskPubKey pubKey = null;
+ private int data; // The data being inserted
public SskInsertHandler (SskInsert i, Node node,
Peer prev, boolean needPubKey)
{
super (i, node, prev);
+ data = i.data;
// Wait 10 seconds for the previous hop to send the public key
if (needPubKey) Event.schedule (this, 10.0, KEY_TIMEOUT, null);
else {
pubKey = new SskPubKey (id, key);
- node.cacheSsk (key);
- node.storeSsk (key);
+ checkCollision();
forwardSearch();
}
}
+ // Check whether an older version of the data is already stored
+ private void checkCollision()
+ {
+ Integer old = node.fetchSsk (key);
+ if (old != null && old != data) {
+ node.log (this + " collided");
+ if (prev == null) node.log (this + " collided locally");
+ else prev.sendMessage (new SskDataFound (id, old));
+ // Continue inserting the old data
+ data = old;
+ return;
+ }
+ }
+
public void handleMessage (Message m, Peer src)
{
if (src == prev) {
@@ -36,6 +51,8 @@
handleRejectedLoop ((RejectedLoop) m);
else if (m instanceof RouteNotFound)
handleRouteNotFound ((RouteNotFound) m);
+ else if (m instanceof SskDataFound)
+ handleCollision ((SskDataFound) m);
else if (m instanceof InsertReply)
handleInsertReply ((InsertReply) m);
else node.log ("unexpected type for " + m);
@@ -47,9 +64,7 @@
{
if (searchState != STARTED) node.log (pk + " out of order");
pubKey = pk;
- node.cachePubKey (key);
- node.cacheSsk (key);
- node.storeSsk (key);
+ checkCollision();
forwardSearch();
}
@@ -77,6 +92,14 @@
forwardSearch();
}
+ private void handleCollision (SskDataFound sdf)
+ {
+ if (searchState != ACCEPTED) node.log (sdf + " out of order");
+ if (prev == null) node.log (this + " collided");
+ else prev.sendMessage (sdf); // Forward the message
+ data = sdf.data; // Is this safe?
+ }
+
private void handleInsertReply (InsertReply ir)
{
if (searchState != ACCEPTED) node.log (ir + " out of order");
@@ -122,17 +145,20 @@
private void finish()
{
searchState = COMPLETED;
+ node.cachePubKey (key);
+ node.cacheSsk (key, data);
+ node.storeSsk (key, data);
node.removeMessageHandler (id);
}
protected Search makeSearchMessage()
{
- return new SskInsert (id, key, closest, htl);
+ return new SskInsert (id, key, data, closest, htl);
}
public String toString()
{
- return new String ("SSK insert (" + id + "," + key + ")");
+ return new String ("SSK insert (" +id+ "," +key+ "," +data+")");
}
// Event callbacks
Modified: trunk/apps/load-balancing-sims/phase6/SskRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/SskRequestHandler.java
2006-09-27 17:32:07 UTC (rev 10519)
+++ trunk/apps/load-balancing-sims/phase6/SskRequestHandler.java
2006-09-27 19:10:44 UTC (rev 10520)
@@ -6,7 +6,7 @@
{
private boolean needPubKey; // Ask the next hop for the public key?
private SskPubKey pubKey = null;
- private SskDataFound data = null;
+ private SskDataFound dataFound = null;
public SskRequestHandler (SskRequest r, Node node,
Peer prev, boolean needPubKey)
@@ -41,15 +41,15 @@
private void handleSskDataFound (SskDataFound df)
{
if (searchState != ACCEPTED) node.log (df + " out of order");
- data = df;
+ dataFound = df;
if (pubKey == null) return; // Keep waiting
if (prev == null) node.log (this + " succeeded");
else {
- prev.sendMessage (data);
+ prev.sendMessage (dataFound);
if (needPubKey) prev.sendMessage (pubKey);
}
node.cachePubKey (key);
- node.cacheSsk (key);
+ node.cacheSsk (key, dataFound.data);
finish();
}
@@ -57,14 +57,14 @@
{
if (searchState != ACCEPTED) node.log (pk + " out of order");
pubKey = pk;
- if (data == null) return; // Keep waiting
+ if (dataFound == null) return; // Keep waiting
if (prev == null) node.log (this + " succeeded");
else {
- prev.sendMessage (data);
+ prev.sendMessage (dataFound);
if (needPubKey) prev.sendMessage (pubKey);
}
node.cachePubKey (key);
- node.cacheSsk (key);
+ node.cacheSsk (key, dataFound.data);
finish();
}
Modified: trunk/apps/load-balancing-sims/phase6/messages/SskDataFound.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/SskDataFound.java
2006-09-27 17:32:07 UTC (rev 10519)
+++ trunk/apps/load-balancing-sims/phase6/messages/SskDataFound.java
2006-09-27 19:10:44 UTC (rev 10520)
@@ -2,14 +2,17 @@
public class SskDataFound extends Message
{
- public SskDataFound (int id)
+ public final int data;
+
+ public SskDataFound (int id, int data)
{
this.id = id;
+ this.data = data;
size = Message.HEADER_SIZE + Message.DATA_SIZE;
}
public String toString()
{
- return new String ("SSK data found (" + id + ")");
+ return new String ("SSK data found (" + id + "," + data + ")");
}
}
Modified: trunk/apps/load-balancing-sims/phase6/messages/SskInsert.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/SskInsert.java
2006-09-27 17:32:07 UTC (rev 10519)
+++ trunk/apps/load-balancing-sims/phase6/messages/SskInsert.java
2006-09-27 19:10:44 UTC (rev 10520)
@@ -2,22 +2,26 @@
public class SskInsert extends Search
{
+ public final int data;
+
// Start a new insert
- public SskInsert (int key, double location)
+ public SskInsert (int key, int data, double location)
{
super (key, location);
+ this.data = data;
size += DATA_SIZE;
}
// Forward an insert
- public SskInsert (int id, int key, double closest, int htl)
+ public SskInsert (int id, int key, int data, double closest, int htl)
{
super (id, key, closest, htl);
+ this.data = data;
size += DATA_SIZE;
}
public String toString()
{
- return new String ("SSK insert (" + id + "," + key + ")");
+ return new String ("SSK insert (" +id+ "," +key+ "," +data+")");
}
}