Author: mrogers
Date: 2006-09-14 14:12:57 +0000 (Thu, 14 Sep 2006)
New Revision: 10461

Added:
   trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java
Log:
CHK inserts, sorry about the large number of commits, I accidentally added 
classfiles to the repository and I'm trying to fix it

Copied: trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java (from rev 
10460, trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java)
===================================================================
--- trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java        
2006-09-12 19:52:08 UTC (rev 10460)
+++ trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java 2006-09-14 
14:12:57 UTC (rev 10461)
@@ -0,0 +1,258 @@
+// The state of a CHK insert as stored at each node along the path
+
+import java.util.HashSet;
+import messages.*;
+
+class ChkInsertHandler extends MessageHandler implements EventTarget
+{
+       private int inState = STARTED; // State of incoming transfer
+       private int searchState = STARTED; // State of search
+       private HashSet<Peer> receivers; // Peers that should receive data
+       private Block[] blocks; // Store incoming blocks for forwarding
+       private int blocksReceived = 0;
+       
+       public ChkInsertHandler (ChkInsert i, Node node, Peer prev)
+       {
+               super (i, node, prev);
+               receivers = new HashSet<Peer>();
+               blocks = new Block[32];
+       }
+       
+       public void handleMessage (Message m, Peer src)
+       {
+               if (src == prev) {
+                       if (m instanceof DataInsert)
+                               handleDataInsert ((DataInsert) m);
+                       else if (m instanceof Block)
+                               handleBlock ((Block) m);
+                       else node.log ("unexpected type for " + m);
+               }
+               else if (src == next) {
+                       if (m instanceof Accepted)
+                               handleAccepted ((Accepted) m);
+                       else if (m instanceof RejectedLoop)
+                               handleRejectedLoop ((RejectedLoop) m);
+                       else if (m instanceof RouteNotFound)
+                               handleRouteNotFound ((RouteNotFound) m);
+                       else if (m instanceof InsertReply)
+                               handleInsertReply ((InsertReply) m);
+                       else if (m instanceof TransfersCompleted)
+                               handleCompleted ((TransfersCompleted) m, src);
+                       else node.log ("unexpected type for " + m);
+               }
+               else if (receivers.contains (src)) {
+                       if (m instanceof TransfersCompleted)
+                               handleCompleted ((TransfersCompleted) m, src);
+                       else node.log ("unexpected type for " + m);
+               }
+               else node.log ("unexpected source for " + m);
+       }
+       
+       private void handleDataInsert (DataInsert di)
+       {
+               if (inState != STARTED) node.log (di + " out of order");
+               inState = TRANSFERRING;
+               // Start the search
+               forwardSearch();
+               // Wait for transfer to complete (FIXME: check real timeout)
+               Event.schedule (this, 120.0, TRANSFER_IN_TIMEOUT, null);
+       }
+       
+       private void handleBlock (Block b)
+       {
+               if (inState != TRANSFERRING) node.log (b + " out of order");
+               if (blocks[b.index] != null) return; // Ignore duplicates
+               blocks[b.index] = b;
+               blocksReceived++;
+               // Forward the block to all receivers
+               for (Peer p : receivers) p.sendMessage (b);
+               // If the transfer is complete, cache and maybe store the data
+               if (blocksReceived == 32) {
+                       node.cacheChk (key);
+                       node.storeChk (key);
+                       inState = COMPLETED;
+                       considerFinishing();
+               }
+       }
+       
+       private void handleCompleted (TransfersCompleted tc, Peer src)
+       {
+               receivers.remove (src);
+               if (src == next) forwardSearch();
+               else considerFinishing();
+       }
+       
+       private void handleAccepted (Accepted a)
+       {
+               if (searchState != SENT) node.log (a + " out of order");
+               searchState = ACCEPTED;
+               // Wait 120 seconds for a reply to the search
+               Event.schedule (this, 120.0, SEARCH_TIMEOUT, next);
+               // Add the next hop to the list of receivers
+               receivers.add (next);
+               next.sendMessage (new DataInsert (id));
+               // Send all previously received blocks
+               for (int i = 0; i < 32; i++)
+                       if (blocks[i] != null) next.sendMessage (blocks[i]);
+               // Wait for TransfersCompleted (FIXME: check real timeout)
+               Event.schedule (this, 120.0, TRANSFER_OUT_TIMEOUT, next);
+       }
+       
+       private void handleRejectedLoop (RejectedLoop rl)
+       {
+               if (searchState != SENT) node.log (rl + " out of order");
+               forwardSearch();
+       }
+       
+       private void handleRouteNotFound (RouteNotFound rnf)
+       {
+               if (searchState != ACCEPTED) node.log (rnf + " out of order");
+               if (rnf.htl < htl) htl = rnf.htl;
+               // Use the remaining htl to try another peer
+               forwardSearch();
+       }
+       
+       private void handleInsertReply (InsertReply ir)
+       {
+               if (searchState != ACCEPTED) node.log (ir + " out of order");
+               if (prev == null) node.log (this + " succeeded");
+               else prev.sendMessage (ir); // Forward the message
+       }
+       
+       public void forwardSearch()
+       {
+               next = null;
+               // If the search has run out of hops, send InsertReply
+               if (htl == 0) {
+                       node.log (this + " has no hops left");
+                       if (prev == null) node.log (this + " succeeded");
+                       else prev.sendMessage (new InsertReply (id));
+                       searchState = COMPLETED;
+                       considerFinishing();
+                       return;
+               }
+               // Forward the search to the closest remaining peer
+               next = closestPeer();
+               if (next == null) {
+                       node.log ("route not found for " + this);
+                       if (prev == null) node.log (this + " failed");
+                       else prev.sendMessage (new RouteNotFound (id, htl));
+                       searchState = COMPLETED;
+                       considerFinishing();
+                       return;
+               }
+               // Decrement the htl if the next node is not the closest so far
+               double target = Node.keyToLocation (key);
+               if (Node.distance (target, next.location)
+               > Node.distance (target, closest)) {
+                       htl = node.decrementHtl (htl);
+                       node.log (this + " has htl " + htl);
+               }
+               node.log ("forwarding " + this + " to " + next.address);
+               next.sendMessage (new ChkInsert (id, key, closest, htl));
+               nexts.remove (next);
+               searchState = SENT;
+               // Wait 10 seconds for the next hop to accept the search
+               Event.schedule (this, 10.0, ACCEPTED_TIMEOUT, next);
+       }
+       
+       private void considerFinishing()
+       {
+               // An insert finishes when the search, the incoming transfer
+               // and all outgoing transfers are complete
+               if (searchState == COMPLETED && inState == COMPLETED 
+               && receivers.isEmpty()) finish();
+       }
+       
+       private void finish()
+       {
+               inState = COMPLETED;
+               searchState = COMPLETED;
+               if (prev == null) node.log (this + " completed");
+               else prev.sendMessage (new TransfersCompleted (id));
+               node.chkInsertCompleted (id);
+       }
+       
+       public String toString()
+       {
+               return new String ("CHK insert (" +id+ "," +key+ "," +htl+ ")");
+       }
+       
+       // Event callbacks
+       
+       private void acceptedTimeout (Peer p)
+       {
+               if (p != next) return; // We've already moved on to another peer
+               if (searchState != SENT) return;
+               node.log (this + " accepted timeout waiting for " + p);
+               forwardSearch(); // Try another peer
+       }
+       
+       private void searchTimeout (Peer p)
+       {
+               if (p != next) return; // We've already moved on to another peer
+               if (searchState != ACCEPTED) return;
+               node.log (this + " search timeout waiting for " + p);
+               if (prev == null) node.log (this + " failed");
+               searchState = COMPLETED;
+               considerFinishing();
+       }
+       
+       private void dataTimeout()
+       {
+               if (inState != STARTED) return;
+               node.log (this + " data timeout waiting for " + prev);
+               if (prev == null) node.log (this + " failed");
+               else prev.sendMessage (new TransfersCompleted (id));
+               finish();
+       }
+       
+       private void transferInTimeout()
+       {
+               if (inState != TRANSFERRING) return;
+               node.log (this + " transfer timeout receiving from " + prev);
+               if (prev == null) node.log (this + " failed");
+               else prev.sendMessage (new TransfersCompleted (id));
+               finish();
+       }
+       
+       private void transferOutTimeout (Peer p)
+       {
+               if (!receivers.remove (p)) return;
+               node.log (this + " transfer timeout sending to " + p);
+               considerFinishing();
+       }
+       
+       // EventTarget interface
+       public void handleEvent (int type, Object data)
+       {
+               switch (type) {
+                       case ACCEPTED_TIMEOUT:
+                       acceptedTimeout ((Peer) data);
+                       break;
+                       
+                       case SEARCH_TIMEOUT:
+                       searchTimeout ((Peer) data);
+                       break;
+                       
+                       case DATA_TIMEOUT:
+                       dataTimeout();
+                       break;
+                       
+                       case TRANSFER_IN_TIMEOUT:
+                       transferInTimeout();
+                       break;
+                       
+                       case TRANSFER_OUT_TIMEOUT:
+                       transferOutTimeout ((Peer) data);
+                       break;
+               }
+       }
+       
+       // Each EventTarget class has its own event codes
+       private final static int ACCEPTED_TIMEOUT = 1;
+       private final static int SEARCH_TIMEOUT = 2;
+       private final static int DATA_TIMEOUT = 3;
+       private final static int TRANSFER_IN_TIMEOUT = 4;
+       private final static int TRANSFER_OUT_TIMEOUT = 5;
+}


Reply via email to