Author: mrogers
Date: 2006-09-14 14:13:11 +0000 (Thu, 14 Sep 2006)
New Revision: 10465

Modified:
   trunk/apps/load-balancing-sims/phase6/Node.java
Log:
CHK inserts, sorry about the large number of commits, I accidentally added 
classfiles to the repository and I'm trying to fix it

Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java     2006-09-14 14:13:08 UTC 
(rev 10464)
+++ trunk/apps/load-balancing-sims/phase6/Node.java     2006-09-14 14:13:11 UTC 
(rev 10465)
@@ -12,28 +12,26 @@
        public final static double SHORT_SLEEP = 0.05; // Poll the bw limiter

        // Token bucket bandwidth limiter
-       public final static int BUCKET_RATE = 15000; // Bytes per second
+       public final static int BUCKET_RATE = 30000; // Bytes per second
        public final static int BUCKET_SIZE = 60000; // Burst size in bytes

        public double location; // Routing location
        public NetworkInterface net;
        private HashMap<Integer,Peer> peers; // Look up a peer by its address
        private HashSet<Integer> recentlySeenRequests; // Request IDs
-       private HashMap<Integer,ChkRequestHandler> chkRequests; // By ID
+       private HashMap<Integer,MessageHandler> messageHandlers; // By ID
        private LruCache<Integer> chkStore; // CHK datastore
        private LruCache<Integer> chkCache; // CHK datacache
        public TokenBucket bandwidth; // Bandwidth limiter
        private boolean timerRunning = false; // Is the timer running?

-       public boolean faulty = false; // DEBUG
-       
        public Node (double txSpeed, double rxSpeed)
        {
                location = Math.random();
                net = new NetworkInterface (this, txSpeed, rxSpeed);
                peers = new HashMap<Integer,Peer>();
                recentlySeenRequests = new HashSet<Integer>();
-               chkRequests = new HashMap<Integer,ChkRequestHandler>();
+               messageHandlers = new HashMap<Integer,MessageHandler>();
                chkStore = new LruCache<Integer> (STORE_SIZE);
                chkCache = new LruCache<Integer> (CACHE_SIZE);
                bandwidth = new TokenBucket (BUCKET_RATE, BUCKET_SIZE);
@@ -102,6 +100,7 @@
                        log ("key " + key + " added to CHK store");
                        chkStore.put (key);
                }
+               else log ("key " + key + " not added to CHK store");
        }

        // Called by Peer
@@ -117,20 +116,22 @@
        public void handlePacket (Packet packet)
        {
                Peer peer = peers.get (packet.src);
-               if (peer == null) log ("unknown peer!");
+               if (peer == null) log ("received packet from unknown peer");
                else peer.handlePacket (packet);
        }

        // Called by Peer
        public void handleMessage (Message m, Peer src)
        {
-               log ("received " + m);
+               if (src != null) log ("received " + m + " from " + src);
                if (m instanceof ChkRequest)
                        handleChkRequest ((ChkRequest) m, src);
+               else if (m instanceof ChkInsert)
+                       handleChkInsert ((ChkInsert) m, src);
                else {
-                       ChkRequestHandler rh = chkRequests.get (m.id);
-                       if (rh == null) log ("no request handler for " + m);
-                       else rh.handleMessage (m, src);
+                       MessageHandler mh = messageHandlers.get (m.id);
+                       if (mh == null) log ("no message handler for " + m);
+                       else mh.handleMessage (m, src);
                }
        }

@@ -139,12 +140,12 @@
                if (!recentlySeenRequests.add (r.id)) {
                        log ("rejecting recently seen " + r);
                        prev.sendMessage (new RejectedLoop (r.id));
-                       // Don't forward the same request back to prev
-                       ChkRequestHandler rh = chkRequests.get (r.id);
-                       if (rh != null) rh.removeNextHop (prev);
+                       // Don't forward the same search back to prev
+                       MessageHandler mh = messageHandlers.get (r.id);
+                       if (mh != null) mh.removeNextHop (prev);
                        return;
                }
