Author: mrogers
Date: 2006-08-05 13:49:40 +0000 (Sat, 05 Aug 2006)
New Revision: 9901

Modified:
   trunk/apps/load-balancing-sims/phase5-out-of-order/Packet.java
   trunk/apps/load-balancing-sims/phase5-out-of-order/Peer.java
Log:
Piggyback data on acks, use Nagle to delay data but not acks

Modified: trunk/apps/load-balancing-sims/phase5-out-of-order/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-out-of-order/Packet.java  2006-08-05 12:48:35 UTC (rev 9900)
+++ trunk/apps/load-balancing-sims/phase5-out-of-order/Packet.java  2006-08-05 13:49:40 UTC (rev 9901)
@@ -2,26 +2,27 @@

 import java.util.ArrayList;

-abstract class Packet
+class Packet
 {
        public final static int HEADER_SIZE = 80; // Including IP & UDP headers
-       public final static int MAX_PAYLOAD = 1400;
+       public final static int ACK_SIZE = 4; // Size of a sequence num in bytes
+       public final static int MAX_SIZE = 1450; // MTU including headers
        public final static int SENSIBLE_PAYLOAD = 1000; // Nagle's algorithm

        public int src, dest; // Network addresses
-       public int size; // Packet size in bytes, including headers
+       public int size = HEADER_SIZE; // Size in bytes, including headers
+       public int seq = -1; // Data sequence number (-1 if no data)
+       public ArrayList<Integer> acks = null; // Sequence numbers of acked pkts
+       public ArrayList<Message> messages = null; // Payload
+       
+       public double sent; // Time at which the packet was (re) transmitted
        public double latency; // Link latency (stored here for convenience)
-}
-
-class DataPacket extends Packet
-{
-       public int seq; // Sequence number
-       public ArrayList<Message> messages = null; // Payload   
-       public double sent; // Time at which the packet was (re)transmitted

-       public DataPacket (int dataSize)
+       public void addAck (Integer seq)
        {
-               size = dataSize + HEADER_SIZE;
+               if (acks == null) acks = new ArrayList<Integer>();
+               acks.add (seq);
+               size += ACK_SIZE;
        }

        // In real life the payload would be an array of bytes
@@ -29,16 +30,6 @@
        {
                if (messages == null) messages = new ArrayList<Message>();
                messages.add (m);
+               size += m.size;
        }
 }
-
-class Ack extends Packet
-{
-       public int ack; // Explicit ack of a DataPacket's sequence number
-       
-       public Ack (int ack)
-       {
-               size = HEADER_SIZE;
-               this.ack = ack;
-       }
-}

Modified: trunk/apps/load-balancing-sims/phase5-out-of-order/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase5-out-of-order/Peer.java  2006-08-05 12:48:35 UTC (rev 9900)
+++ trunk/apps/load-balancing-sims/phase5-out-of-order/Peer.java  2006-08-05 13:49:40 UTC (rev 9901)
@@ -33,9 +33,11 @@
        private int inflight = 0; // Bytes sent but not acked
        private int txSeq = 0; // Sequence number of next outgoing packet
        private int txMaxSeq = MAX_INFLIGHT - 1; // Highest sequence number
-       private LinkedList<DataPacket> txBuffer; // Retransmission buffer
-       private LinkedList<Message> txQueue; // Messages waiting to be sent
-       private int txQueueSize = 0; // Size of transmission queue in bytes
+       private LinkedList<Packet> txBuffer; // Retransmission buffer
+       private LinkedList<Message> msgQueue; // Messages waiting to be sent
+       private int msgQueueSize = 0; // Size of message queue in bytes
+       private LinkedList<Integer> ackQueue; // Acks waiting to be sent
+       private int ackQueueSize = 0; // Size of ack queue in bytes

        // Receiver state
        private HashSet<Integer> rxDupe; // Detect duplicates by sequence number
@@ -47,9 +49,10 @@
                this.address = address;
                this.location = location;
                this.latency = latency;
