Author: mrogers
Date: 2006-11-02 15:00:48 +0000 (Thu, 02 Nov 2006)
New Revision: 10796
Modified:
trunk/apps/load-balancing-sims/phase7/sim/Sim.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
Log:
Allow blocks and headers to arrive in any order
Modified: trunk/apps/load-balancing-sims/phase7/sim/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/Sim.java 2006-11-02 13:23:48 UTC (rev 10795)
+++ trunk/apps/load-balancing-sims/phase7/sim/Sim.java 2006-11-02 15:00:48 UTC (rev 10796)
@@ -7,11 +7,10 @@
private final int DEGREE = 5; // Average degree
private final double SPEED = 40000; // Network speed, bytes per second
private final double LATENCY = 0.1; // Latency of all links in seconds
- private final double RATE = 1/120.0; // Inserts per publisher per second
- private final int INSERTS = 60; // Number of inserts per publisher
+ private final int INSERTS = 100; // Number of inserts per publisher
private Node[] nodes;
- public Sim()
+ public Sim (double rate)
{
Network.reorder = true;
Network.lossRate = 0.001;
@@ -25,7 +24,7 @@
// One in ten nodes is a publisher, each with ten readers
for (int i = 0; i < NODES; i += 10) {
SimplePublisher pub
- = new SimplePublisher (RATE, INSERTS, nodes[i]);
+ = new SimplePublisher (rate, INSERTS, nodes[i]);
int readers = 0;
while (readers < 10) {
int index = (int) (Math.random() * NODES);
@@ -68,6 +67,9 @@
public static void main (String[] args)
{
- new Sim();
+ if (args.length != 1) System.exit (1);
+ double interval = Double.parseDouble (args[0]);
+ if (interval < 1.0) System.exit (1);
+ new Sim (1.0 / interval);
}
}
Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java 2006-11-02 13:23:48 UTC (rev 10795)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java 2006-11-02 15:00:48 UTC (rev 10796)
@@ -61,8 +61,13 @@
inState = TRANSFERRING;
// Start the search
forwardSearch();
+ // If we have all the blocks and the headers, consider finishing
+ if (blocksReceived == 32 && inState == TRANSFERRING) {
+ inState = COMPLETED;
+ considerFinishing();
+ }
// Wait for transfer to complete (FIXME: check real timeout)
- Event.schedule (this, 120.0, TRANSFER_IN_TIMEOUT, null);
+ else Event.schedule (this, 120.0, TRANSFER_IN_TIMEOUT, null);
}
private void handleBlock (Block b)
@@ -72,10 +77,9 @@
blocks[b.index] = b;
blocksReceived++;
// Forward the block to all receivers
- if (inState == TRANSFERRING)
- for (Peer p : receivers) p.sendMessage (b);
- // If the transfer is complete, consider finishing
- if (blocksReceived == 32) {
+ for (Peer p : receivers) p.sendMessage (b);
+ // If we have all the blocks and the headers, consider finishing
+ if (blocksReceived == 32 && inState == TRANSFERRING) {
inState = COMPLETED;
considerFinishing();
}
Modified: trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java 2006-11-02 13:23:48 UTC (rev 10795)
+++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkRequestHandler.java 2006-11-02 15:00:48 UTC (rev 10796)
@@ -6,13 +6,13 @@
public class ChkRequestHandler extends RequestHandler
{
- private Block[] blocks; // Store incoming blocks for forwarding
+ private boolean[] blocks; // Keep track of blocks received
private int blocksReceived = 0;
public ChkRequestHandler (ChkRequest r, Node node, Peer prev)
{
super (r, node, prev);
- blocks = new Block[32];
+ blocks = new boolean[32];
}
public void handleMessage (Message m, Peer src)
@@ -40,32 +40,30 @@
{
if (searchState != ACCEPTED) node.log (df + " out of order");
searchState = TRANSFERRING;
- if (prev != null) {
- // Forward the message & all previously received blocks
- prev.sendMessage (df);
- for (int i = 0; i < 32; i++)
- if (blocks[i] != null)
- prev.sendMessage (blocks[i]);
+ if (prev != null) prev.sendMessage (df); // Forward the message
+ // If we have all the blocks and the headers, cache the data
+ if (blocksReceived == 32 && searchState == TRANSFERRING) {
+ node.cacheChk (key);
+ if (prev == null) node.log (this + " succeeded");
+ finish();
}
// Wait for the transfer to complete (FIXME: check real timeout)
- Event.schedule (this, 120.0, TRANSFER_TIMEOUT, next);
+ else Event.schedule (this, 120.0, TRANSFER_TIMEOUT, next);
}
private void handleBlock (Block b)
{
if (searchState != TRANSFERRING) node.log (b + " out of order");
- if (blocks[b.index] != null) return; // Ignore duplicates
- blocks[b.index] = b;
+ if (blocks[b.index]) return; // Ignore duplicates
+ blocks[b.index] = true;
blocksReceived++;
- if (searchState == TRANSFERRING) {
- // Forward the block
- if (prev != null) {
- node.log ("forwarding " + b);
- prev.sendMessage (b);
- }
+ // Forward the block
+ if (prev != null) {
+ node.log ("forwarding " + b);
+ prev.sendMessage (b);
}
- // If the transfer is complete, cache the data
- if (blocksReceived == 32) {
+ // If we have all the blocks and the headers, cache the data
+ if (blocksReceived == 32 && searchState == TRANSFERRING) {
node.cacheChk (key);
if (prev == null) node.log (this + " succeeded");
finish();