Author: mrogers
Date: 2006-08-21 14:48:20 +0000 (Mon, 21 Aug 2006)
New Revision: 10224

Added:
   trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java
   trunk/apps/load-balancing-sims/phase6/messages/Ack.java
   trunk/apps/load-balancing-sims/phase6/messages/Block.java
Removed:
   trunk/apps/load-balancing-sims/phase6/Deadline.java
Modified:
   trunk/apps/load-balancing-sims/phase6/Packet.java
   trunk/apps/load-balancing-sims/phase6/Peer.java
   trunk/apps/load-balancing-sims/phase6/messages/Message.java
   trunk/apps/load-balancing-sims/phase6/messages/Response.java
Log:
Interleave searches and transfers

Deleted: trunk/apps/load-balancing-sims/phase6/Deadline.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Deadline.java 2006-08-21 12:43:22 UTC 
(rev 10223)
+++ trunk/apps/load-balancing-sims/phase6/Deadline.java 2006-08-21 14:48:20 UTC 
(rev 10224)
@@ -1,13 +0,0 @@
-// A queued item and the time at which it must be sent
-
-class Deadline<Item>
-{
-       public final Item item;
-       public final double deadline;
-       
-       public Deadline (Item item, double deadline)
-       {
-               this.item = item;
-               this.deadline = deadline;
-       }
-}

Added: trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java    2006-08-21 
12:43:22 UTC (rev 10223)
+++ trunk/apps/load-balancing-sims/phase6/DeadlineQueue.java    2006-08-21 
14:48:20 UTC (rev 10224)
@@ -0,0 +1,40 @@
+// A queue storing outgoing messages (including acks and transfers) and their
+// coalescing deadlines
+
+import java.util.LinkedList;
+import messages.Message;
+
+class DeadlineQueue<T extends Message>
+{
+       public int size = 0; // Total size of the queued messages in bytes
+       private LinkedList<T> messages = new LinkedList<T>();
+       private LinkedList<Double> deadlines = new LinkedList<Double>();
+       // Append a message to the tail of the queue along with its deadline
+       public void add (T message, double deadline)
+       {
+               size += message.size;
+               messages.add (message);
+               deadlines.add (deadline);
+       }
+       // Size in bytes of the head message, or 0 if the queue is empty
+       public int headSize()
+       {
+               if (messages.isEmpty()) return 0;
+               else return messages.peek().size;
+       }
+       // Deadline of the head message, or infinity if the queue is empty
+       public double deadline()
+       {
+               Double d = deadlines.peek();
+               if (d == null) return Double.POSITIVE_INFINITY;
+               else return d;
+       }
+       // Remove and return the head message, or null if the queue is empty
+       public T pop()
+       {
+               deadlines.poll(); // Keep the two parallel lists in step
+               T message = messages.poll();
+               if (message != null) size -= message.size;
+               return message;
+       }
+}

Modified: trunk/apps/load-balancing-sims/phase6/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Packet.java   2006-08-21 12:43:22 UTC 
(rev 10223)
+++ trunk/apps/load-balancing-sims/phase6/Packet.java   2006-08-21 14:48:20 UTC 
(rev 10224)
@@ -2,31 +2,30 @@

 import java.util.ArrayList;
 import messages.Message;
