What's going on with the sendAck(int) -> send(int) change?
On Mon, Feb 05, 2007 at 11:52:16AM +0000, mrogers at freenetproject.org wrote: > Author: mrogers > Date: 2007-02-05 11:52:15 +0000 (Mon, 05 Feb 2007) > New Revision: 11679 > > Modified: > trunk/apps/load-balancing-sims/phase7/sim/Node.java > trunk/apps/load-balancing-sims/phase7/sim/Packet.java > trunk/apps/load-balancing-sims/phase7/sim/Peer.java > trunk/apps/load-balancing-sims/phase7/sim/Sim.java > trunk/apps/load-balancing-sims/phase7/sim/clients/Client.java > trunk/apps/load-balancing-sims/phase7/sim/clients/SimplePublisher.java > trunk/apps/load-balancing-sims/phase7/sim/messages/ChkInsert.java > trunk/apps/load-balancing-sims/phase7/sim/messages/Search.java > trunk/apps/load-balancing-sims/phase7/sim/messages/SskInsert.java > Log: > Disabled ack coalescing in preparation for improved RTO/FRTO calculation > > Modified: trunk/apps/load-balancing-sims/phase7/sim/Node.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/Node.java 2007-02-05 > 00:36:43 UTC (rev 11678) > +++ trunk/apps/load-balancing-sims/phase7/sim/Node.java 2007-02-05 > 11:52:15 UTC (rev 11679) > @@ -1,5 +1,5 @@ > package sim; > -import sim.generators.Client; > +import sim.clients.Client; > import sim.handlers.*; > import sim.messages.*; > import java.util.HashMap; > > Modified: trunk/apps/load-balancing-sims/phase7/sim/Packet.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/Packet.java 2007-02-05 > 00:36:43 UTC (rev 11678) > +++ trunk/apps/load-balancing-sims/phase7/sim/Packet.java 2007-02-05 > 11:52:15 UTC (rev 11679) > @@ -1,40 +1,32 @@ > // A low-level packet (as opposed to a high-level message) > > package sim; > -import sim.messages.Ack; > import sim.messages.Message; > import java.util.ArrayList; > > class Packet > { > - public final static int HEADER_SIZE = 70; // Including IP & UDP headers > - public final static int ACK_SIZE = 4; // Size of an 
ack in bytes > + public final static int HEADER_SIZE = 60; // Including IP & UDP headers > public final static int MAX_SIZE = 1450; // MTU including headers > public final static int SENSIBLE_PAYLOAD = 1000; // Coalescing > > public final int src, dest; // Network addresses > public int size = HEADER_SIZE; // Size in bytes, including headers > public int seq = -1; // Data sequence number (-1 if no data) > - public ArrayList<Ack> acks = null; > + public int ack = -1; // Ack sequence number (-1 if no ack) > public ArrayList<Message> messages = null; > > public double sent; // Time at which the packet was (re) transmitted > public double latency; // Link latency, stored here for convenience > > - public Packet (int src, int dest, double latency) > + public Packet (int src, int dest, double latency, int ack) > { > this.src = src; > this.dest = dest; > this.latency = latency; > + this.ack = ack; > } > > - public void addAck (Ack a) > - { > - if (acks == null) acks = new ArrayList<Ack>(); > - acks.add (a); > - size += a.size(); > - } > - > public void addMessage (Message m) > { > if (messages == null) messages = new ArrayList<Message>(); > > Modified: trunk/apps/load-balancing-sims/phase7/sim/Peer.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2007-02-05 > 00:36:43 UTC (rev 11678) > +++ trunk/apps/load-balancing-sims/phase7/sim/Peer.java 2007-02-05 > 11:52:15 UTC (rev 11679) > @@ -36,7 +36,6 @@ > private int txSeq = 0; // Sequence number of next outgoing data packet > private int txMaxSeq = SEQ_RANGE - 1; // Highest sequence number > private LinkedList<Packet> txBuffer; // Retransmission buffer > - private DeadlineQueue<Ack> ackQueue; // Outgoing acks > private DeadlineQueue<Message> searchQueue; // Outgoing search messages > private DeadlineQueue<Message> transferQueue; // Outgoing transfers > private CongestionWindow window; // AIMD congestion window > @@ -61,7 +60,6 @@ > 
this.location = location; > this.latency = latency; > txBuffer = new LinkedList<Packet>(); > - ackQueue = new DeadlineQueue<Ack>(); > searchQueue = new DeadlineQueue<Message>(); > transferQueue = new DeadlineQueue<Message>(); > window = new CongestionWindow (this); > @@ -83,20 +81,9 @@ > // Start the coalescing timer > startTimer(); > // Send as many packets as possible > - while (send()); > + while (send (-1)); > } > > - // Queue an ack for transmission > - private void sendAck (int seq) > - { > - if (LOG) log ("ack " + seq + " added to ack queue"); > - ackQueue.add (new Ack (seq, Event.time() + MAX_DELAY)); > - // Start the coalescing timer > - startTimer(); > - // Send as many packets as possible > - while (send()); > - } > - > // Start the coalescing timer > private void startTimer() > { > @@ -107,11 +94,11 @@ > } > > // Try to send a packet, return true if a packet was sent > - private boolean send() > + private boolean send (int ack) > { > - int waiting = ackQueue.size+searchQueue.size+transferQueue.size; > + int waiting = searchQueue.size + transferQueue.size; > if (LOG) log (waiting + " bytes waiting"); > - if (waiting == 0) return false; > + if (ack == -1 && waiting == 0) return false; > > // Return to slow start when the link is idle > double now = Event.time(); > @@ -123,29 +110,30 @@ > size = Math.min (size, node.bandwidth.available()); > if (LOG) log (size + " bytes available for packet"); > > - // Urgent acks to send? > - if (ackQueue.deadline() <= now) return sendPacket (size); > + // Ack to send? > + if (ack != -1) return sendPacket (ack, size); > // Urgent searches and room to send them? > if (searchQueue.deadline() <= now > - && searchQueue.headSize() <= size) return sendPacket (size); > + && searchQueue.headSize() <= size) > + return sendPacket (ack, size); > // Urgent transfers and room to send them? 
> if (transferQueue.deadline() <= now > - && transferQueue.headSize() <= size) return sendPacket (size); > + && transferQueue.headSize() <= size) > + return sendPacket (ack, size); > // Enough non-urgent messages for a large packet, and room? > if (waiting >= Packet.SENSIBLE_PAYLOAD > - && size >= Packet.SENSIBLE_PAYLOAD) return sendPacket (size); > + && size >= Packet.SENSIBLE_PAYLOAD) > + return sendPacket (ack, size); > > if (LOG) log ("not sending a packet"); > return false; > } > > // Try to send a packet up to the specified size, return true if sent > - private boolean sendPacket (int maxSize) > + private boolean sendPacket (int ack, int maxSize) > { > // Construct a packet > - Packet p = new Packet (node.net.address, address, latency); > - // Add all waiting acks to the packet > - while (ackQueue.size > 0) p.addAck (ackQueue.pop()); > + Packet p = new Packet (node.net.address, address, latency, ack); > if (LOG) log ((maxSize - p.size) + " bytes for messages"); > // Don't allow more than SEQ_RANGE payloads to be in flight > if (txSeq <= txMaxSeq) { > @@ -169,7 +157,7 @@ > log ("waiting for ack " + (txMaxSeq - SEQ_RANGE + 1)); > } > // Don't send empty packets > - if (p.acks == null && p.messages == null) return false; > + if (p.ack == -1 && p.messages == null) return false; > // Transmit the packet > if (LOG) log ("sending packet " +p.seq+ ", " +p.size+ " bytes"); > node.sendPacket (p); > @@ -186,7 +174,7 @@ > // Called by Node when a packet arrives > public void handlePacket (Packet p) > { > - if (p.acks != null) for (Ack a : p.acks) handleAck (a); > + if (p.ack != -1) handleAck (p.ack); > if (p.messages != null) handleData (p); > } > > @@ -195,7 +183,7 @@ > if (LOG) log ("received packet " +p.seq+ ", expected " +rxSeq); > if (p.seq < rxSeq || rxDupe.contains (p.seq)) { > if (LOG) log ("duplicate packet"); > - sendAck (p.seq); // Original ack may have been lost > + send (p.seq); // Original ack may have been lost > } > else if (p.seq == rxSeq) { > // Find 
the sequence number of the next missing packet > @@ -204,7 +192,7 @@ > // Deliver the messages to the node > for (Message m : p.messages) > node.handleMessage (m, this); > - sendAck (p.seq); > + send (p.seq); > } > else if (p.seq < rxSeq + SEQ_RANGE) { > if (LOG) log ("packet out of order"); > @@ -212,37 +200,36 @@ > // Deliver the messages to the node > for (Message m : p.messages) > node.handleMessage (m, this); > - sendAck (p.seq); > + send (p.seq); > } > // This indicates a misbehaving sender - discard the packet > else if (LOG) log ("WARNING: sequence number out of range"); > } > > - private void handleAck (Ack a) > + private void handleAck (int ack) > { > - int seq = a.id; > - if (LOG) log ("received ack " + seq); > + if (LOG) log ("received ack " + ack); > double now = Event.time(); > Iterator<Packet> i = txBuffer.iterator(); > while (i.hasNext()) { > Packet p = i.next(); > double age = now - p.sent; > // Explicit ack > - if (p.seq == seq) { > + if (p.seq == ack) { > i.remove(); > // Update the congestion window > window.bytesAcked (p.size); > // Update the average round-trip time > rtt = rtt * RTT_DECAY + age * (1.0 - RTT_DECAY); > if (LOG) { > - log ("packet " +p.seq+ " acknowledged"); > + log ("packet " + ack + " acknowledged"); > log ("round-trip time " + age); > log ("average round-trip time " + rtt); > } > break; > } > // Fast retransmission > - if (p.seq < seq && age > FRTO * rtt + MAX_DELAY) { > + if (p.seq < ack && age > FRTO * rtt) { > p.sent = now; > if (LOG) log ("fast retransmitting " + p.seq); > node.resendPacket (p); > @@ -253,8 +240,8 @@ > if (txBuffer.isEmpty()) txMaxSeq = txSeq + SEQ_RANGE - 1; > else txMaxSeq = txBuffer.peek().seq + SEQ_RANGE - 1; > if (LOG) log ("maximum sequence number " + txMaxSeq); > - // Send as many packets a possible > - if (timerRunning) while (send()); > + // Send as many packets as possible > + if (timerRunning) while (send (-1)); > else checkDeadlines(); > } > > @@ -326,7 +313,7 @@ > > double now = Event.time(); 
> for (Packet p : txBuffer) { > - if (now - p.sent > RTO * rtt + MAX_DELAY) { > + if (now - p.sent > RTO * rtt) { > // Retransmission timeout > if (LOG) log ("retransmitting " + p.seq); > p.sent = now; > @@ -341,11 +328,11 @@ > private void checkDeadlines() > { > // Send as many packets as possible > - while (send()); > + while (send (-1)); > // Find the next coalescing deadline - ignore message deadlines > // if there isn't room in the congestion window to send them > - double dl = ackQueue.deadline(); > - int win = window.available() -Packet.HEADER_SIZE -ackQueue.size; > + double dl = Double.POSITIVE_INFINITY; > + int win = window.available() - Packet.HEADER_SIZE; > if (searchQueue.headSize() <= win) > dl = Math.min (dl, searchQueue.deadline()); > if (transferQueue.headSize() <= win) > @@ -370,12 +357,9 @@ > // Are we waiting for the bandwidth limiter? > private boolean shouldPoll() > { > - double now = Event.time(); > - // Will we need to send an ack before the next polling interval? > - if (ackQueue.deadline() < now + node.bandwidth.poll) > - return false; > double bw = node.bandwidth.available(); > double win = window.available(); > + double now = Event.time(); > // Is there an overdue search that's waiting for bandwidth? 
> if (searchQueue.headSize() > bw > && searchQueue.headSize() <= win > > Modified: trunk/apps/load-balancing-sims/phase7/sim/Sim.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/Sim.java 2007-02-05 > 00:36:43 UTC (rev 11678) > +++ trunk/apps/load-balancing-sims/phase7/sim/Sim.java 2007-02-05 > 11:52:15 UTC (rev 11679) > @@ -1,5 +1,5 @@ > package sim; > -import sim.generators.SimplePublisher; > +import sim.clients.SimplePublisher; > > class Sim implements EventTarget > { > > Modified: trunk/apps/load-balancing-sims/phase7/sim/clients/Client.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/clients/Client.java > 2007-02-05 00:36:43 UTC (rev 11678) > +++ trunk/apps/load-balancing-sims/phase7/sim/clients/Client.java > 2007-02-05 11:52:15 UTC (rev 11679) > @@ -1,4 +1,4 @@ > -package sim.generators; > +package sim.clients; > import sim.messages.Search; > > public interface Client > > Modified: > trunk/apps/load-balancing-sims/phase7/sim/clients/SimplePublisher.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/clients/SimplePublisher.java > 2007-02-05 00:36:43 UTC (rev 11678) > +++ trunk/apps/load-balancing-sims/phase7/sim/clients/SimplePublisher.java > 2007-02-05 11:52:15 UTC (rev 11679) > @@ -1,7 +1,7 @@ > // A simple publisher that inserts keys using a Poisson process and informs > // each reader after an average of ten minutes > > -package sim.generators; > +package sim.clients; > import sim.Event; > import sim.EventTarget; > import sim.Node; > > Modified: trunk/apps/load-balancing-sims/phase7/sim/messages/ChkInsert.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/messages/ChkInsert.java > 2007-02-05 00:36:43 UTC (rev 11678) > +++ 
trunk/apps/load-balancing-sims/phase7/sim/messages/ChkInsert.java > 2007-02-05 11:52:15 UTC (rev 11679) > @@ -1,5 +1,5 @@ > package sim.messages; > -import sim.generators.Client; > +import sim.clients.Client; > > public class ChkInsert extends Search > { > > Modified: trunk/apps/load-balancing-sims/phase7/sim/messages/Search.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/messages/Search.java > 2007-02-05 00:36:43 UTC (rev 11678) > +++ trunk/apps/load-balancing-sims/phase7/sim/messages/Search.java > 2007-02-05 11:52:15 UTC (rev 11679) > @@ -1,5 +1,5 @@ > package sim.messages; > -import sim.generators.Client; > +import sim.clients.Client; > > public class Search extends Message > { > > Modified: trunk/apps/load-balancing-sims/phase7/sim/messages/SskInsert.java > =================================================================== > --- trunk/apps/load-balancing-sims/phase7/sim/messages/SskInsert.java > 2007-02-05 00:36:43 UTC (rev 11678) > +++ trunk/apps/load-balancing-sims/phase7/sim/messages/SskInsert.java > 2007-02-05 11:52:15 UTC (rev 11679) > @@ -1,5 +1,5 @@ > package sim.messages; > -import sim.generators.Client; > +import sim.clients.Client; > > public class SskInsert extends Search > { > > _______________________________________________ > cvs mailing list > cvs at freenetproject.org > http://emu.freenetproject.org/cgi-bin/mailman/listinfo/cvs > -------------- next part -------------- A non-text attachment was scrubbed... Name: signature.asc Type: application/pgp-signature Size: 189 bytes Desc: Digital signature URL: <https://emu.freenetproject.org/pipermail/devl/attachments/20070205/014645c0/attachment.pgp>