Author: mrogers
Date: 2006-11-02 15:00:48 +0000 (Thu, 02 Nov 2006)
New Revision: 10796

Modified:
   trunk/apps/load-balancing-sims/phase7/sim/Sim.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
   trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
Log:
Allow blocks and headers to arrive in any order

Modified: trunk/apps/load-balancing-sims/phase7/sim/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Sim.java  2006-11-02 13:23:48 UTC (rev 10795)
+++ trunk/apps/load-balancing-sims/phase7/sim/Sim.java  2006-11-02 15:00:48 UTC (rev 10796)
@@ -7,11 +7,10 @@
        private final int DEGREE = 5; // Average degree
        private final double SPEED = 40000; // Network speed, bytes per second
        private final double LATENCY = 0.1; // Latency of all links in seconds
-       private final double RATE = 1/120.0; // Inserts per publisher per second
-       private final int INSERTS = 60; // Number of inserts per publisher
+       private final int INSERTS = 100; // Number of inserts per publisher
        private Node[] nodes;

-       public Sim()
+       public Sim (double rate)
        {
                Network.reorder = true;
                Network.lossRate = 0.001;
@@ -25,7 +24,7 @@
                // One in ten nodes is a publisher, each with ten readers
                for (int i = 0; i < NODES; i += 10) {
                        SimplePublisher pub
-                               = new SimplePublisher (RATE, INSERTS, nodes[i]);
+                               = new SimplePublisher (rate, INSERTS, nodes[i]);
                        int readers = 0;
                        while (readers < 10) {
                                int index = (int) (Math.random() * NODES);
@@ -68,6 +67,9 @@

        public static void main (String[] args)
        {
-               new Sim();
+               if (args.length != 1) System.exit (1);
+               double interval = Double.parseDouble (args[0]);
+               if (interval < 1.0) System.exit (1);
+               new Sim (1.0 / interval);
        }
 }

Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java    2006-11-02 13:23:48 UTC (rev 10795)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java    2006-11-02 15:00:48 UTC (rev 10796)
@@ -61,8 +61,13 @@
                inState = TRANSFERRING;
                // Start the search
                forwardSearch();
+               // If we have all the blocks and the headers, consider finishing
+               if (blocksReceived == 32 && inState == TRANSFERRING) {
+                       inState = COMPLETED;
+                       considerFinishing();
+               }
                // Wait for transfer to complete (FIXME: check real timeout)
-               Event.schedule (this, 120.0, TRANSFER_IN_TIMEOUT, null);
+               else Event.schedule (this, 120.0, TRANSFER_IN_TIMEOUT, null);
        }

        private void handleBlock (Block b)
@@ -72,10 +77,9 @@
                blocks[b.index] = b;
                blocksReceived++;
                // Forward the block to all receivers
-               if (inState == TRANSFERRING)
-                       for (Peer p : receivers) p.sendMessage (b);
-               // If the transfer is complete, consider finishing
-               if (blocksReceived == 32) {
+               for (Peer p : receivers) p.sendMessage (b);
+               // If we have all the blocks and the headers, consider finishing
+               if (blocksReceived == 32 && inState == TRANSFERRING) {
                        inState = COMPLETED;
                        considerFinishing();
                }

Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java   2006-11-02 13:23:48 UTC (rev 10795)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java   2006-11-02 15:00:48 UTC (rev 10796)
@@ -6,13 +6,13 @@

 public class ChkRequestHandler extends RequestHandler
 {
-       private Block[] blocks; // Store incoming blocks for forwarding
+       private boolean[] blocks; // Keep track of blocks received
        private int blocksReceived = 0;

        public ChkRequestHandler (ChkRequest r, Node node, Peer prev)
        {
                super (r, node, prev);
-               blocks = new Block[32];
+               blocks = new boolean[32];
        }

        public void handleMessage (Message m, Peer src)
@@ -40,32 +40,30 @@
        {
                if (searchState != ACCEPTED) node.log (df + " out of order");
                searchState = TRANSFERRING;
-               if (prev != null) {
-                       // Forward the message & all previously received blocks
-                       prev.sendMessage (df);
-                       for (int i = 0; i < 32; i++)
-                               if (blocks[i] != null)
-                                       prev.sendMessage (blocks[i]);
+               if (prev != null) prev.sendMessage (df); // Forward the message
+               // If we have all the blocks and the headers, cache the data
+               if (blocksReceived == 32 && searchState == TRANSFERRING) {
+                       node.cacheChk (key);
+                       if (prev == null) node.log (this + " succeeded");
+                       finish();
                }
                // Wait for the transfer to complete (FIXME: check real timeout)
-               Event.schedule (this, 120.0, TRANSFER_TIMEOUT, next);
+               else Event.schedule (this, 120.0, TRANSFER_TIMEOUT, next);
        }

        private void handleBlock (Block b)
        {
                if (searchState != TRANSFERRING) node.log (b + " out of order");
-               if (blocks[b.index] != null) return; // Ignore duplicates
-               blocks[b.index] = b;
+               if (blocks[b.index]) return; // Ignore duplicates
+               blocks[b.index] = true;
                blocksReceived++;
-               if (searchState == TRANSFERRING) {
-                       // Forward the block
-                       if (prev != null) {
-                               node.log ("forwarding " + b);
-                               prev.sendMessage (b);
-                       }
+               // Forward the block
+               if (prev != null) {
+                       node.log ("forwarding " + b);
+                       prev.sendMessage (b);
                }
-               // If the transfer is complete, cache the data
-               if (blocksReceived == 32) {
+               // If we have all the blocks and the headers, cache the data
+               if (blocksReceived == 32 && searchState == TRANSFERRING) {
                        node.cacheChk (key);
                        if (prev == null) node.log (this + " succeeded");
                        finish();


Reply via email to