Author: mrogers
Date: 2006-08-21 21:26:54 +0000 (Mon, 21 Aug 2006)
New Revision: 10233

Modified:
   trunk/apps/load-balancing-sims/phase6/Node.java
   trunk/apps/load-balancing-sims/phase6/Packet.java
   trunk/apps/load-balancing-sims/phase6/Peer.java
   trunk/apps/load-balancing-sims/phase6/messages/Message.java
Log:
Simplified handling of bandwidth and congestion, simplified interleaving of 
searches and transfers

Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java     2006-08-21 20:11:11 UTC 
(rev 10232)
+++ trunk/apps/load-balancing-sims/phase6/Node.java     2006-08-21 21:26:54 UTC 
(rev 10233)
@@ -112,7 +112,7 @@
                        log ("key " + r.key + " found in cache");
                        if (prev == null) log (r + " succeeded locally");
                        else for (int i = 0; i < 32; i++)
-                               prev.sendBlock (new Response (r.id, i));
+                               prev.sendMessage (new Response (r.id, i));
                        return;
                }
                log ("key " + r.key + " not found in cache");
@@ -131,7 +131,7 @@
                // Forward the block
                if (rs.prev != null) {
                        log ("forwarding " + r);
-                       rs.prev.sendBlock (r);
+                       rs.prev.sendMessage (r);
                }
        }


Modified: trunk/apps/load-balancing-sims/phase6/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Packet.java   2006-08-21 20:11:11 UTC 
(rev 10232)
+++ trunk/apps/load-balancing-sims/phase6/Packet.java   2006-08-21 21:26:54 UTC 
(rev 10233)
@@ -5,7 +5,7 @@

 class Packet
 {
-       public final static int HEADER_SIZE = 80; // Including IP & UDP headers
+       public final static int HEADER_SIZE = 70; // Including IP & UDP headers
        public final static int ACK_SIZE = 4; // Size of an ack in bytes
        public final static int MAX_SIZE = 1450; // MTU including headers
        public final static int SENSIBLE_PAYLOAD = 1000; // Coalescing

Modified: trunk/apps/load-balancing-sims/phase6/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Peer.java     2006-08-21 20:11:11 UTC 
(rev 10232)
+++ trunk/apps/load-balancing-sims/phase6/Peer.java     2006-08-21 21:26:54 UTC 
(rev 10233)
@@ -31,7 +31,6 @@
        private AckQueue ackQueue; // Outgoing acks
        private DeadlineQueue searchQueue; // Outgoing search messages
        private DeadlineQueue transferQueue; // Outgoing transfers
-       private boolean tgif = false; // "Transfers go in first" toggle
        private CongestionWindow window; // AIMD congestion window
        private double lastTransmission = 0.0; // Clock time

@@ -53,28 +52,23 @@
                rxDupe = new HashSet<Integer>();
        }

-       // Queue a search message for transmission
+       // Queue a message for transmission
        public void sendMessage (Message m)
        {
-               log (m + " added to search queue");
-               searchQueue.add (m, Event.time() + MAX_DELAY);
+               if (m instanceof Block) {
+                       log (m + " added to transfer queue");
+                       transferQueue.add (m, Event.time() + MAX_DELAY);
+               }
+               else {
+                       log (m + " added to search queue");
+                       searchQueue.add (m, Event.time() + MAX_DELAY);
+               }
                // Start the node's timer if necessary
                node.startTimer();
                // Send as many packets as possible
                while (send());
        }

