Author: mrogers
Date: 2006-08-05 13:49:40 +0000 (Sat, 05 Aug 2006)
New Revision: 9901
Modified:
trunk/apps/load-balancing-sims/phase5-out-of-order/Packet.java
trunk/apps/load-balancing-sims/phase5-out-of-order/Peer.java
Log:
Piggyback data and acks in the same packet; use Nagle's algorithm to delay data but not acks
Modified: trunk/apps/load-balancing-sims/phase5-out-of-order/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-out-of-order/Packet.java	2006-08-05 12:48:35 UTC (rev 9900)
+++ trunk/apps/load-balancing-sims/phase5-out-of-order/Packet.java	2006-08-05 13:49:40 UTC (rev 9901)
@@ -2,26 +2,27 @@
import java.util.ArrayList;
-abstract class Packet
+class Packet
{
public final static int HEADER_SIZE = 80; // Including IP & UDP headers
- public final static int MAX_PAYLOAD = 1400;
+ public final static int ACK_SIZE = 4; // Size of a sequence num in bytes
+ public final static int MAX_SIZE = 1450; // MTU including headers
public final static int SENSIBLE_PAYLOAD = 1000; // Nagle's algorithm
public int src, dest; // Network addresses
- public int size; // Packet size in bytes, including headers
+ public int size = HEADER_SIZE; // Size in bytes, including headers
+ public int seq = -1; // Data sequence number (-1 if no data)
+ public ArrayList<Integer> acks = null; // Sequence numbers of acked pkts
+ public ArrayList<Message> messages = null; // Payload
+
+ public double sent; // Time at which the packet was (re) transmitted
public double latency; // Link latency (stored here for convenience)
-}
-
-class DataPacket extends Packet
-{
- public int seq; // Sequence number
- public ArrayList<Message> messages = null; // Payload
- public double sent; // Time at which the packet was (re)transmitted
- public DataPacket (int dataSize)
+ public void addAck (Integer seq)
{
- size = dataSize + HEADER_SIZE;
+ if (acks == null) acks = new ArrayList<Integer>();
+ acks.add (seq);
+ size += ACK_SIZE;
}
// In real life the payload would be an array of bytes
@@ -29,16 +30,6 @@
{
if (messages == null) messages = new ArrayList<Message>();
messages.add (m);
+ size += m.size;
}
}
-
-class Ack extends Packet
-{
- public int ack; // Explicit ack of a DataPacket's sequence number
-
- public Ack (int ack)
- {
- size = HEADER_SIZE;
- this.ack = ack;
- }
-}
Modified: trunk/apps/load-balancing-sims/phase5-out-of-order/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-out-of-order/Peer.java	2006-08-05 12:48:35 UTC (rev 9900)
+++ trunk/apps/load-balancing-sims/phase5-out-of-order/Peer.java	2006-08-05 13:49:40 UTC (rev 9901)
@@ -33,9 +33,11 @@
private int inflight = 0; // Bytes sent but not acked
private int txSeq = 0; // Sequence number of next outgoing packet
private int txMaxSeq = MAX_INFLIGHT - 1; // Highest sequence number
- private LinkedList<DataPacket> txBuffer; // Retransmission buffer
- private LinkedList<Message> txQueue; // Messages waiting to be sent
- private int txQueueSize = 0; // Size of transmission queue in bytes
+ private LinkedList<Packet> txBuffer; // Retransmission buffer
+ private LinkedList<Message> msgQueue; // Messages waiting to be sent
+ private int msgQueueSize = 0; // Size of message queue in bytes
+ private LinkedList<Integer> ackQueue; // Acks waiting to be sent
+ private int ackQueueSize = 0; // Size of ack queue in bytes
// Receiver state
private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
@@ -47,9 +49,10 @@
this.address = address;
this.location = location;
this.latency = latency;
- txBuffer = new LinkedList<DataPacket>();
- txQueue = new LinkedList<Message>();
- rxDupe = new HashSet<Integer> (MAX_INFLIGHT);
+ txBuffer = new LinkedList<Packet>();
+ msgQueue = new LinkedList<Message>();
+ ackQueue = new LinkedList<Integer>();
+ rxDupe = new HashSet<Integer>();
}
// Queue a message for transmission
@@ -58,27 +61,16 @@
log (m + " added to transmission queue");
// Warning: until token-passing is implemented the length of
// the transmission queue is unlimited
- txQueue.add (m);
- txQueueSize += m.size;
- log (txQueue.size() + " messages in transmission queue");
- // Send as many packets as possible
- while (send());
+ msgQueue.add (m);
+ msgQueueSize += m.size;
+ log (msgQueue.size() + " messages in transmission queue");
+ // Send as many packets as possible, using Nagle
+ while (send (true));
}
// Try to send a packet, return true if a packet was sent
- private boolean send()
+ private boolean send (boolean nagle)
{
- if (txQueueSize == 0) {
- log ("no messages to send");
- return false;
- }
-
- // Don't send packet n+MAX_INFLIGHT until packet n is acked
- if (txSeq > txMaxSeq) {
- log ("waiting for ack " + (txMaxSeq - MAX_INFLIGHT +1));
- return false;
- }
-
// Return to slow start when the link is idle
double now = Event.time();
if (now - lastTransmission > RTO * rtt) {
@@ -88,70 +80,87 @@
}
lastTransmission = now;
- if (cwind - inflight <= Packet.HEADER_SIZE) {
- log ("no room in congestion window");
+ if (ackQueueSize == 0 && msgQueueSize == 0) {
+ log ("no messages or acks to send");
return false;
}
+ Packet p = new Packet();
+
+ int window = (int) cwind - inflight - p.size - ackQueueSize;
+ if (window <= 0) log ("no room in congestion window");
+
// Work out how large a packet we can send
- int payload = Packet.MAX_PAYLOAD;
- if (payload > txQueueSize) payload = txQueueSize;
- if (payload > cwind - inflight - Packet.HEADER_SIZE)
- payload = (int) cwind - inflight - Packet.HEADER_SIZE;
+ int payload = Packet.MAX_SIZE - p.size - ackQueueSize;
+ if (payload > msgQueueSize) payload = msgQueueSize;
+ if (payload > window) payload = window;
// Nagle's algorithm - try to coalesce small packets
- if (payload < Packet.SENSIBLE_PAYLOAD && inflight > 0) {
+ if (nagle && payload < Packet.SENSIBLE_PAYLOAD && inflight >0) {
log ("delaying transmission of " + payload + " bytes");
return false;
}
- // Put as many messages as possible in the packet
- DataPacket p = new DataPacket (payload);
- Iterator<Message> i = txQueue.iterator();
- while (i.hasNext()) {
- Message m = i.next();
- if (m.size > payload) break;
- i.remove();
- txQueueSize -= m.size;
- p.addMessage (m);
- payload -= m.size;
+ // Put all waiting acks in the packet
+ for (Integer seq : ackQueue) p.addAck (seq);
+ ackQueue.clear();
+ ackQueueSize = 0;
+
+ // Don't put data in packet n+MAX_INFLIGHT unless packet n has
+ // been acked - this stalls the connection if an attacker
+ // repeatedly deletes the same packet, allowing the attack to
+ // be detected. We must still be allowed to send acks,
+ // otherwise the connection could deadlock.
+
+ if (txSeq > txMaxSeq)
+ log ("waiting for ack " + (txMaxSeq - MAX_INFLIGHT +1));
+ else {
+ // Put as many messages as possible in the packet
+ Iterator<Message> i = msgQueue.iterator();
+ while (i.hasNext()) {
+ Message m = i.next();
+ if (p.size + m.size > Packet.MAX_SIZE) break;
+ i.remove();
+ msgQueueSize -= m.size;
+ p.addMessage (m);
+ }
}
// Don't send empty packets
- if (p.messages == null) {
- log ("message too large for congestion window");
- return false;
+ if (p.acks == null && p.messages == null) return false;
+
+ // If the packet contains data, buffer it for retransmission
+ if (p.messages != null) {
+ p.seq = txSeq++;
+ p.sent = now;
+ inflight += p.size; // Acks aren't congestion-controlled
+ log (inflight + " bytes in flight");
+ txBuffer.add (p);
+ // Start the node's retransmission timer if necessary
+ node.startTimer();
}
// Send the packet
- p.seq = txSeq++;
log ("sending packet " + p.seq + ", " + p.size + " bytes");
node.net.send (p, address, latency);
- // Buffer the packet for retransmission
- p.sent = now;
- inflight += p.size;
- log (inflight + " bytes in flight");
- txBuffer.add (p);
- // Start the node's retransmission timer if necessary
- node.startTimer();
return true;
}
private void sendAck (int seq)
{
- Ack a = new Ack (seq);
- log ("sending ack " + seq);
- node.net.send (a, address, latency);
+ ackQueue.add (seq);
+ ackQueueSize += Packet.ACK_SIZE;
+ send (false); // Send a packet immediately, without using Nagle
}
// Called by Node when a packet arrives
public void handlePacket (Packet p)
{
- if (p instanceof DataPacket) handleData ((DataPacket) p);
- else if (p instanceof Ack) handleAck ((Ack) p);
+ if (p.messages != null) handleData (p);
+ if (p.acks != null) for (int seq : p.acks) handleAck (seq);
}
- private void handleData (DataPacket p)
+ private void handleData (Packet p)
{
log ("received packet " + p.seq + ", " + p.size + " bytes");
if (p.seq < rxSeq || rxDupe.contains (p.seq)) {
@@ -178,16 +187,16 @@
else log ("warning: received " + p.seq + " before " + rxSeq);
}
- private void handleAck (Ack a)
+ private void handleAck (int seq)
{
- log ("received ack " + a.ack);
+ log ("received ack " + seq);
double now = Event.time();
- Iterator<DataPacket> i = txBuffer.iterator();
+ Iterator<Packet> i = txBuffer.iterator();
while (i.hasNext()) {
- DataPacket p = i.next();
+ Packet p = i.next();
double age = now - p.sent;
// Explicit ack
- if (p.seq == a.ack) {
+ if (p.seq == seq) {
log ("packet " + p.seq + " acknowledged");
i.remove();
inflight -= p.size;
@@ -204,7 +213,7 @@
break;
}
// Fast retransmission
- if (p.seq < a.ack && age > FRTO * rtt) {
+ if (p.seq < seq && age > FRTO * rtt) {
p.sent = now;
log ("fast retransmitting packet " + p.seq);
log (inflight + " bytes in flight");
@@ -216,8 +225,8 @@
if (txBuffer.isEmpty()) txMaxSeq = txSeq + MAX_INFLIGHT - 1;
else txMaxSeq = txBuffer.peek().seq + MAX_INFLIGHT - 1;
log ("maximum sequence number " + txMaxSeq);
- // Send as many packets as possible
- while (send());
+ // Send as many packets as possible, using Nagle
+ while (send (true));
}
private void decreaseCongestionWindow (double now)
@@ -234,7 +243,7 @@
}
// Remove messages from a packet and deliver them to the node
- private void unpack (DataPacket p)
+ private void unpack (Packet p)
{
if (p.messages == null) return;
for (Message m : p.messages) node.handleMessage (m, this);
@@ -254,7 +263,7 @@
return false;
}
double now = Event.time();
- for (DataPacket p : txBuffer) {
+ for (Packet p : txBuffer) {
if (now - p.sent > RTO * rtt) {
// Retransmission timeout
p.sent = now;