Author: mrogers
Date: 2006-11-16 20:45:37 +0000 (Thu, 16 Nov 2006)
New Revision: 10955

Added:
   trunk/apps/load-balancing-sims/phase7/sim/messages/RejectedOverload.java
Modified:
   trunk/apps/load-balancing-sims/phase7/sim/Event.java
   trunk/apps/load-balancing-sims/phase7/sim/Node.java
   trunk/apps/load-balancing-sims/phase7/sim/Peer.java
   trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java
Log:
Partially implemented backoff and refactored request handlers

Modified: trunk/apps/load-balancing-sims/phase7/sim/Event.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Event.java        2006-11-16 
19:50:21 UTC (rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/Event.java        2006-11-16 
20:45:37 UTC (rev 10955)
@@ -6,7 +6,7 @@
        // Static variables and methods for the event queue

        private static TreeSet<Event> queue = new TreeSet<Event>();
-       private static double clockTime = 0.0;
+       private static double now = 0.0;
        private static double lastLogTime = Double.POSITIVE_INFINITY;
        private static int nextId = 0;
        public static double duration = Double.POSITIVE_INFINITY;
@@ -14,16 +14,16 @@
        public static void reset()
        {
                queue.clear();
-               clockTime = 0.0;
+               now = 0.0;
                lastLogTime = Double.POSITIVE_INFINITY;
                nextId = 0;
                duration = Double.POSITIVE_INFINITY;
        }

-       public static void schedule (EventTarget target, double time,
+       public static void schedule (EventTarget target, double delay,
                                        int type, Object data)
        {
-               queue.add (new Event (target, time + clockTime, type, data));
+               queue.add (new Event (target, delay + now, type, data));
        }

        public static boolean nextEvent()
@@ -32,9 +32,9 @@
                        Event e = queue.first();
                        queue.remove (e);
                        // Update the clock
-                       clockTime = e.time;
+                       now = e.time;
                        // Quit if the simulation's alloted time has run out
-                       if (clockTime > duration) return false;
+                       if (now > duration) return false;
                        // Pass the packet to the target's callback method
                        e.target.handleEvent (e.type, e.data);
                        return true;
@@ -47,15 +47,15 @@

        public static double time()
        {
-               return clockTime;
+               return now;
        }

        public static void log (String message)
        {
                // Print a blank line between events
-               if (clockTime > lastLogTime) System.out.println();
-               lastLogTime = clockTime;
-               System.out.print (clockTime + " " + message + "\n");
+               if (now > lastLogTime) System.out.println();
+               lastLogTime = now;
+               System.out.print (now + " " + message + "\n");
        }

        // Run until the duration expires or there are no more events to process
@@ -72,7 +72,7 @@
        private int type;
        private Object data;

-       public Event (EventTarget target, double time, int type, Object data)
+       private Event (EventTarget target, double time, int type, Object data)
        {
                this.target = target;
                this.time = time;

Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-16 19:50:21 UTC 
(rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-16 20:45:37 UTC 
(rev 10955)
@@ -246,6 +246,7 @@

        private void handleChkRequest (ChkRequest r, Peer prev)
        {
+               // FIXME: reject if overloaded
                if (!recentlySeenRequests.add (r.id)) {
                        log ("rejecting recently seen " + r);
                        prev.sendMessage (new RejectedLoop (r.id));
@@ -294,6 +295,7 @@

        private void handleChkInsert (ChkInsert i, Peer prev)
        {
+               // FIXME: reject if overloaded
                if (!recentlySeenRequests.add (i.id)) {
                        log ("rejecting recently seen " + i);
                        prev.sendMessage (new RejectedLoop (i.id));
@@ -316,6 +318,7 @@

        private void handleSskRequest (SskRequest r, Peer prev)
        {
+               // FIXME: reject if overloaded
                if (!recentlySeenRequests.add (r.id)) {
                        log ("rejecting recently seen " + r);
                        prev.sendMessage (new RejectedLoop (r.id));
@@ -372,6 +375,7 @@

        private void handleSskInsert (SskInsert i, Peer prev)
        {
+               // FIXME: reject if overloaded
                if (!recentlySeenRequests.add (i.id)) {
                        log ("rejecting recently seen " + i);
                        prev.sendMessage (new RejectedLoop (i.id));

Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-16 19:50:21 UTC 
(rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-16 20:45:37 UTC 
(rev 10955)
@@ -21,6 +21,11 @@
        public final static double MAX_DELAY = 0.1; // Max coalescing delay
        public final static double MIN_SLEEP = 0.01; // Forty winks

+       // Backoff
+       public final static double INITIAL_BACKOFF = 1.0; // Seconds
+       public final static double BACKOFF_MULTIPLIER = 2.0;
+       public final static double MAX_BACKOFF = 10800000.0; // Three hours!?
+       
        // Out-of-order delivery with duplicate detection
        public final static int SEQ_RANGE = 65536;

@@ -44,6 +49,8 @@
        // Flow control
        public int tokensOut = 0; // How many requests/inserts can we send?
        public int tokensIn = 0; // How many requests/inserts should we accept?
+       public double backoffUntil = 0.0; // Time
+       public double backoffLength = INITIAL_BACKOFF; // Seconds

        public Peer (Node node, int address, double location, double latency)
        {
@@ -130,6 +137,7 @@
                return false;
        }

+       // Try to send a packet up to the specified size, return true if sent
        private boolean sendPacket (int maxSize)
        {
                // Construct a packet
@@ -242,6 +250,25 @@
                else checkDeadlines();
        }

+       // When a local RejectedOverload is received, back off unless backed off
+       public void localRejectedOverload()
+       {
+               double now = Event.time();
+               if (now < backoffUntil) return; // Already backed off
+               backoffLength *= BACKOFF_MULTIPLIER;
+               if (backoffLength > MAX_BACKOFF) backoffLength = MAX_BACKOFF;
+               backoffUntil = now + backoffLength * Math.random();
+               log ("backing off until " + backoffUntil);
+       }
+       
+       // When a search is accepted, reset the backoff length unless backed off
+       public void successNotOverload()
+       {
+               if (Event.time() < backoffUntil) return;
+               backoffLength = INITIAL_BACKOFF;
+               log ("resetting backoff length");
+       }
+       
        // Check retx timeouts, return true if there are packets in flight
        public boolean checkTimeouts()
        {

Modified: trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java  2006-11-16 
19:50:21 UTC (rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/TokenBucket.java  2006-11-16 
20:45:37 UTC (rev 10955)
@@ -14,7 +14,7 @@
                if (poll > Peer.MAX_DELAY) poll = Peer.MAX_DELAY;
                this.poll = poll; // Polling interval in seconds
                tokens = size;
-               lastUpdated = 0.0; // Clock time
+               lastUpdated = 0.0; // Time
        }

        public int available()

Modified: 
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java    
2006-11-16 19:50:21 UTC (rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java    
2006-11-16 20:45:37 UTC (rev 10955)
@@ -39,6 +39,8 @@
                                handleAccepted ((Accepted) m);
                        else if (m instanceof RejectedLoop)
                                handleRejectedLoop ((RejectedLoop) m);
+                       else if (m instanceof RejectedOverload)
+                               handleRejectedOverload ((RejectedOverload) m);
                        else if (m instanceof RouteNotFound)
                                handleRouteNotFound ((RouteNotFound) m);
                        else if (m instanceof InsertReply)
@@ -62,10 +64,7 @@
                // Start the search
                forwardSearch();
                // If we have all the blocks and the headers, consider finishing
-               if (blocksReceived == 32) {
-                       inState = COMPLETED;
-                       considerFinishing();
-               }
+               if (blocksReceived == 32) finish();
                // Wait for transfer to complete (FIXME: check real timeout)
                else Event.schedule (this, 120.0, TRANSFER_IN_TIMEOUT, null);
        }
@@ -79,22 +78,21 @@
                // Forward the block to all receivers
                for (Peer p : receivers) p.sendMessage (b);
                // If we have all the blocks and the headers, consider finishing
-               if (blocksReceived == 32 && inState == TRANSFERRING) {
-                       inState = COMPLETED;
-                       considerFinishing();
-               }
+               if (blocksReceived == 32 && inState == TRANSFERRING) finish();
        }

        private void handleCompleted (TransfersCompleted tc, Peer src)
        {
                receivers.remove (src);
-               considerFinishing();
+               if (searchState == COMPLETED && inState == COMPLETED 
+               && receivers.isEmpty()) reallyFinish();
        }

        private void handleAccepted (Accepted a)
        {
                if (searchState != SENT) node.log (a + " out of order");
                searchState = ACCEPTED;
+               next.successNotOverload(); // Reset the backoff length
                // Wait 120 seconds for a reply to the search
                Event.schedule (this, 120.0, SEARCH_TIMEOUT, next);
                // Add the next hop to the list of receivers
@@ -107,78 +105,40 @@
                Event.schedule (this, 240.0, TRANSFER_OUT_TIMEOUT, next);
        }

-       private void handleRejectedLoop (RejectedLoop rl)
+       private void handleInsertReply (InsertReply ir)
        {
-               if (searchState != SENT) node.log (rl + " out of order");
-               next.tokensOut++; // No token was consumed
-               forwardSearch();
+               if (searchState != ACCEPTED) node.log (ir + " out of order");
+               next.successNotOverload(); // Reset the backoff length
+               if (prev == null) node.log (this + " succeeded");
+               else prev.sendMessage (ir); // Forward the message
+               finish();
        }
-       
-       private void handleRouteNotFound (RouteNotFound rnf)
+
+       protected void sendReply()
        {
-               if (searchState != ACCEPTED) node.log (rnf + " out of order");
-               if (rnf.htl < htl) htl = rnf.htl;
-               // Use the remaining htl to try another peer
-               forwardSearch();
+               if (prev == null) node.log (this + " succeeded");
+               else prev.sendMessage (new InsertReply (id));
        }

-       private void handleInsertReply (InsertReply ir)
+       protected Search makeSearchMessage()
        {
-               if (searchState != ACCEPTED) node.log (ir + " out of order");
-               if (prev == null) node.log (this + " succeeded");
-               else prev.sendMessage (ir); // Forward the message
-               searchState = COMPLETED;
-               considerFinishing();
+               return new ChkInsert (id, key, closest, htl);
        }

-       public void forwardSearch()
+       protected void scheduleAcceptedTimeout (Peer next)
        {
-               next = null;
-               // If the search has run out of hops, send InsertReply
-               if (htl == 0) {
-                       node.log (this + " has no hops left");
-                       if (prev == null) node.log (this + " succeeded");
-                       else prev.sendMessage (new InsertReply (id));
-                       searchState = COMPLETED;
-                       considerFinishing();
-                       return;
-               }
-               // Forward the search to the closest remaining peer
-               next = closestPeer();
-               if (next == null) {
-                       node.log ("route not found for " + this);
-                       if (prev == null) node.log (this + " failed");
-                       else prev.sendMessage (new RouteNotFound (id, htl));
-                       searchState = COMPLETED;
-                       considerFinishing();
-                       return;
-               }
-               // Decrement the htl if the next node is not the closest so far
-               double target = Node.keyToLocation (key);
-               if (Node.distance (target, next.location)
-               >= Node.distance (target, closest))
-                       htl = node.decrementHtl (htl);
-               node.log (this + " has htl " + htl);
-               // Consume a token
-               next.tokensOut--;
-               // Forward the search
-               node.log ("forwarding " + this + " to " + next.address);
-               next.sendMessage (makeSearchMessage());
-               nexts.remove (next);
-               searchState = SENT;
-               // Wait 10 seconds for the next hop to accept the search
                Event.schedule (this, 10.0, ACCEPTED_TIMEOUT, next);
        }

-       private void considerFinishing()
+       protected void finish()
        {
-               // An insert finishes when the search, the incoming transfer
+               // Don't really finish until the incoming transfer
                // and all outgoing transfers are complete
-               if (searchState == COMPLETED && inState == COMPLETED 
-               && receivers.isEmpty()) finish();
+               searchState = COMPLETED;
+               if (inState == COMPLETED && receivers.isEmpty()) reallyFinish();
        }

-       private void finish()
+       private void reallyFinish()
        {
                inState = COMPLETED;
                searchState = COMPLETED;
@@ -189,59 +149,39 @@
                node.removeMessageHandler (id);
        }

-       protected Search makeSearchMessage()
-       {
-               return new ChkInsert (id, key, closest, htl);
-       }
-       
        public String toString()
        {
                return new String ("CHK insert (" + id + "," + key + ")");
        }

-       // Event callbacks
-       
-       private void acceptedTimeout (Peer p)
-       {
-               if (p != next) return; // We've already moved on to another peer
-               if (searchState != SENT) return;
-               node.log (this + " accepted timeout for " + p);
-               forwardSearch(); // Try another peer
-       }
-       
-       private void searchTimeout (Peer p)
-       {
-               if (p != next) return; // We've already moved on to another peer
-               if (searchState != ACCEPTED) return;
-               node.log (this + " search timeout for " + p);
-               if (prev == null) node.log (this + " failed");
-               searchState = COMPLETED;
-               considerFinishing();
-       }
-       
+       // Event callback
        private void dataTimeout()
        {
                if (inState != STARTED) return;
-               node.log (this + " data timeout for " + prev);
+               node.log (this + " data timeout from " + prev);
                if (prev == null) node.log (this + " failed");
                else prev.sendMessage (new TransfersCompleted (id));
-               finish();
+               reallyFinish();
        }

+       // Event callback
        private void transferInTimeout()
        {
                if (inState != TRANSFERRING) return;
                node.log (this + " transfer timeout from " + prev);
                if (prev == null) node.log (this + " failed");
                else prev.sendMessage (new TransfersCompleted (id));
-               finish();
+               reallyFinish();
        }

+       // Event callback
        private void transferOutTimeout (Peer p)
        {
                if (!receivers.remove (p)) return;
                node.log (this + " transfer timeout to " + p);
-               considerFinishing();
+               // FIXME: should we back off?
+               if (searchState == COMPLETED && inState == COMPLETED 
+               && receivers.isEmpty()) reallyFinish();
        }

        // EventTarget interface

Modified: 
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java   
2006-11-16 19:50:21 UTC (rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java   
2006-11-16 20:45:37 UTC (rev 10955)
@@ -25,6 +25,8 @@
                        handleAccepted ((Accepted) m);
                else if (m instanceof RejectedLoop)
                        handleRejectedLoop ((RejectedLoop) m);
+               else if (m instanceof RejectedOverload)
+                       handleRejectedOverload ((RejectedOverload) m);
                else if (m instanceof RouteNotFound)
                        handleRouteNotFound ((RouteNotFound) m);
                else if (m instanceof DataNotFound)
@@ -40,6 +42,7 @@
        {
                if (searchState != ACCEPTED) node.log (df + " out of order");
                searchState = TRANSFERRING;
+               next.successNotOverload(); // Reset the backoff length
                if (prev != null) prev.sendMessage (df); // Forward the message
                // If we have all the blocks and the headers, cache the data
                if (blocksReceived == 32) {

Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java      
2006-11-16 19:50:21 UTC (rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/MessageHandler.java      
2006-11-16 20:45:37 UTC (rev 10955)
@@ -1,10 +1,8 @@
 // The state of a search as stored at each node along the path

 package sim.handlers;
-import sim.Node;
-import sim.Peer;
-import sim.messages.Search;
-import sim.messages.Message;
+import sim.*;
+import sim.messages.*;
 import java.util.LinkedList;

 public abstract class MessageHandler
@@ -53,9 +51,47 @@
                nexts.remove (p);
        }

-       // Find the closest remaining peer
-       protected Peer closestPeer ()
+       // Forward the search to the closest remaining peer, if any
+       public void forwardSearch()
        {
+               next = null;
+               // If the search has run out of hops, reply and finish
+               if (htl == 0) {
+                       node.log (this + " has no hops remaining");
+                       sendReply();
+                       finish();
+                       return;
+               }
+               // Find the closest remaining peer
+               next = closestPeer();
+               if (next == null) {
+                       node.log ("route not found for " + this);
+                       if (prev == null) node.log (this + " failed");
+                       else prev.sendMessage (new RouteNotFound (id, htl));
+                       finish();
+                       return;
+               }
+               // Decrement the htl if the next node is not the closest so far
+               double target = Node.keyToLocation (key);
+               if (Node.distance (target, next.location)
+               >= Node.distance (target, closest))
+                       htl = node.decrementHtl (htl);
+               node.log (this + " has htl " + htl);
+               // Consume a token
+               next.tokensOut--;
+               // Forward the search
+               node.log ("forwarding " + this + " to " + next.address);
+               next.sendMessage (makeSearchMessage());
+               nexts.remove (next);
+               searchState = SENT;
+               // Wait for the next hop to accept the search
+               scheduleAcceptedTimeout (next);
+       }
+       
+       // Find the closest remaining peer, if any
+       private Peer closestPeer ()
+       {
+               double now = Event.time();
                double keyLoc = Node.keyToLocation (key);
                double closestDist = Double.POSITIVE_INFINITY;
                Peer closestPeer = null;
@@ -64,6 +100,10 @@
                                node.log ("bypassing busy peer " + peer);
                                continue;
                        }
+                       if (now < peer.backoffUntil) {
+                               node.log ("bypassing backed off peer " + peer);
+                               continue;
+                       }
                        double dist = Node.distance (keyLoc, peer.location);
                        if (dist < closestDist) {
                                closestDist = dist;
@@ -73,7 +113,73 @@
                return closestPeer; // Null if there are no suitable peers
        }

+       protected void handleRejectedLoop (RejectedLoop rl)
+       {
+               if (searchState != SENT) node.log (rl + " out of order");
+               next.successNotOverload(); // Reset the backoff length
+               next.tokensOut++; // No token was consumed
+               forwardSearch();
+       }
+       
+       protected void handleRejectedOverload (RejectedOverload ro)
+       {
+               if (searchState != SENT) node.log (ro + " out of order");
+               if (ro.local) {
+                       ro.local = false;
+                       // Back off and try another peer
+                       next.localRejectedOverload();
+                       forwardSearch();
+               }
+               if (prev == null) {
+                       // FIXME: throttle
+               }
+               else prev.sendMessage (ro); // Forward the message
+       }
+       
+       protected void handleRouteNotFound (RouteNotFound rnf)
+       {
+               if (searchState != ACCEPTED) node.log (rnf + " out of order");
+               next.successNotOverload(); // Reset the backoff length
+               // Use the remaining htl to try another peer
+               if (rnf.htl < htl) htl = rnf.htl;
+               forwardSearch();
+       }
+       
+       // Event callback
+       protected void acceptedTimeout (Peer p)
+       {
+               if (p != next) return; // We've already moved on to another peer
+               if (searchState != SENT) return;
+               node.log (this + " accepted timeout for " + p);
+               p.localRejectedOverload(); // Back off from p
+               // Tell the sender to slow down
+               if (prev == null) {
+                       // FIXME: throttle
+               }
+               else prev.sendMessage (new RejectedOverload (id, false));
+               // Try another peer
+               forwardSearch();
+       }
+       
+       // Event callback
+       protected void searchTimeout (Peer p)
+       {
+               if (p != next) return; // We've already moved on to another peer
+               if (searchState != ACCEPTED) return;
+               node.log (this + " search timeout for " + p);
+               p.localRejectedOverload(); // Back off from p
+               // Tell the sender to slow down
+               if (prev == null) {
+                       // FIXME: throttle
+               }
+               else prev.sendMessage (new RejectedOverload (id, false));
+               if (prev == null) node.log (this + " failed");
+               finish();
+       }
+       
        public abstract void handleMessage (Message m, Peer src);
-       
+       protected abstract void sendReply();
        protected abstract Search makeSearchMessage();
+       protected abstract void scheduleAcceptedTimeout (Peer next);
+       protected abstract void finish();
 }

Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java      
2006-11-16 19:50:21 UTC (rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java      
2006-11-16 20:45:37 UTC (rev 10955)
@@ -21,67 +21,28 @@
        {
                if (searchState != SENT) node.log (a + " out of order");
                searchState = ACCEPTED;
+               next.successNotOverload(); // Reset the backoff length
                // Wait 60 seconds for a reply to the search
                Event.schedule (this, 60.0, SEARCH_TIMEOUT, next);
        }

-       protected void handleRejectedLoop (RejectedLoop rl)
-       {
-               if (searchState != SENT) node.log (rl + " out of order");
-               next.tokensOut++; // No token was consumed
-               forwardSearch();
-       }
-       
-       protected void handleRouteNotFound (RouteNotFound rnf)
-       {
-               if (searchState != ACCEPTED) node.log (rnf + " out of order");
-               if (rnf.htl < htl) htl = rnf.htl;
-               // Use the remaining htl to try another peer
-               forwardSearch();
-       }
-       
        protected void handleDataNotFound (DataNotFound dnf)
        {
                if (searchState != ACCEPTED) node.log (dnf + " out of order");
+               next.successNotOverload(); // Reset the backoff length
                if (prev == null) node.log (this + " failed");
                else prev.sendMessage (dnf); // Forward the message
                finish();
        }

-       protected void forwardSearch()
+       protected void sendReply()
        {
-               next = null;
-               // If the search has run out of hops, send DataNotFound
-               if (htl == 0) {
-                       node.log ("data not found for " + this);
-                       if (prev == null) node.log (this + " failed");
-                       else prev.sendMessage (new DataNotFound (id));
-                       finish();
-                       return;
-               }
-               // Forward the search to the closest remaining peer
-               next = closestPeer();
-               if (next == null) {
-                       node.log ("route not found for " + this);
-                       if (prev == null) node.log (this + " failed");
-                       else prev.sendMessage (new RouteNotFound (id, htl));
-                       finish();
-                       return;
-               }
-               // Decrement the htl if the next node is not the closest so far
-               double target = Node.keyToLocation (key);
-               if (Node.distance (target, next.location)
-               >= Node.distance (target, closest))
-                       htl = node.decrementHtl (htl);
-               node.log (this + " has htl " + htl);
-               // Consume a token
-               next.tokensOut--;
-               // Forward the search
-               node.log ("forwarding " + this + " to " + next.address);
-               next.sendMessage (makeSearchMessage());
-               nexts.remove (next);
-               searchState = SENT;
-               // Wait 5 seconds for the next hop to accept the search
+               if (prev == null) node.log (this + " failed");
+               else prev.sendMessage (new DataNotFound (id));
+       }
+       
+       protected void scheduleAcceptedTimeout (Peer next)
+       {
                Event.schedule (this, 5.0, ACCEPTED_TIMEOUT, next);
        }

@@ -91,25 +52,7 @@
                node.removeMessageHandler (id);
        }

-       // Event callbacks
-       
-       protected void acceptedTimeout (Peer p)
-       {
-               if (p != next) return; // We've already moved on to another peer
-               if (searchState != SENT) return;
-               node.log (this + " accepted timeout for " + p);
-               forwardSearch(); // Try another peer
-       }
-       
-       protected void searchTimeout (Peer p)
-       {
-               if (p != next) return; // We've already moved on to another peer
-               if (searchState != ACCEPTED) return;
-               node.log (this + " search timeout for " + p);
-               if (prev == null) node.log (this + " failed");
-               finish();
-       }
-       
+       // Event callback
        protected void transferTimeout (Peer p)
        {
                if (searchState != TRANSFERRING) return;

Modified: 
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java    
2006-11-16 19:50:21 UTC (rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java    
2006-11-16 20:45:37 UTC (rev 10955)
@@ -57,6 +57,8 @@
                                handleSskAccepted ((SskAccepted) m);
                        else if (m instanceof RejectedLoop)
                                handleRejectedLoop ((RejectedLoop) m);
+                       else if (m instanceof RejectedOverload)
+                               handleRejectedOverload ((RejectedOverload) m);
                        else if (m instanceof RouteNotFound)
                                handleRouteNotFound ((RouteNotFound) m);
                        else if (m instanceof SskDataFound)
@@ -86,24 +88,10 @@
                if (sa.needPubKey) next.sendMessage (pubKey);
        }

-       private void handleRejectedLoop (RejectedLoop rl)
-       {
-               if (searchState != SENT) node.log (rl + " out of order");
-               next.tokensOut++; // No token was consumed
-               forwardSearch();
-       }
-       
-       private void handleRouteNotFound (RouteNotFound rnf)
-       {
-               if (searchState != ACCEPTED) node.log (rnf + " out of order");
-               if (rnf.htl < htl) htl = rnf.htl;
-               // Use the remaining htl to try another peer
-               forwardSearch();
-       }
-       
        private void handleCollision (SskDataFound sdf)
        {
                if (searchState != ACCEPTED) node.log (sdf + " out of order");
+               // FIXME: should we reset the backoff length?
                if (prev == null) node.log (this + " collided");
                else prev.sendMessage (sdf); // Forward the message
                data = sdf.data; // Is this safe?
@@ -112,49 +100,29 @@
        private void handleInsertReply (InsertReply ir)
        {
                if (searchState != ACCEPTED) node.log (ir + " out of order");
+               next.successNotOverload(); // Reset the backoff length
                if (prev == null) node.log (this + " succeeded");
                else prev.sendMessage (ir); // Forward the message
                finish();
        }

-       public void forwardSearch()
+       protected void sendReply()
        {
-               next = null;
-               // If the search has run out of hops, send InsertReply
-               if (htl == 0) {
-                       node.log (this + " has no hops left");
-                       if (prev == null) node.log (this + " succeeded");
-                       else prev.sendMessage (new InsertReply (id));
-                       finish();
-                       return;
-               }
-               // Forward the search to the closest remaining peer
-               next = closestPeer();
-               if (next == null) {
-                       node.log ("route not found for " + this);
-                       if (prev == null) node.log (this + " failed");
-                       else prev.sendMessage (new RouteNotFound (id, htl));
-                       finish();
-                       return;
-               }
-               // Decrement the htl if the next node is not the closest so far
-               double target = Node.keyToLocation (key);
-               if (Node.distance (target, next.location)
-               >= Node.distance (target, closest))
-                       htl = node.decrementHtl (htl);
-               node.log (this + " has htl " + htl);
-               // Consume a token
-               next.tokensOut--;
-               // Forward the search
-               node.log ("forwarding " + this + " to " + next.address);
-               next.sendMessage (makeSearchMessage());
-               nexts.remove (next);
-               searchState = SENT;
-               // Wait 10 seconds for the next hop to accept the search
+               if (prev == null) node.log (this + " succeeded");
+               else prev.sendMessage (new InsertReply (id));
+       }
+       
+       protected Search makeSearchMessage()
+       {
+               return new SskInsert (id, key, data, closest, htl);
+       }
+       
+       protected void scheduleAcceptedTimeout (Peer next)
+       {
                Event.schedule (this, 10.0, ACCEPTED_TIMEOUT, next);
        }

-       private void finish()
+       protected void finish()
        {
                searchState = COMPLETED;
                node.cachePubKey (key);
@@ -164,18 +132,12 @@
                node.removeMessageHandler (id);
        }

-       protected Search makeSearchMessage()
-       {
-               return new SskInsert (id, key, data, closest, htl);
-       }
-       
        public String toString()
        {
                return new String ("SSK insert (" +id+ "," +key+ "," +data+")");
        }

-       // Event callbacks
-       
+       // Event callback
        private void keyTimeout()
        {
                if (searchState != STARTED) return;
@@ -183,23 +145,6 @@
                finish();
        }

-       private void acceptedTimeout (Peer p)
-       {
-               if (p != next) return; // We've already moved on to another peer
-               if (searchState != SENT) return;
-               node.log (this + " accepted timeout for " + p);
-               forwardSearch(); // Try another peer
-       }
-       
-       private void searchTimeout (Peer p)
-       {
-               if (p != next) return; // We've already moved on to another peer
-               if (searchState != ACCEPTED) return;
-               node.log (this + " search timeout for " + p);
-               if (prev == null) node.log (this + " failed");
-               finish();
-       }
-       
        // EventTarget interface
        public void handleEvent (int type, Object data)
        {

Modified: 
trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java   
2006-11-16 19:50:21 UTC (rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskRequestHandler.java   
2006-11-16 20:45:37 UTC (rev 10955)
@@ -28,6 +28,8 @@
                        handleAccepted ((Accepted) m);
                else if (m instanceof RejectedLoop)
                        handleRejectedLoop ((RejectedLoop) m);
+               else if (m instanceof RejectedOverload)
+                       handleRejectedOverload ((RejectedOverload) m);
                else if (m instanceof RouteNotFound)
                        handleRouteNotFound ((RouteNotFound) m);
                else if (m instanceof DataNotFound)
@@ -42,6 +44,7 @@
        private void handleSskDataFound (SskDataFound df)
        {
                if (searchState != ACCEPTED) node.log (df + " out of order");
+               next.successNotOverload(); // Reset the backoff length
                dataFound = df;
                if (pubKey == null) return; // Keep waiting
                if (prev == null) node.log (this + " succeeded");
@@ -57,6 +60,7 @@
        private void handleSskPubKey (SskPubKey pk)
        {
                if (searchState != ACCEPTED) node.log (pk + " out of order");
+               next.successNotOverload(); // Reset the backoff length
                pubKey = pk;
                if (dataFound == null) return; // Keep waiting
                if (prev == null) node.log (this + " succeeded");

Added: trunk/apps/load-balancing-sims/phase7/sim/messages/RejectedOverload.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/messages/RejectedOverload.java    
2006-11-16 19:50:21 UTC (rev 10954)
+++ trunk/apps/load-balancing-sims/phase7/sim/messages/RejectedOverload.java    
2006-11-16 20:45:37 UTC (rev 10955)
@@ -0,0 +1,17 @@
+package sim.messages;
+
+public class RejectedOverload extends Message
+{
+       public boolean local; // Was this rejection generated locally?
+       
+       public RejectedOverload (int id, boolean local)
+       {
+               this.id = id;
+               this.local = local;
+       }
+       
+       public String toString()
+       {
+               return new String ("rejected overload (" +id+ "," +local+ ")");
+       }
+}


Reply via email to