-               txBuffer = new LinkedList<DataPacket>();
-               txQueue = new LinkedList<Message>();
-               rxDupe = new HashSet<Integer> (MAX_INFLIGHT);
+               txBuffer = new LinkedList<Packet>();
+               msgQueue = new LinkedList<Message>();
+               ackQueue = new LinkedList<Integer>();
+               rxDupe = new HashSet<Integer>();
        }

        // Queue a message for transmission
@@ -58,27 +61,16 @@
                log (m + " added to transmission queue");
                // Warning: until token-passing is implemented the length of
                // the transmission queue is unlimited
-               txQueue.add (m);
-               txQueueSize += m.size;
-               log (txQueue.size() + " messages in transmission queue");
-               // Send as many packets as possible
-               while (send());
+               msgQueue.add (m);
+               msgQueueSize += m.size;
+               log (msgQueue.size() + " messages in transmission queue");
+               // Send as many packets as possible, using Nagle
+               while (send (true));
        }

        // Try to send a packet, return true if a packet was sent
-       private boolean send()
+       private boolean send (boolean nagle)
        {
-               if (txQueueSize == 0) {
-                       log ("no messages to send");
-                       return false;
-               }
-               
-               // Don't send packet n+MAX_INFLIGHT until packet n is acked
-               if (txSeq > txMaxSeq) {
-                       log ("waiting for ack " + (txMaxSeq - MAX_INFLIGHT +1));
-                       return false;
-               }
-               
                // Return to slow start when the link is idle
                double now = Event.time();
                if (now - lastTransmission > RTO * rtt) {
@@ -88,70 +80,87 @@
                }
                lastTransmission = now;

-               if (cwind - inflight <= Packet.HEADER_SIZE) {
-                       log ("no room in congestion window");
+               if (ackQueueSize == 0 && msgQueueSize == 0) {
+                       log ("no messages or acks to send");
                        return false;
                }

+               Packet p = new Packet();
+               
+               int window = (int) cwind - inflight - p.size - ackQueueSize;
+               if (window <= 0) log ("no room in congestion window");
+               
                // Work out how large a packet we can send
-               int payload = Packet.MAX_PAYLOAD;
-               if (payload > txQueueSize) payload = txQueueSize;
-               if (payload > cwind - inflight - Packet.HEADER_SIZE)
-                       payload = (int) cwind - inflight - Packet.HEADER_SIZE;
+               int payload = Packet.MAX_SIZE - p.size - ackQueueSize;
+               if (payload > msgQueueSize) payload = msgQueueSize;
+               if (payload > window) payload = window;

                // Nagle's algorithm - try to coalesce small packets
-               if (payload < Packet.SENSIBLE_PAYLOAD && inflight > 0) {
+               if (nagle && payload < Packet.SENSIBLE_PAYLOAD && inflight >0) {
                        log ("delaying transmission of " + payload + " bytes");
                        return false;
                }

-               // Put as many messages as possible in the packet
-               DataPacket p = new DataPacket (payload);
-               Iterator<Message> i = txQueue.iterator();
-               while (i.hasNext()) {
-                       Message m = i.next();
-                       if (m.size > payload) break;
-                       i.remove();
-                       txQueueSize -= m.size;
-                       p.addMessage (m);
-                       payload -= m.size;
+               // Put all waiting acks in the packet
+               for (Integer seq : ackQueue) p.addAck (seq);
+               ackQueue.clear();
+               ackQueueSize = 0;
+               
+               // Don't put data in packet n+MAX_INFLIGHT unless packet n has
+               // been acked - this stalls the connection if an attacker
+               // repeatedly deletes the same packet, allowing the attack to
+               // be detected. We must still be allowed to send acks,
+               // otherwise the connection could deadlock.
+               
+               if (txSeq > txMaxSeq)
+                       log ("waiting for ack " + (txMaxSeq - MAX_INFLIGHT +1));
+               else {
+                       // Put as many messages as possible in the packet
+                       Iterator<Message> i = msgQueue.iterator();
+                       while (i.hasNext()) {
+                               Message m = i.next();
+                               if (p.size + m.size > Packet.MAX_SIZE) break;
+                               i.remove();
+                               msgQueueSize -= m.size;
+                               p.addMessage (m);
+                       }
                }

                // Don't send empty packets
-               if (p.messages == null) {
-                       log ("message too large for congestion window");
-                       return false;
+               if (p.acks == null && p.messages == null) return false;
+               
+               // If the packet contains data, buffer it for retransmission
+               if (p.messages != null) {
+                       p.seq = txSeq++;
+                       p.sent = now;
+                       inflight += p.size; // Acks aren't congestion-controlled
+                       log (inflight + " bytes in flight");
+                       txBuffer.add (p);
+                       // Start the node's retransmission timer if necessary
+                       node.startTimer();
                }

                // Send the packet
-               p.seq = txSeq++;
                log ("sending packet " + p.seq + ", " + p.size + " bytes");
                node.net.send (p, address, latency);
-               // Buffer the packet for retransmission
-               p.sent = now;
-               inflight += p.size;
-               log (inflight + " bytes in flight");
-               txBuffer.add (p);
-               // Start the node's retransmission timer if necessary
-               node.startTimer();
                return true;
        }

        private void sendAck (int seq)
        {
-               Ack a = new Ack (seq);
-               log ("sending ack " + seq);
-               node.net.send (a, address, latency);
+               ackQueue.add (seq);
+               ackQueueSize += Packet.ACK_SIZE;
+               send (false); // Send a packet immediately, without using Nagle
        }

        // Called by Node when a packet arrives
        public void handlePacket (Packet p)
        {
-               if (p instanceof DataPacket) handleData ((DataPacket) p);
-               else if (p instanceof Ack) handleAck ((Ack) p);
+               if (p.messages != null) handleData (p);
+               if (p.acks != null) for (int seq : p.acks) handleAck (seq);
        }

-       private void handleData (DataPacket p)
+       private void handleData (Packet p)
        {
                log ("received packet " + p.seq + ", " + p.size + " bytes");
                if (p.seq < rxSeq || rxDupe.contains (p.seq)) {
@@ -178,16 +187,16 @@
                else log ("warning: received " + p.seq + " before " + rxSeq);
        }

-       private void handleAck (Ack a)
+       private void handleAck (int seq)
        {
-               log ("received ack " + a.ack);
+               log ("received ack " + seq);
                double now = Event.time();
-               Iterator<DataPacket> i = txBuffer.iterator();
+               Iterator<Packet> i = txBuffer.iterator();
                while (i.hasNext()) {
-                       DataPacket p = i.next();
+                       Packet p = i.next();
                        double age = now - p.sent;
                        // Explicit ack
-                       if (p.seq == a.ack) {
+                       if (p.seq == seq) {
                                log ("packet " + p.seq + " acknowledged");
                                i.remove();
                                inflight -= p.size;
@@ -204,7 +213,7 @@
                                break;
                        }
                        // Fast retransmission
-                       if (p.seq < a.ack && age > FRTO * rtt) {
+                       if (p.seq < seq && age > FRTO * rtt) {
                                p.sent = now;
                                log ("fast retransmitting packet " + p.seq);
                                log (inflight + " bytes in flight");
@@ -216,8 +225,8 @@
                if (txBuffer.isEmpty()) txMaxSeq = txSeq + MAX_INFLIGHT - 1;
                else txMaxSeq = txBuffer.peek().seq + MAX_INFLIGHT - 1;
                log ("maximum sequence number " + txMaxSeq);
-               // Send as many packets as possible
-               while (send());
+               // Send as many packets as possible, using Nagle
+               while (send (true));
        }

        private void decreaseCongestionWindow (double now)
@@ -234,7 +243,7 @@
        }

        // Remove messages from a packet and deliver them to the node
-       private void unpack (DataPacket p)
+       private void unpack (Packet p)
        {
                if (p.messages == null) return;
                for (Message m : p.messages) node.handleMessage (m, this);
@@ -254,7 +263,7 @@
                        return false;
                }
                double now = Event.time();
-               for (DataPacket p : txBuffer) {
+               for (Packet p : txBuffer) {
                        if (now - p.sent > RTO * rtt) {
                                // Retransmission timeout
                                p.sent = now;


Reply via email to