Author: mrogers
Date: 2006-11-17 11:43:35 +0000 (Fri, 17 Nov 2006)
New Revision: 10970
Modified:
trunk/apps/load-balancing-sims/phase7/sim/Node.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java
Log:
Refactoring broke CHK inserts
Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-17 09:40:51 UTC
(rev 10969)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-17 11:43:35 UTC
(rev 10970)
@@ -120,7 +120,7 @@
}
// Return true if the node appears to be overloaded
- private boolean shouldRejectRequest()
+ private boolean shouldRejectSearch()
{
if (delay > MAX_DELAY) return true;
if (delay > HIGH_DELAY) {
@@ -133,9 +133,10 @@
// Reject a request or insert if the node appears to be overloaded
private boolean rejectIfOverloaded (Peer prev, int id)
{
- if (shouldRejectRequest()) {
+ if (shouldRejectSearch()) {
if (prev == null) {
- // FIXME: throttle
+ // FIXME
+ log ("rejecting local search " + id);
}
else prev.sendMessage (new RejectedOverload (id, true));
return true;
@@ -415,6 +416,17 @@
ih.start();
}
+ public void searchSucceeded (MessageHandler m)
+ {
+ // FIXME: increase the rate of the relevant throttle
+ log (m + " succeeded");
+ }
+
+ public void reduceSearchRate (MessageHandler m)
+ {
+ // FIXME: reduce the rate of the relevant throttle
+ }
+
public void removeMessageHandler (int id)
{
MessageHandler mh = messageHandlers.remove (id);
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
2006-11-17 09:40:51 UTC (rev 10969)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
2006-11-17 11:43:35 UTC (rev 10970)
@@ -64,7 +64,10 @@
// Start the search
forwardSearch();
// If we have all the blocks and the headers, consider finishing
- if (blocksReceived == 32) finish();
+ if (blocksReceived == 32) {
+ inState = COMPLETED;
+ considerFinishing();
+ }
// Wait for transfer to complete (FIXME: check real timeout)
else Event.schedule (this, 120.0, TRANSFER_IN_TIMEOUT, null);
}
@@ -78,14 +81,16 @@
// Forward the block to all receivers
for (Peer p : receivers) p.sendMessage (b);
// If we have all the blocks and the headers, consider finishing
- if (blocksReceived == 32 && inState == TRANSFERRING) finish();
+ if (blocksReceived == 32 && inState == TRANSFERRING) {
+ inState = COMPLETED;
+ considerFinishing();
+ }
}
private void handleCompleted (TransfersCompleted tc, Peer src)
{
receivers.remove (src);
- if (searchState == COMPLETED && inState == COMPLETED
- && receivers.isEmpty()) reallyFinish();
+ considerFinishing();
}
private void handleAccepted (Accepted a)
@@ -109,14 +114,14 @@
{
if (searchState != ACCEPTED) node.log (ir + " out of order");
next.successNotOverload(); // Reset the backoff length
- if (prev == null) node.log (this + " succeeded");
+ if (prev == null) node.searchSucceeded (this);
else prev.sendMessage (ir); // Forward the message
finish();
}
protected void sendReply()
{
- if (prev == null) node.log (this + " succeeded");
+ if (prev == null) node.searchSucceeded (this);
else prev.sendMessage (new InsertReply (id));
}
@@ -149,6 +154,12 @@
node.removeMessageHandler (id);
}
+ private void considerFinishing()
+ {
+ if (inState == COMPLETED && searchState == COMPLETED
+ && receivers.isEmpty()) reallyFinish();
+ }
+
public String toString()
{
return new String ("CHK insert (" + id + "," + key + ")");
@@ -159,7 +170,7 @@
{
if (inState != STARTED) return;
node.log (this + " data timeout from " + prev);
- if (prev == null) node.log (this + " failed");
+ if (prev == null) node.log (this + " failed"); // Don't throttle
else prev.sendMessage (new TransfersCompleted (id));
reallyFinish();
}
@@ -169,7 +180,7 @@
{
if (inState != TRANSFERRING) return;
node.log (this + " transfer timeout from " + prev);
- if (prev == null) node.log (this + " failed");
+ if (prev == null) node.log (this + " failed"); // Don't throttle
else prev.sendMessage (new TransfersCompleted (id));
reallyFinish();
}
@@ -180,8 +191,7 @@
if (!receivers.remove (p)) return;
node.log (this + " transfer timeout to " + p);
// FIXME: should we back off?
- if (searchState == COMPLETED && inState == COMPLETED
- && receivers.isEmpty()) reallyFinish();
+ considerFinishing();
}
// EventTarget interface
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
2006-11-17 09:40:51 UTC (rev 10969)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
2006-11-17 11:43:35 UTC (rev 10970)
@@ -45,7 +45,7 @@
// If we have all the blocks and the headers, cache the data
if (blocksReceived == 32) {
node.cacheChk (key);
- if (prev == null) node.log (this + " succeeded");
+ if (prev == null) node.searchSucceeded (this);
finish();
}
// Wait for the transfer to complete (FIXME: check real timeout)
@@ -66,7 +66,7 @@
// If we have all the blocks and the headers, cache the data
if (blocksReceived == 32 && searchState == TRANSFERRING) {
node.cacheChk (key);
- if (prev == null) node.log (this + " succeeded");
+ if (prev == null) node.searchSucceeded (this);
finish();
}
}
Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
2006-11-17 09:40:51 UTC (rev 10969)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
2006-11-17 11:43:35 UTC (rev 10970)
@@ -127,9 +127,7 @@
src.localRejectedOverload(); // Back off
if (src == next) forwardSearch(); // Try another peer
}
- if (prev == null) {
- // FIXME: throttle
- }
+ if (prev == null) node.reduceSearchRate (this);
else prev.sendMessage (ro); // Forward the message
}
@@ -150,9 +148,7 @@
node.log (this + " accepted timeout for " + p);
p.localRejectedOverload(); // Back off from p
// Tell the sender to slow down
- if (prev == null) {
- // FIXME: throttle
- }
+ if (prev == null) node.reduceSearchRate (this);
else prev.sendMessage (new RejectedOverload (id, false));
// Try another peer
forwardSearch();
@@ -166,9 +162,7 @@
node.log (this + " search timeout for " + p);
p.localRejectedOverload(); // Back off from p
// Tell the sender to slow down
- if (prev == null) {
- // FIXME: throttle
- }
+ if (prev == null) node.reduceSearchRate (this);
else prev.sendMessage (new RejectedOverload (id, false));
if (prev == null) node.log (this + " failed");
finish();
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
2006-11-17 09:40:51 UTC (rev 10969)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
2006-11-17 11:43:35 UTC (rev 10970)
@@ -101,14 +101,14 @@
{
if (searchState != ACCEPTED) node.log (ir + " out of order");
next.successNotOverload(); // Reset the backoff length
- if (prev == null) node.log (this + " succeeded");
+ if (prev == null) node.searchSucceeded (this);
else prev.sendMessage (ir); // Forward the message
finish();
}
protected void sendReply()
{
- if (prev == null) node.log (this + " succeeded");
+ if (prev == null) node.searchSucceeded (this);
else prev.sendMessage (new InsertReply (id));
}
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java
2006-11-17 09:40:51 UTC (rev 10969)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java
2006-11-17 11:43:35 UTC (rev 10970)
@@ -45,7 +45,7 @@
next.successNotOverload(); // Reset the backoff length
dataFound = df;
if (pubKey == null) return; // Keep waiting
- if (prev == null) node.log (this + " succeeded");
+ if (prev == null) node.searchSucceeded (this);
else {
prev.sendMessage (dataFound);
if (needPubKey) prev.sendMessage (pubKey);
@@ -61,7 +61,7 @@
next.successNotOverload(); // Reset the backoff length
pubKey = pk;
if (dataFound == null) return; // Keep waiting
- if (prev == null) node.log (this + " succeeded");
+ if (prev == null) node.searchSucceeded (this);
else {
prev.sendMessage (dataFound);
if (needPubKey) prev.sendMessage (pubKey);