-       // Queue a transfer block for transmission
-       public void sendBlock (Block b)
-       {
-               log (b + " added to transfer queue");
-               transferQueue.add (b, Event.time() + MAX_DELAY);
-               // Start the node's timer if necessary
-               node.startTimer();
-               // Send as many packets as possible
-               while (send());
-       }
-       
        // Queue an ack for transmission
        private void sendAck (int seq)
        {
@@ -102,18 +96,9 @@
                if (now - lastTransmission > LINK_IDLE * rtt) window.reset();
                lastTransmission = now;

-               // Work out how large a packet we can send
-               int headersAndAcks = Packet.HEADER_SIZE + ackQueue.size;
-               int payload = Packet.MAX_SIZE - headersAndAcks;
-               if (payload > searchQueue.size + transferQueue.size)
-                       payload = searchQueue.size + transferQueue.size;
-               int win = window.available() - headersAndAcks;
-               if (payload > win) payload = win;
-               int bw = node.bandwidth.available() - headersAndAcks;
-               if (payload > bw) payload = bw;
-               
                // Delay small packets for coalescing
                if (now < deadline (now)) {
+                       int payload = searchQueue.size + transferQueue.size;
                        log ("delaying transmission of " + payload + " bytes");
                        return false;
                }
@@ -129,12 +114,14 @@
                // detection. We must still be allowed to send acks,
                // otherwise the connection could deadlock.

-               if (txSeq <= txMaxSeq) pack (p, payload); // OK to send data
-               else log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
+               if (txSeq > txMaxSeq)
+                       log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
+               else if (window.available() <= 0)
+                       log ("no room in congestion window for messages");
+               else if (node.bandwidth.available() <= 0)
+                       log ("no bandwidth available for messages");
+               else pack (p); // OK to send data

-               // Don't send empty packets
-               if (p.acks == null && p.messages == null) return false;
-               
                // If the packet contains data, buffer it for retransmission
                if (p.messages != null) {
                        p.seq = txSeq++;
@@ -221,41 +208,22 @@
        }

        // Add messages to a packet
-       private void pack (Packet p, int payload)
+       private void pack (Packet p)
        {
-               // Priority alternates between transfers and searches
-               if (tgif) {
-                       // Transfers go in first
-                       while (transferQueue.size > 0) {
-                               if (transferQueue.headSize() > payload) break;
-                               Message m = transferQueue.pop();
-                               payload -= m.size;
-                               p.addMessage (m);
-                       }
-                       while (searchQueue.size > 0) {
-                               if (searchQueue.headSize() > payload) break;
-                               Message m = searchQueue.pop();
-                               payload -= m.size;
-                               p.addMessage (m);
-                       }
-                       tgif = false;
-               }
-               else {
-                       // Searches go in first
-                       while (searchQueue.size > 0) {
-                               if (searchQueue.headSize() > payload) break;
-                               Message m = searchQueue.pop();
-                               payload -= m.size;
-                               p.addMessage (m);
-                       }
-                       while (transferQueue.size > 0) {
-                               if (transferQueue.headSize() > payload) break;
-                               Message m = transferQueue.pop();
-                               payload -= m.size;
-                               p.addMessage (m);
-                       }
-                       tgif = true;
-               }
+               // Add one search, then one transfer, then as many searches as
+               // will fit. This ensures that neither searches nor transfers
+               // starve as long as at least *some* searches are small enough
+               // to share a packet with a transfer.
+               
+               if (searchQueue.size > 0
+               && p.size + searchQueue.headSize() <= Packet.MAX_SIZE)
+                       p.addMessage (searchQueue.pop());
+               if (transferQueue.size > 0
+               && p.size + transferQueue.headSize() <= Packet.MAX_SIZE)
+                       p.addMessage (transferQueue.pop());
+               while (searchQueue.size > 0
+               && p.size + searchQueue.headSize() <= Packet.MAX_SIZE)
+                       p.addMessage (searchQueue.pop());
        }

        // Remove messages from a packet and deliver them to the node
@@ -300,7 +268,7 @@
        // Work out when the first search or transfer needs to be sent
        private double dataDeadline (double now)
        {
-               // If there's no data waiting, wait until the ack deadline
+               // If there's no data waiting, use the ack deadline
                if (searchQueue.size + transferQueue.size == 0)
                        return Double.POSITIVE_INFINITY;

@@ -313,13 +281,11 @@
                        return deadline;

                // If there's not enough room in the window, wait for an ack
-               if (window.available() < Packet.SENSIBLE_PAYLOAD
-               + Packet.HEADER_SIZE)
+               if (window.available() <= 0)
                        return Double.POSITIVE_INFINITY;

                // If there's not enough bandwidth, try again shortly
-               if (node.bandwidth.available() < Packet.SENSIBLE_PAYLOAD
-               + Packet.HEADER_SIZE)
+               if (node.bandwidth.available() <= 0)
                        return Math.max (deadline, now + Node.SHORT_SLEEP);

                // Send a packet immediately

Modified: trunk/apps/load-balancing-sims/phase6/messages/Message.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Message.java 2006-08-21 
20:11:11 UTC (rev 10232)
+++ trunk/apps/load-balancing-sims/phase6/messages/Message.java 2006-08-21 
21:26:54 UTC (rev 10233)
@@ -5,8 +5,8 @@
 public class Message
 {
        public final static int HEADER_SIZE = 12; // Bytes, including unique ID
-       public final static int KEY_SIZE = 32; // Size of routing key, bytes
-       public final static int DATA_SIZE = 1024; // Size of data block, bytes
+       public final static int KEY_SIZE = 32; // Size of a routing key, bytes
+       public final static int DATA_SIZE = 1024; // Size of a data block, bytes

        public int size; // Size in bytes
        public int id; // Unique request ID


Reply via email to