Author: mrogers
Date: 2006-09-14 14:13:11 +0000 (Thu, 14 Sep 2006)
New Revision: 10465
Modified:
trunk/apps/load-balancing-sims/phase6/Node.java
Log:
CHK inserts. Sorry about the large number of commits: I accidentally added
class files to the repository and I'm trying to fix it.
Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java 2006-09-14 14:13:08 UTC
(rev 10464)
+++ trunk/apps/load-balancing-sims/phase6/Node.java 2006-09-14 14:13:11 UTC
(rev 10465)
@@ -12,28 +12,26 @@
public final static double SHORT_SLEEP = 0.05; // Poll the bw limiter
// Token bucket bandwidth limiter
- public final static int BUCKET_RATE = 15000; // Bytes per second
+ public final static int BUCKET_RATE = 30000; // Bytes per second
public final static int BUCKET_SIZE = 60000; // Burst size in bytes
public double location; // Routing location
public NetworkInterface net;
private HashMap<Integer,Peer> peers; // Look up a peer by its address
private HashSet<Integer> recentlySeenRequests; // Request IDs
- private HashMap<Integer,ChkRequestHandler> chkRequests; // By ID
+ private HashMap<Integer,MessageHandler> messageHandlers; // By ID
private LruCache<Integer> chkStore; // CHK datastore
private LruCache<Integer> chkCache; // CHK datacache
public TokenBucket bandwidth; // Bandwidth limiter
private boolean timerRunning = false; // Is the timer running?
- public boolean faulty = false; // DEBUG
-
public Node (double txSpeed, double rxSpeed)
{
location = Math.random();
net = new NetworkInterface (this, txSpeed, rxSpeed);
peers = new HashMap<Integer,Peer>();
recentlySeenRequests = new HashSet<Integer>();
- chkRequests = new HashMap<Integer,ChkRequestHandler>();
+ messageHandlers = new HashMap<Integer,MessageHandler>();
chkStore = new LruCache<Integer> (STORE_SIZE);
chkCache = new LruCache<Integer> (CACHE_SIZE);
bandwidth = new TokenBucket (BUCKET_RATE, BUCKET_SIZE);
@@ -102,6 +100,7 @@
log ("key " + key + " added to CHK store");
chkStore.put (key);
}
+ else log ("key " + key + " not added to CHK store");
}
// Called by Peer
@@ -117,20 +116,22 @@
public void handlePacket (Packet packet)
{
Peer peer = peers.get (packet.src);
- if (peer == null) log ("unknown peer!");
+ if (peer == null) log ("received packet from unknown peer");
else peer.handlePacket (packet);
}
// Called by Peer
public void handleMessage (Message m, Peer src)
{
- log ("received " + m);
+ if (src != null) log ("received " + m + " from " + src);
if (m instanceof ChkRequest)
handleChkRequest ((ChkRequest) m, src);
+ else if (m instanceof ChkInsert)
+ handleChkInsert ((ChkInsert) m, src);
else {
- ChkRequestHandler rh = chkRequests.get (m.id);
- if (rh == null) log ("no request handler for " + m);
- else rh.handleMessage (m, src);
+ MessageHandler mh = messageHandlers.get (m.id);
+ if (mh == null) log ("no message handler for " + m);
+ else mh.handleMessage (m, src);
}
}
@@ -139,12 +140,12 @@
if (!recentlySeenRequests.add (r.id)) {
log ("rejecting recently seen " + r);
prev.sendMessage (new RejectedLoop (r.id));
- // Don't forward the same request back to prev
- ChkRequestHandler rh = chkRequests.get (r.id);
- if (rh != null) rh.removeNextHop (prev);
+ // Don't forward the same search back to prev
+ MessageHandler mh = messageHandlers.get (r.id);
+ if (mh != null) mh.removeNextHop (prev);
return;
}
- // Accept the request
+ // Accept the search
if (prev != null) {
log ("accepting " + r);
prev.sendMessage (new Accepted (r.id));
@@ -153,12 +154,7 @@
if (chkStore.get (r.key)) {
log ("key " + r.key + " found in CHK store");
if (prev == null) log (r + " succeeded locally");
- else {
- prev.sendMessage (new ChkDataFound (r.id));
- // DEBUG
- if (!faulty) for (int i = 0; i < 32; i++)
- prev.sendMessage (new Block (r.id, i));
- }
+ else prev.sendMessage (new ChkDataFound (r.id));
chkRequestCompleted (r.id);
return;
}
@@ -167,28 +163,49 @@
if (chkCache.get (r.key)) {
log ("key " + r.key + " found in CHK cache");
if (prev == null) log (r + " succeeded locally");
- else {
- prev.sendMessage (new ChkDataFound (r.id));
- // DEBUG
- if (!faulty) for (int i = 0; i < 32; i++)
- prev.sendMessage (new Block (r.id, i));
- }
+ else prev.sendMessage (new ChkDataFound (r.id));
chkRequestCompleted (r.id);
return;
}
log ("key " + r.key + " not found in CHK cache");
- // Forward the request and store the request state
+ // Store the request handler and forward the search
ChkRequestHandler rh = new ChkRequestHandler (r, this, prev);
- chkRequests.put (r.id, rh);
- rh.forwardRequest();
+ messageHandlers.put (r.id, rh);
+ rh.forwardSearch();
}
+ private void handleChkInsert (ChkInsert i, Peer prev)
+ {
+ if (!recentlySeenRequests.add (i.id)) {
+ log ("rejecting recently seen " + i);
+ prev.sendMessage (new RejectedLoop (i.id));
+ // Don't forward the same search back to prev
+ MessageHandler mh = messageHandlers.get (i.id);
+ if (mh != null) mh.removeNextHop (prev);
+ return;
+ }
+ // Accept the search
+ if (prev != null) {
+ log ("accepting " + i);
+ prev.sendMessage (new Accepted (i.id));
+ }
+ // Store the insert handler and wait for a DataInsert
+ ChkInsertHandler ih = new ChkInsertHandler (i, this, prev);
+ messageHandlers.put (i.id, ih);
+ }
+
// Remove a completed request from the list of pending requests
public void chkRequestCompleted (int id)
{
- chkRequests.remove (id);
+ messageHandlers.remove (id);
}
+ // Remove a completed insert from the list of pending inserts
+ public void chkInsertCompleted (int id)
+ {
+ messageHandlers.remove (id);
+ }
+
// Return the list of peers in a random order
public ArrayList<Peer> peers()
{
@@ -205,12 +222,23 @@
// Event callback
private void generateRequest (int key)
{
- ChkRequest r = new ChkRequest (key, location);
- log ("generating " + r);
- handleChkRequest (r, null);
+ ChkRequest cr = new ChkRequest (key, location);
+ log ("generating " + cr);
+ handleChkRequest (cr, null);
}
// Event callback
+ private void generateInsert (int key)
+ {
+ ChkInsert ci = new ChkInsert (key, location);
+ log ("generating " + ci);
+ handleChkInsert (ci, null);
+ handleMessage (new DataInsert (ci.id), null);
+ for (int i = 0; i < 32; i++)
+ handleMessage (new Block (ci.id, i), null);
+ }
+
+ // Event callback
private void checkTimeouts()
{
// Check the peers in a random order each time
@@ -224,7 +252,7 @@
else {
double sleep = deadline - Event.time(); // Can be < 0
if (sleep < MIN_SLEEP) sleep = MIN_SLEEP;
- log ("sleeping for " + sleep + " seconds");
+ // log ("sleeping for " + sleep + " seconds");
Event.schedule (this, sleep, CHECK_TIMEOUTS, null);
}
}
@@ -232,11 +260,23 @@
// EventTarget interface
public void handleEvent (int type, Object data)
{
- if (type == GENERATE_REQUEST) generateRequest ((Integer) data);
- else if (type == CHECK_TIMEOUTS) checkTimeouts();
+ switch (type) {
+ case GENERATE_REQUEST:
+ generateRequest ((Integer) data);
+ break;
+
+ case GENERATE_INSERT:
+ generateInsert ((Integer)data);
+ break;
+
+ case CHECK_TIMEOUTS:
+ checkTimeouts();
+ break;
+ }
}
// Each EventTarget class has its own event codes
public final static int GENERATE_REQUEST = 1;
- public final static int CHECK_TIMEOUTS = 2;
+ public final static int GENERATE_INSERT = 2;
+ private final static int CHECK_TIMEOUTS = 3;
}