+ // If there are no urgent acks, and no urgent messages or no
+ // room to send them, and not enough messages for a large
+ // packet or no room to send a large packet, give up!
+ if (ackQueue.deadline() > now
+ && (searchQueue.deadline() > now
+ || searchQueue.headSize() > available)
So we hold search messages forever if there is no transfer traffic? On Wed, Nov 01, 2006 at 05:11:30PM +0000, mrogers at freenetproject.org wrote: > Author: mrogers > Date: 2006-11-01 17:11:27 +0000 (Wed, 01 Nov 2006) > New Revision: 10775 > > Modified: > trunk/apps/load-balancing-sims/phase7/sim/Event.java > trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java > trunk/apps/load-balancing-sims/phase7/sim/Node.java > trunk/apps/load-balancing-sims/phase7/sim/Peer.java > trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java > trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java > trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java > Log: > Refactored interleaving, coalescing, congestion control and bandwidth limiter > > Modified: trunk/apps/load-balancing-sims/phase7/sim/Event.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/Event.java 2006-11-01 > 10:00:25 UTC (rev 10774) > +++ trunk/apps/load-balancing-sims/phase7/sim/Event.java 2006-11-01 > 17:11:27 UTC (rev 10775) > @@ -7,6 +7,7 @@ > > private static TreeSet<Event> queue = new TreeSet<Event>(); > private static double clockTime = 0.0; > + private static double lastLogTime = Double.POSITIVE_INFINITY; > private static int nextId = 0; > public static double duration = Double.POSITIVE_INFINITY; > > @@ -14,12 +15,13 @@ > { > queue.clear(); > clockTime = 0.0; > + lastLogTime = Double.POSITIVE_INFINITY; > nextId = 0; > duration = Double.POSITIVE_INFINITY; > } > > public static void schedule (EventTarget target, double time, > - int type, Object data) > + int type, Object data) > { > queue.add (new Event (target, time + clockTime, type, data)); > } > @@ -50,6 +52,9 @@ > > public static void log (String message) > { > + // Print a blank line between events > + if (clockTime > lastLogTime) System.out.println(); > + lastLogTime = clockTime; > System.out.print (clockTime + " " + 
message + "\n"); > } > > > Modified: trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java > 2006-11-01 10:00:25 UTC (rev 10774) > +++ trunk/apps/load-balancing-sims/phase7/sim/NetworkInterface.java > 2006-11-01 17:11:27 UTC (rev 10775) > @@ -122,7 +122,6 @@ > } > } > > - // Each EventTarget class has its own event codes > public final static int RX_QUEUE = 1; > private final static int RX_END = 2; > private final static int TX_END = 3; > > Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-01 > 10:00:25 UTC (rev 10774) > +++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2006-11-01 > 17:11:27 UTC (rev 10775) > @@ -8,6 +8,9 @@ > > public class Node implements EventTarget > { > + // Coarse-grained retransmission timer > + public final static double RETX_TIMER = 0.1; // Seconds > + > public double location; // Routing location > public NetworkInterface net; > private HashMap<Integer,Peer> peers; // Look up a peer by its address > @@ -21,7 +24,7 @@ > private boolean decrementMaxHtl = false; > private boolean decrementMinHtl = false; > public TokenBucket bandwidth; // Bandwidth limiter > - private boolean timerRunning = false; // Is the retx timer running? 
> + private boolean timerRunning = false; > > public Node (double txSpeed, double rxSpeed) > { > @@ -149,13 +152,13 @@ > pubKeyCache.put (key); > } > > - // Called by Peer > + // Called by Peer after transmitting a packet > public void startTimer() > { > if (timerRunning) return; > - // log ("starting retransmission/coalescing timer"); > - Event.schedule (this, Peer.MAX_DELAY, CHECK_TIMEOUTS, null); > timerRunning = true; > + log ("starting retransmission timer"); > + Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null); > } > > // Called by NetworkInterface > @@ -385,20 +388,13 @@ > > private void checkTimeouts() > { > - // Check the peers in a random order each time > - double deadline = Double.POSITIVE_INFINITY; > - for (Peer p : peers()) > - deadline = Math.min (deadline, p.checkTimeouts()); > - if (deadline == Double.POSITIVE_INFINITY) { > - // log ("stopping retransmission/coalescing timer"); > + boolean stopTimer = true; > + for (Peer p : peers()) if (p.checkTimeouts()) stopTimer = false; > + if (stopTimer) { > + log ("stopping retransmission timer"); > timerRunning = false; > } > - else { > - double sleep = deadline - Event.time(); > - if (sleep < Peer.MIN_SLEEP) sleep = Peer.MIN_SLEEP; > - // log ("sleeping for " + sleep + " seconds"); > - Event.schedule (this, sleep, CHECK_TIMEOUTS, null); > - } > + else Event.schedule (this, RETX_TIMER, CHECK_TIMEOUTS, null); > } > > // EventTarget interface > @@ -423,6 +419,7 @@ > > case SSK_COLLISION: > generateSskInsert ((Integer) data, 1); > + break; > > case CHECK_TIMEOUTS: > checkTimeouts(); > @@ -430,7 +427,6 @@ > } > } > > - // Each EventTarget class has its own event codes > public final static int REQUEST_CHK = 1; > public final static int INSERT_CHK = 2; > public final static int REQUEST_SSK = 3; > > Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-01 > 
10:00:25 UTC (rev 10774) > +++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2006-11-01 > 17:11:27 UTC (rev 10775) > @@ -4,7 +4,7 @@ > import java.util.Iterator; > import java.util.HashSet; > > -public class Peer > +public class Peer implements EventTarget > { > private Node node; // The local node > public int address; // The remote node's address > @@ -18,10 +18,10 @@ > public final static double LINK_IDLE = 8.0; // RTTs without transmitting > > // Coalescing > - public final static double MIN_SLEEP = 0.01; // Poll the b/w limiter > - public final static double MAX_DELAY = 0.1; // Coalescing delay in secs > + private final static double MAX_DELAY = 0.1; // Max coalescing delay > + private final static double MIN_SLEEP = 0.01; // Poll the b/w limiter > > - // Out-of-order delivery with eventual detection of missing packets > + // Out-of-order delivery with duplicate detection > public final static int SEQ_RANGE = 1000; > > // Sender state > @@ -33,8 +33,9 @@ > private DeadlineQueue<Message> searchQueue; // Outgoing search messages > private DeadlineQueue<Message> transferQueue; // Outgoing transfers > private CongestionWindow window; // AIMD congestion window > - private double lastTransmission = 0.0; // Clock time > - private boolean tgif = false; // "Transfers go in first" toggle > + private double lastTransmission = Double.POSITIVE_INFINITY; // Time > + private int searchBytesSent = 0, transferBytesSent = 0; > + private boolean timerRunning = false; // Coalescing timer > > // Receiver state > private HashSet<Integer> rxDupe; // Detect duplicates by sequence number > @@ -58,8 +59,7 @@ > public void sendMessage (Message m) > { > m.deadline = Event.time() + MAX_DELAY; > - if (m instanceof Block || m instanceof DataInsert > - || m instanceof ChkDataFound) { > + if (m instanceof Block) { > log (m + " added to transfer queue"); > transferQueue.add (m); > } > @@ -67,8 +67,8 @@ > log (m + " added to search queue"); > searchQueue.add (m); > } > - // Start the 
node's timer if necessary > - node.startTimer(); > + // Start the coalescing timer > + startTimer(); > // Send as many packets as possible > while (send()); > } > @@ -78,72 +78,115 @@ > { > log ("ack " + seq + " added to ack queue"); > ackQueue.add (new Ack (seq, Event.time() + MAX_DELAY)); > - // Start the node's timer if necessary > - node.startTimer(); > + // Start the coalescing timer > + startTimer(); > // Send as many packets as possible > while (send()); > } > > + // Start the coalescing timer > + private void startTimer() > + { > + if (timerRunning) return; > + timerRunning = true; > + log ("starting coalescing timer"); > + Event.schedule (this, MAX_DELAY, CHECK_DEADLINES, null); > + } > + > // Try to send a packet, return true if a packet was sent > private boolean send() > - { > - if (ackQueue.size + searchQueue.size + transferQueue.size == 0){ > - log ("nothing to send"); > - return false; > - } > - log (ackQueue.size + " bytes of acks in queue"); > - log (searchQueue.size + " bytes of searches in queue"); > - log (transferQueue.size + " bytes of transfers in queue"); > - > + { > + int waiting = ackQueue.size+searchQueue.size+transferQueue.size; > + log (waiting + " bytes waiting"); > + if (waiting == 0) return false; > // Return to slow start when the link is idle > double now = Event.time(); > if (now - lastTransmission > LINK_IDLE * rtt) window.reset(); > lastTransmission = now; > - > - // Delay small packets for coalescing > - if (now < deadline (now)) { > - int payload = searchQueue.size + transferQueue.size; > - log ("delaying transmission of " + payload + " bytes"); > + // How many bytes of messages can we send? > + int available = Math.min (window.available(), > + node.bandwidth.available()); > + log (available + " bytes available for packet"); > + // If there are no urgent acks, and no urgent messages or no > + // room to send them, and not enough messages for a large > + // packet or no room to send a large packet, give up! 
> + if (ackQueue.deadline() > now > + && (searchQueue.deadline() > now > + || searchQueue.headSize() > available) > + && (transferQueue.deadline() > now > + || transferQueue.headSize() > available) > + && (waiting < Packet.SENSIBLE_PAYLOAD > + || available < Packet.SENSIBLE_PAYLOAD)) { > + log ("not sending a packet"); > return false; > } > - > + // Construct a packet > Packet p = new Packet(); > - > - // Put all waiting acks in the packet > while (ackQueue.size > 0) p.addAck (ackQueue.pop()); > - > - // Don't send sequence number n+SEQ_RANGE until sequence > - // number n has been acked - this limits the number of > - // sequence numbers the receiver must store for replay > - // detection. We must still be allowed to send acks, > - // otherwise the connection could deadlock. > - > - if (txSeq > txMaxSeq) > - log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1)); > - else if (window.available() <= 0) > - log ("no room in congestion window for messages"); > - else if (node.bandwidth.available() <= 0) > - log ("no bandwidth available for messages"); > - else pack (p); // OK to send data > - > + int space = Math.min (available, Packet.MAX_SIZE - p.size); > + addPayload (p, space); > // Don't send empty packets > if (p.acks == null && p.messages == null) return false; > - > + // Transmit the packet > + log ("sending packet " + p.seq + ", " + p.size + " bytes"); > + node.net.send (p, address, latency); > + node.bandwidth.remove (p.size); > // If the packet contains data, buffer it for retransmission > if (p.messages != null) { > - p.seq = txSeq++; > p.sent = now; > txBuffer.add (p); > + node.startTimer(); // Start the retransmission timer > window.bytesSent (p.size); > } > - > - // Send the packet > - log ("sending packet " + p.seq + ", " + p.size + " bytes"); > - node.net.send (p, address, latency); > - node.bandwidth.remove (p.size); > return true; > } > > + // Allocate a payload number and add messages to a packet > + private void addPayload (Packet p, int space) > + 
{ > + log (space + " bytes available for messages"); > + if (txSeq > txMaxSeq) { > + log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1)); > + return; > + } > + p.seq = txSeq++; > + // Searches get priority unless transfers are starving > + if (searchBytesSent < transferBytesSent) { > + while (searchQueue.size > 0 > + && searchQueue.headSize() <= space) { > + Message m = searchQueue.pop(); > + searchBytesSent += m.size(); > + space -= m.size(); > + p.addMessage (m); > + } > + while (transferQueue.size > 0 > + && transferQueue.headSize() <= space) { > + Message m = transferQueue.pop(); > + transferBytesSent += m.size(); > + space -= m.size(); > + p.addMessage (m); > + } > + } > + else { > + while (transferQueue.size > 0 > + && transferQueue.headSize() <= space) { > + Message m = transferQueue.pop(); > + transferBytesSent += m.size(); > + space -= m.size(); > + p.addMessage (m); > + } > + while (searchQueue.size > 0 > + && searchQueue.headSize() <= space) { > + Message m = searchQueue.pop(); > + searchBytesSent += m.size(); > + space -= m.size(); > + p.addMessage (m); > + } > + } > + if (p.messages == null) log ("no messages added"); > + else log (p.messages.size() + " messages added"); > + } > + > // Called by Node when a packet arrives > public void handlePacket (Packet p) > { > @@ -153,26 +196,24 @@ > > private void handleData (Packet p) > { > - log ("received packet " + p.seq + ", " + p.size + " bytes"); > + log ("received " + p + ", " + p.size + " bytes"); > + sendAck (p.seq); > if (p.seq < rxSeq || rxDupe.contains (p.seq)) { > - log ("duplicate packet"); > - sendAck (p.seq); // Original ack may have been lost > + log (p + " is a duplicate"); > } > else if (p.seq == rxSeq) { > - log ("packet in order"); > + log (p + " is in order"); > // Find the sequence number of the next missing packet > int was = rxSeq; > while (rxDupe.remove (++rxSeq)); > log ("rxSeq was " + was + ", now " + rxSeq); > // Deliver the packet > unpack (p); > - sendAck (p.seq); > } > - else if 
(p.seq < rxSeq + SEQ_RANGE) { > - log ("packet out of order - expected " + rxSeq); > + else if (p.seq < rxSeq + SEQ_RANGE * 2) { > + log (p + " is out of order - expected " + rxSeq); > if (rxDupe.add (p.seq)) unpack (p); > - else log ("duplicate packet"); > - sendAck (p.seq); // Original ack may have been lost > + else log (p + " is a duplicate"); > } > // This indicates a misbehaving sender - discard the packet > else log ("warning: received " + p.seq + " before " + rxSeq); > @@ -211,38 +252,11 @@ > if (txBuffer.isEmpty()) txMaxSeq = txSeq + SEQ_RANGE - 1; > else txMaxSeq = txBuffer.peek().seq + SEQ_RANGE - 1; > log ("maximum sequence number " + txMaxSeq); > - // Send as many packets as possible > - while (send()); > + // Send as many packets a possible > + if (timerRunning) while (send()); > + else checkDeadlines(); > } > > - // Add messages to a packet > - private void pack (Packet p) > - { > - // Alternate between giving searches and transfers priority > - if (tgif) { > - // Transfers go in first > - while (transferQueue.size > 0 > - && p.size + transferQueue.headSize() <= Packet.MAX_SIZE) > - p.addMessage (transferQueue.pop()); > - // Fill any remaining space with searches > - while (searchQueue.size > 0 > - && p.size + searchQueue.headSize() <= Packet.MAX_SIZE) > - p.addMessage (searchQueue.pop()); > - tgif = false; > - } > - else { > - // Searches go in first > - while (searchQueue.size > 0 > - && p.size + searchQueue.headSize() <= Packet.MAX_SIZE) > - p.addMessage (searchQueue.pop()); > - // Fill any remaining space with transfers > - while (transferQueue.size > 0 > - && p.size + transferQueue.headSize() <= Packet.MAX_SIZE) > - p.addMessage (transferQueue.pop()); > - tgif = true; > - } > - } > - > // Remove messages from a packet and deliver them to the node > private void unpack (Packet p) > { > @@ -250,17 +264,13 @@ > for (Message m : p.messages) node.handleMessage (m, this); > } > > - // Called by Node, returns the next coalescing or retx deadline > - 
public double checkTimeouts() > + // Check retx timeouts, return true if there are packets in flight > + public boolean checkTimeouts() > { > - log ("checking timeouts"); > - // Send as many packets as possible > - while (send()); > + log (txBuffer.size() + " packets in flight"); > + if (txBuffer.isEmpty()) return false; > > double now = Event.time(); > - if (txBuffer.isEmpty()) return deadline (now); > - log (txBuffer.size() + " packets in flight"); > - > for (Packet p : txBuffer) { > if (now - p.sent > RTO * rtt + MAX_DELAY) { > // Retransmission timeout > @@ -270,51 +280,68 @@ > window.timeout (now); > } > } > - > - // Sleep for up to MAX_DELAY seconds until the next deadline > - return Math.min (now + MAX_DELAY, deadline (now)); > + return true; > } > > - // Work out when the first ack or search or transfer needs to be sent > - private double deadline (double now) > + // Event callback: wake up, send packets, go back to sleep > + private void checkDeadlines() > { > - return Math.min (ackQueue.deadline(), dataDeadline (now)); > + // Send as many packets as possible > + while (send()); > + // Find the next coalescing deadline - ignore message > + // deadlines if there isn't room in the congestion window > + // (we have to wait for an ack before sending them) > + double dl = ackQueue.deadline(); > + if (searchQueue.headSize() <= window.available()) > + dl = Math.min (dl, searchQueue.deadline()); > + if (transferQueue.headSize() <= window.available()) > + dl = Math.min (dl, transferQueue.deadline()); > + // If there's no deadline, stop the timer > + if (dl == Double.POSITIVE_INFINITY) { > + if (timerRunning) { > + log ("stopping coalescing timer"); > + timerRunning = false; > + } > + return; > + } > + // Schedule the next check > + double sleep = Math.max (dl - Event.time(), MIN_SLEEP); > + if (waitingForBandwidth()) { > + log ("waiting for bandwidth"); > + sleep = MIN_SLEEP; // Poll the bandwidth limiter > + } > + timerRunning = true; > + log ("sleeping for " + 
sleep + " seconds"); > + Event.schedule (this, sleep, CHECK_DEADLINES, null); > } > > - // Work out when the first search or transfer needs to be sent > - private double dataDeadline (double now) > + // Are there any messages blocked by the bandwidth limiter? > + private boolean waitingForBandwidth() > { > - // If there's no data waiting, use the ack deadline > - if (searchQueue.size + transferQueue.size == 0) > - return Double.POSITIVE_INFINITY; > - > - double deadline = Math.min (searchQueue.deadline(), > - transferQueue.deadline()); > - > - // Delay small packets until the coalescing deadline > - if (searchQueue.size + transferQueue.size > - < Packet.SENSIBLE_PAYLOAD) > - return deadline; > - > - // If there's not enough room in the window, wait for an ack > - if (window.available() <= 0) > - return Double.POSITIVE_INFINITY; > - > - // If there's not enough bandwidth, try again shortly > - if (node.bandwidth.available() <= 0) > - return Math.max (now + MIN_SLEEP, deadline); > - > - // Send a packet immediately > - return now; > + int bandwidth = node.bandwidth.available(); > + double now = Event.time(); > + if (searchQueue.headSize() > bandwidth > + && searchQueue.deadline() <= now) return true; > + if (transferQueue.headSize() > bandwidth > + && transferQueue.deadline() <= now) return true; > + return false; > } > > public void log (String message) > { > - // Event.log (node.net.address + ":" + address + " " + message); > + Event.log (node.net.address + ":" + address + " " + message); > } > > public String toString() > { > return Integer.toString (address); > } > + > + // EventTarget interface > + public void handleEvent (int type, Object data) > + { > + if (type == CHECK_DEADLINES) checkDeadlines(); > + } > + > + private final static int CHECK_DEADLINES = 1; > } > > Modified: > trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java > =================================================================== > --- 
trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java > 2006-11-01 10:00:25 UTC (rev 10774) > +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/ChkInsertHandler.java > 2006-11-01 17:11:27 UTC (rev 10775) > @@ -261,7 +261,6 @@ > } > } > > - // Each EventTarget class has its own event codes > private final static int ACCEPTED_TIMEOUT = 1; > private final static int SEARCH_TIMEOUT = 2; > private final static int DATA_TIMEOUT = 3; > > Modified: > trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java > 2006-11-01 10:00:25 UTC (rev 10774) > +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/RequestHandler.java > 2006-11-01 17:11:27 UTC (rev 10775) > @@ -132,7 +132,6 @@ > } > } > > - // Each EventTarget class has its own event codes > protected final static int ACCEPTED_TIMEOUT = 1; > protected final static int SEARCH_TIMEOUT = 2; > protected final static int TRANSFER_TIMEOUT = 3; > > Modified: > trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java > 2006-11-01 10:00:25 UTC (rev 10774) > +++ trunk/apps/load-balancing-sims/phase7/sim/handlers/SskInsertHandler.java > 2006-11-01 17:11:27 UTC (rev 10775) > @@ -213,7 +213,6 @@ > } > } > > - // Each EventTarget class has its own event codes > private final static int KEY_TIMEOUT = 1; > private final static int ACCEPTED_TIMEOUT = 2; > private final static int SEARCH_TIMEOUT = 3; > > _______________________________________________ > cvs mailing list > cvs at freenetproject.org > http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs > -------------- next part -------------- A non-text attachment was scrubbed... 
Name: signature.asc Type: application/pgp-signature Size: 189 bytes Desc: Digital signature URL: <https://emu.freenetproject.org/pipermail/devl/attachments/20061101/648be1b2/attachment.pgp>