Author: mrogers
Date: 2006-10-31 11:01:12 +0000 (Tue, 31 Oct 2006)
New Revision: 10749
Added:
trunk/apps/load-balancing-sims/phase7/
trunk/apps/load-balancing-sims/phase7/ChkInsertHandler.java
trunk/apps/load-balancing-sims/phase7/ChkRequestHandler.java
trunk/apps/load-balancing-sims/phase7/CongestionWindow.java
trunk/apps/load-balancing-sims/phase7/DeadlineQueue.java
trunk/apps/load-balancing-sims/phase7/Event.java
trunk/apps/load-balancing-sims/phase7/EventTarget.java
trunk/apps/load-balancing-sims/phase7/LruCache.java
trunk/apps/load-balancing-sims/phase7/LruMap.java
trunk/apps/load-balancing-sims/phase7/MessageHandler.java
trunk/apps/load-balancing-sims/phase7/Network.java
trunk/apps/load-balancing-sims/phase7/NetworkInterface.java
trunk/apps/load-balancing-sims/phase7/Node.java
trunk/apps/load-balancing-sims/phase7/Packet.java
trunk/apps/load-balancing-sims/phase7/Peer.java
trunk/apps/load-balancing-sims/phase7/RequestHandler.java
trunk/apps/load-balancing-sims/phase7/Sim.java
trunk/apps/load-balancing-sims/phase7/SskInsertHandler.java
trunk/apps/load-balancing-sims/phase7/SskRequestHandler.java
trunk/apps/load-balancing-sims/phase7/TokenBucket.java
trunk/apps/load-balancing-sims/phase7/handlers/
trunk/apps/load-balancing-sims/phase7/messages/
Removed:
trunk/apps/load-balancing-sims/phase7/ChkInsertHandler.java
trunk/apps/load-balancing-sims/phase7/ChkRequestHandler.java
trunk/apps/load-balancing-sims/phase7/CongestionWindow.java
trunk/apps/load-balancing-sims/phase7/DeadlineQueue.java
trunk/apps/load-balancing-sims/phase7/Event.java
trunk/apps/load-balancing-sims/phase7/EventTarget.java
trunk/apps/load-balancing-sims/phase7/LruCache.java
trunk/apps/load-balancing-sims/phase7/LruMap.java
trunk/apps/load-balancing-sims/phase7/MessageHandler.java
trunk/apps/load-balancing-sims/phase7/Network.java
trunk/apps/load-balancing-sims/phase7/NetworkInterface.java
trunk/apps/load-balancing-sims/phase7/Node.java
trunk/apps/load-balancing-sims/phase7/Packet.java
trunk/apps/load-balancing-sims/phase7/Peer.java
trunk/apps/load-balancing-sims/phase7/RequestHandler.java
trunk/apps/load-balancing-sims/phase7/Sim.java
trunk/apps/load-balancing-sims/phase7/SskInsertHandler.java
trunk/apps/load-balancing-sims/phase7/SskRequestHandler.java
trunk/apps/load-balancing-sims/phase7/TokenBucket.java
trunk/apps/load-balancing-sims/phase7/messages/
Log:
Starting phase 7: traffic generators and token-passing
Copied: trunk/apps/load-balancing-sims/phase7 (from rev 10745,
trunk/apps/load-balancing-sims/phase6)
Deleted: trunk/apps/load-balancing-sims/phase7/ChkInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java 2006-10-29 23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/ChkInsertHandler.java 2006-10-31 11:01:12 UTC (rev 10749)
@@ -1,268 +0,0 @@
-// The state of a CHK insert as stored at each node along the path
-
-import java.util.HashSet;
-import messages.*;
-
-class ChkInsertHandler extends MessageHandler implements EventTarget
-{
- private int inState = STARTED; // State of incoming transfer
- private HashSet<Peer> receivers; // Peers that should receive data
- private Block[] blocks; // Store incoming blocks for forwarding
- private int blocksReceived = 0;
-
- public ChkInsertHandler (ChkInsert i, Node node, Peer prev)
- {
- super (i, node, prev);
- receivers = new HashSet<Peer>();
- blocks = new Block[32];
- }
-
- public void start()
- {
- // Wait 10 seconds for the incoming transfer to start
- Event.schedule (this, 10.0, DATA_TIMEOUT, null);
- }
-
- public void handleMessage (Message m, Peer src)
- {
- if (src == prev) {
- if (m instanceof DataInsert)
- handleDataInsert ((DataInsert) m);
- else if (m instanceof Block)
- handleBlock ((Block) m);
- else node.log ("unexpected type for " + m);
- }
- else if (src == next) {
- if (m instanceof Accepted)
- handleAccepted ((Accepted) m);
- else if (m instanceof RejectedLoop)
- handleRejectedLoop ((RejectedLoop) m);
- else if (m instanceof RouteNotFound)
- handleRouteNotFound ((RouteNotFound) m);
- else if (m instanceof InsertReply)
- handleInsertReply ((InsertReply) m);
- else if (m instanceof TransfersCompleted)
- handleCompleted ((TransfersCompleted) m, src);
- else node.log ("unexpected type for " + m);
- }
- else if (receivers.contains (src)) {
- if (m instanceof TransfersCompleted)
- handleCompleted ((TransfersCompleted) m, src);
- else node.log ("unexpected type for " + m);
- }
- else node.log ("unexpected source for " + m);
- }
-
- private void handleDataInsert (DataInsert di)
- {
- if (inState != STARTED) node.log (di + " out of order");
- inState = TRANSFERRING;
- // Start the search
- forwardSearch();
- // Wait for transfer to complete (FIXME: check real timeout)
- Event.schedule (this, 120.0, TRANSFER_IN_TIMEOUT, null);
- }
-
- private void handleBlock (Block b)
- {
- if (inState != TRANSFERRING) node.log (b + " out of order");
- if (blocks[b.index] != null) return; // Ignore duplicates
- blocks[b.index] = b;
- blocksReceived++;
- // Forward the block to all receivers
- for (Peer p : receivers) p.sendMessage (b);
- // If the transfer is complete, consider finishing
- if (blocksReceived == 32) {
- inState = COMPLETED;
- considerFinishing();
- }
- }
-
- private void handleCompleted (TransfersCompleted tc, Peer src)
- {
- receivers.remove (src);
- considerFinishing();
- }
-
- private void handleAccepted (Accepted a)
- {
- if (searchState != SENT) node.log (a + " out of order");
- searchState = ACCEPTED;
- // Wait 120 seconds for a reply to the search
- Event.schedule (this, 120.0, SEARCH_TIMEOUT, next);
- // Add the next hop to the list of receivers
- receivers.add (next);
- next.sendMessage (new DataInsert (id));
- // Send all previously received blocks
- for (int i = 0; i < 32; i++)
- if (blocks[i] != null) next.sendMessage (blocks[i]);
- // Wait for TransfersCompleted (FIXME: check real timeout)
- Event.schedule (this, 240.0, TRANSFER_OUT_TIMEOUT, next);
- }
-
- private void handleRejectedLoop (RejectedLoop rl)
- {
- if (searchState != SENT) node.log (rl + " out of order");
- forwardSearch();
- }
-
- private void handleRouteNotFound (RouteNotFound rnf)
- {
- if (searchState != ACCEPTED) node.log (rnf + " out of order");
- if (rnf.htl < htl) htl = rnf.htl;
- // Use the remaining htl to try another peer
- forwardSearch();
- }
-
- private void handleInsertReply (InsertReply ir)
- {
- if (searchState != ACCEPTED) node.log (ir + " out of order");
- if (prev == null) node.log (this + " succeeded");
- else prev.sendMessage (ir); // Forward the message
- searchState = COMPLETED;
- considerFinishing();
- }
-
- public void forwardSearch()
- {
- next = null;
- // If the search has run out of hops, send InsertReply
- if (htl == 0) {
- node.log (this + " has no hops left");
- if (prev == null) node.log (this + " succeeded");
- else prev.sendMessage (new InsertReply (id));
- searchState = COMPLETED;
- considerFinishing();
- return;
- }
- // Forward the search to the closest remaining peer
- next = closestPeer();
- if (next == null) {
- node.log ("route not found for " + this);
- if (prev == null) node.log (this + " failed");
- else prev.sendMessage (new RouteNotFound (id, htl));
- searchState = COMPLETED;
- considerFinishing();
- return;
- }
- // Decrement the htl if the next node is not the closest so far
- double target = Node.keyToLocation (key);
- if (Node.distance (target, next.location)
- > Node.distance (target, closest))
- htl = node.decrementHtl (htl);
- node.log (this + " has htl " + htl);
- node.log ("forwarding " + this + " to " + next.address);
- next.sendMessage (makeSearchMessage());
- nexts.remove (next);
- searchState = SENT;
- // Wait 10 seconds for the next hop to accept the search
- Event.schedule (this, 10.0, ACCEPTED_TIMEOUT, next);
- }
-
- private void considerFinishing()
- {
- // An insert finishes when the search, the incoming transfer
- // and all outgoing transfers are complete
- if (searchState == COMPLETED && inState == COMPLETED
- && receivers.isEmpty()) finish();
- }
-
- private void finish()
- {
- inState = COMPLETED;
- searchState = COMPLETED;
- node.cacheChk (key);
- node.storeChk (key);
- if (prev == null) node.log (this + " completed");
- else prev.sendMessage (new TransfersCompleted (id));
- node.removeMessageHandler (id);
- }
-
- protected Search makeSearchMessage()
- {
- return new ChkInsert (id, key, closest, htl);
- }
-
- public String toString()
- {
- return new String ("CHK insert (" + id + "," + key + ")");
- }
-
- // Event callbacks
-
- private void acceptedTimeout (Peer p)
- {
- if (p != next) return; // We've already moved on to another peer
- if (searchState != SENT) return;
- node.log (this + " accepted timeout waiting for " + p);
- forwardSearch(); // Try another peer
- }
-
- private void searchTimeout (Peer p)
- {
- if (p != next) return; // We've already moved on to another peer
- if (searchState != ACCEPTED) return;
- node.log (this + " search timeout waiting for " + p);
- if (prev == null) node.log (this + " failed");
- searchState = COMPLETED;
- considerFinishing();
- }
-
- private void dataTimeout()
- {
- if (inState != STARTED) return;
- node.log (this + " data timeout waiting for " + prev);
- if (prev == null) node.log (this + " failed");
- else prev.sendMessage (new TransfersCompleted (id));
- finish();
- }
-
- private void transferInTimeout()
- {
- if (inState != TRANSFERRING) return;
- node.log (this + " transfer timeout receiving from " + prev);
- if (prev == null) node.log (this + " failed");
- else prev.sendMessage (new TransfersCompleted (id));
- finish();
- }
-
- private void transferOutTimeout (Peer p)
- {
- if (!receivers.remove (p)) return;
- node.log (this + " transfer timeout sending to " + p);
- considerFinishing();
- }
-
- // EventTarget interface
- public void handleEvent (int type, Object data)
- {
- switch (type) {
- case ACCEPTED_TIMEOUT:
- acceptedTimeout ((Peer) data);
- break;
-
- case SEARCH_TIMEOUT:
- searchTimeout ((Peer) data);
- break;
-
- case DATA_TIMEOUT:
- dataTimeout();
- break;
-
- case TRANSFER_IN_TIMEOUT:
- transferInTimeout();
- break;
-
- case TRANSFER_OUT_TIMEOUT:
- transferOutTimeout ((Peer) data);
- break;
- }
- }
-
- // Each EventTarget class has its own event codes
- private final static int ACCEPTED_TIMEOUT = 1;
- private final static int SEARCH_TIMEOUT = 2;
- private final static int DATA_TIMEOUT = 3;
- private final static int TRANSFER_IN_TIMEOUT = 4;
- private final static int TRANSFER_OUT_TIMEOUT = 5;
-}
Copied: trunk/apps/load-balancing-sims/phase7/ChkInsertHandler.java (from rev
10748, trunk/apps/load-balancing-sims/phase6/ChkInsertHandler.java)
Deleted: trunk/apps/load-balancing-sims/phase7/ChkRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java 2006-10-29 23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/ChkRequestHandler.java 2006-10-31 11:01:12 UTC (rev 10749)
@@ -1,74 +0,0 @@
-// The state of a CHK request as stored at each node along the path
-
-import messages.*;
-
-class ChkRequestHandler extends RequestHandler
-{
- private boolean[] received; // Keep track of received blocks
- private int blocksReceived = 0;
-
- public ChkRequestHandler (ChkRequest r, Node node, Peer prev)
- {
- super (r, node, prev);
- received = new boolean[32];
- }
-
- public void handleMessage (Message m, Peer src)
- {
- if (src != next) {
- node.log ("unexpected source for " + m);
- return;
- }
- if (m instanceof Accepted)
- handleAccepted ((Accepted) m);
- else if (m instanceof RejectedLoop)
- handleRejectedLoop ((RejectedLoop) m);
- else if (m instanceof RouteNotFound)
- handleRouteNotFound ((RouteNotFound) m);
- else if (m instanceof DataNotFound)
- handleDataNotFound ((DataNotFound) m);
- else if (m instanceof ChkDataFound)
- handleChkDataFound ((ChkDataFound) m);
- else if (m instanceof Block)
- handleBlock ((Block) m);
- else node.log ("unexpected type for " + m);
- }
-
- private void handleChkDataFound (ChkDataFound df)
- {
- if (searchState != ACCEPTED) node.log (df + " out of order");
- searchState = TRANSFERRING;
- if (prev != null) prev.sendMessage (df); // Forward the message
- // Wait for the transfer to complete (FIXME: check real timeout)
- Event.schedule (this, 120.0, TRANSFER_TIMEOUT, next);
- }
-
- private void handleBlock (Block b)
- {
- if (searchState != TRANSFERRING) node.log (b + " out of order");
- if (received[b.index]) return; // Ignore duplicates
- received[b.index] = true;
- blocksReceived++;
- // Forward the block
- if (prev != null) {
- node.log ("forwarding " + b);
- prev.sendMessage (b);
- }
- // If the transfer is complete, cache the data
- if (blocksReceived == 32) {
- node.cacheChk (key);
- if (prev == null) node.log (this + " succeeded");
- finish();
- }
- }
-
- protected Search makeSearchMessage()
- {
- return new ChkRequest (id, key, closest, htl);
- }
-
- public String toString()
- {
- return new String ("CHK request (" + id + "," + key + ")");
- }
-}
Copied: trunk/apps/load-balancing-sims/phase7/ChkRequestHandler.java (from rev
10748, trunk/apps/load-balancing-sims/phase6/ChkRequestHandler.java)
Deleted: trunk/apps/load-balancing-sims/phase7/CongestionWindow.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/CongestionWindow.java 2006-10-29 23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/CongestionWindow.java 2006-10-31 11:01:12 UTC (rev 10749)
@@ -1,74 +0,0 @@
-// AIMD congestion control
-
-class CongestionWindow
-{
- public final static int MIN_CWIND = Packet.MAX_SIZE; // Min window size
- public final static int MAX_CWIND = 1000000; // Max window size
- public final static double ALPHA = 0.3125; // AIMD increase parameter
- public final static double BETA = 0.875; // AIMD decrease parameter
- public final static double GAMMA = 3.0; // Slow start divisor
-
- private double cwind = MIN_CWIND; // Size of window in bytes
- private int inflight = 0; // Bytes sent but not acked
- private boolean slowStart = true; // Are we in the slow start phase?
- private Peer peer; // The owner
-
- public CongestionWindow (Peer peer)
- {
- this.peer = peer;
- }
-
- public void reset()
- {
- peer.log ("congestion window decreased to " + MIN_CWIND);
- cwind = MIN_CWIND;
- peer.log ("returning to slow start");
- slowStart = true;
- }
-
- public int available()
- {
- return (int) cwind - inflight;
- }
-
- // Put bytes in flight
- public void bytesSent (int bytes)
- {
- inflight += bytes;
- peer.log (inflight + " bytes in flight");
- }
-
- // Take bytes out of flight
- public void bytesAcked (int bytes)
- {
- inflight -= bytes;
- peer.log (inflight + " bytes in flight");
- // Increase the window
- if (slowStart) cwind += bytes / GAMMA;
- else cwind += bytes * bytes * ALPHA / cwind;
- if (cwind > MAX_CWIND) cwind = MAX_CWIND;
- peer.log ("congestion window increased to " + cwind);
- }
-
- // Decrease the window when a packet is fast retransmitted
- public void fastRetransmission (double now)
- {
- peer.log (inflight + " bytes in flight");
- cwind *= BETA;
- if (cwind < MIN_CWIND) cwind = MIN_CWIND;
- peer.log ("congestion window decreased to " + cwind);
- // The slow start phase ends when the first packet is lost
- if (slowStart) {
- peer.log ("leaving slow start");
- slowStart = false;
- }
- }
-
- // Decrease the window when a packet is retransmitted due to a timeout
- public void timeout (double now)
- {
- peer.log (inflight + " bytes in flight");
- if (slowStart) fastRetransmission (now); // Leave slow start
- else reset(); // Reset the window and return to slow start
- }
-}
Copied: trunk/apps/load-balancing-sims/phase7/CongestionWindow.java (from rev
10748, trunk/apps/load-balancing-sims/phase6/CongestionWindow.java)
Deleted: trunk/apps/load-balancing-sims/phase7/DeadlineQueue.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java 2006-10-29 23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/DeadlineQueue.java 2006-10-31 11:01:12 UTC (rev 10749)
@@ -1,39 +0,0 @@
-// A queue storing outgoing messages and their coalescing deadlines
-
-import java.util.LinkedList;
-import messages.Message;
-
-class DeadlineQueue<MESSAGE extends Message>
-{
- public int size = 0; // Size in bytes
- private LinkedList<MESSAGE> messages = new LinkedList<MESSAGE>();
- private LinkedList<Double> deadlines = new LinkedList<Double>();
-
- public void add (MESSAGE m, double deadline)
- {
- size += m.size();
- messages.add (m);
- deadlines.add (deadline);
- }
-
- public int headSize()
- {
- if (messages.isEmpty()) return 0;
- else return messages.peek().size();
- }
-
- public double deadline()
- {
- Double deadline = deadlines.peek();
- if (deadline == null) return Double.POSITIVE_INFINITY;
- else return deadline;
- }
-
- public MESSAGE pop()
- {
- deadlines.poll();
- MESSAGE m = messages.poll();
- if (m != null) size -= m.size();
- return m;
- }
-}
Copied: trunk/apps/load-balancing-sims/phase7/DeadlineQueue.java (from rev
10748, trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java)
Deleted: trunk/apps/load-balancing-sims/phase7/Event.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Event.java 2006-10-29 23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/Event.java 2006-10-31 11:01:12 UTC (rev 10749)
@@ -1,98 +0,0 @@
-import java.util.TreeSet; // Gotta love the collections framework...
-
-class Event implements Comparable
-{
- // Static variables and methods for the event queue
-
- private static TreeSet<Event> queue = new TreeSet<Event>();
- private static double clockTime = 0.0;
- private static int nextId = 0;
- public static double duration = Double.POSITIVE_INFINITY;
-
- public static void reset()
- {
- queue.clear();
- clockTime = 0.0;
- nextId = 0;
- duration = Double.POSITIVE_INFINITY;
- }
-
- public static void schedule (EventTarget target, double time,
- int type, Object data)
- {
- queue.add (new Event (target, time + clockTime, type, data));
- }
-
- public static boolean nextEvent()
- {
- try {
- Event e = queue.first();
- queue.remove (e);
- // Update the clock
- clockTime = e.time;
- // Quit if the simulation's alloted time has run out
- if (clockTime > duration) return false;
- // Pass the packet to the target's callback method
- e.target.handleEvent (e.type, e.data);
- return true;
- }
- catch (java.util.NoSuchElementException x) {
- // No more events to dispatch
- return false;
- }
- }
-
- public static double time()
- {
- return clockTime;
- }
-
- public static void log (String message)
- {
- System.out.print (clockTime + " " + message + "\n");
- }
-
- // Run until the duration expires or there are no more events to process
- public static void run()
- {
- while (nextEvent()) {}
- }
-
- // Non-static variables and methods for individual events
-
- private EventTarget target;
- private double time;
- private int id;
- private int type;
- private Object data;
-
- public Event (EventTarget target, double time, int type, Object data)
- {
- this.target = target;
- this.time = time;
- this.type = type;
- this.data = data;
- id = nextId++;
- }
-
- // Must be consistent with compareTo()
- public boolean equals (Object o)
- {
- Event e = (Event) o;
- if (e.time == time && e.id == id) return true;
- return false;
- }
-
- // Must be consistent with equals()
- public int compareTo (Object o)
- {
- Event e = (Event) o;
- // Sort events by time (order of occurrence)
- if (e.time > time) return -1;
- if (e.time < time) return 1;
- // Break ties by ID (order of scheduling)
- if (e.id > id) return -1;
- if (e.id < id) return 1;
- return 0;
- }
-}
Copied: trunk/apps/load-balancing-sims/phase7/Event.java (from rev 10748,
trunk/apps/load-balancing-sims/phase6/Event.java)
Deleted: trunk/apps/load-balancing-sims/phase7/EventTarget.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/EventTarget.java 2006-10-29 23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/EventTarget.java 2006-10-31 11:01:12 UTC (rev 10749)
@@ -1,4 +0,0 @@
-interface EventTarget
-{
- public void handleEvent (int type, Object data);
-}
Copied: trunk/apps/load-balancing-sims/phase7/EventTarget.java (from rev 10748,
trunk/apps/load-balancing-sims/phase6/EventTarget.java)
Deleted: trunk/apps/load-balancing-sims/phase7/LruCache.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/LruCache.java 2006-10-29 23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/LruCache.java 2006-10-31 11:01:12 UTC (rev 10749)
@@ -1,46 +0,0 @@
-// Limited-capacity LRU cache
-
-import java.util.LinkedHashSet;
-
-class LruCache<Key>
-{
- public final int capacity;
- private LinkedHashSet<Key> set;
-
- public LruCache (int capacity)
- {
- this.capacity = capacity;
- set = new LinkedHashSet<Key> (capacity);
- }
-
- public boolean get (Key key)
- {
- log ("searching cache for key " + key);
- if (set.remove (key)) {
- set.add (key); // Move the key to the fresh end
- return true;
- }
- return false;
- }
-
- public void put (Key key)
- {
- if (set.remove (key))
- log ("key " + key + " already in cache");
- else {
- log ("adding key " + key + " to cache");
- if (set.size() == capacity) {
- // Discard the oldest element
- Key oldest = set.iterator().next();
- log ("discarding key " + oldest);
- set.remove (oldest);
- }
- }
- set.add (key); // Add or move the key to the fresh end
- }
-
- private void log (String message)
- {
- // Event.log (message);
- }
-}
Copied: trunk/apps/load-balancing-sims/phase7/LruCache.java (from rev 10748,
trunk/apps/load-balancing-sims/phase6/LruCache.java)
Deleted: trunk/apps/load-balancing-sims/phase7/LruMap.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/LruMap.java 2006-10-29 23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/LruMap.java 2006-10-31 11:01:12 UTC (rev 10749)
@@ -1,59 +0,0 @@
-// Limited-capacity LRU cache that stores a value for each key
-
-import java.util.LinkedHashSet;
-import java.util.HashMap;
-
-class LruMap<Key,Value>
-{
- public final int capacity;
- private LinkedHashSet<Key> set;
- private HashMap<Key,Value> map;
-
- public LruMap (int capacity)
- {
- this.capacity = capacity;
- set = new LinkedHashSet<Key> (capacity);
- map = new HashMap<Key,Value> (capacity);
- }
-
- public Value get (Key key)
- {
- log ("searching cache for key " + key);
- Value value = map.get (key);
- if (value != null) {
- // Move the key to the fresh end
- set.remove (key);
- set.add (key);
- }
- return value;
- }
-
- // Return the existing value (which is not replaced), or null
- public Value put (Key key, Value value)
- {
- Value existing = map.get (key);
- if (existing == null) {
- log ("adding key " + key + " to cache");
- if (set.size() == capacity) {
- // Discard the oldest element
- Key oldest = set.iterator().next();
- log ("discarding key " + oldest);
- set.remove (oldest);
- }
- map.put (key, value);
- return value;
- }
- else {
- log ("key " + key + " already in cache");
- // Move the key to the fresh end
- set.remove (key);
- set.add (key);
- return existing;
- }
- }
-
- private void log (String message)
- {
- // Event.log (message);
- }
-}
Copied: trunk/apps/load-balancing-sims/phase7/LruMap.java (from rev 10748,
trunk/apps/load-balancing-sims/phase6/LruMap.java)
Deleted: trunk/apps/load-balancing-sims/phase7/MessageHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/MessageHandler.java 2006-10-29 23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/MessageHandler.java 2006-10-31 11:01:12 UTC (rev 10749)
@@ -1,72 +0,0 @@
-// The state of a search as stored at each node along the path
-
-import java.util.LinkedList;
-import messages.Search;
-import messages.Message;
-
-abstract class MessageHandler
-{
- // State machine
- protected final static int STARTED = 0;
- protected final static int SENT = 1;
- protected final static int ACCEPTED = 2;
- protected final static int TRANSFERRING = 3;
- protected final static int COMPLETED = 4;
-
- protected final int id; // The unique ID of the request or insert
- protected final int key; // The target of the search
- protected double closest; // The closest location seen so far
- protected int htl; // Hops to live for backtracking
-
- protected Node node; // The owner of this MessageHandler
- protected Peer prev; // The previous hop of the search
- protected Peer next = null; // The (current) next hop of the search
- protected LinkedList<Peer> nexts; // Candidates for the next hop
- protected int searchState = STARTED; // The state of the search
-
- public MessageHandler (Search s, Node node, Peer prev)
- {
- id = s.id;
- key = s.key;
- closest = s.closest;
- htl = s.htl;
- this.node = node;
- this.prev = prev;
- nexts = new LinkedList<Peer> (node.peers());
- nexts.remove (prev);
- // If this is the closest location seen so far, reset htl
- double target = Node.keyToLocation (key);
- if (Node.distance (target, node.location)
- < Node.distance (target, closest)) {
- node.log ("resetting htl of " + this); // FIXME
- closest = node.location;
- htl = Search.MAX_HTL;
- }
- }
-
- // Remove a peer from the list of candidates for the next hop
- public void removeNextHop (Peer p)
- {
- nexts.remove (p);
- }
-
- // Find the closest remaining peer
- protected Peer closestPeer ()
- {
- double keyLoc = Node.keyToLocation (key);
- double closestDist = Double.POSITIVE_INFINITY;
- Peer closestPeer = null;
- for (Peer peer : nexts) {
- double dist = Node.distance (keyLoc, peer.location);
- if (dist < closestDist) {
- closestDist = dist;
- closestPeer = peer;
- }
- }
- return closestPeer; // Null if the list was empty
- }
-
- public abstract void handleMessage (Message m, Peer src);
-
- protected abstract Search makeSearchMessage();
-}
Copied: trunk/apps/load-balancing-sims/phase7/MessageHandler.java (from rev
10748, trunk/apps/load-balancing-sims/phase6/MessageHandler.java)
Deleted: trunk/apps/load-balancing-sims/phase7/Network.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Network.java 2006-10-29 23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/Network.java 2006-10-31 11:01:12 UTC (rev 10749)
@@ -1,34 +0,0 @@
-import java.util.HashMap;
-
-class Network
-{
- private static HashMap<Integer,NetworkInterface> interfaces
- = new HashMap<Integer,NetworkInterface>();
- private static int nextAddress = 0;
- public static boolean reorder = false; // Can packets be reordered?
- public static double lossRate = 0.0; // Random packet loss
- // FIXME: random packet duplication
-
- // Deliver a packet to an address
- public static void deliver (Packet p)
- {
- NetworkInterface ni = interfaces.get (p.dest);
- if (ni == null) return; // Node doesn't exist or is offline
- // If the network allows reordering, randomise the latency a bit
- if (reorder) p.latency *= (0.95 + Math.random() * 0.1);
- if (Math.random() < lossRate) {
- Event.log (p + " lost by network");
- return;
- }
- // Schedule the arrival of the packet at the destination
- Event.schedule (ni, p.latency, NetworkInterface.RX_QUEUE, p);
- }
-
- // Attach an interface to the network - returns the address
- public static int register (NetworkInterface ni)
- {
- int address = nextAddress++;
- interfaces.put (address, ni);
- return address;
- }
-}
Copied: trunk/apps/load-balancing-sims/phase7/Network.java (from rev 10748,
trunk/apps/load-balancing-sims/phase6/Network.java)
Deleted: trunk/apps/load-balancing-sims/phase7/NetworkInterface.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/NetworkInterface.java 2006-10-29 23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/NetworkInterface.java 2006-10-31 11:01:12 UTC (rev 10749)
@@ -1,126 +0,0 @@
-import java.util.LinkedList;
-
-class NetworkInterface implements EventTarget
-{
- public int address; // Represents an IP address and port
- private Node node; // The owner of this network interface
- private double txSpeed, rxSpeed; // Bytes per second
-
- private LinkedList<Packet> txQueue; // Queue of outgoing packets
- private LinkedList<Packet> rxQueue; // Queue of incoming packets
- private int txQueueSize, rxQueueSize; // Limited-size drop-tail queues
- private int txQueueMaxSize, rxQueueMaxSize; // Bytes
-
- public NetworkInterface (Node node, double txSpeed, double rxSpeed)
- {
- this.node = node;
- this.txSpeed = txSpeed;
- this.rxSpeed = rxSpeed;
- txQueue = new LinkedList<Packet>();
- rxQueue = new LinkedList<Packet>();
- txQueueSize = rxQueueSize = 0; // Bytes currently queued
- txQueueMaxSize = 10000;
- rxQueueMaxSize = 20000;
- // Attach the interface to the network
- address = Network.register (this);
- }
-
- // Called by Peer
- public void send (Packet p, int dest, double latency)
- {
- p.src = address;
- p.dest = dest;
- p.latency = latency;
- if (txQueueSize + p.size > txQueueMaxSize) {
- log ("no room in txQueue, " + p + " lost");
- return;
- }
- txQueue.add (p);
- txQueueSize += p.size;
- log (txQueueSize + " bytes in txQueue");
- // If there are no other packets in the queue, start to transmit
- if (txQueue.size() == 1) txStart (p);
- }
-
- // Event callbacks
-
- // Add a packet to the rx queue
- private void rxQueueAdd (Packet p)
- {
- if (rxQueueSize + p.size > rxQueueMaxSize) {
- log ("no room in rxQueue, " + p + " lost");
- return;
- }
- rxQueue.add (p);
- rxQueueSize += p.size;
- log (rxQueueSize + " bytes in rxQueue");
- // If there are no other packets in the queue, start to receive
- if (rxQueue.size() == 1) rxStart (p);
- }
-
- // Start receiving a packet
- private void rxStart (Packet p)
- {
- log ("starting to receive " + p);
- // Delay depends on rx speed
- Event.schedule (this, p.size / rxSpeed, RX_END, p);
- }
-
- // Finish receiving a packet, pass it to the node
- private void rxEnd (Packet p)
- {
- log ("finished receiving " + p);
- node.handlePacket (p);
- rxQueueSize -= p.size;
- rxQueue.remove (p);
- // If there's another packet waiting, start to receive it
- if (!rxQueue.isEmpty()) rxStart (rxQueue.peek());
- }
-
- // Start transmitting a packet
- private void txStart (Packet p)
- {
- log ("starting to transmit " + p);
- // Delay depends on tx speed
- Event.schedule (this, p.size / txSpeed, TX_END, p);
- }
-
- // Finish transmitting a packet
- private void txEnd (Packet p)
- {
- log ("finished transmitting " + p);
- Network.deliver (p);
- txQueueSize -= p.size;
- txQueue.remove (p);
- // If there's another packet waiting, start to transmit it
- if (!txQueue.isEmpty()) txStart (txQueue.peek());
- }
-
- private void log (String message)
- {
- // Event.log (address + " " + message);
- }
-
- // EventTarget interface
- public void handleEvent (int type, Object data)
- {
- switch (type) {
- case RX_QUEUE:
- rxQueueAdd ((Packet) data);
- break;
-
- case RX_END:
- rxEnd ((Packet) data);
- break;
-
- case TX_END:
- txEnd ((Packet) data);
- break;
- }
- }
-
- // Each EventTarget class has its own event codes
- public final static int RX_QUEUE = 1;
- private final static int RX_END = 2;
- private final static int TX_END = 3;
-}
Copied: trunk/apps/load-balancing-sims/phase7/NetworkInterface.java (from rev
10748, trunk/apps/load-balancing-sims/phase6/NetworkInterface.java)
Deleted: trunk/apps/load-balancing-sims/phase7/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java 2006-10-29 23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/Node.java 2006-10-31 11:01:12 UTC (rev 10749)
@@ -1,444 +0,0 @@
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.ArrayList;
-import java.util.Collections;
-import messages.*;
-
-class Node implements EventTarget
-{
- public final static double SHORT_SLEEP = 0.01; // Poll the bw limiter
-
- // Token bucket bandwidth limiter
- public final static int BUCKET_RATE = 30000; // Bytes per second
- public final static int BUCKET_SIZE = 60000; // Burst size in bytes
-
- public double location; // Routing location
- public NetworkInterface net;
- private HashMap<Integer,Peer> peers; // Look up a peer by its address
- private HashSet<Integer> recentlySeenRequests; // Request IDs
- private HashMap<Integer,MessageHandler> messageHandlers; // By ID
- private LruCache<Integer> chkStore;
- private LruCache<Integer> chkCache;
- private LruMap<Integer,Integer> sskStore; // SSKs can collide
- private LruMap<Integer,Integer> sskCache;
- private LruCache<Integer> pubKeyCache; // SSK public keys
- private boolean decrementMaxHtl = false;
- private boolean decrementMinHtl = false;
- public TokenBucket bandwidth; // Bandwidth limiter
- private boolean timerRunning = false; // Is the retx timer running?
-
- public Node (double txSpeed, double rxSpeed)
- {
- this (Math.random(), txSpeed, rxSpeed);
- }
-
- public Node (double location, double txSpeed, double rxSpeed)
- {
- this.location = location;
- net = new NetworkInterface (this, txSpeed, rxSpeed);
- peers = new HashMap<Integer,Peer>();
- recentlySeenRequests = new HashSet<Integer>();
- messageHandlers = new HashMap<Integer,MessageHandler>();
- chkStore = new LruCache<Integer> (10);
- chkCache = new LruCache<Integer> (10);
- sskStore = new LruMap<Integer,Integer> (10);
- sskCache = new LruMap<Integer,Integer> (10);
- pubKeyCache = new LruCache<Integer> (10);
- if (Math.random() < 0.5) decrementMaxHtl = true;
- if (Math.random() < 0.25) decrementMinHtl = true;
- bandwidth = new TokenBucket (BUCKET_RATE, BUCKET_SIZE);
- }
-
- // Return true if a connection was added, false if already connected
- public boolean connect (Node n, double latency)
- {
- if (n == this) return false;
- if (peers.containsKey (n.net.address)) return false;
- log ("adding peer " + n.net.address);
- Peer p = new Peer (this, n.net.address, n.location, latency);
- peers.put (n.net.address, p);
- return true;
- }
-
- public boolean connectBothWays (Node n, double latency)
- {
- if (connect (n, latency)) return n.connect (this, latency);
- else return false;
- }
-
- // Calculate the circular distance between two locations
- public static double distance (double a, double b)
- {
- if (a > b) return Math.min (a - b, b - a + 1.0);
- else return Math.min (b - a, a - b + 1.0);
- }
-
- // Convert an integer key to a routing location
- public static double keyToLocation (int key)
- {
- return key / (double) Integer.MAX_VALUE;
- }
-
- // Convert a routing location to an integer key
- public static int locationToKey (double location)
- {
- return (int) (location * Integer.MAX_VALUE);
- }
-
- // Return true if this node is as close to the target as any peer
- private boolean closerThanPeers (double target)
- {
- double bestDist = Double.POSITIVE_INFINITY;
- for (Peer peer : peers.values()) {
- double dist = distance (target, peer.location);
- if (dist < bestDist) bestDist = dist;
- }
- return distance (target, location) <= bestDist;
- }
-
- // Decrement a request or insert's hops to live
- public int decrementHtl (int htl)
- {
- if ((htl == Search.MAX_HTL && !decrementMaxHtl)
- || (htl == 1 && !decrementMinHtl)) return htl;
- else return htl - 1;
- }
-
- // Add a CHK to the cache
- public void cacheChk (int key)
- {
- log ("key " + key + " added to CHK cache");
- chkCache.put (key);
- }
-
- // Consider adding a CHK to the store
- public void storeChk (int key)
- {
- if (closerThanPeers (keyToLocation (key))) {
- log ("key " + key + " added to CHK store");
- chkStore.put (key);
- }
- else log ("key " + key + " not added to CHK store");
- }
-
- // Retrieve an SSK from the cache or the store
- public Integer fetchSsk (int key)
- {
- Integer data = sskStore.get (key);
- if (data == null) return sskCache.get (key);
- else return data;
- }
-
- // Add an SSK to the cache
- public void cacheSsk (int key, int value)
- {
- log ("key " + key + " added to SSK cache");
- sskCache.put (key, value);
- }
-
- // Consider adding an SSK to the store
- public void storeSsk (int key, int value)
- {
- if (closerThanPeers (keyToLocation (key))) {
- log ("key " + key + " added to SSK store");
- sskStore.put (key, value);
- }
- else log ("key " + key + " not added to SSK store");
- }
-
- // Add a public key to the cache
- public void cachePubKey (int key)
- {
- log ("public key " + key + " added to cache");
- pubKeyCache.put (key);
- }
-
- // Called by Peer
- public void startTimer()
- {
- if (timerRunning) return;
- // log ("starting retransmission/coalescing timer");
- Event.schedule (this, Peer.MAX_DELAY, CHECK_TIMEOUTS, null);
- timerRunning = true;
- }
-
- // Called by NetworkInterface
- public void handlePacket (Packet packet)
- {
- Peer peer = peers.get (packet.src);
- if (peer == null) log ("received packet from unknown peer");
- else peer.handlePacket (packet);
- }
-
- // Called by Peer
- public void handleMessage (Message m, Peer src)
- {
- if (src != null) log ("received " + m + " from " + src);
- if (m instanceof ChkRequest)
- handleChkRequest ((ChkRequest) m, src);
- else if (m instanceof ChkInsert)
- handleChkInsert ((ChkInsert) m, src);
- else if (m instanceof SskRequest)
- handleSskRequest ((SskRequest) m, src);
- else if (m instanceof SskInsert)
- handleSskInsert ((SskInsert) m, src);
- else {
- MessageHandler mh = messageHandlers.get (m.id);
- if (mh == null) log ("no message handler for " + m);
- else mh.handleMessage (m, src);
- }
- }
-
- private void handleChkRequest (ChkRequest r, Peer prev)
- {
- if (!recentlySeenRequests.add (r.id)) {
- log ("rejecting recently seen " + r);
- prev.sendMessage (new RejectedLoop (r.id));
- // Don't forward the same search back to prev
- MessageHandler mh = messageHandlers.get (r.id);
- if (mh != null) mh.removeNextHop (prev);
- return;
- }
- // Accept the search
- if (prev != null) {
- log ("accepting " + r);
- prev.sendMessage (new Accepted (r.id));
- }
- // If the data is in the store, return it
- if (chkStore.get (r.key)) {
- log ("key " + r.key + " found in CHK store");
- if (prev == null) log (r + " succeeded locally");
- else {
- prev.sendMessage (new ChkDataFound (r.id));
- for (int i = 0; i < 32; i++)
- prev.sendMessage (new Block (r.id, i));
- }
- return;
- }
- log ("key " + r.key + " not found in CHK store");
- // If the data is in the cache, return it
- if (chkCache.get (r.key)) {
- log ("key " + r.key + " found in CHK cache");
- if (prev == null) log (r + " succeeded locally");
- else {
- prev.sendMessage (new ChkDataFound (r.id));
- for (int i = 0; i < 32; i++)
- prev.sendMessage (new Block (r.id, i));
- }
- return;
- }
- log ("key " + r.key + " not found in CHK cache");
- // Store the request handler and forward the search
- ChkRequestHandler rh = new ChkRequestHandler (r, this, prev);
- messageHandlers.put (r.id, rh);
- rh.start();
- }
-
- private void handleChkInsert (ChkInsert i, Peer prev)
- {
- if (!recentlySeenRequests.add (i.id)) {
- log ("rejecting recently seen " + i);
- prev.sendMessage (new RejectedLoop (i.id));
- // Don't forward the same search back to prev
- MessageHandler mh = messageHandlers.get (i.id);
- if (mh != null) mh.removeNextHop (prev);
- return;
- }
- // Accept the search
- if (prev != null) {
- log ("accepting " + i);
- prev.sendMessage (new Accepted (i.id));
- }
- // Store the insert handler and wait for a DataInsert
- ChkInsertHandler ih = new ChkInsertHandler (i, this, prev);
- messageHandlers.put (i.id, ih);
- ih.start();
- }
-
- private void handleSskRequest (SskRequest r, Peer prev)
- {
- if (!recentlySeenRequests.add (r.id)) {
- log ("rejecting recently seen " + r);
- prev.sendMessage (new RejectedLoop (r.id));
- // Don't forward the same search back to prev
- MessageHandler mh = messageHandlers.get (r.id);
- if (mh != null) mh.removeNextHop (prev);
- return;
- }
- // Look up the public key
- boolean pub = pubKeyCache.get (r.key);
- if (pub) log ("public key " + r.key + " found in cache");
- else log ("public key " + r.key + " not found in cache");
- // Accept the search
- if (prev != null) {
- log ("accepting " + r);
- prev.sendMessage (new Accepted (r.id));
- }
- // If the data is in the store, return it
- Integer data = sskStore.get (r.key);
- if (pub && data != null) {
- log ("key " + r.key + " found in SSK store");
- if (prev == null) log (r + " succeeded locally");
- else {
- prev.sendMessage (new SskDataFound (r.id,data));
- if (r.needPubKey)
- prev.sendMessage
- (new SskPubKey (r.id, r.key));
- }
- return;
- }
- log ("key " + r.key + " not found in SSK store");
- // If the data is in the cache, return it
- data = sskCache.get (r.key);
- if (pub && data != null) {
- log ("key " + r.key + " found in SSK cache");
- if (prev == null) log (r + " succeeded locally");
- else {
- prev.sendMessage (new SskDataFound (r.id,data));
- if (r.needPubKey)
- prev.sendMessage
- (new SskPubKey (r.id, r.key));
- }
- return;
- }
- log ("key " + r.key + " not found in SSK cache");
- // Store the request handler and forward the search
- SskRequestHandler rh = new SskRequestHandler (r,this,prev,!pub);
- messageHandlers.put (r.id, rh);
- rh.start();
- }
-
- private void handleSskInsert (SskInsert i, Peer prev)
- {
- if (!recentlySeenRequests.add (i.id)) {
- log ("rejecting recently seen " + i);
- prev.sendMessage (new RejectedLoop (i.id));
- // Don't forward the same search back to prev
- MessageHandler mh = messageHandlers.get (i.id);
- if (mh != null) mh.removeNextHop (prev);
- return;
- }
- // Look up the public key
- boolean pub = pubKeyCache.get (i.key);
- if (pub) log ("public key " + i.key + " found in cache");
- else log ("public key " + i.key + " not found in cache");
- // Accept the search
- if (prev != null) {
- log ("accepting " + i);
- prev.sendMessage (new SskAccepted (i.id, !pub));
- }
- // Store the insert handler and possibly wait for the pub key
- SskInsertHandler ih = new SskInsertHandler (i,this,prev,!pub);
- messageHandlers.put (i.id, ih);
- ih.start();
- }
-
- public void removeMessageHandler (int id)
- {
- MessageHandler mh = messageHandlers.remove (id);
- if (mh == null) log ("no message handler to remove for " + id);
- else log ("removing message handler for " + id);
- }
-
- // Return the list of peers in a random order
- public ArrayList<Peer> peers()
- {
- ArrayList<Peer> copy = new ArrayList<Peer> (peers.values());
- Collections.shuffle (copy);
- return copy;
- }
-
- public void log (String message)
- {
- Event.log (net.address + " " + message);
- }
-
- // Event callbacks
-
- private void generateChkRequest (int key)
- {
- ChkRequest cr = new ChkRequest (key, location);
- log ("generating " + cr);
- handleChkRequest (cr, null);
- }
-
- private void generateChkInsert (int key)
- {
- ChkInsert ci = new ChkInsert (key, location);
- log ("generating " + ci);
- handleChkInsert (ci, null);
- handleMessage (new DataInsert (ci.id), null);
- for (int i = 0; i < 32; i++)
- handleMessage (new Block (ci.id, i), null);
- }
-
- private void generateSskRequest (int key)
- {
- SskRequest sr = new SskRequest (key, location, true);
- log ("generating " + sr);
- handleSskRequest (sr, null);
- }
-
- private void generateSskInsert (int key, int value)
- {
- SskInsert si = new SskInsert (key, value, location);
- log ("generating " + si);
- pubKeyCache.put (key);
- handleSskInsert (si, null);
- }
-
- private void checkTimeouts()
- {
- // Check the peers in a random order each time
- double deadline = Double.POSITIVE_INFINITY;
- for (Peer p : peers())
- deadline = Math.min (deadline, p.checkTimeouts());
- if (deadline == Double.POSITIVE_INFINITY) {
- // log ("stopping retransmission/coalescing timer");
- timerRunning = false;
- }
- else {
- double sleep = deadline - Event.time(); // Can be < 0
- if (sleep < SHORT_SLEEP) sleep = SHORT_SLEEP;
- // log ("sleeping for " + sleep + " seconds");
- Event.schedule (this, sleep, CHECK_TIMEOUTS, null);
- }
- }
-
- // EventTarget interface
- public void handleEvent (int type, Object data)
- {
- switch (type) {
- case GENERATE_CHK_REQUEST:
- generateChkRequest ((Integer) data);
- break;
-
- case GENERATE_CHK_INSERT:
- generateChkInsert ((Integer) data);
- break;
-
- case GENERATE_SSK_REQUEST:
- generateSskRequest ((Integer) data);
- break;
-
- case GENERATE_SSK_INSERT:
- generateSskInsert ((Integer) data, 0);
- break;
-
- case GENERATE_SSK_COLLISION:
- generateSskInsert ((Integer) data, 1);
-
- case CHECK_TIMEOUTS:
- checkTimeouts();
- break;
- }
- }
-
- // Each EventTarget class has its own event codes
- public final static int GENERATE_CHK_REQUEST = 1;
- public final static int GENERATE_CHK_INSERT = 2;
- public final static int GENERATE_SSK_REQUEST = 3;
- public final static int GENERATE_SSK_INSERT = 4;
- public final static int GENERATE_SSK_COLLISION = 5;
- private final static int CHECK_TIMEOUTS = 6;
-}
Copied: trunk/apps/load-balancing-sims/phase7/Node.java (from rev 10748,
trunk/apps/load-balancing-sims/phase6/Node.java)
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java 2006-10-30 20:23:12 UTC
(rev 10748)
+++ trunk/apps/load-balancing-sims/phase7/Node.java 2006-10-31 11:01:12 UTC
(rev 10749)
@@ -0,0 +1,441 @@
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.Collections;
+import messages.*;
+
+class Node implements EventTarget // A simulated node: peers, key stores/caches, per-search handlers
+{
+    // Token bucket bandwidth limiter parameters
+    public final static int BUCKET_RATE = 30000; // Bytes per second
+    public final static int BUCKET_SIZE = 60000; // Burst size in bytes
+
+    public double location; // Routing location
+    public NetworkInterface net; // net.address identifies this node
+    private HashMap<Integer,Peer> peers; // Look up a peer by its address
+    private HashSet<Integer> recentlySeenRequests; // Request IDs
+    private HashMap<Integer,MessageHandler> messageHandlers; // By ID
+    private LruCache<Integer> chkStore;
+    private LruCache<Integer> chkCache;
+    private LruMap<Integer,Integer> sskStore; // SSKs can collide
+    private LruMap<Integer,Integer> sskCache;
+    private LruCache<Integer> pubKeyCache; // SSK public keys
+    private boolean decrementMaxHtl = false; // See decrementHtl()
+    private boolean decrementMinHtl = false; // See decrementHtl()
+    public TokenBucket bandwidth; // Bandwidth limiter
+    private boolean timerRunning = false; // Is the retx timer running?
+
+    public Node (double txSpeed, double rxSpeed)
+    {
+        this (Math.random(), txSpeed, rxSpeed); // Random routing location
+    }
+
+    public Node (double location, double txSpeed, double rxSpeed)
+    {
+        this.location = location;
+        net = new NetworkInterface (this, txSpeed, rxSpeed);
+        peers = new HashMap<Integer,Peer>();
+        recentlySeenRequests = new HashSet<Integer>();
+        messageHandlers = new HashMap<Integer,MessageHandler>();
+        chkStore = new LruCache<Integer> (10);
+        chkCache = new LruCache<Integer> (10);
+        sskStore = new LruMap<Integer,Integer> (10);
+        sskCache = new LruMap<Integer,Integer> (10);
+        pubKeyCache = new LruCache<Integer> (10);
+        if (Math.random() < 0.5) decrementMaxHtl = true; // Probability 0.5
+        if (Math.random() < 0.25) decrementMinHtl = true; // Probability 0.25
+        bandwidth = new TokenBucket (BUCKET_RATE, BUCKET_SIZE);
+    }
+
+    // Return true if a connection was added, false if n is this node or already connected
+    public boolean connect (Node n, double latency)
+    {
+        if (n == this) return false;
+        if (peers.containsKey (n.net.address)) return false;
+        // log ("adding peer " + n.net.address);
+        Peer p = new Peer (this, n.net.address, n.location, latency);
+        peers.put (n.net.address, p);
+        return true;
+    }
+
+    public boolean connectBothWays (Node n, double latency)
+    {
+        if (connect (n, latency)) return n.connect (this, latency); // Then the reverse link
+        else return false;
+    }
+
+    // Calculate the circular distance between two locations (wrapping at 1.0)
+    public static double distance (double a, double b)
+    {
+        if (a > b) return Math.min (a - b, b - a + 1.0); // Direct vs wrapped
+        else return Math.min (b - a, a - b + 1.0);
+    }
+
+    // Convert an integer key to a routing location (key / Integer.MAX_VALUE)
+    public static double keyToLocation (int key)
+    {
+        return key / (double) Integer.MAX_VALUE;
+    }
+
+    // Convert a routing location to an integer key (truncating cast)
+    public static int locationToKey (double location)
+    {
+        return (int) (location * Integer.MAX_VALUE);
+    }
+
+    // Return true if this node is as close to the target as any peer (true if no peers)
+    private boolean closerThanPeers (double target)
+    {
+        double bestDist = Double.POSITIVE_INFINITY;
+        for (Peer peer : peers.values()) {
+            double dist = distance (target, peer.location);
+            if (dist < bestDist) bestDist = dist;
+        }
+        return distance (target, location) <= bestDist;
+    }
+
+    // Decrement a request or insert's hops to live, unless held at MAX_HTL or 1 by this node's flags
+    public int decrementHtl (int htl)
+    {
+        if ((htl == Search.MAX_HTL && !decrementMaxHtl)
+        || (htl == 1 && !decrementMinHtl)) return htl;
+        else return htl - 1;
+    }
+
+    // Add a CHK to the cache (unconditionally)
+    public void cacheChk (int key)
+    {
+        log ("key " + key + " added to CHK cache");
+        chkCache.put (key);
+    }
+
+    // Consider adding a CHK to the store: only if no peer is closer to the key
+    public void storeChk (int key)
+    {
+        if (closerThanPeers (keyToLocation (key))) {
+            log ("key " + key + " added to CHK store");
+            chkStore.put (key);
+        }
+        else log ("key " + key + " not added to CHK store");
+    }
+
+    // Retrieve an SSK from the store, falling back to the cache (null if in neither)
+    public Integer fetchSsk (int key)
+    {
+        Integer data = sskStore.get (key);
+        if (data == null) return sskCache.get (key);
+        else return data;
+    }
+
+    // Add an SSK to the cache (unconditionally)
+    public void cacheSsk (int key, int value)
+    {
+        log ("key " + key + " added to SSK cache");
+        sskCache.put (key, value);
+    }
+
+    // Consider adding an SSK to the store: only if no peer is closer to the key
+    public void storeSsk (int key, int value)
+    {
+        if (closerThanPeers (keyToLocation (key))) {
+            log ("key " + key + " added to SSK store");
+            sskStore.put (key, value);
+        }
+        else log ("key " + key + " not added to SSK store");
+    }
+
+    // Add a public key to the cache
+    public void cachePubKey (int key)
+    {
+        log ("public key " + key + " added to cache");
+        pubKeyCache.put (key);
+    }
+
+    // Called by Peer: schedule the first CHECK_TIMEOUTS event unless already running
+    public void startTimer()
+    {
+        if (timerRunning) return;
+        // log ("starting retransmission/coalescing timer");
+        Event.schedule (this, Peer.MAX_DELAY, CHECK_TIMEOUTS, null);
+        timerRunning = true;
+    }
+
+    // Called by NetworkInterface: hand the packet to the Peer matching its source address
+    public void handlePacket (Packet packet)
+    {
+        Peer peer = peers.get (packet.src);
+        if (peer == null) log ("received packet from unknown peer");
+        else peer.handlePacket (packet);
+    }
+
+    // Called by Peer, or locally with src == null (see the generate*() callbacks)
+    public void handleMessage (Message m, Peer src)
+    {
+        if (src != null) log ("received " + m + " from " + src);
+        if (m instanceof ChkRequest)
+            handleChkRequest ((ChkRequest) m, src);
+        else if (m instanceof ChkInsert)
+            handleChkInsert ((ChkInsert) m, src);
+        else if (m instanceof SskRequest)
+            handleSskRequest ((SskRequest) m, src);
+        else if (m instanceof SskInsert)
+            handleSskInsert ((SskInsert) m, src);
+        else {
+            MessageHandler mh = messageHandlers.get (m.id); // Existing search
+            if (mh == null) log ("no message handler for " + m);
+            else mh.handleMessage (m, src);
+        }
+    }
+
+    private void handleChkRequest (ChkRequest r, Peer prev) // prev == null: generated locally
+    {
+        if (!recentlySeenRequests.add (r.id)) {
+            log ("rejecting recently seen " + r);
+            if (prev != null) prev.sendMessage (new RejectedLoop (r.id));
+            // Don't forward the same search back to prev
+            MessageHandler mh = messageHandlers.get (r.id);
+            if (mh != null && prev != null) mh.removeNextHop (prev);
+            return;
+        }
+        // Accept the search
+        if (prev != null) {
+            log ("accepting " + r);
+            prev.sendMessage (new Accepted (r.id));
+        }
+        // If the data is in the store, return it
+        if (chkStore.get (r.key)) {
+            log ("key " + r.key + " found in CHK store");
+            if (prev == null) log (r + " succeeded locally");
+            else {
+                prev.sendMessage (new ChkDataFound (r.id));
+                for (int i = 0; i < 32; i++)
+                    prev.sendMessage (new Block (r.id, i));
+            }
+            return;
+        }
+        log ("key " + r.key + " not found in CHK store");
+        // If the data is in the cache, return it
+        if (chkCache.get (r.key)) {
+            log ("key " + r.key + " found in CHK cache");
+            if (prev == null) log (r + " succeeded locally");
+            else {
+                prev.sendMessage (new ChkDataFound (r.id));
+                for (int i = 0; i < 32; i++)
+                    prev.sendMessage (new Block (r.id, i));
+            }
+            return;
+        }
+        log ("key " + r.key + " not found in CHK cache");
+        // Store the request handler and forward the search
+        ChkRequestHandler rh = new ChkRequestHandler (r, this, prev);
+        messageHandlers.put (r.id, rh);
+        rh.start();
+    }
+
+    private void handleChkInsert (ChkInsert i, Peer prev) // prev == null: generated locally
+    {
+        if (!recentlySeenRequests.add (i.id)) {
+            log ("rejecting recently seen " + i);
+            if (prev != null) prev.sendMessage (new RejectedLoop (i.id));
+            // Don't forward the same search back to prev
+            MessageHandler mh = messageHandlers.get (i.id);
+            if (mh != null && prev != null) mh.removeNextHop (prev);
+            return;
+        }
+        // Accept the search
+        if (prev != null) {
+            log ("accepting " + i);
+            prev.sendMessage (new Accepted (i.id));
+        }
+        // Store the insert handler and wait for a DataInsert
+        ChkInsertHandler ih = new ChkInsertHandler (i, this, prev);
+        messageHandlers.put (i.id, ih);
+        ih.start();
+    }
+
+    private void handleSskRequest (SskRequest r, Peer prev) // prev == null: generated locally
+    {
+        if (!recentlySeenRequests.add (r.id)) {
+            log ("rejecting recently seen " + r);
+            if (prev != null) prev.sendMessage (new RejectedLoop (r.id));
+            // Don't forward the same search back to prev
+            MessageHandler mh = messageHandlers.get (r.id);
+            if (mh != null && prev != null) mh.removeNextHop (prev);
+            return;
+        }
+        // Look up the public key
+        boolean pub = pubKeyCache.get (r.key);
+        if (pub) log ("public key " + r.key + " found in cache");
+        else log ("public key " + r.key + " not found in cache");
+        // Accept the search
+        if (prev != null) {
+            log ("accepting " + r);
+            prev.sendMessage (new Accepted (r.id));
+        }
+        // If the data is in the store, return it (only if we also hold the public key)
+        Integer data = sskStore.get (r.key);
+        if (pub && data != null) {
+            log ("key " + r.key + " found in SSK store");
+            if (prev == null) log (r + " succeeded locally");
+            else {
+                prev.sendMessage (new SskDataFound (r.id,data));
+                if (r.needPubKey)
+                    prev.sendMessage
+                        (new SskPubKey (r.id, r.key));
+            }
+            return;
+        }
+        log ("key " + r.key + " not found in SSK store");
+        // If the data is in the cache, return it (only if we also hold the public key)
+        data = sskCache.get (r.key);
+        if (pub && data != null) {
+            log ("key " + r.key + " found in SSK cache");
+            if (prev == null) log (r + " succeeded locally");
+            else {
+                prev.sendMessage (new SskDataFound (r.id,data));
+                if (r.needPubKey)
+                    prev.sendMessage
+                        (new SskPubKey (r.id, r.key));
+            }
+            return;
+        }
+        log ("key " + r.key + " not found in SSK cache");
+        // Store the request handler and forward the search
+        SskRequestHandler rh = new SskRequestHandler (r,this,prev,!pub);
+        messageHandlers.put (r.id, rh);
+        rh.start();
+    }
+
+    private void handleSskInsert (SskInsert i, Peer prev) // prev == null: generated locally
+    {
+        if (!recentlySeenRequests.add (i.id)) {
+            log ("rejecting recently seen " + i);
+            if (prev != null) prev.sendMessage (new RejectedLoop (i.id));
+            // Don't forward the same search back to prev
+            MessageHandler mh = messageHandlers.get (i.id);
+            if (mh != null && prev != null) mh.removeNextHop (prev);
+            return;
+        }
+        // Look up the public key
+        boolean pub = pubKeyCache.get (i.key);
+        if (pub) log ("public key " + i.key + " found in cache");
+        else log ("public key " + i.key + " not found in cache");
+        // Accept the search
+        if (prev != null) {
+            log ("accepting " + i);
+            prev.sendMessage (new SskAccepted (i.id, !pub));
+        }
+        // Store the insert handler and possibly wait for the pub key
+        SskInsertHandler ih = new SskInsertHandler (i,this,prev,!pub);
+        messageHandlers.put (i.id, ih);
+        ih.start();
+    }
+
+    public void removeMessageHandler (int id)
+    {
+        MessageHandler mh = messageHandlers.remove (id);
+        if (mh == null) log ("no message handler to remove for " + id);
+        else log ("removing message handler for " + id);
+    }
+
+    // Return a copy of the list of peers in a random order
+    public ArrayList<Peer> peers()
+    {
+        ArrayList<Peer> copy = new ArrayList<Peer> (peers.values());
+        Collections.shuffle (copy);
+        return copy;
+    }
+
+    public void log (String message)
+    {
+        Event.log (net.address + " " + message); // Prefix with our address
+    }
+
+    // Event callbacks: locally generated searches are handled with prev == null
+
+    private void generateChkRequest (int key)
+    {
+        ChkRequest cr = new ChkRequest (key, location);
+        log ("generating " + cr);
+        handleChkRequest (cr, null);
+    }
+
+    private void generateChkInsert (int key)
+    {
+        ChkInsert ci = new ChkInsert (key, location);
+        log ("generating " + ci);
+        handleChkInsert (ci, null);
+        handleMessage (new DataInsert (ci.id), null); // Feed the new handler
+        for (int i = 0; i < 32; i++)
+            handleMessage (new Block (ci.id, i), null);
+    }
+
+    private void generateSskRequest (int key)
+    {
+        SskRequest sr = new SskRequest (key, location, true);
+        log ("generating " + sr);
+        handleSskRequest (sr, null);
+    }
+
+    private void generateSskInsert (int key, int value)
+    {
+        SskInsert si = new SskInsert (key, value, location);
+        log ("generating " + si);
+        pubKeyCache.put (key); // The inserter already has the public key
+        handleSskInsert (si, null);
+    }
+
+    private void checkTimeouts()
+    {
+        // Check the peers in a random order each time
+        double deadline = Double.POSITIVE_INFINITY;
+        for (Peer p : peers())
+            deadline = Math.min (deadline, p.checkTimeouts());
+        if (deadline == Double.POSITIVE_INFINITY) {
+            // log ("stopping retransmission/coalescing timer");
+            timerRunning = false;
+        }
+        else {
+            double sleep = Math.max (deadline - Event.time(), 0.0); // Never negative
+            // log ("sleeping for " + sleep + " seconds");
+            Event.schedule (this, sleep, CHECK_TIMEOUTS, null);
+        }
+    }
+
+    // EventTarget interface: dispatch on the event codes defined below
+    public void handleEvent (int type, Object data)
+    {
+        switch (type) {
+        case GENERATE_CHK_REQUEST:
+            generateChkRequest ((Integer) data);
+            break;
+
+        case GENERATE_CHK_INSERT:
+            generateChkInsert ((Integer) data);
+            break;
+
+        case GENERATE_SSK_REQUEST:
+            generateSskRequest ((Integer) data);
+            break;
+
+        case GENERATE_SSK_INSERT:
+            generateSskInsert ((Integer) data, 0);
+            break;
+
+        case GENERATE_SSK_COLLISION:
+            generateSskInsert ((Integer) data, 1); // Same key, different value
+            break; // Must not fall through into CHECK_TIMEOUTS
+        case CHECK_TIMEOUTS:
+            checkTimeouts();
+            break;
+        }
+    }
+
+    // Each EventTarget class has its own event codes
+    public final static int GENERATE_CHK_REQUEST = 1;
+    public final static int GENERATE_CHK_INSERT = 2;
+    public final static int GENERATE_SSK_REQUEST = 3;
+    public final static int GENERATE_SSK_INSERT = 4;
+    public final static int GENERATE_SSK_COLLISION = 5;
+    private final static int CHECK_TIMEOUTS = 6;
+}
Deleted: trunk/apps/load-balancing-sims/phase7/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Packet.java 2006-10-29 23:34:22 UTC
(rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/Packet.java 2006-10-31 11:01:12 UTC
(rev 10749)
@@ -1,41 +0,0 @@
-// A low-level packet (as opposed to a high-level message)
-
-import java.util.ArrayList;
-import messages.Message;
-import messages.Ack;
-
-class Packet
-{
- public final static int HEADER_SIZE = 70; // Including IP & UDP headers
- public final static int ACK_SIZE = 4; // Size of an ack in bytes
- public final static int MAX_SIZE = 1450; // MTU including headers
- public final static int SENSIBLE_PAYLOAD = 1000; // Coalescing
-
- public int src, dest; // Network addresses
- public int size = HEADER_SIZE; // Size in bytes, including headers
- public int seq = -1; // Data sequence number (-1 if no data)
- public ArrayList<Ack> acks = null;
- public ArrayList<Message> messages = null;
-
- public double sent; // Time at which the packet was (re) transmitted
- public double latency; // Link latency (stored here for convenience)
-
- public void addAck (Ack a)
- {
- if (acks == null) acks = new ArrayList<Ack>();
- acks.add (a);
- size += a.size();
- }
-
- public void addMessage (Message m)
- {
- if (messages == null) messages = new ArrayList<Message>();
- messages.add (m);
- size += m.size();
- }
-
- public String toString()
- {
- return new String ("packet " + src + ":" + dest + ":" + seq);
- }
-}
Copied: trunk/apps/load-balancing-sims/phase7/Packet.java (from rev 10748,
trunk/apps/load-balancing-sims/phase6/Packet.java)
Deleted: trunk/apps/load-balancing-sims/phase7/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Peer.java 2006-10-29 23:34:22 UTC
(rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/Peer.java 2006-10-31 11:01:12 UTC
(rev 10749)
@@ -1,317 +0,0 @@
-import java.util.LinkedList;
-import java.util.Iterator;
-import java.util.HashSet;
-import messages.*;
-
-class Peer
-{
- private Node node; // The local node
- public int address; // The remote node's address
- public double location; // The remote node's routing location
- private double latency; // The latency of the connection in seconds
-
- // Retransmission parameters
- public final static double RTO = 4.0; // Retransmission timeout in RTTs
- public final static double FRTO = 1.5; // Fast retx timeout in RTTs
- public final static double RTT_DECAY = 0.9; // Exp moving average
- public final static double LINK_IDLE = 8.0; // RTTs without transmitting
-
- // Coalescing
- public final static double MAX_DELAY = 0.1; // Coalescing delay, seconds
-
- // Out-of-order delivery with eventual detection of missing packets
- public final static int SEQ_RANGE = 1000;
-
- // Sender state
- private double rtt = 5.0; // Estimated round-trip time in seconds
- private int txSeq = 0; // Sequence number of next outgoing data packet
- private int txMaxSeq = SEQ_RANGE - 1; // Highest sequence number
- private LinkedList<Packet> txBuffer; // Retransmission buffer
- private DeadlineQueue<Ack> ackQueue; // Outgoing acks
- private DeadlineQueue<Message> searchQueue; // Outgoing search messages
- private DeadlineQueue<Message> transferQueue; // Outgoing transfers
- private CongestionWindow window; // AIMD congestion window
- private double lastTransmission = 0.0; // Clock time
- private boolean tgif = false; // "Transfers go in first" toggle
-
- // Receiver state
- private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
- private int rxSeq = 0; // Sequence number of next in-order incoming pkt
-
- public Peer (Node node, int address, double location, double latency)
- {
- this.node = node;
- this.address = address;
- this.location = location;
- this.latency = latency;
- txBuffer = new LinkedList<Packet>();
- ackQueue = new DeadlineQueue<Ack>();
- searchQueue = new DeadlineQueue<Message>();
- transferQueue = new DeadlineQueue<Message>();
- window = new CongestionWindow (this);
- rxDupe = new HashSet<Integer>();
- }
-
- // Queue a message for transmission
- public void sendMessage (Message m)
- {
- if (m instanceof Block || m instanceof DataInsert
- || m instanceof ChkDataFound) {
- log (m + " added to transfer queue");
- transferQueue.add (m, Event.time() + MAX_DELAY);
- }
- else {
- log (m + " added to search queue");
- searchQueue.add (m, Event.time() + MAX_DELAY);
- }
- // Start the node's timer if necessary
- node.startTimer();
- // Send as many packets as possible
- while (send());
- }
-
- // Queue an ack for transmission
- private void sendAck (int seq)
- {
- log ("ack " + seq + " added to ack queue");
- ackQueue.add (new Ack (seq), Event.time() + MAX_DELAY);
- // Start the node's timer if necessary
- node.startTimer();
- // Send as many packets as possible
- while (send());
- }
-
- // Try to send a packet, return true if a packet was sent
- private boolean send()
- {
- if (ackQueue.size + searchQueue.size + transferQueue.size == 0){
- log ("nothing to send");
- return false;
- }
- log (ackQueue.size + " bytes of acks in queue");
- log (searchQueue.size + " bytes of searches in queue");
- log (transferQueue.size + " bytes of transfers in queue");
-
- // Return to slow start when the link is idle
- double now = Event.time();
- if (now - lastTransmission > LINK_IDLE * rtt) window.reset();
- lastTransmission = now;
-
- // Delay small packets for coalescing
- if (now < deadline (now)) {
- int payload = searchQueue.size + transferQueue.size;
- log ("delaying transmission of " + payload + " bytes");
- return false;
- }
-
- Packet p = new Packet();
-
- // Put all waiting acks in the packet
- while (ackQueue.size > 0) p.addAck (ackQueue.pop());
-
- // Don't send sequence number n+SEQ_RANGE until sequence
- // number n has been acked - this limits the number of
- // sequence numbers the receiver must store for replay
- // detection. We must still be allowed to send acks,
- // otherwise the connection could deadlock.
-
- if (txSeq > txMaxSeq)
- log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
- else if (window.available() <= 0)
- log ("no room in congestion window for messages");
- else if (node.bandwidth.available() <= 0)
- log ("no bandwidth available for messages");
- else pack (p); // OK to send data
-
- // Don't send empty packets
- if (p.acks == null && p.messages == null) return false;
-
- // If the packet contains data, buffer it for retransmission
- if (p.messages != null) {
- p.seq = txSeq++;
- p.sent = now;
- txBuffer.add (p);
- window.bytesSent (p.size);
- }
-
- // Send the packet
- log ("sending packet " + p.seq + ", " + p.size + " bytes");
- node.net.send (p, address, latency);
- node.bandwidth.remove (p.size);
- return true;
- }
-
- // Called by Node when a packet arrives
- public void handlePacket (Packet p)
- {
- if (p.acks != null) for (Ack a : p.acks) handleAck (a);
- if (p.messages != null) handleData (p);
- }
-
- private void handleData (Packet p)
- {
- log ("received packet " + p.seq + ", " + p.size + " bytes");
- if (p.seq < rxSeq || rxDupe.contains (p.seq)) {
- log ("duplicate packet");
- sendAck (p.seq); // Original ack may have been lost
- }
- else if (p.seq == rxSeq) {
- log ("packet in order");
- // Find the sequence number of the next missing packet
- int was = rxSeq;
- while (rxDupe.remove (++rxSeq));
- log ("rxSeq was " + was + ", now " + rxSeq);
- // Deliver the packet
- unpack (p);
- sendAck (p.seq);
- }
- else if (p.seq < rxSeq + SEQ_RANGE) {
- log ("packet out of order - expected " + rxSeq);
- if (rxDupe.add (p.seq)) unpack (p);
- else log ("duplicate packet");
- sendAck (p.seq); // Original ack may have been lost
- }
- // This indicates a misbehaving sender - discard the packet
- else log ("warning: received " + p.seq + " before " + rxSeq);
- }
-
- private void handleAck (Ack a)
- {
- int seq = a.id;
- log ("received ack " + seq);
- double now = Event.time();
- Iterator<Packet> i = txBuffer.iterator();
- while (i.hasNext()) {
- Packet p = i.next();
- double age = now - p.sent;
- // Explicit ack
- if (p.seq == seq) {
- log ("packet " + p.seq + " acknowledged");
- i.remove();
- // Update the congestion window
- window.bytesAcked (p.size);
- // Update the average round-trip time
- rtt = rtt * RTT_DECAY + age * (1.0 - RTT_DECAY);
- log ("round-trip time " + age);
- log ("average round-trip time " + rtt);
- break;
- }
- // Fast retransmission
- if (p.seq < seq && age > FRTO * rtt + MAX_DELAY) {
- p.sent = now;
- log ("fast retransmitting packet " + p.seq);
- node.net.send (p, address, latency);
- window.fastRetransmission (now);
- }
- }
- // Recalculate the maximum sequence number
- if (txBuffer.isEmpty()) txMaxSeq = txSeq + SEQ_RANGE - 1;
- else txMaxSeq = txBuffer.peek().seq + SEQ_RANGE - 1;
- log ("maximum sequence number " + txMaxSeq);
- // Send as many packets as possible
- while (send());
- }
-
- // Add messages to a packet
- private void pack (Packet p)
- {
- // Alternate between giving searches and transfers priority
- if (tgif) {
- // Transfers go in first
- while (transferQueue.size > 0
- && p.size + transferQueue.headSize() <= Packet.MAX_SIZE)
- p.addMessage (transferQueue.pop());
- // Fill any remaining space with searches
- while (searchQueue.size > 0
- && p.size + searchQueue.headSize() <= Packet.MAX_SIZE)
- p.addMessage (searchQueue.pop());
- tgif = false;
- }
- else {
- // Searches go in first
- while (searchQueue.size > 0
- && p.size + searchQueue.headSize() <= Packet.MAX_SIZE)
- p.addMessage (searchQueue.pop());
- // Fill any remaining space with transfers
- while (transferQueue.size > 0
- && p.size + transferQueue.headSize() <= Packet.MAX_SIZE)
- p.addMessage (transferQueue.pop());
- tgif = true;
- }
- }
-
- // Remove messages from a packet and deliver them to the node
- private void unpack (Packet p)
- {
- if (p.messages == null) return;
- for (Message m : p.messages) node.handleMessage (m, this);
- }
-
- // Called by Node, returns the next coalescing or retx deadline
- public double checkTimeouts()
- {
- log ("checking timeouts");
- // Send as many packets as possible
- while (send());
-
- log (txBuffer.size() + " packets in flight");
- double now = Event.time();
- if (txBuffer.isEmpty()) return deadline (now);
-
- for (Packet p : txBuffer) {
- if (now - p.sent > RTO * rtt + MAX_DELAY) {
- // Retransmission timeout
- log ("retransmitting packet " + p.seq);
- p.sent = now;
- node.net.send (p, address, latency);
- window.timeout (now);
- }
- }
-
- // Sleep for up to MAX_DELAY seconds until the next deadline
- return Math.min (now + MAX_DELAY, deadline (now));
- }
-
- // Work out when the first ack or search or transfer needs to be sent
- private double deadline (double now)
- {
- return Math.min (ackQueue.deadline(), dataDeadline (now));
- }
-
- // Work out when the first search or transfer needs to be sent
- private double dataDeadline (double now)
- {
- // If there's no data waiting, use the ack deadline
- if (searchQueue.size + transferQueue.size == 0)
- return Double.POSITIVE_INFINITY;
-
- double deadline = Math.min (searchQueue.deadline(),
- transferQueue.deadline());
-
- // Delay small packets until the coalescing deadline
- if (searchQueue.size + transferQueue.size
- < Packet.SENSIBLE_PAYLOAD)
- return deadline;
-
- // If there's not enough room in the window, wait for an ack
- if (window.available() <= 0)
- return Double.POSITIVE_INFINITY;
-
- // If there's not enough bandwidth, try again shortly
- if (node.bandwidth.available() <= 0)
- return Math.max (deadline, now + Node.SHORT_SLEEP);
-
- // Send a packet immediately
- return now;
- }
-
- public void log (String message)
- {
- Event.log (node.net.address + ":" + address + " " + message);
- }
-
- public String toString()
- {
- return Integer.toString (address);
- }
-}
Copied: trunk/apps/load-balancing-sims/phase7/Peer.java (from rev 10748,
trunk/apps/load-balancing-sims/phase6/Peer.java)
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Peer.java 2006-10-30 20:23:12 UTC
(rev 10748)
+++ trunk/apps/load-balancing-sims/phase7/Peer.java 2006-10-31 11:01:12 UTC
(rev 10749)
@@ -0,0 +1,319 @@
+import java.util.LinkedList;
+import java.util.Iterator;
+import java.util.HashSet;
+import messages.*;
+
+class Peer
+{
+ private Node node; // The local node
+ public int address; // The remote node's address
+ public double location; // The remote node's routing location
+ private double latency; // The latency of the connection in seconds
+
+ // Retransmission parameters
+ public final static double RTO = 4.0; // Retransmission timeout in RTTs
+ public final static double FRTO = 1.5; // Fast retx timeout in RTTs
+ public final static double RTT_DECAY = 0.9; // Weight kept by the old RTT average
+ public final static double LINK_IDLE = 8.0; // Idle RTTs before window reset
+
+ // Coalescing
+ public final static double MIN_SLEEP = 0.01; // Retry delay in secs when out of b/w
+ public final static double MAX_DELAY = 0.1; // Coalescing delay in secs
+
+ // Out-of-order delivery with eventual detection of missing packets
+ public final static int SEQ_RANGE = 1000; // Max span of seqs ahead of the oldest unacked
+
+ // Sender state
+ private double rtt = 5.0; // Estimated round-trip time in seconds
+ private int txSeq = 0; // Sequence number of next outgoing data packet
+ private int txMaxSeq = SEQ_RANGE - 1; // Highest sequence number we may send
+ private LinkedList<Packet> txBuffer; // Unacked data packets, for retransmission
+ private DeadlineQueue<Ack> ackQueue; // Outgoing acks
+ private DeadlineQueue<Message> searchQueue; // Outgoing search messages
+ private DeadlineQueue<Message> transferQueue; // Outgoing transfers
+ private CongestionWindow window; // AIMD congestion window
+ private double lastTransmission = 0.0; // Clock time send() last had work queued
+ private boolean tgif = false; // "Transfers go in first" toggle
+
+ // Receiver state
+ private HashSet<Integer> rxDupe; // Seqs received ahead of rxSeq (dupe check)
+ private int rxSeq = 0; // Sequence number of next in-order incoming pkt
+
+ public Peer (Node node, int address, double location, double latency)
+ {
+     this.node = node;
+     this.address = address;
+     this.location = location;
+     this.latency = latency;
+     this.rxDupe = new HashSet<Integer>();
+     this.ackQueue = new DeadlineQueue<Ack>();
+     this.searchQueue = new DeadlineQueue<Message>();
+     this.transferQueue = new DeadlineQueue<Message>();
+     this.txBuffer = new LinkedList<Packet>();
+     this.window = new CongestionWindow (this); // Takes 'this': keep last
+ }
+
+ // Queue a message for transmission
+ public void sendMessage (Message m)
+ {
+ m.deadline = Event.time() + MAX_DELAY;
+ if (m instanceof Block || m instanceof DataInsert
+ || m instanceof ChkDataFound) {
+ log (m + " added to transfer queue");
+ transferQueue.add (m);
+ }
+ else {
+ log (m + " added to search queue");
+ searchQueue.add (m);
+ }
+ // Start the node's timer if necessary
+ node.startTimer();
+ // Send as many packets as possible
+ while (send());
+ }
+
+ // Queue an ack for transmission
+ private void sendAck (int seq)
+ {
+     log ("ack " + seq + " added to ack queue");
+     double due = Event.time() + MAX_DELAY; // Coalescing limit for the ack
+     ackQueue.add (new Ack (seq, due));
+     node.startTimer(); // Start the node's timer if necessary
+     // Keep going until send() reports that nothing went out
+     while (send()) { }
+ }
+
+ // Try to send a packet, return true if a packet was sent
+ private boolean send() // Builds and transmits at most one packet per call
+ {
+ if (ackQueue.size + searchQueue.size + transferQueue.size == 0){
+ log ("nothing to send");
+ return false;
+ }
+ log (ackQueue.size + " bytes of acks in queue");
+ log (searchQueue.size + " bytes of searches in queue");
+ log (transferQueue.size + " bytes of transfers in queue");
+
+ // Return to slow start when the link is idle
+ double now = Event.time();
+ if (now - lastTransmission > LINK_IDLE * rtt) window.reset();
+ lastTransmission = now; // NOTE(review): set even if nothing is sent below
+
+ // Delay small packets for coalescing
+ if (now < deadline (now)) {
+ int payload = searchQueue.size + transferQueue.size;
+ log ("delaying transmission of " + payload + " bytes");
+ return false;
+ }
+
+ Packet p = new Packet();
+
+ // Put all waiting acks in the packet
+ while (ackQueue.size > 0) p.addAck (ackQueue.pop()); // NOTE(review): no size check
+
+ // Don't send sequence number n+SEQ_RANGE until sequence
+ // number n has been acked - this limits the number of
+ // sequence numbers the receiver must store for replay
+ // detection. We must still be allowed to send acks,
+ // otherwise the connection could deadlock.
+
+ if (txSeq > txMaxSeq)
+ log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
+ else if (window.available() <= 0)
+ log ("no room in congestion window for messages");
+ else if (node.bandwidth.available() <= 0)
+ log ("no bandwidth available for messages");
+ else pack (p); // OK to send data
+
+ // Don't send empty packets
+ if (p.acks == null && p.messages == null) return false;
+
+ // If the packet contains data, buffer it for retransmission
+ if (p.messages != null) { // Ack-only packets are not given a seq or buffered
+ p.seq = txSeq++;
+ p.sent = now;
+ txBuffer.add (p);
+ window.bytesSent (p.size);
+ }
+
+ // Send the packet
+ log ("sending packet " + p.seq + ", " + p.size + " bytes");
+ node.net.send (p, address, latency);
+ node.bandwidth.remove (p.size); // Charged for every packet, ack-only ones too
+ return true;
+ }
+
+ // Called by Node when a packet arrives
+ public void handlePacket (Packet p)
+ {
+     if (p.acks != null) { for (Ack ack : p.acks) handleAck (ack); } // Acks first
+     if (p.messages != null) { handleData (p); } // Then any data
+ }
+
+ private void handleData (Packet p)
+ {
+ log ("received packet " + p.seq + ", " + p.size + " bytes");
+ if (p.seq < rxSeq || rxDupe.contains (p.seq)) {
+ log ("duplicate packet");
+ sendAck (p.seq); // Original ack may have been lost
+ }
+ else if (p.seq == rxSeq) {
+ log ("packet in order");
+ // Find the sequence number of the next missing packet
+ int was = rxSeq;
+ while (rxDupe.remove (++rxSeq));
+ log ("rxSeq was " + was + ", now " + rxSeq);
+ // Deliver the packet
+ unpack (p);
+ sendAck (p.seq);
+ }
+ else if (p.seq < rxSeq + SEQ_RANGE) {
+ log ("packet out of order - expected " + rxSeq);
+ if (rxDupe.add (p.seq)) unpack (p);
+ else log ("duplicate packet");
+ sendAck (p.seq); // Original ack may have been lost
+ }
+ // This indicates a misbehaving sender - discard the packet
+ else log ("warning: received " + p.seq + " before " + rxSeq);
+ }
+
+ private void handleAck (Ack a)
+ {
+ int seq = a.id;
+ log ("received ack " + seq);
+ double now = Event.time();
+ Iterator<Packet> i = txBuffer.iterator();
+ while (i.hasNext()) {
+ Packet p = i.next();
+ double age = now - p.sent;
+ // Explicit ack
+ if (p.seq == seq) {
+ log ("packet " + p.seq + " acknowledged");
+ i.remove();
+ // Update the congestion window
+ window.bytesAcked (p.size);
+ // Update the average round-trip time
+ rtt = rtt * RTT_DECAY + age * (1.0 - RTT_DECAY);
+ log ("round-trip time " + age);
+ log ("average round-trip time " + rtt);
+ break;
+ }
+ // Fast retransmission
+ if (p.seq < seq && age > FRTO * rtt + MAX_DELAY) {
+ p.sent = now;
+ log ("fast retransmitting packet " + p.seq);
+ node.net.send (p, address, latency);
+ window.fastRetransmission (now);
+ }
+ }
+ // Recalculate the maximum sequence number
+ if (txBuffer.isEmpty()) txMaxSeq = txSeq + SEQ_RANGE - 1;
+ else txMaxSeq = txBuffer.peek().seq + SEQ_RANGE - 1;
+ log ("maximum sequence number " + txMaxSeq);
+ // Send as many packets as possible
+ while (send());
+ }
+
+ // Add messages to a packet
+ private void pack (Packet p)
+ {
+     // Two kinds of traffic compete for space in the packet: searches
+     // and transfers. Whichever kind is favoured on this call is
+     // packed first and the other kind gets whatever room is left;
+     // the favourite alternates from one call to the next.
+     DeadlineQueue<Message> first, second;
+     if (tgif) {
+         first = transferQueue; // Transfers go in first
+         second = searchQueue;
+     }
+     else {
+         first = searchQueue; // Searches go in first
+         second = transferQueue;
+     }
+     // Take messages from the head of the favoured queue while they fit
+     while (first.size > 0
+         && p.size + first.headSize() <= Packet.MAX_SIZE)
+         p.addMessage (first.pop());
+     // Fill any remaining space from the other queue
+     while (second.size > 0
+         && p.size + second.headSize() <= Packet.MAX_SIZE)
+         p.addMessage (second.pop());
+     // Swap priorities for the next call
+     tgif = !tgif;
+ }
+
+ // Remove messages from a packet and deliver them to the node
+ private void unpack (Packet p)
+ {
+     if (p.messages != null) // Otherwise there is nothing to deliver
+         for (Message msg : p.messages) node.handleMessage (msg, this);
+ }
+
+ // Called by Node, returns the next coalescing or retx deadline
+ public double checkTimeouts()
+ {
+     log ("checking timeouts");
+     // Send as many packets as possible
+     while (send()) { }
+
+     log (txBuffer.size() + " packets in flight");
+     final double now = Event.time();
+     // Nothing is awaiting an ack, so there is nothing to retransmit
+     if (txBuffer.isEmpty()) return deadline (now);
+
+     for (Iterator<Packet> it = txBuffer.iterator(); it.hasNext(); ) {
+         Packet pkt = it.next();
+         boolean expired = now - pkt.sent > RTO * rtt + MAX_DELAY;
+         if (!expired) continue; // No retransmission timeout yet
+         log ("retransmitting packet " + pkt.seq);
+         pkt.sent = now;
+         node.net.send (pkt, address, latency);
+         window.timeout (now);
+     }
+     // Sleep for up to MAX_DELAY seconds until the next deadline
+     return Math.min (now + MAX_DELAY, deadline (now));
+ }
+
+ // Work out when the first ack or search or transfer needs to be sent
+ private double deadline (double now)
+ {
+ return Math.min (ackQueue.deadline(), dataDeadline (now)); // Whichever is sooner
+ }
+
+ // Work out when the first search or transfer needs to be sent
+ private double dataDeadline (double now)
+ {
+     // Infinity means "no data deadline": deadline() then falls back
+     // to the ack deadline alone
+     if (searchQueue.size + transferQueue.size == 0)
+         return Double.POSITIVE_INFINITY;
+
+     // The earlier of the two queues' deadlines
+     double earliest = Math.min (searchQueue.deadline(),
+         transferQueue.deadline());
+
+     double when;
+     if (searchQueue.size + transferQueue.size
+         < Packet.SENSIBLE_PAYLOAD)
+         when = earliest; // Small payload: wait for more to coalesce
+     else if (window.available() <= 0)
+         when = Double.POSITIVE_INFINITY; // Wait for an ack instead
+     else if (node.bandwidth.available() <= 0)
+         // Not enough bandwidth: look again after MIN_SLEEP, but
+         // not before the coalescing deadline
+         when = Math.max (now + MIN_SLEEP, earliest);
+     else
+         when = now; // Send a packet immediately
+     return when;
+ }
+
+ public void log (String message)
+ {
+ Event.log (node.net.address + ":" + address + " " + message); // Prefix "local:remote "
+ }
+
+ public String toString()
+ {
+     return String.valueOf (address); // The remote node's address in decimal
+ }
+}
Deleted: trunk/apps/load-balancing-sims/phase7/RequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/RequestHandler.java 2006-10-29
23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/RequestHandler.java 2006-10-31
11:01:12 UTC (rev 10749)
@@ -1,136 +0,0 @@
-// The parent class of ChkRequestHandler and SskRequestHandler
-
-import messages.*;
-
-abstract class RequestHandler extends MessageHandler implements EventTarget
-{
- public RequestHandler (Search s, Node node, Peer prev)
- {
- super (s, node, prev);
- }
-
- public void start()
- {
- forwardSearch();
- }
-
- protected void handleAccepted (Accepted a)
- {
- if (searchState != SENT) node.log (a + " out of order");
- searchState = ACCEPTED;
- // Wait 60 seconds for a reply to the search
- Event.schedule (this, 60.0, SEARCH_TIMEOUT, next);
- }
-
- protected void handleRejectedLoop (RejectedLoop rl)
- {
- if (searchState != SENT) node.log (rl + " out of order");
- forwardSearch();
- }
-
- protected void handleRouteNotFound (RouteNotFound rnf)
- {
- if (searchState != ACCEPTED) node.log (rnf + " out of order");
- if (rnf.htl < htl) htl = rnf.htl;
- // Use the remaining htl to try another peer
- forwardSearch();
- }
-
- protected void handleDataNotFound (DataNotFound dnf)
- {
- if (searchState != ACCEPTED) node.log (dnf + " out of order");
- if (prev == null) node.log (this + " failed");
- else prev.sendMessage (dnf); // Forward the message
- finish();
- }
-
- protected void forwardSearch()
- {
- next = null;
- // If the search has run out of hops, send DataNotFound
- if (htl == 0) {
- node.log ("data not found for " + this);
- if (prev == null) node.log (this + " failed");
- else prev.sendMessage (new DataNotFound (id));
- finish();
- return;
- }
- // Forward the search to the closest remaining peer
- next = closestPeer();
- if (next == null) {
- node.log ("route not found for " + this);
- if (prev == null) node.log (this + " failed");
- else prev.sendMessage (new RouteNotFound (id, htl));
- finish();
- return;
- }
- // Decrement the htl if the next node is not the closest so far
- double target = Node.keyToLocation (key);
- if (Node.distance (target, next.location)
- > Node.distance (target, closest))
- htl = node.decrementHtl (htl);
- node.log (this + " has htl " + htl);
- node.log ("forwarding " + this + " to " + next.address);
- next.sendMessage (makeSearchMessage());
- nexts.remove (next);
- searchState = SENT;
- // Wait 5 seconds for the next hop to accept the search
- Event.schedule (this, 5.0, ACCEPTED_TIMEOUT, next);
- }
-
- protected void finish()
- {
- searchState = COMPLETED;
- node.removeMessageHandler (id);
- }
-
- // Event callbacks
-
- protected void acceptedTimeout (Peer p)
- {
- if (p != next) return; // We've already moved on to another peer
- if (searchState != SENT) return;
- node.log (this + " accepted timeout waiting for " + p);
- forwardSearch(); // Try another peer
- }
-
- protected void searchTimeout (Peer p)
- {
- if (p != next) return; // We've already moved on to another peer
- if (searchState != ACCEPTED) return;
- node.log (this + " search timeout waiting for " + p);
- if (prev == null) node.log (this + " failed");
- finish();
- }
-
- protected void transferTimeout (Peer p)
- {
- if (searchState != TRANSFERRING) return;
- node.log (this + " transfer timeout receiving from " + p);
- if (prev == null) node.log (this + " failed");
- finish();
- }
-
- // EventTarget interface
- public void handleEvent (int type, Object data)
- {
- switch (type) {
- case ACCEPTED_TIMEOUT:
- acceptedTimeout ((Peer) data);
- break;
-
- case SEARCH_TIMEOUT:
- searchTimeout ((Peer) data);
- break;
-
- case TRANSFER_TIMEOUT:
- transferTimeout ((Peer) data);
- break;
- }
- }
-
- // Each EventTarget class has its own event codes
- protected final static int ACCEPTED_TIMEOUT = 1;
- protected final static int SEARCH_TIMEOUT = 2;
- protected final static int TRANSFER_TIMEOUT = 3;
-}
Copied: trunk/apps/load-balancing-sims/phase7/RequestHandler.java (from rev
10748, trunk/apps/load-balancing-sims/phase6/RequestHandler.java)
Deleted: trunk/apps/load-balancing-sims/phase7/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Sim.java 2006-10-29 23:34:22 UTC
(rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/Sim.java 2006-10-31 11:01:12 UTC
(rev 10749)
@@ -1,68 +0,0 @@
-class Sim
-{
- private final int NODES = 100; // Number of nodes
- private final int DEGREE = 4; // Average degree
- private final double SPEED = 20000; // Bytes per second
- private final double LATENCY = 0.1; // Seconds
- private Node[] nodes;
-
- public Sim()
- {
- Network.reorder = true;
- Network.lossRate = 0.001;
-
- // Create the nodes
- nodes = new Node[NODES];
- for (int i = 0; i < NODES; i++)
- nodes[i] = new Node (1.0 / NODES * i, SPEED, SPEED);
- // Connect the nodes
- makeKleinbergNetwork();
-
- int key = Node.locationToKey (Math.random());
- Event.schedule (nodes[0], 0.0,
- Node.GENERATE_CHK_INSERT, key);
- Event.schedule (nodes[NODES/4], 30.0,
- Node.GENERATE_CHK_REQUEST, key);
- Event.schedule (nodes[NODES/2], 60.0,
- Node.GENERATE_CHK_INSERT, key);
- Event.schedule (nodes[3*NODES/4], 90.0,
- Node.GENERATE_CHK_REQUEST, key);
-
- // Run the simulation
- Event.run();
- }
-
- // Return the lattice distance between a and b
- private int latticeDistance (int a, int b)
- {
- if (a > b) return Math.min (a - b, b - a + NODES);
- else return Math.min (b - a, a - b + NODES);
- }
-
- private void makeKleinbergNetwork()
- {
- // Calculate the normalising constant
- double norm = 0.0;
- for (int i = 1; i < NODES; i++)
- norm += 1.0 / latticeDistance (0, i);
-
- // Add DEGREE shortcuts per node, randomly with replacement
- for (int i = 0; i < NODES; i++) {
- for (int j = 0; j < i; j++) {
- double p = 1.0 / latticeDistance (i, j) / norm;
- for (int k = 0; k < DEGREE; k++) {
- if (Math.random() < p) {
- nodes[i].connectBothWays
- (nodes[j], LATENCY);
- break;
- }
- }
- }
- }
- }
-
- public static void main (String[] args)
- {
- new Sim();
- }
-}
Copied: trunk/apps/load-balancing-sims/phase7/Sim.java (from rev 10748,
trunk/apps/load-balancing-sims/phase6/Sim.java)
Deleted: trunk/apps/load-balancing-sims/phase7/SskInsertHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/SskInsertHandler.java 2006-10-29
23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/SskInsertHandler.java 2006-10-31
11:01:12 UTC (rev 10749)
@@ -1,218 +0,0 @@
-// The state of an SSK insert as stored at each node along the path
-
-import java.util.HashSet;
-import messages.*;
-
-class SskInsertHandler extends MessageHandler implements EventTarget
-{
- private int searchState = STARTED; // searchState of search
- private SskPubKey pubKey = null;
- private int data; // The data being inserted
-
- public SskInsertHandler (SskInsert i, Node node,
- Peer prev, boolean needPubKey)
- {
- super (i, node, prev);
- data = i.data;
- if (!needPubKey) pubKey = new SskPubKey (id, key);
- }
-
- public void start()
- {
- if (pubKey == null) {
- // Wait 10 seconds for the previous hop to send the key
- Event.schedule (this, 10.0, KEY_TIMEOUT, null);
- }
- else {
- checkCollision();
- forwardSearch();
- }
- }
-
- // Check whether an older version of the data is already stored
- private void checkCollision()
- {
- Integer old = node.fetchSsk (key);
- if (old != null && old != data) {
- node.log (this + " collided");
- if (prev == null) node.log (this + " collided locally");
- else prev.sendMessage (new SskDataFound (id, old));
- // Continue inserting the old data
- data = old;
- return;
- }
- }
-
- public void handleMessage (Message m, Peer src)
- {
- if (src == prev) {
- if (m instanceof SskPubKey)
- handleSskPubKey ((SskPubKey) m);
- else node.log ("unexpected type for " + m);
- }
- else if (src == next) {
- if (m instanceof SskAccepted)
- handleSskAccepted ((SskAccepted) m);
- else if (m instanceof RejectedLoop)
- handleRejectedLoop ((RejectedLoop) m);
- else if (m instanceof RouteNotFound)
- handleRouteNotFound ((RouteNotFound) m);
- else if (m instanceof SskDataFound)
- handleCollision ((SskDataFound) m);
- else if (m instanceof InsertReply)
- handleInsertReply ((InsertReply) m);
- else node.log ("unexpected type for " + m);
- }
- else node.log ("unexpected source for " + m);
- }
-
- private void handleSskPubKey (SskPubKey pk)
- {
- if (searchState != STARTED) node.log (pk + " out of order");
- pubKey = pk;
- checkCollision();
- forwardSearch();
- }
-
- private void handleSskAccepted (SskAccepted sa)
- {
- if (searchState != SENT) node.log (sa + " out of order");
- searchState = ACCEPTED;
- // Wait 60 seconds for a reply to the search
- Event.schedule (this, 60.0, SEARCH_TIMEOUT, next);
- // Send the public key if requested
- if (sa.needPubKey) next.sendMessage (pubKey);
- }
-
- private void handleRejectedLoop (RejectedLoop rl)
- {
- if (searchState != SENT) node.log (rl + " out of order");
- forwardSearch();
- }
-
- private void handleRouteNotFound (RouteNotFound rnf)
- {
- if (searchState != ACCEPTED) node.log (rnf + " out of order");
- if (rnf.htl < htl) htl = rnf.htl;
- // Use the remaining htl to try another peer
- forwardSearch();
- }
-
- private void handleCollision (SskDataFound sdf)
- {
- if (searchState != ACCEPTED) node.log (sdf + " out of order");
- if (prev == null) node.log (this + " collided");
- else prev.sendMessage (sdf); // Forward the message
- data = sdf.data; // Is this safe?
- }
-
- private void handleInsertReply (InsertReply ir)
- {
- if (searchState != ACCEPTED) node.log (ir + " out of order");
- if (prev == null) node.log (this + " succeeded");
- else prev.sendMessage (ir); // Forward the message
- finish();
- }
-
- public void forwardSearch()
- {
- next = null;
- // If the search has run out of hops, send InsertReply
- if (htl == 0) {
- node.log (this + " has no hops left");
- if (prev == null) node.log (this + " succeeded");
- else prev.sendMessage (new InsertReply (id));
- finish();
- return;
- }
- // Forward the search to the closest remaining peer
- next = closestPeer();
- if (next == null) {
- node.log ("route not found for " + this);
- if (prev == null) node.log (this + " failed");
- else prev.sendMessage (new RouteNotFound (id, htl));
- finish();
- return;
- }
- // Decrement the htl if the next node is not the closest so far
- double target = Node.keyToLocation (key);
- if (Node.distance (target, next.location)
- > Node.distance (target, closest))
- htl = node.decrementHtl (htl);
- node.log (this + " has htl " + htl);
- node.log ("forwarding " + this + " to " + next.address);
- next.sendMessage (makeSearchMessage());
- nexts.remove (next);
- searchState = SENT;
- // Wait 10 seconds for the next hop to accept the search
- Event.schedule (this, 10.0, ACCEPTED_TIMEOUT, next);
- }
-
- private void finish()
- {
- searchState = COMPLETED;
- node.cachePubKey (key);
- node.cacheSsk (key, data);
- node.storeSsk (key, data);
- node.removeMessageHandler (id);
- }
-
- protected Search makeSearchMessage()
- {
- return new SskInsert (id, key, data, closest, htl);
- }
-
- public String toString()
- {
- return new String ("SSK insert (" +id+ "," +key+ "," +data+")");
- }
-
- // Event callbacks
-
- private void keyTimeout()
- {
- if (searchState != STARTED) return;
- node.log (this + " key timeout waiting for " + prev);
- finish();
- }
-
- private void acceptedTimeout (Peer p)
- {
- if (p != next) return; // We've already moved on to another peer
- if (searchState != SENT) return;
- node.log (this + " accepted timeout waiting for " + p);
- forwardSearch(); // Try another peer
- }
-
- private void searchTimeout (Peer p)
- {
- if (p != next) return; // We've already moved on to another peer
- if (searchState != ACCEPTED) return;
- node.log (this + " search timeout waiting for " + p);
- if (prev == null) node.log (this + " failed");
- finish();
- }
-
- // EventTarget interface
- public void handleEvent (int type, Object data)
- {
- switch (type) {
- case KEY_TIMEOUT:
- keyTimeout();
- break;
-
- case ACCEPTED_TIMEOUT:
- acceptedTimeout ((Peer) data);
- break;
-
- case SEARCH_TIMEOUT:
- searchTimeout ((Peer) data);
- break;
- }
- }
-
- // Each EventTarget class has its own event codes
- private final static int KEY_TIMEOUT = 1;
- private final static int ACCEPTED_TIMEOUT = 2;
- private final static int SEARCH_TIMEOUT = 3;
-}
Copied: trunk/apps/load-balancing-sims/phase7/SskInsertHandler.java (from rev
10748, trunk/apps/load-balancing-sims/phase6/SskInsertHandler.java)
Deleted: trunk/apps/load-balancing-sims/phase7/SskRequestHandler.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/SskRequestHandler.java
2006-10-29 23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/SskRequestHandler.java
2006-10-31 11:01:12 UTC (rev 10749)
@@ -1,79 +0,0 @@
-// The state of an SSK request as stored at each node along the path
-
-import messages.*;
-
-class SskRequestHandler extends RequestHandler
-{
- private boolean needPubKey; // Ask the next hop for the public key?
- private SskPubKey pubKey = null;
- private SskDataFound dataFound = null;
-
- public SskRequestHandler (SskRequest r, Node node,
- Peer prev, boolean needPubKey)
- {
- super (r, node, prev);
- this.needPubKey = needPubKey;
- if (!needPubKey) pubKey = new SskPubKey (id, key);
- }
-
- public void handleMessage (Message m, Peer src)
- {
- if (src != next) {
- node.log ("unexpected source for " + m);
- return;
- }
- if (m instanceof Accepted)
- handleAccepted ((Accepted) m);
- else if (m instanceof RejectedLoop)
- handleRejectedLoop ((RejectedLoop) m);
- else if (m instanceof RouteNotFound)
- handleRouteNotFound ((RouteNotFound) m);
- else if (m instanceof DataNotFound)
- handleDataNotFound ((DataNotFound) m);
- else if (m instanceof SskDataFound)
- handleSskDataFound ((SskDataFound) m);
- else if (m instanceof SskPubKey)
- handleSskPubKey ((SskPubKey) m);
- else node.log ("unexpected type for " + m);
- }
-
- private void handleSskDataFound (SskDataFound df)
- {
- if (searchState != ACCEPTED) node.log (df + " out of order");
- dataFound = df;
- if (pubKey == null) return; // Keep waiting
- if (prev == null) node.log (this + " succeeded");
- else {
- prev.sendMessage (dataFound);
- if (needPubKey) prev.sendMessage (pubKey);
- }
- node.cachePubKey (key);
- node.cacheSsk (key, dataFound.data);
- finish();
- }
-
- private void handleSskPubKey (SskPubKey pk)
- {
- if (searchState != ACCEPTED) node.log (pk + " out of order");
- pubKey = pk;
- if (dataFound == null) return; // Keep waiting
- if (prev == null) node.log (this + " succeeded");
- else {
- prev.sendMessage (dataFound);
- if (needPubKey) prev.sendMessage (pubKey);
- }
- node.cachePubKey (key);
- node.cacheSsk (key, dataFound.data);
- finish();
- }
-
- protected Search makeSearchMessage()
- {
- return new SskRequest (id, key, closest, htl, pubKey == null);
- }
-
- public String toString()
- {
- return new String ("SSK request (" + id + "," + key + ")");
- }
-}
Copied: trunk/apps/load-balancing-sims/phase7/SskRequestHandler.java (from rev
10748, trunk/apps/load-balancing-sims/phase6/SskRequestHandler.java)
Deleted: trunk/apps/load-balancing-sims/phase7/TokenBucket.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/TokenBucket.java 2006-10-29
23:34:22 UTC (rev 10745)
+++ trunk/apps/load-balancing-sims/phase7/TokenBucket.java 2006-10-31
11:01:12 UTC (rev 10749)
@@ -1,29 +0,0 @@
-class TokenBucket
-{
- private double tokens, rate, size, lastUpdated;
-
- public TokenBucket (double rate, double size)
- {
- tokens = size;
- this.rate = rate;
- this.size = size;
- lastUpdated = 0.0; // Clock time
- }
-
- public int available()
- {
- double now = Event.time();
- double elapsed = now - lastUpdated;
- lastUpdated = now;
- tokens += elapsed * rate;
- if (tokens > size) tokens = size;
- // Event.log (tokens + " tokens available");
- return (int) tokens;
- }
-
- public void remove (int t)
- {
- tokens -= t; // Counter can go negative
- // Event.log (t + " tokens removed, " + tokens + " available");
- }
-}
Copied: trunk/apps/load-balancing-sims/phase7/TokenBucket.java (from rev 10748,
trunk/apps/load-balancing-sims/phase6/TokenBucket.java)
Copied: trunk/apps/load-balancing-sims/phase7/messages (from rev 10748,
trunk/apps/load-balancing-sims/phase6/messages)