Author: mrogers
Date: 2006-08-21 14:48:20 +0000 (Mon, 21 Aug 2006)
New Revision: 10224
Added:
trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java
trunk/apps/load-balancing-sims/phase6/messages/Ack.java
trunk/apps/load-balancing-sims/phase6/messages/Block.java
Removed:
trunk/apps/load-balancing-sims/phase6/Deadline.java
Modified:
trunk/apps/load-balancing-sims/phase6/Packet.java
trunk/apps/load-balancing-sims/phase6/Peer.java
trunk/apps/load-balancing-sims/phase6/messages/Message.java
trunk/apps/load-balancing-sims/phase6/messages/Response.java
Log:
Interleave searches and transfers
Deleted: trunk/apps/load-balancing-sims/phase6/Deadline.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Deadline.java 2006-08-21 12:43:22 UTC (rev 10223)
+++ trunk/apps/load-balancing-sims/phase6/Deadline.java 2006-08-21 14:48:20 UTC (rev 10224)
@@ -1,13 +0,0 @@
-// A queued item and the time at which it must be sent
-
-class Deadline<Item>
-{
- public final Item item;
- public final double deadline;
-
- public Deadline (Item item, double deadline)
- {
- this.item = item;
- this.deadline = deadline;
- }
-}
Added: trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java 2006-08-21 12:43:22 UTC (rev 10223)
+++ trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java 2006-08-21 14:48:20 UTC (rev 10224)
@@ -0,0 +1,40 @@
+// A queue storing outgoing messages (including acks and transfers) and their
+// coalescing deadlines
+
+import java.util.LinkedList;
+import messages.Message;
+
+class DeadlineQueue<T extends Message>
+{
+ public int size = 0; // Size in bytes
+ private LinkedList<T> messages = new LinkedList<T>();
+ private LinkedList<Double> deadlines = new LinkedList<Double>();
+
+ public void add (T message, double deadline)
+ {
+ size += message.size;
+ messages.add (message);
+ deadlines.add (deadline);
+ }
+
+ public int headSize()
+ {
+ if (messages.isEmpty()) return 0;
+ else return messages.peek().size;
+ }
+
+ public double deadline()
+ {
+ Double d = deadlines.peek();
+ if (d == null) return Double.POSITIVE_INFINITY;
+ else return d;
+ }
+
+ public T pop()
+ {
+ deadlines.poll();
+ T message = messages.poll();
+ size -= message.size;
+ return message;
+ }
+}
Modified: trunk/apps/load-balancing-sims/phase6/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Packet.java 2006-08-21 12:43:22 UTC (rev 10223)
+++ trunk/apps/load-balancing-sims/phase6/Packet.java 2006-08-21 14:48:20 UTC (rev 10224)
@@ -2,31 +2,30 @@
import java.util.ArrayList;
import messages.Message;
+import messages.Ack;
class Packet
{
public final static int HEADER_SIZE = 80; // Including IP & UDP headers
- public final static int ACK_SIZE = 4; // Size of an ack in bytes
public final static int MAX_SIZE = 1450; // MTU including headers
public final static int SENSIBLE_PAYLOAD = 1000; // Coalescing
public int src, dest; // Network addresses
public int size = HEADER_SIZE; // Size in bytes, including headers
public int seq = -1; // Data sequence number (-1 if no data)
- public ArrayList<Integer> acks = null;
+ public ArrayList<Ack> acks = null;
public ArrayList<Message> messages = null;
public double sent; // Time at which the packet was (re) transmitted
public double latency; // Link latency (stored here for convenience)
- public void addAck (int ack)
+ public void addAck (Ack a)
{
- if (acks == null) acks = new ArrayList<Integer>();
- acks.add (ack);
- size += ACK_SIZE;
+ if (acks == null) acks = new ArrayList<Ack>();
+ acks.add (a);
+ size += a.size;
}
- // In real life the payload would be an array of bytes
public void addMessage (Message m)
{
if (messages == null) messages = new ArrayList<Message>();
Modified: trunk/apps/load-balancing-sims/phase6/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Peer.java 2006-08-21 12:43:22 UTC (rev 10223)
+++ trunk/apps/load-balancing-sims/phase6/Peer.java 2006-08-21 14:48:20 UTC (rev 10224)
@@ -1,7 +1,9 @@
import java.util.LinkedList;
import java.util.Iterator;
import java.util.HashSet;
+import messages.Ack;
import messages.Message;
+import messages.Block;
class Peer
{
@@ -14,7 +16,7 @@
public final static double RTO = 4.0; // Retransmission timeout in RTTs
public final static double FRTO = 1.5; // Fast retx timeout in RTTs
public final static double RTT_DECAY = 0.9; // Exp moving average
- public final static double LINK_IDLE = 5.0; // RTTs without a packet
+ public final static double LINK_IDLE = 8.0; // RTTs without transmitting
// Coalescing
public final static double MAX_DELAY = 0.1; // Coalescing delay, seconds
@@ -27,10 +29,10 @@
private int txSeq = 0; // Sequence number of next outgoing data packet
private int txMaxSeq = SEQ_RANGE - 1; // Highest sequence number
private LinkedList<Packet> txBuffer; // Retransmission buffer
- private LinkedList<Deadline<Message>> msgQueue; // Outgoing messages
- private int msgQueueSize = 0; // Size of message queue in bytes
- private LinkedList<Deadline<Integer>> ackQueue; // Outgoing acks
- private int ackQueueSize = 0; // Size of ack queue in bytes
+ private DeadlineQueue<Ack> ackQueue; // Outgoing acks
+ private DeadlineQueue<Message> searchQueue; // Outgoing search messages
+ private DeadlineQueue<Block> transferQueue; // Outgoing transfers
+ private boolean tgif = false; // "Transfers go in first" toggle
private CongestionWindow window; // AIMD congestion window
private double lastTransmission = 0.0; // Clock time
@@ -45,36 +47,56 @@
this.location = location;
this.latency = latency;
txBuffer = new LinkedList<Packet>();
- msgQueue = new LinkedList<Deadline<Message>>();
- ackQueue = new LinkedList<Deadline<Integer>>();
+ ackQueue = new DeadlineQueue<Ack>();
+ searchQueue = new DeadlineQueue<Message>();
+ transferQueue = new DeadlineQueue<Block>();
window = new CongestionWindow (this);
rxDupe = new HashSet<Integer>();
}
- // Queue a message for transmission
+ // Queue a search message for transmission
public void sendMessage (Message m)
{
- log (m + " added to message queue");
- // Warning: until token-passing is implemented the length of
- // the message queue is unlimited
- double now = Event.time();
- msgQueue.add (new Deadline<Message> (m, now + MAX_DELAY));
- msgQueueSize += m.size;
+ log (m + " added to search queue");
+ searchQueue.add (m, Event.time() + MAX_DELAY);
// Start the node's timer if necessary
node.startTimer();
// Send as many packets as possible
while (send());
}
+ // Queue a transfer block for transmission
+ public void sendBlock (Block b)
+ {
+ log (b + " added to transfer queue");
+ transferQueue.add (b, Event.time() + MAX_DELAY);
+ // Start the node's timer if necessary
+ node.startTimer();
+ // Send as many packets as possible
+ while (send());
+ }
+
+ // Queue an ack for transmission
+ private void sendAck (int seq)
+ {
+ log ("ack " + seq + " added to ack queue");
+ ackQueue.add (new Ack (seq), Event.time() + MAX_DELAY);
+ // Start the node's timer if necessary
+ node.startTimer();
+ // Send as many packets as possible
+ while (send());
+ }
+
// Try to send a packet, return true if a packet was sent
private boolean send()
{
- if (ackQueueSize == 0 && msgQueueSize == 0) {
- log ("no messages or acks to send");
+ if (ackQueue.size + searchQueue.size + transferQueue.size ==0) {
+ log ("nothing to send");
return false;
}
- log (ackQueue.size() + " acks in queue");
- log (msgQueue.size() + " messages in queue");
+ log (ackQueue.size + " bytes of acks in queue");
+ log (searchQueue.size + " bytes of searches in queue");
+ log (transferQueue.size + " bytes of transfers in queue");
// Return to slow start when the link is idle
double now = Event.time();
@@ -82,9 +104,10 @@
lastTransmission = now;
// Work out how large a packet we can send
- int headersAndAcks = Packet.HEADER_SIZE + ackQueueSize;
+ int headersAndAcks = Packet.HEADER_SIZE + ackQueue.size;
int payload = Packet.MAX_SIZE - headersAndAcks;
- if (payload > msgQueueSize) payload = msgQueueSize;
+ if (payload > searchQueue.size + transferQueue.size)
+ payload = searchQueue.size + transferQueue.size;
int win = window.available() - headersAndAcks;
if (payload > win) payload = win;
int bw = node.bandwidth.available() - headersAndAcks;
@@ -99,9 +122,7 @@
Packet p = new Packet();
// Put all waiting acks in the packet
- for (Deadline<Integer> a : ackQueue) p.addAck (a.item);
- ackQueue.clear();
- ackQueueSize = 0;
+ while (ackQueue.size > 0) p.addAck (ackQueue.pop());
// Don't send sequence number n+SEQ_RANGE until sequence
// number n has been acked - this limits the number of
@@ -130,23 +151,11 @@
return true;
}
- private void sendAck (int seq)
- {
- log ("ack " + seq + " added to ack queue");
- double now = Event.time();
- ackQueue.add (new Deadline<Integer> (seq, now + MAX_DELAY));
- ackQueueSize += Packet.ACK_SIZE;
- // Start the node's timer if necessary
- node.startTimer();
- // Send as many packets as possible
- while (send());
- }
-
// Called by Node when a packet arrives
public void handlePacket (Packet p)
{
if (p.messages != null) handleData (p);
- if (p.acks != null) for (int ack : p.acks) handleAck (ack);
+ if (p.acks != null) for (Ack a : p.acks) handleAck (a.seq);
}
private void handleData (Packet p)
@@ -215,15 +224,39 @@
// Add messages to a packet
private void pack (Packet p, int payload)
{
- Iterator<Deadline<Message>> i = msgQueue.iterator();
- while (i.hasNext()) {
- Message m = i.next().item;
- if (m.size > payload) break;
- i.remove();
- msgQueueSize -= m.size;
- p.addMessage (m);
- payload -= m.size;
+ // Priority alternates between transfers and searches
+ if (tgif) {
+ // Transfers go in first
+ while (transferQueue.size > 0) {
+ if (transferQueue.headSize() > payload) break;
+ Message m = transferQueue.pop();
+ payload -= m.size;
+ p.addMessage (m);
+ }
+ while (searchQueue.size > 0) {
+ if (searchQueue.headSize() > payload) break;
+ Message m = searchQueue.pop();
+ payload -= m.size;
+ p.addMessage (m);
+ }
+ tgif = false;
}
+ else {
+ // Searches go in first
+ while (searchQueue.size > 0) {
+ if (searchQueue.headSize() > payload) break;
+ Message m = searchQueue.pop();
+ payload -= m.size;
+ p.addMessage (m);
+ }
+ while (transferQueue.size > 0) {
+ if (transferQueue.headSize() > payload) break;
+ Message m = transferQueue.pop();
+ payload -= m.size;
+ p.addMessage (m);
+ }
+ tgif = true;
+ }
}
// Remove messages from a packet and deliver them to the node
@@ -243,7 +276,7 @@
double now = Event.time();
if (txBuffer.isEmpty()) {
log ("no packets in flight");
- return deadline (now);
+ return deadline (now); // Sleep until the next deadline
}
for (Packet p : txBuffer) {
if (now - p.sent > RTO * rtt + MAX_DELAY) {
@@ -254,36 +287,43 @@
window.timeout (now);
}
}
+
+ // Sleep for up to MAX_DELAY seconds until the next deadline
return Math.min (now + MAX_DELAY, deadline (now));
}
- // Work out when the first message or ack needs to be sent
+ // Work out when the first ack or search or transfer needs to be sent
private double deadline (double now)
{
- return Math.min (ackDeadline(), msgDeadline (now));
+ return Math.min (ackQueue.deadline(), dataDeadline (now));
}
-
- // Work out when the first ack needs to be sent
- private double ackDeadline()
- {
- Deadline<Integer> firstAck = ackQueue.peek();
- if (firstAck == null) return Double.POSITIVE_INFINITY;
- return firstAck.deadline;
- }
- // Work out when the first message needs to be sent
- private double msgDeadline (double now)
+ // Work out when the first search or transfer needs to be sent
+ private double dataDeadline (double now)
{
- Deadline<Message> firstMsg = msgQueue.peek();
- if (firstMsg == null) return Double.POSITIVE_INFINITY;
- double deadline = firstMsg.deadline;
- if (msgQueueSize < Packet.SENSIBLE_PAYLOAD) return deadline;
+ // If there's no data waiting, wait until the ack deadline
+ if (searchQueue.size + transferQueue.size == 0)
+ return Double.POSITIVE_INFINITY;
+
+ double deadline = Math.min (searchQueue.deadline(),
+ transferQueue.deadline());
+
+ // Delay small packets until the coalescing deadline
+ if (searchQueue.size + transferQueue.size
+ < Packet.SENSIBLE_PAYLOAD)
+ return deadline;
+
+ // If there's not enough room in the window, wait for an ack
if (window.available() < Packet.SENSIBLE_PAYLOAD
+ Packet.HEADER_SIZE)
- return Double.POSITIVE_INFINITY; // Wait for an ack
+ return Double.POSITIVE_INFINITY;
+
+ // If there's not enough bandwidth, try again shortly
if (node.bandwidth.available() < Packet.SENSIBLE_PAYLOAD
+ Packet.HEADER_SIZE)
return Math.max (deadline, now + Node.SHORT_SLEEP);
+
+ // Send a packet immediately
return now;
}
Added: trunk/apps/load-balancing-sims/phase6/messages/Ack.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Ack.java 2006-08-21 12:43:22 UTC (rev 10223)
+++ trunk/apps/load-balancing-sims/phase6/messages/Ack.java 2006-08-21 14:48:20 UTC (rev 10224)
@@ -0,0 +1,17 @@
+// Note: acks are not FNP messages, they're only here because it makes the
+// implementation simpler
+
+package messages;
+
+public class Ack extends Message
+{
+ public final static int SIZE = 4; // Bytes
+
+ public final int seq; // Sequence number of the acknowledged packet
+
+ public Ack (int seq)
+ {
+ this.seq = seq;
+ size = SIZE;
+ }
+}
Added: trunk/apps/load-balancing-sims/phase6/messages/Block.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Block.java 2006-08-21 12:43:22 UTC (rev 10223)
+++ trunk/apps/load-balancing-sims/phase6/messages/Block.java 2006-08-21 14:48:20 UTC (rev 10224)
@@ -0,0 +1,14 @@
+// A single block of a data transfer (currently 32 blocks per transfer)
+
+package messages;
+
+public class Block extends Message
+{
+ public final static int SIZE = 1024; // Bytes
+
+ // FIXME: placeholder
+ public Block()
+ {
+ size = SIZE;
+ }
+}
Modified: trunk/apps/load-balancing-sims/phase6/messages/Message.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Message.java 2006-08-21 12:43:22 UTC (rev 10223)
+++ trunk/apps/load-balancing-sims/phase6/messages/Message.java 2006-08-21 14:48:20 UTC (rev 10224)
@@ -4,10 +4,9 @@
public class Message
{
- public final static int HEADER_SIZE = 30; // Sequence number, MAC, etc
- public final static int ID_SIZE = 16; // Size of unique request ID
+ public final static int HEADER_SIZE = 4; // Message type etc
+ public final static int ID_SIZE = 8; // Size of unique request ID
public final static int KEY_SIZE = 32; // Size of routing key
- public final static int DATA_SIZE = 1024; // Size of data block
public int size; // Size in bytes
}
Modified: trunk/apps/load-balancing-sims/phase6/messages/Response.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Response.java 2006-08-21 12:43:22 UTC (rev 10223)
+++ trunk/apps/load-balancing-sims/phase6/messages/Response.java 2006-08-21 14:48:20 UTC (rev 10224)
@@ -10,7 +10,7 @@
this.id = id;
this.key = key;
size = Message.HEADER_SIZE + Message.ID_SIZE +
- Message.KEY_SIZE + Message.DATA_SIZE;
+ Message.KEY_SIZE + Block.SIZE;
}
public String toString()