+import messages.Ack;

 class Packet
 {
        public final static int HEADER_SIZE = 80; // Including IP & UDP headers
-       public final static int ACK_SIZE = 4; // Size of an ack in bytes
        public final static int MAX_SIZE = 1450; // MTU including headers
        public final static int SENSIBLE_PAYLOAD = 1000; // Coalescing

        public int src, dest; // Network addresses
        public int size = HEADER_SIZE; // Size in bytes, including headers
        public int seq = -1; // Data sequence number (-1 if no data)
-       public ArrayList<Integer> acks = null;
+       public ArrayList<Ack> acks = null;
        public ArrayList<Message> messages = null;

        public double sent; // Time at which the packet was (re) transmitted
        public double latency; // Link latency (stored here for convenience)

-       public void addAck (int ack)
+       public void addAck (Ack a)
        {
-               if (acks == null) acks = new ArrayList<Integer>();
-               acks.add (ack);
-               size += ACK_SIZE;
+               if (acks == null) acks = new ArrayList<Ack>();
+               acks.add (a);
+               size += a.size;
        }

-       // In real life the payload would be an array of bytes
        public void addMessage (Message m)
        {
                if (messages == null) messages = new ArrayList<Message>();

Modified: trunk/apps/load-balancing-sims/phase6/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Peer.java     2006-08-21 12:43:22 UTC 
(rev 10223)
+++ trunk/apps/load-balancing-sims/phase6/Peer.java     2006-08-21 14:48:20 UTC 
(rev 10224)
@@ -1,7 +1,9 @@
 import java.util.LinkedList;
 import java.util.Iterator;
 import java.util.HashSet;
+import messages.Ack;
 import messages.Message;
+import messages.Block;

 class Peer
 {
@@ -14,7 +16,7 @@
        public final static double RTO = 4.0; // Retransmission timeout in RTTs
        public final static double FRTO = 1.5; // Fast retx timeout in RTTs
        public final static double RTT_DECAY = 0.9; // Exp moving average
-       public final static double LINK_IDLE = 5.0; // RTTs without a packet
+       public final static double LINK_IDLE = 8.0; // RTTs without transmitting

        // Coalescing
        public final static double MAX_DELAY = 0.1; // Coalescing delay, seconds
@@ -27,10 +29,10 @@
        private int txSeq = 0; // Sequence number of next outgoing data packet
        private int txMaxSeq = SEQ_RANGE - 1; // Highest sequence number
        private LinkedList<Packet> txBuffer; // Retransmission buffer
-       private LinkedList<Deadline<Message>> msgQueue; // Outgoing messages
-       private int msgQueueSize = 0; // Size of message queue in bytes
-       private LinkedList<Deadline<Integer>> ackQueue; // Outgoing acks
-       private int ackQueueSize = 0; // Size of ack queue in bytes
+       private DeadlineQueue<Ack> ackQueue; // Outgoing acks
+       private DeadlineQueue<Message> searchQueue; // Outgoing search messages
+       private DeadlineQueue<Block> transferQueue; // Outgoing transfers
+       private boolean tgif = false; // "Transfers go in first" toggle
        private CongestionWindow window; // AIMD congestion window
        private double lastTransmission = 0.0; // Clock time

@@ -45,36 +47,56 @@
                this.location = location;
                this.latency = latency;
                txBuffer = new LinkedList<Packet>();
-               msgQueue = new LinkedList<Deadline<Message>>();
-               ackQueue = new LinkedList<Deadline<Integer>>();
+               ackQueue = new DeadlineQueue<Ack>();
+               searchQueue = new DeadlineQueue<Message>();
+               transferQueue = new DeadlineQueue<Block>();
                window = new CongestionWindow (this);
                rxDupe = new HashSet<Integer>();
        }

-       // Queue a message for transmission
+       // Queue a search message for transmission
        public void sendMessage (Message m)
        {
-               log (m + " added to message queue");
-               // Warning: until token-passing is implemented the length of
-               // the message queue is unlimited
-               double now = Event.time();
-               msgQueue.add (new Deadline<Message> (m, now + MAX_DELAY));
-               msgQueueSize += m.size;
+               log (m + " added to search queue");
+               searchQueue.add (m, Event.time() + MAX_DELAY);
                // Start the node's timer if necessary
                node.startTimer();
                // Send as many packets as possible
                while (send());
        }

+       // Queue a transfer block for transmission
+       public void sendBlock (Block b)
+       {
+               log (b + " added to transfer queue");
+               transferQueue.add (b, Event.time() + MAX_DELAY);
+               // Start the node's timer if necessary
+               node.startTimer();
+               // Send as many packets as possible
+               while (send());
+       }
+       
+       // Queue an ack for transmission
+       private void sendAck (int seq)
+       {
+               log ("ack " + seq + " added to ack queue");
+               ackQueue.add (new Ack (seq), Event.time() + MAX_DELAY);
+               // Start the node's timer if necessary
+               node.startTimer();
+               // Send as many packets as possible
+               while (send());
+       }
+       
        // Try to send a packet, return true if a packet was sent
        private boolean send()
        {               
-               if (ackQueueSize == 0 && msgQueueSize == 0) {
-                       log ("no messages or acks to send");
+               if (ackQueue.size + searchQueue.size + transferQueue.size ==0) {
+                       log ("nothing to send");
                        return false;
                }
-               log (ackQueue.size() + " acks in queue");
-               log (msgQueue.size() + " messages in queue");
+               log (ackQueue.size + " bytes of acks in queue");
+               log (searchQueue.size + " bytes of searches in queue");
+               log (transferQueue.size + " bytes of transfers in queue");

                // Return to slow start when the link is idle
                double now = Event.time();
@@ -82,9 +104,10 @@
                lastTransmission = now;

                // Work out how large a packet we can send
-               int headersAndAcks = Packet.HEADER_SIZE + ackQueueSize;
+               int headersAndAcks = Packet.HEADER_SIZE + ackQueue.size;
                int payload = Packet.MAX_SIZE - headersAndAcks;
-               if (payload > msgQueueSize) payload = msgQueueSize;
+               if (payload > searchQueue.size + transferQueue.size)
+                       payload = searchQueue.size + transferQueue.size;
                int win = window.available() - headersAndAcks;
                if (payload > win) payload = win;
                int bw = node.bandwidth.available() - headersAndAcks;
@@ -99,9 +122,7 @@
                Packet p = new Packet();

                // Put all waiting acks in the packet
-               for (Deadline<Integer> a : ackQueue) p.addAck (a.item);
-               ackQueue.clear();
-               ackQueueSize = 0;
+               while (ackQueue.size > 0) p.addAck (ackQueue.pop());

                // Don't send sequence number n+SEQ_RANGE until sequence
                // number n has been acked - this limits the number of
@@ -130,23 +151,11 @@
                return true;
        }

-       private void sendAck (int seq)
-       {
-               log ("ack " + seq + " added to ack queue");
-               double now = Event.time();
-               ackQueue.add (new Deadline<Integer> (seq, now + MAX_DELAY));
-               ackQueueSize += Packet.ACK_SIZE;
-               // Start the node's timer if necessary
-               node.startTimer();
-               // Send as many packets as possible
-               while (send());
-       }
-       
        // Called by Node when a packet arrives
        public void handlePacket (Packet p)
        {
                if (p.messages != null) handleData (p);
-               if (p.acks != null) for (int ack : p.acks) handleAck (ack);
+               if (p.acks != null) for (Ack a : p.acks) handleAck (a.seq);
        }

        private void handleData (Packet p)
@@ -215,15 +224,39 @@
        // Add messages to a packet
        private void pack (Packet p, int payload)
        {
-               Iterator<Deadline<Message>> i = msgQueue.iterator();
-               while (i.hasNext()) {
-                       Message m = i.next().item;
-                       if (m.size > payload) break;
-                       i.remove();
-                       msgQueueSize -= m.size;
-                       p.addMessage (m);
-                       payload -= m.size;
+               // Priority alternates between transfers and searches
+               if (tgif) {
+                       // Transfers go in first
+                       while (transferQueue.size > 0) {
+                               if (transferQueue.headSize() > payload) break;
+                               Message m = transferQueue.pop();
+                               payload -= m.size;
+                               p.addMessage (m);
+                       }
+                       while (searchQueue.size > 0) {
+                               if (searchQueue.headSize() > payload) break;
+                               Message m = searchQueue.pop();
+                               payload -= m.size;
+                               p.addMessage (m);
+                       }
+                       tgif = false;
                }
+               else {
+                       // Searches go in first
+                       while (searchQueue.size > 0) {
+                               if (searchQueue.headSize() > payload) break;
+                               Message m = searchQueue.pop();
+                               payload -= m.size;
+                               p.addMessage (m);
+                       }
+                       while (transferQueue.size > 0) {
+                               if (transferQueue.headSize() > payload) break;
+                               Message m = transferQueue.pop();
+                               payload -= m.size;
+                               p.addMessage (m);
+                       }
+                       tgif = true;
+               }
        }

        // Remove messages from a packet and deliver them to the node
@@ -243,7 +276,7 @@
                double now = Event.time();
                if (txBuffer.isEmpty()) {
                        log ("no packets in flight");
-                       return deadline (now);
+                       return deadline (now); // Sleep until the next deadline
                }
                for (Packet p : txBuffer) {
                        if (now - p.sent > RTO * rtt + MAX_DELAY) {
@@ -254,36 +287,43 @@
                                window.timeout (now);
                        }
                }
+               
+               // Sleep for up to MAX_DELAY seconds until the next deadline
                return Math.min (now + MAX_DELAY, deadline (now));
        }

-       // Work out when the first message or ack needs to be sent
+       // Work out when the first ack or search or transfer needs to be sent
        private double deadline (double now)
        {
-               return Math.min (ackDeadline(), msgDeadline (now));
+               return Math.min (ackQueue.deadline(), dataDeadline (now));
        }
-
-       // Work out when the first ack needs to be sent
-       private double ackDeadline()
-       {
-               Deadline<Integer> firstAck = ackQueue.peek();
-               if (firstAck == null) return Double.POSITIVE_INFINITY;
-               return firstAck.deadline;
-       }

-       // Work out when the first message needs to be sent
-       private double msgDeadline (double now)
+       // Work out when the first search or transfer needs to be sent
+       private double dataDeadline (double now)
        {
-               Deadline<Message> firstMsg = msgQueue.peek();
-               if (firstMsg == null) return Double.POSITIVE_INFINITY;
-               double deadline = firstMsg.deadline;
-               if (msgQueueSize < Packet.SENSIBLE_PAYLOAD) return deadline;
+               // If there's no data waiting, wait until the ack deadline
+               if (searchQueue.size + transferQueue.size == 0)
+                       return Double.POSITIVE_INFINITY;
+               
+               double deadline = Math.min (searchQueue.deadline(),
+                                               transferQueue.deadline());
+               
+               // Delay small packets until the coalescing deadline
+               if (searchQueue.size + transferQueue.size
+               < Packet.SENSIBLE_PAYLOAD)
+                       return deadline;
+               
+               // If there's not enough room in the window, wait for an ack
                if (window.available() < Packet.SENSIBLE_PAYLOAD
                + Packet.HEADER_SIZE)
-                       return Double.POSITIVE_INFINITY; // Wait for an ack
+                       return Double.POSITIVE_INFINITY;
+               
+               // If there's not enough bandwidth, try again shortly
                if (node.bandwidth.available() < Packet.SENSIBLE_PAYLOAD
                + Packet.HEADER_SIZE)
                        return Math.max (deadline, now + Node.SHORT_SLEEP);
+               
+               // Send a packet immediately
                return now;
        }


Added: trunk/apps/load-balancing-sims/phase6/messages/Ack.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Ack.java     2006-08-21 
12:43:22 UTC (rev 10223)
+++ trunk/apps/load-balancing-sims/phase6/messages/Ack.java     2006-08-21 
14:48:20 UTC (rev 10224)
@@ -0,0 +1,17 @@
+// Note: acks are not FNP messages; they are only here because it makes the
+// implementation simpler
+
+package messages;
+
+public class Ack extends Message
+{
+       public final static int SIZE = 4; // Size of an ack in bytes
+       public final int seq; // Sequence number of the packet being acked
+       
+       // Create an ack for the packet with the given sequence number
+       public Ack (int seq)
+       {
+               size = SIZE;
+               this.seq = seq;
+       }
+}

Added: trunk/apps/load-balancing-sims/phase6/messages/Block.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Block.java   2006-08-21 
12:43:22 UTC (rev 10223)
+++ trunk/apps/load-balancing-sims/phase6/messages/Block.java   2006-08-21 
14:48:20 UTC (rev 10224)
@@ -0,0 +1,14 @@
+// A single block of a data transfer (currently 32 blocks per transfer)
+
+package messages;
+
+public class Block extends Message
+{
+       public final static int SIZE = 1024; // Size of one block in bytes
+       
+       // FIXME: placeholder - a block only records its size for now
+       public Block()
+       {
+               this.size = Block.SIZE;
+       }
+}

Modified: trunk/apps/load-balancing-sims/phase6/messages/Message.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Message.java 2006-08-21 
12:43:22 UTC (rev 10223)
+++ trunk/apps/load-balancing-sims/phase6/messages/Message.java 2006-08-21 
14:48:20 UTC (rev 10224)
@@ -4,10 +4,9 @@

 public class Message
 {
-       public final static int HEADER_SIZE = 30; // Sequence number, MAC, etc
-       public final static int ID_SIZE = 16; // Size of unique request ID
+       public final static int HEADER_SIZE = 4; // Message type etc
+       public final static int ID_SIZE = 8; // Size of unique request ID
        public final static int KEY_SIZE = 32; // Size of routing key
-       public final static int DATA_SIZE = 1024; // Size of data block

        public int size; // Size in bytes
 }

Modified: trunk/apps/load-balancing-sims/phase6/messages/Response.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Response.java        
2006-08-21 12:43:22 UTC (rev 10223)
+++ trunk/apps/load-balancing-sims/phase6/messages/Response.java        
2006-08-21 14:48:20 UTC (rev 10224)
@@ -10,7 +10,7 @@
                this.id = id;
                this.key = key;
                size = Message.HEADER_SIZE + Message.ID_SIZE +
-                       Message.KEY_SIZE + Message.DATA_SIZE;
+                       Message.KEY_SIZE + Block.SIZE;
        }

        public String toString()


Reply via email to