Author: mrogers
Date: 2006-08-21 21:26:54 +0000 (Mon, 21 Aug 2006)
New Revision: 10233
Modified:
trunk/apps/load-balancing-sims/phase6/Node.java
trunk/apps/load-balancing-sims/phase6/Packet.java
trunk/apps/load-balancing-sims/phase6/Peer.java
trunk/apps/load-balancing-sims/phase6/messages/Message.java
Log:
Simplified handling of bandwidth and congestion, simplified interleaving of
searches and transfers
Modified: trunk/apps/load-balancing-sims/phase6/Node.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Node.java 2006-08-21 20:11:11 UTC (rev 10232)
+++ trunk/apps/load-balancing-sims/phase6/Node.java 2006-08-21 21:26:54 UTC (rev 10233)
@@ -112,7 +112,7 @@
log ("key " + r.key + " found in cache");
if (prev == null) log (r + " succeeded locally");
else for (int i = 0; i < 32; i++)
- prev.sendBlock (new Response (r.id, i));
+ prev.sendMessage (new Response (r.id, i));
return;
}
log ("key " + r.key + " not found in cache");
@@ -131,7 +131,7 @@
// Forward the block
if (rs.prev != null) {
log ("forwarding " + r);
- rs.prev.sendBlock (r);
+ rs.prev.sendMessage (r);
}
}
Modified: trunk/apps/load-balancing-sims/phase6/Packet.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Packet.java 2006-08-21 20:11:11 UTC (rev 10232)
+++ trunk/apps/load-balancing-sims/phase6/Packet.java 2006-08-21 21:26:54 UTC (rev 10233)
@@ -5,7 +5,7 @@
class Packet
{
- public final static int HEADER_SIZE = 80; // Including IP & UDP headers
+ public final static int HEADER_SIZE = 70; // Including IP & UDP headers
public final static int ACK_SIZE = 4; // Size of an ack in bytes
public final static int MAX_SIZE = 1450; // MTU including headers
public final static int SENSIBLE_PAYLOAD = 1000; // Coalescing
Modified: trunk/apps/load-balancing-sims/phase6/Peer.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/Peer.java 2006-08-21 20:11:11 UTC (rev 10232)
+++ trunk/apps/load-balancing-sims/phase6/Peer.java 2006-08-21 21:26:54 UTC (rev 10233)
@@ -31,7 +31,6 @@
private AckQueue ackQueue; // Outgoing acks
private DeadlineQueue searchQueue; // Outgoing search messages
private DeadlineQueue transferQueue; // Outgoing transfers
- private boolean tgif = false; // "Transfers go in first" toggle
private CongestionWindow window; // AIMD congestion window
private double lastTransmission = 0.0; // Clock time
@@ -53,28 +52,23 @@
rxDupe = new HashSet<Integer>();
}
- // Queue a search message for transmission
+ // Queue a message for transmission
public void sendMessage (Message m)
{
- log (m + " added to search queue");
- searchQueue.add (m, Event.time() + MAX_DELAY);
+ if (m instanceof Block) {
+ log (m + " added to transfer queue");
+ transferQueue.add (m, Event.time() + MAX_DELAY);
+ }
+ else {
+ log (m + " added to search queue");
+ searchQueue.add (m, Event.time() + MAX_DELAY);
+ }
// Start the node's timer if necessary
node.startTimer();
// Send as many packets as possible
while (send());
}
- // Queue a transfer block for transmission
- public void sendBlock (Block b)
- {
- log (b + " added to transfer queue");
- transferQueue.add (b, Event.time() + MAX_DELAY);
- // Start the node's timer if necessary
- node.startTimer();
- // Send as many packets as possible
- while (send());
- }
-
// Queue an ack for transmission
private void sendAck (int seq)
{
@@ -102,18 +96,9 @@
if (now - lastTransmission > LINK_IDLE * rtt) window.reset();
lastTransmission = now;
- // Work out how large a packet we can send
- int headersAndAcks = Packet.HEADER_SIZE + ackQueue.size;
- int payload = Packet.MAX_SIZE - headersAndAcks;
- if (payload > searchQueue.size + transferQueue.size)
- payload = searchQueue.size + transferQueue.size;
- int win = window.available() - headersAndAcks;
- if (payload > win) payload = win;
- int bw = node.bandwidth.available() - headersAndAcks;
- if (payload > bw) payload = bw;
-
// Delay small packets for coalescing
if (now < deadline (now)) {
+ int payload = searchQueue.size + transferQueue.size;
log ("delaying transmission of " + payload + " bytes");
return false;
}
@@ -129,12 +114,14 @@
// detection. We must still be allowed to send acks,
// otherwise the connection could deadlock.
- if (txSeq <= txMaxSeq) pack (p, payload); // OK to send data
- else log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
+ if (txSeq > txMaxSeq)
+ log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1));
+ else if (window.available() <= 0)
+ log ("no room in congestion window for messages");
+ else if (node.bandwidth.available() <= 0)
+ log ("no bandwidth available for messages");
+ else pack (p); // OK to send data
- // Don't send empty packets
- if (p.acks == null && p.messages == null) return false;
-
// If the packet contains data, buffer it for retransmission
if (p.messages != null) {
p.seq = txSeq++;
@@ -221,41 +208,22 @@
}
// Add messages to a packet
- private void pack (Packet p, int payload)
+ private void pack (Packet p)
{
- // Priority alternates between transfers and searches
- if (tgif) {
- // Transfers go in first
- while (transferQueue.size > 0) {
- if (transferQueue.headSize() > payload) break;
- Message m = transferQueue.pop();
- payload -= m.size;
- p.addMessage (m);
- }
- while (searchQueue.size > 0) {
- if (searchQueue.headSize() > payload) break;
- Message m = searchQueue.pop();
- payload -= m.size;
- p.addMessage (m);
- }
- tgif = false;
- }
- else {
- // Searches go in first
- while (searchQueue.size > 0) {
- if (searchQueue.headSize() > payload) break;
- Message m = searchQueue.pop();
- payload -= m.size;
- p.addMessage (m);
- }
- while (transferQueue.size > 0) {
- if (transferQueue.headSize() > payload) break;
- Message m = transferQueue.pop();
- payload -= m.size;
- p.addMessage (m);
- }
- tgif = true;
- }
+ // Add one search, then one transfer, then as many searches as
+ // will fit. This ensures that neither searches nor transfers
+ // starve as long as at least *some* searches are small enough
+ // to share a packet with a transfer.
+
+ if (searchQueue.size > 0
+ && p.size + searchQueue.headSize() <= Packet.MAX_SIZE)
+ p.addMessage (searchQueue.pop());
+ if (transferQueue.size > 0
+ && p.size + transferQueue.headSize() <= Packet.MAX_SIZE)
+ p.addMessage (transferQueue.pop());
+ while (searchQueue.size > 0
+ && p.size + searchQueue.headSize() <= Packet.MAX_SIZE)
+ p.addMessage (searchQueue.pop());
}
// Remove messages from a packet and deliver them to the node
@@ -300,7 +268,7 @@
// Work out when the first search or transfer needs to be sent
private double dataDeadline (double now)
{
- // If there's no data waiting, wait until the ack deadline
+ // If there's no data waiting, use the ack deadline
if (searchQueue.size + transferQueue.size == 0)
return Double.POSITIVE_INFINITY;
@@ -313,13 +281,11 @@
return deadline;
// If there's not enough room in the window, wait for an ack
- if (window.available() < Packet.SENSIBLE_PAYLOAD
- + Packet.HEADER_SIZE)
+ if (window.available() <= 0)
return Double.POSITIVE_INFINITY;
// If there's not enough bandwidth, try again shortly
- if (node.bandwidth.available() < Packet.SENSIBLE_PAYLOAD
- + Packet.HEADER_SIZE)
+ if (node.bandwidth.available() <= 0)
return Math.max (deadline, now + Node.SHORT_SLEEP);
// Send a packet immediately
Modified: trunk/apps/load-balancing-sims/phase6/messages/Message.java
===================================================================
--- trunk/apps/load-balancing-sims/phase6/messages/Message.java 2006-08-21 20:11:11 UTC (rev 10232)
+++ trunk/apps/load-balancing-sims/phase6/messages/Message.java 2006-08-21 21:26:54 UTC (rev 10233)
@@ -5,8 +5,8 @@
public class Message
{
public final static int HEADER_SIZE = 12; // Bytes, including unique ID
- public final static int KEY_SIZE = 32; // Size of routing key, bytes
- public final static int DATA_SIZE = 1024; // Size of data block, bytes
+ public final static int KEY_SIZE = 32; // Size of a routing key, bytes
+ public final static int DATA_SIZE = 1024; // Size of a data block, bytes
public int size; // Size in bytes
public int id; // Unique request ID