Author: mrogers
Date: 2006-08-27 19:56:53 +0000 (Sun, 27 Aug 2006)
New Revision: 10288

Added:
   trunk/apps/load-balancing-sims/phase6/messages/DataNotFound.java
Modified:
   trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java
   trunk/apps/load-balancing-sims/phase6/LruCache.java
   trunk/apps/load-balancing-sims/phase6/Node.java
   trunk/apps/load-balancing-sims/phase6/Packet.java
   trunk/apps/load-balancing-sims/phase6/Peer.java
   trunk/apps/load-balancing-sims/phase6/Sim.java
   trunk/apps/load-balancing-sims/phase6/TokenBucket.java
Log:
Search and transfer timeouts

Modified: trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java        2006-08-27 19:02:28 UTC (rev 10287)
+++ trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java        2006-08-27 19:56:53 UTC (rev 10288)
@@ -3,12 +3,14 @@
 import java.util.LinkedList;
 import messages.*;

-class ChkRequestHandler
+class ChkRequestHandler implements EventTarget
 {
        // State machine
+       public final static int STARTED = 0;
        public final static int REQUEST_SENT = 1;
        public final static int ACCEPTED = 2;
        public final static int TRANSFERRING = 3;
+       public final static int FAILED = 4;

        public final int id; // The unique ID of the request
        public final int key; // The requested key
@@ -16,7 +18,7 @@
        private Peer prev; // The previous hop of the request
        private Peer next = null; // The (current) next hop of the request
        private LinkedList<Peer> nexts; // Candidates for the next hop
-       private int state = REQUEST_SENT; // State machine
+       private int state = STARTED; // State machine
        private int blockBitmap = 0; // Bitmap of received blocks

        public ChkRequestHandler (ChkRequest r, Node node, Peer prev)
@@ -26,6 +28,7 @@
                this.node = node;
                this.prev = prev;
                nexts = new LinkedList<Peer> (node.peers());
+               nexts.remove (prev);
        }

        // Remove a peer from the list of candidates for the next hop
@@ -34,72 +37,80 @@
                nexts.remove (p);
        }

-       public boolean handleMessage (Message m, Peer src)
+       public void handleMessage (Message m, Peer src)
        {
                if (src != next) {
                        node.log ("unexpected source for " + m);
-                       return false; // Request not completed
+                       return;
                }
-               if (m instanceof Accepted) return handleAccepted ((Accepted) m);
-               if (m instanceof ChkDataFound)
-                       return handleChkDataFound ((ChkDataFound) m);
-               if (m instanceof Block) return handleBlock ((Block) m);
-               if (m instanceof RouteNotFound) return forwardRequest();
-               if (m instanceof RejectedLoop) return forwardRequest();
-               // Unrecognised message type
-               node.log ("unexpected type for " + m);
-               return false; // Request not completed
+               if (m instanceof Accepted) handleAccepted ((Accepted) m);
+               else if (m instanceof ChkDataFound)
+                       handleChkDataFound ((ChkDataFound) m);
+               else if (m instanceof DataNotFound)
+                       handleDataNotFound ((DataNotFound) m);
+               else if (m instanceof Block) handleBlock ((Block) m);
+               else if (m instanceof RouteNotFound) forwardRequest();
+               else if (m instanceof RejectedLoop) forwardRequest();
+               else node.log ("unexpected type for " + m);
        }

-       private boolean handleAccepted (Accepted a)
+       private void handleAccepted (Accepted a)
        {
-               if (state != REQUEST_SENT)
-                       node.log (a + " received out of order");
+               if (state != REQUEST_SENT) node.log (a + " out of order");
                state = ACCEPTED;
-               return false; // Request not completed
+               // Wait 60 seconds for the next hop to start sending the data
+               Event.schedule (this, 60.0, FETCH_TIMEOUT, next);
        }

-       private boolean handleChkDataFound (ChkDataFound df)
+       private void handleChkDataFound (ChkDataFound df)
        {
-               if (state != ACCEPTED)
-                       node.log (df + " received out of order");
+               if (state != ACCEPTED) node.log (df + " out of order");
                state = TRANSFERRING;
-               if (prev != null) prev.sendMessage (df);
-               return false; // Request not completed
+               if (prev != null) prev.sendMessage (df); // Forward the message
        }

-       private boolean handleBlock (Block b)
+       private void handleDataNotFound (DataNotFound dnf)
        {
-               if (state != TRANSFERRING)
-                       node.log (b + " received out of order");
-               if (receivedBlock (b.index)) return false; // Ignore duplicates
+               if (state != ACCEPTED) node.log (dnf + " out of order");
+               if (prev == null) node.log (this + " failed");
+               else prev.sendMessage (dnf); // Forward the message
+               node.chkRequestCompleted (id);
+       }
+       
+       private void handleBlock (Block b)
+       {
+               if (state != TRANSFERRING) node.log (b + " out of order");
+               if (receivedBlock (b.index)) return; // Ignore duplicates
                // Forward the block
                if (prev != null) {
                        node.log ("forwarding " + b);
                        prev.sendMessage (b);
                }
+               // If the transfer is complete, store the data
                if (receivedAll()) {
                        node.storeChk (key);
                        if (prev == null) node.log (this + " succeeded");
-                       return true; // Request completed
+                       node.chkRequestCompleted (id);
                }
-               else return false; // Request not completed
        }