-               // Accept the request
+               // Accept the search
                if (prev != null) {
                        log ("accepting " + r);
                        prev.sendMessage (new Accepted (r.id));
@@ -153,12 +154,7 @@
                if (chkStore.get (r.key)) {
                        log ("key " + r.key + " found in CHK store");
                        if (prev == null) log (r + " succeeded locally");
-                       else {
-                               prev.sendMessage (new ChkDataFound (r.id));
-                               // DEBUG
-                               if (!faulty) for (int i = 0; i < 32; i++)
-                                       prev.sendMessage (new Block (r.id, i));
-                       }
+                       else prev.sendMessage (new ChkDataFound (r.id));
                        chkRequestCompleted (r.id);
                        return;
                }
@@ -167,28 +163,49 @@
                if (chkCache.get (r.key)) {
                        log ("key " + r.key + " found in CHK cache");
                        if (prev == null) log (r + " succeeded locally");
-                       else {
-                               prev.sendMessage (new ChkDataFound (r.id));
-                               // DEBUG
-                               if (!faulty) for (int i = 0; i < 32; i++)
-                                       prev.sendMessage (new Block (r.id, i));
-                       }
+                       else prev.sendMessage (new ChkDataFound (r.id));
                        chkRequestCompleted (r.id);
                        return;
                }
                log ("key " + r.key + " not found in CHK cache");
-               // Forward the request and store the request state
+               // Store the request handler and forward the search
                ChkRequestHandler rh = new ChkRequestHandler (r, this, prev);
-               chkRequests.put (r.id, rh);
-               rh.forwardRequest();
+               messageHandlers.put (r.id, rh);
+               rh.forwardSearch();
        }

+       private void handleChkInsert (ChkInsert i, Peer prev)
+       {
+               if (!recentlySeenRequests.add (i.id)) {
+                       log ("rejecting recently seen " + i);
+                       prev.sendMessage (new RejectedLoop (i.id));
+                       // Don't forward the same search back to prev
+                       MessageHandler mh = messageHandlers.get (i.id);
+                       if (mh != null) mh.removeNextHop (prev);
+                       return;
+               }
+               // Accept the search
+               if (prev != null) {
+                       log ("accepting " + i);
+                       prev.sendMessage (new Accepted (i.id));
+               }
+               // Store the insert handler and wait for a DataInsert
+               ChkInsertHandler ih = new ChkInsertHandler (i, this, prev);
+               messageHandlers.put (i.id, ih);
+       }
+       
        // Remove a completed request from the list of pending requests
        public void chkRequestCompleted (int id)
        {
-               chkRequests.remove (id);
+               messageHandlers.remove (id);
        }

+       // Remove a completed insert from the list of pending inserts
+       public void chkInsertCompleted (int id)
+       {
+               messageHandlers.remove (id);
+       }
+       
        // Return the list of peers in a random order
        public ArrayList<Peer> peers()
        {
@@ -205,12 +222,23 @@
        // Event callback
        private void generateRequest (int key)
        {
-               ChkRequest r = new ChkRequest (key, location);
-               log ("generating " + r);
-               handleChkRequest (r, null);
+               ChkRequest cr = new ChkRequest (key, location);
+               log ("generating " + cr);
+               handleChkRequest (cr, null);
        }

        // Event callback
+       private void generateInsert (int key)
+       {
+               ChkInsert ci = new ChkInsert (key, location);
+               log ("generating " + ci);
+               handleChkInsert (ci, null);
+               handleMessage (new DataInsert (ci.id), null);
+               for (int i = 0; i < 32; i++)
+                       handleMessage (new Block (ci.id, i), null);
+       }
+       
+       // Event callback
        private void checkTimeouts()
        {
                // Check the peers in a random order each time
@@ -224,7 +252,7 @@
                else {
                        double sleep = deadline - Event.time(); // Can be < 0
                        if (sleep < MIN_SLEEP) sleep = MIN_SLEEP;
-                       log ("sleeping for " + sleep + " seconds");
+                       // log ("sleeping for " + sleep + " seconds");
                        Event.schedule (this, sleep, CHECK_TIMEOUTS, null);
                }
        }
@@ -232,11 +260,23 @@
        // EventTarget interface
        public void handleEvent (int type, Object data)
        {
-               if (type == GENERATE_REQUEST) generateRequest ((Integer) data);
-               else if (type == CHECK_TIMEOUTS) checkTimeouts();
+               switch (type) {
+                       case GENERATE_REQUEST:
+                       generateRequest ((Integer) data);
+                       break;
+                       
+                       case GENERATE_INSERT:
+                       generateInsert ((Integer)data);
+                       break;
+                       
+                       case CHECK_TIMEOUTS:
+                       checkTimeouts();
+                       break;
+               }
        }

        // Each EventTarget class has its own event codes
        public final static int GENERATE_REQUEST = 1;
-       public final static int CHECK_TIMEOUTS = 2;
+       public final static int GENERATE_INSERT = 2;
+       private final static int CHECK_TIMEOUTS = 3;
 }


Reply via email to