Author: mrogers
Date: 2006-07-26 20:14:47 +0000 (Wed, 26 Jul 2006)
New Revision: 9782
Added:
trunk/apps/load-balancing-sims/phase5/Request.java
trunk/apps/load-balancing-sims/phase5/RequestState.java
trunk/apps/load-balancing-sims/phase5/Response.java
trunk/apps/load-balancing-sims/phase5/RouteNotFound.java
Modified:
trunk/apps/load-balancing-sims/phase5/Message.java
trunk/apps/load-balancing-sims/phase5/Network.java
trunk/apps/load-balancing-sims/phase5/NetworkInterface.java
trunk/apps/load-balancing-sims/phase5/Node.java
trunk/apps/load-balancing-sims/phase5/Packet.java
trunk/apps/load-balancing-sims/phase5/Peer.java
trunk/apps/load-balancing-sims/phase5/Sim.java
Log:
Multiple messages per packet
Modified: trunk/apps/load-balancing-sims/phase5/Message.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Message.java 2006-07-26 20:06:51 UTC (rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/Message.java 2006-07-26 20:14:47 UTC (rev 9782)
@@ -2,12 +2,10 @@
class Message
{
- public int seq; // Sequence number
- public int size; // Size in bytes
+ public final static int HEADER_SIZE = 30; // Sequence number, MAC, etc
+ public final static int ID_SIZE = 16; // Size of unique request ID
+ public final static int KEY_SIZE = 32; // Size of routing key
+ public final static int DATA_SIZE = 1024; // Size of data block
- public Message (int seq, int size)
- {
- this.seq = seq;
- this.size = size;
- }
+ public int size; // Size in bytes
}
Modified: trunk/apps/load-balancing-sims/phase5/Network.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Network.java 2006-07-26 20:06:51 UTC (rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/Network.java 2006-07-26 20:14:47 UTC (rev 9782)
@@ -7,7 +7,7 @@
private static int nextAddress = 0;
public static boolean reorder = false; // Can packets be reordered?
public static double lossRate = 0.0; // Random packet loss
- // FIXME: duplication
+ // FIXME: random packet duplication
// Deliver a packet to an address
public static void deliver (Packet p)
@@ -17,13 +17,13 @@
// If the network allows reordering, randomise the latency a bit
if (reorder) p.latency *= (0.95 + Math.random() * 0.1);
if (Math.random() < lossRate) {
- Event.log ("packet lost by network");
+ Event.log (p + " lost by network");
return;
}
// Schedule the arrival of the packet at the destination
- Event.schedule (ni, p.latency, NetworkInterface.RX_Q_ADD, p);
+ Event.schedule (ni, p.latency, NetworkInterface.RX_QUEUE, p);
}
-
+
// Attach an interface to the network - returns the address
public static int register (NetworkInterface ni)
{
Modified: trunk/apps/load-balancing-sims/phase5/NetworkInterface.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/NetworkInterface.java 2006-07-26 20:06:51 UTC (rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/NetworkInterface.java 2006-07-26 20:14:47 UTC (rev 9782)
@@ -4,7 +4,7 @@
class NetworkInterface implements EventTarget
{
public int address; // Represents an IP address and port
- private Node owner; // The owner of this network interface
+ private Node node; // The owner of this network interface
private double txSpeed, rxSpeed; // Bytes per second
private LinkedList<Packet> txQueue; // Queue of outgoing packets
@@ -12,14 +12,14 @@
private int txQueueSize, rxQueueSize; // Limited-size drop-tail queues
private int txQueueMaxSize, rxQueueMaxSize; // Bytes
- public NetworkInterface (Node owner, double txSpeed, double rxSpeed)
+ public NetworkInterface (Node node, double txSpeed, double rxSpeed)
{
- this.owner = owner;
+ this.node = node;
this.txSpeed = txSpeed;
this.rxSpeed = rxSpeed;
txQueue = new LinkedList<Packet>();
rxQueue = new LinkedList<Packet>();
- txQueueSize = rxQueueSize = 0; // Bytes
+ txQueueSize = rxQueueSize = 0; // Bytes currently queued
txQueueMaxSize = 10000;
rxQueueMaxSize = 20000;
// Attach the interface to the network
@@ -33,12 +33,12 @@
p.dest = dest;
p.latency = latency;
if (txQueueSize + p.size > txQueueMaxSize) {
- Event.log (address + " no room in txQueue");
- return; // Packet lost
+ log ("no room in txQueue, " + p + " lost");
+ return;
}
txQueue.add (p);
txQueueSize += p.size;
- Event.log (address + " " + txQueueSize + " bytes in txQueue");
+ log (txQueueSize + " bytes in txQueue");
// If there are no other packets in the queue, start to transmit
if (txQueue.size() == 1) txStart (p);
}
@@ -49,12 +49,12 @@
private void rxQueueAdd (Packet p)
{
if (rxQueueSize + p.size > rxQueueMaxSize) {
- Event.log (address + " no room in rxQueue");
- return; // Packet lost
+ log ("no room in rxQueue, " + p + " lost");
+ return;
}
rxQueue.add (p);
rxQueueSize += p.size;
- Event.log (address + " " + rxQueueSize + " bytes in rxQueue");
+ log (rxQueueSize + " bytes in rxQueue");
// If there are no other packets in the queue, start to receive
if (rxQueue.size() == 1) rxStart (p);
}
@@ -62,6 +62,7 @@
// Start receiving a packet
private void rxStart (Packet p)
{
+ log ("starting to receive " + p);
// Delay depends on rx speed
Event.schedule (this, p.size / rxSpeed, RX_END, p);
}
@@ -69,7 +70,8 @@
// Finish receiving a packet, pass it to the node
private void rxEnd (Packet p)
{
- owner.handlePacket (p);
+ log ("finished receiving " + p);
+ node.handlePacket (p);
// If there's another packet waiting, start to receive it
try {
rxQueueSize -= p.size;
@@ -82,6 +84,7 @@
// Start transmitting a packet
private void txStart (Packet p)
{
+ log ("starting to transmit " + p);
// Delay depends on tx speed
Event.schedule (this, p.size / txSpeed, TX_END, p);
}
@@ -89,6 +92,7 @@
// Finish transmitting a packet
private void txEnd (Packet p)
{
+ log ("finished transmitting " + p);
Network.deliver (p);
// If there's another packet waiting, start to transmit it
try {
@@ -99,26 +103,23 @@
catch (NoSuchElementException nse) {}
}
+ private void log (String message)
+ {
+ // Event.log (address + " " + message);
+ }
+
// EventTarget interface
public void handleEvent (int type, Object data)
{
switch (type) {
- case RX_Q_ADD:
+ case RX_QUEUE:
rxQueueAdd ((Packet) data);
break;
- case RX_START:
- rxStart ((Packet) data);
- break;
-
case RX_END:
rxEnd ((Packet) data);
break;
- case TX_START:
- txStart ((Packet) data);
- break;
-
case TX_END:
txEnd ((Packet) data);
break;
@@ -126,9 +127,7 @@
}
// Each EventTarget class has its own event codes
- public final static int RX_Q_ADD = 1;
- public final static int RX_START = 2;
- public final static int RX_END = 3;
- public final static int TX_START = 4;
- public final static int TX_END = 5;
+ public final static int RX_QUEUE = 1;
+ private final static int RX_END = 2;
+ private final static int TX_END = 3;
}
Modified: trunk/apps/load-balancing-sims/phase5/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Node.java 2006-07-26 20:06:51 UTC (rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/Node.java 2006-07-26 20:14:47 UTC (rev 9782)
@@ -1,23 +1,45 @@
import java.util.HashMap;
+import java.util.HashSet;
class Node implements EventTarget
{
+ public double location; // Routing location
public NetworkInterface net;
private HashMap<Integer,Peer> peers; // Look up a peer by its address
- private int messagesSent = 0;
+ private int requestsGenerated = 0;
+ private HashSet<Integer> recentlySeenRequests; // Request IDs
+ private HashMap<Integer,RequestState> outstandingRequests;
+ public HashSet<Double> cache; // Datastore containing keys
public Node (double txSpeed, double rxSpeed)
{
+ location = Math.random();
+ net = new NetworkInterface (this, txSpeed, rxSpeed);
peers = new HashMap<Integer,Peer>();
- net = new NetworkInterface (this, txSpeed, rxSpeed);
+ recentlySeenRequests = new HashSet<Integer>();
+ outstandingRequests = new HashMap<Integer,RequestState>();
+ cache = new HashSet<Double>();
}
public void connect (Node n, double latency)
{
- Peer p = new Peer (n.net.address, latency, this);
+ Peer p = new Peer (this, n.net.address, n.location, latency);
peers.put (n.net.address, p);
}
+ public void connectBothWays (Node n, double latency)
+ {
+ connect (n, latency);
+ n.connect (this, latency);
+ }
+
+ // Returns the circular distance between two locations
+ public static double distance (double a, double b)
+ {
+ if (a > b) return Math.min (a - b, b - a + 1.0);
+ else return Math.min (b - a, a - b + 1.0);
+ }
+
// Called by NetworkInterface
public void handlePacket (Packet packet)
{
@@ -27,39 +49,103 @@
}
// Called by Peer
- public void messagesWaiting (Peer p)
+ public void handleMessage (Message m, Peer prev)
{
- for (Message m = p.receiveMessage(); m != null; m = p.receiveMessage())
- log ("received message " + m.seq + ", " + m.size + " bytes");
+ log ("received " + m);
+ // FIXME: ugly
+ if (m instanceof Request)
+ handleRequest ((Request) m, prev);
+ else if (m instanceof Response)
+ handleResponse ((Response) m);
+ else if (m instanceof RouteNotFound)
+ handleRouteNotFound ((RouteNotFound) m);
}
+ private void handleRequest (Request r, Peer prev)
+ {
+ if (!recentlySeenRequests.add (r.id)) {
+ log ("rejecting recently seen " + r);
+ prev.sendMessage (new RouteNotFound (r.id));
+ // Don't forward the request to prev, it's seen it
+ RequestState rs = outstandingRequests.get (r.id);
+ if (rs != null) rs.nexts.remove (prev);
+ return;
+ }
+ if (cache.contains (r.key)) {
+ log ("key " + r.key + " found in cache");
+ if (prev == null)
+ log (r + " succeeded locally");
+ else prev.sendMessage (new Response (r.id, r.key));
+ return;
+ }
+ log ("key " + r.key + " not found in cache");
+ forwardRequest (new RequestState (r, prev, peers.values()));
+ }
+
+ private void handleResponse (Response r)
+ {
+ RequestState rs = outstandingRequests.remove (r.id);
+ if (rs == null) {
+ log ("unexpected " + r);
+ return;
+ }
+ cache.add (r.key);
+ if (rs.prev == null) log (rs + " succeeded");
+ else {
+ log ("forwarding " + r);
+ rs.prev.sendMessage (r);
+ }
+ }
+
+ private void handleRouteNotFound (RouteNotFound r)
+ {
+ RequestState rs = outstandingRequests.remove (r.id);
+ if (rs == null) {
+ log ("unexpected route not found " + r.id);
+ return;
+ }
+ forwardRequest (rs);
+ }
+
+ private void forwardRequest (RequestState rs)
+ {
+ Peer next = rs.closestPeer();
+ if (next == null) {
+ log ("route not found for " + rs);
+ if (rs.prev == null)
+ log (rs + " failed");
+ else rs.prev.sendMessage (new RouteNotFound (rs.id));
+ return;
+ }
+ log ("forwarding " + rs + " to " + next.address);
+ next.sendMessage (new Request (rs.id, rs.key));
+ rs.nexts.remove (next);
+ outstandingRequests.put (rs.id, rs);
+ }
+
private void log (String message)
{
Event.log (net.address + " " + message);
}
// Event callback
- private void sendMessages()
+ private void generateRequest()
{
- // Send a message to each peer
- for (Peer p : peers.values()) {
- int size = (int) (Math.random() * 2500);
- Message m = new Message (messagesSent, size);
- log ("sending message " + m.seq + ", " + m.size + " bytes");
- p.sendMessage (m);
- }
- // Send a total of 1000 messages to each peer
- messagesSent++;
- if (messagesSent < 1000)
- Event.schedule (this, 0.1, SEND_MESSAGES, null);
+ if (requestsGenerated++ > 1000) return;
+ // Send a request to a random location
+ Request r = new Request (0.1);
+ log ("generating request " + r.id);
+ handleRequest (r, null);
+ // Schedule the next request
+ // Event.schedule (this, Math.random(), GENERATE_REQUEST, null);
}
// EventTarget interface
public void handleEvent (int type, Object data)
{
- if (type == SEND_MESSAGES) sendMessages();
+ if (type == GENERATE_REQUEST) generateRequest();
}
// Each EventTarget class has its own event codes
- public final static int SEND_MESSAGES = 1;
+ public final static int GENERATE_REQUEST = 1;
}
Modified: trunk/apps/load-balancing-sims/phase5/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Packet.java 2006-07-26 20:06:51 UTC (rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/Packet.java 2006-07-26 20:14:47 UTC (rev 9782)
@@ -1,11 +1,5 @@
-// A low-level packet containing one or more complete or incomplete messages
+// A low-level packet (as opposed to a high-level message)
-// In real life the payload would be an array of bytes, but in the sim the
-// payload is represented by an ArrayList of Messages. Large messages can be
-// split across more than one packet, in which case the message only appears
-// in the payload of the *last* packet. This means it's possible for a full
-// packet to have an apparently empty payload.
-
import java.util.ArrayList;
abstract class Packet
@@ -14,7 +8,6 @@
public final static int MAX_PAYLOAD = 1400;
public int src, dest; // Network addresses
- public int type; // Data, ack, etc
public int size; // Packet size in bytes, including headers
public int seq; // Sequence number or explicit ack
public double latency; // Link latency (stored here for convenience)
@@ -22,14 +15,27 @@
class DataPacket extends Packet
{
- public ArrayList messages; // Payload
+ public ArrayList<Message> messages = null; // Payload
public double sent; // Time at which the packet was (re)transmitted
public DataPacket (int dataSize)
{
size = dataSize + HEADER_SIZE;
- messages = new ArrayList();
}
+
+ /*
+ In real life the payload would be an array of bytes, but here the
+ payload is represented by an ArrayList of Messages. A large message can
+ be split across more than one packet, in which case the message only
+ appears in the payload of the *last* packet. This means it's possible
+ for a full packet to have an apparently empty payload.
+ */
+
+ public void addMessage (Message m)
+ {
+ if (messages == null) messages = new ArrayList<Message>();
+ messages.add (m);
+ }
}
class Ack extends Packet
Modified: trunk/apps/load-balancing-sims/phase5/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Peer.java 2006-07-26 20:06:51 UTC (rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/Peer.java 2006-07-26 20:14:47 UTC (rev 9782)
@@ -4,10 +4,14 @@
class Peer implements EventTarget
{
+ private Node node; // The local node
public int address; // The remote node's address
- private double latency; // Latency of the connection in seconds
- private Node owner; // The local node
+ public double location; // The remote node's routing location
+ private double latency; // The latency of the connection in seconds
+ // Nagle's algorithm
+ public final static int SENSIBLE_PAYLOAD = 1000; // Minimum packet size
+
// Retransmission parameters
public final static double TIMER = 0.5; // Coarse-grained timer, seconds
public final static double RTO = 4.0; // Retransmission timeout in RTTs
@@ -28,48 +32,40 @@
private boolean slowStart = true; // Are we in the slow start phase?
private double rtt = 3.0; // Estimated round-trip time in seconds
private double lastTransmission = 0.0; // Clock time
- private double lastCongestionDecrease = 0.0; // Clock time
private boolean timerRunning = false; // Is the retx timer running?
private int inflight = 0; // Bytes sent but not acked
private int txSeq = 0; // Sequence number of next outgoing packet
private LinkedList<DataPacket> txBuffer; // Retransmission buffer
private LinkedList<Message> txQueue; // Messages waiting to be sent
private int txQueueSize = 0; // Size of transmission queue in bytes
- private int txRemaining = 0; // Bytes of current message unsent
+ private int txHeadSize = 0; // Size of first message in transmission q
// Receiver state
private int rxSeq = 0; // Sequence number of next in-order packet
private LinkedList<DataPacket> rxBuffer; // Reassembly buffer
private int rxBufferSize = 0; // Size of reassembly buffer in bytes
- private LinkedList<Message> rxQueue; // Messages waiting to be collected
- public Peer (int address, double latency, Node owner)
+ public Peer (Node node, int address, double location, double latency)
{
+ this.node = node;
this.address = address;
+ this.location = location;
this.latency = latency;
- this.owner = owner;
txBuffer = new LinkedList<DataPacket>();
txQueue = new LinkedList<Message>();
rxBuffer = new LinkedList<DataPacket>();
- rxQueue = new LinkedList<Message>();
}
- // Returns the first message in the queue or null if the queue is empty
- public Message receiveMessage()
- {
- try { return rxQueue.removeFirst(); }
- catch (NoSuchElementException nse) { return null; }
- }
-
// Queue a message for transmission
public void sendMessage (Message m)
{
+ log (m + " added to transmission queue");
// Warning: until token-passing is implemented the length of
// the transmission queue is unlimited
- if (txQueue.isEmpty()) txRemaining = m.size;
+ if (txQueue.isEmpty()) txHeadSize = m.size;
txQueue.add (m);
txQueueSize += m.size;
- log (txQueue.size() + " messages waiting to be sent");
+ log (txQueue.size() + " messages in transmission queue");
// Send as many packets as possible
while (send());
}
@@ -82,7 +78,7 @@
return false;
}
- if (inflight == cwind) {
+ if (cwind - inflight <= Packet.HEADER_SIZE) {
log ("no room in congestion window");
return false;
}
@@ -97,42 +93,39 @@
lastTransmission = now;
// Work out how large a packet we can send
- int size = Packet.MAX_PAYLOAD;
- if (size > txQueueSize) size = txQueueSize;
- if (size > cwind - inflight) size = (int) cwind - inflight;
+ int payload = Packet.MAX_PAYLOAD;
+ if (payload > txQueueSize) payload = txQueueSize;
+ if (payload > cwind - inflight - Packet.HEADER_SIZE)
+ payload = (int) cwind - inflight - Packet.HEADER_SIZE;
// Nagle's algorithm - try to coalesce small packets
- if (size < Packet.MAX_PAYLOAD && inflight > 0) {
- log ("delaying transmission of " + size + " bytes");
+ if (payload < SENSIBLE_PAYLOAD && inflight > 0) {
+ log ("delaying transmission of " + payload + " bytes");
return false;
}
- DataPacket p = new DataPacket (size);
// Put as many messages as possible in the packet
- while (txRemaining <= size) {
+ DataPacket p = new DataPacket (payload);
+ while (payload >= txHeadSize) {
try {
Message m = txQueue.removeFirst();
- p.messages.add (m);
- size -= txRemaining;
- txQueueSize -= txRemaining;
+ p.addMessage (m);
+ payload -= txHeadSize;
+ txQueueSize -= txHeadSize;
// Move on to the next message
- txRemaining = txQueue.getFirst().size;
+ txHeadSize = txQueue.getFirst().size;
}
catch (NoSuchElementException nse) {
// No more messages in the txQueue
- txRemaining = 0;
+ txHeadSize = 0;
break;
}
}
- // Fill the rest of the packet with part of the current message
- if (txRemaining > 0) {
- txRemaining -= size;
- txQueueSize -= size;
- }
+
// Send the packet
p.seq = txSeq++;
log ("sending packet " + p.seq + ", " + p.size + " bytes");
- owner.net.send (p, address, latency);
+ node.net.send (p, address, latency);
// Buffer the packet for retransmission
p.sent = now;
inflight += p.size;
@@ -140,7 +133,7 @@
txBuffer.add (p);
// Start the coarse-grained retransmission timer if necessary
if (!timerRunning) {
- log ("starting timer");
+ log ("starting retransmission timer");
Event.schedule (this, TIMER, CHECK_TIMEOUTS, null);
timerRunning = true;
}
@@ -151,7 +144,7 @@
{
Ack a = new Ack (seq);
log ("sending ack " + seq);
- owner.net.send (a, address, latency);
+ node.net.send (a, address, latency);
}
// Called by Node when a packet arrives
@@ -167,8 +160,8 @@
// Is this the packet we've been waiting for?
if (p.seq == rxSeq) {
log ("packet in order");
+ unpack (p);
rxSeq++;
- rxQueue.addAll (p.messages);
// Reassemble contiguous packets
Iterator<DataPacket> i = rxBuffer.iterator();
while (i.hasNext()) {
@@ -177,15 +170,13 @@
log ("adding packet " + q.seq);
i.remove();
rxBufferSize -= q.size;
- rxQueue.addAll (p.messages);
+ unpack (q);
rxSeq++;
}
else break;
}
log (rxBufferSize + " bytes buffered");
log ("expecting packet " + rxSeq);
- // Tell the node there are messages to be collected
- owner.messagesWaiting (this);
}
else if (p.seq > rxSeq) {
log ("packet out of order, expected " + rxSeq);
@@ -210,14 +201,8 @@
rxBuffer.add (index, p);
rxBufferSize += p.size;
log (rxBufferSize + " bytes buffered");
- // DEBUG
- if (!rxBuffer.isEmpty()) {
- for (Packet z : rxBuffer)
- System.out.print (z.seq + " ");
- System.out.println();
- }
}
- else log ("duplicate packet " + p.seq);
+ else log ("duplicate packet " + p.seq); // p.seq < rxSeq
sendAck (p.seq); // Ack may have been lost
}
@@ -225,7 +210,6 @@
{
log ("received ack " + a.seq);
double now = Event.time();
- boolean windowIncreased = false;
Iterator<DataPacket> i = txBuffer.iterator();
while (i.hasNext()) {
DataPacket p = i.next();
@@ -245,7 +229,6 @@
rtt = rtt * RTT_DECAY + age * (1.0 - RTT_DECAY);
log ("round-trip time " + age);
log ("average round-trip time " + rtt);
- windowIncreased = true;
break;
}
// Fast retransmission
@@ -253,18 +236,16 @@
p.sent = now;
log ("fast retransmitting packet " + p.seq);
log (inflight + " bytes in flight");
- owner.net.send (p, address, latency);
+ node.net.send (p, address, latency);
decreaseCongestionWindow (now);
}
}
- if (windowIncreased) while (send());
+ // Send as many packets as possible
+ while (send());
}
private void decreaseCongestionWindow (double now)
{
- // The congestion window should only be decreased once per RTT
- if (now - lastCongestionDecrease < rtt) return;
- lastCongestionDecrease = now;
cwind *= BETA;
if (cwind < MIN_CWIND) cwind = MIN_CWIND;
log ("congestion window decreased to " + cwind);
@@ -275,9 +256,16 @@
}
}
+ // Remove messages from a packet and deliver them to the node
+ private void unpack (DataPacket p)
+ {
+ if (p.messages == null) return;
+ for (Message m : p.messages) node.handleMessage (m, this);
+ }
+
private void log (String message)
{
- Event.log (owner.net.address + ":" + address + " " + message);
+ // Event.log (node.net.address + ":" + address + " " + message);
}
// Event callback
@@ -286,18 +274,19 @@
log ("checking timeouts");
// If there are no packets in flight, stop the timer
if (txBuffer.isEmpty()) {
- log ("stopping timer");
+ log ("stopping retransmission timer");
timerRunning = false;
return;
}
double now = Event.time();
for (DataPacket p : txBuffer) {
- // Slow retransmission
if (now - p.sent > RTO * rtt) {
+ // Retransmission timeout
p.sent = now;
log ("retransmitting packet " + p.seq);
log (inflight + " bytes in flight");
- owner.net.send (p, address, latency);
+ node.net.send (p, address, latency);
+ // Note: TCP would return to slow start
decreaseCongestionWindow (now);
}
}
Added: trunk/apps/load-balancing-sims/phase5/Request.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Request.java 2006-07-26 20:06:51 UTC
(rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/Request.java 2006-07-26 20:14:47 UTC
(rev 9782)
@@ -0,0 +1,28 @@
+class Request extends Message
+{
+ private static int nextId = 0;
+
+ public final int id; // The unique ID of the request
+ public final double key; // The requested key (as a routing location)
+
+ // Start a new request
+ public Request (double key)
+ {
+ id = nextId++;
+ this.key = key;
+ size = Message.HEADER_SIZE + Message.ID_SIZE + Message.KEY_SIZE;
+ }
+
+ // Forward a request
+ public Request (int id, double key)
+ {
+ this.id = id;
+ this.key = key;
+ size = Message.HEADER_SIZE + Message.ID_SIZE + Message.KEY_SIZE;
+ }
+
+ public String toString()
+ {
+ return new String ("request (" + id + "," + key + ")");
+ }
+}
Added: trunk/apps/load-balancing-sims/phase5/RequestState.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/RequestState.java 2006-07-26 20:06:51 UTC (rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/RequestState.java 2006-07-26 20:14:47 UTC (rev 9782)
@@ -0,0 +1,41 @@
+// The state of an outstanding request, stored at each node along the path
+
+import java.util.HashSet;
+import java.util.Collection;
+
+class RequestState
+{
+ public final int id; // The unique ID of the request
+ public final double key; // The requested key (as a routing location)
+ public final Peer prev; // The previous hop of the request
+ public final HashSet<Peer> nexts; // Possible next hops
+
+ public RequestState (Request r, Peer prev, Collection<Peer> peers)
+ {
+ id = r.id;
+ key = r.key;
+ this.prev = prev;
+ nexts = new HashSet<Peer> (peers);
+ if (prev != null) nexts.remove (prev);
+ }
+
+ // Returns the closest peer to the requested key
+ public Peer closestPeer()
+ {
+ double bestDist = Double.POSITIVE_INFINITY;
+ Peer bestPeer = null;
+ for (Peer peer : nexts) {
+ double dist = Node.distance (key, peer.location);
+ if (dist < bestDist) {
+ bestDist = dist;
+ bestPeer = peer;
+ }
+ }
+ return bestPeer; // Null if list was empty
+ }
+
+ public String toString()
+ {
+ return new String ("request (" + id + "," + key + ")");
+ }
+}
Added: trunk/apps/load-balancing-sims/phase5/Response.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Response.java 2006-07-26 20:06:51 UTC (rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/Response.java 2006-07-26 20:14:47 UTC (rev 9782)
@@ -0,0 +1,18 @@
+class Response extends Message
+{
+ public final int id; // The unique ID of the request
+ public final double key; // The requested key (as a routing location)
+
+ public Response (int id, double key)
+ {
+ this.id = id;
+ this.key = key;
+ size = Message.HEADER_SIZE + Message.ID_SIZE +
+ Message.KEY_SIZE + Message.DATA_SIZE;
+ }
+
+ public String toString()
+ {
+ return new String ("response (" + id + "," + key + ")");
+ }
+}
Added: trunk/apps/load-balancing-sims/phase5/RouteNotFound.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/RouteNotFound.java 2006-07-26 20:06:51 UTC (rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/RouteNotFound.java 2006-07-26 20:14:47 UTC (rev 9782)
@@ -0,0 +1,18 @@
+// Note: for the purposes of this simulation, RejectedLoop and RouteNotFound
+// are equivalent
+
+class RouteNotFound extends Message
+{
+ public final int id; // The unique ID of the request
+
+ public RouteNotFound (int id)
+ {
+ this.id = id;
+ size = Message.HEADER_SIZE + Message.ID_SIZE;
+ }
+
+ public String toString()
+ {
+ return new String ("route not found (" + id + ")");
+ }
+}
Modified: trunk/apps/load-balancing-sims/phase5/Sim.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5/Sim.java 2006-07-26 20:06:51 UTC (rev 9781)
+++ trunk/apps/load-balancing-sims/phase5/Sim.java 2006-07-26 20:14:47 UTC (rev 9782)
@@ -1,6 +1,6 @@
// Interesting parameters to play with: txSpeed and rxSpeed, retransmission
-// timeout, window size, AIMD increase and decrease (Peer.java), queue size
-// (NetworkInterface.java), packet size (Node.java).
+// timeout, window sizes, AIMD increase and decrease (Peer.java), queue sizes
+// (NetworkInterface.java), packet size (Packet.java).
class Sim
{
@@ -15,18 +15,15 @@
Node n0 = new Node (txSpeed, rxSpeed);
Node n1 = new Node (txSpeed, rxSpeed);
- // Node n2 = new Node (txSpeed, rxSpeed);
+ Node n2 = new Node (txSpeed, rxSpeed);
+ Node n3 = new Node (txSpeed, rxSpeed);
- n0.connect (n1, 0.1);
- // n0.connect (n2, 0.1);
- n1.connect (n0, 0.1);
- // n1.connect (n2, 0.1);
- // n2.connect (n0, 0.1);
- // n2.connect (n1, 0.1);
+ n0.connectBothWays (n1, 0.1);
+ n1.connectBothWays (n2, 0.1);
+ n1.connectBothWays (n3, 0.1);
+ n2.connectBothWays (n3, 0.1);
- Event.schedule (n0, Math.random(), Node.SEND_MESSAGES, null);
- // Event.schedule (n1, Math.random(), Node.SEND_MESSAGES, null);
- // Event.schedule (n2, Math.random(), Node.SEND_MESSAGES, null);
+ Event.schedule (n0, 0.0, Node.GENERATE_REQUEST, null);
// Run the simulation
Event.run();