-       public boolean forwardRequest()
+       public void forwardRequest()
        {
                next = closestPeer();
                if (next == null) {
                        node.log ("route not found for " + this);
                        if (prev == null) node.log (this + " failed");
                        else prev.sendMessage (new RouteNotFound (id));
-                       return true; // Request completed
+                       node.chkRequestCompleted (id);
+                       state = FAILED;
                }
                else {
                        node.log ("forwarding " + this + " to " + next.address);
                        next.sendMessage (new ChkRequest (id, key));
                        nexts.remove (next);
-                       return false; // Request not completed
+                       state = REQUEST_SENT;
+                       // Wait 5 seconds for the next hop to accept the request
+                       Event.schedule (this, 5.0, ACCEPTED_TIMEOUT, next);
                }
        }

@@ -137,4 +148,34 @@
        {
                return new String ("CHK request (" + id + "," + key + ")");
        }
+       
+       // Event callback
+       private void acceptedTimeout (Peer p)
+       {
+               if (p != next) return; // We've already moved on to another peer
+               if (state != REQUEST_SENT) return; // Peer has already answered
+               node.log (this + " search timed out waiting for " + p);
+               forwardRequest(); // Try another peer
+       }
+       
+       // Event callback
+       private void fetchTimeout (Peer p)
+       {
+               if (state != ACCEPTED) return; // Peer has already answered
+               node.log (this + " transfer timed out waiting for " + p);
+               if (prev == null) node.log (this + " failed");
+               else prev.sendMessage (new DataNotFound (id));
+               node.chkRequestCompleted (id);
+       }
+       
+       // EventTarget interface
+       public void handleEvent (int type, Object data)
+       {
+               if (type == ACCEPTED_TIMEOUT) acceptedTimeout ((Peer) data);
+               else if (type == FETCH_TIMEOUT) fetchTimeout ((Peer) data);
+       }
+       
+       // Each EventTarget class has its own event codes
+       private final static int ACCEPTED_TIMEOUT = 1;
+       private final static int FETCH_TIMEOUT = 2;
 }

Modified: trunk/apps/load-balancing-sims/phase6/LruCache.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/LruCache.java 2006-08-27 19:02:28 UTC (rev 10287)
+++ trunk/apps/load-balancing-sims/phase6/LruCache.java 2006-08-27 19:56:53 UTC (rev 10288)
@@ -41,6 +41,6 @@

        private void log (String message)
        {
-               Event.log (message);
+               // Event.log (message);
        }
 }

Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java     2006-08-27 19:02:28 UTC (rev 10287)
+++ trunk/apps/load-balancing-sims/phase6/Node.java     2006-08-27 19:56:53 UTC (rev 10288)
@@ -25,6 +25,8 @@
        public TokenBucket bandwidth; // Bandwidth limiter
        private boolean timerRunning = false; // Is the timer running?

+       public boolean faulty = false; // DEBUG
+       
        public Node (double txSpeed, double rxSpeed)
        {
                location = Math.random();
@@ -96,7 +98,7 @@
        public void startTimer()
        {
                if (timerRunning) return;
-               log ("starting retransmission/coalescing timer");
+               // log ("starting retransmission/coalescing timer");
                Event.schedule (this, Peer.MAX_DELAY, CHECK_TIMEOUTS, null);
                timerRunning = true;
        }
@@ -113,31 +115,35 @@
        public void handleMessage (Message m, Peer src)
        {
                log ("received " + m);
-               if (m instanceof ChkRequest) {
-                       if (handleChkRequest ((ChkRequest) m, src))
-                               chkRequests.remove (m.id); // Completed
-               }
+               if (m instanceof ChkRequest)
+                       handleChkRequest ((ChkRequest) m, src);
                else {
                        ChkRequestHandler rh = chkRequests.get (m.id);
                        if (rh == null) log ("no request handler for " + m);
-                       else if (rh.handleMessage (m, src))
-                               chkRequests.remove (m.id); // Completed
+                       else rh.handleMessage (m, src);
                }
        }

-       private boolean handleChkRequest (ChkRequest r, Peer prev)
+       private void handleChkRequest (ChkRequest r, Peer prev)
        {
                if (!recentlySeenRequests.add (r.id)) {
                        log ("rejecting recently seen " + r);
                        prev.sendMessage (new RejectedLoop (r.id));
-                       // Optimisation: the previous hop has already seen
-                       // this request, so don't ask it in the future
+                       // Don't forward the same request back to prev
                        ChkRequestHandler rh = chkRequests.get (r.id);
                        if (rh != null) rh.removeNextHop (prev);
-                       return false; // Request not completed
+                       return;
                }
                // Accept the request
-               if (prev != null) prev.sendMessage (new Accepted (r.id));
+               if (prev != null) {
+                       log ("accepting " + r);
+                       prev.sendMessage (new Accepted (r.id));
+               }
+               // DEBUG
+               if (faulty) {
+                       log ("DEBUG: dropping " + r);
+                       return;
+               }
                // If the key is in the store, return it
                if (chkStore.get (r.key)) {
                        log ("key " + r.key + " found in CHK store");
@@ -147,7 +153,8 @@
                                for (int i = 0; i < 32; i++)
                                        prev.sendMessage (new Block (r.id, i));
                        }
-                       return true; // Request completed
+                       chkRequestCompleted (r.id);
+                       return;
                }
                log ("key " + r.key + " not found in CHK store");
                // If the key is in the cache, return it
@@ -159,15 +166,22 @@
                                for (int i = 0; i < 32; i++)
                                        prev.sendMessage (new Block (r.id, i));
                        }
