Author: mrogers
Date: 2006-09-14 14:13:00 +0000 (Thu, 14 Sep 2006)
New Revision: 10462

Modified:
   trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java
Log:
CHK inserts, sorry about the large number of commits, I accidentally added 
classfiles to the repository and I'm trying to fix it

Modified: trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java        
2006-09-14 14:12:57 UTC (rev 10461)
+++ trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java        
2006-09-14 14:13:00 UTC (rev 10462)
@@ -1,224 +1,169 @@
-// The state of an outstanding CHK request as stored at each node along the 
path
+// The state of a CHK request as stored at each node along the path

-import java.util.LinkedList;
 import messages.*;

-class ChkRequestHandler implements EventTarget
+class ChkRequestHandler extends MessageHandler implements EventTarget
 {
-       // State machine
-       public final static int STARTED = 0;
-       public final static int REQUEST_SENT = 1;
-       public final static int ACCEPTED = 2;
-       public final static int TRANSFERRING = 3;
-       public final static int COMPLETED = 4;
+       private int state = STARTED; // State of search
+       private boolean[] received; // Keep track of received blocks
+       private int blocksReceived = 0;

-       private final int id; // The unique ID of the request
-       private final int key; // The requested key
-       private double best; // The best location seen so far
-       private int htl; // Hops to live for backtracking
-       
-       private Node node; // The owner of this RequestHandler
-       private Peer prev; // The previous hop of the request
-       private Peer next = null; // The (current) next hop of the request
-       private LinkedList<Peer> nexts; // Candidates for the next hop
-       private int state = STARTED; // State machine
-       private int blockBitmap = 0; // Bitmap of received blocks
-       
        public ChkRequestHandler (ChkRequest r, Node node, Peer prev)
        {
-               id = r.id;
-               key = r.key;
-               best = r.best;
-               htl = r.htl;
-               this.node = node;
-               this.prev = prev;
-               nexts = new LinkedList<Peer> (node.peers());
-               nexts.remove (prev);
-               // If this is the best node so far, update best & reset htl
-               double target = Node.keyToLocation (key);
-               if (Node.distance (target, node.location)
-               < Node.distance (target, best)) {
-                       node.log ("resetting htl of " + this);
-                       best = node.location;
-                       htl = ChkRequest.MAX_HTL;
-               }
+               super (r, node, prev);
+               received = new boolean[32];
        }

-       // Remove a peer from the list of candidates for the next hop
-       public void removeNextHop (Peer p)
-       {
-               nexts.remove (p);
-       }
-       
        public void handleMessage (Message m, Peer src)
        {
                if (src != next) {
                        node.log ("unexpected source for " + m);
                        return;
                }
-               if (m instanceof Accepted) handleAccepted ((Accepted) m);
-               else if (m instanceof ChkDataFound)
-                       handleChkDataFound ((ChkDataFound) m);
-               else if (m instanceof DataNotFound)
-                       handleDataNotFound ((DataNotFound) m);
+               if (m instanceof Accepted)
+                       handleAccepted ((Accepted) m);
+               else if (m instanceof RejectedLoop)
+                       handleRejectedLoop ((RejectedLoop) m);
                else if (m instanceof RouteNotFound)
                        handleRouteNotFound ((RouteNotFound) m);
-               else if (m instanceof Block) handleBlock ((Block) m);
-               else if (m instanceof RejectedLoop) forwardRequest();
+               else if (m instanceof DataNotFound)
+                       handleDataNotFound ((DataNotFound) m);
+               else if (m instanceof ChkDataFound)
+                       handleChkDataFound ((ChkDataFound) m);
+               else if (m instanceof Block)
+                       handleBlock ((Block) m);
                else node.log ("unexpected type for " + m);
        }

        private void handleAccepted (Accepted a)
        {
-               if (state != REQUEST_SENT) node.log (a + " out of order");
-               if (state != TRANSFERRING) state = ACCEPTED;
-               // Wait 60 seconds for the next hop to start sending the data
-               Event.schedule (this, 60.0, FETCH_TIMEOUT, next);
+               if (state != SENT) node.log (a + " out of order");
+               state = ACCEPTED;
+               // Wait 60 seconds for a reply to the search
+               Event.schedule (this, 60.0, SEARCH_TIMEOUT, next);
        }

-       private void handleChkDataFound (ChkDataFound df)
+       private void handleRejectedLoop (RejectedLoop rl)
        {
-               if (state != ACCEPTED) node.log (df + " out of order");
-               if (prev != null) prev.sendMessage (df); // Forward the message
-               state = TRANSFERRING;
-               // Wait 5 minutes for the transfer to complete
-               Event.schedule (this, 300.0, TRANSFER_TIMEOUT, next);
+               if (state != SENT) node.log (rl + " out of order");
+               forwardSearch();
        }

+       private void handleRouteNotFound (RouteNotFound rnf)
+       {
+               if (state != ACCEPTED) node.log (rnf + " out of order");
+               if (rnf.htl < htl) htl = rnf.htl;
+               // Use the remaining htl to try another peer
+               forwardSearch();
+       }
+       
        private void handleDataNotFound (DataNotFound dnf)
        {
                if (state != ACCEPTED) node.log (dnf + " out of order");
                if (prev == null) node.log (this + " failed");
                else prev.sendMessage (dnf); // Forward the message
-               node.chkRequestCompleted (id);
-               state = COMPLETED;
+               finish();
        }

-       private void handleRouteNotFound (RouteNotFound rnf)
+       private void handleChkDataFound (ChkDataFound df)
        {
-               if (state != ACCEPTED) node.log (rnf + " out of order");
-               if (rnf.htl < htl) htl = rnf.htl;
-               // Use the remaining htl to try another peer
-               forwardRequest();
+               if (state != ACCEPTED) node.log (df + " out of order");
+               state = TRANSFERRING;
+               if (prev != null) prev.sendMessage (df); // Forward the message
+               // Wait for the transfer to complete (FIXME: check real timeout)
+               Event.schedule (this, 120.0, TRANSFER_TIMEOUT, next);
        }

        private void handleBlock (Block b)
        {
                if (state != TRANSFERRING) node.log (b + " out of order");
-               if (receivedBlock (b.index)) return; // Ignore duplicates
+               if (received[b.index]) return; // Ignore duplicates
+               received[b.index] = true;
+               blocksReceived++;
                // Forward the block
                if (prev != null) {
                        node.log ("forwarding " + b);
                        prev.sendMessage (b);
                }
                // If the transfer is complete, cache the data
-               if (receivedAll()) {
+               if (blocksReceived == 32) {
                        node.cacheChk (key);
                        if (prev == null) node.log (this + " succeeded");
-                       node.chkRequestCompleted (id);
-                       state = COMPLETED;
+                       finish();
                }
        }

-       public void forwardRequest()
+       public void forwardSearch()
        {
-               // If the request has run out of hops, send DataNotFound
+               next = null;
+               // If the search has run out of hops, send DataNotFound
                if (htl == 0) {
                        node.log ("data not found for " + this);
                        if (prev == null) node.log (this + " failed");
                        else prev.sendMessage (new DataNotFound (id));
-                       node.chkRequestCompleted (id);
-                       state = COMPLETED;
+                       finish();
                        return;
                }
-               // Forward the request to the best remaining peer
-               next = bestPeer();
+               // Forward the search to the closest remaining peer
+               next = closestPeer();
                if (next == null) {
                        node.log ("route not found for " + this);
                        if (prev == null) node.log (this + " failed");
                        else prev.sendMessage (new RouteNotFound (id, htl));
-                       node.chkRequestCompleted (id);
-                       state = COMPLETED;
+                       finish();
                        return;
                }
-               // Decrement htl if next node is not best so far
+               // Decrement the htl if the next node is not the closest so far
                double target = Node.keyToLocation (key);
                if (Node.distance (target, next.location)
-               > Node.distance (target, best)) {
+               > Node.distance (target, closest)) {
                        htl = node.decrementHtl (htl);
                        node.log (this + " has htl " + htl);
                }
                node.log ("forwarding " + this + " to " + next.address);
-               next.sendMessage (new ChkRequest (id, key, best, htl));
+               next.sendMessage (new ChkRequest (id, key, closest, htl));
                nexts.remove (next);
-               state = REQUEST_SENT;
-               // Wait 5 seconds for the next hop to accept the request
+               state = SENT;
+               // Wait 5 seconds for the next hop to accept the search
                Event.schedule (this, 5.0, ACCEPTED_TIMEOUT, next);
        }

-       // Find the best remaining peer
-       private Peer bestPeer ()
+       private void finish()
        {
-               double keyLoc = Node.keyToLocation (key);
-               double bestDist = Double.POSITIVE_INFINITY;
-               Peer bestPeer = null;
-               for (Peer peer : nexts) {
-                       double dist = Node.distance (keyLoc, peer.location);
-                       if (dist < bestDist) {
-                               bestDist = dist;
-                               bestPeer = peer;
-                       }
-               }
-               return bestPeer; // Null if the list was empty
+               state = COMPLETED;
+               node.chkRequestCompleted (id);
        }

-       // Mark a block as received, return true if it's a duplicate
-       private boolean receivedBlock (int index)
-       {
-               int mask = 1 << index;
-               boolean duplicate = (blockBitmap & mask) != 0;
-               blockBitmap |= mask;
-               return duplicate;
-       }
-       
-       // Return true if all blocks have been received
-       private boolean receivedAll()
-       {
-               return blockBitmap == 0xFFFFFFFF;
-       }
-       
        public String toString()
        {
                return new String ("CHK request (" + id + "," + key + ")");
        }

-       // Event callback
+       // Event callbacks
+       
        private void acceptedTimeout (Peer p)
        {
                if (p != next) return; // We've already moved on to another peer
-               if (state != REQUEST_SENT) return; // Peer has already answered
+               if (state != SENT) return;
                node.log (this + " accepted timeout waiting for " + p);
-               forwardRequest(); // Try another peer
+               forwardSearch(); // Try another peer
        }

-       // Event callback
-       private void fetchTimeout (Peer p)
+       private void searchTimeout (Peer p)
        {
-               if (state != ACCEPTED) return; // Peer has already answered
-               node.log (this + " fetch timeout waiting for " + p);
+               if (p != next) return; // We've already moved on to another peer
+               if (state != ACCEPTED) return;
+               node.log (this + " search timeout waiting for " + p);
                if (prev == null) node.log (this + " failed");
-               node.chkRequestCompleted (id);
-               state = COMPLETED;
+               finish();
        }

-       // Event callback
        private void transferTimeout (Peer p)
        {
-               if (state != TRANSFERRING) return; // Transfer has completed
-               node.log (this + " transfer timeout waiting for " + p);
+               if (state != TRANSFERRING) return;
+               node.log (this + " transfer timeout receiving from " + p);
                if (prev == null) node.log (this + " failed");
-               node.chkRequestCompleted (id);
-               state = COMPLETED;
+               finish();
        }

        // EventTarget interface
@@ -229,8 +174,8 @@
                        acceptedTimeout ((Peer) data);
                        break;

-                       case FETCH_TIMEOUT:
-                       fetchTimeout ((Peer) data);
+                       case SEARCH_TIMEOUT:
+                       searchTimeout ((Peer) data);
                        break;

                        case TRANSFER_TIMEOUT:
@@ -241,6 +186,6 @@

        // Each EventTarget class has its own event codes
        private final static int ACCEPTED_TIMEOUT = 1;
-       private final static int FETCH_TIMEOUT = 2;
+       private final static int SEARCH_TIMEOUT = 2;
        private final static int TRANSFER_TIMEOUT = 3;
 }


Reply via email to