Author: mrogers
Date: 2006-11-16 20:45:37 +0000 (Thu, 16 Nov 2006)
New Revision: 10955
Added:
trunk/apps/load-balancing-sims/phase7/sim/messages/RejectedOverload.java
Modified:
trunk/apps/load-balancing-sims/phase7/sim/Event.java
trunk/apps/load-balancing-sims/phase7/sim/Node.java
trunk/apps/load-balancing-sims/phase7/sim/Peer.java
trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java
Log:
Partially implemented backoff and refactored request handlers
Modified: trunk/apps/load-balancing-sims/phase7/sim/Event.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Event.java 2006-11-16
19:50:21 UTC (rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/Event.java 2006-11-16
20:45:37 UTC (rev 10955)
@@ -6,7 +6,7 @@
// Static variables and methods for the event queue
private static TreeSet<Event> queue = new TreeSet<Event>();
- private static double clockTime = 0.0;
+ private static double now = 0.0;
private static double lastLogTime = Double.POSITIVE_INFINITY;
private static int nextId = 0;
public static double duration = Double.POSITIVE_INFINITY;
@@ -14,16 +14,16 @@
public static void reset()
{
queue.clear();
- clockTime = 0.0;
+ now = 0.0;
lastLogTime = Double.POSITIVE_INFINITY;
nextId = 0;
duration = Double.POSITIVE_INFINITY;
}
- public static void schedule (EventTarget target, double time,
+ public static void schedule (EventTarget target, double delay,
int type, Object data)
{
- queue.add (new Event (target, time + clockTime, type, data));
+ queue.add (new Event (target, delay + now, type, data));
}
public static boolean nextEvent()
@@ -32,9 +32,9 @@
Event e = queue.first();
queue.remove (e);
// Update the clock
- clockTime = e.time;
+ now = e.time;
// Quit if the simulation's allotted time has run out
- if (clockTime > duration) return false;
+ if (now > duration) return false;
// Pass the packet to the target's callback method
e.target.handleEvent (e.type, e.data);
return true;
@@ -47,15 +47,15 @@
public static double time()
{
- return clockTime;
+ return now;
}
public static void log (String message)
{
// Print a blank line between events
- if (clockTime > lastLogTime) System.out.println();
- lastLogTime = clockTime;
- System.out.print (clockTime + " " + message + "\n");
+ if (now > lastLogTime) System.out.println();
+ lastLogTime = now;
+ System.out.print (now + " " + message + "\n");
}
// Run until the duration expires or there are no more events to process
@@ -72,7 +72,7 @@
private int type;
private Object data;
- public Event (EventTarget target, double time, int type, Object data)
+ private Event (EventTarget target, double time, int type, Object data)
{
this.target = target;
this.time = time;
Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-16 19:50:21 UTC
(rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-16 20:45:37 UTC
(rev 10955)
@@ -246,6 +246,7 @@
private void handleChkRequest (ChkRequest r, Peer prev)
{
+ // FIXME: reject if overloaded
if (!recentlySeenRequests.add (r.id)) {
log ("rejecting recently seen " + r);
prev.sendMessage (new RejectedLoop (r.id));
@@ -294,6 +295,7 @@
private void handleChkInsert (ChkInsert i, Peer prev)
{
+ // FIXME: reject if overloaded
if (!recentlySeenRequests.add (i.id)) {
log ("rejecting recently seen " + i);
prev.sendMessage (new RejectedLoop (i.id));
@@ -316,6 +318,7 @@
private void handleSskRequest (SskRequest r, Peer prev)
{
+ // FIXME: reject if overloaded
if (!recentlySeenRequests.add (r.id)) {
log ("rejecting recently seen " + r);
prev.sendMessage (new RejectedLoop (r.id));
@@ -372,6 +375,7 @@
private void handleSskInsert (SskInsert i, Peer prev)
{
+ // FIXME: reject if overloaded
if (!recentlySeenRequests.add (i.id)) {
log ("rejecting recently seen " + i);
prev.sendMessage (new RejectedLoop (i.id));
Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-16 19:50:21 UTC
(rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-16 20:45:37 UTC
(rev 10955)
@@ -21,6 +21,11 @@
public final static double MAX_DELAY = 0.1; // Max coalescing delay
public final static double MIN_SLEEP = 0.01; // Forty winks
+ // Backoff
+ public final static double INITIAL_BACKOFF = 1.0; // Seconds
+ public final static double BACKOFF_MULTIPLIER = 2.0;
+ public final static double MAX_BACKOFF = 10800000.0; // Three hours!? (NOTE: that is 3 h in milliseconds; the other backoff values are in seconds, where 3 h = 10800.0)
+
// Out-of-order delivery with duplicate detection
public final static int SEQ_RANGE = 65536;
@@ -44,6 +49,8 @@
// Flow control
public int tokensOut = 0; // How many requests/inserts can we send?
public int tokensIn = 0; // How many requests/inserts should we accept?
+ public double backoffUntil = 0.0; // Time
+ public double backoffLength = INITIAL_BACKOFF; // Seconds
public Peer (Node node, int address, double location, double latency)
{
@@ -130,6 +137,7 @@
return false;
}
+ // Try to send a packet up to the specified size, return true if sent
private boolean sendPacket (int maxSize)
{
// Construct a packet
@@ -242,6 +250,25 @@
else checkDeadlines();
}
+ // When a local RejectedOverload is received, back off unless backed off
+ public void localRejectedOverload()
+ {
+ double now = Event.time();
+ if (now < backoffUntil) return; // Already backed off
+ backoffLength *= BACKOFF_MULTIPLIER;
+ if (backoffLength > MAX_BACKOFF) backoffLength = MAX_BACKOFF;
+ backoffUntil = now + backoffLength * Math.random();
+ log ("backing off until " + backoffUntil);
+ }
+
+ // When a search is accepted, reset the backoff length unless backed off
+ public void successNotOverload()
+ {
+ if (Event.time() < backoffUntil) return;
+ backoffLength = INITIAL_BACKOFF;
+ log ("resetting backoff length");
+ }
+
// Check retx timeouts, return true if there are packets in flight
public boolean checkTimeouts()
{
Modified: trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java 2006-11-16
19:50:21 UTC (rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java 2006-11-16
20:45:37 UTC (rev 10955)
@@ -14,7 +14,7 @@
if (poll > Peer.MAX_DELAY) poll = Peer.MAX_DELAY;
this.poll = poll; // Polling interval in seconds
tokens = size;
- lastUpdated = 0.0; // Clock time
+ lastUpdated = 0.0; // Time
}
public int available()
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
2006-11-16 19:50:21 UTC (rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
2006-11-16 20:45:37 UTC (rev 10955)
@@ -39,6 +39,8 @@
handleAccepted ((Accepted) m);
else if (m instanceof RejectedLoop)
handleRejectedLoop ((RejectedLoop) m);
+ else if (m instanceof RejectedOverload)
+ handleRejectedOverload ((RejectedOverload) m);
else if (m instanceof RouteNotFound)
handleRouteNotFound ((RouteNotFound) m);
else if (m instanceof InsertReply)
@@ -62,10 +64,7 @@
// Start the search
forwardSearch();
// If we have all the blocks and the headers, consider finishing
- if (blocksReceived == 32) {
- inState = COMPLETED;
- considerFinishing();
- }
+ if (blocksReceived == 32) finish();
// Wait for transfer to complete (FIXME: check real timeout)
else Event.schedule (this, 120.0, TRANSFER_IN_TIMEOUT, null);
}
@@ -79,22 +78,21 @@
// Forward the block to all receivers
for (Peer p : receivers) p.sendMessage (b);
// If we have all the blocks and the headers, consider finishing
- if (blocksReceived == 32 && inState == TRANSFERRING) {
- inState = COMPLETED;
- considerFinishing();
- }
+ if (blocksReceived == 32 && inState == TRANSFERRING) finish();
}
private void handleCompleted (TransfersCompleted tc, Peer src)
{
receivers.remove (src);
- considerFinishing();
+ if (searchState == COMPLETED && inState == COMPLETED
+ && receivers.isEmpty()) reallyFinish();
}
private void handleAccepted (Accepted a)
{
if (searchState != SENT) node.log (a + " out of order");
searchState = ACCEPTED;
+ next.successNotOverload(); // Reset the backoff length
// Wait 120 seconds for a reply to the search
Event.schedule (this, 120.0, SEARCH_TIMEOUT, next);
// Add the next hop to the list of receivers
@@ -107,78 +105,40 @@
Event.schedule (this, 240.0, TRANSFER_OUT_TIMEOUT, next);
}
- private void handleRejectedLoop (RejectedLoop rl)
+ private void handleInsertReply (InsertReply ir)
{
- if (searchState != SENT) node.log (rl + " out of order");
- next.tokensOut++; // No token was consumed
- forwardSearch();
+ if (searchState != ACCEPTED) node.log (ir + " out of order");
+ next.successNotOverload(); // Reset the backoff length
+ if (prev == null) node.log (this + " succeeded");
+ else prev.sendMessage (ir); // Forward the message
+ finish();
}
-
- private void handleRouteNotFound (RouteNotFound rnf)
+
+ protected void sendReply()
{
- if (searchState != ACCEPTED) node.log (rnf + " out of order");
- if (rnf.htl < htl) htl = rnf.htl;
- // Use the remaining htl to try another peer
- forwardSearch();
+ if (prev == null) node.log (this + " succeeded");
+ else prev.sendMessage (new InsertReply (id));
}
- private void handleInsertReply (InsertReply ir)
+ protected Search makeSearchMessage()
{
- if (searchState != ACCEPTED) node.log (ir + " out of order");
- if (prev == null) node.log (this + " succeeded");
- else prev.sendMessage (ir); // Forward the message
- searchState = COMPLETED;
- considerFinishing();
+ return new ChkInsert (id, key, closest, htl);
}
- public void forwardSearch()
+ protected void scheduleAcceptedTimeout (Peer next)
{
- next = null;
- // If the search has run out of hops, send InsertReply
- if (htl == 0) {
- node.log (this + " has no hops left");
- if (prev == null) node.log (this + " succeeded");
- else prev.sendMessage (new InsertReply (id));
- searchState = COMPLETED;
- considerFinishing();
- return;
- }
- // Forward the search to the closest remaining peer
- next = closestPeer();
- if (next == null) {
- node.log ("route not found for " + this);
- if (prev == null) node.log (this + " failed");
- else prev.sendMessage (new RouteNotFound (id, htl));
- searchState = COMPLETED;
- considerFinishing();
- return;
- }
- // Decrement the htl if the next node is not the closest so far
- double target = Node.keyToLocation (key);
- if (Node.distance (target, next.location)
- >= Node.distance (target, closest))
- htl = node.decrementHtl (htl);
- node.log (this + " has htl " + htl);
- // Consume a token
- next.tokensOut--;
- // Forward the search
- node.log ("forwarding " + this + " to " + next.address);
- next.sendMessage (makeSearchMessage());
- nexts.remove (next);
- searchState = SENT;
- // Wait 10 seconds for the next hop to accept the search
Event.schedule (this, 10.0, ACCEPTED_TIMEOUT, next);
}
- private void considerFinishing()
+ protected void finish()
{
- // An insert finishes when the search, the incoming transfer
+ // Don't really finish until the incoming transfer
// and all outgoing transfers are complete
- if (searchState == COMPLETED && inState == COMPLETED
- && receivers.isEmpty()) finish();
+ searchState = COMPLETED;
+ if (inState == COMPLETED && receivers.isEmpty()) reallyFinish();
}
- private void finish()
+ private void reallyFinish()
{
inState = COMPLETED;
searchState = COMPLETED;
@@ -189,59 +149,39 @@
node.removeMessageHandler (id);
}
- protected Search makeSearchMessage()
- {
- return new ChkInsert (id, key, closest, htl);
- }
-
public String toString()
{
return new String ("CHK insert (" + id + "," + key + ")");
}
- // Event callbacks
-
- private void acceptedTimeout (Peer p)
- {
- if (p != next) return; // We've already moved on to another peer
- if (searchState != SENT) return;
- node.log (this + " accepted timeout for " + p);
- forwardSearch(); // Try another peer
- }
-
- private void searchTimeout (Peer p)
- {
- if (p != next) return; // We've already moved on to another peer
- if (searchState != ACCEPTED) return;
- node.log (this + " search timeout for " + p);
- if (prev == null) node.log (this + " failed");
- searchState = COMPLETED;
- considerFinishing();
- }
-
+ // Event callback
private void dataTimeout()
{
if (inState != STARTED) return;
- node.log (this + " data timeout for " + prev);
+ node.log (this + " data timeout from " + prev);
if (prev == null) node.log (this + " failed");
else prev.sendMessage (new TransfersCompleted (id));
- finish();
+ reallyFinish();
}
+ // Event callback
private void transferInTimeout()
{
if (inState != TRANSFERRING) return;
node.log (this + " transfer timeout from " + prev);
if (prev == null) node.log (this + " failed");
else prev.sendMessage (new TransfersCompleted (id));
- finish();
+ reallyFinish();
}
+ // Event callback
private void transferOutTimeout (Peer p)
{
if (!receivers.remove (p)) return;
node.log (this + " transfer timeout to " + p);
- considerFinishing();
+ // FIXME: should we back off?
+ if (searchState == COMPLETED && inState == COMPLETED
+ && receivers.isEmpty()) reallyFinish();
}
// EventTarget interface
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
2006-11-16 19:50:21 UTC (rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
2006-11-16 20:45:37 UTC (rev 10955)
@@ -25,6 +25,8 @@
handleAccepted ((Accepted) m);
else if (m instanceof RejectedLoop)
handleRejectedLoop ((RejectedLoop) m);
+ else if (m instanceof RejectedOverload)
+ handleRejectedOverload ((RejectedOverload) m);
else if (m instanceof RouteNotFound)
handleRouteNotFound ((RouteNotFound) m);
else if (m instanceof DataNotFound)
@@ -40,6 +42,7 @@
{
if (searchState != ACCEPTED) node.log (df + " out of order");
searchState = TRANSFERRING;
+ next.successNotOverload(); // Reset the backoff length
if (prev != null) prev.sendMessage (df); // Forward the message
// If we have all the blocks and the headers, cache the data
if (blocksReceived == 32) {
Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
2006-11-16 19:50:21 UTC (rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
2006-11-16 20:45:37 UTC (rev 10955)
@@ -1,10 +1,8 @@
// The state of a search as stored at each node along the path
package sim.handlers;
-import sim.Node;
-import sim.Peer;
-import sim.messages.Search;
-import sim.messages.Message;
+import sim.*;
+import sim.messages.*;
import java.util.LinkedList;
public abstract class MessageHandler
@@ -53,9 +51,47 @@
nexts.remove (p);
}
- // Find the closest remaining peer
- protected Peer closestPeer ()
+ // Forward the search to the closest remaining peer, if any
+ public void forwardSearch()
{
+ next = null;
+ // If the search has run out of hops, reply and finish
+ if (htl == 0) {
+ node.log (this + " has no hops remaining");
+ sendReply();
+ finish();
+ return;
+ }
+ // Find the closest remaining peer
+ next = closestPeer();
+ if (next == null) {
+ node.log ("route not found for " + this);
+ if (prev == null) node.log (this + " failed");
+ else prev.sendMessage (new RouteNotFound (id, htl));
+ finish();
+ return;
+ }
+ // Decrement the htl if the next node is not the closest so far
+ double target = Node.keyToLocation (key);
+ if (Node.distance (target, next.location)
+ >= Node.distance (target, closest))
+ htl = node.decrementHtl (htl);
+ node.log (this + " has htl " + htl);
+ // Consume a token
+ next.tokensOut--;
+ // Forward the search
+ node.log ("forwarding " + this + " to " + next.address);
+ next.sendMessage (makeSearchMessage());
+ nexts.remove (next);
+ searchState = SENT;
+ // Wait for the next hop to accept the search
+ scheduleAcceptedTimeout (next);
+ }
+
+ // Find the closest remaining peer, if any
+ private Peer closestPeer ()
+ {
+ double now = Event.time();
double keyLoc = Node.keyToLocation (key);
double closestDist = Double.POSITIVE_INFINITY;
Peer closestPeer = null;
@@ -64,6 +100,10 @@
node.log ("bypassing busy peer " + peer);
continue;
}
+ if (now < peer.backoffUntil) {
+ node.log ("bypassing backed off peer " + peer);
+ continue;
+ }
double dist = Node.distance (keyLoc, peer.location);
if (dist < closestDist) {
closestDist = dist;
@@ -73,7 +113,73 @@
return closestPeer; // Null if there are no suitable peers
}
+ protected void handleRejectedLoop (RejectedLoop rl)
+ {
+ if (searchState != SENT) node.log (rl + " out of order");
+ next.successNotOverload(); // Reset the backoff length
+ next.tokensOut++; // No token was consumed
+ forwardSearch();
+ }
+
+ protected void handleRejectedOverload (RejectedOverload ro)
+ {
+ if (searchState != SENT) node.log (ro + " out of order");
+ if (ro.local) {
+ ro.local = false;
+ // Back off and try another peer
+ next.localRejectedOverload();
+ forwardSearch();
+ }
+ if (prev == null) {
+ // FIXME: throttle
+ }
+ else prev.sendMessage (ro); // Forward the message
+ }
+
+ protected void handleRouteNotFound (RouteNotFound rnf)
+ {
+ if (searchState != ACCEPTED) node.log (rnf + " out of order");
+ next.successNotOverload(); // Reset the backoff length
+ // Use the remaining htl to try another peer
+ if (rnf.htl < htl) htl = rnf.htl;
+ forwardSearch();
+ }
+
+ // Event callback
+ protected void acceptedTimeout (Peer p)
+ {
+ if (p != next) return; // We've already moved on to another peer
+ if (searchState != SENT) return;
+ node.log (this + " accepted timeout for " + p);
+ p.localRejectedOverload(); // Back off from p
+ // Tell the sender to slow down
+ if (prev == null) {
+ // FIXME: throttle
+ }
+ else prev.sendMessage (new RejectedOverload (id, false));
+ // Try another peer
+ forwardSearch();
+ }
+
+ // Event callback
+ protected void searchTimeout (Peer p)
+ {
+ if (p != next) return; // We've already moved on to another peer
+ if (searchState != ACCEPTED) return;
+ node.log (this + " search timeout for " + p);
+ p.localRejectedOverload(); // Back off from p
+ // Tell the sender to slow down
+ if (prev == null) {
+ // FIXME: throttle
+ }
+ else prev.sendMessage (new RejectedOverload (id, false));
+ if (prev == null) node.log (this + " failed");
+ finish();
+ }
+
public abstract void handleMessage (Message m, Peer src);
-
+ protected abstract void sendReply();
protected abstract Search makeSearchMessage();
+ protected abstract void scheduleAcceptedTimeout (Peer next);
+ protected abstract void finish();
}
Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
2006-11-16 19:50:21 UTC (rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
2006-11-16 20:45:37 UTC (rev 10955)
@@ -21,67 +21,28 @@
{
if (searchState != SENT) node.log (a + " out of order");
searchState = ACCEPTED;
+ next.successNotOverload(); // Reset the backoff length
// Wait 60 seconds for a reply to the search
Event.schedule (this, 60.0, SEARCH_TIMEOUT, next);
}
- protected void handleRejectedLoop (RejectedLoop rl)
- {
- if (searchState != SENT) node.log (rl + " out of order");
- next.tokensOut++; // No token was consumed
- forwardSearch();
- }
-
- protected void handleRouteNotFound (RouteNotFound rnf)
- {
- if (searchState != ACCEPTED) node.log (rnf + " out of order");
- if (rnf.htl < htl) htl = rnf.htl;
- // Use the remaining htl to try another peer
- forwardSearch();
- }
-
protected void handleDataNotFound (DataNotFound dnf)
{
if (searchState != ACCEPTED) node.log (dnf + " out of order");
+ next.successNotOverload(); // Reset the backoff length
if (prev == null) node.log (this + " failed");
else prev.sendMessage (dnf); // Forward the message
finish();
}
- protected void forwardSearch()
+ protected void sendReply()
{
- next = null;
- // If the search has run out of hops, send DataNotFound
- if (htl == 0) {
- node.log ("data not found for " + this);
- if (prev == null) node.log (this + " failed");
- else prev.sendMessage (new DataNotFound (id));
- finish();
- return;
- }
- // Forward the search to the closest remaining peer
- next = closestPeer();
- if (next == null) {
- node.log ("route not found for " + this);
- if (prev == null) node.log (this + " failed");
- else prev.sendMessage (new RouteNotFound (id, htl));
- finish();
- return;
- }
- // Decrement the htl if the next node is not the closest so far
- double target = Node.keyToLocation (key);
- if (Node.distance (target, next.location)
- >= Node.distance (target, closest))
- htl = node.decrementHtl (htl);
- node.log (this + " has htl " + htl);
- // Consume a token
- next.tokensOut--;
- // Forward the search
- node.log ("forwarding " + this + " to " + next.address);
- next.sendMessage (makeSearchMessage());
- nexts.remove (next);
- searchState = SENT;
- // Wait 5 seconds for the next hop to accept the search
+ if (prev == null) node.log (this + " failed");
+ else prev.sendMessage (new DataNotFound (id));
+ }
+
+ protected void scheduleAcceptedTimeout (Peer next)
+ {
Event.schedule (this, 5.0, ACCEPTED_TIMEOUT, next);
}
@@ -91,25 +52,7 @@
node.removeMessageHandler (id);
}
- // Event callbacks
-
- protected void acceptedTimeout (Peer p)
- {
- if (p != next) return; // We've already moved on to another peer
- if (searchState != SENT) return;
- node.log (this + " accepted timeout for " + p);
- forwardSearch(); // Try another peer
- }
-
- protected void searchTimeout (Peer p)
- {
- if (p != next) return; // We've already moved on to another peer
- if (searchState != ACCEPTED) return;
- node.log (this + " search timeout for " + p);
- if (prev == null) node.log (this + " failed");
- finish();
- }
-
+ // Event callback
protected void transferTimeout (Peer p)
{
if (searchState != TRANSFERRING) return;
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
2006-11-16 19:50:21 UTC (rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
2006-11-16 20:45:37 UTC (rev 10955)
@@ -57,6 +57,8 @@
handleSskAccepted ((SskAccepted) m);
else if (m instanceof RejectedLoop)
handleRejectedLoop ((RejectedLoop) m);
+ else if (m instanceof RejectedOverload)
+ handleRejectedOverload ((RejectedOverload) m);
else if (m instanceof RouteNotFound)
handleRouteNotFound ((RouteNotFound) m);
else if (m instanceof SskDataFound)
@@ -86,24 +88,10 @@
if (sa.needPubKey) next.sendMessage (pubKey);
}
- private void handleRejectedLoop (RejectedLoop rl)
- {
- if (searchState != SENT) node.log (rl + " out of order");
- next.tokensOut++; // No token was consumed
- forwardSearch();
- }
-
- private void handleRouteNotFound (RouteNotFound rnf)
- {
- if (searchState != ACCEPTED) node.log (rnf + " out of order");
- if (rnf.htl < htl) htl = rnf.htl;
- // Use the remaining htl to try another peer
- forwardSearch();
- }
-
private void handleCollision (SskDataFound sdf)
{
if (searchState != ACCEPTED) node.log (sdf + " out of order");
+ // FIXME: should we reset the backoff length?
if (prev == null) node.log (this + " collided");
else prev.sendMessage (sdf); // Forward the message
data = sdf.data; // Is this safe?
@@ -112,49 +100,29 @@
private void handleInsertReply (InsertReply ir)
{
if (searchState != ACCEPTED) node.log (ir + " out of order");
+ next.successNotOverload(); // Reset the backoff length
if (prev == null) node.log (this + " succeeded");
else prev.sendMessage (ir); // Forward the message
finish();
}
- public void forwardSearch()
+ protected void sendReply()
{
- next = null;
- // If the search has run out of hops, send InsertReply
- if (htl == 0) {
- node.log (this + " has no hops left");
- if (prev == null) node.log (this + " succeeded");
- else prev.sendMessage (new InsertReply (id));
- finish();
- return;
- }
- // Forward the search to the closest remaining peer
- next = closestPeer();
- if (next == null) {
- node.log ("route not found for " + this);
- if (prev == null) node.log (this + " failed");
- else prev.sendMessage (new RouteNotFound (id, htl));
- finish();
- return;
- }
- // Decrement the htl if the next node is not the closest so far
- double target = Node.keyToLocation (key);
- if (Node.distance (target, next.location)
- >= Node.distance (target, closest))
- htl = node.decrementHtl (htl);
- node.log (this + " has htl " + htl);
- // Consume a token
- next.tokensOut--;
- // Forward the search
- node.log ("forwarding " + this + " to " + next.address);
- next.sendMessage (makeSearchMessage());
- nexts.remove (next);
- searchState = SENT;
- // Wait 10 seconds for the next hop to accept the search
+ if (prev == null) node.log (this + " succeeded");
+ else prev.sendMessage (new InsertReply (id));
+ }
+
+ protected Search makeSearchMessage()
+ {
+ return new SskInsert (id, key, data, closest, htl);
+ }
+
+ protected void scheduleAcceptedTimeout (Peer next)
+ {
Event.schedule (this, 10.0, ACCEPTED_TIMEOUT, next);
}
- private void finish()
+ protected void finish()
{
searchState = COMPLETED;
node.cachePubKey (key);
@@ -164,18 +132,12 @@
node.removeMessageHandler (id);
}
- protected Search makeSearchMessage()
- {
- return new SskInsert (id, key, data, closest, htl);
- }
-
public String toString()
{
return new String ("SSK insert (" +id+ "," +key+ "," +data+")");
}
- // Event callbacks
-
+ // Event callback
private void keyTimeout()
{
if (searchState != STARTED) return;
@@ -183,23 +145,6 @@
finish();
}
- private void acceptedTimeout (Peer p)
- {
- if (p != next) return; // We've already moved on to another peer
- if (searchState != SENT) return;
- node.log (this + " accepted timeout for " + p);
- forwardSearch(); // Try another peer
- }
-
- private void searchTimeout (Peer p)
- {
- if (p != next) return; // We've already moved on to another peer
- if (searchState != ACCEPTED) return;
- node.log (this + " search timeout for " + p);
- if (prev == null) node.log (this + " failed");
- finish();
- }
-
// EventTarget interface
public void handleEvent (int type, Object data)
{
Modified:
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java
2006-11-16 19:50:21 UTC (rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java
2006-11-16 20:45:37 UTC (rev 10955)
@@ -28,6 +28,8 @@
handleAccepted ((Accepted) m);
else if (m instanceof RejectedLoop)
handleRejectedLoop ((RejectedLoop) m);
+ else if (m instanceof RejectedOverload)
+ handleRejectedOverload ((RejectedOverload) m);
else if (m instanceof RouteNotFound)
handleRouteNotFound ((RouteNotFound) m);
else if (m instanceof DataNotFound)
@@ -42,6 +44,7 @@
private void handleSskDataFound (SskDataFound df)
{
if (searchState != ACCEPTED) node.log (df + " out of order");
+ next.successNotOverload(); // Reset the backoff length
dataFound = df;
if (pubKey == null) return; // Keep waiting
if (prev == null) node.log (this + " succeeded");
@@ -57,6 +60,7 @@
private void handleSskPubKey (SskPubKey pk)
{
if (searchState != ACCEPTED) node.log (pk + " out of order");
+ next.successNotOverload(); // Reset the backoff length
pubKey = pk;
if (dataFound == null) return; // Keep waiting
if (prev == null) node.log (this + " succeeded");
Added: trunk/apps/load-balancing-sims/phase7/sim/messages/RejectedOverload.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/messages/RejectedOverload.java
2006-11-16 19:50:21 UTC (rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/messages/RejectedOverload.java
2006-11-16 20:45:37 UTC (rev 10955)
@@ -0,0 +1,17 @@
+package sim.messages;
+
+public class RejectedOverload extends Message
+{
+ public boolean local; // Was this rejection generated locally?
+
+ public RejectedOverload (int id, boolean local)
+ {
+ this.id = id;
+ this.local = local;
+ }
+
+ public String toString()
+ {
+ return new String ("rejected overload (" +id+ "," +local+ ")");
+ }
+}