-                       return true; // Request completed
+                       chkRequestCompleted (r.id);
+                       return;
                }
                log ("key " + r.key + " not found in CHK cache");
                // Forward the request and store the request state
                ChkRequestHandler rh = new ChkRequestHandler (r, this, prev);
                chkRequests.put (r.id, rh);
-               return rh.forwardRequest();
+               rh.forwardRequest();
        }

+       // Remove a completed request from the list of pending requests
+       public void chkRequestCompleted (int id)
+       {
+               chkRequests.remove (id);
+       }
+       
        // Return the list of peers in a random order
        public ArrayList<Peer> peers()
        {
@@ -197,7 +211,7 @@
                for (Peer p : peers())
                        deadline = Math.min (deadline, p.checkTimeouts());
                if (deadline == Double.POSITIVE_INFINITY) {
-                       log ("stopping retransmission/coalescing timer");
+                       // log ("stopping retransmission/coalescing timer");
                        timerRunning = false;
                }
                else {

Modified: trunk/apps/load-balancing-sims/phase6/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Packet.java   2006-08-27 19:02:28 UTC (rev 10287)
+++ trunk/apps/load-balancing-sims/phase6/Packet.java   2006-08-27 19:56:53 UTC (rev 10288)
@@ -32,4 +32,9 @@
                messages.add (m);
                size += m.size;
        }
+       
+       public String toString()
+       {
+               return new String (src + ":" + dest + ":" + seq);
+       }
 }

Modified: trunk/apps/load-balancing-sims/phase6/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Peer.java     2006-08-27 19:02:28 UTC (rev 10287)
+++ trunk/apps/load-balancing-sims/phase6/Peer.java     2006-08-27 19:56:53 UTC (rev 10288)
@@ -293,6 +293,11 @@

        public void log (String message)
        {
-               Event.log (node.net.address + ":" + address + " " + message);
+               // Event.log (node.net.address + ":" + address + " " + message);
        }
+       
+       public String toString()
+       {
+               return Integer.toString (address);
+       }
 }

Modified: trunk/apps/load-balancing-sims/phase6/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Sim.java      2006-08-27 19:02:28 UTC (rev 10287)
+++ trunk/apps/load-balancing-sims/phase6/Sim.java      2006-08-27 19:56:53 UTC (rev 10288)
@@ -23,7 +23,10 @@
                n1.connectBothWays (n3, 0.001);
                n2.connectBothWays (n3, 0.001);

-               for (int i = 0; i < 10; i++) {
+               // DEBUG
+               n2.faulty = true;
+               
+               for (int i = 0; i < 4; i++) {
                        int key = Node.locationToKey (Math.random());
                        // Half the requests will succeed, half will fail
                        if (i % 2 == 0) n3.storeChk (key);

Modified: trunk/apps/load-balancing-sims/phase6/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/TokenBucket.java      2006-08-27 19:02:28 UTC (rev 10287)
+++ trunk/apps/load-balancing-sims/phase6/TokenBucket.java      2006-08-27 19:56:53 UTC (rev 10288)
@@ -24,6 +24,6 @@
        public void remove (int t)
        {
                tokens -= t; // Counter can go negative
-               Event.log (t + " tokens removed, " + tokens + " available");
+               // Event.log (t + " tokens removed, " + tokens + " available");
        }
 }

Added: trunk/apps/load-balancing-sims/phase6/messages/DataNotFound.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/DataNotFound.java    2006-08-27 19:02:28 UTC (rev 10287)
+++ trunk/apps/load-balancing-sims/phase6/messages/DataNotFound.java    2006-08-27 19:56:53 UTC (rev 10288)
@@ -0,0 +1,15 @@
+package messages;
+
+public class DataNotFound extends Message
+{
+       public DataNotFound (int id)
+       {
+               this.id = id;
+               size = Message.HEADER_SIZE;
+       }
+       
+       public String toString()
+       {
+               return new String ("data not found (" + id + ")");
+       }
+}


Reply via email to